Orcha Ingestion: Processing Pipeline

Worker architecture and job processing details.


Concurrency Architecture

The worker system is designed for high concurrency on I/O-bound work while properly isolating CPU-intensive tasks.

Components

SQS Poller

A single virtual thread dedicated to polling SQS.

Main Job Executor

Each job runs on its own virtual thread, created by Java 21's Executors.newVirtualThreadPerTaskExecutor().

Heartbeat Scheduler

A shared ScheduledThreadPoolExecutor with 2-4 platform threads.

CPU Pool

A fixed-size ThreadPoolExecutor sized to Runtime.getRuntime().availableProcessors().
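The sketch below shows one way the four components might be constructed. It assumes a hypothetical WorkerExecutors holder class and a heartbeat scheduler with 2 threads (the low end of the 2-4 range). The poll loop is passed in as a plain Runnable.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;

// Hypothetical holder for the worker's executors (requires Java 21 for virtual threads).
final class WorkerExecutors {
    // Main job executor: every submitted job gets its own virtual thread.
    final ExecutorService jobs = Executors.newVirtualThreadPerTaskExecutor();

    // Shared heartbeat scheduler on a small number of platform threads.
    final ScheduledExecutorService heartbeats = new ScheduledThreadPoolExecutor(2);

    // CPU pool: a fixed-size ThreadPoolExecutor with one thread per available processor.
    final ExecutorService cpu = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    // SQS poller: a single dedicated virtual thread running the poll loop.
    Thread startPoller(Runnable pollLoop) {
        return Thread.ofVirtual().name("sqs-poller").start(pollLoop);
    }
}
```

Executors.newFixedThreadPool returns a ThreadPoolExecutor whose core and maximum sizes are equal, which matches the fixed-size CPU pool described above.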

Why This Design

Workload                        ~% of Jobs   Handled By
I/O-bound (API calls, S3, DB)   90%          Virtual threads in main executor
CPU-bound (OpenCV)              10%          Dedicated CPU pool

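Virtual threads excel at I/O-bound concurrency, but they do not speed up CPU-bound work. A virtual thread is only unmounted from its carrier thread when it blocks, because there is no time slicing. The default scheduler's carrier pool is also sized to the number of cores. As a result, long CPU-bound tasks running on virtual threads can occupy every carrier and starve the I/O-bound jobs. Isolating CPU tasks in a bounded pool sized to the actual cores gives predictable CPU utilization without that interference.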
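The sketch below shows one way a job on a virtual thread might hand CPU-bound work to the dedicated pool. CpuOffload is a hypothetical helper name. CompletableFuture.join() parks the calling virtual thread, which unmounts it from its carrier while the CPU pool does the work.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

// Hypothetical helper that moves CPU-bound work off virtual threads.
final class CpuOffload {
    private final ExecutorService cpuPool;

    CpuOffload(ExecutorService cpuPool) {
        this.cpuPool = cpuPool;
    }

    // Runs cpuTask on the bounded CPU pool and waits for the result. When called from a
    // virtual thread, join() parks it, freeing its carrier thread for I/O-bound jobs.
    <T> T run(Supplier<T> cpuTask) {
        return CompletableFuture.supplyAsync(cpuTask, cpuPool).join();
    }
}
```

Any exception thrown by the task surfaces from join() wrapped in a CompletionException.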


Job Processing Pipeline

Each job executes the following pipeline within its virtual thread:

Step 1: Validate Message

Parse the SQS message body as a UUID. The message body contains the ingestion ID.

If invalid UUID: Log the error and delete the message immediately. This handles malformed messages that cannot be processed.

If valid UUID: Proceed to Step 2.
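The Step 1 check can be sketched in Java as follows. This assumes the body must be exactly one UUID in canonical 8-4-4-4-12 form; MessageValidation and parseIngestionId are hypothetical names. UUID.fromString alone is lenient and accepts some non-canonical strings such as 1-1-1-1-1, so the sketch compares the parsed value against its canonical rendering.

```java
import java.util.Locale;
import java.util.Optional;
import java.util.UUID;

// Sketch of Step 1: extract the ingestion ID from an SQS message body.
final class MessageValidation {
    static Optional<UUID> parseIngestionId(String body) {
        if (body == null) {
            return Optional.empty();
        }
        String candidate = body.strip();
        try {
            UUID id = UUID.fromString(candidate);
            // UUID.toString() is always lowercase 8-4-4-4-12 hex; reject lenient short forms.
            return id.toString().equals(candidate.toLowerCase(Locale.ROOT))
                    ? Optional.of(id)
                    : Optional.empty();
        } catch (IllegalArgumentException e) {
            return Optional.empty(); // not parseable as a UUID at all
        }
    }
}
```

An empty Optional corresponds to the invalid-UUID branch (log and delete the message). A present value continues to Step 2.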

Step 2: Claim Ingestion

Perform an atomic conditional update to claim the ingestion:

UPDATE ingestion
SET started_at = now()
WHERE id = {ingestion-id}
  AND status = 'in-progress'
  AND (started_at IS NULL OR started_at < now() - interval '5 minutes')
RETURNING *

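If the update affects 0 rows, one of two things is true. Either the ingestion is no longer in-progress (it has already completed or failed), or another worker claimed it within the stale threshold (5 minutes above) and is presumably still processing it.

In either case, skip processing and delete the message.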

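Why the stale check? The started_at timestamp serves as a lock. A recent started_at means another worker holds the claim. A started_at older than the threshold means the previous holder most likely crashed, so the ingestion can be reclaimed.

This single check provides both duplicate prevention and crash recovery. The threshold is configurable via [:com.getorcha.workers/orchestrator :stale-ingestion-threshold-minutes].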
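The claim query's WHERE clause can be read as a small predicate. The Java sketch below mirrors it for illustration only; ClaimRule and isClaimable are hypothetical names. The real decision must stay in the single atomic UPDATE, because reading the row in application code and then writing it would reintroduce the race that the claim query avoids.

```java
import java.time.Duration;
import java.time.Instant;

// Illustration only: mirrors the claim query's WHERE clause. In production the check
// must stay inside the single atomic UPDATE so two workers cannot both claim the row.
final class ClaimRule {
    static boolean isClaimable(String status, Instant startedAt, Instant now, Duration staleThreshold) {
        if (!"in-progress".equals(status)) {
            return false; // completed or failed ingestions are never reclaimed
        }
        if (startedAt == null) {
            return true; // never claimed before
        }
        // started_at < now() - threshold: the previous claim is stale (worker presumably crashed)
        return startedAt.isBefore(now.minus(staleThreshold));
    }
}
```

Like the SQL, the comparison is strict, so a claim exactly at the threshold still counts as held.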

Step 3: Load Document

After successfully claiming, load the associated document from the database:

SELECT * FROM document WHERE id = {ingestion.document_id}

The document row is added to the ingestion map, which is passed through the rest of the pipeline:

{:id         ingestion-id
 :document   {:document/id ...
              :document/file-path ...
              ...}}

Step 4: Schedule Heartbeat

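Schedule the heartbeat task on the shared scheduler. The heartbeat periodically extends the SQS message's visibility timeout, so the message is not redelivered to another worker while processing is still under way.

The heartbeat is only started for claimed ingestions. Messages that fail validation (Step 1) or cannot be claimed (Step 2) are deleted immediately and never need their visibility extended.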
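The sketch below shows a per-job heartbeat on the shared scheduler. It assumes a hypothetical extendVisibility callback; in practice this would wrap an SQS ChangeMessageVisibility call for the job's receipt handle. ScheduledExecutorService.scheduleAtFixedRate suppresses all later runs once a run throws, so the callback is guarded.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical per-job heartbeat; close() it when the job finishes.
final class Heartbeat implements AutoCloseable {
    private final ScheduledFuture<?> task;

    Heartbeat(ScheduledExecutorService scheduler, Runnable extendVisibility, Duration period) {
        long millis = period.toMillis();
        this.task = scheduler.scheduleAtFixedRate(() -> {
            try {
                extendVisibility.run();
            } catch (RuntimeException e) {
                // Swallow (real code would log) so one failed extension doesn't stop future beats.
                System.err.println("heartbeat failed: " + e.getMessage());
            }
        }, millis, millis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        task.cancel(false); // stop future beats without interrupting one in progress
    }
}
```

A job could wrap its work in try (Heartbeat hb = new Heartbeat(scheduler, extend, period)) { ... }. That way the heartbeat stops on both success and failure, which matters when a failed attempt relies on the visibility timeout expiring. The period should be comfortably shorter than the visibility timeout being extended.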

Step 5: Fetch Document from S3

Retrieve the original document from S3 using the document's file_path:

s3://{bucket}/documents/{document-id}.{ext}

The file contents and mime-type are added to the ingestion map:

{:id       ingestion-id
 :document {...}
 :file     {:contents  <byte-array>
            :mime-type "application/pdf"}}

Step 6: Transcription

The transcription phase extracts text from the document, automatically preprocessing low-quality images when needed.

The transcription provider:

  1. Runs OCR using Google Document AI Enterprise with:

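  2. Evaluates quality against a configurable threshold (e.g., 0.7).

  3. If quality is below the threshold and the document hasn't been preprocessed yet, preprocesses it and re-runs OCR on the preprocessed version.

  4. Returns a result containing the extracted text, quality score, page count, per-page quality breakdowns, any other provider metadata, and the preprocessed file if one was produced.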

Note: The extraction quality gate (Step 9) is the ultimate decider of whether a document is processed successfully. Even a low-quality transcription may produce acceptable extraction results, so processing proceeds regardless of the transcription quality score.
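The provider's control flow in items 2 and 3 can be sketched as follows. OcrResult, TranscriptionResult and TranscriptionStep are hypothetical names. The OCR call and the preprocessing step are passed in as functions; in the real system these would presumably call Document AI and the image preprocessing.

```java
import java.util.function.Function;
import java.util.function.UnaryOperator;

// Hypothetical shapes; the real provider returns a richer structure (stored as EDN in Step 7).
record OcrResult(String text, double qualityScore) {}

record TranscriptionResult(OcrResult ocr, byte[] preprocessedFile) {
    boolean preprocessed() {
        return preprocessedFile != null;
    }
}

final class TranscriptionStep {
    private final Function<byte[], OcrResult> ocr;    // wraps the OCR call (assumption)
    private final UnaryOperator<byte[]> preprocess;   // image cleanup (assumption)
    private final double threshold;                   // e.g. 0.7

    TranscriptionStep(Function<byte[], OcrResult> ocr, UnaryOperator<byte[]> preprocess, double threshold) {
        this.ocr = ocr;
        this.preprocess = preprocess;
        this.threshold = threshold;
    }

    TranscriptionResult run(byte[] original) {
        OcrResult first = ocr.apply(original);
        if (first.qualityScore() >= threshold) {
            return new TranscriptionResult(first, null); // good enough, no preprocessing
        }
        // Below threshold and not yet preprocessed: preprocess once, then OCR again.
        byte[] cleaned = preprocess.apply(original);
        OcrResult second = ocr.apply(cleaned);
        // Returned even if still below threshold; the extraction quality gate decides.
        return new TranscriptionResult(second, cleaned);
    }
}
```

Because preprocessing happens at most once per run, a document that stays below the threshold does not loop. It simply moves on to extraction.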

Step 7: Store Transcription Output

Upload transcription result to S3 as EDN at an ingestion-specific path:

s3://{bucket}/ingestions/{ingestion-id}/transcription-output.edn

If preprocessing was performed, also upload:

s3://{bucket}/ingestions/{ingestion-id}/preprocessed.{ext}

The EDN format preserves the full transcription response structure including extracted text, quality score, page count, per-page quality breakdowns, and any other metadata returned by the transcription provider.

Update ingestion record with transcription metadata:

UPDATE ingestion
SET transcription_file_path = 'ingestions/{ingestion-id}/transcription-output.edn',
    transcription_quality_score = {quality-score},
    preprocessed_file_path = {preprocessed-path}  -- 'ingestions/{ingestion-id}/preprocessed.{ext}', or NULL if not preprocessed
WHERE id = {ingestion-id}

Step 8: Extraction

Send the transcribed text to the LLM with a prompt requesting structured data extraction. The LLM returns JSON containing:

Step 9: Extraction Quality Gate

Evaluate the LLM response:

Success criteria:

If success: Proceed to Step 10.

If failure: Branch to failure handling (see Failure Handling section below).

Step 10: Store Result

Update the ingestion record with the extraction result:

UPDATE ingestion
SET status = 'completed',
    structured_data = {json-result},
    valid_structured_data = true,
    extraction_input_tokens = {input-tokens},
    extraction_output_tokens = {output-tokens},
    extraction_model = {model},
    completed_at = now()
WHERE id = {ingestion-id}

A database trigger automatically propagates the result to the document:

-- Trigger: update_document_from_ingestion()
UPDATE document
SET structured_data = NEW.structured_data,
    needs_human_review = NOT COALESCE(NEW.valid_structured_data, false),
    updated_at = now()
WHERE id = NEW.document_id

Step 11: Cleanup (Success Path)

Cancel the heartbeat and delete the SQS message. The job is complete.


Failure Handling

All failures are handled uniformly, whether soft (extraction quality gate) or hard (S3, OCR, or LLM exceptions).

Unified Error Handling

When processing fails after claiming, the system:

  1. Increments attempt_count on the ingestion (always, regardless of failure type)
  2. Checks whether max attempts has been reached (configured as 3)
  3. Sets status to 'failed' (and records completed_at) if max attempts has been reached

All three happen in a single atomic update:

UPDATE ingestion
SET attempt_count = attempt_count + 1,
    status = CASE
      WHEN attempt_count + 1 >= 3 THEN 'failed'
      ELSE status
    END,
    completed_at = CASE
      WHEN attempt_count + 1 >= 3 THEN now()
      ELSE completed_at
    END
WHERE id = {ingestion-id}
RETURNING attempt_count, status

Failure Outcomes

Based on the returned status after the update:

Status        Action                  Reason
failed        Delete SQS message      Max attempts reached; route to human review
in-progress   Let visibility expire   Will retry from the beginning
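The failure UPDATE and the outcome table combine into a small decision, mirrored below in Java for illustration. FailurePolicy and its members are hypothetical, and completed_at is omitted. In the in-progress case, the job's heartbeat must also be stopped. Otherwise the visibility timeout keeps being extended and never expires.

```java
// Hypothetical mirror of the failure UPDATE plus the outcome table.
final class FailurePolicy {
    static final int MAX_ATTEMPTS = 3;

    enum Action { DELETE_MESSAGE, LET_VISIBILITY_EXPIRE }

    record Outcome(int attemptCount, String status, Action action) {}

    // previousAttempts and currentStatus are the row's values before the failure UPDATE.
    static Outcome onFailure(int previousAttempts, String currentStatus) {
        int attempts = previousAttempts + 1;                                  // always incremented
        String status = attempts >= MAX_ATTEMPTS ? "failed" : currentStatus;  // CASE WHEN attempt_count + 1 >= 3
        // The SQS action is chosen from the returned status, as in the outcome table.
        Action action = "failed".equals(status) ? Action.DELETE_MESSAGE : Action.LET_VISIBILITY_EXPIRE;
        return new Outcome(attempts, status, action);
    }
}
```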

When an ingestion fails, the database trigger updates the document:

-- needs_human_review is set to true when valid_structured_data is false/null
UPDATE document SET needs_human_review = true WHERE id = {document-id}

Transcription Caching Within Same Ingestion

When a worker crashes at any point after transcription (during extraction or a later stage), the SQS message is redelivered and another attempt begins. To avoid redundant transcription calls:

  1. Before running transcription, check if the transcription output file exists in S3 at ingestions/{id}/transcription-output.edn
  2. If it exists, download and reuse it
  3. If not, run transcription and store the result

This is a simple file existence check, not a status-based checkpoint system. Each ingestion stores its own transcription results, providing full auditability.
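The cache check can be sketched as follows. This assumes a hypothetical ObjectStore interface in place of the S3 client, and includes an in-memory implementation so the example runs. The sketch folds the existence check and the download into a single get, which returns an empty Optional when the key is absent.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for the S3 client.
interface ObjectStore {
    Optional<byte[]> get(String key); // empty when the object does not exist
    void put(String key, byte[] contents);
}

// In-memory implementation so the example runs without S3.
final class InMemoryStore implements ObjectStore {
    private final Map<String, byte[]> objects = new ConcurrentHashMap<>();

    @Override
    public Optional<byte[]> get(String key) {
        return Optional.ofNullable(objects.get(key));
    }

    @Override
    public void put(String key, byte[] contents) {
        objects.put(key, contents);
    }
}

final class CachedTranscription {
    static String transcriptionKey(UUID ingestionId) {
        return "ingestions/" + ingestionId + "/transcription-output.edn";
    }

    // Reuses the output stored by an earlier attempt of the same ingestion;
    // otherwise runs transcription and stores its serialized output.
    static byte[] getOrTranscribe(ObjectStore store, UUID ingestionId, Supplier<byte[]> transcribe) {
        String key = transcriptionKey(ingestionId);
        Optional<byte[]> cached = store.get(key);
        if (cached.isPresent()) {
            return cached.get();
        }
        byte[] output = transcribe.get();
        store.put(key, output);
        return output;
    }
}
```

Because the key includes the ingestion ID, a cached result is only ever reused by retries of the same ingestion.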

Design Rationale

This unified approach was chosen over separate soft/hard failure handling because:

Failure Scenarios and Retry Strategies

Failure Type                Likely Cause           Retry Strategy
Transcription quality low   Blurry/dark image      Preprocess and retry
Extraction timeout          Transient API issue    Retry as-is
Extraction rate limited     Quota exceeded         Retry with backoff (SQS delay)
Extraction incomplete       Ambiguous document     Retry once, then human review
Critical fields missing     Non-standard format    Human review
S3/DB errors                Infrastructure issue   Retry as-is
Transcription API error     Transient API issue    Retry as-is

Dead Letter Queue

Configure SQS with a dead-letter queue (DLQ) as a safety net. Messages whose receive count exceeds the queue's redrive policy (maxReceiveCount) are moved to the DLQ for investigation. This catches edge cases not handled by application-level retry logic.

Stale Ingestion Handling

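A worker may crash after claiming but before the catch block executes. In that case attempt_count is never incremented, and the ingestion stays in-progress with the crashed worker's started_at. Because the heartbeat died with the worker, the SQS message becomes visible again once its visibility timeout expires. Once started_at is older than the stale threshold, the next worker to receive the message can reclaim the ingestion.

This is handled directly by the claim query's stale check; no separate cleanup job is required.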


Graceful Shutdown

The system supports graceful shutdown to prevent work loss during deployments.

Shutdown Sequence

  1. JVM shutdown hook triggers (SIGTERM, SIGINT, or programmatic shutdown)

  2. Stop the poller: Set the atomic running flag to false. The poller completes its current long-poll cycle (up to 20 seconds) and exits without fetching more messages.

  3. Shutdown main executor: Call executor.shutdown(). No new tasks are accepted, but in-flight jobs continue.

  4. Await termination: Wait for all in-flight jobs to complete with a generous timeout (e.g., 10-15 minutes). Jobs continue running, including their heartbeats maintaining SQS visibility.

  5. Force shutdown if needed: If timeout expires and jobs are still running, call executor.shutdownNow(). Any interrupted jobs will have their messages become visible again in SQS after visibility timeout expires, enabling reprocessing by another worker.

  6. Cleanup schedulers and pools: Shut down the heartbeat scheduler and CPU pool.
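The sequence above can be sketched as a single coordinator. GracefulShutdown, the running flag and both timeouts are hypothetical, and the executors are assumed to be the ones described under Components. Joining the poller before shutting down the job executor is an extra assumption. It keeps the poller from submitting work to an executor that has already been shut down.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical shutdown coordinator following steps 1-6.
final class GracefulShutdown {
    private final AtomicBoolean running;  // the flag the poll loop checks
    private final Thread poller;
    private final ExecutorService jobs;
    private final ScheduledExecutorService heartbeats;
    private final ExecutorService cpuPool;
    private final Duration jobTimeout;    // e.g. 10-15 minutes
    private final Duration pollerTimeout; // a bit more than one long-poll cycle

    GracefulShutdown(AtomicBoolean running, Thread poller, ExecutorService jobs,
                     ScheduledExecutorService heartbeats, ExecutorService cpuPool,
                     Duration jobTimeout, Duration pollerTimeout) {
        this.running = running;
        this.poller = poller;
        this.jobs = jobs;
        this.heartbeats = heartbeats;
        this.cpuPool = cpuPool;
        this.jobTimeout = jobTimeout;
        this.pollerTimeout = pollerTimeout;
    }

    // Step 1: run the sequence from a JVM shutdown hook (SIGTERM, SIGINT, System.exit).
    void install() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::run, "graceful-shutdown"));
    }

    void run() {
        try {
            running.set(false);                     // Step 2: poller exits after its current long poll
            poller.join(pollerTimeout.toMillis());  // assumption: wait so no new jobs arrive late
            jobs.shutdown();                        // Step 3: no new tasks; in-flight jobs keep running
            // Step 4: wait for in-flight jobs; their heartbeats keep extending visibility meanwhile.
            if (!jobs.awaitTermination(jobTimeout.toMillis(), TimeUnit.MILLISECONDS)) {
                jobs.shutdownNow();                 // Step 5: interrupt stragglers; messages reappear later
            }
        } catch (InterruptedException e) {
            jobs.shutdownNow();
            Thread.currentThread().interrupt();
        } finally {
            // Step 6: with default policies, shutdown() cancels a ScheduledThreadPoolExecutor's periodic tasks.
            heartbeats.shutdown();
            cpuPool.shutdown();
        }
    }
}
```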

Deployment Considerations


Future Considerations

See Potential Improvements for detailed proposals on: