High-level system architecture for document processing.
---
title: Orcha Document Processing System
---
flowchart TD
subgraph Ingestion["Document Ingestion"]
upload["User Upload"]
email["Email Watcher<br/>(Gmail + Pub/Sub)"]
api["API/Webhook"]
end
subgraph WebhookHandler["Webhook Handler"]
upsert_doc["Upsert Document<br/>(by content hash)"]
upload_s3{"New<br/>document?"}
store_s3["Upload to S3"]
create_ingestion["Create Ingestion Record<br/>(with source info)"]
enqueue["Enqueue Ingestion ID"]
end
subgraph ConcurrencyArch["Worker Process (JVM)"]
subgraph Poller["SQS Poller<br/>(1 virtual thread)"]
poll_loop["Long-poll SQS<br/>WaitTimeSeconds=20<br/>MaxNumberOfMessages=10"]
end
subgraph MainExecutor["Main Job Executor<br/>(newVirtualThreadPerTaskExecutor)"]
submit_job["Submit job task"]
subgraph JobTask["Job Task (per message)"]
validate_uuid{"Valid UUID?"}
invalid_msg["Log error"]
claim{"Claim: UPDATE ingestion<br/>SET started_at = now()<br/>WHERE status = in-progress<br/>AND started_at IS NULL or stale"}
skip["Skip: already claimed"]
load_doc["Load document from DB"]
schedule_hb["Schedule heartbeat<br/>on shared scheduler"]
fetch_doc["Fetch from S3"]
subgraph TranscriptionPhase["Transcription"]
call_transcription["Document AI OCR<br/>(with preprocessing if needed)"]
store_transcription["Store transcription (EDN)<br/>to /ingestions/{id}/"]
end
subgraph ExtractionPhase["Extraction"]
call_extraction["LLM extraction"]
extraction_check{"Extraction OK?"}
store_result["Store JSON in DB<br/>→ trigger updates document"]
end
subgraph FailureHandling["Failure Handling"]
inc_attempt["Increment attempt<br/>on ingestion"]
max_check{"Attempts<br/>< max?"}
requeue["Let visibility expire<br/>(will retry)"]
mark_failed["status: failed<br/>→ human review"]
end
cancel_hb["Cancel heartbeat"]
delete_msg["Delete SQS message"]
end
end
subgraph SharedScheduler["Heartbeat Scheduler<br/>(ScheduledThreadPool, 2-4 threads)"]
hb_task["ChangeMessageVisibility<br/>every 30s"]
end
subgraph CPUPool["CPU Pool<br/>(FixedThreadPool, N cores)"]
opencv["OpenCV preprocessing<br/>(deblur, contrast)"]
end
subgraph Shutdown["Graceful Shutdown"]
shutdown_hook["JVM Shutdown Hook"]
stop_poller["Set poller flag = false"]
shutdown_exec["executor.shutdown()"]
await_term["awaitTermination<br/>(10-15 min)"]
force_shutdown["shutdownNow() if needed"]
end
end
subgraph External["External Services"]
sqs[("SQS")]
s3[("S3")]
postgres[("PostgreSQL")]
docai["Document AI"]
llm_api["LLM API"]
end
%% Ingestion flow
upload --> upsert_doc
email --> upsert_doc
api --> upsert_doc
upsert_doc --> upload_s3
upload_s3 -->|"Yes"| store_s3
upload_s3 -->|"No"| create_ingestion
store_s3 --> create_ingestion
create_ingestion --> enqueue
enqueue -.-> sqs
%% Poller flow
sqs -.-> poll_loop
poll_loop --> submit_job
%% Job task flow
submit_job --> validate_uuid
validate_uuid -->|"No"| invalid_msg
invalid_msg --> delete_msg
validate_uuid -->|"Yes"| claim
claim -->|"0 rows"| skip
skip --> delete_msg
claim -->|"1 row"| load_doc
load_doc --> schedule_hb
schedule_hb -.-> hb_task
schedule_hb --> fetch_doc
fetch_doc --> call_transcription
call_transcription -.-> opencv
call_transcription --> store_transcription
store_transcription --> call_extraction
call_extraction --> extraction_check
extraction_check -->|"Yes"| store_result
extraction_check -->|"No"| inc_attempt
store_result --> cancel_hb
cancel_hb --> delete_msg
delete_msg -.-> sqs
inc_attempt --> max_check
max_check -->|"Yes"| requeue
max_check -->|"No"| mark_failed
requeue --> cancel_hb
mark_failed --> cancel_hb
%% Shutdown flow
shutdown_hook --> stop_poller
stop_poller --> shutdown_exec
shutdown_exec --> await_term
await_term --> force_shutdown
%% External connections
fetch_doc -.-> s3
store_transcription -.-> s3
call_transcription -.-> docai
call_extraction -.-> llm_api
store_result -.-> postgres
claim -.-> postgres
hb_task -.-> sqs
%% Styling
classDef ingestion fill:#e1f5fe,stroke:#01579b
classDef handler fill:#fff3e0,stroke:#e65100
classDef poller fill:#e8eaf6,stroke:#3949ab
classDef executor fill:#f3e5f5,stroke:#7b1fa2
classDef scheduler fill:#e0f2f1,stroke:#00695c
classDef cpupool fill:#fce4ec,stroke:#c2185b
classDef decision fill:#fff9c4,stroke:#f57f17
classDef failure fill:#ffebee,stroke:#c62828
classDef success fill:#e8f5e9,stroke:#2e7d32
classDef shutdown fill:#efebe9,stroke:#4e342e
classDef external fill:#eceff1,stroke:#546e7a
class upload,email,api ingestion
class upsert_doc,upload_s3,store_s3,create_ingestion,enqueue handler
class poll_loop poller
class submit_job,schedule_hb,fetch_doc,call_transcription,store_transcription,call_extraction,cancel_hb,delete_msg,skip,load_doc executor
class hb_task scheduler
class opencv cpupool
class validate_uuid,claim,extraction_check,max_check decision
class invalid_msg,inc_attempt,requeue,mark_failed failure
class store_result success
class shutdown_hook,stop_poller,shutdown_exec,await_term,force_shutdown shutdown
class sqs,s3,postgres,docai,llm_api external
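The two decision points in the job task (message validation and the retry check) can be sketched in a few lines. This is an illustrative Python sketch, not the worker's JVM code: `parse_ingestion_id`, `after_failure`, and the `"retry"`/`"failed"` labels are hypothetical names, and the default of 3 attempts is an assumption taken from the `:max-retry-attempts` default.

```python
import uuid
from typing import Optional, Tuple


def parse_ingestion_id(body: str) -> Optional[uuid.UUID]:
    """Return the ingestion ID carried by a queue message, or None if it is not a UUID."""
    try:
        return uuid.UUID(body.strip())
    except (ValueError, AttributeError):
        return None


def after_failure(attempt_count: int, max_attempts: int = 3) -> Tuple[int, str]:
    """Increment the attempt counter, then choose between retry and failure."""
    attempts = attempt_count + 1
    if attempts < max_attempts:
        return attempts, "retry"   # "Attempts < max?" answered Yes
    return attempts, "failed"      # handed over to human review
```

A message whose body fails `parse_ingestion_id` corresponds to the "Log error" branch; everything else proceeds to the claim step.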
The system separates two distinct concepts:
| Entity | Represents | Key Attributes |
|---|---|---|
| `document` | A unique piece of content (deduplicated by hash) | content_hash, file_path, structured_data |
| `ingestion` | A processing request for a document | document_id, source info, transcription/extraction stats |
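Why the separation: the same content can arrive more than once (for example, by upload and again by email), so a single document can have several ingestions, each with its own source info, status, and attempt count.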
Documents do not have an explicit status column. Instead, their state is derived:
| Condition | Meaning |
|---|---|
| `structured_data IS NOT NULL` | Document is ready (successfully processed) |
| Has `in-progress` ingestion | Document is being processed |
| `needs_human_review = true` | Latest ingestion failed validation |
This approach avoids ambiguity when multiple ingestions exist for the same document.
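A minimal sketch of how a single state could be derived from those conditions. `DocumentView`, `derive_state`, and the state labels are hypothetical, and the precedence between conditions is an assumption, since the source does not say which condition wins when several hold at once.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DocumentView:
    structured_data: Optional[dict]   # document.structured_data
    has_in_progress_ingestion: bool   # any ingestion with status in-progress
    needs_human_review: bool          # document.needs_human_review


def derive_state(doc: DocumentView) -> str:
    # Assumed precedence: active work first, then the review flag, then readiness.
    if doc.has_in_progress_ingestion:
        return "processing"
    if doc.needs_human_review:
        return "needs-review"
    if doc.structured_data is not None:
        return "ready"
    return "unprocessed"
```

Because the state is computed from current rows rather than stored, a new ingestion for an already processed document changes the derived state without any extra bookkeeping on the document.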
Each ingestion has an explicit status:
| Status | Description |
|---|---|
| `in-progress` | Active ingestion (unclaimed or being processed) |
| `completed` | Successfully extracted structured data |
| `failed` | Max retries exceeded, requires human review |
The claim mechanism uses started_at as a lock: an ingestion can only be claimed if started_at is NULL (never claimed) or older than a configurable threshold (previous worker crashed). This provides both duplicate prevention and crash recovery in one check.
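The claim can be expressed as one conditional `UPDATE` whose affected-row count tells the worker whether it won. The sketch below uses an in-memory SQLite database as a stand-in for PostgreSQL so that it runs on its own; `try_claim`, the numeric timestamps, and the 900-second stale threshold are assumptions made for illustration.

```python
import sqlite3

STALE_AFTER_SECONDS = 900  # hypothetical value; the real threshold is configurable

CLAIM_SQL = """
UPDATE ingestion
   SET started_at = :now
 WHERE id = :id
   AND status = 'in-progress'
   AND (started_at IS NULL OR started_at < :now - :stale_after)
"""


def try_claim(conn: sqlite3.Connection, ingestion_id: str, now: float) -> bool:
    """Return True if this worker won the claim (exactly one row updated)."""
    cur = conn.execute(
        CLAIM_SQL,
        {"id": ingestion_id, "now": now, "stale_after": STALE_AFTER_SECONDS},
    )
    conn.commit()
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ingestion (id TEXT PRIMARY KEY, status TEXT, started_at REAL)")
conn.execute("INSERT INTO ingestion VALUES ('a1', 'in-progress', NULL)")

first = try_claim(conn, "a1", now=1000.0)   # never claimed, so the claim succeeds
second = try_claim(conn, "a1", now=1010.0)  # claimed 10s ago, so this worker skips
third = try_claim(conn, "a1", now=2000.0)   # claim is 1000s old (stale), so it is retaken
```

The check and the write happen in the same statement, so two workers that receive the same message cannot both see one updated row.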
The document table is a content entity:
| Column | Type | Description |
|---|---|---|
| `id` | UUID | Primary key (UUIDv7 for time-ordering) |
| `tenant_id` | UUID | Tenant ownership |
| `content_hash` | TEXT | SHA-256 hash for deduplication |
| `file_path` | TEXT | S3 path to original document |
| `structured_data` | JSONB | Final extraction output (nullable) |
| `needs_human_review` | BOOLEAN | Flag for failed ingestions |
| `created_at` | TIMESTAMP | When document was first ingested |
| `updated_at` | TIMESTAMP | Last modification time |
The ingestion table tracks processing:
| Column | Type | Description |
|---|---|---|
| `id` | UUID | Primary key (UUIDv7) |
| `document_id` | UUID | Reference to document |
| `doc_source_id` | UUID | Source that triggered ingestion (XOR with uploaded_by) |
| `uploaded_by` | UUID | User who uploaded (XOR with doc_source_id) |
| `status` | ENUM | in-progress, completed, failed |
| `attempt_count` | INTEGER | Number of processing attempts |
| `source_metadata` | JSONB | Origin-specific data |
| `transcription_file_path` | TEXT | S3 path to transcription output |
| `preprocessed_file_path` | TEXT | S3 path to preprocessed image |
| `transcription_quality_score` | DECIMAL | Quality score from transcription |
| `structured_data` | JSONB | Extraction output for this ingestion |
| `started_at` | TIMESTAMP | When processing began |
| `completed_at` | TIMESTAMP | When processing completed |
A database trigger copies structured_data from ingestion to document on completion.
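The effect of such a trigger can be demonstrated with SQLite standing in for PostgreSQL. The trigger name, the reduced schema, and the use of the `completed` status as the firing condition are assumptions; the actual trigger definition is not shown in this document.

```python
import sqlite3

SCHEMA = """
CREATE TABLE document (id TEXT PRIMARY KEY, structured_data TEXT);
CREATE TABLE ingestion (
    id TEXT PRIMARY KEY,
    document_id TEXT REFERENCES document(id),
    status TEXT NOT NULL DEFAULT 'in-progress',
    structured_data TEXT
);
-- Hypothetical trigger: fires only when an ingestion becomes completed.
CREATE TRIGGER copy_structured_data
AFTER UPDATE OF status ON ingestion
WHEN NEW.status = 'completed'
BEGIN
    UPDATE document
       SET structured_data = NEW.structured_data
     WHERE id = NEW.document_id;
END;
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)
conn.execute("INSERT INTO document (id) VALUES ('d1')")
conn.execute("INSERT INTO ingestion (id, document_id) VALUES ('i1', 'd1')")

# The worker only writes to the ingestion row; the trigger updates the document.
conn.execute(
    "UPDATE ingestion SET status = 'completed', structured_data = ? WHERE id = 'i1'",
    ('{"total": 42}',),
)
copied = conn.execute(
    "SELECT structured_data FROM document WHERE id = 'd1'"
).fetchone()[0]
```

Keeping the copy in the database means the worker cannot mark an ingestion completed while leaving the document without its data.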
Configuration is defined in resources/com/getorcha/config.edn. Key paths use vector notation: [:component :key].
| Path | Default | Description |
|---|---|---|
| `[:com.getorcha/aws :config :region]` | `"eu-central-1"` | AWS region |
| `[:com.getorcha/aws :config :endpoint]` | `nil` | MiniStack endpoint (local dev only) |
| `[:com.getorcha/aws :queues :ingestion]` | `"v1-orcha-global-ingest"` | SQS ingestion queue name |
| `[:com.getorcha/aws :queues :acquisition]` | `"v1-orcha-global-email-acquire"` | SQS acquisition queue name |
| `[:com.getorcha/aws :s3-buckets :storage]` | `"v1-orcha-global-storage-{suffix}"` | S3 bucket for documents |
| `[:com.getorcha/aws :s3-buckets :ses-emails]` | `"v1-orcha-ses-emails-{suffix}"` | S3 bucket for SES emails |

| Path | Default | Description |
|---|---|---|
| `[:com.getorcha.workers.ingestion/worker-pools :heartbeat-pool-size]` | 2 | Threads in heartbeat scheduler |
| `[:com.getorcha.workers.ingestion/worker-pools :preprocessing-pool-size-dec]` | 2 | Cores reserved (CPU pool = available - this) |
| `[:com.getorcha.workers.ingestion/worker-pools :timeout-minutes]` | 10 | Graceful shutdown grace period |

| Path | Default | Description |
|---|---|---|
| `[:com.getorcha.workers.ingestion/orchestrator :wait-time-seconds]` | 20 | SQS long-poll duration |
| `[:com.getorcha.workers.ingestion/orchestrator :max-queue-messages]` | 10 | Messages per poll |
| `[:com.getorcha.workers.ingestion/orchestrator :heartbeat-rate-seconds]` | 60 | How often to extend visibility |
| `[:com.getorcha.workers.ingestion/orchestrator :heartbeat-extension-seconds]` | 300 | Visibility extension amount |
| `[:com.getorcha.workers.ingestion/orchestrator :ocr-quality-threshold]` | 0.7 | Min quality score before preprocessing |
| `[:com.getorcha.workers.ingestion/orchestrator :max-retry-attempts]` | 3 | Retries before marking failed |
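To illustrate how the vector paths resolve and how two derived values follow from the defaults, here is a small Python sketch. The nested dictionary mirrors only part of the configuration, with EDN keywords written as strings; `get_in`, `cpu_pool_size`, `heartbeat_margin_seconds`, and the floor of one thread are hypothetical and not taken from the codebase.

```python
POOLS = ":com.getorcha.workers.ingestion/worker-pools"
ORCHESTRATOR = ":com.getorcha.workers.ingestion/orchestrator"

# Subset of the documented defaults, keyed the way the path notation reads.
CONFIG = {
    POOLS: {
        ":heartbeat-pool-size": 2,
        ":preprocessing-pool-size-dec": 2,
        ":timeout-minutes": 10,
    },
    ORCHESTRATOR: {
        ":heartbeat-rate-seconds": 60,
        ":heartbeat-extension-seconds": 300,
        ":max-retry-attempts": 3,
    },
}


def get_in(config, path, default=None):
    """Walk a nested mapping along a path such as [:component :key]."""
    node = config
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node


def cpu_pool_size(config, available_cores):
    """CPU pool = available cores minus the reserved count (floor of 1 is an assumption)."""
    reserved = get_in(config, [POOLS, ":preprocessing-pool-size-dec"], 0)
    return max(1, available_cores - reserved)


def heartbeat_margin_seconds(config):
    """Slack left after each visibility extension before the next heartbeat is due."""
    extension = get_in(config, [ORCHESTRATOR, ":heartbeat-extension-seconds"])
    rate = get_in(config, [ORCHESTRATOR, ":heartbeat-rate-seconds"])
    return extension - rate
```

With the defaults, an 8-core host gets a 6-thread CPU pool, and each 300-second extension leaves 240 seconds of slack before the next 60-second heartbeat, so a few missed heartbeats do not release the message.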
Bucket structure:
bucket/
├── documents/
│   ├── {document-uuid}.pdf
│   ├── {document-uuid}.png
│   └── ...
└── ingestions/
    └── {ingestion-uuid}/
        ├── transcription-output.edn
        └── preprocessed.pdf (if applicable)
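The layout above maps naturally onto three small key builders. The function names are hypothetical, and stripping a leading dot from the extension is an assumption made so that both `pdf` and `.pdf` are accepted.

```python
def document_key(document_id: str, ext: str) -> str:
    """Key of the original document, e.g. documents/<id>.pdf."""
    return f"documents/{document_id}.{ext.lstrip('.')}"


def transcription_key(ingestion_id: str) -> str:
    """Key of the EDN transcription output for one ingestion."""
    return f"ingestions/{ingestion_id}/transcription-output.edn"


def preprocessed_key(ingestion_id: str, ext: str) -> str:
    """Key of the preprocessed file, written only when preprocessing ran."""
    return f"ingestions/{ingestion_id}/preprocessed.{ext.lstrip('.')}"
```

Originals are keyed by document while derived artifacts are keyed by ingestion, so reprocessing the same document never overwrites an earlier transcription.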
- `documents/{document-id}.{ext}`
- `ingestions/{ingestion-id}/transcription-output.edn`
- `ingestions/{ingestion-id}/preprocessed.{ext}`

Structured logging with consistent fields:
- `ingestion_id`: Primary trace field for all processing logs
- `document_id`: Content reference
- `worker_id`: Identifies which worker instance
- `attempt`: Current attempt number
- `step`: Which pipeline step (claim, transcription, extraction, etc.)
- `duration_ms`: Time taken for operations
- `status`: Outcome of step
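A log line carrying these fields might be built as follows. This is a sketch only: `log_step`, the JSON encoding, and the example values are assumptions, and the actual logger and output format are not specified here.

```python
import json


def log_step(ingestion_id, document_id, worker_id, attempt, step, duration_ms, status):
    """Build one JSON log line that carries all of the shared fields."""
    record = {
        "ingestion_id": ingestion_id,
        "document_id": document_id,
        "worker_id": worker_id,
        "attempt": attempt,
        "step": step,
        "duration_ms": duration_ms,
        "status": status,
    }
    return json.dumps(record)


line = log_step(
    ingestion_id="i1",
    document_id="d1",
    worker_id="worker-a",   # hypothetical worker identifier
    attempt=1,
    step="claim",
    duration_ms=12,
    status="ok",            # hypothetical outcome label
)
```

Filtering on `ingestion_id` then returns every step of one processing attempt, across workers and retries.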