Note (2026-04-24): After this document was written, legal_entity was renamed to tenant and the old tenant was renamed to organization. Read references to these terms with the pre-rename meaning.

Document Diagnostics Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Introduce document.diagnostics (materialized JSONB snapshot of derived outputs) and a unified document_processor_run audit table; add a workers-service-hosted, throttled (60s debounce) recompute pipeline triggered on edits; render per-section stale/recomputing UI states with auto-update via the existing SSE stream.

Architecture: Move validation-results, fraud-flags, tax-issues, and per-line-item vat-validation out of document.structured_data into document.diagnostics; replace ap_ingestion_post_process_stat and the per-document matching/reconciliation status columns with a single document_processor_run table; add a new SQS queue + consumer that debounces recomputes 60s after the last edit using an idempotent handler. Detail view classifies each diagnostic section as current/stale/recomputing/failed/never-run from the latest run row, reuses the existing document-events SSE stream with two new event types (diagnostic-run-started, diagnostic-run-completed), and re-renders sections in place as runs complete.

Tech Stack: Clojure, PostgreSQL (JSONB + ENUMs + triggers + pg_notify), Migratus migrations, AWS SQS (production via CDK; LocalStack via Testcontainers in tests), HTMX SSE, core.async pub/sub, Integrant, Malli schemas, clojure.test, embedded-postgres for tests.

Reference spec: docs/superpowers/specs/2026-04-13-document-diagnostics-design.md

Depends on: Edit-history plan — must be merged first. This plan assumes document.version, document_history, the app-level ingestion-completion handler, and resources/migrations/PENDING-CLEANUPS.md already exist.


File Map

Created

Modified


Phase A: Database foundation

Lays the schema. After this phase, the DB has the new table, the new column, and backfilled data — but no Clojure code reads or writes either yet.

Task A1: Create the up migration — types, table, indexes, column

Files:

Write to resources/migrations/20260413120000-add-document-diagnostics.up.sql:

-- New ENUM types for unified processor run tracking
CREATE TYPE processor_run_status  AS ENUM ('pending', 'running', 'completed', 'failed');
--;;
CREATE TYPE processor_run_trigger AS ENUM ('ingestion', 'edit', 'manual');
--;;

-- Unified per-processor run table; replaces ap_ingestion_post_process_stat
-- and absorbs per-document matching/reconciliation status tracking.
CREATE TABLE document_processor_run (
    id                      UUID PRIMARY KEY DEFAULT uuidv7(),
    document_id             UUID NOT NULL REFERENCES document(id) ON DELETE CASCADE,
    processor_id            TEXT NOT NULL,
    trigger_kind            processor_run_trigger NOT NULL,
    ingestion_id            UUID REFERENCES ap_ingestion(id) ON DELETE CASCADE,
    triggered_by_history_id UUID REFERENCES document_history(id),
    document_version        INT  NOT NULL,
    started_at              TIMESTAMPTZ NOT NULL DEFAULT now(),
    ended_at                TIMESTAMPTZ,
    input_tokens            INTEGER,
    output_tokens           INTEGER,
    model                   TEXT,
    commit_sha              TEXT,
    status                  processor_run_status NOT NULL DEFAULT 'running',
    result                  JSONB,
    error                   TEXT,

    CONSTRAINT doc_processor_run_trigger_xor CHECK (
        (trigger_kind = 'ingestion' AND ingestion_id IS NOT NULL AND triggered_by_history_id IS NULL)
        OR (trigger_kind = 'edit'   AND ingestion_id IS NULL     AND triggered_by_history_id IS NOT NULL)
        OR (trigger_kind = 'manual' AND ingestion_id IS NULL     AND triggered_by_history_id IS NULL)
    )
);
--;;

CREATE INDEX idx_doc_processor_run_doc_proc_version
    ON document_processor_run(document_id, processor_id, document_version DESC);
--;;

CREATE INDEX idx_doc_processor_run_ingestion
    ON document_processor_run(ingestion_id) WHERE ingestion_id IS NOT NULL;
--;;

-- Materialized snapshot of all diagnostic outputs (validations, fraud, tax,
-- per-line VAT, matching summary, reconciliation summary).
ALTER TABLE document ADD COLUMN diagnostics JSONB;
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: add document_processor_run table and document.diagnostics column"

Task A2: Create the down migration

Files:

Write to resources/migrations/20260413120000-add-document-diagnostics.down.sql:

ALTER TABLE document DROP COLUMN IF EXISTS diagnostics;
--;;
DROP INDEX IF EXISTS idx_doc_processor_run_ingestion;
--;;
DROP INDEX IF EXISTS idx_doc_processor_run_doc_proc_version;
--;;
DROP TABLE IF EXISTS document_processor_run;
--;;
DROP TYPE IF EXISTS processor_run_trigger;
--;;
DROP TYPE IF EXISTS processor_run_status;
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.down.sql
git commit -m "feat: down migration for document diagnostics schema"

Task A3: Add backfill — copy from ap_ingestion_post_process_stat

Files:

Append to the end of resources/migrations/20260413120000-add-document-diagnostics.up.sql:

-- Backfill: copy every legacy post-process stat row into the new run table.
-- These rows are LLM-driven processors run during ingestion. We have no
-- per-row result snapshot for them; result stays NULL.
INSERT INTO document_processor_run
    (document_id, processor_id, trigger_kind, ingestion_id, document_version,
     started_at, ended_at, input_tokens, output_tokens, model, status, commit_sha)
SELECT i.document_id,
       s.processor_id,
       'ingestion'::processor_run_trigger,
       s.ingestion_id,
       1,
       s.started_at,
       s.ended_at,
       s.input_tokens,
       s.output_tokens,
       s.model,
       'completed'::processor_run_status,
       i.commit_sha
  FROM ap_ingestion_post_process_stat s
  JOIN ap_ingestion i ON i.id = s.ingestion_id
 WHERE i.document_id IS NOT NULL;
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: backfill document_processor_run from ap_ingestion_post_process_stat"

Task A4: Add backfill — matching state per document

Files:

Append to resources/migrations/20260413120000-add-document-diagnostics.up.sql:

-- Backfill: one matching-run row per document with non-null matching_status.
-- This loses per-attempt fidelity (matching_attempts > 1 docs collapse to a
-- single "latest state" row), but preserves the audit signal.
INSERT INTO document_processor_run
    (document_id, processor_id, trigger_kind, ingestion_id, document_version,
     started_at, ended_at, status, error)
SELECT d.id,
       'matching',
       'ingestion'::processor_run_trigger,
       (SELECT id FROM ap_ingestion
         WHERE document_id = d.id
         ORDER BY completed_at DESC NULLS LAST LIMIT 1),
       1,
       COALESCE(d.matching_failed_at, d.updated_at),
       CASE WHEN d.matching_status IN ('succeeded','failed','skipped')
            THEN COALESCE(d.matching_failed_at, d.updated_at) END,
       (CASE d.matching_status
             WHEN 'pending'     THEN 'pending'
             WHEN 'in-progress' THEN 'running'
             WHEN 'succeeded'   THEN 'completed'
             WHEN 'failed'      THEN 'failed'
             WHEN 'skipped'     THEN 'completed'
        END)::processor_run_status,
       d.matching_error
  FROM document d
 WHERE d.matching_status IS NOT NULL;
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: backfill matching runs from document.matching_status columns"

Task A5: Add backfill — reconciliation state per document

Files:

Append to resources/migrations/20260413120000-add-document-diagnostics.up.sql:

-- Backfill: one reconciliation-run row per document with non-null reconciliation_status.
INSERT INTO document_processor_run
    (document_id, processor_id, trigger_kind, ingestion_id, document_version,
     started_at, ended_at, status, result)
SELECT d.id,
       'reconciliation',
       'ingestion'::processor_run_trigger,
       (SELECT id FROM ap_ingestion
         WHERE document_id = d.id
         ORDER BY completed_at DESC NULLS LAST LIMIT 1),
       1,
       d.updated_at,
       d.updated_at,
       'completed'::processor_run_status,
       jsonb_build_object('status', d.reconciliation_status)
  FROM document d
 WHERE d.reconciliation_status IS NOT NULL;
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: backfill reconciliation runs from document.reconciliation_status"

Task A6: Add backfill — synthetic validations rows

Files:

Append to resources/migrations/20260413120000-add-document-diagnostics.up.sql:

-- Backfill: one synthetic 'validations' run per document with validation-results.
-- Validations were previously not recorded as a discrete run anywhere;
-- this gives every existing doc a baseline audit row.
INSERT INTO document_processor_run
    (document_id, processor_id, trigger_kind, ingestion_id, document_version,
     started_at, ended_at, status, result)
SELECT d.id,
       'validations',
       'ingestion'::processor_run_trigger,
       (SELECT id FROM ap_ingestion
         WHERE document_id = d.id
         ORDER BY completed_at DESC NULLS LAST LIMIT 1),
       1,
       d.updated_at,
       d.updated_at,
       'completed'::processor_run_status,
       d.structured_data -> 'validation-results'
  FROM document d
 WHERE d.structured_data ? 'validation-results';
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: backfill synthetic validations runs"

Task A7: Seed document.diagnostics from structured_data

Files:

Append to resources/migrations/20260413120000-add-document-diagnostics.up.sql:

-- Seed document.diagnostics from existing structured_data keys.
-- jsonb_strip_nulls trims absent slices so we don't carry NULL keys.
UPDATE document SET diagnostics = jsonb_strip_nulls(jsonb_build_object(
    'validations',    structured_data -> 'validation-results',
    'fraud-flags',    structured_data -> 'fraud-flags',
    'tax-issues',     structured_data -> 'tax-issues',
    'line-items',     (SELECT jsonb_object_agg(li ->> 'id',
                                               jsonb_build_object('vat-validation',
                                                                  li -> 'vat-validation'))
                         FROM jsonb_array_elements(structured_data -> 'line-items') li
                        WHERE li ? 'vat-validation'),
    'reconciliation', CASE WHEN reconciliation_status IS NOT NULL
                           THEN jsonb_build_object('status', reconciliation_status)
                      END
))
WHERE structured_data IS NOT NULL;
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: seed document.diagnostics from structured_data and matching/reconciliation columns"

Task A8: Strip moved keys from structured_data

Files:

Append to resources/migrations/20260413120000-add-document-diagnostics.up.sql:

-- Strip moved keys from structured_data.
-- 'missing-fields' is removed entirely (dead LLM metadata; see spec §8).
UPDATE document SET
    structured_data = structured_data
                    - 'validation-results'
                    - 'fraud-flags'
                    - 'tax-issues'
                    - 'missing-fields'
WHERE structured_data IS NOT NULL;
--;;

Append:

-- Strip vat-validation from each line item; preserve order using the :order
-- attribute introduced by the edit-history migration.
UPDATE document SET structured_data = jsonb_set(
    structured_data, '{line-items}',
    COALESCE(
        (SELECT jsonb_agg(li - 'vat-validation' ORDER BY (li ->> 'order')::int)
           FROM jsonb_array_elements(structured_data -> 'line-items') li),
        '[]'::jsonb))
