Orcha Ingestion: Email Integration

Architecture for OAuth-based email provider integration (Outlook & Gmail).


Architecture Diagram

flowchart TB
    subgraph "Email Providers"
        OutlookAPI["Microsoft Graph API"]
        GmailAPI["Gmail API"]
    end

    subgraph "Push Notifications"
        MSWebhook["Outlook Webhook<br/>POST /webhooks/outlook"]
        PubSub["Google Pub/Sub"]
        GmailWebhook["Gmail Webhook<br/>POST /webhooks/gmail"]
    end

    subgraph "OAuth Flow"
        OAuthRedirect["OAuth Redirect<br/>/oauth/{provider}/authorize"]
        OAuthCallback["OAuth Callback<br/>/oauth/{provider}/callback"]
        SSM[("AWS SSM<br/>Encrypted Tokens")]
    end

    subgraph "Webhook Processing"
        Validate["Validate Request<br/>clientState / verification token"]
        Lookup["Lookup ap_doc_source<br/>by subscription_id / email"]
        Queue["Upsert pending sync<br/>ap_doc_source_email_pending_sync"]
    end

    subgraph "Sync Worker"
        Claim["Claim pending sync<br/>SELECT FOR UPDATE SKIP LOCKED"]
        RefreshToken["Refresh OAuth Token<br/>from SSM"]
        DeltaSync["Provider Sync API<br/>delta query / history.list"]
        Heuristic["Heuristic Filter<br/>accept/reject email"]
        FetchMsg["Fetch Full Message<br/>with attachments"]
    end

    subgraph "Existing Ingestion Pipeline"
        S3[("S3<br/>documents/{id}.pdf")]
        QueueIngestion["queue-for-ingestion!<br/>dedup + create records"]
        SQS[["SQS Queue"]]
        PG[("PostgreSQL<br/>ap_ingestion table")]
    end

    subgraph "Ingestion Worker"
        ClaimIng["claim-ingestion!"]
        LLMClassify["LLM Classification<br/>process vs skip"]
        Transcribe["transcribe!<br/>OCR pipeline"]
        Extract["extract!<br/>Claude LLM"]
        Complete["complete-ingestion!"]
    end

    subgraph "Notifications"
        Trigger["PostgreSQL NOTIFY"]
        SSE["SSE Events<br/>/documents/events"]
        Browser["Browser"]
    end

    %% OAuth Flow
    OAuthRedirect --> OutlookAPI
    OAuthRedirect --> GmailAPI
    OutlookAPI --> OAuthCallback
    GmailAPI --> OAuthCallback
    OAuthCallback --> SSM

    %% Push notification paths
    OutlookAPI -->|"subscription webhook"| MSWebhook
    GmailAPI -->|"watch notification"| PubSub
    PubSub --> GmailWebhook

    %% Webhook processing
    MSWebhook --> Validate
    GmailWebhook --> Validate
    Validate --> Lookup
    Lookup --> Queue
    Queue -->|"return 200 OK"| MSWebhook
    Queue -->|"return 200 OK"| GmailWebhook

    %% Sync worker flow
    Queue --> Claim
    Claim --> RefreshToken
    RefreshToken --> SSM
    RefreshToken --> DeltaSync
    DeltaSync --> OutlookAPI
    DeltaSync --> GmailAPI
    DeltaSync --> Heuristic
    Heuristic -->|"accepted"| FetchMsg
    Heuristic -->|"rejected"| Claim

    %% Into existing pipeline
    FetchMsg --> S3
    FetchMsg --> QueueIngestion
    QueueIngestion --> PG
    QueueIngestion --> SQS

    %% Ingestion worker
    SQS --> ClaimIng
    ClaimIng --> LLMClassify
    LLMClassify -->|"skip"| Complete
    LLMClassify -->|"process"| Transcribe
    Transcribe --> Extract
    Extract --> Complete

    %% Notifications
    Complete --> PG
    PG --> Trigger
    Trigger --> SSE
    SSE --> Browser

    %% Styling
    classDef provider fill:#e3f2fd,stroke:#1565c0
    classDef webhook fill:#fff3e0,stroke:#ef6c00
    classDef storage fill:#e8f5e9,stroke:#2e7d32
    classDef queue fill:#fce4ec,stroke:#c2185b
    classDef worker fill:#f3e5f5,stroke:#7b1fa2

    class OutlookAPI,GmailAPI provider
    class MSWebhook,GmailWebhook,PubSub webhook
    class S3,SSM,PG storage
    class SQS,Queue queue
    class Claim,RefreshToken,DeltaSync,Heuristic,FetchMsg,ClaimIng,LLMClassify,Transcribe,Extract,Complete worker

Data Flow Summary

  1. OAuth → User authorizes → tokens stored in SSM → subscription created
  2. Push → Provider notifies webhook → validated → queued in ap_doc_source_email_pending_sync
  3. Sync → Worker claims pending sync → refreshes token → fetches new messages via delta API
  4. Filter → Heuristic rejects obvious non-invoices → accepted emails have attachments uploaded to S3
  5. Ingest → queue-for-ingestion! creates records → SQS message sent
  6. Process → Ingestion worker claims → LLM classifies → transcribes → extracts → completes
  7. Notify → PostgreSQL trigger → SSE → browser updates

Provider Comparison

| Capability | Outlook (Microsoft Graph) | Gmail |
|---|---|---|
| OAuth | Microsoft identity platform | Google OAuth 2.0 |
| Scope | Mail.Read | gmail.readonly |
| Push notifications | Webhook subscriptions (per-user) | Pub/Sub (shared topic) |
| Sync API | Delta query (/messages/delta) | History API (history.list) |
| Sync watermark | deltaLink token | historyId (BIGINT) |
| Subscription lifetime | 7 days (messages) | 7 days |
| Message fetch | GET /messages/{id} | messages.get |

Provider Abstraction Protocol

(defprotocol EmailProvider
  (create-subscription! [this doc-source-id]
    "Set up push notifications, return subscription metadata")
  (renew-subscription! [this doc-source-id]
    "Extend subscription before expiry")
  (sync-since! [this doc-source-id last-sync-token]
    "Fetch new messages since token, return messages + new token")
  (fetch-message [this doc-source-id message-id]
    "Get full message with attachments (doc-source-id selects the OAuth token)")
  (refresh-token! [this doc-source-id]
    "Refresh OAuth access token, return new token"))

The webhook handler and sync worker are provider-agnostic, delegating to protocol implementations. Provider-specific details (Pub/Sub topic, Graph subscriptions, token formats) are encapsulated in each implementation.
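The following is a minimal sketch of the provider-agnostic split. It declares its own copy of the protocol. Every method takes doc-source-id, including fetch-message; this is an assumption, based on the fact that an implementation needs the source's token to fetch a message. FakeProvider is a hypothetical in-memory implementation whose sync token is just an index into a vector of messages. sync-doc-source! is a hypothetical name for the worker step that drives the protocol without branching on Outlook vs Gmail.

```clojure
(ns email-sync.provider-sketch)

;; Local copy of the EmailProvider protocol for this sketch. Every method
;; takes doc-source-id so one provider instance can serve many sources.
(defprotocol EmailProvider
  (create-subscription! [this doc-source-id])
  (renew-subscription! [this doc-source-id])
  (sync-since! [this doc-source-id last-sync-token])
  (fetch-message [this doc-source-id message-id])
  (refresh-token! [this doc-source-id]))

;; Hypothetical in-memory provider for illustration only: `mailbox` is a
;; vector of messages and the sync token is the index of the next unseen one.
(defrecord FakeProvider [mailbox]
  EmailProvider
  (create-subscription! [_ doc-source-id]
    {:subscription-id (str "fake-sub-" doc-source-id)})
  (renew-subscription! [_ doc-source-id]
    {:subscription-id (str "fake-sub-" doc-source-id)})
  (sync-since! [_ _doc-source-id last-sync-token]
    (let [start (or last-sync-token 0)]
      {:messages  (subvec mailbox start)
       :new-token (count mailbox)}))
  (fetch-message [_ _doc-source-id message-id]
    (some #(when (= message-id (:id %)) %) mailbox))
  (refresh-token! [_ _doc-source-id]
    "fake-access-token"))

(defn sync-doc-source!
  "Provider-agnostic sync step (hypothetical name). Works with any
  EmailProvider and returns the fetched messages plus the watermark to
  store in last_sync_token."
  [provider doc-source-id last-sync-token]
  ;; A real implementation would persist the refreshed token (SSM in this
  ;; design) and use it for the calls below; the fake ignores it.
  (refresh-token! provider doc-source-id)
  (let [{:keys [messages new-token]} (sync-since! provider doc-source-id last-sync-token)]
    {:messages        (mapv #(fetch-message provider doc-source-id (:id %)) messages)
     :last-sync-token new-token}))
```

A real Outlook or Gmail implementation would wrap API clients instead of a vector. The point is that the calling code stays identical for both providers.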


Webhook Processing

Webhooks must respond quickly to avoid provider timeouts. Processing is deferred to background workers.

Async Processing Pattern

Webhook Handler                    Background Worker
──────────────────                 ─────────────────
1. Validate request
2. Lookup ap_doc_source
3. Upsert pending_sync
4. Return 200 OK
                                   5. Claim pending (FOR UPDATE SKIP LOCKED)
                                   6. Refresh token
                                   7. Call delta/history API
                                   8. Process messages
                                   9. Delete pending row

Pending Sync Table

CREATE TYPE email_pending_sync_status AS ENUM ('pending', 'processing');

CREATE TABLE ap_doc_source_email_pending_sync (
    doc_source_id UUID PRIMARY KEY REFERENCES ap_doc_source(id) ON DELETE CASCADE,  -- coalesces notifications
    claimed_at    TIMESTAMPTZ,       -- NULL when pending, set when worker claims
    attempts      INTEGER NOT NULL DEFAULT 0,  -- NULL + 1 would stay NULL, so default to 0
    last_error    TEXT,
    status        email_pending_sync_status NOT NULL DEFAULT 'pending'
);

Using doc_source_id as PK ensures multiple notifications coalesce into one sync operation.
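The coalescing behavior can be modeled in memory, assuming an atom keyed by doc-source-id stands in for the table. pending, notify! and claim! are hypothetical names. Three webhook notifications for the same source leave one row, and therefore produce one claim. In PostgreSQL the claim is the SELECT ... FOR UPDATE SKIP LOCKED query; swap-vals! only approximates that atomicity within a single process.

```clojure
(ns email-sync.pending-sketch)

;; Hypothetical stand-in for ap_doc_source_email_pending_sync, keyed by
;; doc-source-id just like the table's primary key.
(def pending (atom {}))

(defn notify!
  "Webhook side: upsert the row for doc-source-id. Repeated notifications
  update the same row, so they coalesce into one future sync."
  [doc-source-id]
  (swap! pending update doc-source-id
         (fn [row] (assoc (or row {:attempts 0}) :status :pending :claimed-at nil))))

(defn claim!
  "Worker side: atomically move one pending row to :processing and bump
  attempts. Returns the claimed doc-source-id, or nil if nothing is pending."
  [now]
  (let [[before after]
        (swap-vals! pending
                    (fn [rows]
                      (if-let [id (some (fn [[id row]] (when (= :pending (:status row)) id)) rows)]
                        (update rows id (fn [row]
                                          (-> row
                                              (assoc :status :processing :claimed-at now)
                                              (update :attempts inc))))
                        rows)))]
    ;; The claimed row is the one that went from :pending to :processing.
    (some (fn [[id row]]
            (when (and (= :processing (:status row))
                       (= :pending (get-in before [id :status])))
              id))
          after)))
```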

Crash Recovery

The claim mechanism handles worker crashes:

State transitions:

Webhook arrives:  INSERT/UPDATE → status='pending', claimed_at=NULL
Worker claims:    UPDATE → status='processing', claimed_at=now()
Worker completes: DELETE
Worker crashes:   (stuck in 'processing') → reclaimable once claimed_at is older than the claim timeout
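The crash-recovery rules can be expressed as pure functions over the same row shape. The timeout value is an assumption, because the design only says "after timeout". The guarded complete is one possible refinement, not part of the stated design. The webhook resets a row to 'pending' even while a worker is processing it, so an unconditional DELETE at completion could drop a notification that arrived mid-sync. Deleting only when claimed_at still equals the worker's own claim keeps that row for the next poll; in SQL this is DELETE ... WHERE doc_source_id = $1 AND claimed_at = $2.

```clojure
(ns email-sync.recovery-sketch)

;; Assumed value: the design says "reclaimable after timeout" but gives no number.
(def claim-timeout-ms (* 10 60 1000))

(defn claimable?
  "True for a pending row, or for a row stuck in :processing whose claim is
  older than claim-timeout-ms (its worker is presumed to have crashed)."
  [{:keys [status claimed-at]} now-ms]
  (boolean
    (or (= status :pending)
        (and (= status :processing)
             (some? claimed-at)
             (> (- now-ms claimed-at) claim-timeout-ms)))))

(defn complete
  "Returns rows with doc-source-id removed, but only if the row still holds
  this worker's claim. A row re-queued by a webhook mid-sync (claimed-at reset
  to nil) or reclaimed by another worker is left in place."
  [rows doc-source-id my-claimed-at]
  (if (= my-claimed-at (get-in rows [doc-source-id :claimed-at]))
    (dissoc rows doc-source-id)
    rows))
```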

Webhook Security

Validating webhook authenticity is critical - without it, attackers could trigger syncs or inject forged notifications.

Outlook: clientState Verification

Microsoft Graph webhooks use a per-subscription clientState secret.

Flow:

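  1. At subscription creation (OAuth callback): generate a random clientState, store it in ap_doc_source_email.webhook_client_state, and send it with the Graph subscription request.

  2. On webhook receipt: compare the clientState included in each notification with the stored value, and reject the notification if they differ.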

Implementation:

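;; Timing-safe comparison prevents timing attacks.
;; A missing clientState (nil) is rejected instead of throwing a NullPointerException.
(boolean
  (and incoming-client-state
       stored-client-state
       (java.security.MessageDigest/isEqual
         (.getBytes ^String incoming-client-state "UTF-8")
         (.getBytes ^String stored-client-state "UTF-8"))))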

Why timing-safe? Regular string comparison (=) can leak information through timing differences. An attacker could probe character-by-character to discover the secret.

Database column: ap_doc_source_email.webhook_client_state (TEXT, nullable - unused for Gmail)
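Both halves of the Outlook flow fit in two small functions. The sketch assumes a 32-byte SecureRandom secret encoded as unpadded URL-safe Base64, which gives 43 characters; the secret length and encoding are assumptions, and Graph caps clientState at 128 characters. generate-client-state and client-state-valid? are hypothetical names.

```clojure
(ns email-sync.client-state-sketch
  (:import (java.security MessageDigest SecureRandom)
           (java.util Base64)))

(defn generate-client-state
  "Random per-subscription secret, created at subscription time and stored
  in ap_doc_source_email.webhook_client_state."
  []
  (let [buf (byte-array 32)]
    (.nextBytes (SecureRandom.) buf)
    (.encodeToString (.withoutPadding (Base64/getUrlEncoder)) buf)))

(defn client-state-valid?
  "Timing-safe check of the clientState from a notification against the
  stored value. Missing values on either side are rejected."
  [incoming stored]
  (boolean
    (and incoming
         stored
         (MessageDigest/isEqual (.getBytes ^String incoming "UTF-8")
                                (.getBytes ^String stored "UTF-8")))))
```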

Gmail: Pub/Sub Verification (Proposal)

Gmail uses Google Cloud Pub/Sub for push notifications, which has different security characteristics.

Proposed approach:

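  1. Static verification token: a shared secret from application config that every push request must carry; requests without a matching token are rejected with 403.

  2. Subscription path validation: the subscription field of the push body must equal the expected Pub/Sub subscription path; otherwise the request is rejected with 403.

  3. Email address verification: the emailAddress in the decoded message data must match a known email doc source; unknown addresses are acknowledged with 200 (to stop Pub/Sub retries) but not synced.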

Config structure:

;; In config.edn
{:gmail {:pubsub-verification-token #env "GMAIL_PUBSUB_TOKEN"
         :expected-subscription "projects/orcha/subscriptions/gmail-webhooks"}}

Webhook handler pseudocode:

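(defn gmail-webhook [request]
  (let [{:keys [pubsub-verification-token expected-subscription]} (:gmail config)
        ;; Pub/Sub push has no built-in shared-token header, so the token is
        ;; configured as a query parameter on the push endpoint URL
        ;; (e.g. /webhooks/gmail?token=...) and arrives in :query-params.
        token        (get-in request [:query-params "token"])
        subscription (get-in request [:body-params :subscription])]
    (cond
      ;; Verify static token with a timing-safe comparison, as for clientState
      ;; (timing-safe-equal? is a helper wrapping MessageDigest/isEqual)
      (not (timing-safe-equal? token pubsub-verification-token))
      (do (log/warn "Invalid Pub/Sub token") (respond 403))

      ;; Verify subscription path
      (not= subscription expected-subscription)
      (do (log/warn "Unknown subscription") (respond 403))

      :else
      ;; message.data is base64-encoded JSON: {"emailAddress": ..., "historyId": ...}
      ;; It is decoded only after the request is authenticated.
      (let [data          (decode-base64-json (get-in request [:body-params :message :data]))
            email-address (:emailAddress data)]
        ;; Lookup by email (not doc-source-id like Outlook)
        (if (nil? (lookup-by-email email-address))
          (do (log/warn "Unknown email address") (respond 200)) ; acknowledge to prevent retries
          (do (queue-sync-by-email! email-address)
              (respond 200)))))))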

Key differences from Outlook:

| Aspect | Outlook | Gmail |
|---|---|---|
| Secret type | Per-subscription (random) | Static (config) |
| Stored in | ap_doc_source_email.webhook_client_state | Application config |
| Lookup key | doc-source-id (from clientState) | email address (from data) |
| Verification | Compare clientState secret | Token + subscription path |

Why Polling Instead of Event-Driven

The sync worker uses a polling loop (default: every 5 seconds) rather than being triggered directly by webhooks. This was a deliberate choice.

Alternatives considered:

  1. Direct webhook→sync: Webhook spawns a virtual thread that immediately attempts to claim and process
  2. LISTEN/NOTIFY: Webhook sends PostgreSQL NOTIFY, worker wakes on LISTEN

Both alternatives would provide lower latency (immediate vs up to 5 seconds), but polling was chosen for:

Backpressure control: The poll interval acts as a natural throttle. If provider APIs are rate-limited or the system is under heavy load, increasing the interval provides immediate backpressure without code changes. Event-driven approaches would require explicit throttling mechanisms.

Simplicity: A single polling loop is easy to reason about, monitor, and debug. The "waste" of polling an empty table is negligible (~0.1ms per query, ~12 queries/minute).

Coalescing still works: The pending_sync table coalesces notifications regardless of trigger mechanism. Multiple webhooks arriving between polls result in one sync operation.
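A sketch of the polling loop with the interval as a plain parameter. It assumes a java.util.concurrent scheduled executor, which may not match the scheduling the real worker uses. start-poller! and poll-fn are hypothetical names. scheduleWithFixedDelay measures the interval from the end of one poll to the start of the next, so a slow poll never overlaps the following one. Raising interval-ms is the backpressure knob described under "Backpressure control".

```clojure
(ns email-sync.poller-sketch
  (:import (java.util.concurrent Executors ScheduledExecutorService TimeUnit)))

(defn start-poller!
  "Calls (poll-fn) on a single background thread, waiting interval-ms between
  the end of one poll and the start of the next. Returns the executor so the
  caller can shut it down."
  ^ScheduledExecutorService [poll-fn interval-ms]
  (let [^ScheduledExecutorService exec (Executors/newSingleThreadScheduledExecutor)
        ^Runnable task (fn []
                         ;; An exception escaping a scheduled task cancels all
                         ;; later runs, so failures are caught and logged here.
                         (try
                           (poll-fn)
                           (catch Exception e
                             (println "poll failed:" (.getMessage e)))))]
    (.scheduleWithFixedDelay exec task 0 (long interval-ms) TimeUnit/MILLISECONDS)
    exec))
```

In the worker, poll-fn would claim and sync pending rows until none remain (not shown), started with something like (start-poller! poll-fn 5000).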

Known limitation - horizontal scaling: The current design has one global poller. With many tenants generating high webhook volume, a single poller becomes a bottleneck. Future options:

This is not an immediate concern - the single poller can handle significant load since actual work (API calls, token refresh) dominates poll overhead.


Two-Stage Classification

Emails pass through two relevancy filters:

| Stage | Location | Method | Purpose |
|---|---|---|---|
| 1 | Sync worker | Heuristic | Fast reject of obvious non-invoices |
| 2 | Ingestion worker | LLM | Accurate classification, can mark as "skipped" |

Stage 1: Heuristic Filter

Quick check before uploading to S3:

The filter errs toward accepting: a false positive only costs an extra LLM classification in stage 2, while a false negative silently loses an invoice.
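The stage 1 rules are not listed in this design, so the patterns below are purely hypothetical. The sketch only shows the shape of accept-email? (the name used in the processing flow) and its accept-by-default stance. Rejecting messages with no attachments is also an assumption, justified by the fact that only attachments are ingested.

```clojure
(ns email-sync.heuristic-sketch)

;; Hypothetical reject rules; each should match only clear non-invoices.
(def reject-subject-patterns
  [#"(?i)^\s*(automatic reply|out of office|undeliverable)"])

(defn accept-email?
  "Stage 1 filter sketch. Returns false only for obvious non-invoices and
  accepts everything else, so borderline messages reach the LLM stage."
  [{:keys [subject has-attachments?]}]
  (cond
    ;; Only attachments are ingested, so a message without any has nothing to offer.
    (not has-attachments?) false
    (some #(re-find % (or subject "")) reject-subject-patterns) false
    :else true))
```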

Stage 2: LLM Classification

After attachments are in S3, the ingestion worker decides:


Attachment Extraction (Phase 2)

After the sync worker fetches new messages via the provider sync API (delta query for Outlook, history.list for Gmail), it processes each message:

Processing Flow

For each message from delta sync:
1. Deduplication check → doc_source_email_processed_messages
2. Fetch full message → protocol/fetch-message
3. Heuristic filter → accept-email?
4. Attachment filter → PDF/PNG/JPG/TIFF, not inline, <25MB
5. Queue attachments → queue-for-ingestion!
6. Mark processed → insert into processed_messages
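The six steps can be sketched as one loop with the side-effecting pieces passed in. The sketch assumes a set in an atom stands in for ap_doc_source_email_processed_messages. process-messages! and its dependency keys are hypothetical. Marking rejected messages as processed is also an assumption, because the flow lists step 6 for every message. Step 6 runs after step 5, so a crash between them re-queues the message on the next sync; the content-hash layer in queue-for-ingestion! is meant to absorb that duplicate.

```clojure
(ns email-sync.flow-sketch)

(defn process-messages!
  "Runs steps 1-6 for each message id returned by the provider sync API.
  Dependencies (all hypothetical):
    :processed  atom holding a set of [doc-source-id message-id] pairs
    :fetch      (fn [message-id] full-message)     ; protocol/fetch-message
    :accept?    (fn [message] boolean)             ; heuristic filter
    :keep-att?  (fn [attachment] boolean)          ; attachment filter
    :queue!     (fn [message attachment] ...)      ; queue-for-ingestion!
  Returns the number of attachments queued."
  [{:keys [processed fetch accept? keep-att? queue!]} doc-source-id message-ids]
  (reduce
    (fn [queued message-id]
      (let [k [doc-source-id message-id]]
        (if (contains? @processed k)                           ; 1. dedup check
          queued
          (let [message (fetch message-id)                     ; 2. fetch full message
                atts    (if (accept? message)                  ; 3. heuristic filter
                          (filterv keep-att? (:attachments message)) ; 4. attachment filter
                          [])]
            (doseq [a atts] (queue! message a))                ; 5. queue attachments
            (swap! processed conj k)                           ; 6. mark processed
            (+ queued (count atts))))))
    0
    message-ids))
```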

Message Deduplication

The ap_doc_source_email_processed_messages table tracks which messages have been fully processed:

CREATE TABLE ap_doc_source_email_processed_messages (
    doc_source_id UUID NOT NULL REFERENCES ap_doc_source(id) ON DELETE CASCADE,
    message_id    TEXT NOT NULL,  -- Provider message ID
    processed_at  TIMESTAMPTZ NOT NULL DEFAULT now(),
    PRIMARY KEY (doc_source_id, message_id)
);

This prevents re-processing when:

Attachment Filter Criteria

Attachments are filtered before uploading to S3:

| Include | Exclude |
|---|---|
| application/pdf | Inline images (is-inline=true) |
| image/png | Signature files (.sig, .p7s, .smime) |
| image/jpeg | Calendar and contact files (.ics, .vcf) |
| image/tiff | Oversized files (>25MB) |
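A predicate for these criteria is sketched below. It assumes attachments arrive as maps with :content-type, :filename, :size (bytes) and :inline? keys. These key names are hypothetical; Graph's fileAttachment exposes contentType, name, size and isInline. The sketch also reads "25MB" as 25 MiB, which is an assumption.

```clojure
(ns email-sync.attachment-filter-sketch
  (:require [clojure.string :as str]))

(def allowed-content-types
  #{"application/pdf" "image/png" "image/jpeg" "image/tiff"})

;; Signature (.sig, .p7s, .smime), calendar (.ics) and contact (.vcf) files.
(def excluded-extensions #{"sig" "p7s" "smime" "ics" "vcf"})

;; "25MB" read as 25 MiB; an assumption.
(def max-size-bytes (* 25 1024 1024))

(defn- extension
  "Lowercased text after the last dot, or nil when there is no dot."
  [filename]
  (when-let [i (str/last-index-of filename ".")]
    (str/lower-case (subs filename (inc i)))))

(defn keep-attachment?
  [{:keys [content-type filename size inline?]}]
  (boolean
    (and (contains? allowed-content-types (some-> content-type str/lower-case))
         (not inline?)
         (not (contains? excluded-extensions (extension (or filename ""))))
         (some? size)
         (<= size max-size-bytes))))
```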

Source Metadata

Each ingestion stores email context for traceability:

{
  "provider": "outlook",
  "message_id": "AAMkAGI2...",
  "from": "supplier@example.com",
  "subject": "Invoice #12345",
  "received_at": "2026-01-02T10:30:00Z",
  "attachment_filename": "invoice.pdf"
}

Connection Health

Status Tracking

CREATE TYPE email_connection_status AS ENUM (
    'active',              -- working normally
    'token_refresh_failed', -- needs re-authorization
    'subscription_expired', -- webhook subscription lapsed
    'provider_error'        -- API returning errors
);

Subscription Renewal
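As an illustration only, a renewal check against subscription_expires_at might look like the following. The 24-hour margin and the needs-renewal? name are assumptions. A worker would run the check before calling renew-subscription! from the EmailProvider protocol. A missing expiry is treated as "renew now".

```clojure
(ns email-sync.renewal-sketch
  (:import (java.time Duration Instant)))

;; Hypothetical safety margin; the design does not specify one.
(def renewal-margin (Duration/ofHours 24))

(defn needs-renewal?
  "True when the subscription expires within renewal-margin of now, or when
  no expiry has been recorded yet."
  [^Instant subscription-expires-at ^Instant now]
  (or (nil? subscription-expires-at)
      (not (.isAfter subscription-expires-at (.plus now renewal-margin)))))
```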

Token Refresh


Database Schema Extensions

ap_doc_source_email additions

| Column | Type | Description |
|---|---|---|
| connection_status | email_connection_status | Current health state |
| subscription_expires_at | TIMESTAMPTZ | When to renew subscription |
| provider_subscription_id | TEXT | Outlook subscription ID |
| webhook_client_state | TEXT | Outlook clientState for verification (NULL for Gmail) |
| last_sync_token | TEXT | deltaLink or historyId |
| last_successful_sync_at | TIMESTAMPTZ | Health monitoring |

ap_ingestion additions

| Column | Type | Description |
|---|---|---|
| status | new enum value 'skipped' | For irrelevant attachments |
| skip_reason | TEXT | Why ingestion was skipped |

Source Metadata

Email ingestions store metadata in ap_ingestion.source_metadata:

{
  "provider": "outlook",
  "message_id": "<abc123@mail.example.com>",
  "from": "supplier@example.com",
  "subject": "Invoice #12345 - January 2026",
  "received_at": "2026-01-02T10:30:00Z",
  "attachment_filename": "invoice-12345.pdf"
}

Deduplication

Three-layer deduplication:

| Layer | Mechanism | Prevents |
|---|---|---|
| Sync token | last_sync_token watermark | Re-fetching seen emails |
| Message ID | Track per ap_doc_source | Re-processing same email |
| Content hash | queue-for-ingestion! | Duplicate S3 uploads |
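Content-hash deduplication can be sketched as hashing the attachment bytes and checking a set of hashes already seen. The sketch assumes SHA-256 with lowercase hex encoding; the design does not name the algorithm. content-hash and queue-if-new! are hypothetical names. The real check lives inside queue-for-ingestion! and presumably runs against the database rather than an atom.

```clojure
(ns email-sync.content-hash-sketch
  (:import (java.security MessageDigest)))

(defn content-hash
  "Lowercase hex SHA-256 of the attachment bytes (assumed algorithm)."
  ^String [^bytes data]
  (let [digest (.digest (MessageDigest/getInstance "SHA-256") data)]
    (apply str (map #(format "%02x" (bit-and % 0xff)) digest))))

(defn queue-if-new!
  "Returns :queued the first time a hash is seen and :duplicate afterwards.
  seen-hashes is an atom holding a set of hex hashes."
  [seen-hashes ^bytes data]
  (let [h (content-hash data)
        [before _] (swap-vals! seen-hashes conj h)]
    (if (contains? before h) :duplicate :queued)))
```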