Note (2026-04-24): After this document was written, legal_entity was renamed to tenant and the old tenant was renamed to organization. Read references to these terms with the pre-rename meaning.
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Unify the three ways document derivations run today (post-processor pipeline, matching SQS worker, new diagnostics-recompute) under a single IProcessor protocol (v2) + a single run-processors! engine. Add conditional recomputation on edits (declared reads/writes, leaf-only, malli seq-regex) and write-protection against user edits.
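The write-protection half of this goal comes down to a path-overlap check. Below is a minimal plain-Clojure sketch. It assumes clj-paths are vectors of segments, that each op has already been converted to a clj-path under a hypothetical :clj-path key, and that an op is blocked when its path and a user-edited path contain one another (so replacing a whole line item is blocked if one of its fields was edited). prefix?, overlaps? and filter-ops are illustrative names, not the engine's API.

```clojure
(ns example.op-filter)

;; Illustrative sketch only: hypothetical names, plain vectors as clj-paths.

(defn prefix?
  "True when path `a` is a prefix of (or equal to) path `b`."
  [a b]
  (and (<= (count a) (count b))
       (= (vec a) (subvec (vec b) 0 (count a)))))

(defn overlaps?
  "True when writing at `op-path` would touch `edited-path`, i.e. one
  path contains the other."
  [op-path edited-path]
  (or (prefix? op-path edited-path)
      (prefix? edited-path op-path)))

(defn filter-ops
  "Drops every op whose :clj-path overlaps a user-edited path."
  [user-edited-paths ops]
  (remove (fn [{:keys [clj-path]}]
            (some #(overlaps? clj-path %) user-edited-paths))
          ops))

;; Usage
(def edited #{[:issuer :name] [:line-items {:id "abc"} :description]})

(map :clj-path
     (filter-ops edited
                 [{:clj-path [:issuer :name]}                      ; exact match: blocked
                  {:clj-path [:issuer :country]}                   ; kept
                  {:clj-path [:line-items {:id "abc"}]}            ; parent of an edit: blocked
                  {:clj-path [:line-items {:id "def"} :amount]}])) ; kept
;; => ([:issuer :country] [:line-items {:id "def"} :amount])
```

Blocking in both directions is a design choice in this sketch: a child write under an edited object and a parent write over an edited field both overwrite user data.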
Architecture: One protocol, one engine, three callers (ingestion-complete, edit-recompute, and the matching worker), all going through the same engine. Every processor declares its reads (malli seq-regex patterns over clj-path segments), its writes (for the op-filter), its diagnostic slice, and its modes. The engine handles run-row lifecycle, conditional scheduling (edit mode), op filtering against user-edited paths, and diagnostic slice writes. Matching and reconciliation become one processor (reconciliation moves into matching's -compute); matching lives alongside the other processors under workers.ap.processors.*.
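The edit-mode conditional scheduling can be pictured with a plain-Clojure stand-in for the malli seq-regex patterns. In this sketch a pattern is a vector in which :* matches any single segment, mirroring what read-leaf lowers to [:cat ... :any ...], and a match must cover the whole leaf path. The processor map and its reads are hypothetical; the engine's own should-run? is still a stub at this stage and works on IProcessor instances.

```clojure
(ns example.scheduler)

;; Illustrative sketch only: plain vectors stand in for malli patterns.

(defn segment-matches?
  "Pattern segment :* matches any single path segment (keyword, index,
  or {:id X} map); any other pattern segment must be equal."
  [pat seg]
  (or (= pat :*) (= pat seg)))

(defn path-matches?
  "Leaf-only match: same length, and every segment matches."
  [pattern clj-path]
  (and (= (count pattern) (count clj-path))
       (every? true? (map segment-matches? pattern clj-path))))

(defn should-run?
  "Edit-mode gate: run when the processor supports :edit and is either
  :always? or declares a read matching at least one edited leaf."
  [{:keys [modes always? reads]} edited-leaves]
  (boolean
   (and (contains? modes :edit)
        (or always?
            (some (fn [leaf] (some #(path-matches? % leaf) reads))
                  edited-leaves)))))

;; Usage: a hypothetical processor reading line-item descriptions and
;; the issuer name.
(def example-processor
  {:modes   #{:ingestion :edit}
   :always? false
   :reads   [[:line-items :* :description] [:issuer :name]]})

(should-run? example-processor [[:line-items {:id "abc"} :description]]) ;; => true
(should-run? example-processor [[:line-items {:id "abc"} :amount]])      ;; => false
(should-run? (assoc example-processor :always? true) [[:total]])          ;; => true
```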
Tech Stack: Clojure, Malli (seq-regex schemas for reads patterns), PostgreSQL (JSONB jsonb_set for sub-path writes), core.async/thread for per-phase concurrency, clojure.test + embedded-postgres + Testcontainers/LocalStack for integration tests.
Reference spec: docs/superpowers/specs/2026-04-15-unified-processors-design.md
Depends on:
document.diagnostics, document_processor_run, db.document-diagnostics/update-slice!, db.document-processor-run/*, document_history, the edit handlers at app.http.documents.edits, and the current (stub) diagnostics-recompute SQS path.
resources/migrations/20260415120000-add-derivation-change-type.up.sql — adds 'derivation' to document_history_change_type enum; relaxes CHECK constraint to allow change_type='derivation' with null ingestion_id + edited_by.
resources/migrations/20260415120000-add-derivation-change-type.down.sql — reverts the CHECK constraint only; the enum value stays (can't remove enum values cleanly in PG; documented).
src/com/getorcha/workers/ap/processors.clj — IProcessor protocol (v2) + default method implementations.
src/com/getorcha/workers/ap/processors/engine.clj — run-processors! + phase scheduler + op filter + run-row lifecycle.
src/com/getorcha/workers/ap/processors/reads.clj — path-pattern helpers (read-leaf, match-any?, leaf-expansion).
src/com/getorcha/document/provenance.clj — extracted from app.http.documents.view.provenance. Exports document-provenance (path → edit-meta map) and user-edited-paths (just the set of clj-paths).
src/com/getorcha/workers/ap/processors/accounts.clj — migrated processor (moved from post_process/accounts.clj).
src/com/getorcha/workers/ap/processors/accruals.clj — migrated.
src/com/getorcha/workers/ap/processors/cost_center.clj — migrated.
src/com/getorcha/workers/ap/processors/financial_validation.clj — migrated; owns invoice :validations.financial-math sub-path; calls validation/check-financial-math (pure) then LLM-refines.
src/com/getorcha/workers/ap/processors/fraud.clj — migrated.
src/com/getorcha/workers/ap/processors/supplier.clj — migrated (both SupplierMatcher + SupplierVerifier).
src/com/getorcha/workers/ap/processors/tax_compliance.clj — migrated; multi-slice owner of :tax-issues + :line-items.
src/com/getorcha/workers/ap/processors/uncertain_validations.clj — migrated; owns invoice :validations.{required-fields,date-reasonableness,recipient-identity} sub-paths; calls the pure checks then LLM-refines.
src/com/getorcha/workers/ap/processors/validations.clj — NEW; wraps the deterministic check-* functions (per-doc-type dispatch; invoice-only keeps 5 sub-paths, other types own full :validations slice).
src/com/getorcha/workers/ap/processors/matching.clj — NEW; wraps match-document! + reconcile-cluster! as one processor.
src/com/getorcha/workers/ap/processors/matching/queue.clj — NEW; holds enqueue! (the renamed publish-document-ready!).
src/com/getorcha/notifications/processor.clj — NEW unified notification builder (:processor/failure kind). Replaces :matching/permanent-failure + :reconciliation/failure + ad-hoc anomaly code in ingestion.
test/com/getorcha/document/provenance_test.clj
test/com/getorcha/workers/ap/processors_test.clj (protocol + default impls)
test/com/getorcha/workers/ap/processors/engine_test.clj
test/com/getorcha/workers/ap/processors/reads_test.clj
test/com/getorcha/workers/ap/processors/accounts_test.clj (moved from post_process)
test/com/getorcha/workers/ap/processors/accruals_test.clj
test/com/getorcha/workers/ap/processors/cost_center_test.clj
test/com/getorcha/workers/ap/processors/financial_validation_test.clj
test/com/getorcha/workers/ap/processors/fraud_test.clj
test/com/getorcha/workers/ap/processors/supplier_test.clj
test/com/getorcha/workers/ap/processors/tax_compliance_test.clj
test/com/getorcha/workers/ap/processors/uncertain_validations_test.clj
test/com/getorcha/workers/ap/processors/validations_test.clj
test/com/getorcha/workers/ap/processors/matching_test.clj
src/com/getorcha/workers/ap/matching/core.clj → src/com/getorcha/workers/ap/processors/matching/core.clj
src/com/getorcha/workers/ap/matching/candidates.clj → …/processors/matching/candidates.clj
src/com/getorcha/workers/ap/matching/evidence.clj → …/processors/matching/evidence.clj
src/com/getorcha/workers/ap/matching/llm_decision.clj → …/processors/matching/llm_decision.clj
src/com/getorcha/workers/ap/matching/normalize.clj → …/processors/matching/normalize.clj
src/com/getorcha/workers/ap/matching/reconciliation.clj → …/processors/matching/reconciliation.clj
src/com/getorcha/workers/ap/matching/searchable_text.clj → …/processors/matching/searchable_text.clj
src/com/getorcha/workers/ap/matching/worker.clj → src/com/getorcha/workers/ap/processors/matching/worker.clj
processors/*).
src/com/getorcha/db/document_diagnostics.clj — rename update-slice! to update-diagnostic!, add sub-path + multi-slice support via jsonb_set; update all callers (matching.worker, matching.reconciliation, ingestion).
src/com/getorcha/db/document_processor_run.clj — add :document-version kwarg to count-runs.
src/com/getorcha/app/http/documents/view/provenance.clj — delete; callers require com.getorcha.document.provenance instead.
src/com/getorcha/app/http/documents/view/shared.clj — require new provenance ns.
src/com/getorcha/app/http/documents/view/invoice.clj — require new provenance ns.
src/com/getorcha/workers/ap/ingestion.clj — replace with-validations + post-process/run calls with one run-processors! call + matching.queue/enqueue!. Strip logic in complete-ingestion! goes away (processors write to diagnostic slices directly).
src/com/getorcha/workers/ap/ingestion/validation.clj — make check-tax-id-format, check-iban, check-issuer-country, check-recipient-country, check-large-document-summary-only public (remove ^:private); drop the validate multimethod and all its defmethods.
src/com/getorcha/workers/ap/ingestion/post_process.clj — delete.
src/com/getorcha/workers/ap/ingestion/post_process/*.clj — delete (content moved).
src/com/getorcha/workers/diagnostics_recompute/orchestrator.clj — replace stubs with one run-processors! call in edit mode.
src/com/getorcha/workers/diagnostics_recompute.clj — expand ctx with :llm-config, :search-config, :notifications.
src/com/getorcha/notifications.clj — add :processor/failure template.
resources/com/getorcha/config.edn — diagnostics-recompute/consumer gains :llm-config, :search-config, :notifications; rename matching.worker/* ns refs to processors.matching.worker/*.
After this phase: the IProcessor protocol exists, the engine runs, helpers for reads/writes exist, document-provenance is exported from a neutral namespace, and the document_history_change_type enum accepts 'derivation' rows. No processors have been migrated yet; the post-process pipeline + matching worker continue to work unchanged.
'derivation' to change_type enum
Files:
Create: resources/migrations/20260415120000-add-derivation-change-type.up.sql
Create: resources/migrations/20260415120000-add-derivation-change-type.down.sql
Step 1: Write the up migration
Write resources/migrations/20260415120000-add-derivation-change-type.up.sql:
-- Add 'derivation' to document_history_change_type enum.
-- Derivation rows capture aggregate structured-data mutations written
-- by the processors engine during edit-mode recomputes.
--
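-- ALTER TYPE ... ADD VALUE cannot run inside a transaction block before
-- PostgreSQL 12, and on 12+ the new value cannot be referenced (as the
-- CHECK constraint below does) until that transaction has committed.
-- Apply this file without a wrapping transaction (Migratus: make
-- "-- :disable-transaction" the first line) or split it into two migrations.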
--;;
ALTER TYPE document_history_change_type ADD VALUE IF NOT EXISTS 'derivation';
--;;
-- Relax the CHECK constraint so derivation rows (edited_by NULL,
-- ingestion_id NULL) are allowed.
ALTER TABLE document_history
DROP CONSTRAINT document_history_exactly_one_source;
--;;
ALTER TABLE document_history
ADD CONSTRAINT document_history_exactly_one_source
CHECK (
(change_type = 'ingestion' AND ingestion_id IS NOT NULL AND edited_by IS NULL)
OR (change_type = 'edit' AND ingestion_id IS NULL AND edited_by IS NOT NULL)
OR (change_type = 'derivation' AND ingestion_id IS NULL AND edited_by IS NULL)
);
Note: document_history_exactly_one_source is the assumed name of the existing constraint. Verify it with psql (\d+ document_history) or in resources/migrations/20260413161401-add-document-history.up.sql, and use the actual name in both the up and down migrations.
Write resources/migrations/20260415120000-add-derivation-change-type.down.sql:
-- PostgreSQL does not support removing values from an enum. The down
-- migration can only revert the CHECK constraint; the 'derivation'
-- value remains in the enum (inert if no rows reference it).
ALTER TABLE document_history
DROP CONSTRAINT document_history_exactly_one_source;
--;;
ALTER TABLE document_history
ADD CONSTRAINT document_history_exactly_one_source
CHECK (
(change_type = 'ingestion' AND ingestion_id IS NOT NULL AND edited_by IS NULL)
OR (change_type = 'edit' AND ingestion_id IS NULL AND edited_by IS NOT NULL)
);
clj -X:dev-run :fn 'com.getorcha.db.migrations/migrate' 2>&1 | tail -10
Expected: "migration 20260415120000 applied". Then confirm via psql:
psql -h localhost -U postgres -d orcha -c "\dT+ document_history_change_type"
Expected output lists ingestion, edit, derivation.
Open src/com/getorcha/db/document_history.clj, find the insert! fn's assertion (lines 27-32), and extend it to accept :derivation:
(assert (case change-type
:ingestion (and (some? ingestion-id) (nil? edited-by))
:edit (and (nil? ingestion-id) (some? edited-by))
:derivation (and (nil? ingestion-id) (nil? edited-by))
false)
(str "document_history insert XOR violation; got change-type="
(pr-str change-type)
" ingestion-id=" (pr-str ingestion-id)
" edited-by=" (pr-str edited-by)))
git add resources/migrations/20260415120000-add-derivation-change-type.up.sql resources/migrations/20260415120000-add-derivation-change-type.down.sql src/com/getorcha/db/document_history.clj
git commit -m "migration: add derivation to document_history_change_type"
Files:
Create: src/com/getorcha/document/provenance.clj
Create: test/com/getorcha/document/provenance_test.clj
Modify: src/com/getorcha/app/http/documents/view/shared.clj
Modify: src/com/getorcha/app/http/documents/view/invoice.clj
Delete: src/com/getorcha/app/http/documents/view/provenance.clj
Step 1: Create the new provenance ns
Write src/com/getorcha/document/provenance.clj:
(ns com.getorcha.document.provenance
"Per-path provenance for a document, derived from document_history.
Shared between the UI (which tints user-edited fields) and the
processor engine (which refuses to overwrite user-edited paths).
Walks document_history newest → oldest, collecting op paths up to
(but not including) the most recent :ingestion row. Post-ingestion
edits only: anything before the most recent ingestion is 'rewritten'
by that ingestion and no longer user-sourced."
(:require [com.getorcha.db.document-history :as document-history]
[com.getorcha.json-patch.path :as json-patch.path]))
(defn document-provenance
"Returns {json-pointer-string → {:edited-by :edited-at}} for every
path with a human edit since the most recent ingestion for this
document. Paths absent from the map are implicitly LLM-sourced.
Within the post-ingestion edits, the newest entry at each path wins
(first-seen-wins while iterating newest → oldest)."
[db-pool document-id]
(let [rows (document-history/rows-for-document db-pool document-id)
post-ingest (take-while #(not= :ingestion
(:document-history/change-type %))
rows)]
(reduce
(fn [acc {:document-history/keys [patch edited-by created-at]}]
(reduce (fn [acc' op]
(let [path (:path op)]
(if (contains? acc' path)
acc'
(assoc acc' path {:edited-by edited-by
:edited-at created-at}))))
acc
patch))
{}
post-ingest)))
(defn user-edited-paths
"Returns #{clj-path ...} — the set of user-edited paths since the
most recent ingestion, pre-parsed to clj-paths for prefix-matching."
[db-pool document-id]
(->> (document-provenance db-pool document-id)
keys
(into #{} (map json-patch.path/pointer->clj-path))))
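document-provenance is a fold over history rows. The same logic on plain maps makes its two rules easy to check without a database: rows older than the most recent ingestion are ignored, and the newest edit at each path wins. This sketch assumes unqualified keys in place of the :document-history/* columns and integers in place of created-at timestamps; provenance and rows are illustrative names.

```clojure
(ns example.provenance)

;; Illustrative sketch only: plain maps stand in for document_history rows.

(defn provenance
  "`rows` are history rows ordered newest -> oldest. Keeps only rows
  newer than the most recent :ingestion row; while folding newest
  first, the first edit seen at a path (i.e. the newest) wins."
  [rows]
  (->> rows
       (take-while #(not= :ingestion (:change-type %)))
       (reduce (fn [acc {:keys [patch edited-by created-at]}]
                 (reduce (fn [acc' {:keys [path]}]
                           (if (contains? acc' path)
                             acc'
                             (assoc acc' path {:edited-by edited-by
                                               :edited-at created-at})))
                         acc
                         patch))
               {})))

;; Usage: rows newest -> oldest; the last edit predates the ingestion.
(def rows
  [{:change-type :edit :edited-by "bob" :created-at 3
    :patch [{:path "/issuer/name"}]}
   {:change-type :edit :edited-by "ann" :created-at 2
    :patch [{:path "/issuer/name"} {:path "/total"}]}
   {:change-type :ingestion :created-at 1
    :patch [{:path "/due-date"}]}
   {:change-type :edit :edited-by "cy" :created-at 0
    :patch [{:path "/due-date"}]}])

(provenance rows)
;; => {"/issuer/name" {:edited-by "bob", :edited-at 3},
;;     "/total" {:edited-by "ann", :edited-at 2}}
```

"/due-date" is absent from the result: the ingestion rewrote it, so it is no longer user-sourced.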
Write test/com/getorcha/document/provenance_test.clj:
(ns com.getorcha.document.provenance-test
(:require [clojure.test :refer [deftest is testing use-fixtures]]
[com.getorcha.db.document-history :as document-history]
[com.getorcha.document.provenance :as provenance]
[com.getorcha.test.fixtures :as fixtures]
[com.getorcha.test.notification-helpers :as helpers]))
(use-fixtures :once fixtures/with-running-system)
(use-fixtures :each fixtures/with-db-rollback)
(defn ^:private seed-doc!
[]
(let [legal-entity-id (helpers/create-legal-entity!)
_identity-id (helpers/create-identity!)
document-id (helpers/create-document! legal-entity-id)]
document-id))
(deftest document-provenance-collects-edits-since-last-ingestion
(testing "edits after the most recent ingestion are returned; earlier edits are not"
(let [document-id (seed-doc!)
identity-id (helpers/create-identity!)
_ (document-history/insert!
fixtures/*db*
{:document-id document-id
:change-type :edit
:edited-by identity-id
:patch [{"op" "replace" "path" "/issuer/name" "value" "A"}]})
_ (document-history/insert!
fixtures/*db*
{:document-id document-id
:change-type :ingestion
:ingestion-id (helpers/create-ingestion! document-id)
:patch []})
_ (document-history/insert!
fixtures/*db*
{:document-id document-id
:change-type :edit
:edited-by identity-id
:patch [{"op" "replace" "path" "/issuer/name" "value" "B"}
{"op" "replace" "path" "/total" "value" 42}]})
result (provenance/document-provenance fixtures/*db* document-id)]
(is (= #{"/issuer/name" "/total"} (set (keys result))))
(is (= identity-id (get-in result ["/issuer/name" :edited-by]))))))
(deftest user-edited-paths-returns-clj-paths
(testing "returns clj-paths ready for prefix matching"
(let [document-id (seed-doc!)
identity-id (helpers/create-identity!)
_ (document-history/insert!
fixtures/*db*
{:document-id document-id
:change-type :edit
:edited-by identity-id
:patch [{"op" "replace" "path" "/issuer/name" "value" "A"}
{"op" "replace" "path" "/line-items[id=abc]/description" "value" "x"}]})
result (provenance/user-edited-paths fixtures/*db* document-id)]
(is (contains? result [:issuer :name]))
(is (contains? result [:line-items {:id "abc"} :description])))))
Run: clj -X:test:silent :nses '[com.getorcha.document.provenance-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: PASS (fn already works since it's a straight lift of the existing UI code).
In src/com/getorcha/app/http/documents/view/shared.clj, replace the require [com.getorcha.app.http.documents.view.provenance :as provenance] with [com.getorcha.document.provenance :as provenance] (same local alias, same call sites).
Repeat in src/com/getorcha/app/http/documents/view/invoice.clj and any other caller (grep for view.provenance).
rm src/com/getorcha/app/http/documents/view/provenance.clj
clj-kondo --lint src/com/getorcha/document src/com/getorcha/app/http/documents
clj -X:test:silent :nses '[com.getorcha.app.http.documents.view.invoice-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: no lint errors, tests pass (UI continues to render per-field provenance).
git add src/com/getorcha/document/ test/com/getorcha/document/ src/com/getorcha/app/http/documents/
git commit -m "refactor: extract document-provenance to shared ns"
Files:
Create: src/com/getorcha/workers/ap/processors.clj
Create: test/com/getorcha/workers/ap/processors_test.clj
Step 1: Write the protocol namespace
Write src/com/getorcha/workers/ap/processors.clj:
(ns com.getorcha.workers.ap.processors
"IProcessor protocol v2 and default method implementations.
Version 1 of IProcessor lived in
`com.getorcha.workers.ap.ingestion.post-process.protocol`. This
version extends it with declarative reads/writes/diagnostic/modes
metadata and an ops-based -apply-ops (see spec §3)."
(:refer-clojure :exclude [get]))
(defprotocol IProcessor
  "A unit of derivation that runs against document state.
  See docs/superpowers/specs/2026-04-15-unified-processors-design.md."
  (-id [this]
    "Returns the processor's stable keyword id (e.g. :accounts).")
  (-reads [this state]
    "Returns a vector of malli seq-regex patterns over clj-path
    segments, declaring the leaf paths this processor reads.
    Dispatches on state for per-doc-type reads (e.g. matching reads
    differ for invoice vs. contract). Ignored when -always? is true.")
  (-writes [this state]
    "Returns a vector of malli seq-regex patterns. Empty when the
    processor writes only to a diagnostic slice.")
  (-diagnostic [this state]
    "Returns one of:
      nil
      {:slice :kw}
      {:slice :kw :sub-path [k1 k2 ...]}
      {:slice :kw :sub-paths [[k1] [k2] ...]}
      [spec1 spec2 ...]  ;; multi-slice
    See spec §3.1.")
  (-modes [this]
    "Returns a non-empty subset of #{:ingestion :edit}.")
  (-always? [this]
    "Returns true if the processor bypasses the conditional scheduler
    filter in :edit mode. Defaults to false.")
  (-compute [this ctx state]
    "Returns {:result <map-or-value> :stats <llm-stats-map-or-nil>}.
    For a multi-slice spec the result map MUST have a top-level key per
    slice name; for a sub-paths spec, a top-level key per sub-path's
    first key.")
  (-apply-ops [this state result]
    "Returns a vector of JSON-Patch ops (maps with string keys
    \"op\", \"path\", \"value\") that should be applied to
    state's structured-data. Empty when the processor only writes
    to a diagnostic slice."))

(def defaults
  "Default method implementations, keyed by method name as `extend`
  expects (:-reads, not :reads). Processors can
  (extend SomeRecord IProcessor (merge defaults {...})) to avoid
  reimplementing trivial methods."
  {:-reads      (fn [_this _state] [])
   :-writes     (fn [_this _state] [])
   :-diagnostic (fn [_this _state] nil)
   :-modes      (fn [_this] #{:ingestion :edit})
   :-always?    (fn [_this] false)
   :-apply-ops  (fn [_this _state _result] [])})
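extend-type takes inline method bodies; the map-based form is extend, whose method-map keys are the method names as keywords, dashes included. A sketch of the merge-over-defaults pattern, using a cut-down stand-in protocol so it runs on its own; IMini, its defaults, and the Fraud record are illustrative only, not the real IProcessor or fraud processor.

```clojure
(ns example.defaults)

;; Illustrative sketch only: a cut-down stand-in for IProcessor with the
;; same method-naming style.
(defprotocol IMini
  (-id [this])
  (-modes [this])
  (-always? [this])
  (-compute [this ctx state]))

;; Keys are the protocol method names as keywords, dashes included.
(def defaults
  {:-modes   (fn [_this] #{:ingestion :edit})
   :-always? (fn [_this] false)})

(defrecord Fraud [])

;; Entries in the second map override defaults with the same key.
(extend Fraud
  IMini
  (merge defaults
         {:-id      (fn [_this] :fraud)
          :-compute (fn [_this _ctx _state]
                      {:result {:fraud-flags []} :stats nil})}))

(-id (->Fraud))       ;; => :fraud
(-always? (->Fraud))  ;; => false
```

A defaults map keyed :modes instead of :-modes would supply no implementation for -modes, and the call would fail at runtime rather than at extend time.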
Write test/com/getorcha/workers/ap/processors_test.clj:
(ns com.getorcha.workers.ap.processors-test
(:require [clojure.test :refer [deftest is]]
[com.getorcha.workers.ap.processors :as proc]))
(defrecord ^:private MinimalProcessor []
proc/IProcessor
(-id [_] :minimal)
(-reads [_ _state] [])
(-writes [_ _state] [])
(-diagnostic [_ _state] nil)
(-modes [_] #{:ingestion})
(-always? [_] false)
(-compute [_ _ctx _state] {:result {:ok true} :stats nil})
(-apply-ops [_ _state _result] []))
(deftest protocol-exposes-required-methods
(let [p (->MinimalProcessor)]
(is (= :minimal (proc/-id p)))
(is (= #{:ingestion} (proc/-modes p)))
(is (false? (proc/-always? p)))
(is (nil? (proc/-diagnostic p nil)))
(is (= {:result {:ok true} :stats nil} (proc/-compute p nil nil)))
(is (= [] (proc/-apply-ops p nil {:ok true})))))
Run: clj -X:test:silent :nses '[com.getorcha.workers.ap.processors-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: PASS.
git add src/com/getorcha/workers/ap/processors.clj test/com/getorcha/workers/ap/processors_test.clj
git commit -m "feat: IProcessor v2 protocol"
Files:
Create: src/com/getorcha/workers/ap/processors/reads.clj
Create: test/com/getorcha/workers/ap/processors/reads_test.clj
Step 1: Write the reads ns with read-leaf + match-any?
Write src/com/getorcha/workers/ap/processors/reads.clj:
(ns com.getorcha.workers.ap.processors.reads
"Helpers for declaring and matching processor reads/writes against
edit paths.
Authors declare reads as concise segment vectors:
(read-leaf :issuer :name)
(read-leaf :line-items :* :description)
These lower to malli seq-regex patterns:
[:cat [:= :issuer] [:= :name]]
[:cat [:= :line-items] :any [:= :description]]
which are validated against clj-paths produced by
`com.getorcha.json-patch.path/pointer->clj-path`. A patch segment
of {:id X} matches :any in the pattern (see spec §2.3)."
(:require [com.getorcha.json-patch.path]
          [com.getorcha.schema.structured-data :as schema.structured-data]
          [malli.core :as m]
          [malli.util :as m.util]))
(defn read-leaf
"Builds a malli seq-regex pattern from a concise segment vector.
`:*` in the segments becomes `:any`; every other segment becomes
`[:= seg]`."
[& segs]
(into [:cat]
(map (fn [seg]
(cond
(= seg :*) :any
:else [:= seg])))
segs))
(defn match-any?
"Returns true when any of `patterns` m/validates `clj-path`.
`clj-path` is a vector of segments (keywords, ints, {:id X} maps)."
[patterns clj-path]
(boolean (some #(m/validate % clj-path) patterns)))
(defn ^:private atomic-schema?
  "True when `schema` is a primitive, an enum, or a vector of
  primitives (anything that represents a leaf in structured-data).
  Nil-safe: a path the schema doesn't describe is not atomic."
  [schema]
  (let [schema (loop [s schema]
                 (if (and s (= :maybe (m/type s)))
                   (recur (first (m/children s)))
                   s))]
    (if (nil? schema)
      false
      (let [kind (m/type schema)]
        (boolean
         (or (#{:int :double :string :boolean :keyword :uuid :any :enum
                ;; Predicate schemas (int?, string?, ...) report their
                ;; type as a symbol, so they must be quoted here.
                'number? 'int? 'double? 'string? 'boolean? 'keyword? 'uuid?}
              kind)
             (and (= :vector kind)
                  (atomic-schema? (first (m/children schema))))))))))
(defn ^:private walk-leaves
"Walks `value` against `schema`, yielding {:path clj-path :value v}
for every atomic descendant."
[schema clj-path value]
(let [schema (loop [s schema]
(if (and s (= :maybe (m/type s)))
(recur (first (m/children s)))
s))]
(cond
(nil? schema)
[{:path clj-path :value value}]
(atomic-schema? schema)
[{:path clj-path :value value}]
(and (map? value) (= :map (m/type schema)))
(mapcat (fn [[k v]]
(let [child (m.util/get schema k)]
(when child
(walk-leaves child (conj clj-path k) v))))
value)
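(and (vector? value) (= :vector (m/type schema)))
(let [child (m.util/get schema 0)]
  (when child
    (apply concat
           (map-indexed
            (fn [idx item]
              (walk-leaves child
                           (conj clj-path
                                 ;; Identify items by {:id X} when present
                                 ;; (as pointer->clj-path does), otherwise
                                 ;; by their position in the vector.
                                 (if (and (map? item) (:id item))
                                   {:id (:id item)}
                                   idx))
                           item))
            value))))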
:else
[{:path clj-path :value value}])))
(defn expand-op-to-leaves
"Given a patch op and (optionally) a pre-patch structured-data
snapshot, returns a sequence of virtual leaf clj-paths the op
touches.
For scalar/leaf ops returns `[clj-path]`. For non-leaf ops (add
whole object, remove whole object, replace subtree), walks the
value (or pre-patch snapshot for :remove) to enumerate descendant
leaves under the StructuredData malli schema for the document type."
[op pre-patch-sd {:keys [document-type]}]
(let [;; Ops may arrive string-keyed (JSON-Patch as stored) or
      ;; keyword-keyed; normalise the top-level keys once so the
      ;; lookups below agree.
      op          (into {} (map (fn [[k v]] [(keyword k) v])) op)
      clj         (com.getorcha.json-patch.path/pointer->clj-path (:path op))
      type-schema (m.util/get schema.structured-data/StructuredData document-type)
      subschema   (reduce (fn [s seg]
                            (let [s (loop [s s]
                                      (if (and s (= :maybe (m/type s)))
                                        (recur (first (m/children s)))
                                        s))]
                              (cond
                                (nil? s) (reduced nil)
                                (or (int? seg) (map? seg)) (m.util/get s 0)
                                :else (m.util/get s seg))))
                          type-schema
                          clj)]
  (cond
    ;; Path the schema doesn't describe: treat the op path as the leaf.
    (nil? subschema)
    [clj]
    ;; Atomic leaf — the op path itself is a leaf.
    (atomic-schema? subschema)
    [clj]
    ;; Non-leaf op with a value (add or replace) — walk the value.
    (contains? op :value)
    (map :path (walk-leaves subschema clj (:value op)))
    ;; Non-leaf remove — walk the pre-patch snapshot.
    (= "remove" (:op op))
    (let [v (reduce (fn [v seg]
                      (cond
                        (nil? v) nil
                        (and (map? seg) (:id seg))
                        (some (fn [i] (when (= (:id seg) (:id i)) i)) v)
                        :else (get v seg)))
                    pre-patch-sd
                    clj)]
      (map :path (walk-leaves subschema clj v)))
    :else
    [clj])))
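Leaf expansion is easiest to see without the schema. The sketch below is schema-free, which is an assumption: it treats maps as branches, non-empty vectors of maps as item collections, and everything else (including vectors of scalars) as a leaf, whereas expand-op-to-leaves consults the StructuredData schema to decide. Items are identified by {:id X} when they carry an :id and by their index otherwise. leaves is a hypothetical helper, and the :notes path is made up.

```clojure
(ns example.leaves)

;; Illustrative sketch only: schema-free approximation of walk-leaves.

(defn leaves
  "Returns the clj-paths of every leaf under `value`, rooted at `path`."
  [path value]
  (cond
    ;; Maps are branches: recurse per key.
    (map? value)
    (mapcat (fn [[k v]] (leaves (conj path k) v)) value)

    ;; Non-empty vectors of maps are item collections: segment each item
    ;; by {:id X} when it has an :id, else by its index.
    (and (vector? value) (seq value) (every? map? value))
    (mapcat (fn [i item]
              (leaves (conj path (if-let [id (:id item)] {:id id} i)) item))
            (range)
            value)

    ;; Scalars, vectors of scalars, empty vectors: leaves.
    :else
    [path]))

;; Usage: an added line item expands to one path per field.
(leaves [:line-items {:id "new"}] {:id "new" :description "x" :amount 10.0})
;; => ([:line-items {:id "new"} :id]
;;     [:line-items {:id "new"} :description]
;;     [:line-items {:id "new"} :amount])

;; Items without :id fall back to their position.
(leaves [:notes] [{:text "a"} {:text "b"}])
;; => ([:notes 0 :text] [:notes 1 :text])
```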
Write test/com/getorcha/workers/ap/processors/reads_test.clj:
(ns com.getorcha.workers.ap.processors.reads-test
(:require [clojure.test :refer [deftest is testing]]
[com.getorcha.workers.ap.processors.reads :as reads]))
(deftest read-leaf-builds-seq-regex
(testing "fixed segments become [:= seg], :* becomes :any"
(is (= [:cat [:= :issuer] [:= :name]]
(reads/read-leaf :issuer :name)))
(is (= [:cat [:= :line-items] :any [:= :description]]
(reads/read-leaf :line-items :* :description)))))
(deftest match-any-accepts-exact-paths
(testing "exact match"
(is (true? (reads/match-any? [(reads/read-leaf :issuer :name)] [:issuer :name])))
(is (false? (reads/match-any? [(reads/read-leaf :issuer :name)] [:issuer :country])))))
(deftest match-any-wildcards-line-item-id
(testing ":any matches {:id X}"
(is (true?
(reads/match-any? [(reads/read-leaf :line-items :* :description)]
[:line-items {:id "abc-123"} :description])))
(is (false?
(reads/match-any? [(reads/read-leaf :line-items :* :description)]
[:line-items {:id "abc-123"} :amount])))))
(deftest expand-op-to-leaves-passes-through-scalar
(testing "leaf path returns itself"
(let [op {:path "/issuer/name" :value "Acme"}]
(is (= [[:issuer :name]]
(reads/expand-op-to-leaves op nil {:document-type "invoice"}))))))
(deftest expand-op-to-leaves-walks-object-add
(testing "add of a whole line-item expands to its leaves"
(let [op {"op" "add"
:path "/line-items/-"
:value {:id "new" :order 1 :description "x" :amount 10.0}}]
(is (contains? (set (reads/expand-op-to-leaves op nil {:document-type "invoice"}))
[:line-items {:id "new"} :description])))))
Run: clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.reads-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: PASS.
git add src/com/getorcha/workers/ap/processors/reads.clj test/com/getorcha/workers/ap/processors/reads_test.clj
git commit -m "feat: processor reads + leaf-expansion helpers"
update-slice! → update-diagnostic!)
Files:
Modify: src/com/getorcha/db/document_diagnostics.clj
Step 1: Add sub-path support via jsonb_set
Replace src/com/getorcha/db/document_diagnostics.clj body with:
(ns com.getorcha.db.document-diagnostics
"Helpers for writing slices (and optional sub-paths) into
document.diagnostics with derived needs_human_review recomputation."
(:require [cheshire.core :as json]
[com.getorcha.db.sql :as db.sql]
[honey.sql.pg-ops]))
(defn ^:private jsonb-path-array
[slice-key sub-path]
(into [slice-key] (map name) sub-path))
(defn update-diagnostic!
  "Writes `slice-value` into document.diagnostics at
  `[slice-key & sub-path]`, replacing the value at that path while
  leaving sibling keys untouched, then recomputes
  document.needs_human_review from the resulting diagnostics.
  The slice write is one UPDATE using jsonb_set; the
  needs_human_review recompute is a second UPDATE, so pass a
  transaction as `tx` when the two must commit together.
  No-op if `expected-version` is provided and document.version has
  advanced past it.
  Arity:
  (update-diagnostic! tx doc-id slice-key slice-value)
  (update-diagnostic! tx doc-id slice-key sub-path slice-value)
  (update-diagnostic! tx doc-id slice-key sub-path slice-value expected-version)
  `sub-path` is a vector of keywords/strings ([] = replace whole slice)."
  ([tx document-id slice-key slice-value]
   (update-diagnostic! tx document-id slice-key [] slice-value nil))
  ([tx document-id slice-key sub-path slice-value]
   (update-diagnostic! tx document-id slice-key sub-path slice-value nil))
  ([tx document-id slice-key sub-path slice-value expected-version]
   (let [path-arr   (jsonb-path-array slice-key sub-path)
         slice-name (name slice-key)
         base       [:coalesce :diagnostics [:cast "{}" :jsonb]]
         ;; jsonb_set creates only the LAST key of the path: if an earlier
         ;; key is missing it returns the target unchanged. Ensure the
         ;; slice object exists before writing beneath it. (Sub-paths
         ;; deeper than one key still need their parent objects.)
         target     (if (seq sub-path)
                      [:jsonb_set base
                       [:cast [:array [slice-name]] [:raw "text[]"]]
                       [:coalesce [:-> base [:inline slice-name]] [:cast "{}" :jsonb]]
                       true]
                      base)]
     (db.sql/execute!
      tx
      (cond-> {:update :document
               :set    {:diagnostics [:jsonb_set
                                      target
                                      [:cast [:array (mapv name path-arr)] [:raw "text[]"]]
                                      [:cast (json/generate-string slice-value) :jsonb]
                                      true]
                        :updated-at  [:now]}
               :where  [:= :id document-id]}
        expected-version (update :where (fn [w] [:and w [:= :version expected-version]]))))
(db.sql/execute!
tx
{:update :document
:set {:needs-human-review
[:or
[:exists
{:select [1]
:from [[[:jsonb_each [:-> :diagnostics [:inline "validations"]]]
[[:raw "t(k, v)"]]]]
:where [:= [:->> :v [:inline "status"]] [:inline "error"]]}]
[:exists
{:select [1]
:from [[[:jsonb_array_elements [:-> :diagnostics [:inline "fraud-flags"]]]
[[:raw "f"]]]]
:where [:= [:->> :f [:inline "severity"]] [:inline "critical"]]}]]}
:where [:= :id document-id]}))))
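Two behaviours of update-diagnostic! are worth pinning down: what jsonb_set does to sibling and missing keys, and when needs_human_review flips. The plain-Clojure model below encodes both on nested maps. jsonb-set follows the PostgreSQL documentation for jsonb_set with create_missing = true (every path step before the last must already exist, otherwise the target comes back unchanged), and needs-human-review? mirrors the two EXISTS clauses above. Both are hypothetical helpers and neither talks to a database.

```clojure
(ns example.jsonb-set)

;; Illustrative model only: nested maps stand in for JSONB objects.

(defn jsonb-set
  "Models jsonb_set(target, path, value, create_missing => true) on
  nested maps: every key before the last must already hold a map,
  otherwise target comes back unchanged; the last key is replaced or
  created. Sibling keys are never touched."
  [target path value]
  (let [parent (butlast path)]
    (if (or (empty? parent) (map? (get-in target parent)))
      (assoc-in target path value)
      target)))

(defn needs-human-review?
  "Mirrors the SQL recompute: any validation whose status is \"error\",
  or any fraud flag whose severity is \"critical\"."
  [diagnostics]
  (boolean
   (or (some #(= "error" (:status %)) (vals (:validations diagnostics)))
       (some #(= "critical" (:severity %)) (:fraud-flags diagnostics)))))

;; Sub-path write: the sibling validation survives.
(jsonb-set {:validations {:tax-id-format {:status "pass"}
                          :iban-format   {:status "pass"}}}
           [:validations :tax-id-format]
           {:status "warning"})
;; => {:validations {:tax-id-format {:status "warning"},
;;                   :iban-format {:status "pass"}}}

;; Sub-path write while the slice is missing: silently dropped.
(jsonb-set {:fraud-flags []} [:validations :financial-math] {:status "error"})
;; => {:fraud-flags []}

(needs-human-review? {:validations {:financial-math {:status "error"}}}) ;; => true
```

The second call is the case that ensuring the parent slice exists before the sub-path write has to cover.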
update-slice! to update-diagnostic!
No alias is kept. Existing callers:
grep -rn "update-slice!" src test | grep -v document_diagnostics.clj
Expected callers: workers/ap/ingestion.clj, workers/ap/matching/worker.clj, workers/ap/matching/reconciliation.clj (after C1 rename, they're under processors/matching/), possibly more. Replace each (update-slice! …) with (update-diagnostic! …).
None of these callers use the 5-arg form with expected-version; all are 4-arg. After rename, the call-sites continue to compile unchanged except for the fn name.
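Append to test/com/getorcha/db/document_diagnostics_test.clj. If the file is absent, create it with the same clojure.test, fixtures and helpers requires (and use-fixtures) as provenance_test.clj, plus [com.getorcha.db.document-diagnostics :as diagnostics] and [com.getorcha.db.sql :as db.sql]: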
(deftest sub-path-merge-leaves-sibling-keys-intact
(testing "writing a sub-path does not clobber sibling keys in the slice"
(let [doc-id (helpers/create-document! (helpers/create-legal-entity!))]
(diagnostics/update-diagnostic! fixtures/*db* doc-id
:validations
{:tax-id-format {:status "pass"}
:iban-format {:status "pass"}})
(diagnostics/update-diagnostic! fixtures/*db* doc-id
:validations [:tax-id-format]
{:status "warning" :message "x"})
(let [doc (db.sql/execute-one! fixtures/*db*
{:select [:diagnostics]
:from [:document]
:where [:= :id doc-id]})]
(is (= "warning" (get-in doc [:document/diagnostics :validations :tax-id-format :status])))
(is (= "pass" (get-in doc [:document/diagnostics :validations :iban-format :status])))))))
Run: clj -X:test:silent :nses '[com.getorcha.db.document-diagnostics-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: PASS.
git add src/com/getorcha/db/document_diagnostics.clj test/com/getorcha/db/document_diagnostics_test.clj
git commit -m "feat: update-diagnostic! supports sub-path writes"
Files:
Create: src/com/getorcha/workers/ap/processors/engine.clj
Create: test/com/getorcha/workers/ap/processors/engine_test.clj
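In the engine, write-diagnostic! turns a -diagnostic spec plus a -compute result into one or more update-diagnostic! calls. Expressing that dispatch as a pure function makes each spec shape easy to check without a database. planned-writes is a hypothetical helper that returns the [slice sub-path value] triples those calls would receive, under the same rules.

```clojure
(ns example.diagnostic-writes)

;; Illustrative sketch only: a pure mirror of write-diagnostic!'s dispatch.

(defn specs-seq
  "nil -> nil, vector of specs -> itself, single spec -> [spec]."
  [diag]
  (cond (nil? diag)    nil
        (vector? diag) diag
        :else          [diag]))

(defn planned-writes
  "Returns the [slice sub-path value] triples that a diagnostic spec
  and a -compute result produce, one triple per update-diagnostic! call."
  [diag result]
  (vec
   (for [{:keys [slice sub-path sub-paths]} (specs-seq diag)
         [sp value] (cond
                      ;; One write per sub-path, valued by its first key.
                      sub-paths (for [sp sub-paths] [sp (get result (first sp))])
                      ;; Single sub-path: the whole result goes there.
                      sub-path  [[sub-path result]]
                      ;; Whole slice: pick our key out of a multi-slice result.
                      :else     [[[] (if (and (map? result) (contains? result slice))
                                       (get result slice)
                                       result)]])]
     [slice sp value])))

;; Invoice :validations sub-paths owned by one processor.
(planned-writes {:slice :validations
                 :sub-paths [[:required-fields] [:date-reasonableness]]}
                {:required-fields {:status "pass"}
                 :date-reasonableness {:status "warning"}})
;; => [[:validations [:required-fields] {:status "pass"}]
;;     [:validations [:date-reasonableness] {:status "warning"}]]

;; Multi-slice owner, shaped like tax_compliance.
(planned-writes [{:slice :tax-issues} {:slice :line-items}]
                {:tax-issues [] :line-items [{:id "a"}]})
;; => [[:tax-issues [] []] [:line-items [] [{:id "a"}]]]
```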
Step 1: Write the engine with core flow only
Write src/com/getorcha/workers/ap/processors/engine.clj:
(ns com.getorcha.workers.ap.processors.engine
"Run-processors engine: phase-ordered, parallel-within-phase,
run-row lifecycle, diagnostic slice writes, ops-based apply.
See docs/superpowers/specs/2026-04-15-unified-processors-design.md §4."
(:require [cheshire.core :as json]
[clojure.core.async :as a]
[clojure.tools.logging :as log]
[com.getorcha.db.document-diagnostics :as db.diagnostics]
[com.getorcha.db.document-history :as db.document-history]
[com.getorcha.db.document-processor-run :as db.run]
[com.getorcha.db.sql :as db.sql]
[com.getorcha.json-patch :as json-patch]
[com.getorcha.json-patch.path :as json-patch.path]
[com.getorcha.workers.ap.processors :as proc]))
(defn ^:private trigger-params
"Maps state -> run-row-insert params (without :processor-id)."
[{:keys [document trigger-kind ingestion-id history-id commit-sha] :as _state}]
(let [base {:document-id (:document/id document)
:trigger-kind trigger-kind
:document-version (:document/version document)
:commit-sha commit-sha}]
(cond-> base
ingestion-id (assoc :ingestion-id ingestion-id)
history-id (assoc :triggered-by-history-id history-id))))
(defn ^:private specs-seq
"Normalises a -diagnostic return to a sequence of {:slice :sub-path}
or {:slice :sub-paths [...]} specs."
[diag]
(cond
(nil? diag) nil
(vector? diag) diag
:else [diag]))
(defn ^:private write-diagnostic!
"Writes `result` to one or more diagnostic slices/sub-paths per the
declared `diag` spec (see spec §3.1 and §13).
- `{:slice K}`: one whole-slice write; value is `(get result K)`
when result is a map containing K, otherwise result itself.
- `{:slice K :sub-path P}`: one write of the whole result at P.
- `{:slice K :sub-paths [[p1] [p2] ...]}`: one write per sub-path;
value is `(get result (first sub-path))`.
- Vector of specs: same rule applied per spec; each slice-K pulls
`(get result K)` as its value."
[db-pool document-id diag result version]
(doseq [{:keys [slice sub-path sub-paths] :as spec} (specs-seq diag)]
(cond
sub-paths
(doseq [sp sub-paths]
(db.diagnostics/update-diagnostic!
db-pool document-id slice sp (get result (first sp)) version))
sub-path
(db.diagnostics/update-diagnostic!
db-pool document-id slice sub-path result version)
:else
;; Whole-slice write. In multi-slice contexts the result is a
;; map keyed by slice names; pick our slice's value.
(let [value (if (and (map? result) (contains? result slice))
(get result slice)
result)]
(db.diagnostics/update-diagnostic!
db-pool document-id slice [] value version)))))
(defn ^:private apply-ops-in-memory
"Applies `ops` to `state`'s in-memory structured-data only. No DB write."
[state ops]
(if (empty? ops)
state
(let [new-sd (json-patch/apply-patch (:structured-data state) ops)]
(-> state
(assoc :structured-data new-sd)
(assoc-in [:document :document/structured-data] new-sd)))))
(defn ^:private refresh-diagnostics
"Refetches the latest diagnostics from the DB and populates
`state.diagnostics`. Called between phases so later phases see
earlier-phase diagnostic writes."
[db-pool state]
(let [doc-id (get-in state [:document :document/id])
row (db.sql/execute-one! db-pool
{:select [:diagnostics]
:from [:document]
:where [:= :id doc-id]})]
(assoc state :diagnostics (:document/diagnostics row))))
(defn ^:private should-run?
"Engine scheduler gate. Stub in A5 (always true); A6 implements the
real conditional logic."
[_db-pool _state _processor]
true)
(defn ^:private filter-ops-against-user-edits
"Blocks processor ops that target user-edited paths. Stub in A5
(pass-through); A7 implements the prefix-match filter."
[_user-edited-paths _processor-id ops]
ops)
(defn ^:private run-processor!
"Runs a single processor: inserts running row, -compute, writes
diagnostic + builds ops, completes/fails row. Returns the ops
that should be applied (possibly empty)."
[{:keys [db-pool user-edited-paths] :as ctx} state processor]
(let [run-id (db.run/insert-running!
db-pool
(assoc (trigger-params state) :processor-id (name (proc/-id processor))))]
(try
(let [{:keys [result stats]} (proc/-compute processor ctx state)
diag (proc/-diagnostic processor state)
raw-ops (proc/-apply-ops processor state result)
ops (case (:mode state)
:edit (filter-ops-against-user-edits
(or user-edited-paths #{})
(proc/-id processor)
raw-ops)
raw-ops)]
(write-diagnostic! db-pool
(get-in state [:document :document/id])
diag
result
(get-in state [:document :document/version]))
(db.run/complete-run! db-pool run-id (assoc stats :result result))
ops)
(catch Throwable t
(db.run/fail-run! db-pool run-id (.getMessage t))
(log/error t "Processor failed"
{:processor (proc/-id processor)
:document (get-in state [:document :document/id])})
;; Notification hook is added in Task A8.
[]))))
(defn ^:private run-phase!
[ctx state processors]
(let [state (refresh-diagnostics (:db-pool ctx) state)
to-run (filterv #(should-run? (:db-pool ctx) state %) processors)
chans (mapv (fn [p] (a/thread (run-processor! ctx state p))) to-run)
all-ops (reduce into [] (mapv a/<!! chans))]
(apply-ops-in-memory state all-ops)))
(defn ^:private persist-derivation!
"After an :edit-mode run completes, if structured-data has changed,
compute the aggregate JSON-Patch (diff initial → final) and persist
as a document.structured_data update + document_history row with
change_type=:derivation + version++."
[db-pool {:keys [document initial-structured-data structured-data]}]
(let [doc-id (:document/id document)
initial initial-structured-data
final structured-data]
(when (and initial (not= initial final))
(let [patch (json-patch/diff-patch initial final)
new-version (inc (:document/version document))]
(when (seq patch)
(db.sql/with-transaction [tx db-pool]
(db.sql/execute!
tx
{:update :document
:set {:structured-data [:lift final]
:version new-version
:updated-at [:now]}
:where [:= :id doc-id]})
(db.document-history/insert!
tx
{:document-id doc-id
:change-type :derivation
:patch patch})))))))
(defn run-processors!
"Runs every phase in order. Within a phase, processors run
concurrently. Returns the final state (with applied ops folded in).
Caller must fill state's :mode, :trigger-kind, :ingestion-id /
:history-id / :edited-by, :document (row), :structured-data,
:legal-entity, :file, :commit-sha, and (in :edit mode)
:changed-leaves as appropriate. The `phases` argument is a vector
of vectors of IProcessor instances."
[ctx state phases]
(let [initial (:structured-data state)
state (-> state
(assoc :initial-structured-data initial)
(cond-> (= :edit (:mode state))
(assoc :user-edited-paths
(com.getorcha.document.provenance/user-edited-paths
(:db-pool ctx)
(get-in state [:document :document/id])))))
ctx (cond-> ctx
(:user-edited-paths state)
(assoc :user-edited-paths (:user-edited-paths state)))
final (reduce (fn [st phase] (run-phase! ctx st phase)) state phases)]
(when (= :edit (:mode state))
(persist-derivation! (:db-pool ctx) final))
final))
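write-diagnostic! has three dispatch branches, and they interact with multi-slice results in ways that are easy to misread. The sketch below mirrors that dispatch as a pure function. Instead of issuing the writes, it returns the planned `[slice sub-path value]` triples, so the rules can be unit-tested without a database. The namespace and `plan-diagnostic-writes` are hypothetical names. The real engine calls db.diagnostics/update-diagnostic! once per triple.

```clojure
(ns example.diagnostic-plan
  "Hypothetical pure mirror of the engine's write-diagnostic! dispatch.")

(defn specs-seq
  "Same normalisation as the engine: nil -> nil, vector -> as-is,
  single spec map -> one-element vector."
  [diag]
  (cond
    (nil? diag) nil
    (vector? diag) diag
    :else [diag]))

(defn plan-diagnostic-writes
  "Returns a vector of [slice sub-path value] triples, one per write the
  engine would perform for `diag` and `result`."
  [diag result]
  (vec
   (mapcat
    (fn [{:keys [slice sub-path sub-paths]}]
      (cond
        ;; One write per sub-path, valued by the sub-path's first segment.
        sub-paths (for [sp sub-paths] [slice sp (get result (first sp))])
        ;; Single sub-path: the whole result goes there.
        sub-path [[slice sub-path result]]
        ;; Whole slice: pick this slice's key out of a multi-slice result.
        :else [[slice [] (if (and (map? result) (contains? result slice))
                           (get result slice)
                           result)]]))
    (specs-seq diag))))
```

For example, a TCA-style `[{:slice :tax-issues} {:slice :line-items}]` spec produces two whole-slice writes, and any other result keys (such as `:bu-codes`) are ignored. A nil spec, which the per-document-type `-diagnostic` dispatch returns for non-invoices, produces no writes.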
Note on json-patch/diff-patch: this function does not exist
today. Task A5 must add it to src/com/getorcha/json_patch.clj: a
simple diff returning JSON-Patch ops that transform initial into
final. A naive implementation walks both values simultaneously,
emitting replace ops for leaves that differ and add/remove
ops for keys that were added or dropped. Add a small test. If a
production-grade diff turns out non-trivial, fall back to the
per-op :apply-ops record (keep ops from every phase, flatten,
emit as the derivation patch) — no diff-patch needed.
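If the diff route is taken, the naive walk described above might look like the following. This is a sketch, not the json_patch.clj implementation. It assumes string-keyed ops, as used elsewhere in this plan, and RFC 6901 pointer escaping. It recurses into maps only, so a changed vector (for example `:line-items`) becomes one whole replace rather than per-index ops.

```clojure
(ns example.diff-patch
  (:require [clojure.string :as str]))

(defn- escape-token
  "RFC 6901 escaping: '~' -> '~0' first, then '/' -> '~1'."
  [k]
  (-> (if (keyword? k) (name k) (str k))
      (str/replace "~" "~0")
      (str/replace "/" "~1")))

(defn- pointer
  "Builds a JSON Pointer from a clj path; [] yields \"\" (whole document)."
  [path]
  (apply str (map #(str "/" (escape-token %)) path)))

(defn diff-patch
  "Naive JSON-Patch diff from `a` to `b`. Maps are walked key by key;
  any other differing value (including vectors) is replaced wholesale."
  ([a b] (diff-patch a b []))
  ([a b path]
   (cond
     (= a b) []
     (and (map? a) (map? b))
     (vec
      (concat
       ;; keys dropped in b
       (for [k (keys a) :when (not (contains? b k))]
         {"op" "remove" "path" (pointer (conj path k))})
       ;; keys added in b
       (for [k (keys b) :when (not (contains? a k))]
         {"op" "add" "path" (pointer (conj path k)) "value" (get b k)})
       ;; keys present in both: recurse
       (mapcat (fn [k] (diff-patch (get a k) (get b k) (conj path k)))
               (filter #(contains? b %) (keys a)))))
     :else [{"op" "replace" "path" (pointer path) "value" b}])))
```

Whole-vector replacement keeps the diff trivially correct. The cost is patches that are coarser than the ops processors actually emitted, which is one reason to prefer the per-op record.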
Recommended: take the per-op-record approach (avoids implementing JSON-Patch diff):
(defn ^:private apply-ops-in-memory
"Applies `ops` to `state`'s in-memory structured-data only, and
appends them to `:applied-ops` for the eventual derivation row."
[state ops]
(if (empty? ops)
state
(let [new-sd (json-patch/apply-patch (:structured-data state) ops)]
(-> state
(assoc :structured-data new-sd)
(assoc-in [:document :document/structured-data] new-sd)
(update :applied-ops (fnil into []) ops)))))
And persist-derivation! reads (:applied-ops state) instead of
computing a diff. Update both accordingly in your implementation.
Write test/com/getorcha/workers/ap/processors/engine_test.clj:
(ns com.getorcha.workers.ap.processors.engine-test
(:require [clojure.test :refer [deftest is testing use-fixtures]]
[com.getorcha.db.document-processor-run :as db.run]
[com.getorcha.db.sql :as db.sql]
[com.getorcha.test.fixtures :as fixtures]
[com.getorcha.test.notification-helpers :as helpers]
[com.getorcha.workers.ap.processors :as proc]
[com.getorcha.workers.ap.processors.engine :as engine]))
(use-fixtures :once fixtures/with-running-system)
(use-fixtures :each fixtures/with-db-rollback)
(defrecord ^:private CountingProcessor [id order-atom]
proc/IProcessor
(-id [_] id)
(-reads [_ _state] [])
(-writes [_ _state] [])
(-diagnostic [_ _state] nil)
(-modes [_] #{:ingestion})
(-always? [_] true)
(-compute [_ _ctx _state]
(swap! order-atom conj id)
{:result {:ran id} :stats nil})
(-apply-ops [_ _state _r] []))
(deftest phase-ordering-sequential-between-phases
(let [document-id (helpers/create-document! (helpers/create-legal-entity!))
ingestion-id (helpers/create-ingestion! document-id)
order (atom [])
ctx {:db-pool fixtures/*db*}
state {:mode :ingestion
:trigger-kind :ingestion
:ingestion-id ingestion-id
:document {:document/id document-id
:document/version 0
:document/structured-data {}}
:structured-data {}}
phase-1 [(->CountingProcessor :p1 order)]
phase-2 [(->CountingProcessor :p2 order)]]
(engine/run-processors! ctx state [phase-1 phase-2])
(is (= [:p1 :p2] @order))
(is (= 2 (db.run/count-runs fixtures/*db* document-id)))))
Run: clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.engine-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: PASS.
git add src/com/getorcha/workers/ap/processors/engine.clj test/com/getorcha/workers/ap/processors/engine_test.clj
git commit -m "feat: processors engine — core run loop"
Files:
Modify: src/com/getorcha/workers/ap/processors/engine.clj
Modify: test/com/getorcha/workers/ap/processors/engine_test.clj
Step 1: Extend count-runs to accept :document-version filter
In src/com/getorcha/db/document_processor_run.clj, extend the kwarg destructuring + WHERE cond->:
(defn count-runs
"Counts runs for `document-id`, optionally filtered by `:processor-id`,
`:status`, and/or `:document-version`."
[db-pool document-id & {:keys [processor-id status document-version]}]
(let [{:keys [count]}
(db.sql/execute-one!
db-pool
{:select [[:%count.* :count]]
:from [:document-processor-run]
:where (cond-> [:and [:= :document-id document-id]]
processor-id (conj [:= :processor-id processor-id])
status (conj [:= :status (db.sql/->cast status :processor-run-status)])
document-version (conj [:= :document-version document-version]))})]
(or count 0)))
Update the existing test/com/getorcha/db/document_processor_run_test.clj (count-runs-test) to add a kwarg test covering :document-version:
(is (= 1 (run/count-runs fixtures/*db* document-id
:processor-id "matching"
:document-version 1)))
Replace the should-run? stub in engine.clj with the real implementation (it calls count-runs with keyword arguments, matching its signature):
(defn ^:private has-completed-run-at-version?
[db-pool document-id processor-id version]
(pos? (db.run/count-runs
db-pool document-id
:processor-id processor-id
:status :completed
:document-version version)))
(defn ^:private should-run?
"Returns true when a processor must execute. Always runs in
:ingestion mode. In :edit mode, runs iff `-always?`, or the
processor has no completed run at the current document version,
or any read pattern intersects changed-leaves."
[db-pool state processor]
(let [document-id (get-in state [:document :document/id])
version (get-in state [:document :document/version])]
(case (:mode state)
:ingestion true
:edit
(or (proc/-always? processor)
(not (has-completed-run-at-version?
db-pool document-id (name (proc/-id processor)) version))
(some #(reads/match-any? (proc/-reads processor state) %)
(:changed-leaves state))))))
Add to engine.clj's require block:
[com.getorcha.workers.ap.processors.reads :as reads]
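The edit-mode gate can be reasoned about without a database or malli. In the sketch below, read patterns are plain vectors where `:*` matches any single segment, and the processor is a map of properties. Both representations are hypothetical stand-ins: the real patterns are malli seq-regex schemas built by reads/read-leaf, and the real "completed run at this version" answer comes from count-runs.

```clojure
(ns example.should-run)

(defn pattern-matches?
  "Hypothetical stand-in for a malli seq-regex read pattern: :* matches
  any one segment (e.g. a line-item index). Leaf-only, so lengths must
  be equal."
  [pattern leaf]
  (and (= (count pattern) (count leaf))
       (every? true? (map (fn [p s] (or (= p :*) (= p s))) pattern leaf))))

(defn match-any? [patterns leaf]
  (boolean (some #(pattern-matches? % leaf) patterns)))

(defn should-run?
  "Pure version of the engine gate. `completed-version?` says whether the
  processor already has a completed run at the current document version."
  [{:keys [mode changed-leaves]} {:keys [always? reads completed-version?]}]
  (case mode
    :ingestion true
    :edit (boolean (or always?
                       (not completed-version?)
                       (some #(match-any? reads %) changed-leaves)))))
```

The second clause deserves attention: in :edit mode, a processor with no completed run at the current version runs regardless of its reads. An edit-mode test that expects a processor to be skipped must therefore seed a completed run at that version first.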
Append to engine_test.clj:
(defrecord ^:private ReadsProcessor [id reads order-atom]
proc/IProcessor
(-id [_] id)
(-reads [_ _state] reads)
(-writes [_ _state] [])
(-diagnostic [_ _state] nil)
(-modes [_] #{:ingestion :edit})
(-always? [_] false)
(-compute [_ _ _]
(swap! order-atom conj id)
{:result nil :stats nil})
(-apply-ops [_ _ _] []))
(deftest edit-mode-skips-processors-whose-reads-miss-changed-leaves
(let [document-id (helpers/create-document! (helpers/create-legal-entity!))
identity-id (helpers/create-identity!)
history-id (:document-history/id
(com.getorcha.db.document-history/insert!
fixtures/*db*
{:document-id document-id :change-type :edit
:edited-by identity-id
:patch [{"op" "replace" "path" "/issuer/name" "value" "A"}]}))
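;; Seed completed runs at version 1: should-run? otherwise runs any
;; processor that has no completed run at the current version.
_ (doseq [pid ["on-issuer" "on-line-items"]]
(db.run/complete-run!
fixtures/*db*
(db.run/insert-running!
fixtures/*db*
{:document-id document-id
:trigger-kind :edit
:document-version 1
:triggered-by-history-id history-id
:processor-id pid})
{}))
order (atom [])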
ctx {:db-pool fixtures/*db*}
state {:mode :edit :trigger-kind :edit
:history-id history-id :edited-by identity-id
:document {:document/id document-id :document/version 1
:document/structured-data {:document-type "invoice"}}
:structured-data {:document-type "invoice"}
:changed-leaves #{[:issuer :name]}}
runs [(->ReadsProcessor :on-issuer
[(com.getorcha.workers.ap.processors.reads/read-leaf :issuer :name)]
order)
(->ReadsProcessor :on-line-items
[(com.getorcha.workers.ap.processors.reads/read-leaf :line-items :* :description)]
order)]]
(engine/run-processors! ctx state [runs])
(is (= [:on-issuer] @order) "on-line-items should be skipped")))
Run: clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.engine-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: PASS.
git add src/com/getorcha/workers/ap/processors/engine.clj src/com/getorcha/db/document_processor_run.clj test/com/getorcha/workers/ap/processors/engine_test.clj test/com/getorcha/db/document_processor_run_test.clj
git commit -m "feat: engine conditional filter (edit-mode reads intersection)"
Files:
Modify: src/com/getorcha/workers/ap/processors/engine.clj
Modify: test/com/getorcha/workers/ap/processors/engine_test.clj
Step 1: Replace the filter-ops-against-user-edits stub with the real implementation
Patch ops use STRING keys ({"op" "replace" "path" "..." "value" ...}) per the JSON-Patch convention used by json-patch/apply-patch and the edit handlers. Use (get op "path") to read path strings.
Replace A5's stub with:
(defn ^:private prefix-match?
"Returns true when any prefix of `clj-path` (including itself) is
in `user-edited-set`."
[user-edited-set clj-path]
(boolean (some user-edited-set
(reductions conj [] clj-path))))
(defn ^:private filter-ops-against-user-edits
[user-edited-set processor-id ops]
(reduce
(fn [kept op]
(let [pointer (get op "path")
clj (json-patch.path/pointer->clj-path pointer)]
(if (prefix-match? user-edited-set clj)
(do (log/info "Processor op blocked by prior user edit"
{:processor processor-id :path pointer})
kept)
(conj kept op))))
[]
ops))
The json-patch.path require was added in A5; ensure it's still there.
run-processor! already wires the filter in (from A5). run-processors! already precomputes user-edited-paths into ctx (from A5). Nothing further to wire.
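The filter's behaviour is easiest to check against a self-contained copy. In the sketch below, `pointer->path` is a hypothetical, simplified stand-in for json-patch.path/pointer->clj-path. It unescapes RFC 6901 tokens and parses numeric segments as indices, but it does not understand the `[id=X]` selector syntax. Unlike the engine version, the sketch also omits the log/info call.

```clojure
(ns example.op-filter
  (:require [clojure.string :as str]))

(defn pointer->path
  "Hypothetical simplified pointer parser: \"/line-items/0/description\"
  -> [:line-items 0 :description]. No [id=X] support."
  [pointer]
  (->> (rest (str/split pointer #"/" -1))
       (mapv (fn [tok]
               (let [t (-> tok (str/replace "~1" "/") (str/replace "~0" "~"))]
                 (if (re-matches #"\d+" t) (Long/parseLong t) (keyword t)))))))

(defn prefix-match?
  "True when `path` or any of its ancestors (including []) was user-edited."
  [user-edited-set path]
  (boolean (some user-edited-set (reductions conj [] path))))

(defn filter-ops
  "Keeps only ops whose target is not at or below a user-edited path."
  [user-edited-set ops]
  (filterv #(not (prefix-match? user-edited-set (pointer->path (get % "path"))))
           ops))
```

The check is one-directional: an op that targets an ancestor of an edited leaf (for example, replacing `/issuer` wholesale after the user edited `/issuer/name`) is not blocked, because `[:issuer]` has no edited prefix.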
Append to engine_test.clj:
(defrecord ^:private WriterProcessor [id ops]
proc/IProcessor
(-id [_] id)
(-reads [_ _state] [])
(-writes [_ _state] [])
(-diagnostic [_ _state] nil)
(-modes [_] #{:ingestion :edit})
(-always? [_] true)
(-compute [_ _ _] {:result nil :stats nil})
(-apply-ops [_ _ _] ops))
(deftest edit-mode-op-filter-drops-user-edited-paths
(let [document-id (helpers/create-document! (helpers/create-legal-entity!))
identity-id (helpers/create-identity!)
_ (com.getorcha.db.document-history/insert!
fixtures/*db*
{:document-id document-id :change-type :edit
:edited-by identity-id
:patch [{"op" "replace" "path" "/issuer/name" "value" "UserValue"}]})
state {:mode :edit :trigger-kind :edit
:edited-by identity-id
:document {:document/id document-id :document/version 1
:document/structured-data {:document-type "invoice"
:issuer {:name "UserValue"}}}
:structured-data {:document-type "invoice" :issuer {:name "UserValue"}}
:changed-leaves #{[:issuer :name]}}
p (->WriterProcessor :overreacher
[{"op" "replace" "path" "/issuer/name" "value" "ProcessorValue"}
{"op" "replace" "path" "/issuer/country" "value" "DE"}])]
(engine/run-processors! {:db-pool fixtures/*db*} state [[p]])
(let [row (db.sql/execute-one! fixtures/*db*
{:select [:structured-data]
:from [:document]
:where [:= :id document-id]})]
(is (= "UserValue" (get-in row [:document/structured-data :issuer :name]))
"user edit preserved")
(is (= "DE" (get-in row [:document/structured-data :issuer :country]))
"non-edited path accepts processor write"))))
Run: clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.engine-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: PASS.
git add src/com/getorcha/workers/ap/processors/engine.clj test/com/getorcha/workers/ap/processors/engine_test.clj
git commit -m "feat: engine op filter — preserves user edits in edit mode"
Files:
Modify: src/com/getorcha/notifications.clj
Create: src/com/getorcha/notifications/processor.clj
Step 1: Create the processor notification template
Write src/com/getorcha/notifications/processor.clj:
(ns com.getorcha.notifications.processor
"Unified processor-failure notification payload. Replaces the
per-processor :matching/permanent-failure and
:reconciliation/failure kinds (see spec §8)."
(:require [clojure.tools.logging]
[com.getorcha.notifications :as notifications]))
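(defn notify!
"Fire-and-forget admin notification for a processor failure; any
exception is logged and swallowed.
`ctx` includes :db-pool, :aws, :notifications.
The second argument is a map with :processor-id, :trigger
({:kind :edit|:ingestion|:manual :history-id _ :edited-by _}),
:document (the failing document row) and :error (a message string)."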
[{:keys [db-pool aws notifications] :as _ctx}
{:keys [processor-id trigger document error]}]
(try
(let [le-id (:document/legal-entity-id document)]
(when le-id
(notifications/notify-admins!
{:db-pool db-pool :aws aws :notifications notifications}
{:legal-entity/id le-id}
{:kind :processor/failure
:title (str "Processor " (name processor-id) " failed: "
(or (:document/file-original-name document)
(str (:document/id document))))
:body (str "Document: " (:document/id document) "\n"
"Processor: " (name processor-id) "\n"
"Trigger: " (name (:kind trigger))
(when (:history-id trigger)
(str " (history-id " (:history-id trigger) ")"))
(when (:edited-by trigger)
(str ", edited-by " (:edited-by trigger)))
"\nError: " error)
:data {:document-id (:document/id document)
:processor processor-id
:trigger trigger}})))
(catch Exception e
(clojure.tools.logging/warn e "Failed to send processor failure notification"
{:processor processor-id
:document (:document/id document)}))))
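In engine.clj, add [com.getorcha.notifications.processor] to the require block (the call below uses the fully-qualified name), then update the catch block of run-processor!: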
(catch Throwable t
(db.run/fail-run! db-pool run-id (.getMessage t))
(com.getorcha.notifications.processor/notify!
ctx
{:processor-id (proc/-id processor)
:trigger {:kind (:trigger-kind state)
:history-id (:history-id state)
:edited-by (:edited-by state)
:ingestion-id (:ingestion-id state)}
:document (:document state)
:error (.getMessage t)})
(log/error t "Processor failed"
{:processor (proc/-id processor)
:document (get-in state [:document :document/id])})
[])
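The body text notify! builds can be pulled into a pure helper, which makes the format unit-testable without the notification transport. The sketch below copies the body expression from notify!. `failure-body` and its namespace are hypothetical names.

```clojure
(ns example.failure-body)

(defn failure-body
  "Hypothetical pure extraction of the :body string notify! sends."
  [{:keys [processor-id trigger document error]}]
  (str "Document: " (:document/id document) "\n"
       "Processor: " (name processor-id) "\n"
       "Trigger: " (name (:kind trigger))
       ;; Optional edit context; (str nil) contributes nothing.
       (when (:history-id trigger)
         (str " (history-id " (:history-id trigger) ")"))
       (when (:edited-by trigger)
         (str ", edited-by " (:edited-by trigger)))
       "\nError: " error))
```

The `:ingestion-id` that the engine's catch block puts on the trigger is not rendered in the body; it reaches admins only through `:data :trigger`.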
clj-kondo --lint src/com/getorcha/notifications src/com/getorcha/workers/ap/processors
Expected: clean.
git add src/com/getorcha/notifications src/com/getorcha/workers/ap/processors/engine.clj
git commit -m "feat: unified :processor/failure notification"
For every existing post-processor, same pattern: create a new v2 IProcessor under processors/, port the logic, add reads/writes declarations, ops-based -apply-ops, keep the existing test fixtures intact (ported under processors/*_test.clj). The old post_process/* record still exists and still works — the migration is additive. The old post-process/run is rewritten in Phase D.
Note (schema alignment): All -writes and -apply-ops paths in this phase target existing top-level LineItem / document fields; no schema changes are made as part of this plan. Earlier drafts of this plan referenced synthetic nesting (/line-items[id=X]/accounts/debit-account etc.); those have been rewritten to match the actual schema in src/com/getorcha/schema/* as of 2026-04-16.
Each task follows this template:
Change -compute from (-compute [_]) (closing over ingestion) to (-compute [_ ctx state]).
Replace -apply (structured-data → structured-data) with -apply-ops (state × result → [ops]).
Add -id, -reads, -writes, -diagnostic, -modes, -always?.
Files:
Create: src/com/getorcha/workers/ap/processors/accounts.clj
Create: test/com/getorcha/workers/ap/processors/accounts_test.clj
Reference source: src/com/getorcha/workers/ap/ingestion/post_process/accounts.clj. Copy its internal helpers (prompt building, LLM parsing, batched invocation) verbatim. Change only:
The record: (defrecord AccountsMatcher []), with no fields (state is passed to -compute).
The constructor: (->accounts), returning an instance.
Write src/com/getorcha/workers/ap/processors/accounts.clj. Start by copying post_process/accounts.clj to the new path, then:
Rename the namespace to com.getorcha.workers.ap.processors.accounts.
Replace (:require [com.getorcha.workers.ap.ingestion.post-process.protocol :refer [IProcessor]]) with (:require [com.getorcha.workers.ap.processors :as proc] [com.getorcha.workers.ap.processors.reads :as reads]).
Replace the record definition with the v2 version below, keeping the existing -compute body:
(defrecord AccountsMatcher []
proc/IProcessor
(-id [_] :accounts)
(-reads [_ _state] [(reads/read-leaf :issuer :name)
(reads/read-leaf :issuer :vat-id)
(reads/read-leaf :issuer :country)
(reads/read-leaf :line-items :* :description)
(reads/read-leaf :line-items :* :amount)])
(-writes [_ _state] [(reads/read-leaf :line-items :* :debit-account)
(reads/read-leaf :line-items :* :credit-account)])
(-diagnostic [_ _state] nil)
(-modes [_] #{:ingestion :edit})
(-always? [_] false)
(-compute [_ ctx state]
;; body from old -compute, with `context` replaced by `ctx`
;; and `ingestion` replaced by `state`. :structured-data/:document/:legal-entity
;; keys are the same on the new state map.
;; <inline the existing -compute body here>
)
(-apply-ops [_ _state result]
(if (nil? result)
[]
;; Result shape: {:line-items [{:id X :debit-account N :credit-account M} ...]}
;; Turn each proposed account into two replace ops.
(mapcat (fn [{:keys [id debit-account credit-account]}]
(cond-> []
debit-account
(conj {"op" "replace"
"path" (str "/line-items[id=" id "]/debit-account")
"value" debit-account})
credit-account
(conj {"op" "replace"
"path" (str "/line-items[id=" id "]/credit-account")
"value" credit-account})))
(:line-items result)))))
(defn ->accounts [] (->AccountsMatcher))
The inline -compute body (from old -compute) reads context and ingestion. Replace context with ctx, replace ingestion with state. The old code references (:structured-data ingestion), (:document ingestion), (:legal-entity ingestion) — these keys still live on state. The old code also expects booking-csv to be passed in as a record field; under the new model the engine computes it once per run and threads it via state (engine assignment: state.booking-csv). Adjust the -compute body to read (:booking-csv state) instead of the old record field.
Copy test/com/getorcha/workers/ap/ingestion/post_process/accounts_test.clj to test/com/getorcha/workers/ap/processors/accounts_test.clj and update:
Replace (->AccountsMatcher context ingestion booking-csv) with (accounts/->accounts) as the processor instance, and pass the CSV on the state map: (assoc state :booking-csv csv).
Keep the LLM calls stubbed via with-redefs.
Add a test asserting that -reads includes the expected leaf patterns:
(deftest accounts-reads-declared
(let [p (accounts/->accounts)]
;; -reads takes state as its second argument; accounts ignores it.
(is (some #(= (reads/read-leaf :line-items :* :description) %) (proc/-reads p {})))
(is (some #(= (reads/read-leaf :issuer :name) %) (proc/-reads p {})))))
clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.accounts-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: PASS.
git add src/com/getorcha/workers/ap/processors/accounts.clj test/com/getorcha/workers/ap/processors/accounts_test.clj
git commit -m "feat: migrate accounts processor to IProcessor v2"
Follow the same pattern as Task B1. (All -reads/-writes/-diagnostic
methods take state as their second argument per the v2 protocol.)
Files:
Create: src/com/getorcha/workers/ap/processors/cost_center.clj
Create: test/com/getorcha/workers/ap/processors/cost_center_test.clj
Declarations (from spec §12):
(-reads [_ _state] [(reads/read-leaf :issuer :name)
(reads/read-leaf :line-items :* :description)
(reads/read-leaf :line-items :* :amount)])
(-writes [_ _state] [(reads/read-leaf :line-items :* :cost-center)])
-apply-ops maps result's per-line-item cost-center into one replace op per line-item. Commit: git commit -m "feat: migrate cost-center processor to IProcessor v2".
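The cost-center -apply-ops is described only in prose above. A minimal sketch follows, assuming the result mirrors the accounts processor's shape, `{:line-items [{:id X :cost-center C} ...]}`. That shape and the function name are assumptions, not taken from the old record. Line items without a proposed cost center produce no op.

```clojure
(ns example.cost-center-ops)

(defn cost-center-ops
  "Sketch of CostCenter -apply-ops. ASSUMED result shape:
  {:line-items [{:id X :cost-center C} ...]}; nil result -> []."
  [_state result]
  (into []
        (keep (fn [{:keys [id cost-center]}]
                (when cost-center
                  {"op" "replace"
                   "path" (str "/line-items[id=" id "]/cost-center")
                   "value" cost-center})))
        (:line-items result)))
```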
Same pattern.
Files:
Create: src/com/getorcha/workers/ap/processors/accruals.clj
Create: test/com/getorcha/workers/ap/processors/accruals_test.clj
Declarations:
(-reads [_ _state] [(reads/read-leaf :invoice-date)
(reads/read-leaf :line-items :* :description)])
(-writes [_ _state] [(reads/read-leaf :line-items :* :accrual)])
Commit: git commit -m "feat: migrate accruals processor to IProcessor v2".
Files:
Create: src/com/getorcha/workers/ap/processors/supplier.clj (houses BOTH SupplierMatcher and SupplierVerifier)
Create: test/com/getorcha/workers/ap/processors/supplier_test.clj
Declarations:
SupplierMatcher:
(-id [_] :supplier-matcher)
(-reads [_ _state] [(reads/read-leaf :issuer :name)
(reads/read-leaf :issuer :vat-id)
(reads/read-leaf :issuer :iban)])
(-writes [_ _state] [(reads/read-leaf :supplier-match)])
SupplierVerifier:
(-id [_] :supplier-verifier)
(-reads [_ _state] [(reads/read-leaf :issuer :name)
(reads/read-leaf :issuer :vat-id)
(reads/read-leaf :issuer :country)
(reads/read-leaf :issuer :address)])
(-writes [_ _state] [(reads/read-leaf :supplier-verification-id)])
Add the constructors ->supplier-matcher and ->supplier-verifier, and port the tests.
Commit: git commit -m "feat: migrate supplier processors to IProcessor v2".
TCA is the MULTI-SLICE processor: it owns :tax-issues (invoice-level)
and :line-items (per-line :vat-validation) diagnostic slices, plus
structured-data mutations for :service-category, per-line :bu-code,
and (ingestion-only) tax-id correction. See spec §13.2.
The previously-added tax-compliance/run-vat-validation shim (which
only extracted :vat-validation off line items after TCA wrote it
there) is deleted entirely; TCA writes :line-items diagnostic slice
directly.
Files:
Create: src/com/getorcha/workers/ap/processors/tax_compliance.clj
Create: test/com/getorcha/workers/ap/processors/tax_compliance_test.clj
Declarations:
(-id [_] :tax-compliance-analyzer)
(-reads [_ _state]
[(reads/read-leaf :issuer :country)
(reads/read-leaf :issuer :tax-id)
(reads/read-leaf :issuer :tax-id-type)
(reads/read-leaf :recipient :country)
(reads/read-leaf :recipient :tax-id-type)
(reads/read-leaf :shipping-country)
(reads/read-leaf :line-items :* :tax-rate)
(reads/read-leaf :line-items :* :description)
(reads/read-leaf :delivery-terms-raw)
(reads/read-leaf :incoterm-code)
(reads/read-leaf :compliance-statements :* :text)])
(-writes [_ _state]
[(reads/read-leaf :service-category)
(reads/read-leaf :line-items :* :bu-code)
(reads/read-leaf :issuer :tax-id) ;; :ingestion-only branch
(reads/read-leaf :issuer :tax-id-type)])
(-diagnostic [_ _state] [{:slice :tax-issues} {:slice :line-items}])
Restrict the vision and tax-id-correction paths to :ingestion mode. In the ported -compute, wrap the existing
(if (and no-vat? tax-id-warn?) … (if (…) …)) dispatch in
(case (:mode state) :ingestion <vision-or-standard> :edit <standard-only>).
Do NOT call the tax-id-correction-section path in :edit mode. Do
NOT fetch PDF bytes in :edit mode.
TCA's existing -compute reads :tax-id-format status from
structured-data.validation-results. Under the unified model, this
lives at state.diagnostics.validations.tax-id-format. Update the
lookup:
(let [tax-id-result (get-in state [:diagnostics :validations :tax-id-format])
tax-id-warn? (= "warning" (:status tax-id-result))]
...)
state.diagnostics is refreshed between phases by the engine (A5
refresh-diagnostics), so TCA (phase 2) sees validations' phase-1
write.
Reshape the -compute result. The existing LLM schema produces:
{:service-category {...}
:line-items [{:vat-validation {...} :bu-code {...}} ...]
:issues [{...} ...]
:tax-id-correction {...}}
TCA's -compute returns:
{:tax-issues <vector of TaxIssue maps from `:issues`>
:line-items <map keyed by line-item id: {id {:vat-validation {...}}}>
:service-category <unchanged>
:bu-codes <map keyed by line-item id: {id {...}}> ;; not a slice — used by -apply-ops
:tax-id-correction <unchanged>}
:tax-issues and :line-items are diagnostic-slice top-level keys
(matched against -diagnostic returns); engine writes them.
:service-category, :bu-codes, :tax-id-correction are
processor-internal and consumed by -apply-ops.
Restructuring happens inside -compute after the LLM response is parsed:
transform the raw LLM :line-items vector into a map keyed by each line
item's :id. The original LLM output does not include line-item ids, so
TCA pairs each output with the input line item at the same position:
(let [raw-line-items (:line-items llm-result)
input-line-items (get-in state [:structured-data :line-items])
by-id (into {}
(map (fn [input output]
[(:id input)
(cond-> {}
(:vat-validation output)
(assoc :vat-validation (:vat-validation output)))])
input-line-items
(concat raw-line-items (repeat nil))))
;; keep has no multi-collection arity, so pair inputs with outputs first.
bu-codes (into {}
(keep (fn [[input output]]
(when-let [bu (:bu-code output)]
[(:id input) bu])))
(map vector
input-line-items
(concat raw-line-items (repeat nil))))]
{:result {:tax-issues (format-tax-issues (:issues llm-result))
:line-items by-id
:service-category (:service-category llm-result)
:bu-codes bu-codes
:tax-id-correction (:tax-id-correction llm-result)}
:stats llm-stats})
(-apply-ops [_ state result]
(when result
(let [mode (:mode state)
ops (cond-> []
(:service-category result)
(conj {"op" "replace" "path" "/service-category"
"value" (:service-category result)}))
bu-ops (mapv (fn [[id bu]]
{"op" "replace"
"path" (str "/line-items[id=" id "]/bu-code")
"value" bu})
(:bu-codes result))
corr (:tax-id-correction result)
corr-ops (when (and (= mode :ingestion)
corr
(= "corrected" (:status corr))
(:tax-id corr))
[{"op" "replace" "path" "/issuer/tax-id"
"value" (:tax-id corr)}
{"op" "replace" "path" "/issuer/tax-id-type"
"value" (:tax-id-type corr)}])]
(into (into ops bu-ops) corr-ops))))
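The position-based pairing in TCA's restructuring can be exercised in isolation. The sketch below is a hypothetical pure extraction; `restructure-line-items` is not a name from the codebase. When the LLM returns fewer items than the input, the trailing nil padding gives the missing items an empty entry and no bu-code. If the LLM reorders or drops a middle item, attribution silently shifts, which is inherent to pairing by position.

```clojure
(ns example.tca-restructure)

(defn restructure-line-items
  "Pairs LLM per-line output with input line items by position and returns
  {:line-items {id {:vat-validation ...}} :bu-codes {id bu}}."
  [input-line-items raw-line-items]
  (let [pairs (map vector input-line-items (concat raw-line-items (repeat nil)))]
    {:line-items (into {}
                       (map (fn [[input output]]
                              [(:id input)
                               (cond-> {}
                                 (:vat-validation output)
                                 (assoc :vat-validation (:vat-validation output)))]))
                       pairs)
     :bu-codes (into {}
                     (keep (fn [[input output]]
                             (when-let [bu (:bu-code output)]
                               [(:id input) bu])))
                     pairs)}))
```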
Commit: git commit -m "feat: migrate tax-compliance-analyzer to IProcessor v2 (multi-slice)".
Files:
Create: src/com/getorcha/workers/ap/processors/financial_validation.clj
Create: test/com/getorcha/workers/ap/processors/financial_validation_test.clj
This processor owns the :validations.financial-math sub-path for
invoice documents (the only document type with a financial-math check).
Its -compute runs the deterministic validation/check-financial-math (a
pure function that stays in ingestion/validation.clj). When that result's
:status is not "pass", it calls the existing LLM resolver to refine the
uncertainty. It returns {:status ... :details ...}, the final resolved
state of the financial-math sub-path.
Declarations:
(-id [_] :financial-validation-resolver)
(-reads [_ _state] [(reads/read-leaf :subtotal)
(reads/read-leaf :total)
(reads/read-leaf :tax-amount)
(reads/read-leaf :line-items :* :amount)
(reads/read-leaf :line-items :* :quantity)
(reads/read-leaf :line-items :* :unit-price)])
(-writes [_ _state] []) ;; writes only to the diagnostic slice
(-diagnostic [_ state]
(when (= "invoice" (get-in state [:document :document/structured-data :document-type]))
{:slice :validations :sub-path [:financial-math]}))
(-modes [_] #{:ingestion :edit})
-diagnostic returns nil for non-invoice document types, so the
engine skips the slice write. This is the per-type dispatch pattern.
Step 1: Move the check-financial-math invocation into -compute
(-compute [_ ctx state]
(let [sd (:structured-data state)
deterministic (validation/check-financial-math sd)]
(if (= "pass" (:status deterministic))
{:result deterministic :stats nil}
;; Call existing LLM refinement — returns {:result refined :stats llm-stats}
(run-llm-resolver ctx state deterministic))))
(run-llm-resolver is the existing resolver logic from the old
FinancialValidationResolver defrecord; port it as a private fn.)
Step 2: -apply-ops returns [] — this processor writes only the diagnostic sub-path.
Step 3: Port tests, commit
Commit: git commit -m "feat: migrate financial-validation-resolver to IProcessor v2".
Files:
Create: src/com/getorcha/workers/ap/processors/uncertain_validations.clj
Create: test/com/getorcha/workers/ap/processors/uncertain_validations_test.clj
Owns the :validations.required-fields, :validations.date-reasonableness,
:validations.recipient-identity sub-paths for invoice documents.
Uses the multi-sub-path -diagnostic form (engine's write-diagnostic!
handles this — see A5). -compute's result has three top-level keys
matching the sub-paths' first elements:
{:result {:required-fields {:status "pass"}
:date-reasonableness {:status "warning" ...}
:recipient-identity {:status "pass"}}
:stats nil}
Engine iterates :sub-paths, writing each with value
(get result (first sub-path)).
Declarations:
(-id [_] :uncertain-validations-resolver)
(-reads [_ _state] [(reads/read-leaf :issuer :name)
(reads/read-leaf :issuer :address)
(reads/read-leaf :recipient :name)
(reads/read-leaf :recipient :address)
(reads/read-leaf :invoice-date)
(reads/read-leaf :invoice-number)])
(-writes [_ _state] [])
(-diagnostic [_ state]
(when (= "invoice" (get-in state [:document :document/structured-data :document-type]))
{:slice :validations
:sub-paths [[:required-fields]
[:date-reasonableness]
[:recipient-identity]]}))
The multi-sub-path shape was already wired into the engine in Task A5; no engine change needed here.
Pull logic from the old UncertainValidationsResolver defrecord. The old
-compute starts from structured-data.validation-results to find
uncertain checks. Under the new model, recompute those deterministic
statuses from state's :structured-data using the pure check fns:
(-compute [_ ctx state]
(let [sd (:structured-data state)
le (:legal-entity state)
det-required (validation/check-required-fields sd)
det-dates (validation/check-date-reasonableness sd)
det-identity (validation/check-recipient-identity sd le)
;; For each uncertain check, run the existing LLM refinement; else use det as-is.
result {:required-fields (if (= "uncertain" (:status det-required))
(resolve-required-fields ctx state det-required)
det-required)
:date-reasonableness (if (= "uncertain" (:status det-dates))
(resolve-dates ctx state det-dates)
det-dates)
:recipient-identity (if (= "uncertain" (:status det-identity))
(resolve-identity ctx state det-identity)
det-identity)}]
{:result result :stats (merge-llm-stats ...)}))
(Port the existing resolve-* helpers from the old
uncertain_validations.clj into private fns in the new ns.)
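All three branches in -compute follow one rule: keep the deterministic result unless its `:status` is `"uncertain"`, in which case hand it to an LLM refinement. A minimal sketch factoring that rule out, assuming a hypothetical `refine-uncertain` helper; the resolvers here are plain fns of the deterministic result, whereas the real `resolve-*` helpers also take `ctx` and `state` and call the LLM.

```clojure
(ns example.uncertain-refinement)

(defn refine-uncertain
  "Hypothetical helper. `checks` maps a result key to a deterministic
  check result; `resolvers` maps the same key to a fn of that result.
  Only checks whose :status is \"uncertain\" are passed to a resolver;
  all others are returned unchanged."
  [checks resolvers]
  (into {}
        (map (fn [[k det]]
               (let [resolve-fn (get resolvers k)]
                 [k (if (and resolve-fn (= "uncertain" (:status det)))
                      (resolve-fn det)
                      det)])))
        checks))
```

Keeping the branch in one place means a new LLM-refined check only needs an entry in each map.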
Step 3: -apply-ops returns [].
Step 4: Port tests, commit
Commit: git commit -m "feat: migrate uncertain-validations-resolver to IProcessor v2".
Files:
src/com/getorcha/workers/ap/processors/fraud.clj
test/com/getorcha/workers/ap/processors/fraud_test.clj
Declarations:
(-id [_] :fraud-detector)
(-reads [_ _state] [(reads/read-leaf :issuer :name)
(reads/read-leaf :issuer :country)
(reads/read-leaf :issuer :vat-id)
(reads/read-leaf :issuer :tax-id)
(reads/read-leaf :issuer :iban)
(reads/read-leaf :issuer :account-number)
(reads/read-leaf :issuer :sort-code)
(reads/read-leaf :issuer :routing-number)
(reads/read-leaf :issuer :bsb)
(reads/read-leaf :recipient :country)
(reads/read-leaf :invoice-date)
(reads/read-leaf :line-items :* :description)])
(-writes [_ _state] [])
(-diagnostic [_ _state] {:slice :fraud-flags})
Port -compute and make llm-result->flags public (it was private). The result of -compute is the final fraud-flags vector (after applying llm-result->flags):
(-compute [_ ctx state]
(let [deterministic (run-deterministic-checks (:db-pool ctx) state)
llm-out (run-llm-fraud-analysis ctx state)
llm-flags (when (:result llm-out) (llm-result->flags (:result llm-out)))]
{:result (into deterministic (or llm-flags []))
:stats (:stats llm-out)}))
-apply-ops returns [] — all fraud output is in the diagnostic slice.
Port tests, commit
Commit: git commit -m "feat: migrate fraud-detector to IProcessor v2".
Files:
src/com/getorcha/workers/ap/processors/validations.clj
test/com/getorcha/workers/ap/processors/validations_test.clj
src/com/getorcha/workers/ap/ingestion/validation.clj (remove ^:private from check-tax-id-format, check-iban, check-issuer-country, check-recipient-country, check-large-document-summary-only).
The validations processor owns the deterministic :validations
sub-paths. Its ownership varies per doc type (see spec §12.1):
invoice: :tax-id-format, :iban-format, :issuer-country,
:recipient-country, :large-document-summary-only
(FVR and UVR own the LLM-refined sub-paths).
contract: :signature-presence, :required-fields, :date-validity,
:party-identification, :financial-consistency, :termination-clause.
purchase-order and goods-received-note: :required-fields (the whole :validations slice).
It always runs in edit mode (-always? true).
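Because three processors share the invoice `:validations` slice, each sub-path should have exactly one owner per document type. A minimal sketch of a test-time guard, assuming the declarations are collected into a plain map; `ownership` restates the invoice lists above for validations and UVR only (FVR's financial-math sub-path is left out because its exact key is not spelled out here), and `ownership-conflicts` is a hypothetical name.

```clojure
(ns example.validations-ownership)

(def ownership
  "Invoice :validations sub-paths per processor, restated from the plan.
  FVR's sub-path is omitted from this sketch."
  {:validations                    #{[:tax-id-format] [:iban-format] [:issuer-country]
                                     [:recipient-country] [:large-document-summary-only]}
   :uncertain-validations-resolver #{[:required-fields] [:date-reasonableness]
                                     [:recipient-identity]}})

(defn ownership-conflicts
  "Returns sub-path -> #{processor-id ...} for every sub-path claimed by
  more than one processor. An empty map means ownership is disjoint."
  [ownership]
  (->> (for [[proc-id sub-paths] ownership
             sub-path sub-paths]
         {sub-path #{proc-id}})
       (apply merge-with into {})
       (into {} (filter (fn [[_ owners]] (> (count owners) 1))))))
```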
In src/com/getorcha/workers/ap/ingestion/validation.clj, remove
^:private from check-tax-id-format, check-iban,
check-issuer-country, check-recipient-country,
check-large-document-summary-only. (The others — check-financial-math,
check-required-fields, check-date-reasonableness,
check-recipient-identity, and all check-contract-* helpers — are
already public.)
(ns com.getorcha.workers.ap.processors.validations
(:require [com.getorcha.workers.ap.ingestion.validation :as validation]
[com.getorcha.workers.ap.processors :as proc]
[com.getorcha.workers.ap.processors.reads :as reads]))
(defn ^:private compute-invoice
[sd _legal-entity] ; no invoice check here needs the legal entity yet
{:tax-id-format (validation/check-tax-id-format sd)
:iban-format (validation/check-iban sd)
:issuer-country (validation/check-issuer-country sd)
:recipient-country (validation/check-recipient-country sd)
:large-document-summary-only
(when (:summary-page-range sd)
(validation/check-large-document-summary-only sd))})
(defn ^:private compute-contract
[sd]
{:signature-presence (validation/check-contract-signature-presence sd)
:required-fields (validation/check-contract-required-fields sd)
:date-validity (validation/check-contract-date-validity sd)
:party-identification (validation/check-contract-party-identification sd)
:financial-consistency (validation/check-contract-financial-consistency sd)
:termination-clause (validation/check-contract-termination-clause sd)})
(defn ^:private compute-purchase-order
[sd]
{:required-fields
(if (and (seq (:po-number sd)) (seq (get-in sd [:supplier :name])))
{:status "pass"}
{:status "error"
:message "Missing required fields: po-number and/or supplier name"})})
(defn ^:private compute-grn
[sd]
{:required-fields
(if (or (seq (:grn-number sd)) (seq (:delivery-note-numbers sd)))
{:status "pass"}
{:status "error"
:message "Missing required field: grn-number or delivery-note-numbers"})})
(def ^:private invoice-sub-paths
[[:tax-id-format] [:iban-format] [:issuer-country]
[:recipient-country] [:large-document-summary-only]])
(def ^:private contract-sub-paths
[[:signature-presence] [:required-fields] [:date-validity]
[:party-identification] [:financial-consistency] [:termination-clause]])
(defrecord Validations []
proc/IProcessor
(-id [_] :validations)
(-reads [_ _state] [])
(-writes [_ _state] [])
(-diagnostic [_ state]
(let [doc-type (get-in state [:document :document/structured-data :document-type])]
(case doc-type
"invoice" {:slice :validations :sub-paths invoice-sub-paths}
"contract" {:slice :validations :sub-paths contract-sub-paths}
"purchase-order" {:slice :validations :sub-paths [[:required-fields]]}
"goods-received-note" {:slice :validations :sub-paths [[:required-fields]]}
nil)))
(-modes [_] #{:ingestion :edit})
(-always? [_] true)
(-compute [_ _ctx state]
(let [sd (:structured-data state)
doc-type (:document-type sd)
le (:legal-entity state)]
{:result (case doc-type
"invoice" (compute-invoice sd le)
"contract" (compute-contract sd)
"purchase-order" (compute-purchase-order sd)
"goods-received-note" (compute-grn sd)
{})
:stats nil}))
(-apply-ops [_ _state _result] []))
(defn ->validations [] (->Validations))
Expose check-* fns by removing ^:private where needed and adding them to the public API of ingestion/validation.clj.
Port tests, commit
Commit: git commit -m "feat: validations processor (always-run deterministic checks)".
Relocate matching files under processors/matching/* and build the unified matching processor that wraps match-document! + reconcile-cluster!. Rename publish-document-ready!.
Files:
Moved: src/com/getorcha/workers/ap/matching/*.clj → src/com/getorcha/workers/ap/processors/matching/*.clj
Moved: test/com/getorcha/workers/ap/matching/*.clj → test/com/getorcha/workers/ap/processors/matching/*.clj
Step 1: Move files with git mv
git mv src/com/getorcha/workers/ap/matching src/com/getorcha/workers/ap/processors/matching
git mv test/com/getorcha/workers/ap/matching test/com/getorcha/workers/ap/processors/matching
Global replace in every moved file:
com.getorcha.workers.ap.matching.core → com.getorcha.workers.ap.processors.matching.core
com.getorcha.workers.ap.matching.candidates → com.getorcha.workers.ap.processors.matching.candidates
com.getorcha.workers.ap.matching.evidence → com.getorcha.workers.ap.processors.matching.evidence
com.getorcha.workers.ap.matching.llm-decision → com.getorcha.workers.ap.processors.matching.llm-decision
com.getorcha.workers.ap.matching.normalize → com.getorcha.workers.ap.processors.matching.normalize
com.getorcha.workers.ap.matching.reconciliation → com.getorcha.workers.ap.processors.matching.reconciliation
com.getorcha.workers.ap.matching.searchable-text → com.getorcha.workers.ap.processors.matching.searchable-text
com.getorcha.workers.ap.matching.worker → com.getorcha.workers.ap.processors.matching.worker
Use a sed-style global replace, but apply it file by file with the Edit tool to be safe.
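The rewrite rule behind those per-file edits can be stated as a pure function, which is handy for checking what an edited line should look like. A minimal sketch; the regex and `rename-matching-ns` are assumptions, not part of the plan. The lookahead only rewrites the old root when it is not followed by a word character or hyphen, and the function is idempotent because the new name does not contain the old prefix.

```clojure
(ns example.ns-rename
  (:require [clojure.string :as str]))

(def ^:private old-root
  ;; Matches `...ap.matching` when followed by `.`, `/`, whitespace,
  ;; a bracket, or end of input, but not `...ap.matching-foo`.
  #"com\.getorcha\.workers\.ap\.matching(?![\w-])")

(defn rename-matching-ns
  "Rewrites every reference to the old matching namespace root in `text`."
  [text]
  (str/replace text old-root "com.getorcha.workers.ap.processors.matching"))
```

For example, `(rename-matching-ns "(ns com.getorcha.workers.ap.matching.core)")` yields `"(ns com.getorcha.workers.ap.processors.matching.core)"`, and Integrant keys such as `:com.getorcha.workers.ap.matching.worker/consumer` are rewritten the same way.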
Grep for com.getorcha.workers.ap.matching outside the moved dir:
grep -rln "com.getorcha.workers.ap.matching" src test resources dev --include="*.clj" --include="*.edn"
For each match outside the new dir, update the ns to processors.matching.*. Known callers:
src/com/getorcha/workers/ap/ingestion.clj (will be replaced in Phase D; update the ns for now to make the move compile cleanly)
resources/com/getorcha/config.edn (Integrant keys)
src/com/getorcha/system.clj (if it references matching keys)
Step 4: Lint + run matching tests to verify nothing broke
clj-kondo --lint src/com/getorcha/workers/ap/processors/matching test
clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.matching.worker-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: clean lint, tests pass.
git add -A
git commit -m "refactor: move matching namespaces under processors.matching"
Files:
Create: src/com/getorcha/workers/ap/processors/matching.clj
Create: test/com/getorcha/workers/ap/processors/matching_test.clj
Step 1: Write the processor wrapper
Write src/com/getorcha/workers/ap/processors/matching.clj:
(ns com.getorcha.workers.ap.processors.matching
"The matching IProcessor. Wraps match-document! + reconcile-cluster!
(reconciliation is a sub-step of matching under the unified model —
see spec §12.2)."
(:require [clojure.tools.logging :as log]
[com.getorcha.db.document-matching :as db.matching]
[com.getorcha.db.sql :as db.sql]
[com.getorcha.workers.ap.processors :as proc]
[com.getorcha.workers.ap.processors.matching.core :as matching.core]
[com.getorcha.workers.ap.processors.matching.reconciliation :as reconciliation]
[com.getorcha.workers.ap.processors.reads :as reads]))
(defn ^:private build-matching-summary
[db-pool document-id]
(let [match-rows (db.matching/get-matches-for-document db-pool document-id)]
{:matches
(mapv (fn [{:ap-document-match/keys [document-a-id document-b-id
blended-score llm-confidence match-method]}]
(cond-> {:document-id (str (if (= document-a-id document-id)
document-b-id
document-a-id))
:blended-score (double blended-score)
:match-method match-method}
llm-confidence (assoc :llm-confidence llm-confidence)))
match-rows)}))
(defn ^:private get-cluster-id
[db document-id]
(:document/cluster-id
(db.sql/execute-one! db
{:select [:cluster-id]
:from [:document]
:where [:= :id document-id]})))
(def ^:private reads-by-doc-type
{"invoice"
[(reads/read-leaf :issuer :name)
(reads/read-leaf :issuer :vat-id)
(reads/read-leaf :issuer :iban)
(reads/read-leaf :invoice-number)
(reads/read-leaf :total)
(reads/read-leaf :currency)
(reads/read-leaf :line-items :* :description)
(reads/read-leaf :line-items :* :quantity)
(reads/read-leaf :line-items :* :unit)
(reads/read-leaf :po-references :*)
(reads/read-leaf :gr-references :*)
(reads/read-leaf :service-period :start)
(reads/read-leaf :service-period :end)]
"purchase-order"
[(reads/read-leaf :supplier :name)
(reads/read-leaf :supplier :vat-id)
(reads/read-leaf :po-number)
(reads/read-leaf :total-value)
(reads/read-leaf :currency)
(reads/read-leaf :line-items :* :description)
(reads/read-leaf :line-items :* :quantity)
(reads/read-leaf :line-items :* :unit)
(reads/read-leaf :contract-references :*)
(reads/read-leaf :requisition-numbers :*)]
"contract"
[(reads/read-leaf :counterparty :name)
(reads/read-leaf :counterparty :tax-id)
(reads/read-leaf :contract-number)
(reads/read-leaf :total-value)
(reads/read-leaf :currency)
(reads/read-leaf :deliverables :*)]
"goods-received-note"
[(reads/read-leaf :supplier :name)
(reads/read-leaf :supplier :vat-id)
(reads/read-leaf :grn-number)
(reads/read-leaf :line-items :* :description)
(reads/read-leaf :line-items :* :quantity)
(reads/read-leaf :line-items :* :unit)
(reads/read-leaf :po-references :*)
(reads/read-leaf :delivery-note-numbers :*)]})
(defrecord Matching []
proc/IProcessor
(-id [_] :matching)
(-reads [_ state]
(let [doc-type (get-in state [:document :document/structured-data :document-type])]
(get reads-by-doc-type doc-type [])))
(-writes [_ _state] []) ; side effects on document_match / cluster; no structured-data ops
(-diagnostic [_ _state] {:slice :matching})
;; NOTE: we declare only :matching as the engine-written slice.
;; reconcile-cluster! writes the per-doc :reconciliation slice for
;; EACH cluster member itself (side effect outside the engine —
;; spec §12.3). Matching's -compute result therefore omits
;; reconciliation; the slice is already populated once the call
;; returns.
(-modes [_] #{:ingestion :edit})
(-always? [_] false)
(-compute [_ {:keys [db-pool llm-config search-config] :as _ctx} state]
(let [doc (:document state)
document-id (:document/id doc)
cluster-before (:document/cluster-id doc)]
;; Side effect: persists matches + cluster assignment
(matching.core/match-document! db-pool search-config (:matching llm-config) doc)
(let [summary (build-matching-summary db-pool document-id)
cluster-after (get-cluster-id db-pool document-id)
affected (cond-> #{}
cluster-before (conj cluster-before)
cluster-after (conj cluster-after))]
(doseq [cid affected]
(try
(reconciliation/reconcile-cluster! db-pool
(:matching llm-config)
cid
(:document/legal-entity-id doc))
(catch Exception e
(when-not (= :com.getorcha.workers.ap.processors.matching.reconciliation/insufficient-documents
(:kind (ex-data e)))
(log/warn e "Reconciliation failed inside matching processor"
{:cluster-id cid :document-id document-id})))))
(db.matching/set-matching-status! db-pool document-id {:status "succeeded"})
{:result summary :stats nil})))
(-apply-ops [_ _state _result] []))
(defn ->matching [] (->Matching))
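The reconciliation loop in -compute encodes two rules: reconcile every cluster the document belonged to before or after matching, and treat `insufficient-documents` as a normal outcome rather than a failure. A minimal pure sketch of those rules with the reconcile call injected as a fn, so it runs without a database; `affected-clusters`, `reconcile-affected!` and the returned map shape are hypothetical, and where this sketch records a failure the real processor logs a warning.

```clojure
(ns example.matching-reconcile)

(defn affected-clusters
  "Clusters to reconcile: the one the document left and the one it
  joined. nil ids are dropped; identical ids collapse to one entry."
  [cluster-before cluster-after]
  (cond-> #{}
    cluster-before (conj cluster-before)
    cluster-after  (conj cluster-after)))

(defn reconcile-affected!
  "Calls `reconcile!` once per affected cluster. Exceptions whose
  ex-data :kind equals `ignorable-kind` are swallowed; any other
  exception is recorded under :failed.
  Returns {:reconciled #{cluster-id ...} :failed {cluster-id exception}}."
  [reconcile! ignorable-kind cluster-before cluster-after]
  (reduce (fn [acc cid]
            (try
              (reconcile! cid)
              (update acc :reconciled conj cid)
              (catch Exception e
                (if (= ignorable-kind (:kind (ex-data e)))
                  acc
                  (assoc-in acc [:failed cid] e)))))
          {:reconciled #{} :failed {}}
          (affected-clusters cluster-before cluster-after)))
```

Reconciling the old cluster matters when an edit moves a document out of it: the remaining members' `:reconciliation` slices would otherwise still describe the departed document.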
Write test/com/getorcha/workers/ap/processors/matching_test.clj:
(ns com.getorcha.workers.ap.processors.matching-test
(:require [clojure.test :refer [deftest is testing]]
[com.getorcha.workers.ap.processors :as proc]
[com.getorcha.workers.ap.processors.matching :as matching]
[com.getorcha.workers.ap.processors.reads :as reads]))
(deftest matching-declarations
(let [p (matching/->matching)
state {:document {:document/structured-data {:document-type "invoice"}}}]
(is (= :matching (proc/-id p)))
(is (= {:slice :matching} (proc/-diagnostic p state)))
(is (false? (proc/-always? p)))
(is (some #(= (reads/read-leaf :issuer :name) %) (proc/-reads p state)))))
(deftest matching-reads-dispatch-on-doc-type
(let [p (matching/->matching)]
(is (some #(= (reads/read-leaf :counterparty :name) %)
(proc/-reads p {:document {:document/structured-data {:document-type "contract"}}})))
(is (some #(= (reads/read-leaf :po-number) %)
(proc/-reads p {:document {:document/structured-data {:document-type "purchase-order"}}})))))
Keep existing integration tests (the ones in the moved matching/*_test.clj files) untouched; they exercise match-document! + reconcile-cluster! directly.
git add src/com/getorcha/workers/ap/processors/matching.clj test/com/getorcha/workers/ap/processors/matching_test.clj
git commit -m "feat: matching IProcessor (absorbs reconciliation)"
Rename publish-document-ready! and move it to matching/queue.clj
Files:
Create: src/com/getorcha/workers/ap/processors/matching/queue.clj
Modify: src/com/getorcha/workers/ap/ingestion.clj
Step 1: Extract to matching/queue.clj
Write src/com/getorcha/workers/ap/processors/matching/queue.clj:
(ns com.getorcha.workers.ap.processors.matching.queue
"SQS enqueue helper for pending matching runs. Called at the tail
of ingestion to hand off from the ingestion worker to the matching
worker."
(:require [clojure.tools.logging :as log]
[com.getorcha.aws :as aws]
[com.getorcha.db.document-matching :as db.matching])
(:import (software.amazon.awssdk.services.sqs SqsClient)))
(defn enqueue!
"Sets document's matching_status to `pending` and publishes the
document id to the matching SQS queue."
[{:keys [^SqsClient sqs-client aws db-pool] :as _ctx} document-id]
(when-let [queue-url (get-in aws [:queue-urls :matching])]
(log/info "Enqueuing document for matching" {:document-id document-id})
(db.matching/set-matching-status! db-pool document-id {:status "pending"})
(aws/send-message! sqs-client queue-url (str document-id))))
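Note the order in enqueue!: the row is marked `pending` before the message is published, presumably so the matching worker never picks up a message whose status update has not landed yet, and a missing `:matching` queue URL turns the whole call into a no-op. A sketch of that contract with the DB and SQS calls injected as fns so the ordering can be checked without AWS; the injected-fn context shape is an assumption for illustration only.

```clojure
(ns example.matching-enqueue)

(defn enqueue!
  "Same control flow as matching.queue/enqueue!, with side effects
  injected: `set-status!` stands in for db.matching/set-matching-status!
  and `send!` for aws/send-message!. Does nothing, and returns nil,
  when no queue URL is configured."
  [{:keys [queue-url set-status! send!]} document-id]
  (when queue-url
    (set-status! document-id {:status "pending"})
    (send! queue-url (str document-id))))
```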
In src/com/getorcha/workers/ap/ingestion.clj, replace the publish-document-ready! defn with a require alias and update the call site at line 1031:
Remove the old defn (lines ~582-589). Add to require block:
[com.getorcha.workers.ap.processors.matching.queue :as matching.queue]
Change the call (currently (publish-document-ready! context (:document/id document))) to:
(matching.queue/enqueue! context (:document/id document))
clj-kondo --lint src/com/getorcha/workers/ap src/com/getorcha/workers/ap/ingestion.clj
clj -X:test:silent :nses '[com.getorcha.workers.ap.ingestion-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: pass.
git add src/com/getorcha/workers/ap/processors/matching/queue.clj src/com/getorcha/workers/ap/ingestion.clj
git commit -m "refactor: rename publish-document-ready! to matching.queue/enqueue!"
Replace the three legacy orchestrators with calls to run-processors!.
Files:
src/com/getorcha/workers/ap/ingestion.clj
src/com/getorcha/workers/ap/ingestion/post_process.clj
src/com/getorcha/workers/ap/ingestion/post_process/*.clj
Pipeline change to be aware of: today ingestion runs two separate
stages — with-validations (deterministic checks, writes
:validation-results on structured-data) then post-process/run
(nine processors that can mutate structured-data). Both stages
collapse into ONE engine call in this task. The new pipeline is:
transcribe → classify → extract →
run-processors! [validations] [post-procs…] [fraud] →
complete
Phase 1 = validations (always-run, deterministic). Phase 2 = bulk of
post-processors. Phase 3 = fraud (sees phase-2 corrections). The old
validate multimethod is no longer called; its contents distribute
across: validations (tax-id-format, iban-format, country checks,
large-document; plus the contract/PO/GRN checks),
financial-validation-resolver (invoice financial-math), and
uncertain-validations-resolver (invoice required-fields,
date-reasonableness, recipient-identity). Pure check-* functions
stay in ingestion/validation.clj — only the validate multimethod
and with-validations wrapper go away. There is no separate
vat-validation processor — tax-compliance-analyzer writes the
:line-items diagnostic slice directly (spec §13.2).
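The phase semantics above (phases run in order, processors within a phase run concurrently, and each phase sees the previous phase's corrections) can be modelled without the database. This is a minimal model under stated assumptions, not the engine: processors are plain maps with hypothetical `:compute` (structured-data to result) and `:ops` (result to `[path value]` pairs) fns, futures stand in for `core.async/thread`, and a phase's ops are applied with `assoc-in` only after all of its processors finish.

```clojure
(ns example.phase-scheduler)

(defn run-phase
  "Runs every processor of one phase concurrently against the same
  input `sd`, then applies their ops in declaration order."
  [sd processors]
  (let [results (->> processors
                     ;; mapv is eager, so all futures start before any deref
                     (mapv (fn [p] (future [p ((:compute p) sd)])))
                     (mapv deref))]
    (reduce (fn [acc [p result]]
              (reduce (fn [acc' [path value]] (assoc-in acc' path value))
                      acc
                      ((:ops p) result)))
            sd
            results)))

(defn run-phases
  "Phases run sequentially; phase N+1 sees the ops applied by phase N."
  [sd phases]
  (reduce run-phase sd phases))
```

This is why fraud sits alone in the last phase: it computes over structured-data that already includes the phase-2 corrections.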
In ingestion.clj, add require imports:
[com.getorcha.workers.ap.processors.accounts :as accounts]
[com.getorcha.workers.ap.processors.accruals :as accruals]
[com.getorcha.workers.ap.processors.cost-center :as cost-center]
[com.getorcha.workers.ap.processors.engine :as engine]
[com.getorcha.workers.ap.processors.financial-validation :as financial-validation]
[com.getorcha.workers.ap.processors.fraud :as fraud]
[com.getorcha.workers.ap.processors.supplier :as supplier]
[com.getorcha.workers.ap.processors.tax-compliance :as tax-compliance]
[com.getorcha.workers.ap.processors.uncertain-validations :as uncertain-validations]
[com.getorcha.workers.ap.processors.validations :as validations]
Replace with-validations + post-process! with run-processors!
Find the section that calls (post-process/run context ingestion) and the earlier with-validations call (which validates, then post-processes). Replace both with:
(defn ^:private run-diagnostics!
[context ingestion]
(let [state (-> ingestion
(assoc :mode :ingestion)
(assoc :trigger-kind :ingestion)
(assoc :ingestion-id (:id ingestion)))]
(engine/run-processors!
context
state
[[(validations/->validations)]
[(accounts/->accounts)
(cost-center/->cost-center)
(accruals/->accruals)
(supplier/->supplier-matcher)
(supplier/->supplier-verifier)
(tax-compliance/->tax-compliance-analyzer)
(financial-validation/->financial-validation-resolver)
(uncertain-validations/->uncertain-validations-resolver)]
[(fraud/->fraud-detector)]])))
Delete with-validations and post-process! functions.
Update complete-ingestion! for the new state shape
The old code read structured-data keys like :validation-results, :fraud-flags, etc. Those keys no longer exist on structured-data under the new model; they live in diagnostics via per-processor slice writes. Remove the build-diagnostics + stripped-data dance: the diagnostics slices have already been populated by the processors during run-diagnostics!.
Before:
(let [vat-validations (tax-compliance/run-vat-validation structured-data)
diagnostics (build-diagnostics {:validation-results ...
:fraud-flags ...
:tax-issues ...
:vat-validations vat-validations})
stripped-data (-> structured-data (dissoc ...) (update :line-items ...))
needs-review? (compute-needs-human-review diagnostics)]
(db.sql/execute-one! tx
{:update :document
:set {:structured-data [:lift stripped-data]
:diagnostics [:cast (json/generate-string diagnostics) :jsonb]
...}})
...)
After:
;; structured-data is already stripped (no :validation-results / :fraud-flags /
;; :tax-issues / :vat-validation on line items) because processors never wrote
;; them there. diagnostics was written slice-by-slice during run-diagnostics!.
;; Only the document row's structured-data, type, version, updated-at need
;; updating here.
(db.sql/execute-one!
tx
{:update :document
:set {:structured-data [:lift structured-data]
:type (db.sql/->cast (:document-type structured-data) :document-type)
:version [:raw "version + 1"]
:updated-at [:now]}
:where [:= :id document-id]})
document.needs_human_review is recomputed inside update-diagnostic! after each slice write, so no explicit recomputation here.
rm src/com/getorcha/workers/ap/ingestion/post_process.clj
rm -rf src/com/getorcha/workers/ap/ingestion/post_process
rm -rf test/com/getorcha/workers/ap/ingestion/post_process
Remove the validate multimethod from ingestion/validation.clj
The processors (Task B6/B7/B9) call the pure check-* functions
directly and own their sub-paths. The validate multimethod (and the
annotate-line-items helper, if it was only used by with-validations)
are now orphaned.
In src/com/getorcha/workers/ap/ingestion/validation.clj:
Delete (defmulti validate ...) and all (defmethod validate ...) bodies.
Keep every check-* function intact (they're still called by the
new processors).
Remove ^:private from check-* functions referenced from
outside the ns (grep shows which ones; the processors need
check-tax-id-format, check-iban, check-issuer-country,
check-recipient-country, check-large-document-summary-only,
check-financial-math, check-required-fields,
check-date-reasonableness, check-recipient-identity,
check-contract-signature-presence, and the other check-contract-* helpers).
In src/com/getorcha/workers/ap/ingestion.clj, delete the
with-validations private fn (it called validation/validate on
every ingestion; the new pipeline runs the validations processor as
phase 1 instead).
clj-kondo --lint src/com/getorcha/workers/ap/ingestion.clj
clj -X:test:silent :nses '[com.getorcha.workers.ap.ingestion-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: pass.
git add -A
git commit -m "refactor: ingestion uses unified processors engine"
Files:
Modify: src/com/getorcha/workers/ap/processors/matching/worker.clj
Step 1: Replace process-document! body
The current body does: fetch doc → set matching-status in-progress → insert run row → call match-document! → build summary → update-slice! → complete-run! → reconcile-cluster! per affected → set matching-status succeeded → notify on fail.
Under the unified engine, most of that is absorbed. New body:
(defn ^:private process-document!
[{:keys [db-pool] :as context} document-id]
(let [doc (fetch-document db-pool document-id)]
(when (nil? doc)
(throw (ex-info "Document not found for matching"
{:document-id document-id :kind ::document-not-found})))
(when (nil? (:document/structured-data doc))
(throw (ex-info "Document missing structured data, skipping matching"
{:document-id document-id :kind ::missing-structured-data})))
(mdc/put! :legal-entity-id (:document/legal-entity-id doc))
(xray/add-annotation! "legal_entity_id" (str (:document/legal-entity-id doc)))
(if-not (candidates/matchable-type? (:document/type doc))
(do
(log/info "Document type does not participate in matching, marking skipped"
{:document-id document-id :document-type (:document/type doc)})
(db.matching/set-matching-status! db-pool document-id {:status "skipped"}))
(let [ingestion-id (:ap-ingestion/id
(db.sql/execute-one! db-pool
{:select [:id]
:from [:ap-ingestion]
:where [:= :document-id document-id]
:order-by [[:completed-at :desc :nulls-last]
[:created-at :desc]]
:limit 1}))
state {:mode :ingestion
:trigger-kind :ingestion
:ingestion-id ingestion-id
:document doc
:structured-data (:document/structured-data doc)}]
(engine/run-processors! context state [[(matching/->matching)]])))))
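The ingestion lookup picks the most recently completed ingestion, sorts never-completed rows last, and breaks ties by `created-at`. The same ordering expressed as a Clojure comparator can be useful for stating expectations in tests; `latest-ingestion` is a hypothetical helper, and rows are assumed to be plain maps with `java.time.Instant` timestamps.

```clojure
(ns example.latest-ingestion)

(defn- desc-nulls-last
  "Comparator for one key: larger values first, nil after any value."
  [a b]
  (cond
    (and (nil? a) (nil? b)) 0
    (nil? a) 1
    (nil? b) -1
    :else (compare b a)))

(defn latest-ingestion
  "Mirrors ORDER BY completed_at DESC NULLS LAST, created_at DESC LIMIT 1.
  Returns nil for an empty collection."
  [rows]
  (first (sort (fn [x y]
                 (let [c (desc-nulls-last (:completed-at x) (:completed-at y))]
                   (if (zero? c)
                     (compare (:created-at y) (:created-at x))
                     c)))
               rows)))
```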
Remove notify-failure! and notify-reconciliation-failure! — the engine's unified notification handles both.
Add to the require block:
[com.getorcha.workers.ap.processors.engine :as engine]
[com.getorcha.workers.ap.processors.matching :as matching]
Remove now-dead requires (for reconciliation, db.diagnostics, db.run, etc.) if no other code in this ns uses them.
clj-kondo --lint src/com/getorcha/workers/ap/processors/matching/worker.clj
clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.matching.worker-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: pass.
git add src/com/getorcha/workers/ap/processors/matching/worker.clj
git commit -m "refactor: matching worker uses unified processors engine"
Files:
Modify: src/com/getorcha/workers/diagnostics_recompute/orchestrator.clj
Modify: src/com/getorcha/workers/diagnostics_recompute.clj
Modify: resources/com/getorcha/config.edn
Step 1: Replace orchestrator with engine call
Rewrite src/com/getorcha/workers/diagnostics_recompute/orchestrator.clj:
(ns com.getorcha.workers.diagnostics-recompute.orchestrator
"Edit-triggered recompute entry point: builds an edit-mode state
map and invokes the processors engine."
(:require [com.getorcha.db.sql :as db.sql]
[com.getorcha.workers.ap.processors.accounts :as accounts]
[com.getorcha.workers.ap.processors.accruals :as accruals]
[com.getorcha.workers.ap.processors.cost-center :as cost-center]
[com.getorcha.workers.ap.processors.engine :as engine]
[com.getorcha.workers.ap.processors.financial-validation :as financial-validation]
[com.getorcha.workers.ap.processors.fraud :as fraud]
[com.getorcha.workers.ap.processors.matching :as matching]
[com.getorcha.workers.ap.processors.reads :as reads]
[com.getorcha.workers.ap.processors.supplier :as supplier]
[com.getorcha.workers.ap.processors.tax-compliance :as tax-compliance]
[com.getorcha.workers.ap.processors.uncertain-validations :as uncertain-validations]
[com.getorcha.workers.ap.processors.validations :as validations]))
(defn ^:private fetch-document
[db-pool document-id]
(db.sql/execute-one! db-pool
{:select [:*]
:from [:document]
:where [:= :id document-id]}))
(defn ^:private fetch-history-row
[db-pool history-id]
(db.sql/execute-one! db-pool
{:select [:patch :edited-by]
:from [:document-history]
:where [:= :id history-id]}))
(defn ^:private compute-changed-leaves
"From a patch, produce the set of clj-paths the edit touches.
Non-leaf ops are expanded to virtual leaves via reads/expand-op-to-leaves."
[patch document-type pre-patch-sd]
(into #{}
(mapcat #(reads/expand-op-to-leaves % pre-patch-sd {:document-type document-type}))
patch))
(defn recompute-all!
"Top-level entry-point invoked by the SQS consumer. Builds
edit-mode state and calls the processors engine."
[ctx {:keys [document-id history-id] :as _params}] ; :document-version is in the message but unused here
(let [db-pool (:db-pool ctx)
doc (fetch-document db-pool document-id)
hist (fetch-history-row db-pool history-id)
document-type (get-in doc [:document/structured-data :document-type])
changed (compute-changed-leaves
(:document-history/patch hist)
document-type
(:document/structured-data doc))
state {:mode :edit
:trigger-kind :edit
:history-id history-id
:edited-by (:document-history/edited-by hist)
:document doc
:structured-data (:document/structured-data doc)
:changed-leaves changed}]
(engine/run-processors!
ctx
state
[[(validations/->validations)]
[(tax-compliance/->tax-compliance-analyzer)
(fraud/->fraud-detector)
(matching/->matching)
(accounts/->accounts)
(cost-center/->cost-center)
(accruals/->accruals)
(supplier/->supplier-matcher)
(supplier/->supplier-verifier)
(financial-validation/->financial-validation-resolver)
(uncertain-validations/->uncertain-validations-resolver)]])
:done))
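compute-changed-leaves and the engine's edit-mode scheduling together decide which processors run after an edit. A simplified sketch of both halves, assuming RFC 6901 JSON Pointer paths in the patch and a `:*` wildcard segment in read patterns. The real code uses `reads/expand-op-to-leaves` and malli seq-regex patterns, which are more expressive than the exact-length matching here; `pointer->path`, `leaf-paths`, `expand-op`, `pattern-matches?` and `should-run?` are hypothetical names.

```clojure
(ns example.edit-schedule
  (:require [clojure.string :as str]))

(defn pointer->path
  "RFC 6901 JSON Pointer -> clj-path. Digit-only segments become vector
  indexes, everything else a keyword. The append token \"-\" is not
  special-cased in this sketch."
  [pointer]
  (->> (rest (str/split pointer #"/" -1))
       (mapv (fn [seg]
               (let [s (-> seg (str/replace "~1" "/") (str/replace "~0" "~"))]
                 (if (re-matches #"\d+" s) (Long/parseLong s) (keyword s)))))))

(defn leaf-paths
  "Every path from `prefix` down to a scalar (or empty collection) in `v`."
  [prefix v]
  (cond
    (and (map? v) (seq v))
    (mapcat (fn [[k x]] (leaf-paths (conj prefix (keyword k)) x)) v)

    (and (sequential? v) (seq v))
    (mapcat (fn [i x] (leaf-paths (conj prefix i) x)) (range) v)

    :else [prefix]))

(defn expand-op
  "Leaves touched by one JSON Patch op: leaves under the path before the
  edit, plus leaves of the new value for add/replace. move/copy (which
  carry \"from\") are not handled in this sketch."
  [{:strs [op path value]} pre-patch-sd]
  (let [p (pointer->path path)]
    (distinct
     (concat (leaf-paths p (get-in pre-patch-sd p))
             (when (#{"add" "replace"} op) (leaf-paths p value))))))

(defn pattern-matches?
  "Exact-length match where :* in the pattern matches any one segment."
  [pattern leaf]
  (and (= (count pattern) (count leaf))
       (every? true? (map (fn [pat seg] (or (= :* pat) (= pat seg))) pattern leaf))))

(defn should-run?
  "Edit-mode scheduling: always-run processors run; others run only when
  some changed leaf matches one of their read patterns."
  [{:keys [always? reads]} changed-leaves]
  (boolean
   (or always?
       (some (fn [leaf] (some #(pattern-matches? % leaf) reads)) changed-leaves))))
```

Expanding a removed line item to its leaves is what lets a processor that reads `[:line-items :* :description]` notice the removal, even though the patch path itself is only `/line-items/0`.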
In src/com/getorcha/workers/diagnostics_recompute.clj, the ::consumer init-key currently accepts :aws and :db-pool. Expand to also accept :llm-config, :search-config, :notifications. Update the ctx passed into handle-message:
(defmethod ig/init-key ::consumer
[_ {:keys [aws db-pool llm-config search-config notifications
max-queue-messages wait-time-seconds] :as _config}]
(let [executor (Executors/newVirtualThreadPerTaskExecutor)
polling? (atom true)
^SqsClient sqs (get-in aws [:clients :sqs])
queue-url (get-in aws [:queue-urls :diagnostics-recompute])
ctx {:aws aws :db-pool db-pool :llm-config llm-config
:search-config search-config :notifications notifications
:sqs-client sqs}]
...))
In resources/com/getorcha/config.edn, the diagnostics-recompute/consumer section gains :llm-config, :search-config, :notifications:
:com.getorcha.workers.diagnostics-recompute/consumer
{:aws #integrant/ref :com.getorcha.aws/state
:db-pool #integrant/ref :com.getorcha.db/pool
:llm-config {:matching #ref [:com.getorcha/llm :fast]
:post-processing #ref [:com.getorcha/llm :fast]
:tax-compliance #ref [:com.getorcha/llm :main]
:vision #ref [:com.getorcha/llm :vision]
:web-search #ref [:com.getorcha/llm :web-search]}
:search-config #ref [:com.getorcha/search]
:notifications #ref [:com.getorcha/notifications]
:max-queue-messages 10
:wait-time-seconds #profile {#{:local-dev :demo} 0 :test 0 :default 20}}
Search config.edn for com.getorcha.workers.ap.matching.worker and replace with com.getorcha.workers.ap.processors.matching.worker. Same for system.clj.
In test/com/getorcha/workers/diagnostics_recompute_test.clj, add an end-to-end test:
(deftest edit-triggers-recompute-and-updates-diagnostics
(let [document-id (helpers/create-document! (helpers/create-legal-entity!))
_ (helpers/seed-completed-ingestion! document-id)
identity-id (helpers/create-identity!)
history-row (document-history/insert!
fixtures/*db*
{:document-id document-id :change-type :edit
:edited-by identity-id
:patch [{"op" "replace" "path" "/issuer/name" "value" "X"}]})]
(with-redefs [matching/->matching (fn [] (->NoopProcessor :matching))
validations/->validations (fn [] (->CountingProcessor :validations (atom [])))
...] ; stub the rest similarly
(orchestrator/recompute-all! {...}
{:document-id document-id
:history-id (:document-history/id history-row)
:document-version 2}))
(is (pos? (db.run/count-runs fixtures/*db* document-id {:processor-id "validations"})))))
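The test above relies on `->NoopProcessor` and `->CountingProcessor`, which this plan does not define. A sketch of both test doubles follows. To stay self-contained it declares a local stand-in protocol with the method names and arities used throughout this plan; in the real test ns they would implement `proc/IProcessor` instead. Two engine behaviours are assumptions: that a nil `-diagnostic` means "nothing to write", and that a `{:slice id}` declaration is enough for the engine to record a run row for the counting double.

```clojure
(ns example.test-processors)

;; Stand-in for com.getorcha.workers.ap.processors/IProcessor, mirroring
;; the method names and arities used in this plan.
(defprotocol IProcessor
  (-id [this])
  (-reads [this state])
  (-writes [this state])
  (-diagnostic [this state])
  (-modes [this])
  (-always? [this])
  (-compute [this ctx state])
  (-apply-ops [this state result]))

(defrecord NoopProcessor [id]
  IProcessor
  (-id [_] id)
  (-reads [_ _state] [])
  (-writes [_ _state] [])
  (-diagnostic [_ _state] nil)     ; assumed: nil means no slice to write
  (-modes [_] #{:ingestion :edit})
  (-always? [_] false)
  (-compute [_ _ctx _state] {:result nil :stats nil})
  (-apply-ops [_ _state _result] []))

(defrecord CountingProcessor [id calls]
  IProcessor
  (-id [_] id)
  (-reads [_ _state] [])
  (-writes [_ _state] [])
  (-diagnostic [_ _state] {:slice id})
  (-modes [_] #{:ingestion :edit})
  (-always? [_] true)              ; always scheduled, so every run is counted
  (-compute [_ _ctx state]
    (swap! calls conj (:mode state)) ; record the mode of each invocation
    {:result nil :stats nil})
  (-apply-ops [_ _state _result] []))
```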
clj-kondo --lint src/com/getorcha/workers/diagnostics_recompute*
clj -X:test:silent :nses '[com.getorcha.workers.diagnostics-recompute-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: pass.
git add -A
git commit -m "refactor: diagnostics-recompute uses unified processors engine"
Drop the update-slice! alias (callers use update-diagnostic!)
Files:
Modify: src/com/getorcha/db/document_diagnostics.clj (drop update-slice! alias)
Modify: Any remaining update-slice! callers outside the engine (should be zero after Phase D)
Step 1: Find remaining callers
grep -rn "update-slice!" src test
Expected: only the alias itself in db/document_diagnostics.clj.
Remove the (def update-slice! update-diagnostic!) line at the bottom of the ns.
clj -X:test:silent 2>&1 | grep -A 5 -E "FAIL|ERROR|Execution error|failed because|Ran .* tests"
Expected: all pass.
git add src/com/getorcha/db/document_diagnostics.clj
git commit -m "cleanup: drop update-slice! alias"
Files:
Delete: src/com/getorcha/workers/ap/ingestion/post_process/protocol.clj (already deleted in Task D1; verify)
Delete: Any lingering with-run-row!, compute!, run-processor-phases, run-phase, build-diagnostics references
Step 1: Grep for orphans
grep -rn "with-run-row!\|run-processor-phases\|run-phase\|build-diagnostics" src test
Expected: empty.
Step 2: If any remain, delete inline (most likely none after Phase D)
Step 3: Commit (if any changes)
git add -A
git commit -m "cleanup: remove legacy post-process orchestrator helpers"
Files:
Modify: test/com/getorcha/workers/diagnostics_recompute_test.clj
Step 1: Write an integration test seeding a full document state
Seed document + ingestion (so processors have something to read), edit a scalar, call recompute-all!, and assert each affected slice is updated. Skip matching and fraud (expensive LLM calls): redef matching/->matching and fraud/->fraud-detector to no-op processors.
(deftest end-to-end-edit-recompute
(let [legal-entity-id (helpers/create-legal-entity!)
document-id (helpers/create-document! legal-entity-id)
ingestion-id (helpers/create-ingestion! document-id)
_ (helpers/seed-invoice-structured-data! document-id
{:issuer {:name "Acme"}
:total 100.0
:line-items [{:id "l1" :description "d"}]})
identity-id (helpers/create-identity!)
_ing (document-history/insert!
fixtures/*db*
{:document-id document-id :change-type :ingestion
:ingestion-id ingestion-id :patch []})
;; Insert the edit after the ingestion row so it lands at document version 2.
history-row (document-history/insert!
fixtures/*db*
{:document-id document-id :change-type :edit
:edited-by identity-id
:patch [{"op" "replace" "path" "/issuer/name" "value" "Acme-edited"}]})]
(with-redefs [matching/->matching (fn [] (->NoopProcessor :matching))
fraud/->fraud-detector (fn [] (->NoopProcessor :fraud-detector))]
(orchestrator/recompute-all!
{:db-pool fixtures/*db*
:llm-config {...}}
{:document-id document-id
:history-id (:document-history/id history-row)
:document-version 2}))
(let [doc (db.sql/execute-one! fixtures/*db*
{:select [:diagnostics]
:from [:document]
:where [:= :id document-id]})]
(is (some? (get-in doc [:document/diagnostics :validations])))
(is (pos? (db.run/count-runs fixtures/*db* document-id
{:processor-id "validations" :status :completed}))))))
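The edit in this test replaces /issuer/name, so only processors whose reads patterns accept the clj-path [:issuer :name] should be rescheduled. The sketch below illustrates that check with malli sequence schemas. pointer->clj-path, this match-any?, and validations-reads are hypothetical illustrations, not the helpers in processors/reads.clj, and the pointer conversion ignores ~0/~1 escapes.

```clojure
(ns example.reads-match
  (:require [clojure.string :as str]
            [malli.core :as m]))

;; JSON Pointer "/line-items/0/description" -> [:line-items 0 :description].
;; Assumption: all-digit segments are vector indices; no ~0/~1 escapes.
(defn pointer->clj-path [pointer]
  (->> (str/split pointer #"/")
       (remove str/blank?)
       (mapv (fn [seg]
               (if (re-matches #"\d+" seg)
                 (Long/parseLong seg)
                 (keyword seg))))))

;; True if any seq-regex reads pattern accepts the changed leaf path.
(defn match-any? [patterns clj-path]
  (boolean (some #(m/validate % clj-path) patterns)))

;; Hypothetical reads declaration for a validations-like processor.
(def validations-reads
  [[:cat [:= :issuer] [:= :name]]
   [:cat [:= :total]]
   [:cat [:= :line-items] :int [:* :keyword]]])
```

With these patterns, an edit to /issuer/name or /line-items/0/description reschedules the processor, while an edit to /issuer/vat-id does not.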
clj -X:test:silent :nses '[com.getorcha.workers.diagnostics-recompute-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"
Expected: pass.
git add test/com/getorcha/workers/diagnostics_recompute_test.clj
git commit -m "test: end-to-end edit recompute"
clj-kondo --lint src test dev 2>&1 | tail -40
Expected: 0 errors, 0 warnings.
clj -X:test:silent 2>&1 | grep -A 5 -E "FAIL|ERROR|Execution error|failed because|Ran .* tests"
Expected: all pass.
The most likely failures come from test files that still reference com.getorcha.workers.ap.ingestion.post-process.* or the old matching namespace. Grep and fix:
grep -rn "com.getorcha.workers.ap.ingestion.post-process\|com.getorcha.workers.ap.matching" test
Expected: empty.
git add -A
git commit -m "chore: final lint + test fixups"
Coverage against the spec:
| Spec section | Tasks |
|---|---|
| §2 Concepts (IProcessor, state, leaves, diagnostic) | A2, A3 |
| §3 IProcessor protocol v2 (state-aware reads/writes/diagnostic) | A2 |
| §3.1 DiagnosticSpec (single / sub-path / sub-paths / multi-slice) | A2, A5 (engine write-diagnostic!) |
| §4 Engine: run-processors! (refresh diagnostics between phases, in-memory apply) | A5 (core), A6 (filter), A7 (op filter) |
| §4.1 Persistence (ingestion: caller writes; edit: derivation row) | A0 (migration), A5 (persist-derivation!) |
| §5 Modes (:ingestion, :edit) | A5, A6, A7 |
| §6 Conditional recomputation | A3 (leaf-expansion, match-any), A6 (filter), D3 (changed-leaves) |
| §7 Write-protection | A1 (provenance ns), A7 (op filter) |
| §8 Notifications | A8 |
| §9 Renames | A4 (update-diagnostic!), C3 (enqueue!), plan-wide |
| §10 Namespace reorg | C1 (matching move), D1 (post-process delete) |
| §11 Phase lists (ingestion / matching-worker / edit) | D1, D2, D3 |
| §12 Migration table | B1–B9 |
| §12.1 validations per-doc-type + always-run | B9 |
| §12.2 Matching per-doc-type reads | C2 |
| §12.3 Reconciliation inside matching | C2 |
| §13 Diagnostic slice co-ownership | A4 (jsonb_set), A5 (write-diagnostic! multi-slice/sub-paths) |
| §13.2 tax-compliance multi-slice | B5 |
| §14 Tax-compliance vision :ingestion-only | B5 |
| §15 Test strategy | Each task includes tests; E3 E2E |
| §16 Rollout (13 steps) | A0–E4 |
| §17 Risks | (design-level; tests cover the concrete risks) |
Known gaps acknowledged:
- Notifications may render :matching/permanent-failure specially. After A8 switches to :processor/failure, check src/com/getorcha/app/http/notifications.clj for any special-case rendering that needs updating. This is likely a small fixup rather than a full follow-up.
- post-process/common.clj (the PDF page-extraction helper used by tax-compliance) must remain reachable from processors.tax-compliance. In Task B5, move post_process/common.clj → processors/common.clj and update the tax-compliance analyzer's require.
- json-patch/diff-patch, referenced in A5, doesn't exist. Task A5's "per-op-record" alternative avoids needing it: use the :applied-ops accumulator pattern.
- An existing update-slice! caller must be updated as part of A4 Step 1a (the grep will find it); the call site becomes update-diagnostic!.
- TaxComplianceAnalyzer -compute originally expected an ingestion map with a :file :contents byte array for vision mode. Under the new model, state.file is populated only by the ingestion pipeline, and edit mode doesn't fetch the PDF (vision mode is :ingestion-only per §14). Task B5 Step 1 explicitly gates the vision branch.
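The :applied-ops accumulator pattern can be sketched as a single reduce: each op either passes the write-protection filter, in which case it is applied in memory and recorded, or it is skipped. The names apply-ops and protected?, the {:op :path :value} op shape with clj-path vectors, and the prefix-conflict rule are assumptions for illustration; only "replace" is handled.

```clojure
(ns example.applied-ops)

;; Paths are vectors. a is a prefix of b when b starts with all of a.
(defn- prefix? [a b]
  (and (<= (count a) (count b))
       (= a (subvec b 0 (count a)))))

;; Assumed rule: an op conflicts with a user edit when either path is a
;; prefix of the other (writing [:issuer] would clobber [:issuer :name]).
(defn- protected? [user-edited-paths path]
  (some #(or (prefix? % path) (prefix? path %)) user-edited-paths))

;; Apply ops in memory, recording exactly the ops that passed the filter,
;; instead of diffing the before/after state afterwards.
(defn apply-ops [state ops user-edited-paths]
  (reduce (fn [acc {:keys [op path value] :as o}]
            (cond
              (protected? user-edited-paths path)
              (update acc :skipped-ops conj o)

              (= op "replace")
              (-> acc
                  (update :state assoc-in path value)
                  (update :applied-ops conj o))

              :else
              (throw (ex-info "unsupported op in sketch" {:op o}))))
          {:state state :applied-ops [] :skipped-ops []}
          ops))
```

Because the recorded ops are exactly what was applied, a derivation row can persist them directly, with no diff step.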