Worker architecture and job processing details.
The worker system is designed for high concurrency on I/O-bound work while properly isolating CPU-intensive tasks.
A single virtual thread dedicated to polling SQS.
- Long-polls with WaitTimeSeconds=20 and MaxNumberOfMessages=10
- The MaxNumberOfMessages limit provides natural backpressure on intake rate

The main executor uses Java 21's Executors.newVirtualThreadPerTaskExecutor().
A shared ScheduledThreadPoolExecutor with 2-4 platform threads.
It runs the heartbeat tasks that call ChangeMessageVisibility for in-flight messages.
The dedicated CPU pool is a fixed-size ThreadPoolExecutor sized to Runtime.getRuntime().availableProcessors().
| Workload | ~% of Jobs | Handled By |
|---|---|---|
| I/O-bound (API calls, S3, DB) | 90% | Virtual threads in main executor |
| CPU-bound (OpenCV) | 10% | Dedicated CPU pool |
Virtual threads excel at I/O-bound concurrency but don't magically speed up CPU-bound work—they just prevent it from blocking other work. By isolating CPU tasks to a bounded pool sized to actual cores, we get predictable CPU utilization without interference.
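The isolation described above can be sketched as a small helper that hands CPU-heavy work to a bounded pool and blocks the caller until the result is ready. This is a minimal sketch, not the project's actual code: the class name `CpuOffload` and both method names are hypothetical. On Java 21 the caller would typically be a virtual thread from the main executor, so blocking on the future parks only that virtual thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CpuOffload {
    /** Bounded pool with one platform thread per available core. */
    static ExecutorService newCpuPool() {
        return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

    /**
     * Runs a CPU-heavy task on the bounded pool and blocks the calling thread
     * until it completes. Checked exceptions are wrapped in unchecked ones.
     */
    static <T> T runOnCpuPool(ExecutorService cpuPool, Callable<T> task) {
        try {
            return cpuPool.submit(task).get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for CPU task", e);
        } catch (ExecutionException e) {
            // Surface the task's own failure as the cause.
            throw new CompletionException(e.getCause());
        }
    }
}
```

Because the pool is bounded, at most one CPU task per core runs at a time; additional tasks queue inside the pool while their submitting threads wait.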
Each job executes the following pipeline within its virtual thread:
Parse the SQS message body as a UUID. The message body contains the ingestion ID.
If invalid UUID: Log the error and delete the message immediately. This handles malformed messages that cannot be processed.
If valid UUID: Proceed to Step 2.
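A minimal sketch of this validation step, assuming the message body is passed in as a plain string. The class name `IngestionIdParser` is hypothetical. Note that `UUID.fromString` is somewhat lenient (it accepts some shortened forms), so a stricter format check may be wanted in practice.

```java
import java.util.Optional;
import java.util.UUID;

final class IngestionIdParser {
    /**
     * Returns the ingestion ID if the body parses as a UUID,
     * or an empty Optional for null or malformed input.
     */
    static Optional<UUID> parse(String messageBody) {
        if (messageBody == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(UUID.fromString(messageBody.trim()));
        } catch (IllegalArgumentException e) {
            // Malformed body: the caller logs the error and deletes the message.
            return Optional.empty();
        }
    }
}
```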
Perform an atomic conditional update to claim the ingestion:
UPDATE ingestion
SET started_at = now()
WHERE id = {ingestion-id}
AND status = 'in-progress'
AND (started_at IS NULL OR started_at < now() - interval '5 minutes')
RETURNING *
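If the update affects 0 rows, either the ingestion is no longer 'in-progress' (for example, it was already completed or failed), or another worker claimed it within the last 5 minutes and is presumably still processing it.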
In either case, skip processing and delete the message.
Why the stale check? The started_at timestamp serves as a lock:
- started_at IS NULL: ingestion was never claimed, safe to claim
- started_at is recent: another worker is actively processing, skip
- started_at is old (exceeds configurable threshold): previous worker crashed, safe to reclaim

This single check provides both duplicate prevention and crash recovery. The threshold is configurable via [:com.getorcha.workers/orchestrator :stale-ingestion-threshold-minutes].
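The three cases can be restated as a pure predicate, which is handy for unit-testing the rule. This is only an illustration with hypothetical names (`ClaimCheck`, `canClaim`). The real claim has to stay inside the single conditional UPDATE, because a read-then-write in application code would race between workers.

```java
import java.time.Duration;
import java.time.Instant;

final class ClaimCheck {
    /**
     * Mirrors the WHERE clause of the claim query:
     * status = 'in-progress' AND (started_at IS NULL OR started_at < now - threshold).
     */
    static boolean canClaim(String status, Instant startedAt, Instant now, Duration staleThreshold) {
        if (!"in-progress".equals(status)) {
            return false; // already completed or failed
        }
        if (startedAt == null) {
            return true; // never claimed
        }
        // Claimed before: reclaim only if the previous claim is stale.
        return startedAt.isBefore(now.minus(staleThreshold));
    }
}
```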
After successfully claiming, load the associated document from the database:
SELECT * FROM document WHERE id = {ingestion.document_id}
This returns the ingestion map structure used throughout the pipeline:
{:id ingestion-id
:document {:document/id ...
:document/file-path ...
...}}
Schedule the heartbeat task on the shared scheduler. This ensures visibility timeout is extended during processing.
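A minimal sketch of scheduling such a heartbeat, assuming the actual ChangeMessageVisibility call is wrapped in a `Runnable` supplied by the caller. The class `Heartbeat`, its methods, and the interval parameter are hypothetical; the source does not state the heartbeat interval.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class Heartbeat {
    /**
     * Schedules extendVisibility to run repeatedly on the shared scheduler.
     * The returned handle must be cancelled when the job finishes or fails.
     */
    static ScheduledFuture<?> start(ScheduledExecutorService scheduler,
                                    Runnable extendVisibility,
                                    Duration interval) {
        Runnable guarded = () -> {
            try {
                extendVisibility.run();
            } catch (RuntimeException e) {
                // scheduleAtFixedRate stops rescheduling a task that throws,
                // so failures are reported here and the heartbeat keeps going.
                System.err.println("Heartbeat failed: " + e);
            }
        };
        long millis = interval.toMillis();
        return scheduler.scheduleAtFixedRate(guarded, millis, millis, TimeUnit.MILLISECONDS);
    }

    /** Stops future heartbeats without interrupting one that is running. */
    static void stop(ScheduledFuture<?> heartbeat) {
        heartbeat.cancel(false);
    }
}
```

Calling `stop` in a `finally` block around the job body keeps heartbeats from outliving the job.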
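The heartbeat is only started for claimed ingestions because messages that are not claimed (invalid UUID or a failed claim) are deleted immediately and never need their visibility extended.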
Retrieve the original document from S3 using the document's file_path:
s3://{bucket}/documents/{document-id}.{ext}
The file contents and mime-type are added to the ingestion map:
{:id ingestion-id
:document {...}
:file {:contents <byte-array>
:mime-type "application/pdf"}}
The transcription phase extracts text from the document, automatically preprocessing low-quality images when needed.
The transcription provider:
1. Runs OCR using Google Document AI Enterprise with:
   - processOptions.ocrConfig.enableImageQualityScores: true. Returns page-level quality assessment (0-1 score) and defect breakdown (blurry, dark, glare, small fonts, etc.)
   - processOptions.ocrConfig.enableNativePdfParsing: true. Extracts embedded text from digital PDFs when available
2. Evaluates quality against a configurable threshold (e.g., 0.7)
3. If quality is below threshold and document hasn't been preprocessed: preprocesses the image (CPU-bound OpenCV work, run on the dedicated CPU pool) and transcribes the preprocessed file
4. Returns a result containing the extracted text, quality score, page count, and per-page quality breakdowns
Note: The extraction quality gate (Step 9) is the ultimate decider of document processing success—even low-quality transcription may produce acceptable extraction results, so processing proceeds regardless of transcription quality score.
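The transcription flow can be sketched as follows. This is a rough illustration under stated assumptions: the OCR call and the preprocessing step are passed in as functions, the `Result` holder is a simplified stand-in for the real result, and re-running OCR on the preprocessed file is an inference from the text rather than confirmed behavior. All names are hypothetical.

```java
import java.util.function.Function;
import java.util.function.UnaryOperator;

final class TranscriptionFlow {
    /** Simplified result holder; the real result carries more fields. */
    static final class Result {
        final String text;
        final double qualityScore;
        final boolean preprocessed;

        Result(String text, double qualityScore, boolean preprocessed) {
            this.text = text;
            this.qualityScore = qualityScore;
            this.preprocessed = preprocessed;
        }
    }

    /**
     * Runs OCR once; if the score is below the threshold, preprocesses the
     * original file and runs OCR a second time. Preprocessing happens at most once.
     */
    static Result transcribe(byte[] original,
                             double threshold,
                             Function<byte[], Result> ocr,
                             UnaryOperator<byte[]> preprocess) {
        Result first = ocr.apply(original);
        if (first.qualityScore >= threshold) {
            return first;
        }
        byte[] cleaned = preprocess.apply(original);
        Result second = ocr.apply(cleaned);
        // The second result is returned even if it is still below the
        // threshold; the extraction quality gate decides later.
        return new Result(second.text, second.qualityScore, true);
    }
}
```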
Upload transcription result to S3 as EDN at an ingestion-specific path:
s3://{bucket}/ingestions/{ingestion-id}/transcription-output.edn
If preprocessing was performed, also upload:
s3://{bucket}/ingestions/{ingestion-id}/preprocessed.{ext}
The EDN format preserves the full transcription response structure including extracted text, quality score, page count, per-page quality breakdowns, and any other metadata returned by the transcription provider.
Update ingestion record with transcription metadata:
UPDATE ingestion
SET transcription_file_path = 'ingestions/{id}/transcription-output.edn',
transcription_quality_score = {quality-score},
preprocessed_file_path = 'ingestions/{id}/preprocessed.pdf' -- if applicable
WHERE id = {ingestion-id}
Send the transcribed text to the LLM with a prompt requesting structured data extraction. The LLM returns JSON containing:
- extraction_successful: boolean
- confidence: "high" | "medium" | "low"
- missing_fields: array of field names that could not be extracted
- The extracted fields: invoice_number, invoice_date, supplier_name, supplier_address, line_items, subtotal, vat_amount, total, etc.

Evaluate the LLM response:
Success criteria:
- extraction_successful: true
- confidence not "low"

If success: Proceed to Step 10.
If failure: Branch to failure handling (see Failure Handling section below).
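The gate reduces to a two-condition predicate. A minimal sketch, assuming the two fields have already been read out of the LLM's JSON response; the class and method names are hypothetical.

```java
final class ExtractionGate {
    /**
     * Success requires extraction_successful = true and a confidence other than "low".
     * Under this literal reading a missing (null) confidence also passes; a stricter
     * variant could accept only "high" and "medium".
     */
    static boolean passes(boolean extractionSuccessful, String confidence) {
        return extractionSuccessful && !"low".equals(confidence);
    }
}
```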
Update the ingestion record with the extraction result:
UPDATE ingestion
SET status = 'completed',
structured_data = {json-result},
valid_structured_data = true,
extraction_input_tokens = {tokens},
extraction_output_tokens = {tokens},
extraction_model = {model},
completed_at = now()
WHERE id = {ingestion-id}
A database trigger automatically propagates the result to the document:
-- Trigger: update_document_from_ingestion()
UPDATE document
SET structured_data = NEW.structured_data,
needs_human_review = NOT COALESCE(NEW.valid_structured_data, true),
updated_at = now()
WHERE id = NEW.document_id
The job is complete.
All failures—whether soft (extraction quality gate) or hard (S3/OCR/LLM exceptions)—are handled uniformly.
When any exception occurs after claiming, the system:
Increments attempt_count on the ingestion (always, regardless of failure type):
UPDATE ingestion
SET attempt_count = attempt_count + 1,
status = CASE
WHEN attempt_count + 1 >= 3 THEN 'failed'
ELSE status
END,
completed_at = CASE
WHEN attempt_count + 1 >= 3 THEN now()
ELSE completed_at
END
WHERE id = {ingestion-id}
RETURNING attempt_count, status
Based on the returned status after the update:
| Status | Action | Reason |
|---|---|---|
| failed | Delete SQS message | Max attempts reached, route to human review |
| in-progress | Let visibility expire | Will retry from beginning |
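The decision in the table can be sketched in application code as below. The limit of 3 attempts comes from the UPDATE statement; the class, enum, and method names are hypothetical, and in the real system the status transition is computed by the database, not by the worker.

```java
final class FailureHandling {
    static final int MAX_ATTEMPTS = 3;

    enum Action { DELETE_MESSAGE, LET_VISIBILITY_EXPIRE }

    /** Mirrors the CASE expression: the status flips to 'failed' on the third attempt. */
    static String statusAfterFailure(int previousAttemptCount, String currentStatus) {
        return previousAttemptCount + 1 >= MAX_ATTEMPTS ? "failed" : currentStatus;
    }

    /** Maps the status returned by the UPDATE to what happens to the SQS message. */
    static Action actionFor(String statusAfterUpdate) {
        return "failed".equals(statusAfterUpdate)
                ? Action.DELETE_MESSAGE          // max attempts reached, human review
                : Action.LET_VISIBILITY_EXPIRE;  // SQS redelivers and the job retries
    }
}
```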
When an ingestion fails, the database trigger updates the document:
-- needs_human_review is set to true when valid_structured_data is false/null
UPDATE document SET needs_human_review = true WHERE id = {document-id}
When a worker crashes after transcription but before extraction (or any later stage), the SQS message will be redelivered and another attempt begins. To avoid redundant transcription calls:
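Before transcribing, the worker checks whether ingestions/{id}/transcription-output.edn already exists in S3 and, if it does, reuses that result instead of calling the transcription provider again.
This is a simple file existence check, not a status-based checkpoint system. Each ingestion stores its own transcription results, providing full auditability.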
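A minimal sketch of this reuse logic, with storage access abstracted behind function parameters so that it stays independent of any particular S3 client. The names `TranscriptionCache`, `fetch`, `transcribe`, and `store` are hypothetical, and the transcription is represented as a plain string for brevity.

```java
import java.util.Optional;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Supplier;

final class TranscriptionCache {
    /** Object key of the stored transcription for one ingestion. */
    static String key(UUID ingestionId) {
        return "ingestions/" + ingestionId + "/transcription-output.edn";
    }

    /**
     * Returns the stored transcription if one exists; otherwise transcribes,
     * stores the result under the ingestion's key, and returns it.
     */
    static String loadOrTranscribe(UUID ingestionId,
                                   Function<String, Optional<String>> fetch,
                                   Supplier<String> transcribe,
                                   BiConsumer<String, String> store) {
        String objectKey = key(ingestionId);
        Optional<String> existing = fetch.apply(objectKey);
        if (existing.isPresent()) {
            return existing.get(); // earlier attempt already paid for transcription
        }
        String fresh = transcribe.get();
        store.accept(objectKey, fresh);
        return fresh;
    }
}
```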
This unified approach was chosen over separate soft/hard failure handling because:
- attempt_count always reflects total attempts

| Failure Type | Likely Cause | Retry Strategy |
|---|---|---|
| Transcription quality low | Blurry/dark image | Preprocess and retry |
| Extraction timeout | Transient API issue | Retry as-is |
| Extraction rate limited | Quota exceeded | Retry with backoff (SQS delay) |
| Extraction incomplete | Ambiguous document | Retry once, then human review |
| Critical fields missing | Non-standard format | Human review |
| S3/DB errors | Infrastructure issue | Retry as-is |
| Transcription API error | Transient API issue | Retry as-is |
Configure SQS with a dead-letter queue (DLQ) as a safety net. Messages that fail repeatedly beyond SQS's own retry policy land in the DLQ for investigation. This catches edge cases not handled by application-level retry logic.
If a worker crashes after claiming but before the catch block executes:
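- The started_at timestamp remains set
- Once the SQS visibility timeout expires, the message is redelivered, and the next claim succeeds as soon as started_at < now() - 5 minutes

This is handled directly in the claim query's stale check; no separate cleanup job is required.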
The system supports graceful shutdown to prevent work loss during deployments.
JVM shutdown hook triggers (SIGTERM, SIGINT, or programmatic shutdown)
Stop the poller: Flip the atomic flag that gates the polling loop. The poller completes its current long-poll cycle (up to 20 seconds) and exits without fetching more messages.
Shutdown main executor: Call executor.shutdown(). No new tasks are accepted, but in-flight jobs continue.
Await termination: Wait for all in-flight jobs to complete with a generous timeout (e.g., 10-15 minutes). Jobs continue running, including their heartbeats maintaining SQS visibility.
Force shutdown if needed: If timeout expires and jobs are still running, call executor.shutdownNow(). Any interrupted jobs will have their messages become visible again in SQS after visibility timeout expires, enabling reprocessing by another worker.
Cleanup schedulers and pools: Shut down the heartbeat scheduler and CPU pool.
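The sequence above can be sketched with standard `ExecutorService` calls. This is a minimal sketch, not the project's implementation: the class `GracefulShutdown`, the `running` flag, and the parameter names are hypothetical, and the poller is assumed to check the flag between long-poll cycles.

```java
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class GracefulShutdown {
    /**
     * Stops intake, waits for in-flight jobs, forces shutdown on timeout,
     * then shuts down auxiliary executors (heartbeat scheduler, CPU pool).
     * Returns true if all jobs finished within the timeout.
     */
    static boolean drain(AtomicBoolean running,
                         ExecutorService mainExecutor,
                         Duration timeout,
                         ExecutorService... auxiliary) {
        running.set(false);      // poller exits after its current long-poll
        mainExecutor.shutdown(); // no new jobs; in-flight jobs continue

        boolean drained;
        try {
            drained = mainExecutor.awaitTermination(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            drained = false;
        }
        if (!drained) {
            // Interrupted jobs are retried after their SQS visibility timeout expires.
            mainExecutor.shutdownNow();
        }
        // Auxiliary executors go last so heartbeats keep running while jobs drain.
        for (ExecutorService executor : auxiliary) {
            executor.shutdownNow();
        }
        return drained;
    }
}
```

Such a method could be registered with `Runtime.getRuntime().addShutdownHook(new Thread(...))` so that it runs on SIGTERM or SIGINT.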
See Potential Improvements for detailed proposals on: