How documents enter the system and the webhook handler design.
Documents enter the system through multiple sources, all converging to a common queue-for-ingestion! function:
| Source | Endpoint | Status |
|---|---|---|
| User upload | POST /document/new/upload | Implemented |
| Outlook webhook | POST /webhooks/outlook | Planned (Phase 1) |
| Gmail Pub/Sub | POST /webhooks/gmail | Planned (Phase 5) |
| Direct API/webhook | POST /webhook/:tenant-slug | Implemented |
See Email Integration for details on OAuth-based email ingestion.
Multipart file upload from the web interface. Accepts PDF and image files.
curl -X POST -F "file=@invoice.pdf" http://localhost:8888/document/new/upload
Response:
{"ingestion/id": "019234ab-...", "document/id": "019234cd-...", "created?": true}
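OAuth-based email integration receives push notifications when new emails arrive. Both providers follow a similar pattern:
- The webhook records a pending sync for the affected mailbox
- A background worker fetches new messages and calls queue-for-ingestion! for each attachment

See Email Integration for full details.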
All intake endpoints share a common queue-for-ingestion! function, located in src/com/getorcha/erp/http/ingestion.clj, that:
- Upserts the document row (ON CONFLICT)
- Detects whether the row was inserted or updated via the xmax system column (xmax = 0 means INSERT, xmax > 0 means UPDATE)

Documents use UUIDv7 as the primary key instead of serial integers or random UUIDs.
Why UUIDv7?
| Property | UUIDv7 | Random UUID (v4) | Serial Integer |
|---|---|---|---|
| B-tree locality | Excellent (time-ordered) | Poor (random distribution) | Excellent |
| Pre-DB generation | Yes | Yes | No |
| Collision-free | Practically yes | Practically yes | Yes |
| Information leakage | No | No | Yes (sequence reveals volume) |
| Distributed generation | Yes | Yes | Requires coordination |
This gives time-ordered inserts for B-tree friendliness while being globally unique without coordination.
PostgreSQL 18+ provides a native uuidv7() function. We nevertheless generate the UUIDv7 in Clojure before the S3 upload, so that the document ID is known for the S3 path, and then INSERT with an explicit ID.
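To make the pre-database generation step concrete, here is a minimal sketch of building a UUIDv7 with plain Java interop, following the RFC 9562 bit layout. The names `uuid-v7` and `rng` are hypothetical and not taken from the codebase, which may well use a library instead.

```clojure
(ns example.uuidv7
  (:import (java.security SecureRandom)
           (java.util UUID)))

;; Hypothetical helper, not the project's actual generator.
(def ^:private ^SecureRandom rng (SecureRandom.))

(defn uuid-v7
  "Builds a version-7 UUID: a 48-bit Unix millisecond timestamp in the
  most significant bits, then version and variant bits, with the
  remaining 74 bits random."
  ([] (uuid-v7 (System/currentTimeMillis)))
  ([^long unix-ms]
   (let [rand-a (bit-and (.nextLong rng) 0xFFF)              ; 12 random bits
         rand-b (bit-and (.nextLong rng) 0x3FFFFFFFFFFFFFFF) ; 62 random bits
         msb    (bit-or (bit-shift-left unix-ms 16)          ; timestamp in the top 48 bits
                        0x7000                               ; version nibble = 7
                        rand-a)
         lsb    (bit-or Long/MIN_VALUE rand-b)]              ; variant bits = 10
     (UUID. msb lsb))))

;; The ID exists before any INSERT, so it can be used in the S3 path.
(def document-id (uuid-v7))
```

Because the timestamp occupies the leading bits, IDs generated in later milliseconds sort after earlier ones, which is what gives the B-tree locality described in the table above.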
A SHA-256 hash of the document content is stored in content_hash and enforced by a unique secondary index.
CREATE UNIQUE INDEX idx_document_content_hash
ON document(content_hash)
WHERE content_hash IS NOT NULL;
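As an illustration of how a value for content_hash could be computed, the sketch below hashes a byte array with the JDK's MessageDigest. The function name `sha256-hex` and the lowercase hex encoding are assumptions; the source does not state how the hash is encoded.

```clojure
(ns example.content-hash
  (:import (java.security MessageDigest)))

(defn sha256-hex
  "Returns the lowercase hex SHA-256 digest of a byte array.
  Hypothetical helper; the stored encoding is an assumption."
  [^bytes content]
  (let [digest (.digest (MessageDigest/getInstance "SHA-256") content)]
    ;; %02x renders each byte as two hex digits (negative bytes as unsigned).
    (apply str (map #(format "%02x" %) digest))))

;; Identical bytes always yield the same hash, regardless of filename or source.
(= (sha256-hex (.getBytes "same content" "UTF-8"))
   (sha256-hex (.getBytes "same content" "UTF-8")))
```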
Why not use content hash as primary key?
Content hash could theoretically serve as the document ID (content-addressed storage), but:
Source information is stored on the ingestion (ap_ingestion), not the document:
| Entity | Tracks |
|---|---|
| document | Content only (hash, file path, structured data) |
| ap_ingestion | Who requested processing (uploaded_by OR doc_source_id) |
Why this design:
Each ingestion must have exactly one source:
CONSTRAINT ingestion_source_xor CHECK (num_nonnulls(doc_source_id, uploaded_by) = 1)
| doc_source_id | uploaded_by | Valid? |
|---|---|---|
| SET | NULL | Yes - from doc-source |
| NULL | SET | Yes - manual upload |
| SET | SET | No - rejected |
| NULL | NULL | No - rejected |
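The same rule can be mirrored in application code to fail fast before the INSERT reaches the database. This is only a sketch: the predicate name and the kebab-case keys are hypothetical, and the CHECK constraint remains the authoritative guard.

```clojure
(ns example.ingestion-source)

(defn valid-ingestion-source?
  "True when exactly one of :doc-source-id and :uploaded-by is non-nil,
  mirroring num_nonnulls(doc_source_id, uploaded_by) = 1."
  [{:keys [doc-source-id uploaded-by]}]
  (= 1 (count (remove nil? [doc-source-id uploaded-by]))))

;; The four rows of the truth table above:
(valid-ingestion-source? {:doc-source-id "src-1"})                     ; from doc-source
(valid-ingestion-source? {:uploaded-by "user-1"})                      ; manual upload
(valid-ingestion-source? {:doc-source-id "src-1" :uploaded-by "user-1"}) ; rejected
(valid-ingestion-source? {})                                           ; rejected
```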
When a document is ingested, behavior depends on whether a document with the same content hash exists:
| Existing? | Action |
|---|---|
| No | Create new document, upload to S3, create ingestion, queue |
| Yes | Reuse existing document, create new ingestion, queue |
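The two rows of this table can be modelled as a small pure function. The sketch below is an in-memory stand-in, not the real queue-for-ingestion!: the `db` map, the `:candidate-id` key, and the omission of S3 and SQS side effects are all assumptions made for illustration.

```clojure
(ns example.dedup)

(defn queue-for-ingestion
  "Pure model of the deduplication table. `db` is
  {:documents {content-hash document-id}, :ingestions [...]}.
  Returns the updated db plus a result map shaped like the upload response."
  [db {:keys [content-hash candidate-id source]}]
  (let [existing-id (get-in db [:documents content-hash])
        ;; Reuse the existing document when the hash is already known.
        document-id (or existing-id candidate-id)
        ingestion   {:document-id document-id :source source}]
    {:db     (-> db
                 (assoc-in [:documents content-hash] document-id)
                 ;; A new ingestion is created in both cases.
                 (update :ingestions (fnil conj []) ingestion))
     :result {:document/id document-id
              :created?    (nil? existing-id)}}))

;; Submitting the same content twice yields one document and two ingestions.
(let [first-run  (queue-for-ingestion {} {:content-hash "h1" :candidate-id "doc-1" :source :upload})
      second-run (queue-for-ingestion (:db first-run)
                                      {:content-hash "h1" :candidate-id "doc-2" :source :email})]
  [(:result first-run) (:result second-run)])
```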
Key behaviors:
When the same content is submitted again:
This means:
The file extension for S3 storage is derived from the Content-Type header:
(def mime-type->extension
  {"application/pdf" "pdf"
   "image/png"       "png"
   "image/jpeg"      "jpg"
   "image/tiff"      "tiff"
   "image/gif"       "gif"
   "image/bmp"       "bmp"
   "image/webp"      "webp"})
S3 path format: documents/{document-id}.{ext}
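Putting the mapping and the path format together, a key builder might look like the sketch below. The function name `s3-key`, the stripping of Content-Type parameters, and returning nil for unsupported types are assumptions; the source does not say how unknown MIME types are handled.

```clojure
(ns example.s3-key
  (:require [clojure.string :as str]))

(def mime-type->extension
  {"application/pdf" "pdf"
   "image/png"       "png"
   "image/jpeg"      "jpg"
   "image/tiff"      "tiff"
   "image/gif"       "gif"
   "image/bmp"       "bmp"
   "image/webp"      "webp"})

(defn s3-key
  "Builds documents/{document-id}.{ext} from a Content-Type header value.
  Returns nil when the MIME type is missing or not in the map (assumed behavior)."
  [document-id content-type]
  (let [mime (some-> content-type
                     (str/split #";")   ; drop parameters such as charset
                     first
                     str/trim
                     str/lower-case)]
    (when-let [ext (get mime-type->extension mime)]
      (str "documents/" document-id "." ext))))

(s3-key "019234cd-example" "application/pdf")
```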
Each ingestion can store source-specific metadata in the ap_ingestion.source_metadata JSONB column:
| Source | Metadata Example |
|---|---|
| Upload | {:filename "invoice-march.pdf"} |
| Gmail | {:message-id "...", :from "supplier@example.com", :subject "Invoice #123"} |
| Webhook | {:webhook-path "/webhook/erp", :source-ip "..."} |
This metadata lives on the ingestion (not document) because:
Email ingestion allows customers to connect their email inboxes via OAuth. The system automatically extracts invoice attachments from incoming emails.
Supported providers: Outlook and Gmail.
For complete specification, see .prd/email-ingestion.md.
┌──────────────┐     ┌──────────────────────┐     ┌─────────────────┐
│ Email        │────▶│ Webhook Handler      │────▶│ pending_sync    │
│ Provider     │     │ POST /webhooks/{prov}│     │ table           │
│ (push notif) │     └──────────────────────┘     └────────┬────────┘
└──────────────┘                                           │
                                                           ▼
┌──────────────┐     ┌──────────────────────┐     ┌─────────────────┐
│ S3           │◀────│ Sync Worker          │◀────│ Claim pending   │
│ + SQS        │     │ (background)         │     │ FOR UPDATE      │
└──────────────┘     └──────────────────────┘     │ SKIP LOCKED     │
       │                                          └─────────────────┘
       ▼
┌──────────────────────────────────────────────────────────────────┐
│ Existing Ingestion Pipeline (SQS → Workers)                      │
└──────────────────────────────────────────────────────────────────┘
Both providers implement a common protocol:
(defprotocol EmailProvider
  (create-subscription! [this doc-source-id])
  (renew-subscription! [this doc-source-id])
  (sync-since! [this doc-source-id last-sync-token])
  (fetch-message [this message-id])
  (refresh-token! [this doc-source-id]))
| Capability | Outlook | Gmail |
|---|---|---|
| Push method | Direct webhook | Pub/Sub topic |
| Sync API | Delta query | History API |
| Watermark | deltaLink URL | historyId BIGINT |
| Subscription lifetime | 7 days | 7 days |
Webhooks are processed asynchronously to ensure fast response times:
- Validate clientState (Outlook) or verification token (Gmail)
- Look up doc_source_id by subscription ID or email address
- Queue a row in the ap_doc_source_email_pending_sync table

The ap_doc_source_email_pending_sync table uses doc_source_id as its primary key, ensuring multiple notifications coalesce into a single sync operation.
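The coalescing effect of keying on doc_source_id can be shown with a small in-memory model. This is a sketch under assumptions: the map stands in for the table, `enqueue-pending-sync` is a hypothetical name, and the SQL equivalent (an INSERT that does nothing on a primary-key conflict) is a guess at how the handler writes the row.

```clojure
(ns example.pending-sync)

(defn enqueue-pending-sync
  "Adds a pending-sync entry keyed by doc-source-id. A second notification
  for the same source leaves the existing entry untouched, which models
  INSERT ... ON CONFLICT (doc_source_id) DO NOTHING (assumed)."
  [pending doc-source-id queued-at]
  (if (contains? pending doc-source-id)
    pending
    (assoc pending doc-source-id
           {:status "pending" :queued-at queued-at :attempts 0})))

;; Three notifications for two mailboxes collapse into two pending syncs.
(reduce (fn [pending [doc-source-id queued-at]]
          (enqueue-pending-sync pending doc-source-id queued-at))
        {}
        [["source-a" 1] ["source-a" 2] ["source-b" 3]])
```

A burst of notifications for one mailbox therefore costs one sync, and the webhook handler does no provider API calls on the request path.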
A background worker polls for pending syncs:
;; Claim atomically with SELECT FOR UPDATE SKIP LOCKED
(db/execute!
 "UPDATE ap_doc_source_email_pending_sync
  SET status = 'processing'
  WHERE doc_source_id = (
    SELECT doc_source_id FROM ap_doc_source_email_pending_sync
    WHERE status = 'pending'
    LIMIT 1 FOR UPDATE SKIP LOCKED
  ) RETURNING doc_source_id")
For each claimed sync:
- Call queue-for-ingestion! for each attachment
- Update ap_doc_source_email.last_sync_token

Emails pass through two relevancy filters:
| Stage | Location | Method | Purpose |
|---|---|---|---|
| 1 | Sync worker | Heuristic | Fast reject of obvious non-invoices |
| 2 | Ingestion worker | LLM | Accurate classification, can mark as "skipped" |
Stage 1 (Heuristic):
Stage 2 (LLM):
- Applies at the fetch-files-from-s3! stage for email-sourced ingestions
- Non-invoices are marked skipped with reason (e.g., "marketing_material")

ap_doc_source_email extensions:
connection_status email_connection_status -- active, token_refresh_failed, etc.
subscription_expires_at TIMESTAMPTZ -- for proactive renewal
provider_subscription_id TEXT -- Outlook subscription ID
last_sync_token TEXT -- deltaLink or historyId
last_successful_sync_at TIMESTAMPTZ -- health monitoring
ap_doc_source_email_pending_sync table:
doc_source_id UUID PRIMARY KEY -- coalesces notifications
queued_at TIMESTAMPTZ
attempts INTEGER -- retry tracking
last_error TEXT
status TEXT -- 'pending' or 'processing'
ap_ingestion extensions:
status = 'skipped' -- new enum value
skip_reason TEXT -- e.g., "marketing_material", "signature_image"
Email ingestions store metadata in ap_ingestion.source_metadata:
{
"provider": "outlook",
"message_id": "<abc123@mail.example.com>",
"from": "supplier@example.com",
"subject": "Invoice #12345 - January 2026",
"received_at": "2026-01-02T10:30:00Z",
"attachment_filename": "invoice-12345.pdf"
}
This enables:
- Tracing and deduplicating by message_id

Three-layer deduplication prevents duplicate processing:
| Layer | Mechanism | What it prevents |
|---|---|---|
| Sync token | last_sync_token watermark | Re-fetching seen emails |
| Message ID | Track message_id per source | Re-processing same email |
| Content hash | Existing queue-for-ingestion! | Duplicate S3 uploads |
Connection status is tracked for monitoring:
CREATE TYPE email_connection_status AS ENUM (
'active', -- working normally
'token_refresh_failed', -- needs re-authorization
'subscription_expired', -- webhook subscription lapsed
'provider_error' -- API returning errors
);