Note (2026-04-24): After this document was written, legal_entity was renamed to tenant and the old tenant was renamed to organization. Read references to these terms with the pre-rename meaning.
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Introduce document.diagnostics (materialized JSONB snapshot of derived outputs) and a unified document_processor_run audit table; add a workers-service-hosted, throttled (60s debounce) recompute pipeline triggered on edits; render per-section stale/recomputing UI states with auto-update via the existing SSE stream.
Architecture: Move validation-results, fraud-flags, tax-issues, and per-line-item vat-validation out of document.structured_data into document.diagnostics; replace ap_ingestion_post_process_stat and the per-document matching/reconciliation status columns with a single document_processor_run table; add a new SQS queue + consumer that debounces recomputes 60s after the last edit using an idempotent handler. Detail view classifies each diagnostic section as current/stale/recomputing/failed/never-run from the latest run row, reuses the existing document-events SSE stream with two new event types (diagnostic-run-started, diagnostic-run-completed), and re-renders sections in place as runs complete.
Tech Stack: Clojure, PostgreSQL (JSONB + ENUMs + triggers + pg_notify), Migratus migrations, AWS SQS (production via CDK; LocalStack via Testcontainers in tests), HTMX SSE, core.async pub/sub, Integrant, Malli schemas, clojure.test, embedded-postgres for tests.
Reference spec: docs/superpowers/specs/2026-04-13-document-diagnostics-design.md
Depends on: Edit-history plan — must be merged first. This plan assumes document.version, document_history, the app-level ingestion-completion handler, and resources/migrations/PENDING-CLEANUPS.md already exist.
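The Architecture paragraph describes two behaviours:

- the per-section state classification, and
- an idempotent handler that yields a 60s debounce.

Both can be pictured as small pure functions. This is a sketch under stated assumptions, not the plan's implementation. The namespace, the function names, the string status values, and the precedence are all hypothetical; the precedence is that a running or failed latest run wins over staleness. The debounce guard assumes each edit enqueues a recompute message with a 60s delivery delay (SQS supports per-message DelaySeconds on standard queues), tagged with the document version produced by that edit.

```clojure
(ns example.diagnostics-state
  "Hypothetical sketch: per-section state classification and a guard for
  debounced recompute messages. Names and rules are assumptions.")

(defn section-state
  "Classifies one diagnostic section from the latest document_processor_run
  row for its processor (nil if it has never run) and the document's
  current version. Status values are assumed to arrive as strings."
  [latest-run current-version]
  (let [{:keys [status document-version]} latest-run]
    (cond
      (nil? latest-run)                    :never-run
      (#{"pending" "running"} status)      :recomputing
      (= "failed" status)                  :failed
      (< document-version current-version) :stale
      :else                                :current)))

(defn should-recompute?
  "Guard for a delayed recompute message tagged with the document version
  it was enqueued for. Skips the message when a later edit superseded it
  (that edit enqueued its own delayed message, which is what yields the
  debounce) or when a completed run already covers that version, so an
  SQS redelivery does no duplicate work."
  [msg-version current-version completed-versions]
  (and (= msg-version current-version)
       (not (contains? completed-versions msg-version))))
```

For example, take edits at t=0s (version 2) and t=30s (version 3). The version-2 message arrives at t=60s and is skipped. Only the version-3 message, arriving at t=90s, triggers a recompute.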
Create: resources/migrations/20260413120000-add-document-diagnostics.up.sql
Create: resources/migrations/20260413120000-add-document-diagnostics.down.sql
Create: src/com/getorcha/schema/diagnostics.clj (Malli schemas for the document.diagnostics shape: Validations, FraudFlag, TaxIssue, VatValidation, MatchingSummary, ReconciliationSummary)
Create: src/com/getorcha/db/document_processor_run.clj (DB layer for the new table: insert-running!, complete-run!, fail-run!, latest-runs-per-processor, count-runs)
Create: src/com/getorcha/workers/diagnostics_recompute.clj (SQS consumer entry point + idempotency check + dispatch)
Create: src/com/getorcha/workers/diagnostics_recompute/orchestrator.clj (phase ordering for diagnostic processors)
Create: test/com/getorcha/db/document_processor_run_test.clj
Create: test/com/getorcha/workers/diagnostics_recompute_test.clj
Modify: resources/migrations/PENDING-CLEANUPS.md (created by edit-history plan)
Modify: src/com/getorcha/schema/invoice/structured_data.clj
Modify: src/com/getorcha/schema/purchase_order/structured_data.clj
Modify: src/com/getorcha/schema/goods_received_note/structured_data.clj
Modify: src/com/getorcha/schema/contract/structured_data.clj
Modify: src/com/getorcha/workers/ap/ingestion/extraction.clj (4 prompt templates: drop missing-fields)
Modify: src/com/getorcha/workers/ap/ingestion.clj (ingestion-completion: write to document.diagnostics, strip moved keys, recompute needs_human_review)
Modify: src/com/getorcha/workers/ap/ingestion/post_process.clj (write run rows two-phase per processor)
Modify: src/com/getorcha/workers/ap/ingestion/validation.clj (extract vat-validation as a standalone processor)
Modify: src/com/getorcha/workers/ap/matching/worker.clj (write matching run row + diagnostics.matching)
Modify: src/com/getorcha/workers/ap/matching/reconciliation.clj (write reconciliation run row + diagnostics.reconciliation)
Modify: src/com/getorcha/app/http/documents/view/shared.clj (SSE exec-fn extension for new events)
Modify: src/com/getorcha/app/http/documents/view/invoice.clj (read diagnostics from document.diagnostics)
Modify: src/com/getorcha/app/http/documents/view/purchase_order.clj
Modify: src/com/getorcha/app/http/documents/view/goods_received_note.clj
Modify: src/com/getorcha/app/http/documents/view/contract.clj
Modify: src/com/getorcha/app/ui/components.clj (per-section state classifier + stale/recomputing rendering)
Modify: resources/app/public/css/style.css (stale + recomputing styles)
Modify: resources/com/getorcha/config.edn (queue URL config)
Modify: src/com/getorcha/system.clj (Integrant config: register diagnostics-recompute consumer)
Modify: infra/stacks/foundation_stack.py (CDK SQS queue + DLQ)
Modify: scripts/init_aws.clj (LocalStack queue creation)
Modify: test/com/getorcha/test/fixtures.clj (create new queue at test startup)
Modify: scripts/debug_fetch_document.clj (copy document_processor_run rows when fetching prod doc)
Modify: scripts/debug_common.clj (jsonb-keys for new column + table)
Modify: .claude/skills/debug-doc/SKILL.md
Modify: .claude/skills/ingestion-regression-test/inspector-prompt.md
The first phase lays the schema. After this phase, the DB has the new table, the new column, and backfilled data, but no Clojure code reads or writes either yet.
Files:
Create: resources/migrations/20260413120000-add-document-diagnostics.up.sql
Step 1: Create the migration file with new types, table, indexes, and column
Write to resources/migrations/20260413120000-add-document-diagnostics.up.sql:
-- New ENUM types for unified processor run tracking
CREATE TYPE processor_run_status AS ENUM ('pending', 'running', 'completed', 'failed');
--;;
CREATE TYPE processor_run_trigger AS ENUM ('ingestion', 'edit', 'manual');
--;;
-- Unified per-processor run table; replaces ap_ingestion_post_process_stat
-- and absorbs per-document matching/reconciliation status tracking.
CREATE TABLE document_processor_run (
id UUID PRIMARY KEY DEFAULT uuidv7(),
document_id UUID NOT NULL REFERENCES document(id) ON DELETE CASCADE,
processor_id TEXT NOT NULL,
trigger_kind processor_run_trigger NOT NULL,
ingestion_id UUID REFERENCES ap_ingestion(id) ON DELETE CASCADE,
triggered_by_history_id UUID REFERENCES document_history(id),
document_version INT NOT NULL,
started_at TIMESTAMPTZ NOT NULL DEFAULT now(),
ended_at TIMESTAMPTZ,
input_tokens INTEGER,
output_tokens INTEGER,
model TEXT,
commit_sha TEXT,
status processor_run_status NOT NULL DEFAULT 'running',
result JSONB,
error TEXT,
CONSTRAINT doc_processor_run_trigger_xor CHECK (
(trigger_kind = 'ingestion' AND ingestion_id IS NOT NULL AND triggered_by_history_id IS NULL)
OR (trigger_kind = 'edit' AND ingestion_id IS NULL AND triggered_by_history_id IS NOT NULL)
OR (trigger_kind = 'manual' AND ingestion_id IS NULL AND triggered_by_history_id IS NULL)
)
);
--;;
CREATE INDEX idx_doc_processor_run_doc_proc_version
ON document_processor_run(document_id, processor_id, document_version DESC);
--;;
CREATE INDEX idx_doc_processor_run_ingestion
ON document_processor_run(ingestion_id) WHERE ingestion_id IS NOT NULL;
--;;
-- Materialized snapshot of all diagnostic outputs (validations, fraud, tax,
-- per-line VAT, matching summary, reconciliation summary).
ALTER TABLE document ADD COLUMN diagnostics JSONB;
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: add document_processor_run table and document.diagnostics column"
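The doc_processor_run_trigger_xor CHECK constraint encodes which foreign key each trigger kind carries.

The predicate below is a hypothetical Clojure mirror of that constraint; it is not part of the plan. A caller such as insert-running! could use it to reject a bad combination with a clear error before the round trip to Postgres. It assumes trigger kinds are passed as keywords, as the insert-running! docstring describes.

```clojure
(ns example.run-trigger-check
  "Hypothetical client-side mirror of doc_processor_run_trigger_xor.")

(defn valid-trigger-combination?
  "Returns true when trigger-kind and the two foreign keys satisfy the
  same rule as the doc_processor_run_trigger_xor CHECK constraint."
  [{:keys [trigger-kind ingestion-id triggered-by-history-id]}]
  (case trigger-kind
    ;; Ingestion runs point at an ap_ingestion row only.
    :ingestion (and (some? ingestion-id) (nil? triggered-by-history-id))
    ;; Edit-triggered runs point at the document_history row only.
    :edit      (and (nil? ingestion-id) (some? triggered-by-history-id))
    ;; Manual runs carry neither foreign key.
    :manual    (and (nil? ingestion-id) (nil? triggered-by-history-id))
    ;; Anything else would also fail the ENUM cast.
    false))
```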
Files:
Create: resources/migrations/20260413120000-add-document-diagnostics.down.sql
Step 1: Write the down migration
Write to resources/migrations/20260413120000-add-document-diagnostics.down.sql:
ALTER TABLE document DROP COLUMN IF EXISTS diagnostics;
--;;
DROP INDEX IF EXISTS idx_doc_processor_run_ingestion;
--;;
DROP INDEX IF EXISTS idx_doc_processor_run_doc_proc_version;
--;;
DROP TABLE IF EXISTS document_processor_run;
--;;
DROP TYPE IF EXISTS processor_run_trigger;
--;;
DROP TYPE IF EXISTS processor_run_status;
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.down.sql
git commit -m "feat: down migration for document diagnostics schema"
Backfill document_processor_run from ap_ingestion_post_process_stat
Files:
Modify: resources/migrations/20260413120000-add-document-diagnostics.up.sql
Step 1: Append backfill SQL for post-process stats
Append to the end of resources/migrations/20260413120000-add-document-diagnostics.up.sql:
-- Backfill: copy every legacy post-process stat row into the new run table.
-- These rows are LLM-driven processors run during ingestion. We have no
-- per-row result snapshot for them; result stays NULL.
INSERT INTO document_processor_run
(document_id, processor_id, trigger_kind, ingestion_id, document_version,
started_at, ended_at, input_tokens, output_tokens, model, status, commit_sha)
SELECT i.document_id,
s.processor_id,
'ingestion'::processor_run_trigger,
s.ingestion_id,
1,
s.started_at,
s.ended_at,
s.input_tokens,
s.output_tokens,
s.model,
'completed'::processor_run_status,
i.commit_sha
FROM ap_ingestion_post_process_stat s
JOIN ap_ingestion i ON i.id = s.ingestion_id
WHERE i.document_id IS NOT NULL;
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: backfill document_processor_run from ap_ingestion_post_process_stat"
Files:
Modify: resources/migrations/20260413120000-add-document-diagnostics.up.sql
Step 1: Append matching backfill
Append to resources/migrations/20260413120000-add-document-diagnostics.up.sql:
-- Backfill: one matching-run row per document with non-null matching_status.
-- This loses per-attempt fidelity (matching_attempts > 1 docs collapse to a
-- single "latest state" row), but preserves the audit signal.
INSERT INTO document_processor_run
(document_id, processor_id, trigger_kind, ingestion_id, document_version,
started_at, ended_at, status, error)
SELECT d.id,
'matching',
'ingestion'::processor_run_trigger,
(SELECT id FROM ap_ingestion
WHERE document_id = d.id
ORDER BY completed_at DESC NULLS LAST LIMIT 1),
1,
COALESCE(d.matching_failed_at, d.updated_at),
CASE WHEN d.matching_status IN ('succeeded','failed','skipped')
THEN COALESCE(d.matching_failed_at, d.updated_at) END,
(CASE d.matching_status
WHEN 'pending' THEN 'pending'
WHEN 'in-progress' THEN 'running'
WHEN 'succeeded' THEN 'completed'
WHEN 'failed' THEN 'failed'
WHEN 'skipped' THEN 'completed'
END)::processor_run_status,
d.matching_error
FROM document d
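WHERE d.matching_status IS NOT NULL
  -- The CHECK constraint requires a non-NULL ingestion_id for trigger_kind = 'ingestion'.
  AND EXISTS (SELECT 1 FROM ap_ingestion WHERE document_id = d.id);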
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: backfill matching runs from document.matching_status columns"
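The matching backfill's CASE expression and ended_at rule can be restated as a lookup table. The Clojure rendering below is a hypothetical mirror of the SQL, for example for asserting expectations in a migration test. Statuses are shown as strings, and the namespace and function names are illustrative.

```clojure
(ns example.matching-backfill
  "Hypothetical Clojure mirror of the matching backfill mapping.")

(def legacy->run-status
  "Legacy document.matching_status -> processor_run_status, as in the
  backfill's CASE expression. Note that 'skipped' collapses into 'completed'."
  {"pending"     "pending"
   "in-progress" "running"
   "succeeded"   "completed"
   "failed"      "failed"
   "skipped"     "completed"})

(def terminal-legacy-statuses
  "Legacy statuses for which the backfill sets ended_at."
  #{"succeeded" "failed" "skipped"})

(defn backfill-row
  "Derives the run-row fields the backfill computes for one document."
  [{:keys [matching-status matching-failed-at updated-at matching-error]}]
  (let [ts (or matching-failed-at updated-at)] ; COALESCE(matching_failed_at, updated_at)
    {:processor-id "matching"
     :status       (legacy->run-status matching-status)
     :started-at   ts
     :ended-at     (when (terminal-legacy-statuses matching-status) ts)
     :error        matching-error}))
```

Because "skipped" and "succeeded" both become "completed", the distinction survives only in the legacy column until it is dropped.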
Files:
Modify: resources/migrations/20260413120000-add-document-diagnostics.up.sql
Step 1: Append reconciliation backfill
Append to resources/migrations/20260413120000-add-document-diagnostics.up.sql:
-- Backfill: one reconciliation-run row per document with non-null reconciliation_status.
INSERT INTO document_processor_run
(document_id, processor_id, trigger_kind, ingestion_id, document_version,
started_at, ended_at, status, result)
SELECT d.id,
'reconciliation',
'ingestion'::processor_run_trigger,
(SELECT id FROM ap_ingestion
WHERE document_id = d.id
ORDER BY completed_at DESC NULLS LAST LIMIT 1),
1,
d.updated_at,
d.updated_at,
'completed'::processor_run_status,
jsonb_build_object('status', d.reconciliation_status)
FROM document d
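WHERE d.reconciliation_status IS NOT NULL
  -- The CHECK constraint requires a non-NULL ingestion_id for trigger_kind = 'ingestion'.
  AND EXISTS (SELECT 1 FROM ap_ingestion WHERE document_id = d.id);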
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: backfill reconciliation runs from document.reconciliation_status"
Files:
Modify: resources/migrations/20260413120000-add-document-diagnostics.up.sql
Step 1: Append synthetic validations backfill
Append to resources/migrations/20260413120000-add-document-diagnostics.up.sql:
-- Backfill: one synthetic 'validations' run per document with validation-results.
-- Validations were previously not recorded as a discrete run anywhere;
-- this gives every existing doc a baseline audit row.
INSERT INTO document_processor_run
(document_id, processor_id, trigger_kind, ingestion_id, document_version,
started_at, ended_at, status, result)
SELECT d.id,
'validations',
'ingestion'::processor_run_trigger,
(SELECT id FROM ap_ingestion
WHERE document_id = d.id
ORDER BY completed_at DESC NULLS LAST LIMIT 1),
1,
d.updated_at,
d.updated_at,
'completed'::processor_run_status,
d.structured_data -> 'validation-results'
FROM document d
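WHERE d.structured_data ? 'validation-results'
  -- The CHECK constraint requires a non-NULL ingestion_id for trigger_kind = 'ingestion'.
  AND EXISTS (SELECT 1 FROM ap_ingestion WHERE document_id = d.id);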
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: backfill synthetic validations runs"
Seed document.diagnostics from structured_data
Files:
Modify: resources/migrations/20260413120000-add-document-diagnostics.up.sql
Step 1: Append diagnostics seeding SQL
Append to resources/migrations/20260413120000-add-document-diagnostics.up.sql:
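-- Seed document.diagnostics from existing structured_data keys.
-- Only absent top-level slices (JSON null) are dropped. jsonb_strip_nulls is
-- avoided because it recurses and would also delete nested nulls, such as a
-- line item's "reasoning": null, which VatValidation requires as a key.
-- 'line-items' is only aggregated when it is actually a JSON array.
UPDATE document d SET diagnostics = (
  SELECT jsonb_object_agg(kv.key, kv.value)
  FROM jsonb_each(jsonb_build_object(
         'validations', d.structured_data -> 'validation-results',
         'fraud-flags', d.structured_data -> 'fraud-flags',
         'tax-issues', d.structured_data -> 'tax-issues',
         'line-items', CASE WHEN jsonb_typeof(d.structured_data -> 'line-items') = 'array' THEN
                         (SELECT jsonb_object_agg(li ->> 'id',
                                                  jsonb_build_object('vat-validation',
                                                                     li -> 'vat-validation'))
                          FROM jsonb_array_elements(d.structured_data -> 'line-items') li
                          WHERE li ? 'vat-validation')
                       END,
         'reconciliation', CASE WHEN d.reconciliation_status IS NOT NULL
                             THEN jsonb_build_object('status', d.reconciliation_status)
                           END
       )) AS kv
  WHERE jsonb_typeof(kv.value) <> 'null')
WHERE d.structured_data IS NOT NULL;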
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: seed document.diagnostics from structured_data and matching/reconciliation columns"
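The seeding has one subtlety worth keeping in mind. Only absent top-level slices should disappear. Nested nils must survive, because VatValidation declares :reasoning as a required key whose value may be nil.

The hypothetical Clojure version below makes that rule explicit. It assumes keyword keys, and it assumes line items carry an :id, as the SQL's li ->> 'id' already does. It could serve as a reference when testing the migration against fixture documents.

```clojure
(ns example.seed-diagnostics
  "Hypothetical Clojure mirror of the document.diagnostics seeding.")

(defn- vat-by-line-item
  "Line-item id -> {:vat-validation ...} for items that carry one; nil if none."
  [line-items]
  (when (sequential? line-items)
    (not-empty
     (into {}
           (comp (filter #(contains? % :vat-validation))
                 (map (fn [li] [(:id li) {:vat-validation (:vat-validation li)}])))
           line-items))))

(defn seed-diagnostics
  "Builds a diagnostics map from legacy structured-data plus the legacy
  reconciliation status. Returns nil when nothing remains (a NULL column)."
  [structured-data reconciliation-status]
  (let [candidate {:validations    (:validation-results structured-data)
                   :fraud-flags    (:fraud-flags structured-data)
                   :tax-issues     (:tax-issues structured-data)
                   :line-items     (vat-by-line-item (:line-items structured-data))
                   :reconciliation (when reconciliation-status
                                     {:status reconciliation-status})}]
    ;; Drop absent top-level slices only; nested nils are left untouched.
    (not-empty (into {} (remove (comp nil? val)) candidate))))
```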
Strip moved keys from structured_data
Files:
Modify: resources/migrations/20260413120000-add-document-diagnostics.up.sql
Step 1: Append top-level key stripping
Append to resources/migrations/20260413120000-add-document-diagnostics.up.sql:
-- Strip moved keys from structured_data.
-- 'missing-fields' is removed entirely (dead LLM metadata; see spec §8).
UPDATE document SET
structured_data = structured_data
- 'validation-results'
- 'fraud-flags'
- 'tax-issues'
- 'missing-fields'
WHERE structured_data IS NOT NULL;
--;;
Step 2: Append per-line-item vat-validation stripping to the same file:
-- Strip vat-validation from each line item; preserve order using the :order
-- attribute introduced by the edit-history migration.
UPDATE document SET structured_data = jsonb_set(
structured_data, '{line-items}',
COALESCE(
(SELECT jsonb_agg(li - 'vat-validation' ORDER BY (li ->> 'order')::int)
FROM jsonb_array_elements(structured_data -> 'line-items') li),
'[]'::jsonb))
WHERE structured_data ? 'line-items'
AND jsonb_typeof(structured_data -> 'line-items') = 'array';
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: strip moved diagnostic keys from structured_data"
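The two UPDATE statements do two things:

- drop four top-level keys, and
- rebuild line-items without :vat-validation, ordered by :order.

The hypothetical Clojure equivalent below is handy for checking expected fixture output. It assumes keyword keys and mirrors the SQL's ordering, where a missing :order casts to NULL and sorts last under the default ascending ORDER BY.

```clojure
(ns example.strip-moved-keys
  "Hypothetical Clojure mirror of the structured_data stripping.")

(def moved-top-level-keys
  [:validation-results :fraud-flags :tax-issues :missing-fields])

(defn strip-moved-keys
  "Removes relocated diagnostic keys from structured-data and drops
  :vat-validation from every line item, re-sorting items by :order."
  [structured-data]
  (let [sd (apply dissoc structured-data moved-top-level-keys)]
    (if (sequential? (:line-items sd))
      (update sd :line-items
              (fn [items]
                (->> items
                     ;; Missing :order sorts last, like NULLs in the SQL ORDER BY.
                     (sort-by #(or (:order %) Long/MAX_VALUE))
                     (mapv #(dissoc % :vat-validation)))))
      sd)))
```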
Files:
Modify: resources/migrations/20260413120000-add-document-diagnostics.up.sql
Step 1: Drop the old trigger and function, create the new one
Append to resources/migrations/20260413120000-add-document-diagnostics.up.sql:
-- Drop the old matching-status notify trigger; the column is going away.
DROP TRIGGER IF EXISTS trigger_document_matching_event ON document;
--;;
DROP FUNCTION IF EXISTS notify_matching_event();
--;;
-- New trigger fires on every processor run row insert/status update,
-- emitting diagnostic-run-started / diagnostic-run-completed events.
CREATE OR REPLACE FUNCTION notify_processor_run_event()
RETURNS TRIGGER AS $$
DECLARE
payload jsonb;
le_id uuid;
le_tenant_id uuid;
BEGIN
IF NEW.status IS NULL OR NEW.status NOT IN ('running', 'completed', 'failed') THEN
RETURN NEW;
END IF;
SELECT d.legal_entity_id, le.tenant_id
INTO le_id, le_tenant_id
FROM document d
JOIN legal_entity le ON le.id = d.legal_entity_id
WHERE d.id = NEW.document_id;
payload := jsonb_build_object(
'event/type', CASE WHEN NEW.status = 'running'
THEN 'diagnostic-run-started'
ELSE 'diagnostic-run-completed' END,
'document/id', NEW.document_id::text,
'processor/id', NEW.processor_id,
'document-version', NEW.document_version,
'run-status', NEW.status::text,
'legal-entity/id', le_id::text,
'tenant/id', le_tenant_id::text
);
PERFORM pg_notify('document_events', payload::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
--;;
CREATE TRIGGER trigger_processor_run_event
AFTER INSERT OR UPDATE OF status ON document_processor_run
FOR EACH ROW
WHEN (NEW.status IN ('running', 'completed', 'failed'))
EXECUTE FUNCTION notify_processor_run_event();
--;;
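Prepend the following to resources/migrations/20260413120000-add-document-diagnostics.down.sql so that it runs first. It drops the new trigger and function, then recreates the old notify_matching_event function and trigger_document_matching_event trigger. The matching_status column still exists at that point, since this migration does not drop it.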
DROP TRIGGER IF EXISTS trigger_processor_run_event ON document_processor_run;
--;;
DROP FUNCTION IF EXISTS notify_processor_run_event();
--;;
CREATE OR REPLACE FUNCTION notify_matching_event()
RETURNS TRIGGER AS $$
DECLARE
payload jsonb;
le_tenant_id uuid;
BEGIN
IF NEW.matching_status NOT IN ('succeeded', 'failed') THEN
RETURN NEW;
END IF;
SELECT le.tenant_id INTO le_tenant_id
FROM legal_entity le WHERE le.id = NEW.legal_entity_id;
payload := jsonb_build_object(
'event/type', 'matching',
'document/id', NEW.id::text,
'document/matching-status', NEW.matching_status::text,
'legal-entity/id', NEW.legal_entity_id::text,
'tenant/id', le_tenant_id::text,
'old-status', CASE WHEN OLD.matching_status IS NULL
THEN NULL ELSE OLD.matching_status::text END
);
PERFORM pg_notify('document_events', payload::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
--;;
CREATE TRIGGER trigger_document_matching_event
AFTER UPDATE OF matching_status ON document
FOR EACH ROW
WHEN (OLD.matching_status IS DISTINCT FROM NEW.matching_status)
EXECUTE FUNCTION notify_matching_event();
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql resources/migrations/20260413120000-add-document-diagnostics.down.sql
git commit -m "feat: replace matching pg_notify with processor_run_event trigger"
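notify_processor_run_event maps both completed and failed runs to the same event type. A subscriber therefore has to read run-status to tell success from failure.

The consumer-side sketch below is hypothetical. It is not the plan's shared.clj change, and it assumes the notification payload is decoded from JSON with string keys (e.g. cheshire.core/parse-string without a key function).

```clojure
(ns example.processor-run-events
  "Hypothetical consumer-side view of notify_processor_run_event payloads.")

(defn run-event-type
  "Mirrors the trigger: the event/type a run status produces, or nil for
  statuses that emit nothing (the trigger's WHEN clause filters them)."
  [status]
  (case status
    "running"              "diagnostic-run-started"
    ("completed" "failed") "diagnostic-run-completed"
    nil))

(defn relevant-run-event?
  "True when a decoded document_events payload is a processor-run event
  for the document the SSE subscriber is viewing."
  [payload document-id]
  (and (contains? #{"diagnostic-run-started" "diagnostic-run-completed"}
                  (get payload "event/type"))
       (= document-id (get payload "document/id"))))
```

The legacy "matching" events share the document_events channel, so a filter on event/type like this one keeps the two streams apart.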
Files:
Modify: resources/migrations/PENDING-CLEANUPS.md
Step 1: Append new entries below the existing edit-history entries
Append to resources/migrations/PENDING-CLEANUPS.md:
## `ap_ingestion_post_process_stat` (entire table)
- **Replaced by:** `document_processor_run` (unified per-processor run history,
any trigger kind).
- **Stopped being written:** <DATE>, when the diagnostics recompute pipeline
shipped and post-process handlers started writing to `document_processor_run`.
- **Gate to drop:** backfill verified, no open queries against the old table.
## `document.matching_status`, `matching_error`, `matching_attempts`, `matching_failed_at`
- **Replaced by:** `document_processor_run` rows where `processor_id = 'matching'`.
Latest status via `DISTINCT ON (processor_id) ... ORDER BY processor_id, document_version DESC`;
attempts via `COUNT(*)`.
- **Stopped being written:** <DATE>.
- **Gate to drop:** all readers migrated to the new table; the pg_notify
trigger replaced by `trigger_processor_run_event`.
## `matching_status` ENUM type
- **Replaced by:** `processor_run_status` ENUM.
- **Gate to drop:** after the columns above are dropped.
## `document.reconciliation_status`
- **Replaced by:** `document.diagnostics.reconciliation.status` (materialized)
and `document_processor_run` rows where `processor_id = 'reconciliation'`.
- **Stopped being written:** <DATE>.
- **Gate to drop:** reconciliation UI reads from `document.diagnostics`.
## `structured_data.{validation-results, fraud-flags, tax-issues}`
## `structured_data.line-items[*].vat-validation`
- **Replaced by:** `document.diagnostics` + `document_processor_run`.
- **Stopped being written:** <DATE>.
- **Gate to drop:** migration verified, all readers moved to `document.diagnostics`.
git add resources/migrations/PENDING-CLEANUPS.md
git commit -m "docs: queue diagnostics-related columns/tables for cleanup"
Files:
(no source changes — verification only)
Step 1: Bring up the dev system, which auto-runs migrations
clj -M:dev
In the REPL:
(integrant.repl/reset-all)
Expected: system starts; logs show migratus applying 20260413120000-add-document-diagnostics. No errors.
Step 2: Verify the new table, ENUM types, and column exist
psql -h localhost -U postgres -d orcha -c "\d document_processor_run"
psql -h localhost -U postgres -d orcha -c "\dT+ processor_run_*"
psql -h localhost -U postgres -d orcha -c "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'document' AND column_name = 'diagnostics';"
Expected:
document_processor_run table prints with all expected columns.
Both ENUM types exist with the documented values.
document.diagnostics exists with type jsonb.
Step 3: Verify the trigger is installed and the old one is gone
psql -h localhost -U postgres -d orcha -c "SELECT tgname FROM pg_trigger WHERE tgrelid = 'document_processor_run'::regclass AND NOT tgisinternal;"
psql -h localhost -U postgres -d orcha -c "SELECT tgname FROM pg_trigger WHERE tgrelid = 'document'::regclass AND tgname = 'trigger_document_matching_event';"
Expected:
First query lists trigger_processor_run_event.
Second query returns no rows (the old trigger was dropped).
Step 4: Verify backfill happened for any pre-existing test data
psql -h localhost -U postgres -d orcha -c "SELECT processor_id, COUNT(*) FROM document_processor_run GROUP BY processor_id ORDER BY processor_id;"
psql -h localhost -U postgres -d orcha -c "SELECT COUNT(*) FROM document WHERE diagnostics IS NOT NULL;"
Expected: counts reflect what was in the local DB before migration. If the local DB is empty, both queries return 0 rows / 0 — that's fine.
No commit — verification only.
After this phase, the Malli schemas reflect the new shape, the diagnostics namespace exists, and the LLM extraction prompts no longer ask for missing-fields.
Create schema/diagnostics.clj
Files:
Create: src/com/getorcha/schema/diagnostics.clj
Step 1: Create the diagnostics namespace with reused sub-schemas
Write to src/com/getorcha/schema/diagnostics.clj:
(ns com.getorcha.schema.diagnostics
"Schema for document.diagnostics — derived outputs computed from
editable document state. Not editable by users; recomputed by the
diagnostics-recompute pipeline after edits."
(:require [malli.core :as m]))
(def ValidationCheck
  "Single validation check result.
  By convention \"pass\" checks carry no :message and other statuses do;
  the schema does not enforce this (:message is optional for every status)."
[:map
[:status [:enum "pass" "warning" "error" "uncertain"]]
[:field {:optional true} :string]
[:message {:optional true} :string]
[:details {:optional true} :map]
[:resolved-value {:optional true} :any]
[:confidence {:optional true} :double]
[:reasoning {:optional true} :string]])
(def Validations
"Map of check-name -> ValidationCheck."
[:map-of :keyword ValidationCheck])
(def FraudFlagType
[:enum
:bank-account-mismatch :high-risk-country :missing-vat-id
:supplier-bank-change :supplier-tax-id-change :invoice-splitting
:sender-domain-change :urgency-language :vague-description])
(def FraudSeverity
[:enum :critical :warning :info :not-applicable])
(def FraudFlag
[:map
[:rule-id :keyword]
[:type FraudFlagType]
[:severity FraudSeverity]
[:message :string]
[:suggestion {:optional true} :string]
[:details {:optional true} :map]])
(def TaxIssue
[:map
[:type [:enum :missing-vat-id :missing-tax-id :incorrect-vat-treatment
:rate-mismatch :missing-compliance-statement :other]]
[:severity [:enum "error" "warning"]]
[:message :string]
[:suggestion {:optional true} :string]])
(def VatValidation
"Per-line-item VAT validation result."
[:map
[:status [:enum "valid" "invalid" "warning" "skipped"]]
[:expected-rate {:optional true} number?]
[:reasoning [:maybe :string]]
[:suggestion {:optional true} :string]])
(def MatchingSummary
"Snapshot of the matches produced by the matching processor.
The relational document_match table remains the primary query source;
this snapshot is for self-contained diagnostic-run audit and UI render."
[:map
[:matches [:vector
[:map
[:document-id :string]
[:blended-score number?]
[:llm-confidence [:enum "high" "medium" "low"]]
[:match-method [:enum "rule-based" "llm"]]]]]])
(def ReconciliationSummary
[:map
[:status [:enum "reconciled" "incomplete" "error"]]
[:details {:optional true} :map]])
(def Diagnostics
"Schema for the document.diagnostics JSONB column. Top-level keys are
absent when the corresponding processor has never successfully run."
(m/schema
[:map
[:validations {:optional true} Validations]
[:fraud-flags {:optional true} [:vector FraudFlag]]
[:tax-issues {:optional true} [:vector TaxIssue]]
[:line-items {:optional true} [:map-of :string [:map [:vat-validation {:optional true} VatValidation]]]]
[:matching {:optional true} MatchingSummary]
[:reconciliation {:optional true} ReconciliationSummary]]))
Run:
clj-kondo --lint src/com/getorcha/schema/diagnostics.clj
Expected: no problems reported.
git add src/com/getorcha/schema/diagnostics.clj
git commit -m "feat: add schema/diagnostics.clj for document.diagnostics shape"
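A concrete value makes the Diagnostics shape easier to review. The check names, rule id and messages below are hypothetical. :tax-issues and :matching are absent, which the schema docstring reads as "never successfully run". With malli on the classpath, `(malli.core/validate Diagnostics example-diagnostics)` should return true for this value.

Values read back from JSONB arrive as strings, so the keyword-valued enums (:rule-id, :type, :severity) would need a decoding step before validation. That is worth checking against how the codebase reads JSONB.

```clojure
(ns example.diagnostics-value
  "Hypothetical document.diagnostics value shaped per the Diagnostics schema.")

(def example-diagnostics
  {:validations    {:totals-match {:status "pass"}
                    :due-date     {:status  "warning"
                                   :field   "due-date"
                                   :message "Due date precedes issue date"}}
   :fraud-flags    [{:rule-id  :high-risk-country
                     :type     :high-risk-country
                     :severity :warning
                     :message  "Supplier is registered in a high-risk country"}]
   ;; Line-item ids stay strings, per [:map-of :string ...].
   :line-items     {"li-1" {:vat-validation {:status        "valid"
                                             :expected-rate 19
                                             ;; Required key; nil is allowed.
                                             :reasoning     nil}}}
   :reconciliation {:status "reconciled"}})
```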
Files:
Modify: src/com/getorcha/schema/invoice/structured_data.clj
Step 1: Read the file
Read src/com/getorcha/schema/invoice/structured_data.clj to confirm the current location of the keys to be removed (per spec §3.3): :missing-fields, :tax-issues, :fraud-flags, :validation-results, plus the :vat-validation field on LineItem and the ValidationCheck / ValidationResults defs.
Step 2: Remove the diagnostic keys from InvoiceData
Edit src/com/getorcha/schema/invoice/structured_data.clj and delete these lines from the InvoiceData body:
;; LLM-provided metadata
[:missing-fields [:maybe [:vector :string]]]
;; Post-processing results (optional, added after extraction)
;; Note: cost-center and bu-code are per line-item, not invoice-level
[:service-category {:optional true} ServiceCategory]
[:tax-issues {:optional true} [:vector TaxIssue]]
;; Fraud detection results (added by with-fraud-detection phase)
[:fraud-flags {:optional true} [:vector FraudFlag]]
;; Validation results (added by with-validations phase)
[:validation-results {:optional true} ValidationResults]
Keep :service-category (still editable per spec §3.3 — only diagnostics move out). Remove :missing-fields, :tax-issues, :fraud-flags, :validation-results.
The resulting block should look like:
;; Optional service classification (editable; LLM seeds the value at
;; ingestion, user can override).
[:service-category {:optional true} ServiceCategory]
Step 3: Remove :vat-validation from the LineItem schema
In the same file, remove this line from LineItem:
[:vat-validation {:optional true} VatValidation]
In the same file, delete the entire ValidationCheck, ValidationResults, TaxIssue, FraudFlag, FraudFlagType, FraudSeverity, and VatValidation defs (they live in schema/diagnostics.clj now).
Run:
clj-kondo --lint src test dev
Expected: no errors. Any remaining usages of com.getorcha.schema.invoice.structured-data/ValidationResults, /FraudFlag, /TaxIssue, or /VatValidation will surface here as unresolved-symbol errors. If any appear, require com.getorcha.schema.diagnostics and point them at the relocated symbol (ValidationResults is now named Validations). Those usages are retired in later phases (e.g. when processors write to diagnostics), so a temporary alias is fine.
git add src/com/getorcha/schema/invoice/structured_data.clj
git commit -m "feat: remove diagnostic keys from invoice structured-data schema"
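Lint catches dangling symbol references, but it does not catch a diagnostic key that was left in a schema body. A small regression check can walk the vector form of a Malli :map schema and assert that none of the moved keys remain.

The helper below is hypothetical and works on plain vector schemas. If a def is wrapped in m/schema, call (malli.core/form schema) first to get the vector back.

```clojure
(ns example.schema-keys
  "Hypothetical guard: list the keys declared by a vector-form :map schema.")

(def removed-diagnostic-keys
  #{:missing-fields :validation-results :fraud-flags :tax-issues :vat-validation})

(defn map-schema-keys
  "Returns the set of keys declared by [:map ...] or [:map {props} ...]."
  [schema]
  (assert (= :map (first schema)) "expected a [:map ...] vector schema")
  (let [children (rest schema)
        ;; Skip the optional properties map, e.g. {:closed true}.
        children (if (map? (first children)) (rest children) children)]
    (into #{} (map first) children)))
```

A test could then assert `(not-any? removed-diagnostic-keys (map-schema-keys ...))` for InvoiceData and LineItem in each document type.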
Files:
Modify: src/com/getorcha/schema/purchase_order/structured_data.clj
Step 1: Read the file and identify the analogous keys
Read src/com/getorcha/schema/purchase_order/structured_data.clj. Confirm presence of :missing-fields and any of :validation-results, :fraud-flags, :tax-issues (presence varies per doc type).
Delete :missing-fields (always present). Delete :validation-results, :fraud-flags, :tax-issues if present. If LineItem carries :vat-validation, delete it.
clj-kondo --lint src test dev
Expected: no errors.
git add src/com/getorcha/schema/purchase_order/structured_data.clj
git commit -m "feat: remove diagnostic keys from purchase_order structured-data schema"
Files:
Modify: src/com/getorcha/schema/goods_received_note/structured_data.clj
Step 1: Read and edit per the same pattern as B3
Same approach: remove :missing-fields and any of the diagnostic keys present.
clj-kondo --lint src test dev
git add src/com/getorcha/schema/goods_received_note/structured_data.clj
git commit -m "feat: remove diagnostic keys from goods_received_note structured-data schema"
Files:
Modify: src/com/getorcha/schema/contract/structured_data.clj
Step 1: Read and edit per the same pattern
Step 2: Run lint
clj-kondo --lint src test dev
git add src/com/getorcha/schema/contract/structured_data.clj
git commit -m "feat: remove diagnostic keys from contract structured-data schema"
Drop missing-fields from extraction prompts
Files:
Modify: src/com/getorcha/workers/ap/ingestion/extraction.clj
Step 1: Locate the four prompt templates
Search the file for the four occurrences of the "missing-fields": ["field1", ...] line — one per document type (invoice, PO, contract, GRN), at approximately lines 394, 950, 1207, 1481.
Run:
grep -n '"missing-fields"' src/com/getorcha/workers/ap/ingestion/extraction.clj
Expected: four hits.
For each hit, delete the entire "missing-fields": ["field1", ...], line plus any blank line that separates it from neighbors. Preserve the rest of the prompt.
Run:
clj-kondo --lint src/com/getorcha/workers/ap/ingestion/extraction.clj
Expected: no errors.
git add src/com/getorcha/workers/ap/ingestion/extraction.clj
git commit -m "feat: drop missing-fields from extraction prompts"
After this phase, Clojure code can insert/update/query document_processor_run rows.
Create db/document_processor_run.clj
Files:
Create: src/com/getorcha/db/document_processor_run.clj
Step 1: Create the namespace with insert/update/read functions
Write to src/com/getorcha/db/document_processor_run.clj:
(ns com.getorcha.db.document-processor-run
"Database access for document_processor_run.
Two-phase write pattern: insert with status='running' at dispatch,
update to status='completed'|'failed' at completion."
(:require [com.getorcha.db.sql :as db.sql]))
(defn insert-running!
"Inserts a row with status='running'. Returns the row's :id (UUID).
`params` keys:
:document-id UUID, required
:processor-id string, required
:trigger-kind :ingestion | :edit | :manual, required
:ingestion-id UUID, required iff trigger-kind = :ingestion
:triggered-by-history-id UUID, required iff trigger-kind = :edit
:document-version integer, required
:commit-sha string, optional"
[db {:keys [document-id processor-id trigger-kind ingestion-id
triggered-by-history-id document-version commit-sha]}]
(let [row (db.sql/execute-one!
db
{:insert-into [:document-processor-run]
:columns [:document-id :processor-id :trigger-kind
:ingestion-id :triggered-by-history-id
:document-version :commit-sha :status]
:values [[document-id processor-id [:cast (name trigger-kind) :processor-run-trigger]
ingestion-id triggered-by-history-id
document-version commit-sha
[:cast "running" :processor-run-status]]]
:returning [:id]})]
(:document-processor-run/id row)))
(defn complete-run!
"Marks a run as completed. Stores result + LLM stats. Returns nothing useful."
[db run-id {:keys [result input-tokens output-tokens model]}]
(db.sql/execute!
db
{:update :document-processor-run
:set {:status [:cast "completed" :processor-run-status]
:ended-at [:now]
:result [:cast (when result (cheshire.core/generate-string result)) :jsonb]
:input-tokens input-tokens
:output-tokens output-tokens
:model model}
:where [:= :id run-id]}))
(defn fail-run!
"Marks a run as failed with an error message."
[db run-id error-message]
(db.sql/execute!
db
{:update :document-processor-run
:set {:status [:cast "failed" :processor-run-status]
:ended-at [:now]
:error error-message}
:where [:= :id run-id]}))
(defn latest-runs-per-processor
  "Returns the latest row per processor for a document: one row per
  processor_id, picking the highest document_version and, within a
  version, the most recent started_at. Returns a vector of row maps
  holding the selected columns (qualified keys per as-kebab-maps)."
  [db document-id]
  ;; DISTINCT ON keeps the first row of each processor_id group, so the
  ;; ORDER BY must lead with processor_id and then rank within the group.
  (db.sql/execute!
   db
   {:select-distinct-on [[:processor-id]
                         :processor-id :status :document-version :result :error
                         :started-at :ended-at :model :triggered-by-history-id]
    :from               [:document-processor-run]
    :where              [:= :document-id document-id]
    :order-by           [:processor-id
                         [:document-version :desc]
                         [:started-at :desc]]}))
(defn count-runs
"Counts runs for a document, optionally filtered by processor-id and/or status."
[db document-id & {:keys [processor-id status]}]
(let [{:keys [count]}
(db.sql/execute-one!
db
{:select [[:%count.* :count]]
:from [:document-processor-run]
:where (cond-> [:= :document-id document-id]
processor-id (->> (vector :and [:= :processor-id processor-id]))
status (->> (vector :and [:= :status [:cast (name status) :processor-run-status]])))})]
(or count 0)))
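To pin down the semantics the window query is meant to have (document_version dominates; started_at only breaks ties within a version), here is a pure-Clojure model of it over plain maps. It is a sketch for REPL experimentation or as a test oracle, not part of the data layer; latest-per-processor and the unqualified row keys are illustrative assumptions.

```clojure
(defn latest-per-processor
  "Hypothetical model of latest-runs-per-processor: keeps one row per
  :processor-id, preferring the highest :document-version, then the most
  recent :started-at (the row ROW_NUMBER() = 1 would select)."
  [rows]
  (->> rows
       (group-by :processor-id)
       vals
       (map (fn [group]
              ;; Sort descending on [document-version started-at] and keep the head.
              (first (sort-by (juxt :document-version :started-at)
                              #(compare %2 %1)
                              group))))
       (sort-by :processor-id)
       vec))
```

A running row for version 2 wins over a completed row for version 1 regardless of start time, which is what lets the detail page show a section as recomputing right after an edit.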
Add cheshire.core to the require list at the top of the namespace
The complete-run! function needs cheshire. Update the ns form:
(ns com.getorcha.db.document-processor-run
"Database access for document_processor_run.
Two-phase write pattern: insert with status='running' at dispatch,
update to status='completed'|'failed' at completion."
(:require [cheshire.core]
[com.getorcha.db.sql :as db.sql]))
clj-kondo --lint src/com/getorcha/db/document_processor_run.clj
Expected: no errors.
git add src/com/getorcha/db/document_processor_run.clj
git commit -m "feat: add db/document_processor_run.clj data layer"
Tests for db/document_processor_run.clj
Files:
Create: test/com/getorcha/db/document_processor_run_test.clj
Step 1: Write failing tests
Write to test/com/getorcha/db/document_processor_run_test.clj:
(ns com.getorcha.db.document-processor-run-test
  (:require [clojure.test :refer [deftest is testing use-fixtures]]
            [com.getorcha.db.document-processor-run :as run]
            [com.getorcha.db.sql :as db.sql] ; used by seed-document!
            [com.getorcha.test.fixtures :as fixtures]))
(use-fixtures :once fixtures/with-system)
(use-fixtures :each fixtures/with-rolled-back-transaction)
(defn ^:private seed-document!
"Inserts a minimal document row + ap_ingestion row. Returns
{:document-id ... :ingestion-id ... :legal-entity-id ...}."
[]
;; Stand up a tenant + legal-entity + document + ingestion using
;; existing fixture helpers. Implementation references
;; com.getorcha.test.fixtures helpers; the *db* var is bound by
;; with-rolled-back-transaction.
...)
(deftest insert-running-creates-row-test
(testing "insert-running! returns a UUID and stores all the inputs"
(let [{:keys [document-id ingestion-id]} (seed-document!)
run-id (run/insert-running!
fixtures/*db*
{:document-id document-id
:processor-id "fraud-detector"
:trigger-kind :ingestion
:ingestion-id ingestion-id
:document-version 1
:commit-sha "abc123"})]
(is (uuid? run-id))
(let [rows (run/latest-runs-per-processor fixtures/*db* document-id)
row (first (filter #(= "fraud-detector" (:document-processor-run/processor-id %)) rows))]
(is (= "running" (:document-processor-run/status row)))
(is (= 1 (:document-processor-run/document-version row)))))))
(deftest complete-run-updates-row-test
(testing "complete-run! writes status, ended_at, result, and stats"
(let [{:keys [document-id ingestion-id]} (seed-document!)
run-id (run/insert-running!
fixtures/*db*
{:document-id document-id
:processor-id "tax-compliance-analyzer"
:trigger-kind :ingestion
:ingestion-id ingestion-id
:document-version 1})]
(run/complete-run!
fixtures/*db* run-id
{:result {:tax-issues []}
:input-tokens 100
:output-tokens 50
:model "claude-sonnet-4-6"})
(let [row (->> (run/latest-runs-per-processor fixtures/*db* document-id)
(filter #(= "tax-compliance-analyzer" (:document-processor-run/processor-id %)))
first)]
(is (= "completed" (:document-processor-run/status row)))
(is (some? (:document-processor-run/ended-at row)))
(is (= 100 (:document-processor-run/input-tokens row)))
(is (= "claude-sonnet-4-6" (:document-processor-run/model row)))))))
(deftest fail-run-updates-row-test
(testing "fail-run! writes status='failed' and the error message"
(let [{:keys [document-id ingestion-id]} (seed-document!)
run-id (run/insert-running!
fixtures/*db*
{:document-id document-id
:processor-id "matching"
:trigger-kind :ingestion
:ingestion-id ingestion-id
:document-version 1})]
(run/fail-run! fixtures/*db* run-id "no candidates")
(let [row (->> (run/latest-runs-per-processor fixtures/*db* document-id)
(filter #(= "matching" (:document-processor-run/processor-id %)))
first)]
(is (= "failed" (:document-processor-run/status row)))
(is (= "no candidates" (:document-processor-run/error row)))))))
(deftest latest-runs-returns-newest-per-processor-test
(testing "When multiple runs exist for the same processor, only the newest is returned"
(let [{:keys [document-id ingestion-id]} (seed-document!)
older (run/insert-running!
fixtures/*db*
{:document-id document-id
:processor-id "validations"
:trigger-kind :ingestion
:ingestion-id ingestion-id
:document-version 1})
newer (run/insert-running!
fixtures/*db*
{:document-id document-id
:processor-id "validations"
:trigger-kind :ingestion
:ingestion-id ingestion-id
:document-version 2})]
(run/complete-run! fixtures/*db* older {:result {}})
(run/complete-run! fixtures/*db* newer {:result {}})
(let [rows (run/latest-runs-per-processor fixtures/*db* document-id)
validations-rows (filter #(= "validations" (:document-processor-run/processor-id %)) rows)]
(is (= 1 (count validations-rows)))
(is (= 2 (:document-processor-run/document-version (first validations-rows))))))))
(deftest count-runs-test
(testing "count-runs filters by processor-id and status"
(let [{:keys [document-id ingestion-id]} (seed-document!)]
(run/insert-running! fixtures/*db*
{:document-id document-id :processor-id "matching"
:trigger-kind :ingestion :ingestion-id ingestion-id
:document-version 1})
(let [run-id (run/insert-running!
fixtures/*db*
{:document-id document-id :processor-id "matching"
:trigger-kind :ingestion :ingestion-id ingestion-id
:document-version 1})]
(run/fail-run! fixtures/*db* run-id "x"))
(is (= 2 (run/count-runs fixtures/*db* document-id :processor-id "matching")))
(is (= 1 (run/count-runs fixtures/*db* document-id :processor-id "matching" :status :failed))))))
Step 2: Implement seed-document!
Replace the ... placeholder in seed-document! with concrete code that inserts a minimal tenant, legal_entity, document, and ap_ingestion row using db.sql/execute!. Prefer the existing fixture helpers in com.getorcha.test.fixtures if present (search for similar helpers; there may already be a with-document or seed-document! helper). If not, write the inserts directly:
(defn ^:private seed-document!
[]
(let [tenant-id (random-uuid)
le-id (random-uuid)
document-id (random-uuid)
ingestion-id (random-uuid)]
(db.sql/execute! fixtures/*db*
{:insert-into [:tenant]
:columns [:id :name]
:values [[tenant-id "test-tenant"]]})
(db.sql/execute! fixtures/*db*
{:insert-into [:legal-entity]
:columns [:id :tenant-id :name]
:values [[le-id tenant-id "test-le"]]})
(db.sql/execute! fixtures/*db*
{:insert-into [:document]
:columns [:id :tenant-id :legal-entity-id :type :file-path :version]
:values [[document-id tenant-id le-id [:cast "invoice" :document-type] "test/path" 1]]})
(db.sql/execute! fixtures/*db*
{:insert-into [:ap-ingestion]
:columns [:id :document-id :tenant-id :legal-entity-id :status]
:values [[ingestion-id document-id tenant-id le-id [:cast "completed" :ap-ingestion-status]]]})
{:document-id document-id :ingestion-id ingestion-id :legal-entity-id le-id}))
If your project's tenant, legal_entity, document, or ap_ingestion schemas have additional NOT NULL columns, add them. Run a single insert via psql to confirm what's required if uncertain.
Step 3: Run the tests (requires document.version)
Run:
clj -X:test:silent :nses '[com.getorcha.db.document-processor-run-test]'
Expected outcome depends on whether the edit-history migration has shipped:
If yes (document.version exists): tests should PASS.
If no: tests will FAIL with column-not-found errors. Stop here and rebase onto edit-history first.
Step 4: If tests pass, commit
git add test/com/getorcha/db/document_processor_run_test.clj
git commit -m "test: cover db/document_processor_run insert/update/read"
After this phase, the ingestion pipeline writes per-processor document_processor_run rows two-phase, and the ingestion-completion transaction populates document.diagnostics, strips moved keys from structured_data, and recomputes needs_human_review from the new location.
Files:
Modify: src/com/getorcha/workers/ap/ingestion/post_process.clj
Step 1: Read the file to locate the per-processor invocation point
Read src/com/getorcha/workers/ap/ingestion/post_process.clj and find the function that invokes each post-processor (it iterates over processors, calls each, and writes one row to ap_ingestion_post_process_stat per result). Identify the single point where the call + stats write happens. We will wrap this with two-phase writes.
Step 2: Add a with-run-row! helper at the top of the namespace
Add to the namespace require list:
[com.getorcha.db.document-processor-run :as db.run]
Add this helper near the top of the namespace (after the requires):
(defn ^:private with-run-row!
"Two-phase write: insert a 'running' row, run f, then complete or fail.
`run-params` is everything `db.run/insert-running!` needs except status.
`f` is a thunk that returns {:result <output> :stats {:input-tokens ... :output-tokens ... :model ...}}.
On exception, marks the run failed and re-throws."
[db run-params f]
(let [run-id (db.run/insert-running! db run-params)]
(try
(let [{:keys [result stats]} (f)]
(db.run/complete-run! db run-id (assoc stats :result result))
result)
(catch Throwable t
(db.run/fail-run! db run-id (.getMessage t))
(throw t)))))
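The failure path is the easiest part of with-run-row! to get wrong, so here is a self-contained model of the same two-phase pattern against an in-memory atom instead of Postgres. runs, insert-running!, complete-run! and fail-run! below are hypothetical stand-ins for the db.run functions, not their real implementations.

```clojure
;; Hypothetical in-memory "table": run-id -> row map.
(def runs (atom {}))

(defn insert-running! [params]
  (let [id (java.util.UUID/randomUUID)]
    (swap! runs assoc id (assoc params :status "running"))
    id))

(defn complete-run! [id stats]
  ;; stats carries :result plus LLM stats, as in the real complete-run!
  (swap! runs update id merge stats {:status "completed"}))

(defn fail-run! [id message]
  (swap! runs update id assoc :status "failed" :error message))

(defn with-run-row!
  "Same control flow as the helper above, minus the db argument."
  [run-params f]
  (let [run-id (insert-running! run-params)]
    (try
      (let [{:keys [result stats]} (f)]
        (complete-run! run-id (assoc stats :result result))
        result)
      (catch Throwable t
        (fail-run! run-id (.getMessage t))
        (throw t)))))
```

The atom has no transactions. In Postgres, if db is a transaction rather than a pool, the fail-run! write is rolled back together with everything else when the exception is re-thrown.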
Step 3: Wrap each processor invocation in with-run-row!
Find the existing per-processor invocation (around the call site that writes to ap_ingestion_post_process_stat). Replace the direct invocation with a with-run-row! wrap:
Before (illustrative — the actual call shape may differ):
(let [{:keys [result stats]} (invoke-processor processor-id ctx structured-data)]
(write-stat! db ingestion-id processor-id stats)
result)
After:
(with-run-row!
db
{:document-id document-id
:processor-id (name processor-id)
:trigger-kind :ingestion
:ingestion-id ingestion-id
:document-version (:document/version (fetch-document db document-id))
:commit-sha (System/getenv "GIT_COMMIT_SHA")}
(fn []
(let [{:keys [result stats]} (invoke-processor processor-id ctx structured-data)]
;; Keep writing to the legacy table for now — Phase J removes those readers.
(write-stat! db ingestion-id processor-id stats)
{:result result :stats stats})))
Adjust the destructured params to match the actual local names in post_process.clj. The document-id and ingestion-id are typically already in scope at this layer — if not, thread them through from the outer ingestion handler.
clj-kondo --lint src/com/getorcha/workers/ap/ingestion/post_process.clj
Expected: no errors.
clj -X:test:silent :nses '[com.getorcha.workers.ap.ingestion.post-process-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: existing tests still pass — the wrap is purely additive and the legacy stats write is preserved.
git add src/com/getorcha/workers/ap/ingestion/post_process.clj
git commit -m "feat: write document_processor_run rows two-phase in post-process"
Extract vat-validation into its own processor
Files:
Modify: src/com/getorcha/workers/ap/ingestion/validation.clj
Modify: src/com/getorcha/workers/ap/ingestion.clj
Step 1: Read validation.clj to find the inline VAT check
Read the file, locate where per-line-item VAT validation is computed (it appears to be inline in the validations pipeline today, attaching :vat-validation to each line item).
Add to src/com/getorcha/workers/ap/ingestion/validation.clj:
(defn run-vat-validation
  "Runs deterministic per-line-item VAT validation against `structured-data`.
  Returns a map of {line-item-id {:vat-validation <VatValidation-shaped map>}};
  line items for which compute-vat-validation returns nil are omitted.
  IDs are read from line-item :id (from the edit-history schema)."
  [{:keys [line-items] :as _structured-data}]
  (into {}
        (for [{:keys [id] :as li} line-items
              :let [vv (compute-vat-validation li)]
              :when (some? vv)]
          [id {:vat-validation vv}])))
If compute-vat-validation is the existing inline helper that returns the VatValidation map for one line item, reuse it. If the inline check is not yet a function, extract its body into one before this step.
Stop attaching :vat-validation to line items
In the same file, find and remove the code path that writes :vat-validation into each line item of structured-data (the legacy inline assignment). The run-vat-validation function now produces the data; the ingestion-completion handler will write it to document.diagnostics.line-items.
clj-kondo --lint src/com/getorcha/workers/ap/ingestion/validation.clj
Expected: no errors.
git add src/com/getorcha/workers/ap/ingestion/validation.clj
git commit -m "feat: extract per-line vat-validation into standalone run-vat-validation"
Write the document.diagnostics snapshot in the ingestion-completion handler
Files:
Modify: src/com/getorcha/workers/ap/ingestion.clj
Step 1: Read the file's ingestion-completion section
Read src/com/getorcha/workers/ap/ingestion.clj and locate the ingestion-completion handler introduced by the edit-history plan (the function that writes the document_history ingestion row + updates document.structured_data + bumps document.version, in one transaction).
Step 2: Add a build-diagnostics helper
Add to the namespace:
(defn ^:private build-diagnostics
"Assembles the document.diagnostics snapshot from the in-memory results
of validations + post-process processors. `vat-validations` is the map
returned by `validation/run-vat-validation`. `matching` and
`reconciliation` may be nil at ingestion-completion time (the workers
that produce them run after the ingestion-completion transaction;
their outputs land via diagnostic-only updates later)."
[{:keys [validation-results fraud-flags tax-issues vat-validations
matching reconciliation]}]
(cond-> {}
(seq validation-results) (assoc :validations validation-results)
(seq fraud-flags) (assoc :fraud-flags fraud-flags)
(seq tax-issues) (assoc :tax-issues tax-issues)
(seq vat-validations) (assoc :line-items vat-validations)
matching (assoc :matching matching)
reconciliation (assoc :reconciliation reconciliation)))
Step 3: Add a compute-needs-human-review helper
Add:
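(defn ^:private compute-needs-human-review
  "True iff any validation has status error OR any fraud-flag has severity
  critical. Compares via `name` so it works both on in-memory results
  (keyword values) and on values decoded from JSONB (string values)."
  [{:keys [validations fraud-flags] :as _diagnostics}]
  (letfn [(is? [expected v] (and (some? v) (= expected (name v))))]
    (boolean
     (or (some #(is? "error" (:status %)) (vals validations))
         (some #(is? "critical" (:severity %)) fraud-flags)))))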
Step 4: Write diagnostics, strip moved keys from structured_data, and recompute needs_human_review
Find the existing UPDATE document SET structured_data = ..., version = version + 1 ... (or its honeysql equivalent) inside the completion transaction. Modify it to:
(let [diagnostics (build-diagnostics
{:validation-results validation-results
:fraud-flags fraud-flags
:tax-issues tax-issues
:vat-validations vat-validations})
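stripped-data (cond-> (dissoc final-structured-data
:validation-results :fraud-flags :tax-issues :missing-fields)
;; Only rewrite :line-items when present, so document types
;; without line items don't gain an empty :line-items vector.
(contains? final-structured-data :line-items)
(update :line-items (fn [lis] (mapv #(dissoc % :vat-validation) lis))))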
needs-review? (compute-needs-human-review diagnostics)]
(db.sql/execute! tx
{:update :document
:set {:structured-data [:cast (cheshire.core/generate-string stripped-data) :jsonb]
:diagnostics [:cast (cheshire.core/generate-string diagnostics) :jsonb]
:type document-type
:needs-human-review needs-review?
:version [:+ :version 1]
:updated-at [:now]}
:where [:= :id document-id]}))
The exact local names (final-structured-data, validation-results, etc.) need to match what's already in scope at the completion handler. If those names don't exist, thread them in from the pipeline state map.
clj-kondo --lint src/com/getorcha/workers/ap/ingestion.clj
Expected: no errors.
git add src/com/getorcha/workers/ap/ingestion.clj
git commit -m "feat: write document.diagnostics in ingestion-completion transaction"
Files:
Modify: test/com/getorcha/workers/ap/ingestion_test.clj
Step 1: Add a test that verifies the new column is populated
Append to test/com/getorcha/workers/ap/ingestion_test.clj:
(deftest ingestion-completion-writes-diagnostics-test
(testing "After ingestion completes, document.diagnostics holds validations/fraud-flags/tax-issues
and structured_data does NOT contain those keys."
;; Arrange: set up a document + ingestion using existing fixture helpers,
;; with an extraction result that includes a validation error and a fraud flag.
(let [{:keys [document-id ingestion-id]} (set-up-completed-ingestion-with-issues!)]
(let [doc (db.sql/execute-one! fixtures/*db*
{:select [:diagnostics :structured-data :needs-human-review]
:from [:document]
:where [:= :id document-id]})
diag (:document/diagnostics doc)
sd (:document/structured-data doc)]
(is (some? (:validations diag)) "validations slice present")
(is (some? (:fraud-flags diag)) "fraud-flags slice present")
(is (not (contains? sd :validation-results)) "validation-results stripped from structured_data")
(is (not (contains? sd :fraud-flags)) "fraud-flags stripped from structured_data")
(is (true? (:document/needs-human-review doc)) "needs_human_review derived from diagnostics")))))
set-up-completed-ingestion-with-issues! is a new helper — write it to seed an ap_ingestion row, drive it through the ingestion completion handler with a hand-crafted extraction result containing one validation status='error' and one fraud-flag severity=:critical, and return the IDs.
clj -X:test:silent :nses '[com.getorcha.workers.ap.ingestion-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: PASS.
git add test/com/getorcha/workers/ap/ingestion_test.clj
git commit -m "test: ingestion-completion writes diagnostics + strips moved keys"
After this phase, matching and reconciliation workers write their own document_processor_run rows AND populate the matching/reconciliation slices in document.diagnostics. They keep writing to the legacy columns (matching_status, reconciliation_status) for now — Phase J removes those reads.
Files:
Modify: src/com/getorcha/workers/ap/matching/worker.clj
Step 1: Read the file and find the matching-result write site
Read src/com/getorcha/workers/ap/matching/worker.clj and find where matching results are persisted (typically: write to document_match, then UPDATE document SET matching_status = 'succeeded' ...).
At the dispatch point of the matching worker (where the work begins for a single document), insert a running row:
(let [doc-version (:document/version (fetch-document tx document-id))
      run-id      (db.run/insert-running!
                   tx
                   {:document-id      document-id
                    :processor-id     "matching"
                    :trigger-kind     :ingestion
                    :ingestion-id     ingestion-id
                    :document-version doc-version})]
  (try
    (let [{:keys [matches]} (run-matching! tx document-id)
          ;; Build the diagnostics snapshot
          matching-summary  {:matches (mapv (fn [m]
                                              {:document-id    (str (:other-document-id m))
                                               :blended-score  (:blended-score m)
                                               :llm-confidence (:llm-confidence m)
                                               :match-method   (:match-method m)})
                                            matches)}]
      ;; Update document.diagnostics.matching, gated on version equality:
      ;; a no-op if an edit bumped document.version after dispatch.
      (update-diagnostics-slice! tx document-id :matching matching-summary doc-version)
      (db.run/complete-run! tx run-id {:result matching-summary}))
    (catch Throwable t
      (db.run/fail-run! tx run-id (.getMessage t))
      (throw t))))
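Add [com.getorcha.db.document-processor-run :as db.run] to the require list.
Note: if tx is a transaction, the 'running' row stays invisible to other connections until the transaction commits, and the fail-run! write is rolled back along with everything else when the exception is re-thrown. If run rows must be observable mid-flight and survive failures, write insert-running! and fail-run! through a non-transactional connection (e.g. the pool) instead of tx.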
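Add the update-diagnostics-slice! helper at the namespace level
This helper merges a slice into document.diagnostics, but only if the run's document_version equals document.version when the UPDATE executes. Define it above the matching code that calls it (Clojure resolves vars in file order), or add a declare. Add to the namespace: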
(defn ^:private update-diagnostics-slice!
"Merges {slice-key slice-value} into document.diagnostics atomically.
No-op if the document's version has advanced beyond `expected-version`."
([tx document-id slice-key slice-value]
(update-diagnostics-slice! tx document-id slice-key slice-value nil))
([tx document-id slice-key slice-value expected-version]
(db.sql/execute!
tx
(cond-> {:update :document
:set {:diagnostics
[:case
[:= :diagnostics nil]
[:cast (cheshire.core/generate-string {slice-key slice-value}) :jsonb]
:else
[:|| :diagnostics
[:cast (cheshire.core/generate-string {slice-key slice-value}) :jsonb]]]
:updated-at [:now]}
:where [:= :id document-id]}
expected-version (update :where (fn [w] [:and w [:= :version expected-version]])))))
(The || operator on JSONB shallow-merges keys, so calling with {:matching {:matches [...]}} overwrites the :matching key without touching siblings.)
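A minimal Clojure model of that operator makes the two properties the helper relies on concrete: top-level keys are replaced wholesale (nested maps are not deep-merged), and a NULL operand yields NULL, which is why the :case handles a NULL diagnostics column separately. jsonb-concat is a hypothetical illustration, not a project function.

```clojure
(defn jsonb-concat
  "Models PostgreSQL's `jsonb || jsonb` for two objects: a shallow merge in
  which top-level keys from the right operand replace those on the left.
  Like the SQL operator, a nil (SQL NULL) operand yields nil."
  [left right]
  (when (and (some? left) (some? right))
    (merge left right)))
```

An equivalent SQL formulation is COALESCE(diagnostics, '{}'::jsonb) || <slice>, which avoids the CASE expression.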
Recompute needs_human_review after writing the slice
Inside update-diagnostics-slice!, also recompute needs_human_review from the resulting diagnostics. Update the function to:
(defn ^:private update-diagnostics-slice!
  "Merges {slice-key slice-value} into document.diagnostics, then recomputes
  needs_human_review. The merge is a no-op if document.version has advanced
  past `expected-version`."
  ([tx document-id slice-key slice-value]
   (update-diagnostics-slice! tx document-id slice-key slice-value nil))
  ([tx document-id slice-key slice-value expected-version]
   (let [merged-jsonb (cheshire.core/generate-string {slice-key slice-value})]
     (db.sql/execute!
      tx
      (cond-> {:update :document
               :set    {:diagnostics
                        [:case
                         [:= :diagnostics nil] [:cast merged-jsonb :jsonb]
                         :else [:|| :diagnostics [:cast merged-jsonb :jsonb]]]
                        :updated-at [:now]}
               :where  [:= :id document-id]}
        expected-version (update :where (fn [w] [:and w [:= :version expected-version]]))))
     ;; Recompute needs_human_review from the now-merged diagnostics.
     ;; `->` extracts a JSONB field (`-` would delete the key instead).
     (db.sql/execute!
      tx
      {:update :document
       :set    {:needs-human-review
                [:or
                 [:exists
                  {:select [1]
                   ;; jsonb_each yields (key, value) rows
                   :from   [[[:jsonb_each [:-> :diagnostics "validations"]] :e]]
                   :where  [:= [:->> :e.value "status"] "error"]}]
                 [:exists
                  {:select [1]
                   :from   [[[:jsonb_array_elements [:-> :diagnostics "fraud-flags"]] :f]]
                   :where  [:= [:->> :f "severity"] "critical"]}]]}
       :where  [:= :id document-id]}))))
This is heavier than the in-application Clojure helper from D3. The trade-off: matching/reconciliation runs from a worker that doesn't already hold the new diagnostics in memory, so a SQL-level recompute is simpler than reading + JSON-decoding + re-serializing.
clj-kondo --lint src/com/getorcha/workers/ap/matching/worker.clj
Expected: no errors.
git add src/com/getorcha/workers/ap/matching/worker.clj
git commit -m "feat: matching worker writes run row + diagnostics.matching"
Files:
Modify: src/com/getorcha/workers/ap/matching/reconciliation.clj
Step 1: Read the file and find the reconciliation result write site
Read src/com/getorcha/workers/ap/matching/reconciliation.clj. Locate where document.reconciliation_status is updated.
Wrap the reconciliation execution with db.run/insert-running! / db.run/complete-run!, write the slice as :reconciliation with shape {:status <status> :details <map>}, and recompute needs_human_review. Both worker.clj and reconciliation.clj now need the update-diagnostics-slice! helper, so extract it to a shared namespace (src/com/getorcha/db/document_diagnostics.clj) and call it as db.diagnostics/update-slice!:
(let [doc-version (:document/version (fetch-document tx document-id))
      run-id      (db.run/insert-running!
                   tx
                   {:document-id      document-id
                    :processor-id     "reconciliation"
                    :trigger-kind     :ingestion
                    :ingestion-id     ingestion-id
                    :document-version doc-version})]
  (try
    (let [reconciliation-result (run-reconciliation! tx document-id)
          summary               {:status  (:status reconciliation-result)
                                 :details (dissoc reconciliation-result :status)}]
      ;; Gate the slice write on the version captured at dispatch.
      (db.diagnostics/update-slice! tx document-id :reconciliation summary doc-version)
      (db.run/complete-run! tx run-id {:result summary}))
    (catch Throwable t
      (db.run/fail-run! tx run-id (.getMessage t))
      (throw t))))
Move update-diagnostics-slice! to src/com/getorcha/db/document_diagnostics.clj
Create the file:
(ns com.getorcha.db.document-diagnostics
"Helpers for writing slices into document.diagnostics with derived
needs_human_review recomputation."
(:require [cheshire.core]
[com.getorcha.db.sql :as db.sql]))
(defn update-slice!
"Merges {slice-key slice-value} into document.diagnostics atomically.
Recomputes document.needs_human_review from the merged result.
No-op if `expected-version` is provided and document.version has advanced past it."
([tx document-id slice-key slice-value]
(update-slice! tx document-id slice-key slice-value nil))
([tx document-id slice-key slice-value expected-version]
;; ... full body from E1 step 4 ...
))
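Update both worker.clj and reconciliation.clj to require this namespace as [com.getorcha.db.document-diagnostics :as db.diagnostics] and call db.diagnostics/update-slice!. Delete the private update-diagnostics-slice! from worker.clj once the move is done.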
clj-kondo --lint src/com/getorcha/workers/ap/matching/reconciliation.clj src/com/getorcha/workers/ap/matching/worker.clj src/com/getorcha/db/document_diagnostics.clj
Expected: no errors.
git add src/com/getorcha/db/document_diagnostics.clj src/com/getorcha/workers/ap/matching/worker.clj src/com/getorcha/workers/ap/matching/reconciliation.clj
git commit -m "feat: reconciliation worker writes run row + diagnostics.reconciliation"
After this phase, the detail page renders diagnostic sections from document.diagnostics, classified per-section as current/stale/recomputing/failed/never-run.
Add section-state helpers to app/ui/components.clj
Files:
Modify: src/com/getorcha/app/ui/components.clj
Step 1: Add a diagnostic-section-state helper near the bottom of the namespace
Append to src/com/getorcha/app/ui/components.clj (placement: near the other section helpers):
(defn diagnostic-section-state
  "Classifies a section's render state given the document's current version
  and the latest run for the processor.
  Returns one of :current | :stale | :recomputing | :failed | :never-run.
  - :current      latest run is completed and its document_version equals doc.version.
  - :stale        latest run is completed and its document_version is behind doc.version.
  - :recomputing  latest run has status='running'.
  - :failed       latest run has status='failed'.
  - :never-run    no run row AND diagnostics has no slice for this processor
                  (also the fallback for any combination not listed above)."
  [{:document/keys [version diagnostics] :as _document}
   _processor-id ; kept for call-site symmetry; not used to classify
   diagnostics-slice-key
   latest-run]
  (let [{run-status  :document-processor-run/status
         run-version :document-processor-run/document-version} latest-run]
    (cond
      (and (nil? latest-run)
           (not (contains? diagnostics diagnostics-slice-key))) :never-run
      (= "running" run-status)                                 :recomputing
      (= "failed" run-status)                                  :failed
      (and (= "completed" run-status)
           (= run-version version))                            :current
      (and (= "completed" run-status)
           (< run-version version))                            :stale
      :else                                                    :never-run)))
Add a diagnostic-section-badge helper for the visual pill
Append:
(defn diagnostic-section-badge
"Renders the small status pill that goes in the section header.
Returns nil for :current (no pill needed)."
[state]
(case state
:recomputing [:span.diagnostic-badge.is-recomputing "Recomputing…"]
:stale [:span.diagnostic-badge.is-stale "Stale"]
:failed [:span.diagnostic-badge.is-failed "Analysis failed"]
:never-run [:span.diagnostic-badge.is-never-run "No analysis yet"]
:current nil))
Add a diagnostic-section-class helper for the wrapper class
Append:
(defn diagnostic-section-class
"Returns the CSS class for a diagnostic section based on its state.
Used to apply gray-out + stripe styling to the body of stale/recomputing
sections."
[state]
(case state
:recomputing "diagnostic-section is-recomputing"
:stale "diagnostic-section is-stale"
:failed "diagnostic-section is-failed"
:never-run "diagnostic-section is-never-run"
:current "diagnostic-section is-current"))
clj-kondo --lint src/com/getorcha/app/ui/components.clj
Expected: no errors.
git add src/com/getorcha/app/ui/components.clj
git commit -m "feat: add diagnostic-section state classifier and badge helpers"
Files:
Modify: resources/app/public/css/style.css
Step 1: Append diagnostic-section styles
Append to resources/app/public/css/style.css:
/* Diagnostic section state styling */
.diagnostic-section {
position: relative;
transition: opacity 0.2s ease;
}
.diagnostic-section.is-recomputing,
.diagnostic-section.is-stale {
opacity: 0.55;
}
.diagnostic-section.is-failed {
opacity: 0.85;
}
.diagnostic-badge {
display: inline-block;
font-size: 11px;
font-weight: 500;
padding: 2px 8px;
border-radius: 999px;
margin-left: 8px;
vertical-align: middle;
}
.diagnostic-badge.is-recomputing {
background: #1f3a5f;
color: #58a6ff;
border: 1px solid #2c4a73;
}
.diagnostic-badge.is-stale {
background: #3a2f1f;
color: #d4a657;
border: 1px solid #4a3a25;
}
.diagnostic-badge.is-failed {
background: #4a1f1f;
color: #f85149;
border: 1px solid #6a2a2a;
}
.diagnostic-badge.is-never-run {
background: #21262d;
color: #8b949e;
border: 1px solid #30363d;
}
git add resources/app/public/css/style.css
git commit -m "feat: add diagnostic-section state styling"
Invoice view reads from document.diagnostics
Files:
Modify: src/com/getorcha/app/http/documents/view/invoice.clj
Modify: src/com/getorcha/app/ui/components.clj (the relevant section renderers)
Step 1: Identify each diagnostic section in the invoice view
Read src/com/getorcha/app/http/documents/view/invoice.clj and find sections that read from structured-data's validation/fraud/tax keys. They will look like:
(when (seq (:validation-results structured-data))
(validation-results-section ...))
Step 2: Pass diagnostics and latest runs into the view function
Add diagnostics and latest-runs-by-id to the destructured params of the view function, and fetch them where the outer handler calls the view:
(let [latest-runs (db.run/latest-runs-per-processor db-pool document-id)
latest-runs-by-id (group-by :document-processor-run/processor-id latest-runs)]
(invoice-view db-pool document opts
{:diagnostics (:document/diagnostics document)
:latest-runs-by-id latest-runs-by-id}))
For each diagnostic section (validations, fraud-flags, tax-issues, matching, reconciliation), change the call to pass the section state classifier output:
(let [validations (:validations diagnostics)
run (first (get latest-runs-by-id "validations"))
state (components/diagnostic-section-state document "validations" :validations run)]
(validation-results-section validations state))
Update the matching, reconciliation, fraud-flags, and tax-issues reads similarly. Each section helper must accept a state arg and use components/diagnostic-section-class and components/diagnostic-section-badge to render.
Per-line VAT validation now lives in diagnostics.line-items[<line-item-id>].vat-validation,
not inside the line item itself. To minimise churn in the existing
line-item-card / line-items-table helpers, merge it back at the boundary:
(let [vat-validations (get diagnostics :line-items {})
vat-run (first (get latest-runs-by-id "vat-validation"))
vat-state (components/diagnostic-section-state document "vat-validation" :line-items vat-run)
line-items* (mapv (fn [li]
(assoc li :vat-validation
(get-in vat-validations [(:id li) :vat-validation])))
(:line-items structured-data))]
(line-items-table line-items* {:vat-validation-state vat-state}))
Pass the vat-state to line-items-table so it can apply the gray-out badge
to the VAT column header (or wherever the section indicator sits visually).
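Because the write path strips :vat-validation from line items and the read path re-attaches it from diagnostics.line-items, the two should round-trip. The sketch below models both sides with hypothetical helper names (strip-vat-validations, vat-validations-by-id, merge-vat-validations); vat-validations-by-id reads an existing :vat-validation key only to build test data in the shape run-vat-validation returns.

```clojure
(defn strip-vat-validations
  "Write path (hypothetical): drop :vat-validation from every line item,
  leaving documents without :line-items untouched."
  [structured-data]
  (cond-> structured-data
    (contains? structured-data :line-items)
    (update :line-items (fn [lis] (mapv #(dissoc % :vat-validation) lis)))))

(defn vat-validations-by-id
  "Builds {line-item-id {:vat-validation vv}}, the run-vat-validation shape."
  [structured-data]
  (into {}
        (for [{:keys [id vat-validation]} (:line-items structured-data)
              :when (some? vat-validation)]
          [id {:vat-validation vat-validation}])))

(defn merge-vat-validations
  "Read path (hypothetical): re-attach each item's :vat-validation."
  [line-items vat-validations]
  (mapv (fn [li]
          (assoc li :vat-validation
                 (get-in vat-validations [(:id li) :vat-validation])))
        line-items))
```

One asymmetry: line items with no validation come back with :vat-validation nil rather than without the key, so the line-item helpers should treat nil as absent.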
Update the section helpers in components.clj to render state
For each affected helper (find them by name: validation-results-section, fraud-detection-section, tax-compliance-section, etc.), modify the signature:
(defn validation-results-section
([results] (validation-results-section results :current)) ;; backward-compatible default
([results state]
[:section {:class (diagnostic-section-class state)}
[:h3 "Validation results "
(diagnostic-section-badge state)]
;; ... existing render of validation results ...
]))
clj-kondo --lint src/com/getorcha/app/ui/components.clj src/com/getorcha/app/http/documents/view/invoice.clj
Expected: no errors.
Start the dev server:
clj -M:dev
In the REPL:
(integrant.repl/go)
Open a browser to a known invoice URL (use a test document ID); confirm:
:current).
If there are visual regressions (e.g. opacity applied to current sections), check diagnostic-section-class: :current should not gray out.
git add src/com/getorcha/app/http/documents/view/invoice.clj src/com/getorcha/app/ui/components.clj
git commit -m "feat: invoice view reads diagnostics from document.diagnostics with section states"
Files:
Modify: src/com/getorcha/app/http/documents/view/purchase_order.clj
Step 1: Apply the same pattern as F3
Mirror the invoice view changes: pass diagnostics + latest-runs-by-id, classify each section, pass state to section helpers.
clj-kondo --lint src/com/getorcha/app/http/documents/view/purchase_order.clj
git add src/com/getorcha/app/http/documents/view/purchase_order.clj
git commit -m "feat: purchase_order view reads diagnostics with section states"
Files:
Modify: src/com/getorcha/app/http/documents/view/goods_received_note.clj
Step 1: Apply the same pattern
Step 2: Lint
clj-kondo --lint src/com/getorcha/app/http/documents/view/goods_received_note.clj
git add src/com/getorcha/app/http/documents/view/goods_received_note.clj
git commit -m "feat: goods_received_note view reads diagnostics with section states"
Files:
Modify: src/com/getorcha/app/http/documents/view/contract.clj
Step 1: Apply the same pattern
Step 2: Lint
clj-kondo --lint src/com/getorcha/app/http/documents/view/contract.clj
git add src/com/getorcha/app/http/documents/view/contract.clj
git commit -m "feat: contract view reads diagnostics with section states"
After this phase, the detail page auto-refreshes diagnostic sections as runs transition. Existing SSE infrastructure is reused.
exec-fn to handle diagnostic-run events
Files:
Modify: src/com/getorcha/app/http/documents/view/shared.clj
Step 1: Read the existing exec-fn to confirm event-dispatch pattern
Read src/com/getorcha/app/http/documents/view/shared.clj lines around 1018-1098 (the SSE case statement on event-type).
case branches for :diagnostic-run-started and :diagnostic-run-completed
In the case branch, add (before the nil fallthrough):
:diagnostic-run-started
;; A processor just started recomputing; re-render the section in
;; the recomputing state. The keys read from `event` (:processor/id,
;; :document-version) must match the payload trigger_processor_run_event
;; sends; confirm them there.
(let [processor-id (:processor/id event)
      latest-run   {:document-processor-run/status           "running"
                    :document-processor-run/processor-id     processor-id
                    :document-processor-run/document-version (:document-version event)}
      doc          (db.sql/execute-one!
                    db-pool
                    {:select [:diagnostics :version]
                     :from   [:document]
                     :where  [:= :id document-id]})
      slice-key    (processor-id->slice-key processor-id)
      state        (components/diagnostic-section-state doc processor-id slice-key latest-run)]
  {:event (str "diagnostic-run-started:" processor-id)
   :data  (hiccup/html (render-diagnostic-section
                        (get (:document/diagnostics doc) slice-key)
                        state
                        processor-id))})
:diagnostic-run-completed
;; A processor just finished; re-render the section with current data.
(let [processor-id (:processor/id event)
      latest-run   (->> (db.run/latest-runs-per-processor db-pool document-id)
                        (filter #(= processor-id (:document-processor-run/processor-id %)))
                        first)
      doc          (db.sql/execute-one!
                    db-pool
                    {:select [:diagnostics :version]
                     :from   [:document]
                     :where  [:= :id document-id]})
      slice-key    (processor-id->slice-key processor-id)
      state        (components/diagnostic-section-state doc processor-id slice-key latest-run)]
  {:event (str "diagnostic-run-completed:" processor-id)
   :data  (hiccup/html (render-diagnostic-section
                        (get (:document/diagnostics doc) slice-key)
                        state
                        processor-id))})
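processor-id->slice-key and render-diagnostic-section helpers
Add them to shared.clj, or to a new namespace app/http/documents/view/diagnostics.clj if shared.clj is already large. Either way, they must be defined above the exec-fn that calls them (or required from the new namespace), because Clojure resolves symbols when the calling form is compiled: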
(def ^:private processor-id->slice-key
  {"validations"             :validations
   "fraud-detector"          :fraud-flags
   "tax-compliance-analyzer" :tax-issues
   "vat-validation"          :line-items
   "matching"                :matching
   "reconciliation"          :reconciliation})
(defn ^:private render-diagnostic-section
  "Renders the hiccup fragment for a single diagnostic section based on
  its slice and state. Dispatches to the existing per-section helper."
  [slice state processor-id]
  (case processor-id
    "validations"             (components/validation-results-section slice state)
    "fraud-detector"          (components/fraud-detection-section slice state)
    "tax-compliance-analyzer" (components/tax-compliance-section slice state)
    "vat-validation"          (components/vat-validation-section slice state)
    "matching"                (components/matches-section slice state)
    "reconciliation"          (components/reconciliation-section slice state)))
(Component helper names need to match what exists in components.clj. Check those names and adjust.)
clj-kondo --lint src/com/getorcha/app/http/documents/view/shared.clj
git add src/com/getorcha/app/http/documents/view/shared.clj
git commit -m "feat: SSE exec-fn handles diagnostic-run-started/completed events"
Files:
Modify: src/com/getorcha/app/ui/components.clj
Step 1: Add hx-ext="sse" attributes to each section's outer wrapper
For each diagnostic section helper modified in F1 (validation-results-section, fraud-detection-section, etc.), add HTMX SSE attributes so the client swaps the section on the matching event:
(defn validation-results-section
  ([results] (validation-results-section results :current))
  ([results state]
   [:section {:class    (diagnostic-section-class state)
              :hx-ext   "sse"
              :sse-swap "diagnostic-run-started:validations,diagnostic-run-completed:validations"
              :hx-swap  "outerHTML"}
    [:h3 "Validation results " (diagnostic-section-badge state)]
    ;; ... existing render ...
    ]))
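Set sse-swap per processor id. The section only receives events if an ancestor opens the SSE source with hx-ext="sse" and sse-connect="..."; confirm where that ancestor lives by reading the existing detail-view layout. Because the swap is outerHTML, the replacement fragment must carry the same sse-swap attributes to keep listening, which it does as long as it is rendered by the same helper.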
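The exec-fn builds event names with (str "diagnostic-run-started:" processor-id), and each section repeats the same strings in sse-swap, so a typo on either side silently breaks auto-refresh. One option is to derive both from a single helper. A sketch, with hypothetical namespace and function names:

```clojure
(ns example.diagnostic-sse-events
  "Hypothetical helpers keeping server-side SSE event names and the
  client-side sse-swap attribute in sync."
  (:require [clojure.string :as str]))

(def event-types
  ;; The two event types this plan adds to the document-events stream.
  ["diagnostic-run-started" "diagnostic-run-completed"])

(defn event-name
  "Event name the exec-fn emits, e.g. \"diagnostic-run-started:validations\"."
  [event-type processor-id]
  (str event-type ":" processor-id))

(defn sse-swap-attr
  "Comma-separated value for a section's sse-swap attribute, covering every
  event type for one processor."
  [processor-id]
  (->> event-types
       (map #(event-name % processor-id))
       (str/join ",")))
```

For "validations", sse-swap-attr yields exactly the attribute value hard-coded in the section helper above.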
Search for sse-connect or sse-source in the codebase to find where the EventSource is opened:
grep -rn "sse-connect\|sse-source\|hx-ext=\"sse" src/com/getorcha/app/
If no such attribute exists yet, add one to the detail view's outermost container, pointing at the existing SSE endpoint. That endpoint is already served from shared.clj (around line 1018) and streams document-events for the page's tenant.
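A minimal sketch of such a container in hiccup, assuming the endpoint URL is passed in (the real path is whatever the shared.clj handler serves; the function name is hypothetical). With hx-ext="sse" and sse-connect on the ancestor, the page opens one EventSource, and the hx-ext attribute on each section becomes redundant (HTMX extensions are inherited) but harmless:

```clojure
(ns example.detail-view-layout
  "Hypothetical sketch of a detail-view container that opens the SSE stream.")

(defn detail-view-container
  "Wraps the detail-view body in an element that opens one EventSource for
  the page. Descendant sections with sse-swap attributes receive its events."
  [sse-url & body]
  (into [:div {:hx-ext      "sse"
               :sse-connect sse-url}]
        body))
```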
clj-kondo --lint src/com/getorcha/app/ui/components.clj
git add src/com/getorcha/app/ui/components.clj
git commit -m "feat: HTMX SSE attributes on diagnostic sections for auto-refresh"
Files:
(no source changes — verification only)
Step 1: Start the dev system
clj -M:dev
(integrant.repl/reset-all)
In the REPL:
(require '[com.getorcha.db.document-processor-run :as db.run])
(def db-pool (:com.getorcha.db/pool integrant.repl.state/system))
(def document-id #uuid "<some-document-id>")
;; insert-running! returns the new run's id; the orchestrator's run-one! uses it the same way.
(def run-id
  (db.run/insert-running! db-pool
                          {:document-id      document-id
                           :processor-id     "fraud-detector"
                           :trigger-kind     :ingestion
                           :ingestion-id     #uuid "<an-ingestion-id>"
                           :document-version 1}))
Expected: in the browser, the fraud-flags section transitions to the "Recomputing…" state.
(db.run/complete-run! db-pool run-id {:result {:fraud-flags []}})
Expected: in the browser, the section transitions back to current state.
No commit — verification only.
After this phase, both production (CDK) and local (LocalStack via init_aws.clj + test fixtures) environments have the new SQS queue.
Files:
Modify: infra/stacks/foundation_stack.py
Step 1: Read the existing queue definitions
Open infra/stacks/foundation_stack.py and find the block that defines ingest_queue and email_acquire_queue (lines ~282-321 per spec).
Insert (after the email_acquire_queue block):
# Diagnostics recompute pipeline
self.diagnostics_recompute_dlq = sqs.Queue(
    self,
    "DiagnosticsRecomputeDlq",
    queue_name="v1-orcha-global-diagnostics-recompute-dlq",
    retention_period=Duration.days(14),
    encryption=sqs.QueueEncryption.SQS_MANAGED,
)
self.diagnostics_recompute_queue = sqs.Queue(
    self,
    "DiagnosticsRecomputeQueue",
    queue_name="v1-orcha-global-diagnostics-recompute",
    visibility_timeout=Duration.seconds(600),
    retention_period=Duration.days(7),
    encryption=sqs.QueueEncryption.SQS_MANAGED,
    dead_letter_queue=sqs.DeadLetterQueue(
        queue=self.diagnostics_recompute_dlq,
        max_receive_count=3,
    ),
)
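Find where ingest_queue.grant_consume_messages(...) is called for the workers role and add an analogous grant for the new queue. Search for the existing queue grants:
grep -rn "_queue.grant" infra/
Replicate the consume grant for diagnostics_recompute_queue. The edit handler that enqueues recompute messages runs in the app service, not the workers service, so the role the app runs under also needs diagnostics_recompute_queue.grant_send_messages(...).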
git add infra/stacks/foundation_stack.py
git commit -m "infra: add diagnostics-recompute SQS queue + DLQ to CDK"
Files:
Modify: scripts/init_aws.clj
Step 1: Add the queue config var
In scripts/init_aws.clj near line 104-106 (where sqs-ingestion-queue etc. are defined), append:
(def sqs-diagnostics-recompute-queue (get-in config [:com.getorcha/aws :queues :diagnostics-recompute]))
In the setup-sqs! function around line 236, add this call as the last form in the function body, just before its closing parentheses:
(create-queue-with-dlq! sqs-diagnostics-recompute-queue)
Find the printlns at lines ~343 and ~360-362 and add the new queue:
(println " SQS Queues:" sqs-ingestion-queue "," sqs-acquisition-queue ","
sqs-matching-queue "," sqs-diagnostics-recompute-queue)
;; ...
(println "Diagnostics-recompute queue (from config.edn):" sqs-diagnostics-recompute-queue)
git add scripts/init_aws.clj
git commit -m "infra: add diagnostics-recompute queue to local init script"
Files:
Modify: resources/com/getorcha/config.edn
Step 1: Read the config to find the queues map
Find the :com.getorcha/aws config entry's :queues map.
Add:
:diagnostics-recompute "v1-orcha-global-diagnostics-recompute"
(or whatever name format the config uses — match the existing :ingestion, :acquisition, :matching entries).
git add resources/com/getorcha/config.edn
git commit -m "config: add diagnostics-recompute queue entry"
Files:
Modify: test/com/getorcha/test/fixtures.clj
Step 1: Find where existing queues are created at test startup
Search the file for CreateQueueRequest or queue setup logic.
Add the diagnostics-recompute queue (with its DLQ) to whichever startup helper creates the existing queues. Follow the existing pattern for queue creation in tests; use the LocalStack endpoint from the test system.
clj -X:test:silent :nses '[com.getorcha.test.fixtures-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
If there's no fixtures-test namespace, run a small smoke test instead:
clj -X:test:silent :nses '[com.getorcha.db-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: no regressions.
git add test/com/getorcha/test/fixtures.clj
git commit -m "test: create diagnostics-recompute queue at test startup"
After this phase, edit handlers enqueue SQS messages, and a workers-service consumer debounces (idempotency check) and dispatches diagnostic processors.
Files:
Create: src/com/getorcha/workers/diagnostics_recompute.clj
Step 1: Create the file with the consumer entry-point
Write to src/com/getorcha/workers/diagnostics_recompute.clj:
(ns com.getorcha.workers.diagnostics-recompute
  "SQS consumer for edit-triggered diagnostics recompute. Each message
  represents a debounced 'rerun diagnostics for document X at version Y'
  request. Idempotent: if a newer edit exists, skip."
  (:require [cheshire.core :as json]
            [clojure.tools.logging :as log]
            [com.getorcha.db.sql :as db.sql]
            [com.getorcha.workers.diagnostics-recompute.orchestrator :as orchestrator]))
(defn ^:private ->instant
  "Coerces the timestamp shapes seen here to java.time.Instant: an ISO-8601
  string from the SQS body, or whatever the JDBC layer returns for
  document_history.created_at (Instant, OffsetDateTime, or java.sql.Timestamp)."
  ^java.time.Instant [t]
  (cond
    (instance? java.time.Instant t)        t
    (instance? java.time.OffsetDateTime t) (.toInstant ^java.time.OffsetDateTime t)
    (instance? java.util.Date t)           (.toInstant ^java.util.Date t)
    :else                                  (java.time.Instant/parse (str t))))
(defn ^:private should-run?
  "Returns true when no edit newer than the message's enqueued-at exists.
  When a newer edit exists, a later message will fire, so skip this one.
  Accepts document-id as a UUID or string (JSON bodies carry strings) and
  enqueued-at as an Instant or ISO-8601 string. Assumes the DB clock
  (created_at) and the app clock (enqueued-at) are close enough to compare."
  [db-pool {:keys [document-id enqueued-at]}]
  (let [document-id (cond-> document-id (string? document-id) parse-uuid)
        newest      (db.sql/execute-one!
                     db-pool
                     {:select   [:created-at]
                      :from     [:document-history]
                      :where    [:and
                                 [:= :document-id document-id]
                                 [:= :change-type [:cast "edit" :document-history-change-type]]]
                      :order-by [[:created-at :desc]]
                      :limit    1})]
    (or (nil? newest)
        (not (.isAfter (->instant (:document-history/created-at newest))
                       (->instant enqueued-at))))))
(defn handle-message
  "SQS message handler. Idempotent: drops messages superseded by newer edits."
  [{:keys [db-pool] :as ctx} message-body]
  (let [{:keys [document-id history-id document-version enqueued-at]
         :as message} (json/parse-string message-body true)]
    (if-not (should-run? db-pool message)
      (do (log/info "Skipping superseded diagnostics-recompute message"
                    {:document-id document-id :enqueued-at enqueued-at})
          :skipped)
      (do (log/info "Running diagnostics recompute"
                    {:document-id document-id :version document-version})
          (orchestrator/recompute-all! ctx
                                       {:document-id      (parse-uuid document-id)
                                        :history-id       (parse-uuid history-id)
                                        :document-version document-version})
          :ok))))
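The debounce decision is easy to unit-test without a database if the comparison is pulled out as a pure function. A sketch (namespace and function names hypothetical) that also makes explicit why coercion is needed: the SQS body carries enqueued-at as an ISO-8601 string, while the database returns a timestamp object, and clojure.core/compare cannot order a String against either:

```clojure
(ns example.debounce
  "Pure sketch of the debounce rule behind should-run?.")

(defn ->instant
  "Coerces java.time.Instant, java.util.Date (including java.sql.Timestamp),
  or an ISO-8601 string to java.time.Instant."
  ^java.time.Instant [t]
  (cond
    (instance? java.time.Instant t) t
    (instance? java.util.Date t)    (.toInstant ^java.util.Date t)
    (string? t)                     (java.time.Instant/parse t)
    :else (throw (ex-info "Unsupported timestamp" {:value t}))))

(defn should-run?
  "True when there is no edit (newest-edit-at is nil) or the newest edit is
  not after enqueued-at. False means a later message covers this document."
  [newest-edit-at enqueued-at]
  (or (nil? newest-edit-at)
      (not (.isAfter (->instant newest-edit-at) (->instant enqueued-at)))))
```

An edit at exactly enqueued-at counts as "not newer", so the message that edit produced still runs.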
clj-kondo --lint src/com/getorcha/workers/diagnostics_recompute.clj
Expected: clj-kondo will warn that orchestrator is unresolved — that's fine; we create it next.
git add src/com/getorcha/workers/diagnostics_recompute.clj
git commit -m "feat: add diagnostics-recompute SQS consumer skeleton"
Files:
Create: src/com/getorcha/workers/diagnostics_recompute/orchestrator.clj
Step 1: Write the orchestrator with phase ordering
Write to src/com/getorcha/workers/diagnostics_recompute/orchestrator.clj:
(ns com.getorcha.workers.diagnostics-recompute.orchestrator
  "Phase-ordered dispatch of diagnostic processors for edit-triggered
  recomputes. Phase 1 processors run in parallel; phase 2 processors
  wait for phase 1 to commit (they consume phase-1 outputs)."
  (:require [clojure.core.async :as a]
            [clojure.tools.logging :as log]
            [com.getorcha.db.document-processor-run :as db.run]
            [com.getorcha.db.document-diagnostics :as db.diagnostics]
            [com.getorcha.workers.ap.ingestion.validation :as validation]
            [com.getorcha.workers.ap.ingestion.post-process.fraud :as fraud]
            [com.getorcha.workers.ap.ingestion.post-process.tax-compliance :as tax-compliance]
            [com.getorcha.workers.ap.matching.worker :as matching]
            [com.getorcha.workers.ap.matching.reconciliation :as reconciliation]))
(def ^:private phase-1-processors
  "Diagnostic processors that have no inter-processor dependencies."
  [{:processor-id "validations"             :run-fn validation/run-validations}
   {:processor-id "tax-compliance-analyzer" :run-fn tax-compliance/run-tax-compliance}
   {:processor-id "vat-validation"          :run-fn validation/run-vat-validation}
   {:processor-id "matching"                :run-fn matching/run-matching}])
(def ^:private phase-2-processors
  "Diagnostic processors that need phase-1 outputs (validations + matching)."
  [{:processor-id "fraud-detector" :run-fn fraud/run-fraud-detection}
   {:processor-id "reconciliation" :run-fn reconciliation/run-reconciliation}])
(defn ^:private slice-key-for-processor
  [processor-id]
  (case processor-id
    "validations"             :validations
    "tax-compliance-analyzer" :tax-issues
    "vat-validation"          :line-items
    "matching"                :matching
    "fraud-detector"          :fraud-flags
    "reconciliation"          :reconciliation))
(defn ^:private run-one!
  "Runs a single processor with two-phase row writes and slice merge.
  Returns the run's :result, or nil when the processor failed."
  [{:keys [db-pool] :as ctx} {:keys [document-id history-id document-version]} {:keys [processor-id run-fn]}]
  (let [run-id (db.run/insert-running! db-pool
                                       {:document-id             document-id
                                        :processor-id            processor-id
                                        :trigger-kind            :edit
                                        :triggered-by-history-id history-id
                                        :document-version        document-version})]
    (try
      (let [{:keys [result stats]} (run-fn ctx document-id document-version)]
        (db.diagnostics/update-slice! db-pool document-id
                                      (slice-key-for-processor processor-id)
                                      result
                                      document-version)
        (db.run/complete-run! db-pool run-id (assoc stats :result result))
        result)
      (catch Throwable t
        (log/error t "Diagnostic processor failed"
                   {:processor-id processor-id :document-id document-id})
        ;; ex-message can be nil (e.g. a bare NullPointerException).
        (db.run/fail-run! db-pool run-id (or (ex-message t) (str t)))
        nil))))
(defn ^:private run-phase!
  "Runs every processor in the phase concurrently. Returns when all complete.
  Failures within a phase don't block other processors in the same phase."
  [ctx params processors]
  ;; a/thread closes its channel without a value when the body returns nil
  ;; (a failed processor), so <!! yields nil for that slot.
  (let [chans (mapv (fn [p] (a/thread (run-one! ctx params p))) processors)]
    (mapv a/<!! chans)))
(defn recompute-all!
  "Top-level entry-point: runs every diagnostic processor in phase order."
  [ctx params]
  (run-phase! ctx params phase-1-processors)
  (run-phase! ctx params phase-2-processors)
  :done)
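The barrier is the part of the orchestrator worth testing in isolation: phase 2 must not start until every phase-1 processor has returned, and a throwing processor must not block its siblings. A dependency-free sketch of that contract (namespace hypothetical; it swaps core.async's a/thread for future, which gives the same wait-for-all behaviour here):

```clojure
(ns example.phase-barrier
  "Sketch of the phase barrier in recompute-all!, using futures instead of
  core.async threads so it runs with clojure.core alone.")

(defn run-phase!
  "Starts every thunk in parallel and blocks until all have returned.
  Exceptions are caught per thunk, so one failure doesn't stop its siblings;
  a failed thunk contributes nil to the result vector."
  [thunks]
  (->> thunks
       (mapv (fn [f] (future (try (f) (catch Throwable _ nil)))))
       (mapv deref)))

(defn recompute-all!
  "Runs phase 1, waits for every phase-1 thunk, then runs phase 2."
  [phase-1 phase-2]
  (run-phase! phase-1)
  (run-phase! phase-2)
  :done)
```

In a real test, the thunks would be stubbed run-fns that record when they ran, as in the assertions below.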
run-* function
For each phase-1/phase-2 entry above, confirm the referenced function exists and has the signature (fn [ctx document-id document-version] -> {:result <output> :stats <map>}). If not, add a thin adapter in the processor namespace that calls the existing implementation.
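For example, in src/com/getorcha/workers/ap/ingestion/post_process/fraud.clj, add the following, where load-document-with-diagnostics and detect-fraud stand in for whatever loader and detection functions that namespace already has:
(defn run-fraud-detection
  "Edit-time entry point. Loads document state and runs the existing
  fraud-detection logic."
  [{:keys [db-pool] :as _ctx} document-id _document-version]
  ;; Placeholders: substitute the namespace's existing loader and detector.
  (let [doc                         (load-document-with-diagnostics db-pool document-id)
        {:keys [fraud-flags stats]} (detect-fraud doc)]
    {:result fraud-flags :stats stats}))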
Repeat for validation/run-validations, validation/run-vat-validation,
tax-compliance/run-tax-compliance, matching/run-matching,
reconciliation/run-reconciliation.
clj-kondo --lint src/com/getorcha/workers/diagnostics_recompute/orchestrator.clj
Expected: no errors.
git add src/com/getorcha/workers/diagnostics_recompute/orchestrator.clj src/com/getorcha/workers/ap/ingestion/post_process/fraud.clj src/com/getorcha/workers/ap/ingestion/validation.clj src/com/getorcha/workers/ap/ingestion/post_process/tax_compliance.clj src/com/getorcha/workers/ap/matching/worker.clj src/com/getorcha/workers/ap/matching/reconciliation.clj
git commit -m "feat: orchestrator dispatches diagnostic processors with phase ordering"
Files:
Modify: src/com/getorcha/app/http/documents/edits.clj (the namespace introduced by edit-history; confirm by reading its file)
Step 1: Read the edit handler to find the per-edit commit point
Read the edit handler namespace introduced by edit-history. Find the function where the edit transaction commits.
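enqueue-recompute! call at the post-commit boundary
Add the following to the edit handler namespace (requiring [cheshire.core :as json] if it is not already required there):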
(defn ^:private enqueue-recompute!
  "Sends a delayed SQS message to trigger debounced diagnostics recompute."
  [{:keys [sqs-client diagnostics-queue-url]} {:keys [document-id history-id document-version]}]
  (let [body (json/generate-string
              {:document-id      (str document-id)
               :history-id       (str history-id)
               :document-version document-version
               :enqueued-at      (str (java.time.Instant/now))})]
    (.sendMessage sqs-client
                  (-> (software.amazon.awssdk.services.sqs.model.SendMessageRequest/builder)
                      (.queueUrl diagnostics-queue-url)
                      (.messageBody body)
                      ;; SQS hides the message for 60s; the consumer's should-run?
                      ;; check turns these per-edit messages into a debounce.
                      (.delaySeconds (int 60))
                      .build))))
After the edit transaction commits, call this with the new history row's ID and the document's new version. Call it outside the transaction, so that an edit which rolls back never enqueues a recompute.
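Producer and consumer must agree on the body's shape, and JSON only carries strings and numbers, so the UUIDs and the timestamp are stringified on the way in and must be parsed on the way out. A sketch of a symmetric pair that can be round-trip tested without SQS or cheshire (names hypothetical; enqueue-recompute! above inlines the first half):

```clojure
(ns example.recompute-message
  "Hypothetical helpers that build the diagnostics-recompute message body as a
  plain map and parse it back into typed values.")

(defn ->message
  "Builds the JSON-ready map: UUIDs and the Instant become strings."
  [{:keys [document-id history-id document-version enqueued-at]}]
  {:document-id      (str document-id)
   :history-id       (str history-id)
   :document-version document-version
   :enqueued-at      (str enqueued-at)})

(defn <-message
  "Inverse of ->message, applied after JSON parsing with keyword keys."
  [{:keys [document-id history-id document-version enqueued-at]}]
  {:document-id      (parse-uuid document-id)
   :history-id       (parse-uuid history-id)
   :document-version document-version
   :enqueued-at      (java.time.Instant/parse enqueued-at)})
```

parse-uuid requires Clojure 1.11 or later, which the consumer already assumes.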
In the Integrant config (system.clj), the edit handler component already exists from the edit-history plan. Add the new dependencies:
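{:com.getorcha.app.http.documents/edit-handler
 {:db-pool               (ig/ref ::db/pool)
  :sqs-client            (ig/ref [:com.getorcha/sqs :client])
  :diagnostics-queue-url (ig/ref [:com.getorcha/aws :queue-urls :diagnostics-recompute])}}
(Adjust to the existing config's shape; the project uses aero profiles + integrant, so mirror the existing matching-queue wiring. Note that ig/ref takes a component key: a vector there is an Integrant composite key, not a get-in path into config. If the queue URL is plain config rather than a component, it will likely arrive via an aero #ref or a parent component instead.)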
clj-kondo --lint src/com/getorcha/app/http/documents/edits.clj src/com/getorcha/system.clj
Expected: no errors.
git add src/com/getorcha/app/http/documents/edits.clj src/com/getorcha/system.clj
git commit -m "feat: edit handler enqueues debounced diagnostics-recompute message"
Files:
Modify: src/com/getorcha/system.clj
Step 1: Locate the existing SQS consumer registrations
Find where the ingestion + matching + acquisition consumers are registered as Integrant components.
Add an Integrant component init key (e.g. :com.getorcha.workers.diagnostics-recompute/consumer) that receives db-pool, sqs-client, the queue URL, and any other ctx its handle-message needs, and calls diagnostics-recompute/handle-message for each received message. Use the existing matching consumer as the template; the shape is the same.
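Whatever the matching consumer's exact shape, the property to preserve is delete-on-success: a message is acknowledged only after handle-message returns, so a failed recompute is redelivered and lands in the DLQ after max_receive_count=3 (per the CDK queue definition). A :skipped result returns normally, so superseded messages are deleted too. A sketch of one poll iteration with receive, delete and logging injected as functions (all names hypothetical, not the project's consumer API):

```clojure
(ns example.consumer-loop
  "Sketch of one poll iteration for an SQS-style consumer, with receive and
  delete injected so the ack semantics can be tested without AWS.")

(defn poll-once!
  "Receives a batch, calls handle for each message body, and deletes only the
  messages whose handler returned without throwing. Failed messages stay on
  the queue and become visible again after the visibility timeout."
  [{:keys [receive! delete! handle log-error]}]
  (doseq [{:keys [body receipt-handle] :as msg} (receive!)]
    (try
      (handle body)
      (delete! receipt-handle)
      (catch Exception e
        (log-error e msg)))))
```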
clj-kondo --lint src/com/getorcha/system.clj
clj -M:dev
In the REPL:
(integrant.repl/reset-all)
Expected: system starts without error; no missing-component messages in logs.
git add src/com/getorcha/system.clj
git commit -m "feat: register diagnostics-recompute consumer in workers system"
Files:
Create: test/com/getorcha/workers/diagnostics_recompute_test.clj
Step 1: Write a failing test for should-run?
Write to test/com/getorcha/workers/diagnostics_recompute_test.clj:
(ns com.getorcha.workers.diagnostics-recompute-test
  (:require [clojure.test :refer [deftest is testing use-fixtures]]
            [com.getorcha.db.sql :as db.sql]
            [com.getorcha.test.fixtures :as fixtures]
            [com.getorcha.workers.diagnostics-recompute :as recompute]))
(use-fixtures :once fixtures/with-system)
(use-fixtures :each fixtures/with-rolled-back-transaction)
(defn ^:private seed-doc-with-edit!
  "Seeds a document and a single edit history row at `edit-time`. Returns
  {:document-id ... :history-id ...}."
  [edit-time]
  ;; Use the helpers from C2 / fixtures
  ...)
(deftest should-run-when-no-newer-edit-test
  (testing "should-run? returns true when no edit newer than enqueued-at exists"
    (let [edit-time             (java.time.Instant/parse "2026-04-13T10:00:00Z")
          {:keys [document-id]} (seed-doc-with-edit! edit-time)]
      (is (true?
           (#'recompute/should-run?
            fixtures/*db*
            {:document-id (str document-id)
             :enqueued-at (.plusSeconds edit-time 1)}))))))
(deftest should-skip-when-newer-edit-exists-test
  (testing "should-run? returns false when an edit newer than enqueued-at exists"
    (let [edit-time             (java.time.Instant/parse "2026-04-13T10:05:00Z")
          {:keys [document-id]} (seed-doc-with-edit! edit-time)]
      (is (false?
           (#'recompute/should-run?
            fixtures/*db*
            {:document-id (str document-id)
             :enqueued-at (java.time.Instant/parse "2026-04-13T10:00:00Z")}))))))
seed-doc-with-edit! using the same helpers as Task C2
Adapt the seed-document! helper from C2 to also insert a document_history row with change_type='edit' at the given edit-time.
clj -X:test:silent :nses '[com.getorcha.workers.diagnostics-recompute-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: PASS.
git add test/com/getorcha/workers/diagnostics_recompute_test.clj
git commit -m "test: diagnostics-recompute idempotency check"
After this phase, no application code reads document.matching_status, document.matching_error, document.matching_attempts, document.matching_failed_at, document.reconciliation_status, or the validation-results / fraud-flags / tax-issues keys inside structured_data. The columns still exist physically (their cleanup is gated in PENDING-CLEANUPS.md); the migration we wrote already strips the moved JSONB keys from structured_data but leaves the columns in place.
document.matching_status
Files:
(variable — depends on grep results)
Step 1: Find every reader
grep -rn "matching-status\|matching_status\|:document/matching-status" src test dev scripts
Expected: a list of reader sites in the app, the SSE handler (already replaced in G1), tests, and possibly admin queries.
Replace patterns like:
(:document/matching-status doc)
with:
(let [latest (->> (db.run/latest-runs-per-processor db-pool (:document/id doc))
                  (filter #(= "matching" (:document-processor-run/processor-id %)))
                  first)]
  (case (:document-processor-run/status latest)
    "completed" "succeeded"
    "running"   "in-progress"
    "failed"    "failed"
    "pending"   "pending"
    nil))
Or, where the consumer needs only "is matching done?", inline the check.
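With several reader sites, the inline case is worth pulling into one helper so the vocabulary mapping lives in one place. A sketch that works on the rows latest-runs-per-processor already returns, so a list view can fetch runs once instead of per document (helper name hypothetical; it also accepts keyword statuses in case the ENUM comes back as a keyword):

```clojure
(ns example.matching-status
  "Sketch of a reusable helper for the legacy matching-status vocabulary.")

(def ^:private run-status->matching-status
  {"completed" "succeeded"
   "running"   "in-progress"
   "failed"    "failed"
   "pending"   "pending"})

(defn matching-status
  "Derives the legacy matching_status string from latest-run rows (as returned
  by latest-runs-per-processor). Returns nil when matching has never run."
  [latest-runs]
  (some->> latest-runs
           (filter #(= "matching" (:document-processor-run/processor-id %)))
           first
           :document-processor-run/status
           name
           run-status->matching-status))
```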
clj-kondo --lint src test dev
Expected: no errors.
clj -X:test:silent 2>&1 | grep -E "(FAIL in|ERROR in|Execution error|Ran .* tests)"
Expected: 0 failures.
git add -A
git commit -m "refactor: read matching status from document_processor_run"
document.reconciliation_status
Files:
(variable)
Step 1: Find every reader
grep -rn "reconciliation-status\|reconciliation_status\|:document/reconciliation-status" src test dev scripts
(:reconciliation document.diagnostics)
Pattern:
(get-in doc [:document/diagnostics :reconciliation :status])
clj-kondo --lint src test dev
clj -X:test:silent 2>&1 | grep -E "(FAIL in|ERROR in|Execution error|Ran .* tests)"
Expected: clean lint, 0 test failures.
git add -A
git commit -m "refactor: read reconciliation status from document.diagnostics"
structured_data.{validation-results, fraud-flags, tax-issues}
Files:
(variable)
Step 1: Find every reader
grep -rn ":validation-results\|:fraud-flags\|:tax-issues\|\"validation-results\"\|\"fraud-flags\"\|\"tax-issues\"" src test dev scripts
Expected hits: validation processors (writes — keep), tax processors (writes — keep), fraud processors (writes — keep), AND any reader still reaching into structured-data for these keys (replace with :document/diagnostics lookup).
structured-data, switch to diagnostics
Pattern:
;; Before
(:validation-results (:document/structured-data doc))
;; After
(get-in doc [:document/diagnostics :validations])
clj-kondo --lint src test dev
clj -X:test:silent 2>&1 | grep -E "(FAIL in|ERROR in|Execution error|Ran .* tests)"
git add -A
git commit -m "refactor: read diagnostic outputs from document.diagnostics"
After this phase, dev tooling and Claude skills know about the new locations.
debug-doc skill
Files:
Modify: .claude/skills/debug-doc/SKILL.md
Step 1: Read the current skill content
Read .claude/skills/debug-doc/SKILL.md. Identify any SQL or query references to ap_ingestion.structured_data, ap_ingestion_post_process_stat, document.matching_status, or document.reconciliation_status.
Step 2: Replace each reference with the new location
ap_ingestion.structured_data → document_history join (already updated by edit-history) AND document.diagnostics for derived outputs.
ap_ingestion_post_process_stat → document_processor_run (filter WHERE document_id = X AND trigger_kind = 'ingestion').
document.matching_status → document_processor_run filter WHERE processor_id = 'matching'.
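document.reconciliation_status → document.diagnostics -> 'reconciliation' ->> 'status' (PostgreSQL JSONB operators; with ->>, a string like '$.reconciliation.status' is treated as a literal top-level key and returns NULL).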
Step 3: Commit
git add .claude/skills/debug-doc/SKILL.md
git commit -m "docs: update debug-doc skill for diagnostics + processor_run table"
ingestion-regression-test skill
Files:
Modify: .claude/skills/ingestion-regression-test/inspector-prompt.md
Step 1: Read the current prompt
Read .claude/skills/ingestion-regression-test/inspector-prompt.md.
validation-results, fraud-flags, tax-issues inside structured_data
These keys are now under document.diagnostics. Update the prompt's "what to compare" section accordingly.
git add .claude/skills/ingestion-regression-test/inspector-prompt.md
git commit -m "docs: update ingestion-regression-test skill for diagnostics location"
debug_fetch_document.clj
Files:
Modify: scripts/debug_fetch_document.clj
Step 1: Read the script
Read scripts/debug_fetch_document.clj. It fetches a document + its ingestions + history rows from prod into local for debugging.
document_processor_run rows for the document
Append the same row-copy pattern that already exists for document_history in this script:
(defn ^:private copy-processor-runs!
  [src-pool dest-pool document-id]
  (let [rows (db.sql/execute! src-pool
                              {:select [:*]
                               :from   [:document-processor-run]
                               :where  [:= :document-id document-id]})]
    (doseq [row rows]
      ;; Drop the source id so the local database assigns a fresh one.
      (db.sql/execute! dest-pool
                       {:insert-into [:document-processor-run]
                        :values      [(dissoc row :document-processor-run/id)]}))))
Call it from the main fetch flow alongside the document-history copy.
clj-kondo --lint scripts/debug_fetch_document.clj
git add scripts/debug_fetch_document.clj
git commit -m "tooling: copy document_processor_run rows in debug_fetch_document"
debug_common.clj jsonb-keys
Files:
Modify: scripts/debug_common.clj
Step 1: Read the file
Read scripts/debug_common.clj. Look for the *-jsonb-keys set definitions (e.g. ingestion-jsonb-keys).
Add :diagnostics to document-jsonb-keys (or wherever document columns are listed) and create a new set for the new table:
(def document-processor-run-jsonb-keys
"JSONB columns on document_processor_run that need round-trip coercion."
#{:result})
Wire this into whatever fetch helper handles round-tripping for the table.
git add scripts/debug_common.clj
git commit -m "tooling: register document.diagnostics and document_processor_run.result jsonb columns"
Files:
(no source changes — verification only)
Step 1: Run clj-kondo across the whole project
clj-kondo --lint src test dev
Expected: no problems reported.
clj -X:test:silent 2>&1 | grep -E "(FAIL in|ERROR in|Execution error|failed because|Ran .* tests)"
Expected: all green; report Ran N tests with 0 failures.
In a dev REPL:
(integrant.repl/reset-all)
Manually:
SELECT processor_id, status, document_version, started_at, ended_at
FROM document_processor_run
WHERE document_id = '<your-doc-id>'
AND trigger_kind = 'edit'
ORDER BY started_at;
Expected: rows for each diagnostic processor with status='completed' and document_version equal to the post-edit document.version.
After working through every task above:
clj -X:dev :ns 'com.getorcha.db.migrations' :fn 'migrate!' runs without errors against a fresh database.
document.diagnostics is populated for newly ingested docs AND the moved keys are absent from document.structured_data.
No reference to matching_status, matching_error, matching_attempts, matching_failed_at, reconciliation_status, or the moved structured_data keys remains in src/, test/, dev/, or scripts/. Confirm with the grep commands from Phase J.
ap_ingestion_post_process_stat continues to be written alongside the new table for now (legacy backstop). Removal is queued in PENDING-CLEANUPS.md.
trigger_document_matching_event no longer exists; trigger_processor_run_event fires SSE events for any processor run transition.
ap_ingestion_post_process_stat, the four matching columns, the matching_status ENUM, reconciliation_status, and the moved structured_data keys.
If any item fails verification, return to the relevant task and complete the missing piece.