WHERE structured_data ? 'line-items'
  AND jsonb_typeof(structured_data -> 'line-items') = 'array';
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql
git commit -m "feat: strip moved diagnostic keys from structured_data"

Task A9: Replace the matching pg_notify trigger

Files:

Append to resources/migrations/20260413120000-add-document-diagnostics.up.sql:

-- Drop the old matching-status notify trigger; the column is going away.
DROP TRIGGER IF EXISTS trigger_document_matching_event ON document;
--;;
DROP FUNCTION IF EXISTS notify_matching_event();
--;;

-- New trigger fires on every processor run row insert/status update,
-- emitting diagnostic-run-started / diagnostic-run-completed events.
CREATE OR REPLACE FUNCTION notify_processor_run_event()
RETURNS TRIGGER AS $$
DECLARE
    payload         jsonb;
    le_id           uuid;
    le_tenant_id    uuid;
BEGIN
    IF NEW.status IS NULL OR NEW.status NOT IN ('running', 'completed', 'failed') THEN
        RETURN NEW;
    END IF;

    SELECT d.legal_entity_id, le.tenant_id
      INTO le_id, le_tenant_id
      FROM document d
      JOIN legal_entity le ON le.id = d.legal_entity_id
     WHERE d.id = NEW.document_id;

    payload := jsonb_build_object(
        'event/type',       CASE WHEN NEW.status = 'running'
                                 THEN 'diagnostic-run-started'
                                 ELSE 'diagnostic-run-completed' END,
        'document/id',      NEW.document_id::text,
        'processor/id',     NEW.processor_id,
        'document-version', NEW.document_version,
        'run-status',       NEW.status::text,
        'legal-entity/id',  le_id::text,
        'tenant/id',        le_tenant_id::text
    );
    PERFORM pg_notify('document_events', payload::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
--;;

CREATE TRIGGER trigger_processor_run_event
    AFTER INSERT OR UPDATE OF status ON document_processor_run
    FOR EACH ROW
    WHEN (NEW.status IN ('running', 'completed', 'failed'))
    EXECUTE FUNCTION notify_processor_run_event();
--;;

Prepend (so the down recreates the old trigger before dropping the new) to resources/migrations/20260413120000-add-document-diagnostics.down.sql:

DROP TRIGGER IF EXISTS trigger_processor_run_event ON document_processor_run;
--;;
DROP FUNCTION IF EXISTS notify_processor_run_event();
--;;

CREATE OR REPLACE FUNCTION notify_matching_event()
RETURNS TRIGGER AS $$
DECLARE
    payload      jsonb;
    le_tenant_id uuid;
BEGIN
    IF NEW.matching_status NOT IN ('succeeded', 'failed') THEN
        RETURN NEW;
    END IF;
    SELECT le.tenant_id INTO le_tenant_id
      FROM legal_entity le WHERE le.id = NEW.legal_entity_id;
    payload := jsonb_build_object(
        'event/type', 'matching',
        'document/id', NEW.id::text,
        'document/matching-status', NEW.matching_status::text,
        'legal-entity/id', NEW.legal_entity_id::text,
        'tenant/id', le_tenant_id::text,
        'old-status', CASE WHEN OLD.matching_status IS NULL
                           THEN NULL ELSE OLD.matching_status::text END
    );
    PERFORM pg_notify('document_events', payload::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
--;;

CREATE TRIGGER trigger_document_matching_event
    AFTER UPDATE OF matching_status ON document
    FOR EACH ROW
    WHEN (OLD.matching_status IS DISTINCT FROM NEW.matching_status)
    EXECUTE FUNCTION notify_matching_event();
--;;
git add resources/migrations/20260413120000-add-document-diagnostics.up.sql resources/migrations/20260413120000-add-document-diagnostics.down.sql
git commit -m "feat: replace matching pg_notify with processor_run_event trigger"

Task A10: Append PENDING-CLEANUPS.md entries

Files:

Append to resources/migrations/PENDING-CLEANUPS.md:


## `ap_ingestion_post_process_stat` (entire table)

- **Replaced by:** `document_processor_run` (unified per-processor run history,
  any trigger kind).
- **Stopped being written:** <DATE>, when the diagnostics recompute pipeline
  shipped and post-process handlers started writing to `document_processor_run`.
- **Gate to drop:** backfill verified, no open queries against the old table.

## `document.matching_status`, `matching_error`, `matching_attempts`, `matching_failed_at`

- **Replaced by:** `document_processor_run` rows where `processor_id = 'matching'`.
  Latest status via `DISTINCT ON (processor_id) ... ORDER BY document_version DESC`;
  attempts via `COUNT(*)`.
- **Stopped being written:** <DATE>.
- **Gate to drop:** all readers migrated to the new table; the pg_notify
  trigger replaced by `trigger_processor_run_event`.

## `matching_status` ENUM type

- **Replaced by:** `processor_run_status` ENUM.
- **Gate to drop:** after the columns above are dropped.

## `document.reconciliation_status`

- **Replaced by:** `document.diagnostics.reconciliation.status` (materialized)
  and `document_processor_run` rows where `processor_id = 'reconciliation'`.
- **Stopped being written:** <DATE>.
- **Gate to drop:** reconciliation UI reads from `document.diagnostics`.

## `structured_data.{validation-results, fraud-flags, tax-issues}`
## `structured_data.line-items[*].vat-validation`

- **Replaced by:** `document.diagnostics` + `document_processor_run`.
- **Stopped being written:** <DATE>.
- **Gate to drop:** migration verified, all readers moved to `document.diagnostics`.
git add resources/migrations/PENDING-CLEANUPS.md
git commit -m "docs: queue diagnostics-related columns/tables for cleanup"

Task A11: Verify migration runs locally and produces expected schema

Files:

clj -M:dev

In the REPL:

(integrant.repl/reset-all)

Expected: system starts; logs show migratus applying 20260413120000-add-document-diagnostics. No errors.

psql -h localhost -U postgres -d orcha -c "\d document_processor_run"
psql -h localhost -U postgres -d orcha -c "\dT processor_run_status processor_run_trigger"
psql -h localhost -U postgres -d orcha -c "SELECT column_name, data_type FROM information_schema.columns WHERE table_name = 'document' AND column_name = 'diagnostics';"

Expected:

psql -h localhost -U postgres -d orcha -c "SELECT tgname FROM pg_trigger WHERE tgrelid = 'document_processor_run'::regclass AND NOT tgisinternal;"
psql -h localhost -U postgres -d orcha -c "SELECT tgname FROM pg_trigger WHERE tgrelid = 'document'::regclass AND tgname = 'trigger_document_matching_event';"

Expected:

psql -h localhost -U postgres -d orcha -c "SELECT processor_id, COUNT(*) FROM document_processor_run GROUP BY processor_id ORDER BY processor_id;"
psql -h localhost -U postgres -d orcha -c "SELECT COUNT(*) FROM document WHERE diagnostics IS NOT NULL;"

Expected: counts reflect what was in the local DB before migration. If the local DB is empty, both queries return 0 rows / 0 — that's fine.

No commit — verification only.


Phase B: Clojure schemas and prompt cleanup

After this phase, the Malli schemas reflect the new shape, the diagnostics namespace exists, and the LLM extraction prompts no longer ask for missing-fields.

Task B1: Create schema/diagnostics.clj

Files:

Write to src/com/getorcha/schema/diagnostics.clj:

(ns com.getorcha.schema.diagnostics
  "Schema for document.diagnostics — derived outputs computed from
   editable document state. Not editable by users; recomputed by the
   diagnostics-recompute pipeline after edits."
  (:require [malli.core :as m]))


(def ValidationCheck
  "Single validation check result.
   :pass checks have no message; other statuses require message."
  [:map
   [:status [:enum "pass" "warning" "error" "uncertain"]]
   [:field {:optional true} :string]
   [:message {:optional true} :string]
   [:details {:optional true} :map]
   [:resolved-value {:optional true} :any]
   [:confidence {:optional true} :double]
   [:reasoning {:optional true} :string]])


(def Validations
  "Map of check-name -> ValidationCheck."
  [:map-of :keyword ValidationCheck])


(def FraudFlagType
  [:enum
   :bank-account-mismatch :high-risk-country :missing-vat-id
   :supplier-bank-change :supplier-tax-id-change :invoice-splitting
   :sender-domain-change :urgency-language :vague-description])


(def FraudSeverity
  [:enum :critical :warning :info :not-applicable])


(def FraudFlag
  [:map
   [:rule-id :keyword]
   [:type FraudFlagType]
   [:severity FraudSeverity]
   [:message :string]
   [:suggestion {:optional true} :string]
   [:details {:optional true} :map]])


(def TaxIssue
  [:map
   [:type [:enum :missing-vat-id :missing-tax-id :incorrect-vat-treatment
                 :rate-mismatch :missing-compliance-statement :other]]
   [:severity [:enum "error" "warning"]]
   [:message :string]
   [:suggestion {:optional true} :string]])


(def VatValidation
  "Per-line-item VAT validation result."
  [:map
   [:status [:enum "valid" "invalid" "warning" "skipped"]]
   [:expected-rate {:optional true} number?]
   [:reasoning [:maybe :string]]
   [:suggestion {:optional true} :string]])


(def MatchingSummary
  "Snapshot of the matches produced by the matching processor.
   The relational document_match table remains the primary query source;
   this snapshot is for self-contained diagnostic-run audit and UI render."
  [:map
   [:matches [:vector
              [:map
               [:document-id :string]
               [:blended-score number?]
               [:llm-confidence [:enum "high" "medium" "low"]]
               [:match-method [:enum "rule-based" "llm"]]]]]])


(def ReconciliationSummary
  [:map
   [:status [:enum "reconciled" "incomplete" "error"]]
   [:details {:optional true} :map]])


(def Diagnostics
  "Schema for the document.diagnostics JSONB column. Top-level keys are
   absent when the corresponding processor has never successfully run."
  (m/schema
   [:map
    [:validations    {:optional true} Validations]
    [:fraud-flags    {:optional true} [:vector FraudFlag]]
    [:tax-issues     {:optional true} [:vector TaxIssue]]
    [:line-items     {:optional true} [:map-of :string [:map [:vat-validation {:optional true} VatValidation]]]]
    [:matching       {:optional true} MatchingSummary]
    [:reconciliation {:optional true} ReconciliationSummary]]))

Run:

clj-kondo --lint src/com/getorcha/schema/diagnostics.clj

Expected: no problems reported.

git add src/com/getorcha/schema/diagnostics.clj
git commit -m "feat: add schema/diagnostics.clj for document.diagnostics shape"

Task B2: Remove moved keys from invoice structured-data schema

Files:

Read src/com/getorcha/schema/invoice/structured_data.clj to confirm the current location of the keys to be removed (per spec §3.3): :missing-fields, :tax-issues, :fraud-flags, :validation-results, plus the :vat-validation field on LineItem and the ValidationCheck / ValidationResults defs.

Edit src/com/getorcha/schema/invoice/structured_data.clj and delete these lines from the InvoiceData body:

     ;; LLM-provided metadata
     [:missing-fields [:maybe [:vector :string]]]

     ;; Post-processing results (optional, added after extraction)
     ;; Note: cost-center and bu-code are per line-item, not invoice-level
     [:service-category {:optional true} ServiceCategory]
     [:tax-issues {:optional true} [:vector TaxIssue]]

     ;; Fraud detection results (added by with-fraud-detection phase)
     [:fraud-flags {:optional true} [:vector FraudFlag]]

     ;; Validation results (added by with-validations phase)
     [:validation-results {:optional true} ValidationResults]

Keep :service-category (still editable per spec §3.3 — only diagnostics move out). Remove :missing-fields, :tax-issues, :fraud-flags, :validation-results.

The resulting block should look like:

     ;; Optional service classification (editable; LLM seeds the value at
     ;; ingestion, user can override).
     [:service-category {:optional true} ServiceCategory]

In the same file, remove this line from LineItem:

   [:vat-validation {:optional true} VatValidation]

In the same file, delete the entire ValidationCheck, ValidationResults, TaxIssue, FraudFlag, FraudFlagType, FraudSeverity, and VatValidation defs (they live in schema/diagnostics.clj now).

Run:

clj-kondo --lint src test dev

Expected: no errors. Any usages elsewhere of com.getorcha.schema.invoice.structured-data/ValidationResults, /FraudFlag, /TaxIssue, /VatValidation will surface here as unresolved-symbol errors. If any appear, change them to require com.getorcha.schema.diagnostics and use the relocated symbol — those usages will be retired in later phases (e.g. processors writing to diagnostics), so a temporary alias is fine.

git add src/com/getorcha/schema/invoice/structured_data.clj
git commit -m "feat: remove diagnostic keys from invoice structured-data schema"

Task B3: Remove moved keys from purchase-order structured-data schema

Files:

Read src/com/getorcha/schema/purchase_order/structured_data.clj. Confirm presence of :missing-fields and any of :validation-results, :fraud-flags, :tax-issues (presence varies per doc type).

Delete :missing-fields (always present). Delete :validation-results, :fraud-flags, :tax-issues if present. If LineItem carries :vat-validation, delete it.

clj-kondo --lint src test dev

Expected: no errors.

git add src/com/getorcha/schema/purchase_order/structured_data.clj
git commit -m "feat: remove diagnostic keys from purchase_order structured-data schema"

Task B4: Remove moved keys from goods-received-note structured-data schema

Files:

Same approach: remove :missing-fields and any of the diagnostic keys present.

clj-kondo --lint src test dev
git add src/com/getorcha/schema/goods_received_note/structured_data.clj
git commit -m "feat: remove diagnostic keys from goods_received_note structured-data schema"

Task B5: Remove moved keys from contract structured-data schema

Files:

clj-kondo --lint src test dev
git add src/com/getorcha/schema/contract/structured_data.clj
git commit -m "feat: remove diagnostic keys from contract structured-data schema"

Task B6: Drop missing-fields from extraction prompts

Files:

Search the file for the four occurrences of the "missing-fields": ["field1", ...] line — one per document type (invoice, PO, contract, GRN), at approximately lines 394, 950, 1207, 1481.

Run:

grep -n '"missing-fields"' src/com/getorcha/workers/ap/ingestion/extraction.clj

Expected: four hits.

For each hit, delete the entire "missing-fields": ["field1", ...], line plus any blank line that separates it from neighbors. Preserve the rest of the prompt.

Run:

clj-kondo --lint src/com/getorcha/workers/ap/ingestion/extraction.clj

Expected: no errors.

git add src/com/getorcha/workers/ap/ingestion/extraction.clj
git commit -m "feat: drop missing-fields from extraction prompts"

Phase C: Run table data layer

After this phase, Clojure code can insert/update/query document_processor_run rows.

Task C1: Create db/document_processor_run.clj

Files:

Write to src/com/getorcha/db/document_processor_run.clj:

(ns com.getorcha.db.document-processor-run
  "Database access for document_processor_run.
   Two-phase write pattern: insert with status='running' at dispatch,
   update to status='completed'|'failed' at completion."
  (:require [com.getorcha.db.sql :as db.sql]))


(defn insert-running!
  "Inserts a row with status='running'. Returns the row's :id (UUID).

   `params` keys:
     :document-id              UUID, required
     :processor-id             string, required
     :trigger-kind             :ingestion | :edit | :manual, required
     :ingestion-id             UUID, required iff trigger-kind = :ingestion
     :triggered-by-history-id  UUID, required iff trigger-kind = :edit
     :document-version         integer, required
     :commit-sha               string, optional"
  [db {:keys [document-id processor-id trigger-kind ingestion-id
              triggered-by-history-id document-version commit-sha]}]
  (let [row (db.sql/execute-one!
             db
             {:insert-into [:document-processor-run]
              :columns     [:document-id :processor-id :trigger-kind
                            :ingestion-id :triggered-by-history-id
                            :document-version :commit-sha :status]
              :values      [[document-id processor-id [:cast (name trigger-kind) :processor-run-trigger]
                             ingestion-id triggered-by-history-id
                             document-version commit-sha
                             [:cast "running" :processor-run-status]]]
              :returning   [:id]})]
    (:document-processor-run/id row)))


(defn complete-run!
  "Marks a run as completed. Stores result + LLM stats. Returns nothing useful."
  [db run-id {:keys [result input-tokens output-tokens model]}]
  (db.sql/execute!
   db
   {:update :document-processor-run
    :set    {:status        [:cast "completed" :processor-run-status]
             :ended-at      [:now]
             :result        [:cast (when result (cheshire.core/generate-string result)) :jsonb]
             :input-tokens  input-tokens
             :output-tokens output-tokens
             :model         model}
    :where  [:= :id run-id]}))


(defn fail-run!
  "Marks a run as failed with an error message."
  [db run-id error-message]
  (db.sql/execute!
   db
   {:update :document-processor-run
    :set    {:status   [:cast "failed" :processor-run-status]
             :ended-at [:now]
             :error    error-message}
    :where  [:= :id run-id]}))


(defn latest-runs-per-processor
  "Returns the latest row per processor for a document. One row per
   processor_id, ordered by document_version DESC then started_at DESC.
   Returns a vector of full row maps (qualified keys per as-kebab-maps)."
  [db document-id]
  (db.sql/execute!
   db
   {:select   [:processor-id :status :document-version :result :error
               :started-at :ended-at :model :triggered-by-history-id]
    :from     [[{:select   [:document-processor-run.*
                            [[:row-number] :over [{:partition-by [:processor-id]
                                                   :order-by    [[:document-version :desc]
                                                                 [:started-at :desc]]}]
                             :rn]]
                 :from     [:document-processor-run]
                 :where    [:= :document-id document-id]}
                :ranked]]
    :where    [:= :ranked.rn 1]}))


(defn count-runs
  "Counts runs for a document, optionally filtered by processor-id and/or status."
  [db document-id & {:keys [processor-id status]}]
  (let [{:keys [count]}
        (db.sql/execute-one!
         db
         {:select [[:%count.* :count]]
          :from   [:document-processor-run]
          :where  (cond-> [:= :document-id document-id]
                    processor-id (->> (vector :and [:= :processor-id processor-id]))
                    status       (->> (vector :and [:= :status [:cast (name status) :processor-run-status]])))})]
    (or count 0)))

The complete-run! function needs cheshire. Update the ns form:

(ns com.getorcha.db.document-processor-run
  "Database access for document_processor_run.
   Two-phase write pattern: insert with status='running' at dispatch,
   update to status='completed'|'failed' at completion."
  (:require [cheshire.core]
            [com.getorcha.db.sql :as db.sql]))
clj-kondo --lint src/com/getorcha/db/document_processor_run.clj

Expected: no errors.

git add src/com/getorcha/db/document_processor_run.clj
git commit -m "feat: add db/document_processor_run.clj data layer"

Task C2: Tests for db/document_processor_run.clj

Files:

Write to test/com/getorcha/db/document_processor_run_test.clj:

(ns com.getorcha.db.document-processor-run-test
  (:require [clojure.test :refer [deftest is testing use-fixtures]]
            [com.getorcha.db :as db]
            [com.getorcha.db.document-processor-run :as run]
            [com.getorcha.test.fixtures :as fixtures]))


(use-fixtures :once fixtures/with-system)
(use-fixtures :each fixtures/with-rolled-back-transaction)


(defn ^:private seed-document!
  "Inserts a minimal document row + ap_ingestion row. Returns
   {:document-id ... :ingestion-id ... :legal-entity-id ...}."
  []
  ;; Stand up a tenant + legal-entity + document + ingestion using
  ;; existing fixture helpers. Implementation references
  ;; com.getorcha.test.fixtures helpers; the *db* var is bound by
  ;; with-rolled-back-transaction.
  ...)


(deftest insert-running-creates-row-test
  (testing "insert-running! returns a UUID and stores all the inputs"
    (let [{:keys [document-id ingestion-id]} (seed-document!)
          run-id (run/insert-running!
                  fixtures/*db*
                  {:document-id      document-id
                   :processor-id     "fraud-detector"
                   :trigger-kind     :ingestion
                   :ingestion-id     ingestion-id
                   :document-version 1
                   :commit-sha       "abc123"})]
      (is (uuid? run-id))
      (let [rows (run/latest-runs-per-processor fixtures/*db* document-id)
            row  (first (filter #(= "fraud-detector" (:document-processor-run/processor-id %)) rows))]
        (is (= "running" (:document-processor-run/status row)))
        (is (= 1 (:document-processor-run/document-version row)))))))


(deftest complete-run-updates-row-test
  (testing "complete-run! writes status, ended_at, result, and stats"
    (let [{:keys [document-id ingestion-id]} (seed-document!)
          run-id (run/insert-running!
                  fixtures/*db*
                  {:document-id      document-id
                   :processor-id     "tax-compliance-analyzer"
                   :trigger-kind     :ingestion
                   :ingestion-id     ingestion-id
                   :document-version 1})]
      (run/complete-run!
       fixtures/*db* run-id
       {:result        {:tax-issues []}
        :input-tokens  100
        :output-tokens 50
        :model         "claude-sonnet-4-6"})
      (let [row (->> (run/latest-runs-per-processor fixtures/*db* document-id)
                     (filter #(= "tax-compliance-analyzer" (:document-processor-run/processor-id %)))
                     first)]
        (is (= "completed" (:document-processor-run/status row)))
        (is (some? (:document-processor-run/ended-at row)))
        (is (= 100 (:document-processor-run/input-tokens row)))
        (is (= "claude-sonnet-4-6" (:document-processor-run/model row)))))))


(deftest fail-run-updates-row-test
  (testing "fail-run! writes status='failed' and the error message"
    (let [{:keys [document-id ingestion-id]} (seed-document!)
          run-id (run/insert-running!
                  fixtures/*db*
                  {:document-id      document-id
                   :processor-id     "matching"
                   :trigger-kind     :ingestion
                   :ingestion-id     ingestion-id
                   :document-version 1})]
      (run/fail-run! fixtures/*db* run-id "no candidates")
      (let [row (->> (run/latest-runs-per-processor fixtures/*db* document-id)
                     (filter #(= "matching" (:document-processor-run/processor-id %)))
                     first)]
        (is (= "failed" (:document-processor-run/status row)))
        (is (= "no candidates" (:document-processor-run/error row)))))))


(deftest latest-runs-returns-newest-per-processor-test
  (testing "When multiple runs exist for the same processor, only the newest is returned"
    (let [{:keys [document-id ingestion-id]} (seed-document!)
          older (run/insert-running!
                 fixtures/*db*
                 {:document-id      document-id
                  :processor-id     "validations"
                  :trigger-kind     :ingestion
                  :ingestion-id     ingestion-id
                  :document-version 1})
          newer (run/insert-running!
                 fixtures/*db*
                 {:document-id      document-id
                  :processor-id     "validations"
                  :trigger-kind     :ingestion
                  :ingestion-id     ingestion-id
                  :document-version 2})]
      (run/complete-run! fixtures/*db* older {:result {}})
      (run/complete-run! fixtures/*db* newer {:result {}})
      (let [rows (run/latest-runs-per-processor fixtures/*db* document-id)
            validations-rows (filter #(= "validations" (:document-processor-run/processor-id %)) rows)]
        (is (= 1 (count validations-rows)))
        (is (= 2 (:document-processor-run/document-version (first validations-rows))))))))


(deftest count-runs-test
  (testing "count-runs filters by processor-id and status"
    (let [{:keys [document-id ingestion-id]} (seed-document!)]
      (run/insert-running! fixtures/*db*
                           {:document-id document-id :processor-id "matching"
                            :trigger-kind :ingestion :ingestion-id ingestion-id
                            :document-version 1})
      (let [run-id (run/insert-running!
                    fixtures/*db*
                    {:document-id document-id :processor-id "matching"
                     :trigger-kind :ingestion :ingestion-id ingestion-id
                     :document-version 1})]
        (run/fail-run! fixtures/*db* run-id "x"))
      (is (= 2 (run/count-runs fixtures/*db* document-id :processor-id "matching")))
      (is (= 1 (run/count-runs fixtures/*db* document-id :processor-id "matching" :status :failed))))))

Replace the ... placeholder in seed-document! with concrete code that inserts a minimal tenant, legal_entity, document, and ap_ingestion row using db.sql/execute!. Use the existing fixture helpers in com.getorcha.test.fixtures if present (search for similar helpers — e.g. there may already be a with-document or seed-document! helper). If not, write the inserts directly:

(defn ^:private seed-document!
  []
  (let [tenant-id   (random-uuid)
        le-id       (random-uuid)
        document-id (random-uuid)
        ingestion-id (random-uuid)]
    (db.sql/execute! fixtures/*db*
      {:insert-into [:tenant]
       :columns [:id :name]
       :values [[tenant-id "test-tenant"]]})
    (db.sql/execute! fixtures/*db*
      {:insert-into [:legal-entity]
       :columns [:id :tenant-id :name]
       :values [[le-id tenant-id "test-le"]]})
    (db.sql/execute! fixtures/*db*
      {:insert-into [:document]
       :columns [:id :tenant-id :legal-entity-id :type :file-path :version]
       :values [[document-id tenant-id le-id [:cast "invoice" :document-type] "test/path" 1]]})
    (db.sql/execute! fixtures/*db*
      {:insert-into [:ap-ingestion]
       :columns [:id :document-id :tenant-id :legal-entity-id :status]
       :values [[ingestion-id document-id tenant-id le-id [:cast "completed" :ap-ingestion-status]]]})
    {:document-id document-id :ingestion-id ingestion-id :legal-entity-id le-id}))

If your project's tenant, legal_entity, document, or ap_ingestion schemas have additional NOT NULL columns, add them. Run a single insert via psql to confirm what's required if uncertain.

Run:

clj -X:test:silent :nses '[com.getorcha.db.document-processor-run-test]'

Expected outcome depends on whether the edit-history migration has shipped:

git add test/com/getorcha/db/document_processor_run_test.clj
git commit -m "test: cover db/document_processor_run insert/update/read"

Phase D: Ingestion-time wiring

After this phase, the ingestion pipeline writes per-processor document_processor_run rows two-phase, and the ingestion-completion transaction populates document.diagnostics, strips moved keys from structured_data, and recomputes needs_human_review from the new location.

Task D1: Add two-phase run-row writes to post-process orchestration

Files:

Read src/com/getorcha/workers/ap/ingestion/post_process.clj and find the function that invokes each post-processor (it iterates over processors, calls each, and writes one row to ap_ingestion_post_process_stat per result). Identify the single point where the call + stats write happens. We will wrap this with two-phase writes.

Add to the namespace require list:

[com.getorcha.db.document-processor-run :as db.run]

Add this helper near the top of the namespace (after the requires):

(defn ^:private with-run-row!
  "Two-phase write: insert a 'running' row, run f, then complete or fail.
   `run-params` is everything `db.run/insert-running!` needs except status.
   `f` is a thunk that returns {:result <output> :stats {:input-tokens ... :output-tokens ... :model ...}}.
   On exception, marks the run failed and re-throws."
  [db run-params f]
  (let [run-id (db.run/insert-running! db run-params)]
    (try
      (let [{:keys [result stats]} (f)]
        (db.run/complete-run! db run-id (assoc stats :result result))
        result)
      (catch Throwable t
        (db.run/fail-run! db run-id (.getMessage t))
        (throw t)))))

Find the existing per-processor invocation (around the call site that writes to ap_ingestion_post_process_stat). Replace the direct invocation with a with-run-row! wrap:

Before (illustrative — the actual call shape may differ):

(let [{:keys [result stats]} (invoke-processor processor-id ctx structured-data)]
  (write-stat! db ingestion-id processor-id stats)
  result)

After:

(with-run-row!
  db
  {:document-id      document-id
   :processor-id     (name processor-id)
   :trigger-kind     :ingestion
   :ingestion-id     ingestion-id
   :document-version (:document/version (fetch-document db document-id))
   :commit-sha       (System/getenv "GIT_COMMIT_SHA")}
  (fn []
    (let [{:keys [result stats]} (invoke-processor processor-id ctx structured-data)]
      ;; Keep writing to the legacy table for now — Phase J removes those readers.
      (write-stat! db ingestion-id processor-id stats)
      {:result result :stats stats})))

Adjust the destructured params to match the actual local names in post_process.clj. The document-id and ingestion-id are typically already in scope at this layer — if not, thread them through from the outer ingestion handler.

clj-kondo --lint src/com/getorcha/workers/ap/ingestion/post_process.clj

Expected: no errors.

clj -X:test:silent :nses '[com.getorcha.workers.ap.ingestion.post-process-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: existing tests still pass — the wrap is purely additive and the legacy stats write is preserved.

git add src/com/getorcha/workers/ap/ingestion/post_process.clj
git commit -m "feat: write document_processor_run rows two-phase in post-process"

Task D2: Promote vat-validation to its own processor

Files:

Read the file, locate where per-line-item VAT validation is computed (it appears to be inline in the validations pipeline today, attaching :vat-validation to each line item).

Add to src/com/getorcha/workers/ap/ingestion/validation.clj:

(defn run-vat-validation
  "Runs deterministic per-line-item VAT validation against `structured-data`.
   Returns a map of {line-item-id -> VatValidation-shaped map}.
   IDs are read from line-item :id (from the edit-history schema)."
  [{:keys [line-items] :as _structured-data}]
  (into {}
        (for [{:keys [id] :as li} line-items
              :let [vv (compute-vat-validation li)]
              :when (some? vv)]
          [id {:vat-validation vv}])))

If compute-vat-validation is the existing inline helper that returns the VatValidation map for one line item, reuse it. If the inline check is not yet a function, extract its body into one before this step.

In the same file, find and remove the code path that writes :vat-validation into each line item of structured-data (the legacy inline assignment). The run-vat-validation function now produces the data; the ingestion-completion handler will write it to document.diagnostics.line-items.

clj-kondo --lint src/com/getorcha/workers/ap/ingestion/validation.clj

Expected: no errors.

git add src/com/getorcha/workers/ap/ingestion/validation.clj
git commit -m "feat: extract per-line vat-validation into standalone run-vat-validation"

Task D3: Compute document.diagnostics snapshot in the ingestion-completion handler

Files:

Read src/com/getorcha/workers/ap/ingestion.clj and locate the ingestion-completion handler introduced by the edit-history plan (the function that writes the document_history ingestion row + updates document.structured_data + bumps document.version, in one transaction).

Add to the namespace:

(defn ^:private build-diagnostics
  "Assembles the document.diagnostics snapshot from the in-memory results
   of validations + post-process processors. `vat-validations` is the map
   returned by `validation/run-vat-validation`. `matching` and
   `reconciliation` may be nil at ingestion-completion time (the workers
   that produce them run after the ingestion-completion transaction;
   their outputs land via diagnostic-only updates later)."
  [{:keys [validation-results fraud-flags tax-issues vat-validations
           matching reconciliation]}]
  (cond-> {}
    (seq validation-results) (assoc :validations validation-results)
    (seq fraud-flags)        (assoc :fraud-flags fraud-flags)
    (seq tax-issues)         (assoc :tax-issues tax-issues)
    (seq vat-validations)    (assoc :line-items vat-validations)
    matching                 (assoc :matching matching)
    reconciliation           (assoc :reconciliation reconciliation)))

Add:

(defn ^:private compute-needs-human-review
  "True iff any validation has status='error' OR any fraud-flag has severity='critical'."
  [{:keys [validations fraud-flags] :as _diagnostics}]
  (boolean
    (or (some #(= "error" (:status %)) (vals validations))
        (some #(= :critical (:severity %)) fraud-flags))))

Find the existing UPDATE document SET structured_data = ..., version = version + 1 ... (or its honeysql equivalent) inside the completion transaction. Modify it to:

(let [diagnostics      (build-diagnostics
                        {:validation-results validation-results
                         :fraud-flags        fraud-flags
                         :tax-issues         tax-issues
                         :vat-validations    vat-validations})
      stripped-data    (-> final-structured-data
                           (dissoc :validation-results :fraud-flags :tax-issues :missing-fields)
                           (update :line-items (fn [lis] (mapv #(dissoc % :vat-validation) lis))))
      needs-review?    (compute-needs-human-review diagnostics)]
  (db.sql/execute! tx
    {:update :document
     :set    {:structured-data    [:cast (cheshire.core/generate-string stripped-data) :jsonb]
              :diagnostics        [:cast (cheshire.core/generate-string diagnostics) :jsonb]
              :type               document-type
              :needs-human-review needs-review?
              :version            [:+ :version 1]
              :updated-at         [:now]}
     :where  [:= :id document-id]}))

The exact local names (final-structured-data, validation-results, etc.) need to match what's already in scope at the completion handler. If those names don't exist, thread them in from the pipeline state map.

clj-kondo --lint src/com/getorcha/workers/ap/ingestion.clj

Expected: no errors.

git add src/com/getorcha/workers/ap/ingestion.clj
git commit -m "feat: write document.diagnostics in ingestion-completion transaction"

Task D4: Test ingestion-completion writes diagnostics correctly

Files:

Append to test/com/getorcha/workers/ap/ingestion_test.clj:

(deftest ingestion-completion-writes-diagnostics-test
  (testing "After ingestion completes, document.diagnostics holds validations/fraud-flags/tax-issues
            and structured_data does NOT contain those keys."
    ;; Arrange: set up a document + ingestion using existing fixture helpers,
    ;; with an extraction result that includes a validation error and a fraud flag.
    (let [{:keys [document-id ingestion-id]} (set-up-completed-ingestion-with-issues!)]
      (let [doc (db.sql/execute-one! fixtures/*db*
                  {:select [:diagnostics :structured-data :needs-human-review]
                   :from   [:document]
                   :where  [:= :id document-id]})
            diag (:document/diagnostics doc)
            sd   (:document/structured-data doc)]
        (is (some? (:validations diag)) "validations slice present")
        (is (some? (:fraud-flags diag)) "fraud-flags slice present")
        (is (not (contains? sd :validation-results)) "validation-results stripped from structured_data")
        (is (not (contains? sd :fraud-flags)) "fraud-flags stripped from structured_data")
        (is (true? (:document/needs-human-review doc)) "needs_human_review derived from diagnostics")))))

set-up-completed-ingestion-with-issues! is a new helper — write it to seed an ap_ingestion row, drive it through the ingestion completion handler with a hand-crafted extraction result containing one validation status='error' and one fraud-flag severity=:critical, and return the IDs.

clj -X:test:silent :nses '[com.getorcha.workers.ap.ingestion-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: PASS.

git add test/com/getorcha/workers/ap/ingestion_test.clj
git commit -m "test: ingestion-completion writes diagnostics + strips moved keys"

Phase E: Matching and reconciliation writes

After this phase, matching and reconciliation workers write their own document_processor_run rows AND populate the matching/reconciliation slices in document.diagnostics. They keep writing to the legacy columns (matching_status, reconciliation_status) for now — Phase J removes those reads.

Task E1: Matching worker writes run row + diagnostics.matching

Files:

Read src/com/getorcha/workers/ap/matching/worker.clj and find where matching results are persisted (typically: write to document_match, then UPDATE document SET matching_status = 'succeeded' ...).

At the dispatch point of the matching worker (where the work begins for a single document), insert a running row:

(let [run-id (db.run/insert-running!
              tx
              {:document-id      document-id
               :processor-id     "matching"
               :trigger-kind     :ingestion
               :ingestion-id     ingestion-id
               :document-version (:document/version (fetch-document tx document-id))})]
  (try
    (let [{:keys [matches]} (run-matching! tx document-id)]
      ;; Build the diagnostics snapshot
      (let [matching-summary {:matches (mapv (fn [m]
                                              {:document-id    (str (:other-document-id m))
                                               :blended-score  (:blended-score m)
                                               :llm-confidence (:llm-confidence m)
                                               :match-method   (:match-method m)})
                                             matches)}]
        ;; Update document.diagnostics.matching, gated on version equality
        (update-diagnostics-slice! tx document-id :matching matching-summary)
        (db.run/complete-run! tx run-id {:result matching-summary})))
    (catch Throwable t
      (db.run/fail-run! tx run-id (.getMessage t))
      (throw t))))

Add [com.getorcha.db.document-processor-run :as db.run] to the require list.

This helper merges a slice into document.diagnostics, but only if the run's document_version equals document.version at commit time. Add to the namespace:

(defn ^:private update-diagnostics-slice!
  "Merges {slice-key slice-value} into document.diagnostics atomically.
   No-op if the document's version has advanced beyond `expected-version`."
  ([tx document-id slice-key slice-value]
   (update-diagnostics-slice! tx document-id slice-key slice-value nil))
  ([tx document-id slice-key slice-value expected-version]
   (db.sql/execute!
    tx
    (cond-> {:update :document
             :set    {:diagnostics
                      [:case
                       [:= :diagnostics nil]
                       [:cast (cheshire.core/generate-string {slice-key slice-value}) :jsonb]
                       :else
                       [:|| :diagnostics
                        [:cast (cheshire.core/generate-string {slice-key slice-value}) :jsonb]]]
                      :updated-at [:now]}
             :where  [:= :id document-id]}
      expected-version (update :where (fn [w] [:and w [:= :version expected-version]])))))

(The || operator on JSONB shallow-merges keys, so calling with {:matching {:matches [...]}} overwrites the :matching key without touching siblings.)

Inside update-diagnostics-slice!, also recompute needs_human_review from the resulting diagnostics. Update the function to:

(defn ^:private update-diagnostics-slice!
  ([tx document-id slice-key slice-value]
   (update-diagnostics-slice! tx document-id slice-key slice-value nil))
  ([tx document-id slice-key slice-value expected-version]
   (let [merged-jsonb (cheshire.core/generate-string {slice-key slice-value})]
     (db.sql/execute!
      tx
      (cond-> {:update :document
               :set    {:diagnostics
                        [:case
                         [:= :diagnostics nil]
                         [:cast merged-jsonb :jsonb]
                         :else
                         [:|| :diagnostics [:cast merged-jsonb :jsonb]]]
                        :updated-at [:now]}
               :where  [:= :id document-id]}
        expected-version (update :where (fn [w] [:and w [:= :version expected-version]]))))
     ;; Recompute needs_human_review from the now-merged diagnostics.
     (db.sql/execute!
      tx
      {:update :document
       :set    {:needs-human-review
                [:or
                 [:exists
                  {:select [1]
                   :from   [[[:jsonb_each [:- :diagnostics 'validations']] [:k :v]]]
                   :where  [:= [:->> :v "status"] "error"]}]
                 [:exists
                  {:select [1]
                   :from   [[[:jsonb_array_elements [:- :diagnostics 'fraud-flags']] :f]]
                   :where  [:= [:->> :f "severity"] "critical"]}]]}
       :where  [:= :id document-id]}))))

This is heavier than the in-application Clojure helper from D3. The trade-off: matching/reconciliation runs from a worker that doesn't already hold the new diagnostics in memory, so a SQL-level recompute is simpler than reading + JSON-decoding + re-serializing.

clj-kondo --lint src/com/getorcha/workers/ap/matching/worker.clj

Expected: no errors.

git add src/com/getorcha/workers/ap/matching/worker.clj
git commit -m "feat: matching worker writes run row + diagnostics.matching"

Task E2: Reconciliation worker writes run row + diagnostics.reconciliation

Files:

Read src/com/getorcha/workers/ap/matching/reconciliation.clj. Locate where document.reconciliation_status is updated.

Wrap the reconciliation execution with db.run/insert-running! / db.run/complete-run!, write the slice as :reconciliation with shape {:status <status> :details <map>}, and recompute needs_human_review. Use the same update-diagnostics-slice! helper (extract it to a shared namespace, e.g. src/com/getorcha/db/document_diagnostics.clj, if both files need it):

(let [run-id (db.run/insert-running!
              tx
              {:document-id      document-id
               :processor-id     "reconciliation"
               :trigger-kind     :ingestion
               :ingestion-id     ingestion-id
               :document-version (:document/version (fetch-document tx document-id))})]
  (try
    (let [reconciliation-result (run-reconciliation! tx document-id)
          summary               {:status  (:status reconciliation-result)
                                 :details (dissoc reconciliation-result :status)}]
      (db.diagnostics/update-slice! tx document-id :reconciliation summary)
      (db.run/complete-run! tx run-id {:result summary}))
    (catch Throwable t
      (db.run/fail-run! tx run-id (.getMessage t))
      (throw t))))

Create the file:

(ns com.getorcha.db.document-diagnostics
  "Helpers for writing slices into document.diagnostics with derived
   needs_human_review recomputation."
  (:require [cheshire.core]
            [com.getorcha.db.sql :as db.sql]))


(defn update-slice!
  "Merges {slice-key slice-value} into document.diagnostics atomically.
   Recomputes document.needs_human_review from the merged result.
   No-op if `expected-version` is provided and document.version has advanced past it."
  ([tx document-id slice-key slice-value]
   (update-slice! tx document-id slice-key slice-value nil))
  ([tx document-id slice-key slice-value expected-version]
   ;; ... full body from E1 step 4 ...
   ))

Update both worker.clj and reconciliation.clj to require this namespace and use db.diagnostics/update-slice!.

clj-kondo --lint src/com/getorcha/workers/ap/matching/reconciliation.clj src/com/getorcha/workers/ap/matching/worker.clj src/com/getorcha/db/document_diagnostics.clj

Expected: no errors.

git add src/com/getorcha/db/document_diagnostics.clj src/com/getorcha/workers/ap/matching/worker.clj src/com/getorcha/workers/ap/matching/reconciliation.clj
git commit -m "feat: reconciliation worker writes run row + diagnostics.reconciliation"

Phase F: Detail-view rendering with stale states

After this phase, the detail page renders diagnostic sections from document.diagnostics, classified per-section as current/stale/recomputing/failed/never-run.

Task F1: Add per-section state classifier in app/ui/components.clj

Files:

Append to src/com/getorcha/app/ui/components.clj (placement: near the other section helpers):

(defn diagnostic-section-state
  "Classifies a section's render state given the document's current version
   and the latest run for the processor.

   Returns one of :current | :stale | :recomputing | :failed | :never-run.

   - :current    — latest completed run's document_version equals doc.version.
   - :stale      — latest completed run's document_version is behind doc.version.
   - :recomputing— a row with status='running' exists.
   - :failed     — latest row has status='failed'.
   - :never-run  — no row at all AND diagnostics has no slice for this processor."
  [{:document/keys [version diagnostics] :as _document}
   processor-id
   diagnostics-slice-key
   latest-run]
  (let [{run-status     :document-processor-run/status
         run-version    :document-processor-run/document-version} latest-run]
    (cond
      (and (nil? latest-run)
           (not (contains? diagnostics diagnostics-slice-key))) :never-run
      (= "running" run-status)                                  :recomputing
      (= "failed" run-status)                                   :failed
      (and (= "completed" run-status)
           (= run-version version))                             :current
      (and (= "completed" run-status)
           (< run-version version))                             :stale
      :else                                                     :never-run)))

Append:

(defn diagnostic-section-badge
  "Renders the small status pill that goes in the section header.
   Returns nil for :current (no pill needed)."
  [state]
  (case state
    :recomputing [:span.diagnostic-badge.is-recomputing "Recomputing…"]
    :stale       [:span.diagnostic-badge.is-stale "Stale"]
    :failed      [:span.diagnostic-badge.is-failed "Analysis failed"]
    :never-run   [:span.diagnostic-badge.is-never-run "No analysis yet"]
    :current     nil))

Append:

(defn diagnostic-section-class
  "Returns the CSS class for a diagnostic section based on its state.
   Used to apply gray-out + stripe styling to the body of stale/recomputing
   sections."
  [state]
  (case state
    :recomputing "diagnostic-section is-recomputing"
    :stale       "diagnostic-section is-stale"
    :failed      "diagnostic-section is-failed"
    :never-run   "diagnostic-section is-never-run"
    :current     "diagnostic-section is-current"))
clj-kondo --lint src/com/getorcha/app/ui/components.clj

Expected: no errors.

git add src/com/getorcha/app/ui/components.clj
git commit -m "feat: add diagnostic-section state classifier and badge helpers"

Task F2: Add CSS for stale and recomputing states

Files:

Append to resources/app/public/css/style.css:

/* Diagnostic section state styling */

.diagnostic-section {
  position: relative;
  transition: opacity 0.2s ease;
}

.diagnostic-section.is-recomputing,
.diagnostic-section.is-stale {
  opacity: 0.55;
}

.diagnostic-section.is-failed {
  opacity: 0.85;
}

.diagnostic-badge {
  display: inline-block;
  font-size: 11px;
  font-weight: 500;
  padding: 2px 8px;
  border-radius: 999px;
  margin-left: 8px;
  vertical-align: middle;
}

.diagnostic-badge.is-recomputing {
  background: #1f3a5f;
  color: #58a6ff;
  border: 1px solid #2c4a73;
}

.diagnostic-badge.is-stale {
  background: #3a2f1f;
  color: #d4a657;
  border: 1px solid #4a3a25;
}

.diagnostic-badge.is-failed {
  background: #4a1f1f;
  color: #f85149;
  border: 1px solid #6a2a2a;
}

.diagnostic-badge.is-never-run {
  background: #21262d;
  color: #8b949e;
  border: 1px solid #30363d;
}
git add resources/app/public/css/style.css
git commit -m "feat: add diagnostic-section state styling"

Task F3: Update invoice view to read diagnostics from document.diagnostics

Files:

Read src/com/getorcha/app/http/documents/view/invoice.clj and find sections that read from structured-data's validation/fraud/tax keys. They will look like:

(when (seq (:validation-results structured-data))
  (validation-results-section ...))

Add diagnostics and latest-runs to the destructured params of the view function. Where the view's outer handler is called from, fetch them:

(let [latest-runs (db.run/latest-runs-per-processor db-pool document-id)
      latest-runs-by-id (group-by :document-processor-run/processor-id latest-runs)]
  (invoice-view db-pool document opts
                {:diagnostics (:document/diagnostics document)
                 :latest-runs-by-id latest-runs-by-id}))

For each diagnostic section (validations, fraud-flags, tax-issues, matching, reconciliation), change the call to pass the section state classifier output:

(let [validations (:validations diagnostics)
      run         (first (get latest-runs-by-id "validations"))
      state       (components/diagnostic-section-state document "validations" :validations run)]
  (validation-results-section validations state))

Update the matching, reconciliation, fraud-flags, and tax-issues reads similarly. Each section helper must accept a state arg and use components/diagnostic-section-class and components/diagnostic-section-badge to render.

Per-line VAT validation now lives in diagnostics.line-items[<line-item-id>].vat-validation, not inside the line item itself. To minimise churn in the existing line-item-card / line-items-table helpers, merge it back at the boundary:

(let [vat-validations (get diagnostics :line-items {})
      vat-run         (first (get latest-runs-by-id "vat-validation"))
      vat-state       (components/diagnostic-section-state document "vat-validation" :line-items vat-run)
      line-items*     (mapv (fn [li]
                              (assoc li :vat-validation
                                     (get-in vat-validations [(:id li) :vat-validation])))
                            (:line-items structured-data))]
  (line-items-table line-items* {:vat-validation-state vat-state}))

Pass the vat-state to line-items-table so it can apply the gray-out badge to the VAT column header (or wherever the section indicator sits visually).

For each affected helper (find them by name — validation-results-section, fraud-detection-section, tax-compliance-section, etc.), modify the signature:

(defn validation-results-section
  ([results] (validation-results-section results :current))  ;; backward-compatible default
  ([results state]
   [:section {:class (diagnostic-section-class state)}
    [:h3 "Validation results "
     (diagnostic-section-badge state)]
    ;; ... existing render of validation results ...
    ]))
clj-kondo --lint src/com/getorcha/app/ui/components.clj src/com/getorcha/app/http/documents/view/invoice.clj

Expected: no errors.

Start the dev server:

clj -M:dev

In the REPL:

(integrant.repl/go)

Open a browser to a known invoice URL (use a test document ID); confirm:

If there are visual regressions (e.g. opacity applied to current sections), check diagnostic-section-class:current should not gray out.

git add src/com/getorcha/app/http/documents/view/invoice.clj src/com/getorcha/app/ui/components.clj
git commit -m "feat: invoice view reads diagnostics from document.diagnostics with section states"

Task F4: Update purchase-order view

Files:

Mirror the invoice view changes: pass diagnostics + latest-runs-by-id, classify each section, pass state to section helpers.

clj-kondo --lint src/com/getorcha/app/http/documents/view/purchase_order.clj
git add src/com/getorcha/app/http/documents/view/purchase_order.clj
git commit -m "feat: purchase_order view reads diagnostics with section states"

Task F5: Update goods-received-note view

Files:

clj-kondo --lint src/com/getorcha/app/http/documents/view/goods_received_note.clj
git add src/com/getorcha/app/http/documents/view/goods_received_note.clj
git commit -m "feat: goods_received_note view reads diagnostics with section states"

Task F6: Update contract view

Files:

clj-kondo --lint src/com/getorcha/app/http/documents/view/contract.clj
git add src/com/getorcha/app/http/documents/view/contract.clj
git commit -m "feat: contract view reads diagnostics with section states"

Phase G: SSE wiring

After this phase, the detail page auto-refreshes diagnostic sections as runs transition. Existing SSE infrastructure is reused.

Task G1: Extend SSE exec-fn to handle diagnostic-run events

Files:

Read src/com/getorcha/app/http/documents/view/shared.clj lines around 1018-1098 (the SSE case statement on event-type).

In the case branch, add (before the nil fallthrough):

:diagnostic-run-started
;; A processor just started recomputing; re-render the section in
;; the recomputing state.
(let [processor-id (:processor/id event)
      latest-run   {:document-processor-run/status           "running"
                    :document-processor-run/processor-id     processor-id
                    :document-processor-run/document-version (:document-version event)}
      doc          (db.sql/execute-one!
                     db-pool
                     {:select [:diagnostics :version]
                      :from   [:document]
                      :where  [:= :id document-id]})
      slice-key    (processor-id->slice-key processor-id)
      state        (components/diagnostic-section-state doc processor-id slice-key latest-run)]
  {:event (str "diagnostic-run-started:" processor-id)
   :data  (hiccup/html (render-diagnostic-section
                         (get (:document/diagnostics doc) slice-key)
                         state
                         processor-id))})

:diagnostic-run-completed
;; A processor just finished; re-render the section with current data.
(let [processor-id (:processor/id event)
      latest-runs  (db.run/latest-runs-per-processor db-pool document-id)
      latest-run   (->> latest-runs
                        (filter #(= processor-id (:document-processor-run/processor-id %)))
                        first)
      doc          (db.sql/execute-one!
                     db-pool
                     {:select [:diagnostics :version]
                      :from   [:document]
                      :where  [:= :id document-id]})
      slice-key    (processor-id->slice-key processor-id)
      state        (components/diagnostic-section-state doc processor-id slice-key latest-run)]
  {:event (str "diagnostic-run-completed:" processor-id)
   :data  (hiccup/html (render-diagnostic-section
                         (get (:document/diagnostics doc) slice-key)
                         state
                         processor-id))})

Add to shared.clj (or a new namespace app/http/documents/view/diagnostics.clj — choose based on file size; if shared.clj is already large, factor out):

(def ^:private processor-id->slice-key
  {"validations"             :validations
   "fraud-detector"          :fraud-flags
   "tax-compliance-analyzer" :tax-issues
   "vat-validation"          :line-items
   "matching"                :matching
   "reconciliation"          :reconciliation})


(defn ^:private render-diagnostic-section
  "Renders the hiccup fragment for a single diagnostic section based on
   its slice and state. Dispatches to the existing per-section helper."
  [slice state processor-id]
  (case processor-id
    "validations"             (components/validation-results-section slice state)
    "fraud-detector"          (components/fraud-detection-section    slice state)
    "tax-compliance-analyzer" (components/tax-compliance-section     slice state)
    "vat-validation"          (components/vat-validation-section     slice state)
    "matching"                (components/matches-section            slice state)
    "reconciliation"          (components/reconciliation-section     slice state)))

(Component helper names need to match what exists in components.clj. Check those names and adjust.)

clj-kondo --lint src/com/getorcha/app/http/documents/view/shared.clj
git add src/com/getorcha/app/http/documents/view/shared.clj
git commit -m "feat: SSE exec-fn handles diagnostic-run-started/completed events"

Task G2: Wire HTMX SSE attributes onto each diagnostic section

Files:

For each diagnostic section helper modified in F1 (validation-results-section, fraud-detection-section, etc.), add HTMX SSE attributes so the client swaps the section on the matching event:

(defn validation-results-section
  ([results] (validation-results-section results :current))
  ([results state]
   [:section {:class       (diagnostic-section-class state)
              :hx-ext      "sse"
              :sse-swap    "diagnostic-run-started:validations,diagnostic-run-completed:validations"
              :hx-swap     "outerHTML"}
    [:h3 "Validation results " (diagnostic-section-badge state)]
    ;; ... existing render ...
    ]))

Set sse-swap per processor id. Make sure the SSE-connected element is at a known location (the SSE source must be opened by an ancestor with hx-ext="sse" and sse-connect="..." — confirm by reading the existing detail-view layout).

Search for sse-connect or sse-source in the codebase to find where the EventSource is opened:

grep -rn "sse-connect\|sse-source\|hx-ext=\"sse" src/com/getorcha/app/

If no such attribute exists yet, add one to the detail-view's outermost container — pointing at the existing SSE endpoint. The endpoint already exists at shared.clj:1018 and serves document-events for the page's tenant.

clj-kondo --lint src/com/getorcha/app/ui/components.clj
git add src/com/getorcha/app/ui/components.clj
git commit -m "feat: HTMX SSE attributes on diagnostic sections for auto-refresh"

Task G3: End-to-end SSE smoke test

Files:

clj -M:dev
(integrant.repl/reset-all)

In the REPL:

(require '[com.getorcha.db.document-processor-run :as db.run])
(def db-pool (:com.getorcha.db/pool integrant.repl.state/system))
(def document-id #uuid "<some-document-id>")
(db.run/insert-running! db-pool
  {:document-id document-id
   :processor-id "fraud-detector"
   :trigger-kind :ingestion
   :ingestion-id #uuid "<an-ingestion-id>"
   :document-version 1})

Expected: in the browser, the fraud-flags section transitions to the "Recomputing…" state.

;; Get the run id you just created
(def run-id ...)
(db.run/complete-run! db-pool run-id {:result {:fraud-flags []}})

Expected: in the browser, the section transitions back to current state.

No commit — verification only.


Phase H: Infrastructure (queue + config)

After this phase, both production (CDK) and local (LocalStack via init_aws.clj + test fixtures) environments have the new SQS queue.

Task H1: Add SQS queue to CDK foundation stack

Files:

Open infra/stacks/foundation_stack.py and find the block that defines ingest_queue and email_acquire_queue (lines ~282-321 per spec).

Insert (after the email_acquire_queue block):

        # Diagnostics recompute pipeline
        self.diagnostics_recompute_dlq = sqs.Queue(
            self,
            "DiagnosticsRecomputeDlq",
            queue_name="v1-orcha-global-diagnostics-recompute-dlq",
            retention_period=Duration.days(14),
            encryption=sqs.QueueEncryption.SQS_MANAGED,
        )
        self.diagnostics_recompute_queue = sqs.Queue(
            self,
            "DiagnosticsRecomputeQueue",
            queue_name="v1-orcha-global-diagnostics-recompute",
            visibility_timeout=Duration.seconds(600),
            retention_period=Duration.days(7),
            encryption=sqs.QueueEncryption.SQS_MANAGED,
            dead_letter_queue=sqs.DeadLetterQueue(
                queue=self.diagnostics_recompute_dlq,
                max_receive_count=3,
            ),
        )

Find where ingest_queue.grant_consume_messages(...) is called for the workers role and add an analogous grant for the new queue. Search for ingest_queue.grant_consume_messages:

grep -rn "ingest_queue.grant" infra/

Replicate the pattern for diagnostics_recompute_queue.

git add infra/stacks/foundation_stack.py
git commit -m "infra: add diagnostics-recompute SQS queue + DLQ to CDK"

Task H2: Add SQS queue to local init script

Files:

In scripts/init_aws.clj near line 104-106 (where sqs-ingestion-queue etc. are defined), append:

(def sqs-diagnostics-recompute-queue (get-in config [:com.getorcha/aws :queues :diagnostics-recompute]))

In the setup-sqs! function around line 236, append:

  (create-queue-with-dlq! sqs-diagnostics-recompute-queue))

(Replace the closing )) of the existing function with the call followed by the original closing.)

Find the printlns at lines ~343 and ~360-362 and add the new queue:

(println "  SQS Queues:" sqs-ingestion-queue "," sqs-acquisition-queue ","
         sqs-matching-queue "," sqs-diagnostics-recompute-queue)
;; ...
(println "Diagnostics-recompute queue (from config.edn):" sqs-diagnostics-recompute-queue)
git add scripts/init_aws.clj
git commit -m "infra: add diagnostics-recompute queue to local init script"

Task H3: Add queue URL to config

Files:

Find the :com.getorcha/aws config entry's :queues map.

Add:

:diagnostics-recompute "v1-orcha-global-diagnostics-recompute"

(or whatever name format the config uses — match the existing :ingestion, :acquisition, :matching entries).

git add resources/com/getorcha/config.edn
git commit -m "config: add diagnostics-recompute queue entry"

Task H4: Update test fixtures to create the new queue

Files:

Search the file for CreateQueueRequest or queue setup logic.

Add the diagnostics-recompute queue (with its DLQ) to whichever startup helper creates the existing queues. Follow the existing pattern for queue creation in tests; use the LocalStack endpoint from the test system.

clj -X:test:silent :nses '[com.getorcha.test.fixtures-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

If there's no fixtures-test namespace, run a small smoke test instead:

clj -X:test:silent :nses '[com.getorcha.db-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: no regressions.

git add test/com/getorcha/test/fixtures.clj
git commit -m "test: create diagnostics-recompute queue at test startup"

Phase I: Edit-triggered recompute pipeline

After this phase, edit handlers enqueue SQS messages, and a workers-service consumer debounces (idempotency check) and dispatches diagnostic processors.

Task I1: Create the consumer namespace skeleton

Files:

Write to src/com/getorcha/workers/diagnostics_recompute.clj:

(ns com.getorcha.workers.diagnostics-recompute
  "SQS consumer for edit-triggered diagnostics recompute. Each message
   represents a debounced 'rerun diagnostics for document X at version Y'
   request. Idempotent: if a newer edit exists, skip."
  (:require [cheshire.core :as json]
            [clojure.tools.logging :as log]
            [com.getorcha.db.sql :as db.sql]
            [com.getorcha.workers.diagnostics-recompute.orchestrator :as orchestrator]))


(defn ^:private should-run?
  "Returns true when no edit newer than the message's enqueued-at exists.
   When a newer edit exists, a later message will fire — skip this one."
  [db-pool {:keys [document-id enqueued-at]}]
  (let [newest (db.sql/execute-one!
                db-pool
                {:select   [:created-at]
                 :from     [:document-history]
                 :where    [:and
                            [:= :document-id document-id]
                            [:= :change-type [:cast "edit" :document-history-change-type]]]
                 :order-by [[:created-at :desc]]
                 :limit    1})]
    (or (nil? newest)
        (not (pos? (compare (:document-history/created-at newest) enqueued-at))))))


(defn handle-message
  "SQS message handler. Idempotent — drops messages superseded by newer edits."
  [{:keys [db-pool] :as ctx} message-body]
  (let [{:keys [document-id history-id document-version enqueued-at]
         :as   message} (json/parse-string message-body true)]
    (cond
      (not (should-run? db-pool message))
      (do (log/info "Skipping superseded diagnostics-recompute message"
                    {:document-id document-id :enqueued-at enqueued-at})
          :skipped)

      :else
      (do (log/info "Running diagnostics recompute"
                    {:document-id document-id :version document-version})
          (orchestrator/recompute-all! ctx
                                       {:document-id      (parse-uuid document-id)
                                        :history-id       (parse-uuid history-id)
                                        :document-version document-version})
          :ok))))
clj-kondo --lint src/com/getorcha/workers/diagnostics_recompute.clj

Expected: clj-kondo will warn that orchestrator is unresolved — that's fine; we create it next.

git add src/com/getorcha/workers/diagnostics_recompute.clj
git commit -m "feat: add diagnostics-recompute SQS consumer skeleton"

Task I2: Create the orchestrator namespace

Files:

Write to src/com/getorcha/workers/diagnostics_recompute/orchestrator.clj:

(ns com.getorcha.workers.diagnostics-recompute.orchestrator
  "Phase-ordered dispatch of diagnostic processors for edit-triggered
   recomputes. Phase 1 processors run in parallel; phase 2 processors
   wait for phase 1 to commit (they consume phase-1 outputs)."
  (:require [clojure.core.async :as a]
            [com.getorcha.db.document-processor-run :as db.run]
            [com.getorcha.db.document-diagnostics :as db.diagnostics]
            [com.getorcha.workers.ap.ingestion.validation :as validation]
            [com.getorcha.workers.ap.ingestion.post-process.fraud :as fraud]
            [com.getorcha.workers.ap.ingestion.post-process.tax-compliance :as tax-compliance]
            [com.getorcha.workers.ap.matching.worker :as matching]
            [com.getorcha.workers.ap.matching.reconciliation :as reconciliation]))


(def ^:private phase-1-processors
  "Diagnostic processors that have no inter-processor dependencies."
  [{:processor-id "validations"
    :run-fn       validation/run-validations}
   {:processor-id "tax-compliance-analyzer"
    :run-fn       tax-compliance/run-tax-compliance}
   {:processor-id "vat-validation"
    :run-fn       validation/run-vat-validation}
   {:processor-id "matching"
    :run-fn       matching/run-matching}])


(def ^:private phase-2-processors
  "Diagnostic processors that need phase-1 outputs (validations + matching)."
  [{:processor-id "fraud-detector"
    :run-fn       fraud/run-fraud-detection}
   {:processor-id "reconciliation"
    :run-fn       reconciliation/run-reconciliation}])


(defn ^:private slice-key-for-processor
  [processor-id]
  (case processor-id
    "validations"             :validations
    "tax-compliance-analyzer" :tax-issues
    "vat-validation"          :line-items
    "matching"                :matching
    "fraud-detector"          :fraud-flags
    "reconciliation"          :reconciliation))


(defn ^:private run-one!
  "Runs a single processor with two-phase row writes and slice merge.
   Returns the run's :result for downstream phases."
  [{:keys [db-pool] :as ctx} {:keys [document-id history-id document-version]} {:keys [processor-id run-fn]}]
  (let [run-id (db.run/insert-running! db-pool
                                       {:document-id             document-id
                                        :processor-id            processor-id
                                        :trigger-kind            :edit
                                        :triggered-by-history-id history-id
                                        :document-version        document-version})]
    (try
      (let [{:keys [result stats]} (run-fn ctx document-id document-version)]
        (db.diagnostics/update-slice! db-pool document-id
                                      (slice-key-for-processor processor-id)
                                      result
                                      document-version)
        (db.run/complete-run! db-pool run-id (assoc stats :result result))
        result)
      (catch Throwable t
        (db.run/fail-run! db-pool run-id (.getMessage t))
        nil))))


(defn ^:private run-phase!
  "Runs every processor in the phase concurrently. Returns when all complete.
   Failures within a phase don't block other processors in the same phase."
  [ctx params processors]
  (let [chans (mapv (fn [p] (a/thread (run-one! ctx params p))) processors)]
    (mapv a/<!! chans)))


(defn recompute-all!
  "Top-level entry-point: runs every diagnostic processor in phase order."
  [ctx params]
  (run-phase! ctx params phase-1-processors)
  (run-phase! ctx params phase-2-processors)
  :done)

For each phase-1/phase-2 entry above, confirm the referenced function exists and has the signature (fn [ctx document-id document-version] -> {:result <output> :stats <map>}). If not, add a thin adapter in the processor namespace that calls the existing implementation.

For example, in src/com/getorcha/workers/ap/ingestion/post_process/fraud.clj, add:

(defn run-fraud-detection
  "Edit-time entry point. Loads document state and runs the existing
   fraud-detection logic."
  [{:keys [db-pool] :as _ctx} document-id _document-version]
  (let [doc (load-document-with-diagnostics db-pool document-id)
        {:keys [fraud-flags stats]} (detect-fraud doc)]
    {:result fraud-flags :stats stats}))

Repeat for validation/run-validations, validation/run-vat-validation, tax-compliance/run-tax-compliance, matching/run-matching, reconciliation/run-reconciliation.

clj-kondo --lint src/com/getorcha/workers/diagnostics_recompute/orchestrator.clj

Expected: no errors.

git add src/com/getorcha/workers/diagnostics_recompute/orchestrator.clj src/com/getorcha/workers/ap/ingestion/post_process/fraud.clj src/com/getorcha/workers/ap/ingestion/validation.clj src/com/getorcha/workers/ap/ingestion/post_process/tax_compliance.clj src/com/getorcha/workers/ap/matching/worker.clj src/com/getorcha/workers/ap/matching/reconciliation.clj
git commit -m "feat: orchestrator dispatches diagnostic processors with phase ordering"

Task I3: Wire SQS enqueue from edit handlers

Files:

Read the edit handler namespace introduced by edit-history. Find the function where the edit transaction commits.

Add:

(defn ^:private enqueue-recompute!
  "Sends a delayed SQS message to trigger debounced diagnostics recompute."
  [{:keys [sqs-client diagnostics-queue-url]} {:keys [document-id history-id document-version]}]
  (let [body (json/generate-string
               {:document-id      (str document-id)
                :history-id       (str history-id)
                :document-version document-version
                :enqueued-at      (str (java.time.Instant/now))})]
    (.sendMessage sqs-client
                  (-> (software.amazon.awssdk.services.sqs.model.SendMessageRequest/builder)
                      (.queueUrl diagnostics-queue-url)
                      (.messageBody body)
                      (.delaySeconds (int 60))
                      .build))))

After the edit transaction commits, call this with the new history row's ID + the document's new version.

In the Integrant config (system.clj), the edit handler component already exists from the edit-history plan. Add the new dependencies:

{:com.getorcha.app.http.documents/edit-handler
 {:db-pool                (ig/ref ::db/pool)
  :sqs-client             (ig/ref [:com.getorcha/sqs :client])
  :diagnostics-queue-url  (ig/ref [:com.getorcha/aws :queue-urls :diagnostics-recompute])}}

(Adjust to the existing config's shape — the project uses aero profiles + integrant; mirror the existing matching-queue wiring.)

clj-kondo --lint src/com/getorcha/app/http/documents/edits.clj src/com/getorcha/system.clj

Expected: no errors.

git add src/com/getorcha/app/http/documents/edits.clj src/com/getorcha/system.clj
git commit -m "feat: edit handler enqueues debounced diagnostics-recompute message"

Task I4: Register the diagnostics-recompute consumer in the workers system

Files:

Find where the ingestion + matching + acquisition consumers are registered as Integrant components.

Add an Integrant component init key (e.g. :com.getorcha.workers.diagnostics-recompute/consumer) that:

Use the existing matching consumer as the template — the shape is the same.

clj-kondo --lint src/com/getorcha/system.clj
clj -M:dev

In the REPL:

(integrant.repl/reset-all)

Expected: system starts without error; no missing-component messages in logs.

git add src/com/getorcha/system.clj
git commit -m "feat: register diagnostics-recompute consumer in workers system"

Task I5: Test idempotency check

Files:

Write to test/com/getorcha/workers/diagnostics_recompute_test.clj:

(ns com.getorcha.workers.diagnostics-recompute-test
  (:require [clojure.test :refer [deftest is testing use-fixtures]]
            [com.getorcha.db.sql :as db.sql]
            [com.getorcha.test.fixtures :as fixtures]
            [com.getorcha.workers.diagnostics-recompute :as recompute]))


(use-fixtures :once fixtures/with-system)
(use-fixtures :each fixtures/with-rolled-back-transaction)


(defn ^:private seed-doc-with-edit!
  "Seeds a document and a single edit history row at `edit-time`. Returns
   {:document-id ... :history-id ...}."
  [edit-time]
  ;; Use the helpers from C2 / fixtures
  ...)


(deftest should-run-when-no-newer-edit-test
  (testing "should-run? returns true when no edit newer than enqueued-at exists"
    (let [edit-time (java.time.Instant/parse "2026-04-13T10:00:00Z")
          {:keys [document-id history-id]} (seed-doc-with-edit! edit-time)]
      (is (true?
           (#'recompute/should-run?
            fixtures/*db*
            {:document-id (str document-id)
             :enqueued-at (.plusSeconds edit-time 1)}))))))


(deftest should-skip-when-newer-edit-exists-test
  (testing "should-run? returns false when an edit newer than enqueued-at exists"
    (let [edit-time (java.time.Instant/parse "2026-04-13T10:05:00Z")
          {:keys [document-id]} (seed-doc-with-edit! edit-time)]
      (is (false?
           (#'recompute/should-run?
            fixtures/*db*
            {:document-id (str document-id)
             :enqueued-at (java.time.Instant/parse "2026-04-13T10:00:00Z")}))))))

Adapt the seed-document! helper from C2 to also insert a document_history row with change_type='edit' at the given edit-time.

clj -X:test:silent :nses '[com.getorcha.workers.diagnostics-recompute-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: PASS.

git add test/com/getorcha/workers/diagnostics_recompute_test.clj
git commit -m "test: diagnostics-recompute idempotency check"

Phase J: Reader cleanup (move readers off legacy locations)

After this phase, no application code reads document.matching_status, document.matching_error, document.matching_attempts, document.matching_failed_at, document.reconciliation_status, or the validation-results / fraud-flags / tax-issues keys inside structured_data. The columns and JSONB keys still exist physically (their cleanup is gated in PENDING-CLEANUPS.md); the migration we wrote already strips the JSONB keys but leaves the columns.

Task J1: Find and update readers of document.matching_status

Files:

grep -rn "matching-status\|matching_status\|:document/matching-status" src test dev scripts

Expected: a list of reader sites in the app, the SSE handler (already replaced in G1), tests, and possibly admin queries.

Replace patterns like:

(:document/matching-status doc)

with:

(let [latest (->> (db.run/latest-runs-per-processor db-pool (:document/id doc))
                  (filter #(= "matching" (:document-processor-run/processor-id %)))
                  first)]
  (case (:document-processor-run/status latest)
    "completed" "succeeded"
    "running"   "in-progress"
    "failed"    "failed"
    "pending"   "pending"
    nil))

Or, where the consumer needs only "is matching done?", inline the check.

clj-kondo --lint src test dev

Expected: no errors.

clj -X:test:silent 2>&1 | grep -E "(FAIL in|ERROR in|Execution error|Ran .* tests)"

Expected: 0 failures.

git add -A
git commit -m "refactor: read matching status from document_processor_run"

Task J2: Find and update readers of document.reconciliation_status

Files:

grep -rn "reconciliation-status\|reconciliation_status\|:document/reconciliation-status" src test dev scripts

Pattern:

(get-in doc [:document/diagnostics :reconciliation :status])
clj-kondo --lint src test dev
clj -X:test:silent 2>&1 | grep -E "(FAIL in|ERROR in|Execution error|Ran .* tests)"

Expected: clean lint, 0 test failures.

git add -A
git commit -m "refactor: read reconciliation status from document.diagnostics"

Task J3: Find and update readers of structured_data.{validation-results, fraud-flags, tax-issues}

Files:

grep -rn ":validation-results\|:fraud-flags\|:tax-issues\|\"validation-results\"\|\"fraud-flags\"\|\"tax-issues\"" src test dev scripts

Expected hits: validation processors (writes — keep), tax processors (writes — keep), fraud processors (writes — keep), AND any reader still reaching into structured-data for these keys (replace with :document/diagnostics lookup).

Pattern:

;; Before
(:validation-results (:document/structured-data doc))

;; After
(get-in doc [:document/diagnostics :validations])
clj-kondo --lint src test dev
clj -X:test:silent 2>&1 | grep -E "(FAIL in|ERROR in|Execution error|Ran .* tests)"
git add -A
git commit -m "refactor: read diagnostic outputs from document.diagnostics"

Phase K: Tools, scripts, and Claude skills

After this phase, dev tooling and Claude skills know about the new locations.

Task K1: Update debug-doc skill

Files:

Read .claude/skills/debug-doc/SKILL.md. Identify any SQL or query references to ap_ingestion.structured_data, ap_ingestion_post_process_stat, document.matching_status, or document.reconciliation_status.

git add .claude/skills/debug-doc/SKILL.md
git commit -m "docs: update debug-doc skill for diagnostics + processor_run table"

Task K2: Update ingestion-regression-test skill

Files:

Read .claude/skills/ingestion-regression-test/inspector-prompt.md.

These keys are now under document.diagnostics. Update the prompt's "what to compare" section accordingly.

git add .claude/skills/ingestion-regression-test/inspector-prompt.md
git commit -m "docs: update ingestion-regression-test skill for diagnostics location"

Task K3: Update debug_fetch_document.clj

Files:

Read scripts/debug_fetch_document.clj. It fetches a document + its ingestions + history rows from prod into local for debugging.

Append the same row-copy pattern that already exists for document_history in this script:

(defn ^:private copy-processor-runs!
  [src-pool dest-pool document-id]
  (let [rows (db.sql/execute! src-pool
                              {:select [:*]
                               :from   [:document-processor-run]
                               :where  [:= :document-id document-id]})]
    (doseq [row rows]
      (db.sql/execute! dest-pool
                       {:insert-into [:document-processor-run]
                        :values      [(dissoc row :document-processor-run/id)]}))))

Call it from the main fetch flow alongside the document-history copy.

clj-kondo --lint scripts/debug_fetch_document.clj
git add scripts/debug_fetch_document.clj
git commit -m "tooling: copy document_processor_run rows in debug_fetch_document"

Task K4: Update debug_common.clj jsonb-keys

Files:

Read scripts/debug_common.clj. Look for the *-jsonb-keys set definitions (e.g. ingestion-jsonb-keys).

Add :diagnostics to document-jsonb-keys (or wherever document columns are listed) and create a new set for the new table:

(def document-processor-run-jsonb-keys
  "JSONB columns on document_processor_run that need round-trip coercion."
  #{:result})

Wire this into whatever fetch helper handles round-tripping for the table.

git add scripts/debug_common.clj
git commit -m "tooling: register document.diagnostics and document_processor_run.result jsonb columns"

Phase L: Final verification

Task L1: Full test suite + lint sweep

Files:

clj-kondo --lint src test dev

Expected: no problems reported.

clj -X:test:silent 2>&1 | grep -E "(FAIL in|ERROR in|Execution error|failed because|Ran .* tests)"

Expected: all green; report Ran N tests with 0 failures.

In a dev REPL:

(integrant.repl/reset-all)

Manually:

  1. Open a known invoice in the browser.
  2. Edit a value (e.g. invoice-number).
  3. Wait 60 seconds.
  4. Confirm in the browser that diagnostic sections briefly transition to "Recomputing…" then back to current.
  5. In a SQL console:
    SELECT processor_id, status, document_version, started_at, ended_at
      FROM document_processor_run
     WHERE document_id = '<your-doc-id>'
       AND trigger_kind = 'edit'
     ORDER BY started_at;
    
    Expected: rows for each diagnostic processor with status='completed' and document_version equal to the post-edit document.version.

Self-Review Checklist

After working through every task above:

  1. All migrations apply cleanly: clj -X:dev :ns 'com.getorcha.db.migrations' :fn 'migrate!' runs without errors against a fresh database.
  2. document.diagnostics is populated for newly ingested docs AND the moved keys are absent from document.structured_data.
  3. Editing a document triggers a debounced recompute 60s later; UI sections transition stale → current automatically via SSE.
  4. No reader of matching_status, matching_error, matching_attempts, matching_failed_at, reconciliation_status, or the moved structured_data keys remains in src/, test/, dev/, or scripts/. Confirm with the grep commands from Phase J.
  5. ap_ingestion_post_process_stat continues to be written alongside the new table for now (legacy backstop). Removal is queued in PENDING-CLEANUPS.md.
  6. The matching pg_notify trigger is replaced: trigger_document_matching_event no longer exists; trigger_processor_run_event fires SSE events for any processor run transition.
  7. Test fixtures stand up the new SQS queue at startup — confirm by running any worker-system-touching test.
  8. PENDING-CLEANUPS.md has appended entries for ap_ingestion_post_process_stat, the four matching columns, the matching_status ENUM, reconciliation_status, and the moved structured_data keys.

If any item fails verification, return to the relevant task and complete the missing piece.