Comprehensive architecture for email-based document acquisition in the Orcha system.
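Email acquisition enables automatic extraction of invoices and financial documents from customer email inboxes. The system receives push notifications from the mail provider, syncs new messages, filters and triages them, and uploads each extractable document to S3 before queueing it for ingestion.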
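Key principle: Acquisition and Ingestion are separate concerns. Acquisition fetches, triages, and uploads documents. Ingestion transcribes them and extracts structured data, and it assumes its input is valid.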
┌──────────────────────────────────────────────────────────────────────────────┐
│ ERP SERVICE │
│ │
│ ┌─────────────────────┐ ┌─────────────────────────────────────────────┐ │
│ │ OAuth Endpoints │ │ Webhook Handlers │ │
│ │ │ │ │ │
│ │ GET /oauth/ │ │ POST /webhooks/outlook │ │
│ │ outlook/ │ │ 1. Validate clientState │ │
│ │ authorize │ │ 2. Look up ap_doc_source │ │
│ │ │ │ 3. Send SQS message │ │
│ │ GET /oauth/ │ │ 4. Return 200 OK (~50ms) │ │
│ │ outlook/ │ │ │ │
│ │ callback │ │ POST /webhooks/gmail │ │
│ │ │ │ (similar, validates Pub/Sub token) │ │
│ │ Stores tokens │ │ │ │
│ │ in AWS SSM │ └─────────────────────┬───────────────────────┘ │
│ └─────────────────────┘ │ │
│ │ │
└───────────────────────────────────────────────────┼──────────────────────────┘
│
SQS: email-sync-queue
Message body: doc_source_id (UUID string)
│
▼
┌──────────────────────────────────────────────────────────────────────────────┐
│ WORKERS SERVICE │
│ │
│ ┌─────────────────────────────────────┐ ┌───────────────────────────────┐ │
│ │ Acquisition Orchestrator │ │ Ingestion Orchestrator │ │
│ │ (::ap.acquisition/orchestrator) │ │ (::ap.ingestion/orchestrator) │ │
│ │ │ │ │ │
│ │ • Polls acquisition SQS queue │ │ • Polls ingestion SQS queue │ │
│ │ • Dispatches to vthread pool │ │ • Dispatches to vthread pool │ │
│ │ • Simpler: no heartbeat/preproc │ │ • Handles visibility heartbeat │ │
│ └────────────────┬────────────────────┘ └───────────────┬───────────────┘ │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────────────────────────────────┐ ┌───────────────────────────────┐ │
│ │ Email Acquisition Worker │ │ Ingestion Worker │ │
│ │ │ │ │ │
│ │ 1. Claim inbox (coalesce) │ │ 1. Claim ingestion │ │
│ │ 2. Load/refresh OAuth token │ │ 2. Fetch from S3 │ │
│ │ 3. Delta sync (fetch new msgs) │ │ 3. Transcribe (OCR/PDF/HTML) │ │
│ │ 4. For each message: │ │ 4. Extract (LLM) │ │
│ │ a. Relevancy filter │ │ 5. Validate & complete │ │
│ │ b. LLM triage │ │ │ │
│ │ c. Upload to S3 │ │ Assumes input is valid │ │
│ │ d. Queue ingestion │ │ │ │
│ │ 5. Update sync token │ │ │ │
│ │ 6. Delete SQS message │ │ │ │
│ └────────────────┬────────────────────┘ └───────────────────────────────┘ │
│ │ ▲ │
│ │ SQS: ingestion-queue │ │
│ └────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────────────┘
com.getorcha.workers
├── ap/
│ ├── acquisition.clj ; AP acquisition orchestrator (poller, job dispatch)
│ ├── acquisition/
│ │ ├── email.clj ; Email-specific acquisition logic
│ │ ├── email/
│ │ │ ├── outlook.clj ; Microsoft Graph provider
│ │ │ └── gmail.clj ; Gmail API provider (future)
│ │ └── triage.clj ; LLM email triage
│ │
│ ├── ingestion.clj ; AP ingestion orchestrator (poller, job dispatch)
│ └── ingestion/
│ ├── transcription.clj ; Document → text
│ └── extraction.clj ; Text → structured data
Provider sends push notification when new email arrives.
;; Outlook webhook payload
{:value [{:subscriptionId "abc123"
:changeType "created"
:resource "Users/{user-id}/Messages/{message-id}"
:clientState "{doc-source-id}:{secret}"}]}
;; Gmail Pub/Sub payload
{:message {:data "<base64: {emailAddress, historyId}>"
:messageId "..."}}
Webhook handler (ERP service):
(defn outlook-webhook-handler
  [{:keys [db-pool sqs-client queue-url]} request]
  (if-let [validation-token (get-in request [:params :validationToken])]
    ;; 1. Handle validation challenge (Microsoft sends on subscription creation):
    ;;    echo the token back as plain text with 200 OK
    {:status 200
     :headers {"Content-Type" "text/plain"}
     :body validation-token}
    ;; 2. Parse and validate notifications
    (let [notifications (get-in request [:body :value])]
      (doseq [{:keys [clientState]} notifications]
        ;; 3. Validate clientState (timing-safe comparison);
        ;;    unknown or malformed states are dropped
        (let [[doc-source-id _secret] (parse-client-state clientState)
              stored-state (some->> doc-source-id (db/get-client-state db-pool))]
          (when (and stored-state (timing-safe-equals? clientState stored-state))
            ;; 4. Queue to SQS (message body is the doc-source-id string)
            (aws/send-message! sqs-client queue-url (str doc-source-id)))))
      ;; 5. Return immediately; the sync itself runs in the Workers service
      {:status 202})))
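The handler relies on parse-client-state, which is not defined in this document. The sketch below is one way to write it. It assumes the `{doc-source-id}:{secret}` layout from the Outlook payload, and it assumes malformed values should be dropped by returning nil. The function body and return shape are illustrative, not the project's code.

```clojure
(ns example.webhook.client-state
  (:require [clojure.string :as str])
  (:import (java.util UUID)))

(defn parse-client-state
  "Hypothetical helper: splits \"{doc-source-id}:{secret}\" into
  [doc-source-id-uuid secret]. Returns nil for missing or malformed values."
  [client-state]
  (when (string? client-state)
    ;; Limit 2 keeps any further ':' characters inside the secret
    (let [[id-part secret] (str/split client-state #":" 2)]
      (when (seq secret)
        (try
          [(UUID/fromString id-part) secret]
          (catch IllegalArgumentException _
            ;; Not a UUID: treat the notification as unknown
            nil))))))
```

A nil result makes the stored-state lookup nil, so the handler drops the notification instead of throwing.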
Polls SQS queue and dispatches to worker pool.
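(defmethod ig/init-key ::orchestrator
  [_ {:keys [aws worker-pools max-queue-messages wait-time-seconds] :as config}]
  (let [queue-name (get-in aws [:queues :acquisition]) ; matches :queues in config.edn
        ^java.util.concurrent.ExecutorService executor (:executor worker-pools)
        polling? (atom true)
        sqs-client (aws/build-sqs-client (:config aws))
        queue-url (aws/get-queue-url sqs-client queue-name)]
    ;; Start polling loop on a virtual thread. `.execute` accepts only a Runnable,
    ;; avoiding the Runnable/Callable overload ambiguity of `.submit` with a fn.
    (.execute executor
              (fn []
                (while @polling?
                  (try
                    (let [messages (aws/receive-messages sqs-client queue-url
                                                         {:max-messages max-queue-messages
                                                          :wait-time-seconds wait-time-seconds})]
                      (doseq [message messages]
                        ;; Dispatch each message to the worker pool as its own task
                        (.execute executor #(acquisition-job-handler config message))))
                    (catch Exception e
                      ;; Keep the loop alive across transient SQS errors
                      (log/error e "Acquisition polling failed"))))))
    ;; Return shutdown fn (the loop exits after the in-flight long poll returns)
    {:shutdown! #(reset! polling? false)}))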
Multiple webhooks for the same inbox coalesce into one sync.
(defn ^:private claim-inbox!
"Claims inbox for processing. Uses pending_sync table for coalescing.
Returns the claimed row ({:doc-source-id ...}) or nil if the inbox is already being processed."
[db-pool doc-source-id timeout-seconds]
(db.sql/execute-one!
db-pool
{:update :ap-doc-source-email-pending-sync
:set {:status [:cast "processing" :email-pending-sync-status]
:claimed-at [:now]}
:where [:and
[:= :doc-source-id doc-source-id]
[:or
[:= :status [:cast "pending" :email-pending-sync-status]]
;; Crash recovery: reclaim stale processing rows
[:and
[:= :status [:cast "processing" :email-pending-sync-status]]
[:< :claimed-at
;; now() - timeout-seconds * interval '1 second', parameterized instead of string-built
[:- [:now] [:* [:cast "1 second" :interval] timeout-seconds]]]]]]
:returning [:doc-source-id]}))
;; Before claiming, upsert to ensure row exists
(defn ^:private ensure-pending-sync!
"Creates or updates pending sync row. Coalesces multiple webhooks."
[db-pool doc-source-id]
(db.sql/execute-one!
db-pool
{:insert-into :ap-doc-source-email-pending-sync
:values [{:doc-source-id doc-source-id
:status [:cast "pending" :email-pending-sync-status]}]
:on-conflict [:doc-source-id]
:do-update-set {:status [:cast "pending" :email-pending-sync-status]
:claimed-at nil}}))
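Coalescing behavior: every webhook for an inbox upserts the same pending_sync row, keyed by doc_source_id, so a burst of notifications leaves a single pending row. claim-inbox! moves that row to processing in one conditional UPDATE, so concurrent workers cannot both claim the same pending row. After a crash, a processing row whose claimed_at is older than the timeout can be reclaimed.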
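The claim rules can be checked without a database. The sketch below is a hypothetical in-memory model that uses plain maps instead of ap_doc_source_email_pending_sync rows. `upsert-pending` stands in for ensure-pending-sync!, `claimable?` mirrors the WHERE clause of claim-inbox!, and `claim` performs the status transition. All three names are illustrative.

```clojure
(ns example.coalescing
  (:import (java.time Instant)))

(defn upsert-pending
  "Models ensure-pending-sync!: one row per doc-source-id, reset to :pending."
  [rows doc-source-id]
  (assoc rows doc-source-id {:status :pending :claimed-at nil}))

(defn claimable?
  "Models claim-inbox!'s WHERE clause: pending rows, or processing rows whose
  claim is older than timeout-seconds (crash recovery)."
  [{:keys [status claimed-at]} ^Instant now timeout-seconds]
  (boolean
   (or (= status :pending)
       (and (= status :processing)
            claimed-at
            (.isBefore claimed-at (.minusSeconds now timeout-seconds))))))

(defn claim
  "Claims the row if claimable; returns [rows claimed?]."
  [rows doc-source-id ^Instant now timeout-seconds]
  (let [row (get rows doc-source-id)]
    (if (and row (claimable? row now timeout-seconds))
      [(assoc rows doc-source-id {:status :processing :claimed-at now}) true]
      [rows false])))
```

In this model, three webhooks for one inbox collapse into a single row and therefore a single successful claim. The model also shows a trade-off: a timeout shorter than a normal sync would let a second worker reclaim a live claim. The configured `:stuck-processing-timeout-seconds 300` therefore needs to exceed typical sync time.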
Load OAuth tokens from SSM, refresh if expiring soon.
(defn ^:private ensure-access-token!
"Returns valid access token, refreshing if needed."
[{:keys [ssm-client provider-config]} doc-source-id]
(let [token-path (str (:token-path-pattern provider-config) doc-source-id)
{:keys [access-token refresh-token expires-at]}
(ssm/get-parameter-json ssm-client token-path)]
;; Proactive refresh: refresh if < 5 minutes to expiry
(if (token-valid-for? expires-at (Duration/ofMinutes 5))
access-token
(let [new-tokens (outlook/refresh-tokens! provider-config refresh-token)]
(ssm/put-parameter-json! ssm-client token-path new-tokens)
(:access-token new-tokens)))))
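ensure-access-token! calls token-valid-for?, which is not shown. The sketch below assumes `expires-at` comes back from the SSM JSON as an ISO-8601 string, or is already a `java.time.Instant`. The three-arity form takes `now` explicitly so the 5-minute refresh margin can be tested deterministically. The helper names are assumptions.

```clojure
(ns example.tokens
  (:import (java.time Duration Instant)))

(defn- ->instant
  "Accepts an Instant or an ISO-8601 string such as \"2026-01-04T10:30:00Z\"."
  [expires-at]
  (cond
    (instance? Instant expires-at) expires-at
    (string? expires-at) (Instant/parse expires-at)
    :else nil))

(defn token-valid-for?
  "True when the token stays valid for more than `margin` after `now`.
  A missing expiry counts as invalid, which forces a refresh."
  ([expires-at margin]
   (token-valid-for? expires-at margin (Instant/now)))
  ([expires-at ^Duration margin ^Instant now]
   (if-let [^Instant expiry (->instant expires-at)]
     (.isAfter expiry (.plus now margin))
     false)))
```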
Fetch new messages since last sync using provider's delta API.
;; Outlook delta sync (clj-http; `:as :json` keywordizes response keys)
(def ^:private initial-delta-url
  "https://graph.microsoft.com/v1.0/me/mailFolders('Inbox')/messages/delta?$select=id,subject,from,receivedDateTime,hasAttachments")

(defn sync-messages!
  "Fetches new messages using delta query. Returns {:messages [...] :delta-link \"...\"}.
  Follows @odata.nextLink pages until Graph returns an @odata.deltaLink."
  [access-token last-delta-link]
  (loop [url (or last-delta-link initial-delta-url)
         all-messages []]
    (let [body (:body (http/get url {:oauth-token access-token :as :json}))
          ;; Delta results also report deletions as items carrying "@removed"; skip them
          messages (remove #(contains? % (keyword "@removed")) (:value body))
          next-link (get body (keyword "@odata.nextLink"))
          delta-link (get body (keyword "@odata.deltaLink"))
          all-messages (into all-messages messages)]
      (if next-link
        (recur next-link all-messages)
        {:messages all-messages
         :delta-link delta-link}))))
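The paging loop can be tested without calling Graph if it takes the page fetcher as a function. `collect-delta` below is a hypothetical refactoring with the same contract: it follows `@odata.nextLink` until a page carries `@odata.deltaLink`, and it drops entries marked `@removed`. `fetch-page` is an illustrative parameter name.

```clojure
(ns example.delta)

(defn collect-delta
  "Hypothetical: follows nextLink pages via (fetch-page url), which must return a
  keywordized response body. Returns {:messages [...] :delta-link \"...\"}."
  [fetch-page start-url]
  (loop [url start-url
         acc []]
    (let [body (fetch-page url)
          ;; Keep only created/updated messages, not deletion markers
          acc (into acc
                    (remove #(contains? % (keyword "@removed")))
                    (:value body))]
      (if-let [next-link (get body (keyword "@odata.nextLink"))]
        (recur next-link acc)
        {:messages acc
         :delta-link (get body (keyword "@odata.deltaLink"))}))))
```

With clj-http, the production fetcher could be `#(:body (http/get % {:oauth-token access-token :as :json}))`. A test can pass a plain map from URL to canned body instead.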
For each new message: relevancy filter → LLM triage → upload → queue.
(declare process-message!)

(defn ^:private process-messages!
  "Processes messages through filter → triage → upload → queue pipeline."
  [{:keys [db-pool] :as context}
   {:keys [doc-source-id access-token]}
   messages]
  (doseq [message messages]
    (let [message-id (:id message)]
      ;; Skip if already processed (deduplication)
      (when-not (message-processed? db-pool doc-source-id message-id)
        (try
          (process-message! context doc-source-id access-token message)
          (catch Exception e
            ;; Record the error and continue with the next message
            (record-message-error! db-pool doc-source-id message-id e)))))))
(defn ^:private process-message!
  "Processes single message through pipeline."
  [{:keys [db-pool llm-config] :as context}
   doc-source-id access-token message]
  (let [;; Fetch full message with body and attachment metadata
        full-message (outlook/fetch-message access-token (:id message))
        ;; Step 4a: Relevancy filter (fast heuristic)
        filter-result (relevant-email? full-message)]
    (if-not (:relevant? filter-result)
      ;; Rejected by filter - record and skip
      (record-message! db-pool doc-source-id (:id message)
                       :rejected (:reason filter-result))
      ;; Step 4b: LLM triage
      (let [triage-result (triage/analyze-email llm-config full-message)]
        (if (or (= :skip (:action triage-result))
                (empty? (:extractable-items triage-result)))
          ;; Skipped or nothing extractable - record and skip
          (record-message! db-pool doc-source-id (:id message)
                           :rejected (:skip-reason triage-result))
          ;; Steps 4c & 4d: Upload and queue each extractable item
          (do
            (doseq [item (:extractable-items triage-result)]
              (upload-and-queue! context doc-source-id full-message item triage-result))
            (record-message! db-pool doc-source-id (:id message) :processed nil)))))))
Fast heuristic check to reject obvious non-invoices before LLM call.
(def ^:private spam-sender-patterns
#{"newsletter" "no-reply" "noreply" "marketing" "update" "digest"
"subscriptions" "info@" "promo" "notification"})
(def ^:private spam-subject-patterns
#{"deal" "offer" "sale" "weekly" "monthly" "digest" "newsletter"
"update" "promotion" "unsubscribe"})
(def ^:private invoice-keywords
#{"invoice" "rechnung" "bill" "factura" "receipt" "quittung" "faktura"})
(defn relevant-email?
  "Quick heuristic check. Returns {:relevant? bool :reason string}.
  An invoice keyword in the subject overrides both spam checks, so mail such as
  \"Your monthly invoice\" from a no-reply billing address is not rejected."
  [{:keys [from subject body attachments]}]
  (let [from-lower (str/lower-case (or from ""))
        subject-lower (str/lower-case (or subject ""))
        has-invoice-keyword? (some #(str/includes? subject-lower %) invoice-keywords)
        has-document-attachments? (some document-attachment? attachments)]
    (cond
      ;; Reject spam senders first (unless invoice keyword present)
      (and (not has-invoice-keyword?)
           (some #(str/includes? from-lower %) spam-sender-patterns))
      {:relevant? false :reason "spam-sender"}
      ;; Reject spam subjects (same exception: "monthly" or "update" also occur in invoice subjects)
      (and (not has-invoice-keyword?)
           (some #(str/includes? subject-lower %) spam-subject-patterns))
      {:relevant? false :reason "spam-subject"}
      ;; Accept if has document attachments
      has-document-attachments?
      {:relevant? true :reason "has-document-attachments"}
      ;; Accept if subject has invoice keywords
      has-invoice-keyword?
      {:relevant? true :reason "invoice-keyword-in-subject"}
      ;; Accept if body has financial keywords
      (>= (count-financial-keywords body) 3)
      {:relevant? true :reason "financial-keywords-in-body"}
      ;; Default: reject
      :else
      {:relevant? false :reason "no-invoice-signals"})))
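relevant-email? depends on document-attachment? and count-financial-keywords, which are not shown. The sketch below makes several assumptions, all hypothetical. Attachments are assumed to be maps with `:name` and `:inline?` keys. "Document" is taken to mean a PDF or image, matching the triage prompt's "PDF/image attachment". The financial keyword list is illustrative and would need tuning against real mail.

```clojure
(ns example.relevancy-helpers
  (:require [clojure.string :as str]))

;; Hypothetical: PDFs and images, matching the triage prompt's wording
(def ^:private document-extensions #{"pdf" "png" "jpg" "jpeg" "tif" "tiff"})

(defn document-attachment?
  "True for a non-inline attachment with a document file extension.
  Inline parts (signature images, logos) are ignored."
  [{filename :name inline? :inline?}]
  (let [n (str/lower-case (or filename ""))
        dot (str/last-index-of n ".")
        ext (when dot (subs n (inc dot)))]
    (boolean (and (not inline?) (contains? document-extensions ext)))))

;; Hypothetical keyword list (English and German)
(def ^:private financial-keywords
  #{"total" "amount due" "due date" "vat" "iban" "tax" "mwst" "betrag" "summe"})

(defn count-financial-keywords
  "Number of distinct financial keywords occurring in body (case-insensitive)."
  [body]
  (let [text (str/lower-case (or body ""))]
    (count (filter #(str/includes? text %) financial-keywords))))
```

Counting distinct keywords keeps a single repeated word such as "total" from reaching the threshold of 3 on its own.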
Determines what documents can be extracted from the email.
(def TriageResult
"Schema for triage output."
[:map
[:action [:enum :process :skip]]
[:skip-reason {:optional true} [:maybe :string]]
[:extractable-items
[:vector
[:map
[:kind [:enum :attachment :body :download-link]]
[:filename {:optional true} :string]
[:content-type {:optional true} :string]
[:format {:optional true} [:enum :html :text]]
[:url {:optional true} :string]
[:document-type-hint {:optional true} [:enum :invoice :purchase-order :receipt :unknown]]]]]
[:reasoning :string]
[:confidence [:double {:min 0.0 :max 1.0}]]])
(declare build-triage-prompt)

(defn analyze-email
  "LLM triage: determines extractable items from email."
  [{:keys [provider model]} {:keys [from subject body-text attachments]}]
  (let [prompt (build-triage-prompt from subject body-text attachments)
        response (llm/complete provider model prompt {:response-format :json})]
    (parse-triage-response response)))
(defn ^:private build-triage-prompt
[from subject body-preview attachments]
(str "You are an email triage system for invoice processing.\n\n"
"Analyze this email and determine what documents can be extracted.\n\n"
"Email details:\n"
"- From: " from "\n"
"- Subject: " subject "\n"
"- Attachments: " (format-attachments attachments) "\n"
"- Body preview (first 2000 chars):\n" (let [p (str body-preview)] (subs p 0 (min 2000 (count p)))) "\n\n" ; (str nil) => "" for HTML-only mails
"Respond with JSON:\n"
"{\n"
" \"action\": \"process\" or \"skip\",\n"
" \"skip_reason\": \"spam\" | \"unsupported\" | \"no_relevant_content\" (if skipping),\n"
" \"extractable_items\": [\n"
" {\"kind\": \"attachment\", \"filename\": \"invoice.pdf\", \"document_type_hint\": \"invoice\"},\n"
" {\"kind\": \"body\", \"format\": \"html\", \"document_type_hint\": \"invoice\"}\n"
" ],\n"
" \"reasoning\": \"Brief explanation\",\n"
" \"confidence\": 0.0-1.0\n"
"}\n\n"
"Rules:\n"
"- Return extractable_items for EACH relevant document\n"
"- kind=attachment: PDF/image attachment that looks like an invoice\n"
"- kind=body: The email body itself IS an invoice (HTML invoice, not just a notification)\n"
"- kind=download_link: Direct PDF download link (no login required) - mark as unsupported for now\n"
"- Ignore signature images, logos, .sig/.ics/.vcf files\n"
"- Marketing emails, newsletters, notifications → skip with reason\n"))
Example triage outputs:
;; Email with invoice PDF attachment
{:action :process
:extractable-items [{:kind :attachment
:filename "invoice-2026-001.pdf"
:content-type "application/pdf"
:document-type-hint :invoice}]
:reasoning "PDF attachment with invoice naming pattern"
:confidence 0.95}
;; Email with HTML invoice in body (no attachments)
{:action :process
:extractable-items [{:kind :body
:format :html
:document-type-hint :invoice}]
:reasoning "Email body contains invoice details with line items and totals"
:confidence 0.88}
;; Email with multiple relevant attachments
{:action :process
:extractable-items [{:kind :attachment
:filename "invoice.pdf"
:document-type-hint :invoice}
{:kind :attachment
:filename "purchase-order.pdf"
:document-type-hint :purchase-order}]
:reasoning "Two relevant financial documents attached"
:confidence 0.92}
;; Marketing email - skip
{:action :skip
:skip-reason "spam"
:extractable-items []
:reasoning "Newsletter from marketing department"
:confidence 0.99}
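parse-triage-response is not shown. The prompt asks for snake_case JSON with string values, while the examples above use kebab-case keys and keyword enums. A plausible step is therefore to normalize the decoded JSON map and then validate it against TriageResult, for example with `malli.core/validate`. The sketch below shows only the normalization and assumes the JSON has already been decoded into a map with string keys. The function names are hypothetical.

```clojure
(ns example.triage-normalize
  (:require [clojure.string :as str]))

(defn- ->kebab-keyword [s]
  (keyword (str/replace (name s) "_" "-")))

(def ^:private enum-keys
  "Keys whose string values become keywords (the :enum fields of TriageResult)."
  #{:action :kind :format :document-type-hint})

(defn normalize-triage
  "Hypothetical: converts decoded JSON (string keys, snake_case) into the
  TriageResult shape (kebab-case keyword keys, enum values as keywords)."
  [m]
  (into {}
        (map (fn [[k v]]
               (let [k (->kebab-keyword k)]
                 [k (cond
                      (map? v) (normalize-triage v)
                      (sequential? v) (mapv #(if (map? %) (normalize-triage %) %) v)
                      (and (enum-keys k) (string? v)) (->kebab-keyword v)
                      :else v)])))
        m))
```

`:skip-reason` stays a string, matching `[:maybe :string]` in the schema, and values such as filenames and URLs are left untouched.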
Upload each extractable item to S3 and queue for ingestion.
(defn ^:private upload-and-queue!
  "Uploads extractable item to S3 and queues for ingestion."
  [{:keys [db-pool aws]} doc-source-id full-message item triage-result]
  (let [{:keys [s3-client s3-bucket sqs-client ingestion-queue-url]} aws
        ;; Bind :format as body-format to avoid shadowing clojure.core/format
        {:keys [kind filename] body-format :format} item
        ;; Get content based on kind
        {:keys [content content-type file-extension]}
        (case kind
          :attachment (get-attachment-content full-message filename)
          :body {:content (get-body-content full-message body-format)
                 :content-type (if (= body-format :html) "text/html" "text/plain")
                 :file-extension (if (= body-format :html) "html" "txt")}
          ;; :download-link (or anything unexpected) is not supported yet
          (throw (ex-info "Unsupported extractable item kind" {:kind kind})))
;; Compute content hash for deduplication
content-hash (sha256 content)
;; Create document record (upsert by content hash)
{:keys [document-id created?]}
(db/upsert-document! db-pool
{:tenant-id (get-tenant-id db-pool doc-source-id)
:content-hash content-hash
:file-extension file-extension
:original-name (or filename "email-body")})
;; Upload to S3 if new document
s3-path (str "documents/" document-id "." file-extension)]
(when created?
(aws/put-object! s3-client s3-bucket s3-path content content-type))
;; Create ingestion record
(let [ingestion-id (db/create-ingestion! db-pool
{:document-id document-id
:doc-source-id doc-source-id
:source-metadata {:provider "outlook"
:message-id (:id full-message)
:from (:from full-message)
:subject (:subject full-message)
:received-at (:received-at full-message)
:extractable-item item
:triage-confidence (:confidence triage-result)}})]
;; Queue for ingestion
(aws/send-message! sqs-client ingestion-queue-url (str ingestion-id)))))
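The deduplication above hinges on `sha256`, which is also undefined here. `content` is a string for `:body` items and presumably a byte array for attachments. The sketch below assumes strings are hashed as UTF-8 and that the hash is stored as 64-character lowercase hex.

```clojure
(ns example.content-hash
  (:import (java.nio.charset StandardCharsets)
           (java.security MessageDigest)))

(defn sha256
  "Hex-encoded SHA-256 of a String (UTF-8) or byte array."
  [content]
  (let [^bytes data (if (string? content)
                      (.getBytes ^String content StandardCharsets/UTF_8)
                      content)
        digest (.digest (MessageDigest/getInstance "SHA-256") data)]
    ;; %064x left-pads with zeros so the result is always 64 characters
    (format "%064x" (BigInteger. 1 digest))))
```

If the same PDF arrives in two emails, both produce the same hash. upsert-document! then reports `created?` false and the S3 upload is skipped, but create-ingestion! still records a new ingestion for the second message.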
-- Email-specific ap_doc_source extension
CREATE TABLE ap_doc_source_email (
doc_source_id UUID PRIMARY KEY REFERENCES ap_doc_source(id),
email_address TEXT NOT NULL UNIQUE,
provider email_provider NOT NULL, -- 'outlook' | 'gmail'
folder TEXT NOT NULL DEFAULT 'INBOX',
connection_status connection_status NOT NULL DEFAULT 'active',
provider_subscription_id TEXT, -- Outlook subscription UUID
subscription_expires_at TIMESTAMPTZ, -- When to renew (7 days max)
webhook_client_state TEXT, -- For Outlook validation
last_sync_token TEXT, -- deltaLink (Outlook) or historyId (Gmail)
last_successful_sync_at TIMESTAMPTZ
);
-- Coalescing and crash recovery for email syncs
CREATE TABLE ap_doc_source_email_pending_sync (
doc_source_id UUID PRIMARY KEY REFERENCES ap_doc_source(id),
status email_pending_sync_status NOT NULL DEFAULT 'pending',
claimed_at TIMESTAMPTZ, -- NULL when pending, set when claimed
attempts INTEGER NOT NULL DEFAULT 0,
last_error TEXT
);
-- Message deduplication and error tracking
CREATE TABLE ap_doc_source_email_processed_messages (
doc_source_id UUID NOT NULL REFERENCES ap_doc_source(id),
message_id TEXT NOT NULL, -- Provider message ID
processed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
status email_message_status NOT NULL, -- processed | rejected | errored
error_type TEXT, -- spam-sender, spam-subject, triage-failure, etc.
error_reason TEXT, -- Human-readable description
PRIMARY KEY (doc_source_id, message_id)
);
-- source_metadata on the ingestion record stores message and triage context for
-- every email-sourced document. Example for a body-sourced document:
-- {
-- "provider": "outlook",
-- "message_id": "AAMk...",
-- "from": "supplier@example.com",
-- "subject": "Invoice #12345",
-- "received_at": "2026-01-04T10:30:00Z",
-- "extractable_item": {
-- "kind": "body",
-- "format": "html",
-- "document_type_hint": "invoice"
-- },
-- "triage_confidence": 0.88
-- }
;; config.edn (simplified)
{:com.getorcha/aws
{:config {:region "eu-central-1"
:endpoint nil} ; MiniStack for dev/test
:queues {:ingestion "v1-orcha-global-ingest"
:acquisition "v1-orcha-global-email-acquire"}
:s3-buckets {:storage "v1-orcha-global-storage-{suffix}"
:ses-emails "v1-orcha-ses-emails-{suffix}"}}
:com.getorcha.workers.ap.acquisition/worker-pools
{:timeout-minutes 5}
:com.getorcha.workers.ap.acquisition/orchestrator
{:db-pool #integrant/ref :com.getorcha.db/pool
:aws #ref [:com.getorcha/aws]
:worker-pools #integrant/ref :com.getorcha.workers.ap.acquisition/worker-pools
:stuck-processing-timeout-seconds 300
:max-queue-messages 10
:wait-time-seconds 20
:providers {:outlook {:client-id "..."
:client-secret "..."
:redirect-uri "..."
:webhook-url "..."
:state-secret "..."
:token-path-pattern "/orcha/prod/email-tokens/"}}}
:com.getorcha.workers.ap.ingestion/orchestrator
{:db-pool #integrant/ref :com.getorcha.db/pool
:aws #ref [:com.getorcha/aws]
:worker-pools #integrant/ref :com.getorcha.workers.ap.ingestion/worker-pools
;; ... transcription, extraction config
}}
Individual message failures don't block the sync. Errors are recorded and processing continues.
(defn ^:private record-message-error!
[db-pool doc-source-id message-id ^Exception e]
(db.sql/execute-one!
db-pool
{:insert-into :ap-doc-source-email-processed-messages
:values [{:doc-source-id doc-source-id
:message-id message-id
:status [:cast "errored" :email-message-status]
:error-type (.getName (class e))
:error-reason (.getMessage e)}]}))
If the entire sync fails (token refresh, API error), the pending_sync row is reset to pending, with attempts incremented and last_error recorded, so the inbox will be retried.
(defn ^:private handle-sync-error!
[db-pool doc-source-id ^Exception e]
(log/error e "Sync failed" {:doc-source-id doc-source-id})
(db.sql/execute-one!
db-pool
{:update :ap-doc-source-email-pending-sync
:set {:status [:cast "pending" :email-pending-sync-status]
:claimed-at nil
:attempts [:+ :attempts 1]
:last-error (.getMessage e)}
:where [:= :doc-source-id doc-source-id]}))
If LLM triage fails, log the error and skip the message (don't block other messages).
(try
  (triage/analyze-email llm-config full-message)
  (catch Exception e
    (log/warn e "Triage failed, skipping message" {:message-id (:id message)})
    (record-message! db-pool doc-source-id (:id message) :errored "triage-failure")
    ;; Return nil to skip this message. The caller must then stop without
    ;; calling record-message! again: the errored row already exists.
    nil))
For extractable items with :kind :body, the ingestion pipeline needs a special transcription path.
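;; In ingestion worker
(declare html-transcription!)

(defn ^:private transcribe!
  [context {:keys [source-metadata] :as ingestion}]
  ;; When source_metadata is read back from JSONB, :kind typically arrives as the
  ;; string "body" rather than the keyword :body; `name` handles both.
  ;; Non-email ingestions have no extractable item, so kind is nil.
  (let [kind (some-> source-metadata :extractable-item :kind name)]
    (if (= "body" kind)
      ;; HTML body: extract text directly, skip OCR
      (html-transcription! context ingestion)
      ;; Normal document: existing transcription pipeline
      (document-transcription! context ingestion))))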
(defn ^:private html-transcription!
  "Transcribes HTML email body to text."
  [{:keys [s3-client s3-bucket]} {:keys [document]}]
  (let [started-at (Instant/now) ; capture before the S3 fetch, not after the work
        html-content (aws/get-object s3-client s3-bucket (:file-path document))
        text (html->text html-content)] ; e.g. jsoup: (.text (Jsoup/parse html))
    {:method :html
     :text text
     :page-count 1
     :quality-score 1.0 ; No OCR quality concerns
     :started-at started-at
     :ended-at (Instant/now)}))
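The worker leaves `html->text` to jsoup or a similar library. With jsoup, `(.text (org.jsoup.Jsoup/parse html))` is the usual one-liner, but it normalizes whitespace into single spaces, so invoice table rows run together. The sketch below is a dependency-free approximation for tests or illustration only. It maps table cells to tabs and rows, paragraphs, and line breaks to newlines. Being regex-based, it will mishandle unusual markup and is not a substitute for a real HTML parser.

```clojure
(ns example.html-text
  (:require [clojure.string :as str]))

(defn html->text
  "Crude HTML -> text that keeps table structure: cells become tabs,
  rows/paragraphs/line breaks become newlines."
  [html]
  (-> (or html "")
      (str/replace #"(?is)<(script|style)[^>]*>.*?</\1>" "") ; drop CSS/JS entirely
      (str/replace #"(?i)<br\s*/?>" "\n")
      (str/replace #"(?i)</(p|div|tr|li|h[1-6])>" "\n")
      (str/replace #"(?i)</t[dh]>" "\t")
      (str/replace #"<[^>]+>" "")                            ; strip remaining tags
      (str/replace "&nbsp;" " ")
      (str/replace "&lt;" "<")
      (str/replace "&gt;" ">")
      (str/replace "&amp;" "&")                              ; last, to avoid double-unescaping
      (str/replace #"[ \t]+\n" "\n")                          ; trailing cell separators
      (str/replace #"\n{3,}" "\n\n")
      str/trim))
```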
| Metric | Description |
|---|---|
| email_sync_duration_seconds | Time from SQS receive to sync completion |
| email_messages_processed_total | Counter by status (processed, rejected, errored) |
| email_triage_duration_seconds | LLM triage latency |
| email_triage_confidence | Histogram of triage confidence scores |
| email_sync_queue_depth | Approximate number of messages in the SQS queue |
;; Structured logging for key events
(log/info "Email sync started" {:doc-source-id doc-source-id})
(log/info "Messages fetched" {:doc-source-id doc-source-id :count (count messages)})
(log/info "Message triaged" {:message-id id :action action :confidence confidence})
(log/info "Document queued for ingestion" {:ingestion-id id :kind kind})
(log/info "Email sync completed" {:doc-source-id doc-source-id
:messages-processed n
:documents-queued m})
Outlook: Validate clientState using timing-safe comparison.
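(defn timing-safe-equals?
  "Constant-time string comparison to prevent timing attacks.
  Returns false (rather than throwing) when either argument is nil."
  [^String a ^String b]
  (boolean
   (and a b
        (java.security.MessageDigest/isEqual
         (.getBytes a java.nio.charset.StandardCharsets/UTF_8)
         (.getBytes b java.nio.charset.StandardCharsets/UTF_8)))))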
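The webhook compares incoming values against the stored webhook_client_state, so that value must be generated when the Outlook subscription is created. The document does not say how this is done. The sketch below assumes a per-subscription random secret in the `{doc-source-id}:{secret}` format. The function name is hypothetical. Microsoft Graph limits `clientState` to 128 characters, and this layout uses 80.

```clojure
(ns example.client-state-gen
  (:import (java.security SecureRandom)
           (java.util Base64 UUID)))

(def ^:private ^SecureRandom rng (SecureRandom.))

(defn new-client-state
  "Hypothetical: builds \"{doc-source-id}:{secret}\" with a 256-bit random,
  URL-safe secret (36 + 1 + 43 = 80 characters)."
  [^UUID doc-source-id]
  (let [secret (byte-array 32)]
    (.nextBytes rng secret)
    ;; URL-safe alphabet without padding: 32 bytes encode to 43 characters
    (str doc-source-id ":"
         (.encodeToString (.withoutPadding (Base64/getUrlEncoder)) secret))))
```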
Gmail: Validate Pub/Sub verification token.
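The document does not specify how the Pub/Sub token is delivered. The sketch below assumes a shared secret that the push subscription appends to the endpoint URL as a query parameter. Pub/Sub also supports authenticated push with an OIDC token, which would replace this check. The sketch also decodes `message.data` into the raw `{emailAddress, historyId}` JSON and leaves JSON parsing to the caller. Both function names are hypothetical.

```clojure
(ns example.gmail-push
  (:import (java.nio.charset StandardCharsets)
           (java.security MessageDigest)
           (java.util Base64)))

(defn valid-push-token?
  "Hypothetical: constant-time check of a shared token carried on the push
  endpoint URL (e.g. ?token=...) against the configured secret."
  [^String received ^String expected]
  (boolean
   (and received expected
        (MessageDigest/isEqual (.getBytes received StandardCharsets/UTF_8)
                               (.getBytes expected StandardCharsets/UTF_8)))))

(defn decode-push-data
  "Decodes message.data (base64, standard or URL-safe alphabet, padding
  optional) into the raw JSON string."
  ^String [^String data]
  ;; Map URL-safe characters onto the standard alphabet; the basic decoder
  ;; accepts input without '=' padding
  (let [standard (-> data (.replace \- \+) (.replace \_ \/))]
    (String. (.decode (Base64/getDecoder) ^String standard) StandardCharsets/UTF_8)))
```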
Moved email processing to Workers without LLM triage. Complete.
- `:queues :acquisition` in config
- `workers/ap/acquisition/email.clj`

Add LLM-based triage.

Support email body as document source.
- `:html` transcription method

| Term | Definition |
|---|---|
| Acquisition | Stage that fetches, triages, and uploads documents |
| Ingestion | Stage that transcribes and extracts structured data |
| Relevancy Filter | Fast heuristic to reject obvious non-invoices |
| Email Triage | LLM analysis to identify extractable documents |
| Delta Sync | Incremental sync using provider's change tracking API |
| Extractable Item | A document that can be extracted from an email (attachment or body) |
| Coalescing | Combining multiple webhooks into single sync operation |