Orcha Ingestion: Document Sources

How documents enter the system and the webhook handler design.


Document Sources

Documents enter the system through several sources, all of which converge on a common queue-for-ingestion! function:

Source              Endpoint                    Status
------------------  --------------------------  -----------------
User upload         POST /document/new/upload   Implemented
Outlook webhook     POST /webhooks/outlook      Planned (Phase 1)
Gmail Pub/Sub       POST /webhooks/gmail        Planned (Phase 5)
Direct API/webhook  POST /webhook/:tenant-slug  Implemented

See Email Integration for details on OAuth-based email ingestion.

User Upload

Multipart file upload from the web interface. Accepts PDF and image files.

curl -X POST -F "file=@invoice.pdf" http://localhost:8888/document/new/upload

Response:

{"ingestion/id": "019234ab-...", "document/id": "019234cd-...", "created?": true}

Email Webhooks (Outlook & Gmail)

OAuth-based email integration receives push notifications when new emails arrive. Both providers follow a similar pattern:

  1. Webhook receives notification (provider-specific format)
  2. Webhook validates request and queues sync task
  3. Background worker claims and processes pending syncs
  4. Attachments are extracted and passed to queue-for-ingestion!

See Email Integration for full details.


Webhook Handler Design

All intake endpoints share a common queue-for-ingestion! function located in src/com/getorcha/erp/http/ingestion.clj that:

  1. Computes content hash (SHA-256)
  2. Upserts document by content hash (using PostgreSQL ON CONFLICT)
  3. Detects insert vs update using xmax system column (xmax = 0 means INSERT, xmax > 0 means UPDATE)
  4. Uploads to S3 only if document was newly created
  5. Creates ingestion record with source information
  6. Enqueues ingestion ID to SQS
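
Steps 1 to 3 can be sketched as follows. This is a minimal illustration, not the code in ingestion.clj: the names sha-256-hex and upsert-document-sql and the id column are assumptions made for the example; only content_hash, the ON CONFLICT upsert, and the xmax check come from the description above.

```clojure
(ns example.ingestion-sketch
  (:import (java.security MessageDigest)))

(defn sha-256-hex
  "Returns the lowercase hex SHA-256 digest of a byte array."
  [^bytes content]
  (let [digest (.digest (MessageDigest/getInstance "SHA-256") content)]
    ;; Bytes are signed on the JVM, so mask to 0-255 before formatting.
    (apply str (map #(format "%02x" (bit-and % 0xff)) digest))))

;; Hypothetical upsert statement. The no-op DO UPDATE makes RETURNING yield
;; the row in both cases; xmax = 0 is true only for a freshly inserted row.
;; The WHERE clause in the conflict target matches a partial unique index
;; on content_hash.
(def upsert-document-sql
  "INSERT INTO document (id, content_hash)
   VALUES (?, ?)
   ON CONFLICT (content_hash) WHERE content_hash IS NOT NULL
   DO UPDATE SET content_hash = EXCLUDED.content_hash
   RETURNING id, (xmax = 0) AS created")
```

When the row already exists, the returned id is the existing document's ID rather than the one passed in, so the caller should use the returned value from that point on.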

Document ID Strategy: UUIDv7 + Content Hash

Primary Key: UUIDv7

Documents use UUIDv7 as the primary key instead of serial integers or random UUIDs.

Why UUIDv7?

Property                UUIDv7                    Random UUID (v4)            Serial Integer
----------------------  ------------------------  --------------------------  -----------------------------
B-tree locality         Excellent (time-ordered)  Poor (random distribution)  Excellent
Pre-DB generation       Yes                       Yes                         No
Collision-free          Practically yes           Practically yes             Yes
Information leakage     Creation time only        No                          Yes (sequence reveals volume)
Distributed generation  Yes                       Yes                         Requires coordination

This gives time-ordered inserts for B-tree friendliness while being globally unique without coordination.

PostgreSQL 18+ provides a native uuidv7() function, but we generate the UUIDv7 in Clojure before the S3 upload so that the document ID is known for the S3 path, then INSERT with the explicit ID.
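
For illustration, a UUIDv7 can be assembled by hand from the standard bit layout. This is a sketch under assumptions, not necessarily how the codebase generates IDs (it may well use a library): the uuid-v7 function and the rng var are hypothetical names.

```clojure
(ns example.uuidv7-sketch
  (:import (java.security SecureRandom)
           (java.util UUID)))

(def ^SecureRandom rng (SecureRandom.))

(defn uuid-v7
  "Builds a version 7 UUID: 48-bit Unix millisecond timestamp, 4-bit version,
  12 random bits, 2-bit variant, 62 random bits."
  ([] (uuid-v7 (System/currentTimeMillis)))
  ([unix-ms]
   (let [rand-a (bit-and (.nextLong rng) 0xFFF)
         rand-b (bit-and (.nextLong rng) 0x3FFFFFFFFFFFFFFF)
         ;; High 64 bits: timestamp, then version nibble 0111, then rand-a.
         msb    (bit-or (bit-shift-left (bit-and unix-ms 0xFFFFFFFFFFFF) 16)
                        0x7000
                        rand-a)
         ;; Low 64 bits: variant bits 10, then rand-b.
         lsb    (bit-or Long/MIN_VALUE rand-b)]
     (UUID. msb lsb))))
```

Because the timestamp occupies the most significant bits, the textual form of IDs generated in later milliseconds sorts after earlier ones, which is what keeps B-tree inserts local.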

Deduplication: Content Hash

The SHA-256 hash of the document content is stored in content_hash and enforced by a unique secondary index.

CREATE UNIQUE INDEX idx_document_content_hash
ON document(content_hash)
WHERE content_hash IS NOT NULL;

Why not use content hash as primary key?

Content hash could theoretically serve as the document ID (content-addressed storage), but:

  1. S3 path before hashing: We need the document ID for the S3 path before computing the hash (can't hash without reading the content, can't upload without a path)
  2. Re-ingestion scenarios: A document should retain its original ID regardless of re-processing
  3. Hash algorithm changes: If we ever need to change the hash algorithm, the secondary index approach allows migration without rewriting all IDs

Document vs Ingestion: Source Tracking

Source information is stored on the ingestion (ap_ingestion), not the document:

Entity        Tracks
------------  -------------------------------------------------------
document      Content only (hash, file path, structured data)
ap_ingestion  Who requested processing (uploaded_by OR doc_source_id)

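Why this design: the same content can be submitted more than once and from different sources (see Duplicate Handling), so source information belongs to each ingestion rather than to the shared document.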

XOR Constraint

Each ingestion must have exactly one source:

CONSTRAINT ingestion_source_xor CHECK (num_nonnulls(doc_source_id, uploaded_by) = 1)

doc_source_id  uploaded_by  Valid?
-------------  -----------  ---------------------
SET            NULL         Yes - from doc-source
NULL           SET          Yes - manual upload
SET            SET          No - rejected
NULL           NULL         No - rejected
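
The same rule can be checked in application code before the INSERT, so that a bad request fails with a clear message instead of a constraint violation. A minimal sketch; valid-ingestion-source? and the kebab-case keys are assumptions for the example.

```clojure
(ns example.source-xor-sketch)

(defn valid-ingestion-source?
  "True when exactly one of :doc-source-id and :uploaded-by is non-nil,
  mirroring num_nonnulls(doc_source_id, uploaded_by) = 1."
  [{:keys [doc-source-id uploaded-by]}]
  (= 1 (count (remove nil? [doc-source-id uploaded-by]))))
```

The database constraint remains the source of truth; a check like this only improves the error reported to the caller.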

Duplicate Handling

When a document is ingested, behavior depends on whether a document with the same content hash exists:

Existing?  Action
---------  ----------------------------------------------------------
No         Create new document, upload to S3, create ingestion, queue
Yes        Reuse existing document, create new ingestion, queue

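Key behaviors: the upsert keeps exactly one document row (and one S3 object) per content hash, and an ingestion is always created and queued.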

Re-ingestion Scenarios

When the same content is submitted again:

  1. Document upsert finds existing row (xmax > 0)
  2. S3 upload is skipped (file already exists)
  3. New ingestion is created with current source info
  4. Ingestion ID is queued for processing
  5. Worker processes the new ingestion
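
The control flow above can be outlined with the side effects injected as functions. This is a hypothetical outline, not the real queue-for-ingestion!: the function name, the deps map, and its keys are all assumptions; the returned keys follow the upload response shown earlier.

```clojure
(ns example.queue-flow-sketch)

(defn queue-for-ingestion-sketch!
  "Hypothetical outline. `deps` is a map of caller-supplied functions that
  perform the real hashing, database, S3, and SQS work."
  [{:keys [hash-content upsert-document! upload-to-s3! create-ingestion! enqueue!]}
   {:keys [content source]}]
  (let [content-hash          (hash-content content)
        {:keys [id created?]} (upsert-document! content-hash)]
    ;; Upload only when the upsert inserted a new row.
    (when created?
      (upload-to-s3! id content))
    ;; An ingestion is created and queued on every call, duplicate or not.
    (let [ingestion-id (create-ingestion! id source)]
      (enqueue! ingestion-id)
      {:ingestion/id ingestion-id
       :document/id  id
       :created?     created?})))
```

Calling this twice with the same content and in-memory stand-ins for deps yields the same :document/id, two different :ingestion/id values, and a single upload.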

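This means a re-submitted file is processed again under a new ingestion ID while keeping its original document ID.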


File Extension Mapping

The file extension for S3 storage is derived from the Content-Type header:

(def mime-type->extension
  {"application/pdf" "pdf"
   "image/png"       "png"
   "image/jpeg"      "jpg"
   "image/tiff"      "tiff"
   "image/gif"       "gif"
   "image/bmp"       "bmp"
   "image/webp"      "webp"})

S3 path format: documents/{document-id}.{ext}
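
A small helper can combine the map and the path format. This is a sketch: s3-key is a hypothetical name, and both the handling of Content-Type parameters (such as "; charset=...") and the nil result for unsupported types are assumptions, since this section does not say how those cases are treated.

```clojure
(ns example.s3-key-sketch
  (:require [clojure.string :as str]))

;; Copied from the map above so the sketch is self-contained.
(def mime-type->extension
  {"application/pdf" "pdf"
   "image/png"       "png"
   "image/jpeg"      "jpg"
   "image/tiff"      "tiff"
   "image/gif"       "gif"
   "image/bmp"       "bmp"
   "image/webp"      "webp"})

(defn s3-key
  "Returns documents/{document-id}.{ext}, or nil for an unsupported type."
  [document-id content-type]
  (let [mime (some-> content-type (str/split #";") first str/trim str/lower-case)]
    (when-let [ext (mime-type->extension mime)]
      (str "documents/" document-id "." ext))))
```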


Source Metadata

Each ingestion can store source-specific metadata in the ap_ingestion.source_metadata JSONB column:

Source   Metadata Example
-------  --------------------------------------------------------------------------
Upload   {:filename "invoice-march.pdf"}
Gmail    {:message-id "...", :from "supplier@example.com", :subject "Invoice #123"}
Webhook  {:webhook-path "/webhook/erp", :source-ip "..."}

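This metadata lives on the ingestion (not the document) because it describes one particular submission; the same document can be ingested again from a different source with different metadata.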


Email Integration (Outlook & Gmail)

Email ingestion allows customers to connect their email inboxes via OAuth. The system automatically extracts invoice attachments from incoming emails.

Supported providers: Outlook (planned, Phase 1) and Gmail (planned, Phase 5).

For the complete specification, see .prd/email-ingestion.md.

Architecture Overview

┌──────────────┐     ┌──────────────────────┐     ┌─────────────────┐
│ Email        │────▶│ Webhook Handler      │────▶│ pending_sync    │
│ Provider     │     │ POST /webhooks/{prov}│     │ table           │
│ (push notif) │     └──────────────────────┘     └────────┬────────┘
└──────────────┘                                           │
                                                           ▼
┌──────────────┐     ┌──────────────────────┐     ┌─────────────────┐
│ S3           │◀────│ Sync Worker          │◀────│ Claim pending   │
│ + SQS        │     │ (background)         │     │ FOR UPDATE      │
└──────────────┘     └──────────────────────┘     │ SKIP LOCKED     │
       │                                           └─────────────────┘
       ▼
┌──────────────────────────────────────────────────────────────────┐
│            Existing Ingestion Pipeline (SQS → Workers)           │
└──────────────────────────────────────────────────────────────────┘

Provider Abstraction

Both providers implement a common protocol:

(defprotocol EmailProvider
  (create-subscription! [this doc-source-id])
  (renew-subscription! [this doc-source-id])
  (sync-since! [this doc-source-id last-sync-token])
  (fetch-message [this message-id])
  (refresh-token! [this doc-source-id]))

Capability             Outlook         Gmail
---------------------  --------------  ----------------
Push method            Direct webhook  Pub/Sub topic
Sync API               Delta query     History API
Watermark              deltaLink URL   historyId BIGINT
Subscription lifetime  7 days          7 days

OAuth Flow

  1. User clicks "Connect Outlook" or "Connect Gmail"
  2. Redirect to provider OAuth consent screen
  3. User grants read-only mail access
  4. Callback exchanges code for tokens
  5. Refresh token stored in AWS SSM (KMS-encrypted)
  6. Webhook subscription created with provider

Webhook Processing

Webhooks are processed asynchronously to ensure fast response times:

  1. Validate - Check clientState (Outlook) or verification token (Gmail)
  2. Lookup - Find doc_source_id by subscription ID or email address
  3. Queue - Upsert into ap_doc_source_email_pending_sync table
  4. Respond - Return 200 OK immediately

The ap_doc_source_email_pending_sync table uses doc_source_id as primary key, ensuring multiple notifications coalesce into a single sync operation.
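
The four steps and the coalescing behavior can be modeled in memory. This is a sketch only: handle-notification!, queue-sync, and the deps keys are hypothetical, an atom stands in for the pending_sync table, and the 401 returned for an invalid request is an assumption (this section only specifies the 200 response).

```clojure
(ns example.webhook-sketch)

(defn queue-sync
  "Pure model of the upsert: at most one entry per doc-source-id."
  [pending doc-source-id now]
  (if (contains? pending doc-source-id)
    pending
    (assoc pending doc-source-id {:status "pending" :queued-at now})))

(defn handle-notification!
  "Validate, look up the doc source, queue a sync, respond immediately."
  [{:keys [valid? lookup-doc-source-id pending now]} request]
  (if-not (valid? request)
    {:status 401} ; assumed status for a failed validation
    (do
      (when-let [id (lookup-doc-source-id request)]
        (swap! pending queue-sync id (now)))
      {:status 200})))
```

Three notifications for the same source leave a single entry in the pending map, which is the effect the primary key on doc_source_id has in the real table.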

Sync Worker

A background worker polls for pending syncs:

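;; Claim one pending sync atomically. The inner SELECT locks a single pending
;; row; SKIP LOCKED makes concurrent workers pass over rows that are already
;; locked, so two workers never claim the same doc_source_id.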
(db/execute!
  "UPDATE ap_doc_source_email_pending_sync
   SET status = 'processing'
   WHERE doc_source_id = (
     SELECT doc_source_id FROM ap_doc_source_email_pending_sync
     WHERE status = 'pending'
     LIMIT 1 FOR UPDATE SKIP LOCKED
   ) RETURNING doc_source_id")

For each claimed sync:

  1. Obtain a fresh OAuth access token using the refresh token stored in SSM
  2. Call provider delta/history API with stored watermark
  3. For each new message, apply heuristic filter
  4. Extract attachments from accepted emails
  5. Call queue-for-ingestion! for each attachment
  6. Update watermark in ap_doc_source_email.last_sync_token
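
The per-sync steps can be outlined with injected functions. This is a hypothetical sketch: process-sync!, the deps keys, and the {:messages ... :next-token ...} return shape of sync-since! are assumptions, not the EmailProvider contract.

```clojure
(ns example.sync-worker-sketch)

(defn process-sync!
  "Runs one claimed sync and returns the new watermark."
  [{:keys [refresh-token! sync-since! relevant? attachments
           queue-for-ingestion! save-watermark!]}
   {:keys [doc-source-id last-sync-token]}]
  (refresh-token! doc-source-id)
  (let [{:keys [messages next-token]} (sync-since! doc-source-id last-sync-token)]
    ;; Heuristic filter first, then one queue call per attachment.
    (doseq [message    (filter relevant? messages)
            attachment (attachments message)]
      (queue-for-ingestion! attachment))
    ;; Persist the watermark only after every attachment has been queued.
    (save-watermark! doc-source-id next-token)
    next-token))
```

Saving the watermark last means a crash in the middle of a sync causes some messages to be fetched again; the message ID and content hash layers described under Deduplication are what make that repeat harmless.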

Two-Stage Classification

Emails pass through two relevance filters:

Stage  Location          Method     Purpose
-----  ----------------  ---------  ----------------------------------------------
1      Sync worker       Heuristic  Fast reject of obvious non-invoices
2      Ingestion worker  LLM        Accurate classification, can mark as "skipped"

Stage 1 (Heuristic):
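
The heuristic rules themselves are not listed in this section. Purely as an illustration of what a cheap first-stage check might look like, the sketch below accepts a message only if it carries at least one attachment whose content type appears in the File Extension Mapping. The rule, the function names, and the message shape (:attachments, :content-type) are all assumptions.

```clojure
(ns example.heuristic-sketch)

;; Content types taken from the File Extension Mapping section.
(def supported-content-types
  #{"application/pdf" "image/png" "image/jpeg" "image/tiff"
    "image/gif" "image/bmp" "image/webp"})

(defn candidate-attachments
  "Attachments whose content type the pipeline could store."
  [message]
  (filter #(contains? supported-content-types (:content-type %))
          (:attachments message)))

(defn worth-ingesting?
  "Hypothetical stage 1 check: true when any attachment is a candidate."
  [message]
  (boolean (seq (candidate-attachments message))))
```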

Stage 2 (LLM):

Database Schema

ap_doc_source_email extensions:

connection_status         email_connection_status  -- active, token_refresh_failed, etc.
subscription_expires_at   TIMESTAMPTZ              -- for proactive renewal
provider_subscription_id  TEXT                     -- Outlook subscription ID
last_sync_token           TEXT                     -- deltaLink or historyId
last_successful_sync_at   TIMESTAMPTZ              -- health monitoring

ap_doc_source_email_pending_sync table:

doc_source_id  UUID PRIMARY KEY  -- coalesces notifications
queued_at      TIMESTAMPTZ
attempts       INTEGER           -- retry tracking
last_error     TEXT
status         TEXT              -- 'pending' or 'processing'

ap_ingestion extensions:

status = 'skipped'  -- new enum value
skip_reason TEXT    -- e.g., "marketing_material", "signature_image"

Source Metadata for Email

Email ingestions store metadata in ap_ingestion.source_metadata:

{
  "provider": "outlook",
  "message_id": "<abc123@mail.example.com>",
  "from": "supplier@example.com",
  "subject": "Invoice #12345 - January 2026",
  "received_at": "2026-01-02T10:30:00Z",
  "attachment_filename": "invoice-12345.pdf"
}

This enables:

Deduplication

Three-layer deduplication prevents duplicate processing:

Layer         Mechanism                      What it prevents
------------  -----------------------------  ------------------------
Sync token    last_sync_token watermark      Re-fetching seen emails
Message ID    Track message_id per source    Re-processing same email
Content hash  Existing queue-for-ingestion!  Duplicate S3 uploads

Connection Health

Connection status is tracked for monitoring:

CREATE TYPE email_connection_status AS ENUM (
    'active',               -- working normally
    'token_refresh_failed', -- needs re-authorization
    'subscription_expired', -- webhook subscription lapsed
    'provider_error'        -- API returning errors
);
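
The subscription_expires_at column is described as supporting proactive renewal, which is what keeps a connection out of the subscription_expired state. A minimal sketch of such a check follows; renewal-due? and the 24-hour margin are assumptions, since the actual renewal policy is not stated here.

```clojure
(ns example.renewal-sketch
  (:import (java.time Duration Instant)))

;; Assumed safety margin; tune to how often the renewal job runs.
(def renewal-margin (Duration/ofHours 24))

(defn renewal-due?
  "True when the subscription expires within renewal-margin of `now`,
  or has already expired."
  [^Instant subscription-expires-at ^Instant now]
  (not (.isAfter subscription-expires-at (.plus now renewal-margin))))
```

A periodic job could call renew-subscription! for every doc source where this returns true.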