Email Acquisition Architecture

Comprehensive architecture for email-based document acquisition in the Orcha system.


Overview

Email acquisition enables automatic extraction of invoices and financial documents from customer email inboxes. The system:

  1. Receives push notifications when new emails arrive
  2. Fetches email content via provider APIs (Microsoft Graph, Gmail)
  3. Triages emails with an LLM to identify extractable documents
  4. Uploads documents to S3
  5. Queues documents for ingestion (transcription + extraction)

Key principle: Acquisition and Ingestion are separate concerns.


Service Architecture

┌──────────────────────────────────────────────────────────────────────────────┐
│                               ERP SERVICE                                     │
│                                                                               │
│  ┌─────────────────────┐    ┌─────────────────────────────────────────────┐  │
│  │  OAuth Endpoints    │    │  Webhook Handlers                           │  │
│  │                     │    │                                             │  │
│  │  GET /oauth/        │    │  POST /webhooks/outlook                     │  │
│  │    outlook/         │    │    1. Validate clientState                  │  │
│  │    authorize        │    │    2. Look up ap_doc_source                 │  │
│  │                     │    │    3. Send SQS message                      │  │
│  │  GET /oauth/        │    │    4. Return 202 (~50ms)                    │  │
│  │    outlook/         │    │                                             │  │
│  │    callback         │    │  POST /webhooks/gmail                       │  │
│  │                     │    │    (similar, validates Pub/Sub token)       │  │
│  │  Stores tokens      │    │                                             │  │
│  │  in AWS SSM         │    └─────────────────────┬───────────────────────┘  │
│  └─────────────────────┘                          │                          │
│                                                   │                          │
└───────────────────────────────────────────────────┼──────────────────────────┘
                                                    │
                                          SQS: email-sync-queue
                                          Message: {doc_source_id: UUID}
                                                    │
                                                    ▼
┌──────────────────────────────────────────────────────────────────────────────┐
│                             WORKERS SERVICE                                   │
│                                                                               │
│  ┌─────────────────────────────────────┐   ┌───────────────────────────────┐ │
│  │  Acquisition Orchestrator           │   │  Ingestion Orchestrator       │ │
│  │  (::ap.acquisition/orchestrator)    │   │  (::ap.ingestion/orchestrator)│ │
│  │                                     │   │                               │ │
│  │  • Polls acquisition SQS queue      │   │  • Polls ingestion SQS queue  │ │
│  │  • Dispatches to vthread pool       │   │  • Dispatches to vthread pool │ │
│  │  • Simpler: no heartbeat/preproc    │   │  • Handles visibility beat    │ │
│  └────────────────┬────────────────────┘   └───────────────┬───────────────┘ │
│                   │                                        │                  │
│                   ▼                                        ▼                  │
│  ┌─────────────────────────────────────┐   ┌───────────────────────────────┐ │
│  │  Email Acquisition Worker           │   │  Ingestion Worker             │ │
│  │                                     │   │                               │ │
│  │  1. Claim inbox (coalesce)          │   │  1. Claim ingestion           │ │
│  │  2. Load/refresh OAuth token        │   │  2. Fetch from S3             │ │
│  │  3. Delta sync (fetch new msgs)     │   │  3. Transcribe (OCR/PDF/HTML) │ │
│  │  4. For each message:               │   │  4. Extract (LLM)             │ │
│  │     a. Relevancy filter             │   │  5. Validate & complete       │ │
│  │     b. LLM triage                   │   │                               │ │
│  │     c. Upload to S3                 │   │  Assumes input is valid       │ │
│  │     d. Queue ingestion              │   │                               │ │
│  │  5. Update sync token               │   │                               │ │
│  │  6. Delete SQS message              │   │                               │ │
│  └────────────────┬────────────────────┘   └───────────────────────────────┘ │
│                   │                                        ▲                  │
│                   │         SQS: ingestion-queue           │                  │
│                   └────────────────────────────────────────┘                  │
│                                                                               │
└──────────────────────────────────────────────────────────────────────────────┘

Namespace Organization

com.getorcha.workers
├── ap/
│   ├── acquisition.clj                ; AP acquisition orchestrator (poller, job dispatch)
│   ├── acquisition/
│   │   ├── email.clj                  ; Email-specific acquisition logic
│   │   ├── email/
│   │   │   ├── outlook.clj            ; Microsoft Graph provider
│   │   │   └── gmail.clj              ; Gmail API provider (future)
│   │   └── triage.clj                 ; LLM email triage
│   │
│   ├── ingestion.clj                  ; AP ingestion orchestrator (poller, job dispatch)
│   └── ingestion/
│       ├── transcription.clj          ; Document → text
│       └── extraction.clj             ; Text → structured data

Data Flow: Email Arrival to Structured Data

Step 1: Webhook Notification

Provider sends push notification when new email arrives.

;; Outlook webhook payload
{:value [{:subscriptionId "abc123"
          :changeType "created"
          :resource "Users/{user-id}/Messages/{message-id}"
          :clientState "{doc-source-id}:{secret}"}]}

;; Gmail Pub/Sub payload
{:message {:data "<base64: {emailAddress, historyId}>"
           :messageId "..."}}

Webhook handler (ERP service):

(defn outlook-webhook-handler
  [{:keys [db-pool sqs-client queue-url]} request]
  ;; 1. Handle validation challenge (Microsoft sends on subscription creation)
  (if-let [validation-token (get-in request [:params :validationToken])]
    ;; Echo the token back as plain text
    {:status  200
     :headers {"Content-Type" "text/plain"}
     :body    validation-token}

    ;; 2. Parse and validate notifications
    (let [notifications (get-in request [:body :value])]
      (doseq [{:keys [clientState]} notifications]
        ;; 3. Validate clientState (timing-safe comparison)
        (let [[doc-source-id _secret] (parse-client-state clientState)
              stored-state            (db/get-client-state db-pool doc-source-id)]
          ;; Guard against a missing clientState or an unknown doc source
          (when (and clientState stored-state
                     (timing-safe-equals? clientState stored-state))
            ;; 4. Queue to SQS
            (aws/send-message! sqs-client queue-url (str doc-source-id)))))

      ;; 5. Return immediately
      {:status 202})))
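
The handler relies on parse-client-state, which is not shown here. A minimal sketch, assuming the "{doc-source-id}:{secret}" format from the Outlook payload above and assuming that a malformed value should yield nil rather than throw (the body and the nil-on-failure behaviour are illustrative, not the actual implementation):

```clojure
(require '[clojure.string :as str])

(defn parse-client-state
  "Splits \"<doc-source-id>:<secret>\" into [uuid secret].
   Returns nil when the value is missing or malformed (assumed behaviour)."
  [client-state]
  (when (string? client-state)
    ;; Limit 2 so a secret that itself contains ':' stays intact
    (let [[id secret] (str/split client-state #":" 2)]
      (when (seq secret)
        (try
          [(java.util.UUID/fromString id) secret]
          (catch IllegalArgumentException _
            nil))))))
```

Because a nil result destructures to nil for both bindings, the caller can treat "no doc-source-id" and "no stored state" the same way and simply drop the notification.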

Step 2: Acquisition Orchestrator

Polls SQS queue and dispatches to worker pool.

(defmethod ig/init-key ::orchestrator
  [_ {:keys [aws worker-pools] :as config}]
  (let [{:keys [email-sync-queue-name]} aws
        {:keys [executor]}              worker-pools
        polling?                        (atom true)
        sqs-client                      (aws/build-sqs-client (:config aws))
        queue-url                       (aws/get-queue-url sqs-client email-sync-queue-name)]

    ;; Start polling loop on virtual thread
    (.submit executor
      (fn []
        (while @polling?
          (let [messages (aws/receive-messages sqs-client queue-url
                           {:max-messages 10
                            :wait-time-seconds 20})]
            (doseq [message messages]
              ;; Dispatch each message to worker pool
              (.submit executor (acquisition-job-handler config message)))))))

    ;; Return shutdown fn
    {:shutdown! #(reset! polling? false)}))
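
As written, the polling loop ends for good if receive-messages or a dispatch throws. One way to keep it alive is sketched below with injected functions, so it runs without AWS. The names poll-loop!, receive!, dispatch! and on-error are hypothetical and do not appear in the codebase described here:

```clojure
(defn poll-loop!
  "Runs until `polling?` (an atom) is false.
   `receive!` returns a seq of messages, `dispatch!` handles one message.
   An exception from either is passed to `on-error` and polling continues."
  [polling? receive! dispatch! on-error]
  (while @polling?
    (try
      (doseq [message (receive!)]
        (dispatch! message))
      (catch Exception e
        ;; A real loop would probably also back off here before retrying
        (on-error e)))))
```

In the orchestrator, receive! would wrap aws/receive-messages and dispatch! would submit acquisition-job-handler to the executor; on-error would log and possibly sleep.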

Step 3: Inbox Claim & Coalescing

Multiple webhooks for the same inbox coalesce into one sync.

(defn ^:private claim-inbox!
  "Claims inbox for processing. Uses pending_sync table for coalescing.
   Returns claimed doc-source-id or nil if already processing."
  [db-pool doc-source-id timeout-seconds]
  (db.sql/execute-one!
    db-pool
    {:update :ap-doc-source-email-pending-sync
     :set    {:status     [:cast "processing" :email-pending-sync-status]
              :claimed-at [:now]}
     :where  [:and
              [:= :doc-source-id doc-source-id]
              [:or
               [:= :status [:cast "pending" :email-pending-sync-status]]
               ;; Crash recovery: reclaim stale processing rows
               [:and
                [:= :status [:cast "processing" :email-pending-sync-status]]
                [:< :claimed-at
                 [:- [:now] [:raw (str "interval '" timeout-seconds " seconds'")]]]]]]
     :returning [:doc-source-id]}))

;; Before claiming, upsert to ensure row exists
(defn ^:private ensure-pending-sync!
  "Creates or updates pending sync row. Coalesces multiple webhooks."
  [db-pool doc-source-id]
  (db.sql/execute-one!
    db-pool
    {:insert-into   :ap-doc-source-email-pending-sync
     :values        [{:doc-source-id doc-source-id
                      :status        [:cast "pending" :email-pending-sync-status]}]
     :on-conflict   [:doc-source-id]
     :do-update-set {:status     [:cast "pending" :email-pending-sync-status]
                     :claimed-at nil}}))

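Coalescing behavior: ensure-pending-sync! keeps at most one row per inbox (keyed by doc_source_id), so a burst of webhooks for the same inbox collapses into a single pending row. claim-inbox! succeeds only when that row is pending, or when it has been in processing for longer than the timeout (crash recovery). In every other case it returns nil, which signals that another worker already holds the claim.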
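
The claim rules of the two functions above can be modelled in memory to make the coalescing easier to reason about and to test. This is a sketch only: an atom stands in for the pending_sync table, timestamps are plain numbers, and ensure-pending, claim and claim! are hypothetical names:

```clojure
(defn ensure-pending
  "Model of ensure-pending-sync!: upsert the inbox row to :pending."
  [rows id]
  (assoc rows id {:status :pending :claimed-at nil}))

(defn claim
  "Model of claim-inbox!'s WHERE clause: claim when pending,
   or when a processing claim is older than `timeout`."
  [rows id now timeout]
  (let [{:keys [status claimed-at]} (get rows id)]
    (if (or (= :pending status)
            (and (= :processing status)
                 (< claimed-at (- now timeout))))
      (assoc rows id {:status :processing :claimed-at now})
      rows)))

(defn claim!
  "Returns id when this call won the claim, nil otherwise."
  [store id now timeout]
  (let [[before after] (swap-vals! store claim id now timeout)]
    (when (not= (get before id) (get after id))
      id)))
```

Three upserts followed by two claims leave one row and one winner; the second claim only succeeds again once the first has gone stale.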

Step 4: Token Management

Load OAuth tokens from SSM, refresh if expiring soon.

(defn ^:private ensure-access-token!
  "Returns valid access token, refreshing if needed."
  [{:keys [ssm-client provider-config]} doc-source-id]
  (let [token-path (str (:token-path-pattern provider-config) doc-source-id)
        {:keys [access-token refresh-token expires-at]}
        (ssm/get-parameter-json ssm-client token-path)]

    ;; Proactive refresh: refresh if < 5 minutes to expiry
    (if (token-valid-for? expires-at (Duration/ofMinutes 5))
      access-token
      (let [new-tokens (outlook/refresh-tokens! provider-config refresh-token)]
        (ssm/put-parameter-json! ssm-client token-path new-tokens)
        (:access-token new-tokens)))))
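
token-valid-for? is referenced above but not defined. A minimal sketch, assuming expires-at is stored either as an ISO-8601 string or as a java.time.Instant, and that a missing expiry should count as "not valid" so a refresh is forced (both are assumptions); the extra `now` argument exists only to make the function testable:

```clojure
(import '(java.time Duration Instant))

(defn token-valid-for?
  "True when `expires-at` lies more than `margin` after `now`."
  ([expires-at margin]
   (token-valid-for? expires-at margin (Instant/now)))
  ([expires-at ^Duration margin ^Instant now]
   (boolean
     (when expires-at
       (let [expiry (if (instance? Instant expires-at)
                      expires-at
                      (Instant/parse expires-at))]
         ;; Valid only if the token outlives now + margin
         (.isAfter ^Instant expiry (.plus now margin)))))))
```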

Step 5: Delta Sync

Fetch new messages since last sync using provider's delta API.

;; Outlook delta sync
(defn sync-messages!
  "Fetches new messages using delta query. Returns {:messages [...] :delta-link \"...\"}."
  [access-token last-delta-link]
  (let [url (or last-delta-link
                "https://graph.microsoft.com/v1.0/me/mailFolders('Inbox')/messages/delta?$select=id,subject,from,receivedDateTime,hasAttachments")]
    (loop [url url
           all-messages []]
      (let [response (http/get url {:oauth-token access-token})
            messages (get-in response [:body :value])
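            ;; OData annotation keys contain "@" and ".", so build the keywords explicitly
            ;; (assumes the HTTP client keywordizes JSON keys, as [:body :value] implies)
            next-link (get-in response [:body (keyword "@odata.nextLink")])
            delta-link (get-in response [:body (keyword "@odata.deltaLink")])]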
        (if next-link
          (recur next-link (into all-messages messages))
          {:messages (into all-messages messages)
           :delta-link delta-link})))))
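
The paging loop is easier to verify when the HTTP call is injected. The sketch below separates the loop from the transport; collect-pages and the simplified page shape {:value ... :next-link ... :delta-link ...} are hypothetical and only mirror the nextLink/deltaLink handling of sync-messages!:

```clojure
(defn collect-pages
  "Follows :next-link until a page without one arrives, then returns
   all accumulated messages together with that last page's :delta-link."
  [fetch-page first-url]
  (loop [url first-url
         acc []]
    (let [{:keys [value next-link delta-link]} (fetch-page url)
          acc (into acc value)]
      (if next-link
        (recur next-link acc)
        {:messages acc :delta-link delta-link}))))
```

In a test, fetch-page can be a plain map from URL to page; in production it would wrap the HTTP call and rename the OData keys.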

Step 6: Message Processing Pipeline

For each new message: relevancy filter → LLM triage → upload → queue.

(defn ^:private process-messages!
  "Processes messages through filter → triage → upload → queue pipeline."
  [{:keys [db-pool aws llm-config] :as context}
   {:keys [doc-source-id access-token]}
   messages]
  (doseq [message messages]
    (let [message-id (:id message)]
      ;; Skip if already processed (deduplication)
      (when-not (message-processed? db-pool doc-source-id message-id)
        (try
          (process-message! context doc-source-id access-token message)
          (catch Exception e
            ;; Log error, continue with next message
            (record-message-error! db-pool doc-source-id message-id e)))))))

(defn ^:private process-message!
  "Processes single message through pipeline."
  [{:keys [db-pool aws llm-config] :as context}
   doc-source-id access-token message]
  (let [;; Fetch full message with body and attachment metadata
        full-message (outlook/fetch-message access-token (:id message))

        ;; Step 6a: Relevancy filter (fast heuristic)
        filter-result (relevant-email? full-message)]

    (if-not (:relevant? filter-result)
      ;; Rejected by filter - record and skip
      (record-message! db-pool doc-source-id (:id message)
                       :rejected (:reason filter-result))

      ;; Step 6b: LLM triage
      (let [triage-result (triage/analyze-email llm-config full-message)]
        (if (empty? (:extractable-items triage-result))
          ;; Nothing extractable - record and skip
          (record-message! db-pool doc-source-id (:id message)
                           :rejected (:skip-reason triage-result))

          ;; Step 6c & 6d: Upload and queue each extractable item
          (do
            (doseq [item (:extractable-items triage-result)]
              (upload-and-queue! context doc-source-id full-message item triage-result))
            (record-message! db-pool doc-source-id (:id message) :processed nil)))))))

Step 7: Relevancy Filter

Fast heuristic check to reject obvious non-invoices before LLM call.

(def ^:private spam-sender-patterns
  #{"newsletter" "no-reply" "noreply" "marketing" "update" "digest"
    "subscriptions" "info@" "promo" "notification"})

(def ^:private spam-subject-patterns
  #{"deal" "offer" "sale" "weekly" "monthly" "digest" "newsletter"
    "update" "promotion" "unsubscribe"})

(def ^:private invoice-keywords
  #{"invoice" "rechnung" "bill" "factura" "receipt" "quittung" "faktura"})

(defn relevant-email?
  "Quick heuristic check. Returns {:relevant? bool :reason string}."
  [{:keys [from subject body attachments]}]
  (let [from-lower    (str/lower-case (or from ""))
        subject-lower (str/lower-case (or subject ""))
        has-invoice-keyword? (some #(str/includes? subject-lower %) invoice-keywords)
        has-document-attachments? (some document-attachment? attachments)]

    (cond
      ;; Reject spam senders first (unless invoice keyword present)
      (and (not has-invoice-keyword?)
           (some #(str/includes? from-lower %) spam-sender-patterns))
      {:relevant? false :reason "spam-sender"}

      ;; Reject spam subjects
      (some #(str/includes? subject-lower %) spam-subject-patterns)
      {:relevant? false :reason "spam-subject"}

      ;; Accept if has document attachments
      has-document-attachments?
      {:relevant? true :reason "has-document-attachments"}

      ;; Accept if subject has invoice keywords
      has-invoice-keyword?
      {:relevant? true :reason "invoice-keyword-in-subject"}

      ;; Accept if body has financial keywords
      (>= (count-financial-keywords body) 3)
      {:relevant? true :reason "financial-keywords-in-body"}

      ;; Default: reject
      :else
      {:relevant? false :reason "no-invoice-signals"})))
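
relevant-email? depends on two helpers that are not shown, document-attachment? and count-financial-keywords. A possible shape for both is sketched below. The content types, the keyword list and the attachment keys (:content-type, :filename) are illustrative assumptions, not the values used in production:

```clojure
(require '[clojure.string :as str])

;; Assumed set of attachment types worth sending to triage
(def document-content-types
  #{"application/pdf" "image/png" "image/jpeg" "image/tiff"})

(defn document-attachment?
  "True for attachments that look like documents, judged by content type,
   falling back to a .pdf extension for generic content types."
  [{:keys [content-type filename]}]
  (boolean
    (or (contains? document-content-types (some-> content-type str/lower-case))
        (some-> filename str/lower-case (str/ends-with? ".pdf")))))

;; Assumed keyword list; a real list would likely be longer and multilingual
(def financial-keywords
  ["total" "vat" "iban" "amount due" "payment" "due date"])

(defn count-financial-keywords
  "Number of distinct financial keywords that occur in the body."
  [body]
  (let [body-lower (str/lower-case (or body ""))]
    (count (filter #(str/includes? body-lower %) financial-keywords))))
```

Counting distinct keywords rather than total occurrences keeps a single repeated word from pushing an email over the threshold of 3.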

Step 8: LLM Email Triage

Determines what documents can be extracted from the email.

(def TriageResult
  "Schema for triage output."
  [:map
   [:action [:enum :process :skip]]
   [:skip-reason {:optional true} [:maybe :string]]
   [:extractable-items
    [:vector
     [:map
      [:kind [:enum :attachment :body :download-link]]
      [:filename {:optional true} :string]
      [:content-type {:optional true} :string]
      [:format {:optional true} [:enum :html :text]]
      [:url {:optional true} :string]
      [:document-type-hint {:optional true} [:enum :invoice :purchase-order :receipt :unknown]]]]]
   [:reasoning :string]
   [:confidence [:double {:min 0.0 :max 1.0}]]])

(defn analyze-email
  "LLM triage: determines extractable items from email."
  [{:keys [provider model]} {:keys [from subject body-text body-html attachments]}]
  (let [prompt (build-triage-prompt from subject body-text attachments)
        response (llm/complete provider model prompt {:response-format :json})]
    (parse-triage-response response)))

(defn ^:private build-triage-prompt
  [from subject body-preview attachments]
  (str "You are an email triage system for invoice processing.\n\n"
       "Analyze this email and determine what documents can be extracted.\n\n"
       "Email details:\n"
       "- From: " from "\n"
       "- Subject: " subject "\n"
       "- Attachments: " (format-attachments attachments) "\n"
       "- Body preview (first 2000 chars):\n"
       (let [preview (or body-preview "")]   ; body text may be nil, e.g. HTML-only emails
         (subs preview 0 (min 2000 (count preview))))
       "\n\n"
       "Respond with JSON:\n"
       "{\n"
       "  \"action\": \"process\" or \"skip\",\n"
       "  \"skip_reason\": \"spam\" | \"unsupported\" | \"no_relevant_content\" (if skipping),\n"
       "  \"extractable_items\": [\n"
       "    {\"kind\": \"attachment\", \"filename\": \"invoice.pdf\", \"document_type_hint\": \"invoice\"},\n"
       "    {\"kind\": \"body\", \"format\": \"html\", \"document_type_hint\": \"invoice\"}\n"
       "  ],\n"
       "  \"reasoning\": \"Brief explanation\",\n"
       "  \"confidence\": 0.0-1.0\n"
       "}\n\n"
       "Rules:\n"
       "- Return extractable_items for EACH relevant document\n"
       "- kind=attachment: PDF/image attachment that looks like an invoice\n"
       "- kind=body: The email body itself IS an invoice (HTML invoice, not just a notification)\n"
       "- kind=download_link: Direct PDF download link (no login required) - mark as unsupported for now\n"
       "- Ignore signature images, logos, .sig/.ics/.vcf files\n"
       "- Marketing emails, newsletters, notifications → skip with reason\n"))

Example triage outputs:

;; Email with invoice PDF attachment
{:action :process
 :extractable-items [{:kind :attachment
                      :filename "invoice-2026-001.pdf"
                      :content-type "application/pdf"
                      :document-type-hint :invoice}]
 :reasoning "PDF attachment with invoice naming pattern"
 :confidence 0.95}

;; Email with HTML invoice in body (no attachments)
{:action :process
 :extractable-items [{:kind :body
                      :format :html
                      :document-type-hint :invoice}]
 :reasoning "Email body contains invoice details with line items and totals"
 :confidence 0.88}

;; Email with multiple relevant attachments
{:action :process
 :extractable-items [{:kind :attachment
                      :filename "invoice.pdf"
                      :document-type-hint :invoice}
                     {:kind :attachment
                      :filename "purchase-order.pdf"
                      :document-type-hint :purchase-order}]
 :reasoning "Two relevant financial documents attached"
 :confidence 0.92}

;; Marketing email - skip
{:action :skip
 :skip-reason "spam"
 :extractable-items []
 :reasoning "Newsletter from marketing department"
 :confidence 0.99}
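
The prompt asks the model for snake_case keys and string enums ("extractable_items", "download_link"), while TriageResult and the examples above use kebab-case keywords. parse-triage-response therefore has to normalize the decoded JSON somewhere. A sketch of that step, operating on an already decoded map (normalize-triage and enum-keys are hypothetical names, and JSON decoding itself is left out):

```clojure
(require '[clojure.string :as str]
         '[clojure.walk :as walk])

(defn- kebab-key
  "\"document_type_hint\" -> :document-type-hint"
  [k]
  (-> (name k) (str/replace "_" "-") keyword))

;; Fields whose string values are enum members in TriageResult
(def enum-keys #{:action :kind :format :document-type-hint})

(defn normalize-triage
  "Converts keys to kebab-case keywords and enum values to keywords."
  [decoded]
  (walk/postwalk
    (fn [x]
      (if (map? x)
        (into {}
              (map (fn [[k v]]
                     (let [k' (kebab-key k)]
                       [k' (if (and (enum-keys k') (string? v))
                             (kebab-key v)
                             v)])))
              x)
        x))
    decoded))
```

Free-text fields such as skip_reason and reasoning stay strings, which matches the schema. Validating the result against TriageResult would be the natural next step.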

Step 9: Upload & Queue

Upload each extractable item to S3 and queue for ingestion.

(defn ^:private upload-and-queue!
  "Uploads extractable item to S3 and queues for ingestion."
  [{:keys [db-pool aws]} doc-source-id full-message item triage-result]
  (let [{:keys [s3-client s3-bucket sqs-client ingestion-queue-url]} aws
        {:keys [kind filename format]} item

        ;; Get content based on kind
        {:keys [content content-type file-extension]}
        (case kind
          :attachment (get-attachment-content full-message filename)
          :body       {:content      (get-body-content full-message format)
                       :content-type (if (= format :html) "text/html" "text/plain")
                       :file-extension (if (= format :html) "html" "txt")})

        ;; Compute content hash for deduplication
        content-hash (sha256 content)

        ;; Create document record (upsert by content hash)
        {:keys [document-id created?]}
        (db/upsert-document! db-pool
          {:tenant-id     (get-tenant-id db-pool doc-source-id)
           :content-hash  content-hash
           :file-extension file-extension
           :original-name (or filename "email-body")})

        ;; Upload to S3 if new document
        s3-path (str "documents/" document-id "." file-extension)]

    (when created?
      (aws/put-object! s3-client s3-bucket s3-path content content-type))

    ;; Create ingestion record
    (let [ingestion-id (db/create-ingestion! db-pool
                         {:document-id     document-id
                          :doc-source-id   doc-source-id
                          :source-metadata {:provider            "outlook"
                                            :message-id          (:id full-message)
                                            :from                (:from full-message)
                                            :subject             (:subject full-message)
                                            :received-at         (:received-at full-message)
                                            :extractable-item    item
                                            :triage-confidence   (:confidence triage-result)}})]

      ;; Queue for ingestion
      (aws/send-message! sqs-client ingestion-queue-url (str ingestion-id)))))
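
upload-and-queue! deduplicates on (sha256 content), but sha256 is not defined in this document. A minimal sketch using the JDK's MessageDigest, assuming content is either a byte array or a string and that the hash is stored as lowercase hex (both are assumptions):

```clojure
(import '(java.security MessageDigest)
        '(java.nio.charset StandardCharsets))

(defn sha256
  "Lowercase hex SHA-256 of a byte array or a string (strings are hashed as UTF-8)."
  [content]
  (let [data   (if (string? content)
                 (.getBytes ^String content StandardCharsets/UTF_8)
                 content)
        digest (.digest (MessageDigest/getInstance "SHA-256") ^bytes data)]
    ;; %02x renders each byte as two hex digits
    (apply str (map #(format "%02x" %) digest))))
```

Hashing the raw bytes rather than a decoded string matters for attachments: the same PDF forwarded twice then maps to the same document row regardless of filename.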

Database Schema

Core Tables

-- Email-specific ap_doc_source extension
CREATE TABLE ap_doc_source_email (
    doc_source_id             UUID PRIMARY KEY REFERENCES ap_doc_source(id),
    email_address             TEXT NOT NULL UNIQUE,
    provider                  email_provider NOT NULL,  -- 'outlook' | 'gmail'
    folder                    TEXT NOT NULL DEFAULT 'INBOX',
    connection_status         connection_status NOT NULL DEFAULT 'active',
    provider_subscription_id  TEXT,           -- Outlook subscription UUID
    subscription_expires_at   TIMESTAMPTZ,    -- When to renew (7 days max)
    webhook_client_state      TEXT,           -- For Outlook validation
    last_sync_token           TEXT,           -- deltaLink (Outlook) or historyId (Gmail)
    last_successful_sync_at   TIMESTAMPTZ
);

-- Coalescing and crash recovery for email syncs
CREATE TABLE ap_doc_source_email_pending_sync (
    doc_source_id  UUID PRIMARY KEY REFERENCES ap_doc_source(id),
    status         email_pending_sync_status NOT NULL DEFAULT 'pending',
    claimed_at     TIMESTAMPTZ,      -- NULL when pending, set when claimed
    attempts       INTEGER NOT NULL DEFAULT 0,
    last_error     TEXT
);

-- Message deduplication and error tracking
CREATE TABLE ap_doc_source_email_processed_messages (
    doc_source_id  UUID NOT NULL REFERENCES ap_doc_source(id),
    message_id     TEXT NOT NULL,    -- Provider message ID
    processed_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    status         email_message_status NOT NULL,  -- processed | rejected | errored
    error_type     TEXT,             -- spam-sender, spam-subject, triage-failure, etc.
    error_reason   TEXT,             -- Human-readable description
    PRIMARY KEY (doc_source_id, message_id)
);

Ingestion Table Updates

-- source_metadata stores the email context and triage result for each queued item.
-- The example below is for a body-sourced document.
-- Example:
-- {
--   "provider": "outlook",
--   "message_id": "AAMk...",
--   "from": "supplier@example.com",
--   "subject": "Invoice #12345",
--   "received_at": "2026-01-04T10:30:00Z",
--   "extractable_item": {
--     "kind": "body",
--     "format": "html",
--     "document_type_hint": "invoice"
--   },
--   "triage_confidence": 0.88
-- }

Integrant Configuration

;; config.edn (simplified)
{:com.getorcha/aws
 {:config    {:region   "eu-central-1"
              :endpoint nil}  ; MiniStack for dev/test
  :queues     {:ingestion   "v1-orcha-global-ingest"
               :acquisition "v1-orcha-global-email-acquire"}
  :s3-buckets {:storage    "v1-orcha-global-storage-{suffix}"
               :ses-emails "v1-orcha-ses-emails-{suffix}"}}

 :com.getorcha.workers.ap.acquisition/worker-pools
 {:timeout-minutes 5}

 :com.getorcha.workers.ap.acquisition/orchestrator
 {:db-pool                          #integrant/ref :com.getorcha.db/pool
  :aws                              #ref [:com.getorcha/aws]
  :worker-pools                     #integrant/ref :com.getorcha.workers.ap.acquisition/worker-pools
  :stuck-processing-timeout-seconds 300
  :max-queue-messages               10
  :wait-time-seconds                20
  :providers                        {:outlook {:client-id     "..."
                                               :client-secret "..."
                                               :redirect-uri  "..."
                                               :webhook-url   "..."
                                               :state-secret  "..."
                                               :token-path-pattern "/orcha/prod/email-tokens/"}}}

 :com.getorcha.workers.ap.ingestion/orchestrator
 {:db-pool      #integrant/ref :com.getorcha.db/pool
  :aws          #ref [:com.getorcha/aws]
  :worker-pools #integrant/ref :com.getorcha.workers.ap.ingestion/worker-pools
  ;; ... transcription, extraction config
  }}

Error Handling

Message-Level Errors

Individual message failures don't block the sync. Errors are recorded and processing continues.

(defn ^:private record-message-error!
  [db-pool doc-source-id message-id ^Exception e]
  (db.sql/execute-one!
    db-pool
    {:insert-into :ap-doc-source-email-processed-messages
     :values      [{:doc-source-id doc-source-id
                    :message-id    message-id
                    :status        [:cast "errored" :email-message-status]
                    :error-type    (.getName (class e))
                    :error-reason  (.getMessage e)}]}))

Sync-Level Errors

If the entire sync fails (token refresh, API error), the pending_sync row is reset to pending, its attempt counter is incremented, and the error is recorded, so the sync can be retried.

(defn ^:private handle-sync-error!
  [db-pool doc-source-id ^Exception e]
  (log/error e "Sync failed" {:doc-source-id doc-source-id})
  (db.sql/execute-one!
    db-pool
    {:update :ap-doc-source-email-pending-sync
     :set    {:status     [:cast "pending" :email-pending-sync-status]
              :claimed-at nil
              :attempts   [:+ :attempts 1]
              :last-error (.getMessage e)}
     :where  [:= :doc-source-id doc-source-id]}))

Triage Failures

If LLM triage fails, log the error and skip the message (don't block other messages).

(try
  (triage/analyze-email llm-config full-message)
  (catch Exception e
    (log/warn e "Triage failed, skipping message" {:message-id (:id message)})
    (record-message! db-pool doc-source-id (:id message) :errored "triage-failure")
    nil))  ; Return nil to skip this message

HTML Body Transcription

For extractable items with :kind :body, the ingestion pipeline needs a special transcription path.

;; In ingestion worker
(defn ^:private transcribe!
  [context {:keys [source-metadata] :as ingestion}]
  ;; :kind may come back as a string after the JSON round-trip through
  ;; source_metadata, so coerce it to a keyword before comparing
  (let [kind (some-> source-metadata :extractable-item :kind keyword)]
    (if (= :body kind)
      ;; Email body: extract text directly, skip OCR
      (html-transcription! context ingestion)
      ;; Normal document: existing transcription pipeline
      (document-transcription! context ingestion))))

(defn ^:private html-transcription!
  "Transcribes HTML email body to text."
  [{:keys [s3-client s3-bucket]} {:keys [document]}]
  (let [started-at   (Instant/now)  ; capture before the work so the duration is meaningful
        html-content (aws/get-object s3-client s3-bucket (:file-path document))
        text         (html->text html-content)]  ; Use jsoup or similar
    {:method        :html
     :text          text
     :page-count    1
     :quality-score 1.0  ; No OCR quality concerns
     :started-at    started-at
     :ended-at      (Instant/now)}))

Metrics & Observability

Key Metrics

Metric                           Description
email_sync_duration_seconds      Time from SQS receive to sync complete
email_messages_processed_total   Counter by status (processed, rejected, errored)
email_triage_duration_seconds    LLM triage latency
email_triage_confidence          Histogram of triage confidence scores
email_sync_queue_depth           SQS queue approximate message count

Logging

;; Structured logging for key events
(log/info "Email sync started" {:doc-source-id doc-source-id})
(log/info "Messages fetched" {:doc-source-id doc-source-id :count (count messages)})
(log/info "Message triaged" {:message-id id :action action :confidence confidence})
(log/info "Document queued for ingestion" {:ingestion-id id :kind kind})
(log/info "Email sync completed" {:doc-source-id doc-source-id
                                   :messages-processed n
                                   :documents-queued m})

Security Considerations

Webhook Validation

Outlook: Validate clientState using timing-safe comparison.

(defn timing-safe-equals?
  "Constant-time string comparison to prevent timing attacks.
   Returns false when either argument is nil."
  [^String a ^String b]
  (boolean
    (and a b
         (java.security.MessageDigest/isEqual
           (.getBytes a java.nio.charset.StandardCharsets/UTF_8)
           (.getBytes b java.nio.charset.StandardCharsets/UTF_8)))))

Gmail: Validate Pub/Sub verification token.

Token Storage

Content Handling


Testing Strategy

Unit Tests

Integration Tests

Contract Tests


Implementation Phases

Phase 3: Email Acquisition Infrastructure ✓

Moved email processing to Workers without LLM triage. Complete.

  1. ✓ Created acquisition SQS queue (:queues :acquisition in config)
  2. ✓ Updated webhook to send SQS message
  3. ✓ Created acquisition orchestrator (simpler than ingestion - no heartbeat/preprocessing)
  4. ✓ Moved email code from ERP to Workers (workers/ap/acquisition/email.clj)
  5. ✓ Verified identical behavior

Phase 4: LLM Email Triage

Add LLM-based triage.

  1. Create triage module with prompt
  2. Integrate triage into message processing
  3. Handle body-only invoices (upload HTML)
  4. Add triage error handling

Phase 5: HTML Body Transcription

Support email body as document source.

  1. Add :html transcription method
  2. Detect body-sourced documents in pipeline
  3. Implement HTML text extraction

Glossary

Term               Definition
Acquisition        Stage that fetches, triages, and uploads documents
Ingestion          Stage that transcribes and extracts structured data
Relevancy Filter   Fast heuristic to reject obvious non-invoices
Email Triage       LLM analysis to identify extractable documents
Delta Sync         Incremental sync using provider's change tracking API
Extractable Item   A document that can be extracted from an email (attachment or body)
Coalescing         Combining multiple webhooks into a single sync operation