Orcha Ingestion: Overview

High-level system architecture for document processing.

---
title: Orcha Document Processing System
---
flowchart TD
    subgraph Ingestion["Document Ingestion"]
        upload["User Upload"]
        email["Email Watcher<br/>(Gmail + Pub/Sub)"]
        api["API/Webhook"]
    end

    subgraph WebhookHandler["Webhook Handler"]
        upsert_doc["Upsert Document<br/>(by content hash)"]
        upload_s3{"New<br/>document?"}
        store_s3["Upload to S3"]
        create_ingestion["Create Ingestion Record<br/>(with source info)"]
        enqueue["Enqueue Ingestion ID"]
    end

    subgraph ConcurrencyArch["Worker Process (JVM)"]

        subgraph Poller["SQS Poller<br/>(1 virtual thread)"]
            poll_loop["Long-poll SQS<br/>WaitTimeSeconds=20<br/>MaxNumberOfMessages=10"]
        end

        subgraph MainExecutor["Main Job Executor<br/>(newVirtualThreadPerTaskExecutor)"]
            submit_job["Submit job task"]

            subgraph JobTask["Job Task (per message)"]
                validate_uuid{"Valid UUID?"}
                invalid_msg["Log error"]
                claim{"Claim: UPDATE ingestion<br/>SET started_at = now()<br/>WHERE status = in-progress<br/>AND started_at IS NULL or stale"}
                skip["Skip: already claimed"]
                load_doc["Load document from DB"]
                schedule_hb["Schedule heartbeat<br/>on shared scheduler"]

                fetch_doc["Fetch from S3"]

                subgraph TranscriptionPhase["Transcription"]
                    call_transcription["Document AI OCR<br/>(with preprocessing if needed)"]
                    store_transcription["Store transcription (EDN)<br/>to /ingestions/{id}/"]
                end

                subgraph ExtractionPhase["Extraction"]
                    call_extraction["LLM extraction"]
                    extraction_check{"Extraction OK?"}
                    store_result["Store JSON in DB<br/>→ trigger updates document"]
                end

                subgraph FailureHandling["Failure Handling"]
                    inc_attempt["Increment attempt<br/>on ingestion"]
                    max_check{"Attempts<br/>< max?"}
                    requeue["Let visibility expire<br/>(will retry)"]
                    mark_failed["status: failed<br/>→ human review"]
                end

                cancel_hb["Cancel heartbeat"]
                delete_msg["Delete SQS message"]
            end
        end

        subgraph SharedScheduler["Heartbeat Scheduler<br/>(ScheduledThreadPool, 2-4 threads)"]
            hb_task["ChangeMessageVisibility<br/>every 60s (sets 300s)"]
        end

        subgraph CPUPool["CPU Pool<br/>(FixedThreadPool, N - 2 cores)"]
            opencv["OpenCV preprocessing<br/>(deblur, contrast)"]
        end

        subgraph Shutdown["Graceful Shutdown"]
            shutdown_hook["JVM Shutdown Hook"]
            stop_poller["Set poller flag = false"]
            shutdown_exec["executor.shutdown()"]
            await_term["awaitTermination<br/>(timeout-minutes, default 10)"]
            force_shutdown["shutdownNow() if needed"]
        end
    end

    subgraph External["External Services"]
        sqs[("SQS")]
        s3[("S3")]
        postgres[("PostgreSQL")]
        docai["Document AI"]
        llm_api["LLM API"]
    end

    %% Ingestion flow
    upload --> upsert_doc
    email --> upsert_doc
    api --> upsert_doc
    upsert_doc --> upload_s3
    upload_s3 -->|"Yes"| store_s3
    upload_s3 -->|"No"| create_ingestion
    store_s3 --> create_ingestion
    create_ingestion --> enqueue
    enqueue -.-> sqs

    %% Poller flow
    sqs -.-> poll_loop
    poll_loop --> submit_job

    %% Job task flow
    submit_job --> validate_uuid
    validate_uuid -->|"No"| invalid_msg
    invalid_msg --> delete_msg
    validate_uuid -->|"Yes"| claim
    claim -->|"0 rows"| skip
    skip --> delete_msg
    claim -->|"1 row"| load_doc
    load_doc --> schedule_hb
    schedule_hb -.-> hb_task
    schedule_hb --> fetch_doc

    fetch_doc --> call_transcription
    call_transcription -.-> opencv
    call_transcription --> store_transcription
    store_transcription --> call_extraction
    call_extraction --> extraction_check
    extraction_check -->|"Yes"| store_result
    extraction_check -->|"No"| inc_attempt

    store_result --> cancel_hb
    cancel_hb --> delete_msg
    delete_msg -.-> sqs

    inc_attempt --> max_check
    max_check -->|"Yes"| requeue
    max_check -->|"No"| mark_failed
    requeue -.->|"cancel heartbeat only<br/>(message NOT deleted)"| hb_task
    mark_failed --> cancel_hb

    %% Shutdown flow
    shutdown_hook --> stop_poller
    stop_poller --> shutdown_exec
    shutdown_exec --> await_term
    await_term --> force_shutdown

    %% External connections
    fetch_doc -.-> s3
    store_transcription -.-> s3
    call_transcription -.-> docai
    call_extraction -.-> llm_api
    store_result -.-> postgres
    claim -.-> postgres
    hb_task -.-> sqs

    %% Styling
    classDef ingestion fill:#e1f5fe,stroke:#01579b
    classDef handler fill:#fff3e0,stroke:#e65100
    classDef poller fill:#e8eaf6,stroke:#3949ab
    classDef executor fill:#f3e5f5,stroke:#7b1fa2
    classDef scheduler fill:#e0f2f1,stroke:#00695c
    classDef cpupool fill:#fce4ec,stroke:#c2185b
    classDef decision fill:#fff9c4,stroke:#f57f17
    classDef failure fill:#ffebee,stroke:#c62828
    classDef success fill:#e8f5e9,stroke:#2e7d32
    classDef shutdown fill:#efebe9,stroke:#4e342e
    classDef external fill:#eceff1,stroke:#546e7a

    class upload,email,api ingestion
    class upsert_doc,upload_s3,store_s3,create_ingestion,enqueue handler
    class poll_loop poller
    class submit_job,schedule_hb,fetch_doc,call_transcription,store_transcription,call_extraction,cancel_hb,delete_msg,skip,load_doc executor
    class hb_task scheduler
    class opencv cpupool
    class validate_uuid,claim,extraction_check,max_check decision
    class invalid_msg,inc_attempt,requeue,mark_failed failure
    class store_result success
    class shutdown_hook,stop_poller,shutdown_exec,await_term,force_shutdown shutdown
    class sqs,s3,postgres,docai,llm_api external
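The job-task path in the diagram can be sketched in Java, since the worker runs on the JVM. This is a minimal sketch under stated assumptions, not the worker's actual code. QueueClient, IngestionStore, Pipeline, JobTask and Outcome are hypothetical names standing in for the SQS client, the PostgreSQL queries, and the fetch/transcribe/extract steps. The sketch follows the diagram's control flow: validate the message body, claim the ingestion, schedule a heartbeat on the shared scheduler, and process. It always cancels the heartbeat, and it deletes the message on every path except retry. On the retry path the message is kept so SQS redelivers it once its visibility timeout lapses.

```java
import java.util.UUID;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical seams standing in for SQS, PostgreSQL and the processing pipeline.
interface QueueClient {
    void extendVisibility(String receiptHandle, int seconds); // ChangeMessageVisibility
    void delete(String receiptHandle);                        // DeleteMessage
}

interface IngestionStore {
    boolean claim(UUID ingestionId);        // true when the claim UPDATE touched 1 row
    int incrementAttempt(UUID ingestionId); // returns new attempt_count; assumed to clear started_at
    void markFailed(UUID ingestionId);      // status = failed, flag for human review
}

interface Pipeline {
    void process(UUID ingestionId) throws Exception; // fetch, transcribe, extract, store
}

enum Outcome { INVALID, SKIPPED, COMPLETED, RETRY, FAILED }

final class JobTask {
    private final QueueClient queue;
    private final IngestionStore store;
    private final Pipeline pipeline;
    private final ScheduledExecutorService heartbeats; // the shared heartbeat scheduler
    private final int heartbeatRateSeconds;
    private final int heartbeatExtensionSeconds;
    private final int maxRetryAttempts;

    JobTask(QueueClient queue, IngestionStore store, Pipeline pipeline,
            ScheduledExecutorService heartbeats, int heartbeatRateSeconds,
            int heartbeatExtensionSeconds, int maxRetryAttempts) {
        this.queue = queue;
        this.store = store;
        this.pipeline = pipeline;
        this.heartbeats = heartbeats;
        this.heartbeatRateSeconds = heartbeatRateSeconds;
        this.heartbeatExtensionSeconds = heartbeatExtensionSeconds;
        this.maxRetryAttempts = maxRetryAttempts;
    }

    /** Handles one SQS message whose body should be an ingestion ID. */
    Outcome run(String body, String receiptHandle) {
        UUID id;
        try {
            id = UUID.fromString(body.trim());
        } catch (IllegalArgumentException e) {
            queue.delete(receiptHandle); // malformed body: log (omitted here) and drop it
            return Outcome.INVALID;
        }
        if (!store.claim(id)) {
            queue.delete(receiptHandle); // 0 rows: another worker holds the claim
            return Outcome.SKIPPED;
        }
        // Keep the message invisible to other consumers while this job runs.
        ScheduledFuture<?> heartbeat = heartbeats.scheduleAtFixedRate(
                () -> queue.extendVisibility(receiptHandle, heartbeatExtensionSeconds),
                heartbeatRateSeconds, heartbeatRateSeconds, TimeUnit.SECONDS);
        Outcome outcome;
        try {
            pipeline.process(id);
            outcome = Outcome.COMPLETED;
        } catch (Exception e) {
            int attempts = store.incrementAttempt(id);
            if (attempts < maxRetryAttempts) {
                outcome = Outcome.RETRY; // keep the message; SQS redelivers it later
            } else {
                store.markFailed(id);
                outcome = Outcome.FAILED;
            }
        } finally {
            heartbeat.cancel(false); // always stop extending visibility
        }
        if (outcome != Outcome.RETRY) {
            queue.delete(receiptHandle);
        }
        return outcome;
    }
}
```

Two details are deliberate. First, the heartbeat is cancelled in a finally block, so a job that throws cannot leave a heartbeat extending visibility forever. Second, the retry path only works if the redelivered message can claim the ingestion again. Because the claim requires started_at to be NULL or stale, the sketch assumes incrementAttempt also clears started_at; this is an assumption of the sketch.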

Core Concepts

Document vs Ingestion

The system separates two distinct concepts:

| Entity | Represents | Key attributes |
|---|---|---|
| document | A unique piece of content (deduplicated by hash) | content_hash, file_path, structured_data |
| ingestion | A processing request for a document | document_id, source info, transcription/extraction stats |

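Why the separation: the same content can arrive more than once and from different sources (user upload, email, API/webhook). Keying document by content hash means each unique file is uploaded to S3 only once. Every arrival still gets its own ingestion record with its source info, attempt count, and transcription/extraction stats.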
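Deduplication depends on a stable content hash. The sketch below computes content_hash as a lowercase hex SHA-256 over the raw file bytes. It pairs that with one possible insert statement for the webhook handler's "New document?" branch. ContentHash and UPSERT_SQL are hypothetical names. The SQL assumes a unique index on (tenant_id, content_hash), which the schema below does not spell out.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

final class ContentHash {
    private ContentHash() {}

    // Assumes a unique index on (tenant_id, content_hash). A returned row means the
    // document is new (upload it to S3); no row means it already existed.
    static final String UPSERT_SQL = """
            INSERT INTO document (id, tenant_id, content_hash, file_path)
            VALUES (?, ?, ?, ?)
            ON CONFLICT (tenant_id, content_hash) DO NOTHING
            RETURNING id
            """;

    /** Lowercase hex SHA-256 of the raw file bytes, as stored in document.content_hash. */
    static String sha256Hex(byte[] content) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            return HexFormat.of().formatHex(digest.digest(content));
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256.
            throw new IllegalStateException(e);
        }
    }
}
```

When the INSERT returns no row, the handler would look up the existing document id by (tenant_id, content_hash) and skip the S3 upload.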

Document Lifecycle

Documents do not have an explicit status column. Instead, their state is derived:

| Condition | Meaning |
|---|---|
| structured_data IS NOT NULL | Document is ready (successfully processed) |
| Has an in-progress ingestion | Document is being processed |
| needs_human_review = true | Latest ingestion failed validation |

This approach avoids ambiguity when multiple ingestions exist for the same document.
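One way to read the table above as code is a sketch that derives a document's state from the three conditions. It gives both a SQL expression and an equivalent Java method. The table does not say which condition wins when several hold at once, for example a ready document that is being reprocessed. The precedence used here is therefore an assumption: processing first, then needs review, then ready. The DocumentState names and the UNKNOWN fallback are also assumptions.

```java
enum DocumentState { PROCESSING, NEEDS_REVIEW, READY, UNKNOWN }

final class DocumentStates {
    private DocumentStates() {}

    // Same derivation in SQL; precedence follows the CASE order (an assumption).
    static final String STATE_SQL = """
            SELECT d.id,
                   CASE
                     WHEN EXISTS (SELECT 1 FROM ingestion i
                                   WHERE i.document_id = d.id AND i.status = 'in-progress')
                       THEN 'processing'
                     WHEN d.needs_human_review THEN 'needs-review'
                     WHEN d.structured_data IS NOT NULL THEN 'ready'
                     ELSE 'unknown'
                   END AS state
              FROM document d
            """;

    /** Derives state from the columns; no status column is read or stored. */
    static DocumentState derive(boolean hasStructuredData,
                                boolean hasInProgressIngestion,
                                boolean needsHumanReview) {
        if (hasInProgressIngestion) return DocumentState.PROCESSING;
        if (needsHumanReview) return DocumentState.NEEDS_REVIEW;
        if (hasStructuredData) return DocumentState.READY;
        return DocumentState.UNKNOWN; // fallback for combinations the table does not cover
    }
}
```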

Ingestion Lifecycle

Each ingestion has an explicit status:

| Status | Description |
|---|---|
| in-progress | Active ingestion (unclaimed or being processed) |
| completed | Successfully extracted structured data |
| failed | Max retries exceeded; requires human review |

The claim mechanism uses started_at as a lock: an ingestion can only be claimed if started_at is NULL (never claimed) or older than a configurable threshold (previous worker crashed). This provides both duplicate prevention and crash recovery in one check.
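The claim can be expressed as a single conditional UPDATE whose row count decides the outcome: 1 row means proceed, 0 rows means skip. The sketch below assumes the PostgreSQL JDBC driver, which accepts java.util.UUID via setObject. It passes the staleness threshold in seconds. IngestionClaim and its methods are hypothetical names. Under PostgreSQL's default READ COMMITTED isolation, a second worker running the same UPDATE concurrently blocks on the row lock. It then re-evaluates the WHERE clause against the committed row and updates nothing, which is what makes the check safe without explicit locking.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.util.UUID;

final class IngestionClaim {
    private IngestionClaim() {}

    // Claimable if still in progress and never claimed, or the previous claim is stale.
    static final String CLAIM_SQL = """
            UPDATE ingestion
               SET started_at = now()
             WHERE id = ?
               AND status = 'in-progress'
               AND (started_at IS NULL OR started_at < now() - (? * interval '1 second'))
            """;

    /** Returns true if this worker claimed the ingestion (1 row), false otherwise (0 rows). */
    static boolean claim(Connection conn, UUID ingestionId, Duration staleAfter) throws SQLException {
        try (PreparedStatement ps = conn.prepareStatement(CLAIM_SQL)) {
            ps.setObject(1, ingestionId);
            ps.setDouble(2, staleAfter.toSeconds());
            return ps.executeUpdate() == 1;
        }
    }

    /** The same predicate as the WHERE clause, in plain Java (handy for unit tests). */
    static boolean isClaimable(String status, Instant startedAt, Instant now, Duration staleAfter) {
        return "in-progress".equals(status)
                && (startedAt == null || startedAt.isBefore(now.minus(staleAfter)));
    }
}
```

For example, take a hypothetical 15-minute threshold. claim(conn, id, Duration.ofMinutes(15)) returns false for an ingestion that another worker claimed five minutes ago. It returns true once that claim is older than 15 minutes.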


Database Schema

The document table is a content entity:

| Column | Type | Description |
|---|---|---|
| id | UUID | Primary key (UUIDv7 for time-ordering) |
| tenant_id | UUID | Tenant ownership |
| content_hash | TEXT | SHA-256 hash for deduplication |
| file_path | TEXT | S3 path to original document |
| structured_data | JSONB | Final extraction output (nullable) |
| needs_human_review | BOOLEAN | Flag for failed ingestions |
| created_at | TIMESTAMP | When document was first ingested |
| updated_at | TIMESTAMP | Last modification time |
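The id columns are UUIDv7. The leading 48 bits of a UUIDv7 are a Unix timestamp in milliseconds, so IDs sort roughly by creation time. Below is a minimal hand-rolled generator following the RFC 9562 layout. UuidV7 is a hypothetical helper name, and the project may well use a library instead.

```java
import java.security.SecureRandom;
import java.util.Random;
import java.util.UUID;

final class UuidV7 {
    private static final SecureRandom RANDOM = new SecureRandom();

    private UuidV7() {}

    static UUID next() {
        return of(System.currentTimeMillis(), RANDOM);
    }

    /** RFC 9562 layout: 48-bit Unix ms timestamp, version 7, variant 10, 74 random bits. */
    static UUID of(long epochMillis, Random random) {
        long msb = ((epochMillis & 0xFFFF_FFFF_FFFFL) << 16)    // unix_ts_ms in the top 48 bits
                | (0x7L << 12)                                  // version nibble = 7
                | (random.nextInt() & 0x0FFFL);                 // rand_a: 12 bits
        long lsb = (random.nextLong() & 0x3FFF_FFFF_FFFF_FFFFL) // rand_b: 62 bits
                | 0x8000_0000_0000_0000L;                       // variant bits = 10
        return new UUID(msb, lsb);
    }

    /** Recovers the embedded creation time in epoch milliseconds. */
    static long timestampMillis(UUID uuid) {
        return uuid.getMostSignificantBits() >>> 16;
    }
}
```

IDs generated within the same millisecond are not ordered relative to each other in this sketch. RFC 9562 describes optional counter-based methods for monotonicity within a millisecond.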

The ingestion table tracks processing:

| Column | Type | Description |
|---|---|---|
| id | UUID | Primary key (UUIDv7) |
| document_id | UUID | Reference to document |
| doc_source_id | UUID | Source that triggered ingestion (XOR with uploaded_by) |
| uploaded_by | UUID | User who uploaded (XOR with doc_source_id) |
| status | ENUM | in-progress, completed, failed |
| attempt_count | INTEGER | Number of processing attempts |
| source_metadata | JSONB | Origin-specific data |
| transcription_file_path | TEXT | S3 path to transcription output |
| preprocessed_file_path | TEXT | S3 path to preprocessed image |
| transcription_quality_score | DECIMAL | Quality score from transcription |
| structured_data | JSONB | Extraction output for this ingestion |
| started_at | TIMESTAMP | When processing began (also serves as the claim lock) |
| completed_at | TIMESTAMP | When processing completed |

A database trigger copies structured_data from ingestion to document on completion.
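The trigger and the XOR rule on doc_source_id and uploaded_by might look like the DDL below. It is shown as a Java text block next to an application-side mirror of the XOR check. All names (function, trigger, constraint) and the exact firing condition are assumptions for illustration. The document only states that a trigger copies structured_data on completion.

```java
import java.util.UUID;

final class IngestionSchemaSketch {
    private IngestionSchemaSketch() {}

    // Hypothetical DDL; fires only when status is part of the UPDATE and becomes 'completed'.
    static final String DDL = """
            ALTER TABLE ingestion ADD CONSTRAINT ingestion_source_xor
              CHECK ((doc_source_id IS NULL) <> (uploaded_by IS NULL));

            CREATE OR REPLACE FUNCTION copy_structured_data_to_document() RETURNS trigger AS $$
            BEGIN
              UPDATE document
                 SET structured_data = NEW.structured_data,
                     updated_at = now()
               WHERE id = NEW.document_id;
              RETURN NEW;
            END;
            $$ LANGUAGE plpgsql;

            CREATE TRIGGER ingestion_completed
              AFTER UPDATE OF status ON ingestion
              FOR EACH ROW
              WHEN (NEW.status = 'completed' AND OLD.status IS DISTINCT FROM 'completed')
              EXECUTE FUNCTION copy_structured_data_to_document();
            """;

    /** Application-side mirror of the XOR constraint: exactly one source must be set. */
    static boolean hasExactlyOneSource(UUID docSourceId, UUID uploadedBy) {
        return (docSourceId == null) != (uploadedBy == null);
    }
}
```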


Configuration Parameters

Configuration is defined in resources/com/getorcha/config.edn. Key paths use vector notation: [:component :key].

AWS Configuration

| Path | Default | Description |
|---|---|---|
| [:com.getorcha/aws :config :region] | "eu-central-1" | AWS region |
| [:com.getorcha/aws :config :endpoint] | nil | MiniStack endpoint (local dev only) |
| [:com.getorcha/aws :queues :ingestion] | "v1-orcha-global-ingest" | SQS ingestion queue name |
| [:com.getorcha/aws :queues :acquisition] | "v1-orcha-global-email-acquire" | SQS acquisition queue name |
| [:com.getorcha/aws :s3-buckets :storage] | "v1-orcha-global-storage-{suffix}" | S3 bucket for documents |
| [:com.getorcha/aws :s3-buckets :ses-emails] | "v1-orcha-ses-emails-{suffix}" | S3 bucket for SES emails |

Worker Pool Configuration

| Path | Default | Description |
|---|---|---|
| [:com.getorcha.workers.ingestion/worker-pools :heartbeat-pool-size] | 2 | Threads in the heartbeat scheduler |
| [:com.getorcha.workers.ingestion/worker-pools :preprocessing-pool-size-dec] | 2 | Cores held back from the CPU pool (pool size = available cores - this value) |
| [:com.getorcha.workers.ingestion/worker-pools :timeout-minutes] | 10 | Grace period for awaitTermination during graceful shutdown |
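The three pools and the graceful-shutdown sequence from the diagram could be built from these settings as sketched below. The sketch uses the JDK executors the diagram names: virtual-thread-per-task for jobs, a scheduled pool for heartbeats, and a fixed pool for OpenCV work. WorkerPools and its members are hypothetical. The floor of one CPU thread is an assumption for machines with no more cores than preprocessing-pool-size-dec. Virtual threads require Java 21.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class WorkerPools {
    final AtomicBoolean polling = new AtomicBoolean(true);                    // read by the SQS poller loop
    final ExecutorService jobs = Executors.newVirtualThreadPerTaskExecutor(); // one virtual thread per job
    final ScheduledExecutorService heartbeats;                                // shared heartbeat scheduler
    final ExecutorService cpu;                                                // OpenCV preprocessing
    private final long timeoutMinutes;

    WorkerPools(int heartbeatPoolSize, int preprocessingPoolSizeDec, long timeoutMinutes) {
        this.heartbeats = Executors.newScheduledThreadPool(heartbeatPoolSize);
        this.cpu = Executors.newFixedThreadPool(
                cpuPoolSize(Runtime.getRuntime().availableProcessors(), preprocessingPoolSizeDec));
        this.timeoutMinutes = timeoutMinutes;
    }

    /** Available cores minus the reserved ones, never fewer than one thread (assumption). */
    static int cpuPoolSize(int availableCores, int reservedCores) {
        return Math.max(1, availableCores - reservedCores);
    }

    /** Shutdown sequence from the diagram; returns true if all jobs finished in time. */
    boolean shutdownGracefully() {
        polling.set(false); // stop the poller from receiving new messages
        jobs.shutdown();    // reject new job tasks, let running ones finish
        boolean finished;
        try {
            finished = jobs.awaitTermination(timeoutMinutes, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        }
        if (!finished) {
            jobs.shutdownNow(); // interrupt jobs still running after the grace period
        }
        heartbeats.shutdownNow(); // stop any remaining visibility extensions
        cpu.shutdown();
        return finished;
    }

    /** Runs the sequence above from a JVM shutdown hook. */
    void registerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::shutdownGracefully, "graceful-shutdown"));
    }
}
```

With the defaults on an 8-core host, cpuPoolSize(8, 2) gives a 6-thread CPU pool. The heartbeat scheduler is stopped only after the job executor terminates. That keeps visibility extensions running for jobs that are still finishing inside the grace period.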

Orchestrator Configuration

| Path | Default | Description |
|---|---|---|
| [:com.getorcha.workers.ingestion/orchestrator :wait-time-seconds] | 20 | SQS long-poll duration (WaitTimeSeconds) |
| [:com.getorcha.workers.ingestion/orchestrator :max-queue-messages] | 10 | Messages per poll (MaxNumberOfMessages) |
| [:com.getorcha.workers.ingestion/orchestrator :heartbeat-rate-seconds] | 60 | Interval between visibility extensions |
| [:com.getorcha.workers.ingestion/orchestrator :heartbeat-extension-seconds] | 300 | Visibility timeout (seconds) set by each heartbeat |
| [:com.getorcha.workers.ingestion/orchestrator :ocr-quality-threshold] | 0.7 | Transcription quality score below which preprocessing is applied |
| [:com.getorcha.workers.ingestion/orchestrator :max-retry-attempts] | 3 | Attempt limit (compared against attempt_count) before marking the ingestion failed |
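These settings interact with hard SQS limits. ReceiveMessage accepts a WaitTimeSeconds of 0 to 20 and a MaxNumberOfMessages of 1 to 10. A visibility timeout can be at most 43,200 seconds (12 hours). Each ChangeMessageVisibility call restarts the timeout from the moment of the call, so the heartbeat rate must be shorter than the extension. Below is a minimal validation sketch. It assumes a hypothetical OrchestratorConfig record and a quality score in the range 0 to 1.

```java
record OrchestratorConfig(int waitTimeSeconds, int maxQueueMessages,
                          int heartbeatRateSeconds, int heartbeatExtensionSeconds,
                          double ocrQualityThreshold, int maxRetryAttempts) {

    static final int SQS_MAX_VISIBILITY_SECONDS = 43_200; // 12 hours

    OrchestratorConfig {
        require(waitTimeSeconds >= 0 && waitTimeSeconds <= 20, "wait-time-seconds must be 0..20");
        require(maxQueueMessages >= 1 && maxQueueMessages <= 10, "max-queue-messages must be 1..10");
        // The next heartbeat must land before the visibility set by the previous one runs out.
        require(heartbeatRateSeconds > 0 && heartbeatRateSeconds < heartbeatExtensionSeconds,
                "heartbeat-rate-seconds must be positive and below heartbeat-extension-seconds");
        require(heartbeatExtensionSeconds <= SQS_MAX_VISIBILITY_SECONDS,
                "heartbeat-extension-seconds must not exceed 43200");
        require(ocrQualityThreshold >= 0.0 && ocrQualityThreshold <= 1.0,
                "ocr-quality-threshold assumed to be in [0, 1]");
        require(maxRetryAttempts >= 1, "max-retry-attempts must be at least 1");
    }

    /** The defaults listed in the table above. */
    static OrchestratorConfig defaults() {
        return new OrchestratorConfig(20, 10, 60, 300, 0.7, 3);
    }

    /** Preprocess (OpenCV) when the transcription quality score falls below the threshold. */
    boolean needsPreprocessing(double transcriptionQualityScore) {
        return transcriptionQualityScore < ocrQualityThreshold;
    }

    private static void require(boolean ok, String message) {
        if (!ok) throw new IllegalArgumentException(message);
    }
}
```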

External Service Configuration

AWS SQS

AWS S3

Bucket structure:

bucket/
├── documents/
│   ├── {document-uuid}.pdf
│   ├── {document-uuid}.png
│   └── ...
└── ingestions/
    └── {ingestion-uuid}/
        ├── transcription-output.edn
        └── preprocessed.pdf (if applicable)
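Key helpers matching the layout above might look like the following. StorageKeys is a hypothetical name. S3 object keys carry no leading slash, so the diagram's /ingestions/{id}/ corresponds to the key prefix ingestions/{id}/.

```java
import java.util.UUID;

final class StorageKeys {
    private StorageKeys() {}

    /** documents/{document-uuid}.{extension}, e.g. extension "pdf" or "png". */
    static String documentKey(UUID documentId, String extension) {
        return "documents/" + documentId + "." + extension;
    }

    /** Per-ingestion prefix: ingestions/{ingestion-uuid}/ */
    static String ingestionPrefix(UUID ingestionId) {
        return "ingestions/" + ingestionId + "/";
    }

    static String transcriptionKey(UUID ingestionId) {
        return ingestionPrefix(ingestionId) + "transcription-output.edn";
    }

    /** Only written when preprocessing was applied. */
    static String preprocessedKey(UUID ingestionId) {
        return ingestionPrefix(ingestionId) + "preprocessed.pdf";
    }
}
```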

Google Document AI

LLM API


Observability

Metrics to Track

Alerts

Logging

Structured logging with consistent fields: