Architecture for OAuth-based email provider integration (Outlook & Gmail).
flowchart TB
subgraph "Email Providers"
OutlookAPI["Microsoft Graph API"]
GmailAPI["Gmail API"]
end
subgraph "Push Notifications"
MSWebhook["Outlook Webhook<br/>POST /webhooks/outlook"]
PubSub["Google Pub/Sub"]
GmailWebhook["Gmail Webhook<br/>POST /webhooks/gmail"]
end
subgraph "OAuth Flow"
OAuthRedirect["OAuth Redirect<br/>/oauth/{provider}/authorize"]
OAuthCallback["OAuth Callback<br/>/oauth/{provider}/callback"]
SSM[("AWS SSM<br/>Encrypted Tokens")]
end
subgraph "Webhook Processing"
Validate["Validate Request<br/>clientState / verification token"]
Lookup["Lookup ap_doc_source<br/>by subscription_id / email"]
Queue["Upsert pending sync<br/>ap_doc_source_email_pending_sync"]
end
subgraph "Sync Worker"
Claim["Claim pending sync<br/>SELECT FOR UPDATE SKIP LOCKED"]
RefreshToken["Refresh OAuth Token<br/>from SSM"]
DeltaSync["Provider Sync API<br/>delta query / history.list"]
Heuristic["Heuristic Filter<br/>accept/reject email"]
FetchMsg["Fetch Full Message<br/>with attachments"]
end
subgraph "Existing Ingestion Pipeline"
S3[("S3<br/>documents/{id}.pdf")]
QueueIngestion["queue-for-ingestion!<br/>dedup + create records"]
SQS[["SQS Queue"]]
PG[("PostgreSQL<br/>ap_ingestion table")]
end
subgraph "Ingestion Worker"
ClaimIng["claim-ingestion!"]
LLMClassify["LLM Classification<br/>process vs skip"]
Transcribe["transcribe!<br/>OCR pipeline"]
Extract["extract!<br/>Claude LLM"]
Complete["complete-ingestion!"]
end
subgraph "Notifications"
Trigger["PostgreSQL NOTIFY"]
SSE["SSE Events<br/>/documents/events"]
Browser["Browser"]
end
%% OAuth Flow
OAuthRedirect --> OutlookAPI
OAuthRedirect --> GmailAPI
OutlookAPI --> OAuthCallback
GmailAPI --> OAuthCallback
OAuthCallback --> SSM
%% Push notification paths
OutlookAPI -->|"subscription webhook"| MSWebhook
GmailAPI -->|"watch notification"| PubSub
PubSub --> GmailWebhook
%% Webhook processing
MSWebhook --> Validate
GmailWebhook --> Validate
Validate --> Lookup
Lookup --> Queue
Queue -->|"return 200 OK"| MSWebhook
Queue -->|"return 200 OK"| GmailWebhook
%% Sync worker flow
Queue --> Claim
Claim --> RefreshToken
RefreshToken --> SSM
RefreshToken --> DeltaSync
DeltaSync --> OutlookAPI
DeltaSync --> GmailAPI
DeltaSync --> Heuristic
Heuristic -->|"accepted"| FetchMsg
Heuristic -->|"rejected"| Claim
%% Into existing pipeline
FetchMsg --> S3
FetchMsg --> QueueIngestion
QueueIngestion --> PG
QueueIngestion --> SQS
%% Ingestion worker
SQS --> ClaimIng
ClaimIng --> LLMClassify
LLMClassify -->|"skip"| Complete
LLMClassify -->|"process"| Transcribe
Transcribe --> Extract
Extract --> Complete
%% Notifications
Complete --> PG
PG --> Trigger
Trigger --> SSE
SSE --> Browser
%% Styling
classDef provider fill:#e3f2fd,stroke:#1565c0
classDef webhook fill:#fff3e0,stroke:#ef6c00
classDef storage fill:#e8f5e9,stroke:#2e7d32
classDef queue fill:#fce4ec,stroke:#c2185b
classDef worker fill:#f3e5f5,stroke:#7b1fa2
class OutlookAPI,GmailAPI provider
class MSWebhook,GmailWebhook,PubSub webhook
class S3,SSM,PG storage
class SQS,Queue queue
class Claim,RefreshToken,DeltaSync,Heuristic,FetchMsg,ClaimIng,LLMClassify,Transcribe,Extract,Complete worker
- ap_doc_source_email_pending_sync
- queue-for-ingestion! creates records → SQS message sent

| Capability | Outlook (Microsoft Graph) | Gmail |
|---|---|---|
| OAuth | Microsoft identity platform | Google OAuth 2.0 |
| Scope | Mail.Read | gmail.readonly |
| Push notifications | Webhook subscriptions (per-user) | Pub/Sub (shared topic) |
| Sync API | Delta query (/messages/delta) | History API (history.list) |
| Sync watermark | deltaLink token | historyId (BIGINT) |
| Subscription lifetime | 7 days (messages) | 7 days |
| Message fetch | GET /messages/{id} | messages.get |

(defprotocol EmailProvider
(create-subscription! [this doc-source-id]
"Set up push notifications, return subscription metadata")
(renew-subscription! [this doc-source-id]
"Extend subscription before expiry")
(sync-since! [this doc-source-id last-sync-token]
"Fetch new messages since token, return messages + new token")
(fetch-message [this message-id]
"Get full message with attachments")
(refresh-token! [this doc-source-id]
"Refresh OAuth access token, return new token"))
The webhook handler and sync worker are provider-agnostic, delegating to protocol implementations. Provider-specific details (Pub/Sub topic, Graph subscriptions, token formats) are encapsulated in each implementation.
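To make the provider-agnostic dispatch concrete, here is a minimal sketch using an in-memory fake provider. The protocol is restated in abbreviated form only so the snippet runs on its own; FakeProvider, run-sync!, and the integer sync token are hypothetical stand-ins, not part of the actual implementation.

```clojure
;; Abbreviated restatement of the protocol so this sketch is self-contained.
(defprotocol EmailProvider
  (sync-since! [this doc-source-id last-sync-token]
    "Fetch new messages since token, return messages + new token")
  (fetch-message [this message-id]
    "Get full message with attachments"))

;; Hypothetical in-memory provider standing in for Outlook or Gmail.
(defrecord FakeProvider [messages]
  EmailProvider
  (sync-since! [_ _doc-source-id last-sync-token]
    ;; Assumption: the token is just an index into the message vector.
    (let [start (or last-sync-token 0)]
      {:messages  (subvec messages start)
       :new-token (count messages)}))
  (fetch-message [_ message-id]
    (first (filter #(= message-id (:id %)) messages))))

;; Hypothetical worker step: it only talks to the protocol,
;; never to a provider-specific API.
(defn run-sync! [provider doc-source-id last-sync-token]
  (let [{:keys [messages new-token]}
        (sync-since! provider doc-source-id last-sync-token)]
    {:fetched   (mapv #(fetch-message provider (:id %)) messages)
     :new-token new-token}))
```

Swapping FakeProvider for an Outlook or Gmail record would leave run-sync! unchanged, which is the property the protocol is meant to provide.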
Webhooks must respond quickly to avoid provider timeouts. Processing is deferred to background workers.
Webhook Handler:
1. Validate request
2. Lookup ap_doc_source
3. Upsert pending_sync
4. Return 200 OK

Background Worker:
5. Claim pending (FOR UPDATE SKIP LOCKED)
6. Refresh token
7. Call delta/history API
8. Process messages
9. Delete pending row
CREATE TYPE email_pending_sync_status AS ENUM ('pending', 'processing');
CREATE TABLE ap_doc_source_email_pending_sync (
doc_source_id UUID PRIMARY KEY, -- coalesces notifications
claimed_at TIMESTAMPTZ, -- NULL when pending, set when worker claims
attempts INTEGER NOT NULL DEFAULT 0, -- starts at 0 so increments never operate on NULL
last_error TEXT,
status email_pending_sync_status DEFAULT 'pending'
);
Using doc_source_id as PK ensures multiple notifications coalesce into one sync operation.
The claim mechanism handles worker crashes:
- Rows with a recent claimed_at are owned by an active worker
- Rows with a stale claimed_at (older than stuck-processing-timeout-seconds) are considered crashed and can be reclaimed

State transitions:
Webhook arrives: INSERT/UPDATE → status='pending', claimed_at=NULL
Worker claims: UPDATE → status='processing', claimed_at=now()
Worker completes: DELETE
Worker crashes: (stuck) → reclaimable after timeout
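The state transitions above can be modelled without a database. The following is a rough in-memory sketch, assuming a map keyed by doc-source-id plays the role of the table, timestamps are epoch seconds, and the timeout is 300 seconds; the function names and the timeout value are illustrative assumptions, and the real worker would rely on SELECT FOR UPDATE SKIP LOCKED instead.

```clojure
;; Assumed value; the real setting is stuck-processing-timeout-seconds.
(def stuck-processing-timeout-seconds 300)

(defn notify
  "Webhook arrives: upsert keyed by doc-source-id, so repeats coalesce."
  [table doc-source-id]
  (update table doc-source-id
          (fn [row]
            (-> (or row {:attempts 0})
                (assoc :status :pending :claimed-at nil)))))

(defn claimable?
  "Pending rows, or processing rows whose claim is older than the timeout."
  [row now-seconds]
  (or (= :pending (:status row))
      (and (= :processing (:status row))
           (> (- now-seconds (:claimed-at row))
              stuck-processing-timeout-seconds))))

(defn claim
  "Worker claims one row. Returns [doc-source-id new-table] or [nil table]."
  [table now-seconds]
  (if-let [[id _] (first (filter (fn [[_ row]] (claimable? row now-seconds))
                                 table))]
    [id (update table id #(-> %
                              (assoc :status :processing
                                     :claimed-at now-seconds)
                              (update :attempts inc)))]
    [nil table]))

(defn complete
  "Worker completes: the row is deleted."
  [table doc-source-id]
  (dissoc table doc-source-id))
```

Two notifications for the same source leave a single row, and a row claimed at t=1000 only becomes claimable again once more than 300 seconds have passed.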
Validating webhook authenticity is critical - without it, attackers could trigger syncs or inject forged notifications.
Microsoft Graph webhooks use a per-subscription clientState secret.
Flow:
1. At subscription creation (OAuth callback):
   - Generate the secret: generate-client-state(doc-source-id) → "{doc-source-id}:{random-bytes}"
   - Store it in doc_source_email.webhook_client_state
2. On webhook receipt:
   - Microsoft includes clientState in every notification
   - Compare it against the stored value

Implementation:
;; Timing-safe comparison prevents timing attacks
(java.security.MessageDigest/isEqual
(.getBytes incoming-client-state "UTF-8")
(.getBytes stored-client-state "UTF-8"))
Why timing-safe? Regular string comparison (=) can leak information through timing differences. An attacker could probe character-by-character to discover the secret.
Database column: ap_doc_source_email.webhook_client_state (TEXT, nullable - unused for Gmail)
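A possible shape for the clientState helpers, as a sketch only. generate-client-state is named above, but its body here is an assumption (32 random bytes, URL-safe Base64 without padding), and constant-time-equals? and client-state->doc-source-id are hypothetical helper names. The nil guard avoids a NullPointerException when a notification arrives without clientState.

```clojure
(import '(java.security MessageDigest SecureRandom)
        '(java.util Base64))
(require '[clojure.string :as str])

(defn generate-client-state
  "Builds \"{doc-source-id}:{random-bytes}\". Assumes 32 random bytes,
   which keeps a UUID-prefixed value under 128 characters."
  [doc-source-id]
  (let [buf (byte-array 32)]
    (.nextBytes (SecureRandom.) buf)
    (str doc-source-id ":"
         (.encodeToString (.withoutPadding (Base64/getUrlEncoder)) buf))))

(defn client-state->doc-source-id
  "Extracts the lookup key: everything before the first colon."
  [client-state]
  (when (string? client-state)
    (first (str/split client-state #":" 2))))

(defn constant-time-equals?
  "Timing-safe comparison; returns false if either side is not a string."
  [a b]
  (boolean
    (and (string? a)
         (string? b)
         (MessageDigest/isEqual (.getBytes ^String a "UTF-8")
                                (.getBytes ^String b "UTF-8")))))
```

On receipt, the handler would extract the doc-source-id, load the stored value for that source, and reject the notification unless constant-time-equals? returns true.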
Gmail uses Google Cloud Pub/Sub for push notifications, which has different security characteristics.
Proposed approach:
1. Static verification token:
2. Subscription path validation:
   - Check that the subscription field matches expected path
   - Format: projects/{project}/subscriptions/{subscription-name}
3. Email address verification:
   - Read emailAddress in decoded data field
   - Look it up in the doc_source_email table

Config structure:
;; In config.edn
{:gmail {:pubsub-verification-token #env "GMAIL_PUBSUB_TOKEN"
:expected-subscription "projects/orcha/subscriptions/gmail-webhooks"}}
Webhook handler pseudocode:
(defn gmail-webhook [request]
(let [token (get-in request [:headers "x-goog-pubsub-token"])
subscription (get-in request [:body-params :subscription])
data (decode-base64 (get-in request [:body-params :message :data]))
email-address (:emailAddress data)]
(cond
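;; Verify static token (constant-time, for the same reason as Outlook clientState;
;; str turns a missing header into "" instead of throwing)
(not (java.security.MessageDigest/isEqual
(.getBytes (str token) "UTF-8")
(.getBytes (str config/pubsub-verification-token) "UTF-8")))
(do (log/warn "Invalid Pub/Sub token") (respond 403))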
;; Verify subscription path
(not= subscription config/expected-subscription)
(do (log/warn "Unknown subscription") (respond 403))
;; Lookup by email (not doc-source-id like Outlook)
(nil? (lookup-by-email email-address))
(do (log/warn "Unknown email address") (respond 200)) ; acknowledge to prevent retries
:else
(do (queue-sync-by-email! email-address)
(respond 200)))))
Key differences from Outlook:
| Aspect | Outlook | Gmail |
|---|---|---|
| Secret type | Per-subscription (random) | Static (config) |
| Stored in | ap_doc_source_email.webhook_client_state | Application config |
| Lookup key | doc-source-id (from clientState) | email address (from data) |
| Verification | Compare clientState secret | Token + subscription path |

The sync worker uses a polling loop (default: every 5 seconds) rather than being triggered directly by webhooks. This was a deliberate choice.
Alternatives considered:
Both alternatives would provide lower latency (immediate vs up to 5 seconds), but polling was chosen for:
- Backpressure control: The poll interval acts as a natural throttle. If provider APIs are rate-limited or the system is under heavy load, increasing the interval provides immediate backpressure without code changes. Event-driven approaches would require explicit throttling mechanisms.
- Simplicity: A single polling loop is easy to reason about, monitor, and debug. The "waste" of polling an empty table is negligible (~0.1ms per query, ~12 queries/minute).
- Coalescing still works: The pending_sync table coalesces notifications regardless of trigger mechanism. Multiple webhooks arriving between polls result in one sync operation.

Known limitation - horizontal scaling: The current design has one global poller. With many tenants generating high webhook volume, a single poller becomes a bottleneck. Future options:
This is not an immediate concern - the single poller can handle significant load since actual work (API calls, token refresh) dominates poll overhead.
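A polling loop along these lines might look like the sketch below. It assumes the interval lives in an atom so it can be raised at runtime, and that each tick handles at most batch-size rows before sleeping; run-poller!, poll-once!, batch-size, and the claim-fn / process-fn callbacks are all hypothetical names supplied for illustration.

```clojure
(defn poll-once!
  "Claims and processes up to batch-size rows. Returns how many were handled.
   claim-fn returns a claimed row or nil; process-fn handles one row."
  [{:keys [claim-fn process-fn batch-size]}]
  (loop [n 0]
    (if (< n batch-size)
      (if-let [row (claim-fn)]
        (do (process-fn row)
            (recur (inc n)))
        n)
      n)))

(defn run-poller!
  "Runs until the running? atom becomes false. Sleeping after every tick is
   what makes the interval act as a throttle."
  [{:keys [running? poll-interval-ms] :as opts}]
  (while @running?
    (poll-once! opts)
    (Thread/sleep (long @poll-interval-ms))))
```

Under this sketch, resetting the interval atom from 5000 to, say, 30000 applies backpressure on the next tick without a restart.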
Emails pass through two relevancy filters:
| Stage | Location | Method | Purpose |
|---|---|---|---|
| 1 | Sync worker | Heuristic | Fast reject of obvious non-invoices |
| 2 | Ingestion worker | LLM | Accurate classification, can mark as "skipped" |
Quick check before uploading to S3:
- `<table>` elements

Errs toward accepting (false positives acceptable, false negatives lose invoices).
After attachments are in S3, the ingestion worker decides:
- Applies to the fetch-files-from-s3! stage for email-sourced ingestions
- Irrelevant attachments are marked skipped

After the sync worker fetches new messages via delta query, it processes each message:
For each message from delta sync:
1. Deduplication check → doc_source_email_processed_messages
2. Fetch full message → protocol/fetch-message
3. Heuristic filter → accept-email?
4. Attachment filter → PDF/PNG/JPG/TIFF, not inline, <25MB
5. Queue attachments → queue-for-ingestion!
6. Mark processed → insert into processed_messages
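The ordering of these six steps can be sketched as one function that takes its collaborators as arguments. Everything here is a stand-in: the keys in the deps map are hypothetical, and marking rejected emails as processed is an assumption made so that they are not re-evaluated on the next sync.

```clojure
(defn process-message!
  "Runs the six steps for one message id. Returns :duplicate, :rejected
   or :queued. All functions in deps are supplied by the caller."
  [{:keys [processed? fetch-message accept-email?
           eligible-attachment? queue! mark-processed!]}
   message-id]
  (if (processed? message-id)                          ; 1. deduplication check
    :duplicate
    (let [message (fetch-message message-id)           ; 2. fetch full message
          result  (if (accept-email? message)          ; 3. heuristic filter
                    (do (run! queue!                   ; 5. queue attachments
                              (filter eligible-attachment? ; 4. attachment filter
                                      (:attachments message)))
                        :queued)
                    :rejected)]
      (mark-processed! message-id)                     ; 6. mark processed
      result)))
```

Because the mark happens last, a crash before step 6 leaves the message unmarked and it is retried on the next sync, with the content-hash layer absorbing any attachments that were already queued.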
The ap_doc_source_email_processed_messages table tracks which messages have been fully processed:
CREATE TABLE ap_doc_source_email_processed_messages (
doc_source_id UUID NOT NULL REFERENCES ap_doc_source(id) ON DELETE CASCADE,
message_id TEXT NOT NULL, -- Provider message ID
processed_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (doc_source_id, message_id)
);
This prevents re-processing when:
Attachments are filtered before uploading to S3:
| Include | Exclude |
|---|---|
| application/pdf | Inline images (is-inline=true) |
| image/png | Signature files (.sig, .p7s, .smime) |
| image/jpeg | Calendar files (.ics, .vcf) |
| image/tiff | Oversized files (>25MB) |

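The include and exclude rules translate fairly directly into a predicate. This is a sketch under a few assumptions: the attachment is a map with :content-type, :filename, :size (bytes) and :inline? keys, 25MB is read as 25 MiB, and a file of exactly that size is still accepted. The key names and eligible-attachment? are illustrative.

```clojure
(require '[clojure.string :as str])

(def allowed-content-types
  #{"application/pdf" "image/png" "image/jpeg" "image/tiff"})

(def excluded-extensions
  #{".sig" ".p7s" ".smime" ".ics" ".vcf"})

;; Assumption: "25MB" means 25 MiB.
(def max-attachment-bytes (* 25 1024 1024))

(defn- extension
  "Lower-cased extension including the dot, or nil if there is none."
  [filename]
  (let [lower (str/lower-case (or filename ""))
        idx   (str/last-index-of lower ".")]
    (when idx (subs lower idx))))

(defn eligible-attachment?
  "True when the attachment passes every include and exclude rule."
  [{:keys [content-type filename size inline?]}]
  (boolean
    (and (contains? allowed-content-types
                    (str/lower-case (or content-type "")))
         (not inline?)
         (not (contains? excluded-extensions (extension filename)))
         (<= (or size 0) max-attachment-bytes))))
```

With the content-type allow-list in place, the extension check is mostly a second line of defence against mislabelled files.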
Each ingestion stores email context for traceability:
{
"provider": "outlook",
"message_id": "AAMkAGI2...",
"from": "supplier@example.com",
"subject": "Invoice #12345",
"received_at": "2026-01-02T10:30:00Z",
"attachment_filename": "invoice.pdf"
}
CREATE TYPE email_connection_status AS ENUM (
'active', -- working normally
'token_refresh_failed', -- needs re-authorization
'subscription_expired', -- webhook subscription lapsed
'provider_error' -- API returning errors
);
- connection_status = 'subscription_expired'
- ap_doc_source.is_active = false

| Column | Type | Description |
|---|---|---|
| connection_status | email_connection_status | Current health state |
| subscription_expires_at | TIMESTAMPTZ | When to renew subscription |
| provider_subscription_id | TEXT | Outlook subscription ID |
| webhook_client_state | TEXT | Outlook clientState for verification (NULL for Gmail) |
| last_sync_token | TEXT | deltaLink or historyId |
| last_successful_sync_at | TIMESTAMPTZ | Health monitoring |

| Column | Type | Description |
|---|---|---|
| status = 'skipped' | enum value | For irrelevant attachments |
| skip_reason | TEXT | Why ingestion was skipped |

Email ingestions store metadata in ap_ingestion.source_metadata:
{
"provider": "outlook",
"message_id": "<abc123@mail.example.com>",
"from": "supplier@example.com",
"subject": "Invoice #12345 - January 2026",
"received_at": "2026-01-02T10:30:00Z",
"attachment_filename": "invoice-12345.pdf"
}
Three-layer deduplication:
| Layer | Mechanism | Prevents |
|---|---|---|
| Sync token | last_sync_token watermark | Re-fetching seen emails |
| Message ID | Track per ap_doc_source | Re-processing same email |
| Content hash | queue-for-ingestion! | Duplicate S3 uploads |
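
The content-hash layer can be illustrated with a small sketch. It assumes SHA-256 over the raw attachment bytes and an in-memory set of seen hashes; the algorithm actually used by queue-for-ingestion! is not stated here, and sha256-hex and record-if-new! are hypothetical names.

```clojure
(import 'java.security.MessageDigest)

(defn sha256-hex
  "Hex-encoded SHA-256 of a byte array."
  [^bytes content]
  (let [digest (.digest (MessageDigest/getInstance "SHA-256") content)]
    (apply str (map #(format "%02x" (bit-and % 0xff)) digest))))

(defn record-if-new!
  "Adds the content hash to the seen-hashes atom (a set).
   Returns true if the content had not been seen before."
  [seen-hashes ^bytes content]
  (let [h       (sha256-hex content)
        [old _] (swap-vals! seen-hashes conj h)]
    (not (contains? old h))))
```

The same attachment forwarded in two different emails has two message IDs but one hash, so only this layer catches it.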