Note (2026-04-24): After this document was written, legal_entity was renamed to tenant and the old tenant was renamed to organization. Read references to these terms with the pre-rename meaning.

Unified Processors Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Unify the three ways document derivations run today (the post-processor pipeline, the matching SQS worker, and the new diagnostics-recompute) under a single IProcessor protocol (v2) and a single run-processors! engine. Add conditional recomputation on edits (processors declare the leaf paths they read and write as malli seq-regex patterns) and write-protection so processors cannot overwrite user-edited paths.

Architecture: One protocol, one engine, three callers (ingestion-complete, edit-recompute, and the matching worker), all running the same engine. Every processor declares its reads (malli seq-regex patterns over clj-path segments), its writes (used by the op filter), its diagnostic slice, and its modes. The engine handles the run-row lifecycle, conditional scheduling (edit mode), filtering of ops against user-edited paths, and diagnostic slice writes. Matching and reconciliation become one processor (reconciliation moves into matching's -compute), and matching lives alongside the other processors under workers.ap.processors.*.
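The op filter is easiest to picture on plain data. The sketch below is a hypothetical model, not the Task A7 implementation: it assumes ops have already been parsed to clj-paths (a made-up `:clj-path` key stands in for the JSON pointer) and that an op is blocked when its path and a user-edited path are prefixes of one another. Under that assumption a processor can neither overwrite a user-edited leaf nor replace an object that contains one.

```clojure
(defn prefix?
  "True when clj-path `a` is a (non-strict) prefix of clj-path `b`.
   Both are vectors of segments (keywords, ints, {:id X} maps)."
  [a b]
  (and (<= (count a) (count b))
       (= a (subvec b 0 (count a)))))

(defn overlaps-user-edit?
  "Assumed rule: an op path overlaps a user edit when it is the same
   path, an ancestor of it, or a descendant of it."
  [user-edited-paths op-path]
  (boolean
   (some (fn [edited]
           (or (prefix? edited op-path)
               (prefix? op-path edited)))
         user-edited-paths)))

(defn filter-ops
  "Drops ops whose :clj-path overlaps a user-edited path; keeps the rest
   in their original order."
  [user-edited-paths ops]
  (into [] (remove #(overlaps-user-edit? user-edited-paths (:clj-path %))) ops))

(filter-ops #{[:issuer :name]}
            [{:op "replace" :clj-path [:issuer :name] :value "X"} ; same leaf
             {:op "replace" :clj-path [:issuer] :value {}}        ; ancestor
             {:op "replace" :clj-path [:total] :value 42}])       ; unrelated
;; => [{:op "replace", :clj-path [:total], :value 42}]
```

Matching in both directions is the conservative choice: it keeps a subtree replace from silently clobbering a user's field, at the cost of also blocking sibling writes bundled into that replace.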

Tech Stack: Clojure, Malli (seq-regex schemas for reads patterns), PostgreSQL (JSONB jsonb_set for sub-path writes), core.async/thread for concurrency within a phase, clojure.test + embedded-postgres + Testcontainers/LocalStack for integration tests.

Reference spec: docs/superpowers/specs/2026-04-15-unified-processors-design.md

Depends on:


File Map

Created

Moved (delete old, create new)

Modified


Phase A: Foundation

After this phase: the IProcessor protocol exists, the engine runs, helpers for reads/writes exist, document-provenance is exported from a neutral namespace, and document_history accepts 'derivation' rows (new enum value plus relaxed CHECK constraint). No processors have been migrated yet; the post-process pipeline and matching worker continue to work unchanged.

Task A0: Migration — add 'derivation' to change_type enum

Files:

Write resources/migrations/20260415120000-add-derivation-change-type.up.sql:

-- Add 'derivation' to document_history_change_type enum.
-- Derivation rows capture aggregate structured-data mutations written
-- by the processors engine during edit-mode recomputes.
--
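-- ALTER TYPE ... ADD VALUE cannot run inside a transaction block before
-- PostgreSQL 12. On 12+ it can, but the new value cannot be used until
-- that transaction commits, and the CHECK constraint below uses it. So
-- this migration must not run as one transaction (if the runner is
-- migratus, make `-- :disable-transaction` the first line of this file),
-- or the constraint change must move to a separate, later migration.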
--;;
ALTER TYPE document_history_change_type ADD VALUE IF NOT EXISTS 'derivation';
--;;

-- Relax the CHECK constraint so derivation rows (edited_by NULL,
-- ingestion_id NULL) are allowed.
ALTER TABLE document_history
    DROP CONSTRAINT document_history_exactly_one_source;
--;;
ALTER TABLE document_history
    ADD CONSTRAINT document_history_exactly_one_source
    CHECK (
        (change_type = 'ingestion' AND ingestion_id IS NOT NULL AND edited_by IS NULL)
     OR (change_type = 'edit'      AND ingestion_id IS NULL     AND edited_by IS NOT NULL)
     OR (change_type = 'derivation' AND ingestion_id IS NULL    AND edited_by IS NULL)
    );

Note: verify the existing constraint name (psql \d+ document_history, or read resources/migrations/20260413161401-add-document-history.up.sql) and use that name in both the up and down migrations.

Write resources/migrations/20260415120000-add-derivation-change-type.down.sql:

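-- PostgreSQL does not support removing values from an enum. The down
-- migration can only revert the CHECK constraint; the 'derivation'
-- value remains in the enum (inert if no rows reference it). Delete
-- any change_type = 'derivation' rows first, or the ADD CONSTRAINT
-- below fails validation.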
ALTER TABLE document_history
    DROP CONSTRAINT document_history_exactly_one_source;
--;;
ALTER TABLE document_history
    ADD CONSTRAINT document_history_exactly_one_source
    CHECK (
        (change_type = 'ingestion' AND ingestion_id IS NOT NULL AND edited_by IS NULL)
     OR (change_type = 'edit'      AND ingestion_id IS NULL     AND edited_by IS NOT NULL)
    );
Apply the migration:

clj -X:dev-run :fn 'com.getorcha.db.migrations/migrate' 2>&1 | tail -10

Expected: "migration 20260415120000 applied". Then confirm via psql:

psql -h localhost -U postgres -d orcha -c "\dT+ document_history_change_type"

Expected output lists ingestion, edit, derivation.

Open src/com/getorcha/db/document_history.clj, find the assertion in the insert! fn (lines 27-32), and extend it to accept :derivation:

(assert (case change-type
          :ingestion  (and (some? ingestion-id) (nil? edited-by))
          :edit       (and (nil? ingestion-id)   (some? edited-by))
          :derivation (and (nil? ingestion-id)   (nil? edited-by))
          false)
        (str "document_history insert XOR violation; got change-type="
             (pr-str change-type)
             " ingestion-id=" (pr-str ingestion-id)
             " edited-by=" (pr-str edited-by)))
git add resources/migrations/20260415120000-add-derivation-change-type.up.sql resources/migrations/20260415120000-add-derivation-change-type.down.sql src/com/getorcha/db/document_history.clj
git commit -m "migration: add derivation to document_history_change_type"

Task A1: Extract document provenance to a shared namespace

Files:

Write src/com/getorcha/document/provenance.clj:

(ns com.getorcha.document.provenance
  "Per-path provenance for a document, derived from document_history.
   Shared between the UI (which tints user-edited fields) and the
   processor engine (which refuses to overwrite user-edited paths).

   Walks document_history newest → oldest, collecting op paths up to
   (but not including) the most recent :ingestion row. Post-ingestion
   edits only: anything before the most recent ingestion is 'rewritten'
   by that ingestion and no longer user-sourced."
  (:require [com.getorcha.db.document-history :as document-history]
            [com.getorcha.json-patch.path :as json-patch.path]))


(defn document-provenance
  "Returns {json-pointer-string → {:edited-by :edited-at}} for every
   path with a human edit since the most recent ingestion for this
   document. Paths absent from the map are implicitly LLM-sourced.

   Within the post-ingestion edits, the newest entry at each path wins
   (first-seen-wins while iterating newest → oldest)."
  [db-pool document-id]
  (let [rows        (document-history/rows-for-document db-pool document-id)
        post-ingest (take-while #(not= :ingestion
                                       (:document-history/change-type %))
                                rows)]
    (reduce
     (fn [acc {:document-history/keys [patch edited-by created-at]}]
       (reduce (fn [acc' op]
                 (let [path (:path op)]
                   (if (contains? acc' path)
                     acc'
                     (assoc acc' path {:edited-by edited-by
                                       :edited-at created-at}))))
               acc
               patch))
     {}
     post-ingest)))


(defn user-edited-paths
  "Returns #{clj-path ...} — the set of user-edited paths since the
   most recent ingestion, pre-parsed to clj-paths for prefix-matching."
  [db-pool document-id]
  (->> (document-provenance db-pool document-id)
       keys
       (into #{} (map json-patch.path/pointer->clj-path))))
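The newest-first, first-seen-wins rule can be tried without a database. This sketch reruns the same reduction over hypothetical in-memory rows with simplified, unqualified keys (`provenance-from-rows` and `example-rows` are illustrative names), assuming, as the code above does, that rows arrive newest → oldest.

```clojure
(defn provenance-from-rows
  "rows: newest -> oldest, each {:change-type kw :edited-by id
   :created-at t :patch [{:path pointer} ...]}. Returns
   {pointer {:edited-by id :edited-at t}} for edits made after the most
   recent :ingestion row; the newest edit at each path wins."
  [rows]
  (let [post-ingest (take-while #(not= :ingestion (:change-type %)) rows)]
    (reduce (fn [acc {:keys [patch edited-by created-at]}]
              (reduce (fn [acc' {:keys [path]}]
                        ;; Rows are newest first, so the first hit per path wins.
                        (if (contains? acc' path)
                          acc'
                          (assoc acc' path {:edited-by edited-by
                                            :edited-at created-at})))
                      acc
                      patch))
            {}
            post-ingest)))

(def example-rows
  ;; Newest first. The edit below the ingestion row is ignored.
  [{:change-type :edit :edited-by "bob" :created-at 3
    :patch [{:path "/issuer/name"}]}
   {:change-type :edit :edited-by "ann" :created-at 2
    :patch [{:path "/issuer/name"} {:path "/total"}]}
   {:change-type :ingestion :created-at 1 :patch []}
   {:change-type :edit :edited-by "ann" :created-at 0
    :patch [{:path "/due-date"}]}])

(provenance-from-rows example-rows)
;; => {"/issuer/name" {:edited-by "bob", :edited-at 3}
;;     "/total"       {:edited-by "ann", :edited-at 2}}
```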

Write test/com/getorcha/document/provenance_test.clj:

(ns com.getorcha.document.provenance-test
  (:require [clojure.test :refer [deftest is testing use-fixtures]]
            [com.getorcha.db.document-history :as document-history]
            [com.getorcha.document.provenance :as provenance]
            [com.getorcha.test.fixtures :as fixtures]
            [com.getorcha.test.notification-helpers :as helpers]))


(use-fixtures :once fixtures/with-running-system)
(use-fixtures :each fixtures/with-db-rollback)


(defn ^:private seed-doc!
  []
  (let [legal-entity-id (helpers/create-legal-entity!)
        _identity-id    (helpers/create-identity!)
        document-id     (helpers/create-document! legal-entity-id)]
    document-id))


(deftest document-provenance-collects-edits-since-last-ingestion
  (testing "edits after the most recent ingestion are returned; earlier edits are not"
    (let [document-id (seed-doc!)
          identity-id (helpers/create-identity!)
          _           (document-history/insert!
                       fixtures/*db*
                       {:document-id document-id
                        :change-type :edit
                        :edited-by   identity-id
                        :patch       [{"op" "replace" "path" "/issuer/name" "value" "A"}]})
          _           (document-history/insert!
                       fixtures/*db*
                       {:document-id  document-id
                        :change-type  :ingestion
                        :ingestion-id (helpers/create-ingestion! document-id)
                        :patch        []})
          _           (document-history/insert!
                       fixtures/*db*
                       {:document-id document-id
                        :change-type :edit
                        :edited-by   identity-id
                        :patch       [{"op" "replace" "path" "/issuer/name" "value" "B"}
                                      {"op" "replace" "path" "/total" "value" 42}]})
          result      (provenance/document-provenance fixtures/*db* document-id)]
      (is (= #{"/issuer/name" "/total"} (set (keys result))))
      (is (= identity-id (get-in result ["/issuer/name" :edited-by]))))))


(deftest user-edited-paths-returns-clj-paths
  (testing "returns clj-paths ready for prefix matching"
    (let [document-id (seed-doc!)
          identity-id (helpers/create-identity!)
          _           (document-history/insert!
                       fixtures/*db*
                       {:document-id document-id
                        :change-type :edit
                        :edited-by   identity-id
                        :patch       [{"op" "replace" "path" "/issuer/name" "value" "A"}
                                      {"op" "replace" "path" "/line-items[id=abc]/description" "value" "x"}]})
          result      (provenance/user-edited-paths fixtures/*db* document-id)]
      (is (contains? result [:issuer :name]))
      (is (contains? result [:line-items {:id "abc"} :description])))))

Run: clj -X:test:silent :nses '[com.getorcha.document.provenance-test]' 2>&1 | grep -E "FAIL|ERROR|Ran" Expected: PASS (fn already works since it's a straight lift of the existing UI code).

In src/com/getorcha/app/http/documents/view/shared.clj, replace the require [com.getorcha.app.http.documents.view.provenance :as provenance] with [com.getorcha.document.provenance :as provenance] (same local alias, same call sites).

Repeat in src/com/getorcha/app/http/documents/view/invoice.clj and any other caller (grep for view.provenance).

rm src/com/getorcha/app/http/documents/view/provenance.clj
clj-kondo --lint src/com/getorcha/document src/com/getorcha/app/http/documents
clj -X:test:silent :nses '[com.getorcha.app.http.documents.view.invoice-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: no lint errors, tests pass (UI continues to render per-field provenance).

git add src/com/getorcha/document/ test/com/getorcha/document/ src/com/getorcha/app/http/documents/
git commit -m "refactor: extract document-provenance to shared ns"

Task A2: Introduce the IProcessor v2 protocol

Files:

Write src/com/getorcha/workers/ap/processors.clj:

(ns com.getorcha.workers.ap.processors
  "IProcessor protocol v2 and default method implementations.
   Version 1 of IProcessor lived in
   `com.getorcha.workers.ap.ingestion.post-process.protocol`. This
   version extends it with declarative reads/writes/diagnostic/modes
   metadata and an ops-based -apply-ops (see spec §3)."
  (:refer-clojure :exclude [get]))


(defprotocol IProcessor
  "A unit of derivation that runs against document state.
   See docs/superpowers/specs/2026-04-15-unified-processors-design.md."
  (-id         [this]
    "Returns the processor's stable keyword id (e.g. :accounts).")
  (-reads      [this state]
    "Returns a vector of malli seq-regex patterns over clj-path
     segments, declaring the leaf paths this processor reads.
     Dispatches on state for per-doc-type reads (e.g. matching reads
     differ for invoice vs. contract). Ignored when -always? is true.")
  (-writes     [this state]
    "Returns a vector of malli seq-regex patterns declaring the paths
     this processor writes. Empty when the processor writes only to a
     diagnostic slice.")
  (-diagnostic [this state]
    "Returns one of:
       nil
       {:slice :kw}
       {:slice :kw :sub-path [k1 k2 ...]}
       {:slice :kw :sub-paths [[k1] [k2] ...]}
       [spec1 spec2 ...]   ;; multi-slice
     See spec §3.1.")
  (-modes      [this]
    "Returns a non-empty subset of #{:ingestion :edit}.")
  (-always?    [this]
    "Returns true if the processor bypasses the conditional scheduler
     filter in :edit mode. Defaults to false.")
  (-compute    [this ctx state]
    "Returns {:result <map-or-value> :stats <llm-stats-map-or-nil>}.
     For a multi-slice spec the result map MUST have a top-level key per
     slice name; for a :sub-paths spec, a top-level key per sub-path's
     first key.")
  (-apply-ops  [this state result]
    "Returns a vector of JSON-Patch ops (maps with string keys
     \"op\", \"path\", \"value\") to apply to state's structured-data.
     Empty when the processor only writes to a diagnostic slice."))


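(def defaults
  "Default method implementations, keyed by protocol method name as
   `extend` expects. For a record or type that does not implement
   IProcessor inline, merge these with the processor's own fns:
     (extend MyProcessor IProcessor (merge defaults {:-id ... :-compute ...}))
   -id and -compute have no defaults."
  {:-reads      (fn [_this _state] [])
   :-writes     (fn [_this _state] [])
   :-diagnostic (fn [_this _state] nil)
   :-modes      (fn [_this] #{:ingestion :edit})
   :-always?    (fn [_this] false)
   :-apply-ops  (fn [_this _state _result] [])})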
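Because `extend` takes a map from method keyword to fn, a defaults map can be merged with a processor's own fns. The sketch below uses a cut-down stand-in protocol so it runs on its own (`IDemoProcessor`, `demo-defaults`, and `TotalsProcessor` are hypothetical). It shows that the map keys are the method names including the leading dash, and the record has no inline IDemoProcessor methods, so every method, defaulted or not, comes from the `extend` map.

```clojure
(defprotocol IDemoProcessor
  (-id      [this])
  (-modes   [this])
  (-always? [this])
  (-compute [this ctx state]))

(def demo-defaults
  ;; Keys are the protocol method names as keywords, dash included.
  {:-modes   (fn [_this] #{:ingestion :edit})
   :-always? (fn [_this] false)})

;; No inline protocol implementation: `extend` supplies all methods.
(defrecord TotalsProcessor [])

(extend TotalsProcessor
  IDemoProcessor
  (merge demo-defaults
         {:-id      (fn [_this] :totals)
          :-modes   (fn [_this] #{:edit}) ; overrides the default
          :-compute (fn [_this _ctx state]
                      {:result {:total (reduce + (map :amount (:line-items state)))}
                       :stats  nil})}))

(let [p (->TotalsProcessor)]
  [(-id p) (-modes p) (-always? p)
   (-compute p nil {:line-items [{:amount 10} {:amount 5}]})])
;; => [:totals #{:edit} false {:result {:total 15}, :stats nil}]
```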

Write test/com/getorcha/workers/ap/processors_test.clj:

(ns com.getorcha.workers.ap.processors-test
  (:require [clojure.test :refer [deftest is testing]]
            [com.getorcha.workers.ap.processors :as proc]))


(defrecord ^:private MinimalProcessor []
  proc/IProcessor
  (-id         [_] :minimal)
  (-reads      [_ _state] [])
  (-writes     [_ _state] [])
  (-diagnostic [_ _state] nil)
  (-modes      [_] #{:ingestion})
  (-always?    [_] false)
  (-compute    [_ _ctx _state] {:result {:ok true} :stats nil})
  (-apply-ops  [_ _state _result] []))


(deftest protocol-exposes-required-methods
  (let [p (->MinimalProcessor)]
    (is (= :minimal (proc/-id p)))
    (is (= #{:ingestion} (proc/-modes p)))
    (is (false? (proc/-always? p)))
    (is (nil? (proc/-diagnostic p nil)))
    (is (= {:result {:ok true} :stats nil} (proc/-compute p nil nil)))
    (is (= [] (proc/-apply-ops p nil {:ok true})))))

Run: clj -X:test:silent :nses '[com.getorcha.workers.ap.processors-test]' 2>&1 | grep -E "FAIL|ERROR|Ran" Expected: PASS.

git add src/com/getorcha/workers/ap/processors.clj test/com/getorcha/workers/ap/processors_test.clj
git commit -m "feat: IProcessor v2 protocol"

Task A3: Reads helpers — seq-regex authoring + leaf expansion

Files:

Write src/com/getorcha/workers/ap/processors/reads.clj:

(ns com.getorcha.workers.ap.processors.reads
  "Helpers for declaring and matching processor reads/writes against
   edit paths.

   Authors declare reads as concise segment vectors:

     (read-leaf :issuer :name)
     (read-leaf :line-items :* :description)

   These lower to malli seq-regex patterns:

     [:cat [:= :issuer] [:= :name]]
     [:cat [:= :line-items] :any [:= :description]]

   which are validated against clj-paths produced by
   `com.getorcha.json-patch.path/pointer->clj-path`. A patch segment
   of {:id X} matches :any in the pattern (see spec §2.3)."
  (:require [com.getorcha.json-patch.path] ; called fully qualified below
            [com.getorcha.schema.structured-data :as schema.structured-data]
            [malli.core :as m]
            [malli.util :as m.util]))


(defn read-leaf
  "Builds a malli seq-regex pattern from a concise segment vector.
   `:*` in the segments becomes `:any`; every other segment becomes
   `[:= seg]`."
  [& segs]
  (into [:cat]
        (map (fn [seg]
               (cond
                 (= seg :*) :any
                 :else      [:= seg])))
        segs))


(defn match-any?
  "Returns true when any of `patterns` m/validates `clj-path`.
   `clj-path` is a vector of segments (keywords, ints, {:id X} maps)."
  [patterns clj-path]
  (boolean (some #(m/validate % clj-path) patterns)))


(defn ^:private atomic-schema?
  "True when `schema` is a primitive, an enum, or a vector of
   primitives: anything that represents a leaf in structured-data.
   Returns false for nil."
  [schema]
  (let [schema (loop [s schema]
                 (if (and s (= :maybe (m/type s)))
                   (recur (first (m/children s)))
                   s))]
    (boolean
     (when schema
       (let [kind (m/type schema)]
         (or (contains? #{:int :double :string :boolean :keyword :uuid :any :enum
                          ;; Predicate schemas (e.g. int?) report their type as a symbol.
                          'number? 'int? 'double? 'string? 'boolean? 'keyword? 'uuid?}
                        kind)
             (and (= :vector kind)
                  (atomic-schema? (first (m/children schema))))))))))


(defn ^:private walk-leaves
  "Walks `value` against `schema`, yielding {:path clj-path :value v}
   for every atomic descendant. Vector items that carry an :id are
   addressed as {:id X}; other items by their index."
  [schema clj-path value]
  (let [schema (loop [s schema]
                 (if (and s (= :maybe (m/type s)))
                   (recur (first (m/children s)))
                   s))]
    (cond
      (or (nil? schema) (atomic-schema? schema))
      [{:path clj-path :value value}]

      (and (map? value) (= :map (m/type schema)))
      (mapcat (fn [[k v]]
                (when-let [child (m.util/get schema k)]
                  (walk-leaves child (conj clj-path k) v)))
              value)

      (and (vector? value) (= :vector (m/type schema)))
      (when-let [child (m.util/get schema 0)]
        (mapcat (fn [idx item]
                  (walk-leaves child
                               (conj clj-path
                                     (if (and (map? item) (:id item))
                                       {:id (:id item)}
                                       idx))
                               item))
                (range)
                value))

      :else
      [{:path clj-path :value value}])))


(defn expand-op-to-leaves
  "Given a patch op and (optionally) a pre-patch structured-data
   snapshot, returns a sequence of virtual leaf clj-paths the op
   touches.

   For scalar/leaf ops returns `[clj-path]`. For non-leaf ops (add
   whole object, remove whole object, replace subtree), walks the
   value (or the pre-patch snapshot for remove) to enumerate descendant
   leaves under the StructuredData malli schema for the document type.
   Op keys may be strings (\"op\", \"path\", \"value\") or keywords."
  [op pre-patch-sd {:keys [document-type]}]
  (let [field      (fn [k] (if (contains? op k) (get op k) (get op (keyword k))))
        op-kind    (field "op")
        op-path    (field "path")
        value      (field "value")
        has-value? (or (contains? op "value") (contains? op :value))
        ;; An append (`/-`) has no index yet: address the new item by its
        ;; :id, the same way edits to existing items are keyed.
        append?    (and (string? op-path)
                        (.endsWith ^String op-path "/-")
                        (map? value)
                        (some? (:id value)))
        clj        (if append?
                     (conj (vec (com.getorcha.json-patch.path/pointer->clj-path
                                 (subs op-path 0 (- (count op-path) 2))))
                           {:id (:id value)})
                     (vec (com.getorcha.json-patch.path/pointer->clj-path op-path)))
        subschema  (reduce (fn [s seg]
                             (let [s (loop [s s]
                                       (if (and s (= :maybe (m/type s)))
                                         (recur (first (m/children s)))
                                         s))]
                               (cond
                                 (nil? s)                   (reduced nil)
                                 (or (int? seg) (map? seg)) (m.util/get s 0)
                                 :else                      (m.util/get s seg))))
                           (m.util/get schema.structured-data/StructuredData document-type)
                           clj)]
    (cond
      ;; No schema at this path: treat the op path itself as the leaf.
      (nil? subschema)
      [clj]

      ;; Atomic leaf: the op path itself is a leaf.
      (atomic-schema? subschema)
      [clj]

      ;; Non-leaf add/replace: walk the new value.
      has-value?
      (map :path (walk-leaves subschema clj value))

      ;; Non-leaf remove: walk the pre-patch snapshot at the op path.
      (= "remove" op-kind)
      (let [v (reduce (fn [v seg]
                        (cond
                          (nil? v)   (reduced nil)
                          (map? seg) (some #(when (= (:id seg) (:id %)) %) v)
                          :else      (get v seg)))
                      pre-patch-sd
                      clj)]
        (map :path (walk-leaves subschema clj v)))

      :else
      [clj])))
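To make the matching and scheduling semantics concrete without malli, here is a small stand-in. `flat-match?` handles only the flat `[:cat ...]` shape that `read-leaf` produces and is not a replacement for `m/validate`. `should-run-on-edit?` and `example-processor` are hypothetical: they assume the Task A6 gate reruns a processor when `-always?` is true or when any leaf path touched by the edit (as `expand-op-to-leaves` would produce) matches one of its reads.

```clojure
(defn read-leaf
  "Same lowering as reads/read-leaf: :* -> :any, anything else -> [:= seg]."
  [& segs]
  (into [:cat] (map #(if (= % :*) :any [:= %])) segs))

(defn flat-match?
  "Matches a clj-path against a flat [:cat ...] pattern. Each element
   consumes exactly one segment: :any accepts anything (including an
   {:id X} map), [:= v] accepts exactly v."
  [[tag & elems] clj-path]
  (and (= :cat tag)
       (= (count elems) (count clj-path))
       (every? true?
               (map (fn [elem seg]
                      (or (= :any elem)
                          (and (vector? elem)
                               (= := (first elem))
                               (= (second elem) seg))))
                    elems
                    clj-path))))

(defn should-run-on-edit?
  "Hypothetical edit-mode gate: run when :always? is set, or when any
   touched leaf path matches one of the processor's reads."
  [{:keys [always? reads]} touched-leaf-paths]
  (boolean
   (or always?
       (some (fn [path] (some #(flat-match? % path) reads))
             touched-leaf-paths))))

(def example-processor
  {:always? false
   :reads   [(read-leaf :issuer :name)
             (read-leaf :line-items :* :description)]})

(should-run-on-edit? example-processor [[:line-items {:id "abc"} :description]]) ;; => true
(should-run-on-edit? example-processor [[:line-items {:id "abc"} :amount]])      ;; => false
```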

Write test/com/getorcha/workers/ap/processors/reads_test.clj:

(ns com.getorcha.workers.ap.processors.reads-test
  (:require [clojure.test :refer [deftest is testing]]
            [com.getorcha.workers.ap.processors.reads :as reads]))


(deftest read-leaf-builds-seq-regex
  (testing "fixed segments become [:= seg], :* becomes :any"
    (is (= [:cat [:= :issuer] [:= :name]]
           (reads/read-leaf :issuer :name)))
    (is (= [:cat [:= :line-items] :any [:= :description]]
           (reads/read-leaf :line-items :* :description)))))


(deftest match-any-accepts-exact-paths
  (testing "exact match"
    (is (true?  (reads/match-any? [(reads/read-leaf :issuer :name)] [:issuer :name])))
    (is (false? (reads/match-any? [(reads/read-leaf :issuer :name)] [:issuer :country])))))


(deftest match-any-wildcards-line-item-id
  (testing ":any matches {:id X}"
    (is (true?
         (reads/match-any? [(reads/read-leaf :line-items :* :description)]
                           [:line-items {:id "abc-123"} :description])))
    (is (false?
         (reads/match-any? [(reads/read-leaf :line-items :* :description)]
                           [:line-items {:id "abc-123"} :amount])))))


(deftest expand-op-to-leaves-passes-through-scalar
  (testing "leaf path returns itself"
    (let [op {:path "/issuer/name" :value "Acme"}]
      (is (= [[:issuer :name]]
             (reads/expand-op-to-leaves op nil {:document-type "invoice"}))))))


(deftest expand-op-to-leaves-walks-object-add
  (testing "add of a whole line-item expands to its leaves"
    (let [op {"op"    "add"
              :path   "/line-items/-"
              :value  {:id "new" :order 1 :description "x" :amount 10.0}}]
      (is (contains? (set (reads/expand-op-to-leaves op nil {:document-type "invoice"}))
                     [:line-items {:id "new"} :description])))))

Run: clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.reads-test]' 2>&1 | grep -E "FAIL|ERROR|Ran" Expected: PASS.

git add src/com/getorcha/workers/ap/processors/reads.clj test/com/getorcha/workers/ap/processors/reads_test.clj
git commit -m "feat: processor reads + leaf-expansion helpers"

Task A4: Diagnostic sub-path writes (update-slice! → update-diagnostic!)

Files:

Replace src/com/getorcha/db/document_diagnostics.clj body with:

(ns com.getorcha.db.document-diagnostics
  "Helpers for writing slices (and optional sub-paths) into
   document.diagnostics with derived needs_human_review recomputation."
  (:require [cheshire.core :as json]
            [com.getorcha.db.sql :as db.sql]
            [honey.sql.pg-ops]))


(defn ^:private jsonb-path-array
  [slice-key sub-path]
  (into [slice-key] (map name) sub-path))


(defn update-diagnostic!
  "Writes `slice-value` into document.diagnostics at
   `[slice-key & sub-path]`, replacing whatever is at that path while
   leaving sibling keys intact, then recomputes
   document.needs_human_review from the updated diagnostics.

   The diagnostics write is a single UPDATE using jsonb_set; the
   needs_human_review recompute is a second UPDATE, so pass a
   transaction as `tx` when both must commit together. The diagnostics
   write is a no-op if `expected-version` is provided and
   document.version no longer equals it.

   Arities:
     (update-diagnostic! tx doc-id slice-key slice-value)
     (update-diagnostic! tx doc-id slice-key sub-path slice-value)
     (update-diagnostic! tx doc-id slice-key sub-path slice-value expected-version)

   `sub-path` is a vector of keywords/strings ([] = replace whole slice)."
  ([tx document-id slice-key slice-value]
   (update-diagnostic! tx document-id slice-key [] slice-value nil))
  ([tx document-id slice-key sub-path slice-value]
   (update-diagnostic! tx document-id slice-key sub-path slice-value nil))
  ([tx document-id slice-key sub-path slice-value expected-version]
   (let [path-arr (jsonb-path-array slice-key sub-path)
         jsonb    (json/generate-string slice-value)]
     (db.sql/execute!
      tx
      (cond-> {:update :document
               :set    {:diagnostics
                        [:case
                         [:= :diagnostics nil]
                         [:cast (json/generate-string {slice-key (if (seq sub-path)
                                                                   (reduce (fn [m k]
                                                                             {k m})
                                                                           slice-value
                                                                           (reverse sub-path))
                                                                   slice-value)})
                          :jsonb]
                         :else
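                         [:jsonb_set
                          ;; jsonb_set only creates the last path step, so
                          ;; make sure the slice object exists before a
                          ;; sub-path write into it.
                          [:jsonb_set
                           [:coalesce :diagnostics [:cast "{}" :jsonb]]
                           [:cast [:array [(name slice-key)]] [:raw "text[]"]]
                           [:coalesce [:-> :diagnostics [:inline (name slice-key)]]
                            [:cast "{}" :jsonb]]
                           true]
                          [:cast [:array (mapv name path-arr)] [:raw "text[]"]]
                          [:cast jsonb :jsonb]
                          true]]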
                        :updated-at [:now]}
               :where  [:= :id document-id]}
        expected-version (update :where (fn [w] [:and w [:= :version expected-version]]))))
     (db.sql/execute!
      tx
      {:update :document
       :set    {:needs-human-review
                [:or
                 [:exists
                  {:select [1]
                   :from   [[[:jsonb_each [:-> :diagnostics [:inline "validations"]]]
                             [[:raw "t(k, v)"]]]]
                   :where  [:= [:->> :v [:inline "status"]] [:inline "error"]]}]
                 [:exists
                  {:select [1]
                   :from   [[[:jsonb_array_elements [:-> :diagnostics [:inline "fraud-flags"]]]
                             [[:raw "f"]]]]
                   :where  [:= [:->> :f [:inline "severity"]] [:inline "critical"]]}]]}
       :where  [:= :id document-id]}))))
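The two UPDATEs can be modelled on plain maps, which helps when reasoning about callers. This pure sketch (`update-diagnostic` and `needs-human-review?` are hypothetical helpers, not part of the plan) writes the value at `[slice-key & sub-path]` and re-derives the flag with the same two rules as the SQL. One difference to keep in mind: `assoc-in` creates missing intermediate maps, while PostgreSQL's `jsonb_set` returns the target unchanged when an earlier path step is missing.

```clojure
(defn needs-human-review?
  "Same two rules as the SQL: any validation whose :status is error, or
   any fraud flag whose :severity is critical."
  [diagnostics]
  (boolean
   (or (some #(= "error" (:status %)) (vals (:validations diagnostics)))
       (some #(= "critical" (:severity %)) (:fraud-flags diagnostics)))))

(defn update-diagnostic
  "Pure analogue of update-diagnostic!: replaces the value at
   [slice-key & sub-path] (sibling keys untouched) and returns the doc
   with :needs-human-review recomputed."
  ([doc slice-key slice-value]
   (update-diagnostic doc slice-key [] slice-value))
  ([doc slice-key sub-path slice-value]
   (let [diags (assoc-in (or (:diagnostics doc) {})
                         (into [slice-key] sub-path)
                         slice-value)]
     (assoc doc
            :diagnostics        diags
            :needs-human-review (needs-human-review? diags)))))

(-> {:diagnostics nil}
    (update-diagnostic :validations {:tax-id-format {:status "pass"}
                                     :iban-format   {:status "pass"}})
    (update-diagnostic :validations [:tax-id-format] {:status "error"}))
;; => {:diagnostics {:validations {:tax-id-format {:status "error"}
;;                                 :iban-format   {:status "pass"}}}
;;     :needs-human-review true}
```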


The old name update-slice! is removed (no alias is kept), so existing callers must be renamed. Find them with:

grep -rn "update-slice!" src test | grep -v document_diagnostics.clj

Expected callers: workers/ap/ingestion.clj, workers/ap/matching/worker.clj, and workers/ap/matching/reconciliation.clj (after the C1 rename, the matching files live under processors/matching/), possibly more. Replace each (update-slice! …) call with (update-diagnostic! …).

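None of these callers use update-slice!'s 5-arg form with expected-version; all are 4-arg. This matters because the 5-arg arity of update-diagnostic! takes a sub-path instead, so a renamed 5-arg call would silently change meaning. With only 4-arg callers, the call sites compile and behave unchanged apart from the fn name.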

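Append to test/com/getorcha/db/document_diagnostics_test.clj (if creating it, start with an ns form requiring clojure.test, com.getorcha.db.document-diagnostics :as diagnostics, com.getorcha.db.sql :as db.sql, and the fixtures/helpers namespaces and use-fixtures setup from the provenance test):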

(deftest sub-path-merge-leaves-sibling-keys-intact
  (testing "writing a sub-path does not clobber sibling keys in the slice"
    (let [doc-id (helpers/create-document! (helpers/create-legal-entity!))]
      (diagnostics/update-diagnostic! fixtures/*db* doc-id
                                      :validations
                                      {:tax-id-format {:status "pass"}
                                       :iban-format   {:status "pass"}})
      (diagnostics/update-diagnostic! fixtures/*db* doc-id
                                      :validations [:tax-id-format]
                                      {:status "warning" :message "x"})
      (let [doc (db.sql/execute-one! fixtures/*db*
                                     {:select [:diagnostics]
                                      :from   [:document]
                                      :where  [:= :id doc-id]})]
        (is (= "warning" (get-in doc [:document/diagnostics :validations :tax-id-format :status])))
        (is (= "pass"    (get-in doc [:document/diagnostics :validations :iban-format :status])))))))

Run: clj -X:test:silent :nses '[com.getorcha.db.document-diagnostics-test]' 2>&1 | grep -E "FAIL|ERROR|Ran" Expected: PASS.

git add src/com/getorcha/db/document_diagnostics.clj test/com/getorcha/db/document_diagnostics_test.clj
git commit -m "feat: update-diagnostic! supports sub-path writes"

Task A5: Engine — core flow (no filtering yet)

Files:

Write src/com/getorcha/workers/ap/processors/engine.clj:

(ns com.getorcha.workers.ap.processors.engine
  "Run-processors engine: phase-ordered, parallel-within-phase,
   run-row lifecycle, diagnostic slice writes, ops-based apply.

   See docs/superpowers/specs/2026-04-15-unified-processors-design.md §4."
  (:require [cheshire.core :as json]
            [clojure.core.async :as a]
            [clojure.tools.logging :as log]
            [com.getorcha.db.document-diagnostics :as db.diagnostics]
            [com.getorcha.db.document-history :as db.document-history]
            [com.getorcha.db.document-processor-run :as db.run]
            [com.getorcha.db.sql :as db.sql]
            [com.getorcha.json-patch :as json-patch]
            [com.getorcha.json-patch.path :as json-patch.path]
            [com.getorcha.workers.ap.processors :as proc]))


(defn ^:private trigger-params
  "Maps state -> run-row-insert params (without :processor-id)."
  [{:keys [document trigger-kind ingestion-id history-id commit-sha] :as _state}]
  (let [base {:document-id      (:document/id document)
              :trigger-kind     trigger-kind
              :document-version (:document/version document)
              :commit-sha       commit-sha}]
    (cond-> base
      ingestion-id (assoc :ingestion-id ingestion-id)
      history-id   (assoc :triggered-by-history-id history-id))))


(defn ^:private specs-seq
  "Normalises a -diagnostic return to a sequence of {:slice :sub-path}
   or {:slice :sub-paths [...]} specs."
  [diag]
  (cond
    (nil? diag)    nil
    (vector? diag) diag
    :else          [diag]))


(defn ^:private write-diagnostic!
  "Writes `result` to one or more diagnostic slices/sub-paths per the
   declared `diag` spec (see spec §3.1 and §13).

   - `{:slice K}`: one whole-slice write. The value is `(get result K)`
     when `result` is a map containing K (the multi-slice case), else
     `result` itself.
   - `{:slice K :sub-path P}`: one write of the whole `result` at P.
   - `{:slice K :sub-paths [[p1] [p2] ...]}`: one write per sub-path;
     the value is `(get result (first sub-path))`.
   - Vector of specs: each spec follows the rules above, so a
     whole-slice spec for K writes `(get result K)`."
  [db-pool document-id diag result version]
  (doseq [{:keys [slice sub-path sub-paths]} (specs-seq diag)]
    (cond
      sub-paths
      (doseq [sp sub-paths]
        (db.diagnostics/update-diagnostic!
          db-pool document-id slice sp (get result (first sp)) version))

      sub-path
      (db.diagnostics/update-diagnostic!
        db-pool document-id slice sub-path result version)

      :else
      ;; Whole-slice write. In multi-slice contexts the result is a
      ;; map keyed by slice names; pick our slice's value.
      (let [value (if (and (map? result) (contains? result slice))
                    (get result slice)
                    result)]
        (db.diagnostics/update-diagnostic!
          db-pool document-id slice [] value version)))))


(defn ^:private apply-ops-in-memory
  "Applies `ops` to `state`'s in-memory structured-data only. No DB write."
  [state ops]
  (if (empty? ops)
    state
    (let [new-sd (json-patch/apply-patch (:structured-data state) ops)]
      (-> state
          (assoc :structured-data new-sd)
          (assoc-in [:document :document/structured-data] new-sd)))))


(defn ^:private refresh-diagnostics
  "Refetches the latest diagnostics from the DB and populates
   `state.diagnostics`. Called between phases so later phases see
   earlier-phase diagnostic writes."
  [db-pool state]
  (let [doc-id (get-in state [:document :document/id])
        row    (db.sql/execute-one! db-pool
                                    {:select [:diagnostics]
                                     :from   [:document]
                                     :where  [:= :id doc-id]})]
    (assoc state :diagnostics (:document/diagnostics row))))


(defn ^:private should-run?
  "Engine scheduler gate. Stub in A5 (always true); A6 implements the
   real conditional logic."
  [_db-pool _state _processor]
  true)


(defn ^:private filter-ops-against-user-edits
  "Blocks processor ops that target user-edited paths. Stub in A5
   (pass-through); A7 implements the prefix-match filter."
  [_user-edited-paths _processor-id ops]
  ops)


(defn ^:private run-processor!
  "Runs a single processor: inserts running row, -compute, writes
   diagnostic + builds ops, completes/fails row. Returns the ops
   that should be applied (possibly empty)."
  [{:keys [db-pool user-edited-paths] :as ctx} state processor]
  (let [run-id (db.run/insert-running!
                db-pool
                (assoc (trigger-params state) :processor-id (name (proc/-id processor))))]
    (try
      (let [{:keys [result stats]} (proc/-compute processor ctx state)
            diag                   (proc/-diagnostic processor state)
            raw-ops                (proc/-apply-ops processor state result)
            ops                    (case (:mode state)
                                     :edit (filter-ops-against-user-edits
                                            (or user-edited-paths #{})
                                            (proc/-id processor)
                                            raw-ops)
                                     raw-ops)]
        (write-diagnostic! db-pool
                           (get-in state [:document :document/id])
                           diag
                           result
                           (get-in state [:document :document/version]))
        (db.run/complete-run! db-pool run-id (assoc stats :result result))
        ops)
      (catch Throwable t
        (db.run/fail-run! db-pool run-id (.getMessage t))
        (log/error t "Processor failed"
                   {:processor (proc/-id processor)
                    :document  (get-in state [:document :document/id])})
        ;; Notification hook is added in Task A8.
        []))))


(defn ^:private run-phase!
  [ctx state processors]
  (let [state   (refresh-diagnostics (:db-pool ctx) state)
        to-run  (filterv #(should-run? (:db-pool ctx) state %) processors)
        chans   (mapv (fn [p] (a/thread (run-processor! ctx state p))) to-run)
        all-ops (reduce into [] (mapv a/<!! chans))]
    (apply-ops-in-memory state all-ops)))


(defn ^:private persist-derivation!
  "After an :edit-mode run completes, if structured-data has changed,
   compute the aggregate JSON-Patch (diff initial → final) and persist
   as a document.structured_data update + document_history row with
   change_type=:derivation + version++."
  [db-pool {:keys [document initial-structured-data structured-data history-id]}]
  (let [doc-id  (:document/id document)
        initial initial-structured-data
        final   structured-data]
    (when (and initial (not= initial final))
      (let [patch (json-patch/diff-patch initial final)
            new-version (inc (:document/version document))]
        (when (seq patch)
          (db.sql/with-transaction [tx db-pool]
            (db.sql/execute!
              tx
              {:update :document
               :set    {:structured-data [:lift final]
                        :version         new-version
                        :updated-at      [:now]}
               :where  [:= :id doc-id]})
            (db.document-history/insert!
              tx
              {:document-id doc-id
               :change-type :derivation
               :patch       patch})))))))


(defn run-processors!
  "Runs every phase in order. Within a phase, processors run
   concurrently. Returns the final state (with applied ops folded in).

   Caller must fill state's :mode, :trigger-kind, :ingestion-id /
   :history-id / :edited-by, :document (row), :structured-data,
   :legal-entity, :file, :commit-sha, and (in :edit mode)
   :changed-leaves as appropriate. The `phases` argument is a vector
   of vectors of IProcessor instances."
  [ctx state phases]
  (let [initial (:structured-data state)
        state   (-> state
                    (assoc :initial-structured-data initial)
                    (cond-> (= :edit (:mode state))
                      (assoc :user-edited-paths
                             (com.getorcha.document.provenance/user-edited-paths
                               (:db-pool ctx)
                               (get-in state [:document :document/id])))))
        ctx     (cond-> ctx
                  (:user-edited-paths state)
                  (assoc :user-edited-paths (:user-edited-paths state)))
        final   (reduce (fn [st phase] (run-phase! ctx st phase)) state phases)]
    (when (= :edit (:mode state))
      (persist-derivation! (:db-pool ctx) final))
    final))

Note on json-patch/diff-patch: this function does not exist today. If you keep the diff-based persist-derivation! above, Task A5 must add it to src/com/getorcha/json_patch.clj: a simple diff returning JSON-Patch ops that transform initial into final. A naive implementation walks both values in parallel, emitting replace ops for leaves that differ and add/remove ops for keys that were added or dropped; add a small test for it. If a production-grade diff turns out to be non-trivial, fall back to recording the applied ops in :applied-ops (keep the ops from every phase, flatten them, and emit them as the derivation patch), which needs no diff-patch at all.
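If the diff route is kept, a naive diff-patch could look like the sketch below. It illustrates the approach under stated assumptions rather than prescribing what json_patch.clj must contain: keys are rendered with `name` (so keyword namespaces are dropped) and escaped per RFC 6901, equal-length vectors are diffed by index, and vectors whose length changed are replaced whole instead of receiving add/remove ops. It emits numeric array indices, not the /line-items[id=X]/... selectors the processors use.

```clojure
(require '[clojure.string :as str])

(defn- pointer-token
  "Renders one map key or vector index as an escaped JSON-Pointer
   reference token (RFC 6901: `~` becomes `~0`, `/` becomes `~1`).
   Keywords contribute their name only."
  [k]
  (-> (if (keyword? k) (name k) (str k))
      (str/replace "~" "~0")
      (str/replace "/" "~1")))

(defn diff-patch
  "Naive structural diff returning string-keyed JSON-Patch ops that
   turn `initial` into `final`. Maps are diffed key by key, equal-length
   vectors index by index; anything else that differs is replaced whole."
  ([initial final] (diff-patch "" initial final))
  ([path initial final]
   (cond
     (= initial final)
     []

     (and (map? initial) (map? final))
     (into []
           (mapcat (fn [k]
                     (let [p (str path "/" (pointer-token k))]
                       (cond
                         (not (contains? final k))   [{"op" "remove" "path" p}]
                         (not (contains? initial k)) [{"op" "add" "path" p "value" (get final k)}]
                         :else                       (diff-patch p (get initial k) (get final k))))))
           (distinct (concat (keys initial) (keys final))))

     (and (vector? initial) (vector? final) (= (count initial) (count final)))
     (into []
           (mapcat (fn [i] (diff-patch (str path "/" i) (nth initial i) (nth final i))))
           (range (count initial)))

     :else
     [{"op" "replace" "path" path "value" final}])))
```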

Recommended: take the per-op-record approach (avoids implementing JSON-Patch diff):

(defn ^:private apply-ops-in-memory
  "Applies `ops` to `state`'s in-memory structured-data only, and
   appends them to `:applied-ops` for the eventual derivation row."
  [state ops]
  (if (empty? ops)
    state
    (let [new-sd (json-patch/apply-patch (:structured-data state) ops)]
      (-> state
          (assoc :structured-data new-sd)
          (assoc-in [:document :document/structured-data] new-sd)
          (update :applied-ops (fnil into []) ops)))))

persist-derivation! then uses (:applied-ops state) as the derivation patch instead of computing a diff, and writes only when it is non-empty. Update both functions accordingly in your implementation.

Write test/com/getorcha/workers/ap/processors/engine_test.clj:

(ns com.getorcha.workers.ap.processors.engine-test
  (:require [clojure.test :refer [deftest is testing use-fixtures]]
            [com.getorcha.db.document-processor-run :as db.run]
            [com.getorcha.db.sql :as db.sql]
            [com.getorcha.test.fixtures :as fixtures]
            [com.getorcha.test.notification-helpers :as helpers]
            [com.getorcha.workers.ap.processors :as proc]
            [com.getorcha.workers.ap.processors.engine :as engine]))


(use-fixtures :once fixtures/with-running-system)
(use-fixtures :each fixtures/with-db-rollback)


(defrecord ^:private CountingProcessor [id order-atom]
  proc/IProcessor
  (-id         [_] id)
  (-reads      [_ _state] [])
  (-writes     [_ _state] [])
  (-diagnostic [_ _state] nil)
  (-modes      [_] #{:ingestion})
  (-always?    [_] true)
  (-compute    [_ _ctx _state]
    (swap! order-atom conj id)
    {:result {:ran id} :stats nil})
  (-apply-ops  [_ _state _r] []))


(deftest phase-ordering-sequential-between-phases
  (let [document-id (helpers/create-document! (helpers/create-legal-entity!))
        ingestion-id (helpers/create-ingestion! document-id)
        order        (atom [])
        ctx          {:db-pool fixtures/*db*}
        state        {:mode :ingestion
                      :trigger-kind :ingestion
                      :ingestion-id ingestion-id
                      :document {:document/id document-id
                                 :document/version 0
                                 :document/structured-data {}}
                      :structured-data {}}
        phase-1      [(->CountingProcessor :p1 order)]
        phase-2      [(->CountingProcessor :p2 order)]]
    (engine/run-processors! ctx state [phase-1 phase-2])
    (is (= [:p1 :p2] @order))
    (is (= 2 (db.run/count-runs fixtures/*db* document-id)))))

Run: clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.engine-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: PASS.

git add src/com/getorcha/workers/ap/processors/engine.clj test/com/getorcha/workers/ap/processors/engine_test.clj
git commit -m "feat: processors engine — core run loop"

Task A6: Engine conditional filter

Files:

In src/com/getorcha/db/document_processor_run.clj, extend count-runs' keyword-argument destructuring and its WHERE cond-> so it also accepts :document-version:

(defn count-runs
  "Counts runs for `document-id`, optionally filtered by `:processor-id`,
   `:status`, and/or `:document-version`."
  [db-pool document-id & {:keys [processor-id status document-version]}]
  (let [{:keys [count]}
        (db.sql/execute-one!
         db-pool
         {:select [[:%count.* :count]]
          :from   [:document-processor-run]
          :where  (cond-> [:and [:= :document-id document-id]]
                    processor-id     (conj [:= :processor-id processor-id])
                    status           (conj [:= :status (db.sql/->cast status :processor-run-status)])
                    document-version (conj [:= :document-version document-version]))})]
    (or count 0)))

Update the existing test/com/getorcha/db/document_processor_run_test.clj (count-runs-test) to add a kwarg test covering :document-version:

(is (= 1 (run/count-runs fixtures/*db* document-id
                         :processor-id "matching"
                         :document-version 1)))

Replace A5's should-run? stub in engine.clj with the real implementation (it calls count-runs with keyword arguments, matching that function's signature):

(defn ^:private has-completed-run-at-version?
  [db-pool document-id processor-id version]
  (pos? (db.run/count-runs
         db-pool document-id
         :processor-id processor-id
         :status :completed
         :document-version version)))


(defn ^:private should-run?
  "Returns true when a processor must execute. Always runs in
   :ingestion mode. In :edit mode, runs iff `-always?`, or the
   processor has no completed run at the current document version,
   or any read pattern intersects changed-leaves."
  [db-pool state processor]
  (let [document-id (get-in state [:document :document/id])
        version     (get-in state [:document :document/version])]
    (case (:mode state)
      :ingestion true
      :edit
      (or (proc/-always? processor)
          (not (has-completed-run-at-version?
                 db-pool document-id (name (proc/-id processor)) version))
          (some #(reads/match-any? (proc/-reads processor state) %)
                (:changed-leaves state))))))

Add to engine.clj's require block:

[com.getorcha.workers.ap.processors.reads :as reads]
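The gate's decision table can be illustrated with a pure sketch that takes the database lookup as an input flag. Two assumptions apply: read patterns are modeled as plain vectors in which :* matches any single segment, a simplified stand-in for the malli seq-regex patterns that reads/read-leaf and reads/match-any? actually use; and the input keys (:always?, :completed-at-version?, and so on) are hypothetical.

```clojure
(defn- segment-matches?
  "A pattern segment matches a path segment when they are equal or the
   pattern segment is the :* wildcard."
  [pattern-seg path-seg]
  (or (= :* pattern-seg) (= pattern-seg path-seg)))

(defn- leaf-matches?
  "True when `pattern` and `leaf` have the same length and every
   segment matches."
  [pattern leaf]
  (and (= (count pattern) (count leaf))
       (every? true? (map segment-matches? pattern leaf))))

(defn should-run?
  "Sketch of the engine gate. :ingestion always runs; :edit runs when the
   processor is -always?, has no completed run at the current version,
   or declares a read pattern matching any changed leaf."
  [{:keys [mode always? completed-at-version? reads changed-leaves]}]
  (case mode
    :ingestion true
    :edit      (boolean
                (or always?
                    (not completed-at-version?)
                    (some (fn [leaf] (some #(leaf-matches? % leaf) reads))
                          changed-leaves)))))
```

Note the second clause: a processor with no completed run at the current version always runs in :edit mode, regardless of its reads. A test that exercises read-based skipping therefore needs a completed run at that version first.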

Append to engine_test.clj:

(defrecord ^:private ReadsProcessor [id reads order-atom]
  proc/IProcessor
  (-id         [_] id)
  (-reads      [_ _state] reads)
  (-writes     [_ _state] [])
  (-diagnostic [_ _state] nil)
  (-modes      [_] #{:ingestion :edit})
  (-always?    [_] false)
  (-compute    [_ _ _]
    (swap! order-atom conj id)
    {:result nil :stats nil})
  (-apply-ops  [_ _ _] []))


(deftest edit-mode-skips-processors-whose-reads-miss-changed-leaves
  (let [document-id   (helpers/create-document! (helpers/create-legal-entity!))
        identity-id   (helpers/create-identity!)
        history-id    (:document-history/id
                       (com.getorcha.db.document-history/insert!
                        fixtures/*db*
                        {:document-id document-id :change-type :edit
                         :edited-by identity-id
                         :patch [{"op" "replace" "path" "/issuer/name" "value" "A"}]}))
        order         (atom [])
        ctx           {:db-pool fixtures/*db*}
        document      {:document/id document-id :document/version 1
                       :document/structured-data {:document-type "invoice"}}
        state         {:mode :edit :trigger-kind :edit
                       :history-id history-id :edited-by identity-id
                       :document document
                       :structured-data {:document-type "invoice"}
                       :changed-leaves #{[:issuer :name]}}
        runs          [(->ReadsProcessor :on-issuer
                        [(com.getorcha.workers.ap.processors.reads/read-leaf :issuer :name)]
                        order)
                       (->ReadsProcessor :on-line-items
                        [(com.getorcha.workers.ap.processors.reads/read-leaf :line-items :* :description)]
                        order)]]
    ;; should-run? forces a run when a processor has no completed run at
    ;; the current document version, so first seed a completed run for
    ;; both processors at version 1 with an :ingestion pass. This assumes
    ;; trigger-params records :document-version from the state's :document.
    (engine/run-processors! ctx
                            {:mode            :ingestion
                             :trigger-kind    :ingestion
                             :ingestion-id    (helpers/create-ingestion! document-id)
                             :document        document
                             :structured-data {:document-type "invoice"}}
                            [runs])
    (reset! order [])
    (engine/run-processors! ctx state [runs])
    (is (= [:on-issuer] @order) "on-line-items should be skipped")))

Run: clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.engine-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: PASS.

git add src/com/getorcha/workers/ap/processors/engine.clj src/com/getorcha/db/document_processor_run.clj test/com/getorcha/workers/ap/processors/engine_test.clj test/com/getorcha/db/document_processor_run_test.clj
git commit -m "feat: engine conditional filter (edit-mode reads intersection)"

Task A7: Engine op filter (user-edit protection)

Files:

Patch ops use STRING keys ({"op" "replace" "path" "..." "value" ...}) per the JSON-Patch convention used by json-patch/apply-patch and the edit handlers. Use (get op "path") to read path strings.

Replace A5's stub with:

(defn ^:private prefix-match?
  "Returns true when any prefix of `clj-path` (including itself) is
   in `user-edited-set`."
  [user-edited-set clj-path]
  (boolean (some user-edited-set
                 (reductions conj [] clj-path))))


(defn ^:private filter-ops-against-user-edits
  [user-edited-set processor-id ops]
  (reduce
   (fn [kept op]
     (let [pointer (get op "path")
           clj     (json-patch.path/pointer->clj-path pointer)]
       (if (prefix-match? user-edited-set clj)
         (do (log/info "Processor op blocked by prior user edit"
                       {:processor processor-id :path pointer})
             kept)
         (conj kept op))))
   []
   ops))

filter-ops-against-user-edits relies on the json-patch.path alias required in A5; make sure that require is still present. Nothing else needs wiring: run-processor! already calls the filter in :edit mode, and run-processors! already precomputes :user-edited-paths into ctx (both from A5).
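The filter's behaviour can be checked in isolation with the sketch below. `pointer->path` is a hypothetical stand-in for json-patch.path/pointer->clj-path that only handles plain pointers (no [id=X] selectors); prefix-match? uses the same logic as the engine version above, and `keep-unblocked-ops` is a logging-free, hypothetical variant of the filter.

```clojure
(require '[clojure.string :as str])

(defn pointer->path
  "Hypothetical stand-in for json-patch.path/pointer->clj-path. Splits a
   plain JSON Pointer into keyword segments, unescaping ~1 and ~0 and
   turning all-digit tokens into longs. It does NOT understand the
   [id=X] selectors used elsewhere in this plan."
  [pointer]
  (mapv (fn [token]
          (let [t (-> token (str/replace "~1" "/") (str/replace "~0" "~"))]
            (if (re-matches #"\d+" t)
              (Long/parseLong t)
              (keyword t))))
        (rest (str/split pointer #"/"))))

(defn prefix-match?
  "True when any prefix of `clj-path` (including [] and the path itself)
   is in `user-edited-set`."
  [user-edited-set clj-path]
  (boolean (some user-edited-set (reductions conj [] clj-path))))

(defn keep-unblocked-ops
  "Drops ops whose target path sits at or below a user-edited path."
  [user-edited-set ops]
  (filterv #(not (prefix-match? user-edited-set (pointer->path (get % "path")))) ops))
```

Because only prefixes of the processor's own path are checked, an op that replaces a parent object (/issuer) is not blocked by a user edit at /issuer/name; the protection holds as long as processors write leaves, as their -writes declarations suggest.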

Append to engine_test.clj:

(defrecord ^:private WriterProcessor [id ops]
  proc/IProcessor
  (-id         [_] id)
  (-reads      [_ _state] [])
  (-writes     [_ _state] [])
  (-diagnostic [_ _state] nil)
  (-modes      [_] #{:ingestion :edit})
  (-always?    [_] true)
  (-compute    [_ _ _] {:result nil :stats nil})
  (-apply-ops  [_ _ _] ops))


(deftest edit-mode-op-filter-drops-user-edited-paths
  (let [document-id (helpers/create-document! (helpers/create-legal-entity!))
        identity-id (helpers/create-identity!)
        _           (com.getorcha.db.document-history/insert!
                     fixtures/*db*
                     {:document-id document-id :change-type :edit
                      :edited-by identity-id
                      :patch [{"op" "replace" "path" "/issuer/name" "value" "UserValue"}]})
        state       {:mode :edit :trigger-kind :edit
                     :edited-by identity-id
                     :document {:document/id document-id :document/version 1
                                :document/structured-data {:document-type "invoice"
                                                           :issuer {:name "UserValue"}}}
                     :structured-data {:document-type "invoice" :issuer {:name "UserValue"}}
                     :changed-leaves #{[:issuer :name]}}
        p           (->WriterProcessor :overreacher
                                       [{"op" "replace" "path" "/issuer/name" "value" "ProcessorValue"}
                                        {"op" "replace" "path" "/issuer/country" "value" "DE"}])]
    (engine/run-processors! {:db-pool fixtures/*db*} state [[p]])
    (let [row (db.sql/execute-one! fixtures/*db*
                                   {:select [:structured-data]
                                    :from   [:document]
                                    :where  [:= :id document-id]})]
      (is (= "UserValue" (get-in row [:document/structured-data :issuer :name]))
          "user edit preserved")
      (is (= "DE" (get-in row [:document/structured-data :issuer :country]))
          "non-edited path accepts processor write"))))

Run: clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.engine-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: PASS.

git add src/com/getorcha/workers/ap/processors/engine.clj test/com/getorcha/workers/ap/processors/engine_test.clj
git commit -m "feat: engine op filter — preserves user edits in edit mode"

Task A8: Unified failure notification template

Files:

Write src/com/getorcha/notifications/processor.clj:

(ns com.getorcha.notifications.processor
  "Unified processor-failure notification payload. Replaces the
   per-processor :matching/permanent-failure and
   :reconciliation/failure kinds (see spec §8)."
  (:require [clojure.tools.logging :as log]
            [com.getorcha.notifications :as notifications]))


(defn notify!
  "Fire-and-forget admin notification for a processor failure; any
   exception while notifying is logged and swallowed.
   `ctx` includes :db-pool, :aws, :notifications. The second argument
   is a map with:
     :processor-id  keyword id of the failing processor
     :trigger       {:kind :edit|:ingestion|:manual, plus :history-id,
                     :edited-by and :ingestion-id when present}
     :document      the failing document row
     :error         the exception message"
  [{:keys [db-pool aws notifications] :as _ctx}
   {:keys [processor-id trigger document error]}]
  (try
    (let [le-id (:document/legal-entity-id document)]
      (when le-id
        (notifications/notify-admins!
         {:db-pool db-pool :aws aws :notifications notifications}
         {:legal-entity/id le-id}
         {:kind      :processor/failure
          :title     (str "Processor " (name processor-id) " failed: "
                          (or (:document/file-original-name document)
                              (str (:document/id document))))
          :body      (str "Document: " (:document/id document) "\n"
                          "Processor: " (name processor-id) "\n"
                          "Trigger: " (name (:kind trigger))
                          (when (:history-id trigger)
                            (str " (history-id " (:history-id trigger) ")"))
                          (when (:edited-by trigger)
                            (str ", edited-by " (:edited-by trigger)))
                          "\nError: " error)
          :data      {:document-id (:document/id document)
                      :processor   processor-id
                      :trigger     trigger}})))
    (catch Exception e
      (log/warn e "Failed to send processor failure notification"
                {:processor processor-id
                 :document  (:document/id document)}))))

In engine.clj, update the catch block of run-processor!:

(catch Throwable t
  (db.run/fail-run! db-pool run-id (.getMessage t))
  (com.getorcha.notifications.processor/notify!
    ctx
    {:processor-id (proc/-id processor)
     :trigger      {:kind      (:trigger-kind state)
                    :history-id (:history-id state)
                    :edited-by  (:edited-by state)
                    :ingestion-id (:ingestion-id state)}
     :document     (:document state)
     :error        (.getMessage t)})
  (log/error t "Processor failed"
             {:processor (proc/-id processor)
              :document  (get-in state [:document :document/id])})
  [])

clj-kondo --lint src/com/getorcha/notifications src/com/getorcha/workers/ap/processors

Expected: clean.

git add src/com/getorcha/notifications src/com/getorcha/workers/ap/processors/engine.clj
git commit -m "feat: unified :processor/failure notification"

Phase B: Migrate post-processors

Every existing post-processor follows the same pattern: create a new v2 IProcessor under processors/, port the logic, add reads/writes declarations and an ops-based -apply-ops, and port the existing test fixtures unchanged to processors/*_test.clj. The old post_process/* record stays in place and keeps working, so the migration is additive. The old post-process/run is rewritten in Phase D.

Note (schema alignment): All -writes and -apply-ops paths in this phase target existing top-level LineItem / document fields — no schema changes are made as part of this plan. Earlier drafts of this plan referenced synthetic nesting (/line-items[id=X]/accounts/debit-account etc.); those have been rewritten to match the actual schema in src/com/getorcha/schema/* as of 2026-04-16.

Each task follows this template:

  1. Create the new processor file, copying logic from old post_process file.
  2. Restructure -compute from (-compute [_]) (closing over ingestion) to (-compute [_ ctx state]).
  3. Convert -apply (structured-data → structured-data) to -apply-ops (state × result → [ops]).
  4. Declare -id, -reads, -writes, -diagnostic, -modes, -always?.
  5. Port/write test.
  6. Commit.

Task B1: accounts

Files:

Reference source: src/com/getorcha/workers/ap/ingestion/post_process/accounts.clj. Copy its internal helpers (prompt building, LLM parsing, batched invocation) verbatim. Apart from the -compute adjustments described below, change only:

  1. The defrecord becomes (defrecord AccountsMatcher [] ...), with no fields; state is now passed to -compute.
  2. Add a factory (->accounts) that returns an instance.
  3. Add reads/writes declarations per spec §12.

Write src/com/getorcha/workers/ap/processors/accounts.clj. Start by copying post_process/accounts.clj to the new path, then:

(defrecord AccountsMatcher []
  proc/IProcessor
  (-id         [_] :accounts)
  (-reads      [_ _state] [(reads/read-leaf :issuer :name)
                           (reads/read-leaf :issuer :vat-id)
                           (reads/read-leaf :issuer :country)
                           (reads/read-leaf :line-items :* :description)
                           (reads/read-leaf :line-items :* :amount)])
  (-writes     [_ _state] [(reads/read-leaf :line-items :* :debit-account)
                           (reads/read-leaf :line-items :* :credit-account)])
  (-diagnostic [_ _state] nil)
  (-modes      [_] #{:ingestion :edit})
  (-always?    [_] false)

  (-compute    [_ ctx state]
    ;; body from old -compute, with `context` replaced by `ctx`
    ;; and `ingestion` replaced by `state`. :structured-data/:document/:legal-entity
    ;; keys are the same on the new state map.
    ;; <inline the existing -compute body here>
    )

  (-apply-ops  [_ _state result]
    (if (nil? result)
      []
      ;; Result shape: {:line-items [{:id X :debit-account N :credit-account M} ...]}
      ;; Turn each proposed account into two replace ops.
      (mapcat (fn [{:keys [id debit-account credit-account]}]
                (cond-> []
                  debit-account
                  (conj {"op"    "replace"
                         "path"  (str "/line-items[id=" id "]/debit-account")
                         "value" debit-account})
                  credit-account
                  (conj {"op"    "replace"
                         "path"  (str "/line-items[id=" id "]/credit-account")
                         "value" credit-account})))
              (:line-items result)))))


(defn ->accounts [] (->AccountsMatcher))

The inlined -compute body (from the old -compute) reads context and ingestion; rename them to ctx and state respectively. The old code references (:structured-data ingestion), (:document ingestion) and (:legal-entity ingestion); these keys still live on state. The old code also expects booking-csv as a record field. Under the new model the engine computes it once per run and threads it through state under :booking-csv, so the -compute body should read (:booking-csv state) instead of the old record field.

Copy test/com/getorcha/workers/ap/ingestion/post_process/accounts_test.clj to test/com/getorcha/workers/ap/processors/accounts_test.clj, rename its namespace and requires to the processors.* equivalents, and add a test for the declared reads:

(deftest accounts-reads-declared
  (let [p        (accounts/->accounts)
        declared (proc/-reads p {})]   ; -reads takes state as its second argument
    (is (some #(= (reads/read-leaf :line-items :* :description) %) declared))
    (is (some #(= (reads/read-leaf :issuer :name) %) declared))))

clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.accounts-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: PASS.

git add src/com/getorcha/workers/ap/processors/accounts.clj test/com/getorcha/workers/ap/processors/accounts_test.clj
git commit -m "feat: migrate accounts processor to IProcessor v2"

Task B2: cost-center

Follow the same pattern as Task B1. (All -reads/-writes/-diagnostic methods take state as their second argument per the v2 protocol.)

Files:

Declarations (from spec §12):

(-reads  [_ _state] [(reads/read-leaf :issuer :name)
                     (reads/read-leaf :line-items :* :description)
                     (reads/read-leaf :line-items :* :amount)])
(-writes [_ _state] [(reads/read-leaf :line-items :* :cost-center)])

-apply-ops maps result's per-line-item cost-center into one replace op per line-item. Commit: git commit -m "feat: migrate cost-center processor to IProcessor v2".
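The cost-center -apply-ops step can be sketched as a plain function, assuming (hypothetically) that -compute returns {:line-items [{:id X :cost-center C} ...]}, mirroring the accounts result shape in B1, and that line items without a proposed cost center produce no op. `cost-center-ops` is an illustrative name; inside the record it would be called as (-apply-ops [_ _state result] (cost-center-ops result)).

```clojure
(defn cost-center-ops
  "Sketch of CostCenter's -apply-ops body: one string-keyed replace op
   per line item that received a cost center. Assumes the hypothetical
   result shape {:line-items [{:id X :cost-center C} ...]}."
  [result]
  (if (nil? result)
    []
    (into []
          (keep (fn [{:keys [id cost-center]}]
                  (when cost-center
                    {"op"    "replace"
                     "path"  (str "/line-items[id=" id "]/cost-center")
                     "value" cost-center})))
          (:line-items result))))
```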

Task B3: accruals

Same pattern.

Files:

Declarations:

(-reads  [_ _state] [(reads/read-leaf :invoice-date)
                     (reads/read-leaf :line-items :* :description)])
(-writes [_ _state] [(reads/read-leaf :line-items :* :accrual)])

Commit: git commit -m "feat: migrate accruals processor to IProcessor v2".

Task B4: supplier-matcher

Files:

Declarations:

SupplierMatcher:

(-id     [_] :supplier-matcher)
(-reads  [_ _state] [(reads/read-leaf :issuer :name)
                     (reads/read-leaf :issuer :vat-id)
                     (reads/read-leaf :issuer :iban)])
(-writes [_ _state] [(reads/read-leaf :supplier-match)])

SupplierVerifier:

(-id     [_] :supplier-verifier)
(-reads  [_ _state] [(reads/read-leaf :issuer :name)
                     (reads/read-leaf :issuer :vat-id)
                     (reads/read-leaf :issuer :country)
                     (reads/read-leaf :issuer :address)])
(-writes [_ _state] [(reads/read-leaf :supplier-verification-id)])

Commit: git commit -m "feat: migrate supplier processors to IProcessor v2".

Task B5: tax-compliance-analyzer

TCA is the MULTI-SLICE processor: it owns :tax-issues (invoice-level) and :line-items (per-line :vat-validation) diagnostic slices, plus structured-data mutations for :service-category, per-line :bu-code, and (ingestion-only) tax-id correction. See spec §13.2.

The previously added tax-compliance/run-vat-validation shim (which only extracted :vat-validation from line items after TCA had written it there) is deleted entirely; TCA now writes the :line-items diagnostic slice directly.

Files:

Declarations:

(-id     [_] :tax-compliance-analyzer)
(-reads  [_ _state]
  [(reads/read-leaf :issuer :country)
   (reads/read-leaf :issuer :tax-id)
   (reads/read-leaf :issuer :tax-id-type)
   (reads/read-leaf :recipient :country)
   (reads/read-leaf :recipient :tax-id-type)
   (reads/read-leaf :shipping-country)
   (reads/read-leaf :line-items :* :tax-rate)
   (reads/read-leaf :line-items :* :description)
   (reads/read-leaf :delivery-terms-raw)
   (reads/read-leaf :incoterm-code)
   (reads/read-leaf :compliance-statements :* :text)])
(-writes [_ _state]
  [(reads/read-leaf :service-category)
   (reads/read-leaf :line-items :* :bu-code)
   (reads/read-leaf :issuer :tax-id)          ;; :ingestion-only branch
   (reads/read-leaf :issuer :tax-id-type)])
(-diagnostic [_ _state] [{:slice :tax-issues} {:slice :line-items}])

In the ported -compute, wrap the existing (if (and no-vat? tax-id-warn?) … (if (…) …)) dispatch in (case (:mode state) :ingestion <vision-or-standard> :edit <standard-only>). Do NOT call the tax-id-correction-section path in :edit mode. Do NOT fetch PDF bytes in :edit mode.

TCA's existing -compute reads :tax-id-format status from structured-data.validation-results. Under the unified model, this lives at state.diagnostics.validations.tax-id-format. Update the lookup:

(let [tax-id-result (get-in state [:diagnostics :validations :tax-id-format])
      tax-id-warn?  (= "warning" (:status tax-id-result))]
  ...)

state.diagnostics is refreshed between phases by the engine (A5 refresh-diagnostics), so TCA (phase 2) sees validations' phase-1 write.

The existing LLM schema produces:

{:service-category {...}
 :line-items       [{:vat-validation {...} :bu-code {...}} ...]
 :issues           [{...} ...]
 :tax-id-correction {...}}

TCA's -compute returns:

{:tax-issues      <vector of TaxIssue maps from `:issues`>
 :line-items      <map keyed by line-item id: {id {:vat-validation {...}}}>
 :service-category <unchanged>
 :bu-codes        <map keyed by line-item id: {id {...}}>   ;; not a slice — used by -apply-ops
 :tax-id-correction <unchanged>}

:tax-issues and :line-items are diagnostic-slice top-level keys (matched against -diagnostic returns); engine writes them. :service-category, :bu-codes, :tax-id-correction are processor-internal and consumed by -apply-ops.

Do the restructuring inside -compute, after parsing the LLM response: transform the raw LLM :line-items vector into a map keyed by each line item's :id. The LLM output does not include line-item ids, so TCA must carry them through itself by pairing each output with the input line item at the same position:

(let [raw-line-items    (:line-items llm-result)
      input-line-items  (get-in state [:structured-data :line-items])
      ;; Pair each input line item with its LLM output by position,
      ;; padding with nil when the LLM returned fewer items.
      pairs             (map vector
                             input-line-items
                             (concat raw-line-items (repeat nil)))
      by-id             (into {}
                              (map (fn [[input output]]
                                     [(:id input)
                                      (cond-> {}
                                        (:vat-validation output)
                                        (assoc :vat-validation (:vat-validation output)))]))
                              pairs)
      ;; clojure.core/keep takes a single collection, so iterate `pairs`.
      bu-codes          (into {}
                              (keep (fn [[input output]]
                                      (when-let [bu (:bu-code output)]
                                        [(:id input) bu])))
                              pairs)]
  {:result {:tax-issues       (format-tax-issues (:issues llm-result))
            :line-items       by-id
            :service-category (:service-category llm-result)
            :bu-codes         bu-codes
            :tax-id-correction (:tax-id-correction llm-result)}
   :stats  llm-stats})

(-apply-ops [_ state result]
  (when result
    (let [mode         (:mode state)
          ops          (cond-> []
                         (:service-category result)
                         (conj {"op" "replace" "path" "/service-category"
                                "value" (:service-category result)}))
          bu-ops       (mapv (fn [[id bu]]
                               {"op" "replace"
                                "path" (str "/line-items[id=" id "]/bu-code")
                                "value" bu})
                             (:bu-codes result))
          corr         (:tax-id-correction result)
          corr-ops     (when (and (= mode :ingestion)
                                  corr
                                  (= "corrected" (:status corr))
                                  (:tax-id corr))
                         [{"op" "replace" "path" "/issuer/tax-id"
                           "value" (:tax-id corr)}
                          {"op" "replace" "path" "/issuer/tax-id-type"
                           "value" (:tax-id-type corr)}])]
      (into (into ops bu-ops) corr-ops))))
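The positional pairing in -compute is easy to get wrong, so it may be worth checking on its own. The sketch below uses a hypothetical `merge-by-position` helper that takes the input line items and the raw LLM line items and returns the :line-items diagnostic map and the :bu-codes map. Missing LLM outputs are padded with nil, so every input line item gets an entry in :line-items (possibly empty), while :bu-codes only contains ids that actually received a code.

```clojure
(defn merge-by-position
  "Pairs LLM output line items with input line items by position and
   splits them into the :line-items diagnostic map and the :bu-codes map
   consumed by -apply-ops. Missing outputs are padded with nil."
  [input-line-items raw-line-items]
  (let [pairs (map vector input-line-items (concat raw-line-items (repeat nil)))]
    {:line-items (into {}
                       (map (fn [[input output]]
                              [(:id input)
                               (cond-> {}
                                 (:vat-validation output)
                                 (assoc :vat-validation (:vat-validation output)))]))
                       pairs)
     :bu-codes   (into {}
                       (keep (fn [[input output]]
                               (when-let [bu (:bu-code output)]
                                 [(:id input) bu])))
                       pairs)}))
```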

Commit: git commit -m "feat: migrate tax-compliance-analyzer to IProcessor v2 (multi-slice)".

Task B6: financial-validation-resolver

Files:

This processor owns the :validations.financial-math sub-path for invoice documents (the only document type with a financial-math check). Its -compute:

  1. Calls the deterministic validation/check-financial-math (pure function, stays in ingestion/validation.clj).
  2. If :status != "pass", calls the existing LLM resolver to refine the uncertainty.
  3. Returns {:status ... :details ...} — the final resolved state of the financial-math sub-path.

Declarations:

(-id         [_] :financial-validation-resolver)
(-reads      [_ _state] [(reads/read-leaf :subtotal)
                         (reads/read-leaf :total)
                         (reads/read-leaf :tax-amount)
                         (reads/read-leaf :line-items :* :amount)
                         (reads/read-leaf :line-items :* :quantity)
                         (reads/read-leaf :line-items :* :unit-price)])
(-writes     [_ _state] [])   ;; writes only to the diagnostic slice
(-diagnostic [_ state]
  (when (= "invoice" (get-in state [:document :document/structured-data :document-type]))
    {:slice :validations :sub-path [:financial-math]}))
(-modes      [_] #{:ingestion :edit})

-diagnostic returns nil for non-invoice document types, so the engine skips the slice write. This is the per-type dispatch pattern.
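The sketch below shows the caller's side of this contract, assuming only the declaration shapes used in this plan. `financial-math-diagnostic`, `slices-to-write` and the map-shaped processors are hypothetical stand-ins for IProcessor records. It shows that a nil declaration drops the processor from the slice writes for that document.

```clojure
(defn financial-math-diagnostic
  "Hypothetical mirror of the -diagnostic above: only invoices own the
   :validations.financial-math sub-path; other types return nil."
  [state]
  (when (= "invoice" (get-in state [:document :document/structured-data :document-type]))
    {:slice :validations :sub-path [:financial-math]}))

(defn slices-to-write
  "Returns [processor-id declaration] pairs, dropping processors whose
   diagnostic declaration is nil for this state."
  [processors state]
  (keep (fn [{:keys [id diagnostic]}]
          (when-let [decl (diagnostic state)]
            [id decl]))
        processors))

;; Processors modelled as plain maps instead of IProcessor records.
(def processors
  [{:id :financial-validation-resolver :diagnostic financial-math-diagnostic}
   {:id :fraud-detector                :diagnostic (constantly {:slice :fraud-flags})}])

(defn state-for [doc-type]
  {:document {:document/structured-data {:document-type doc-type}}})

(slices-to-write processors (state-for "contract"))
;; => ([:fraud-detector {:slice :fraud-flags}])
```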

(-compute [_ ctx state]
  (let [sd               (:structured-data state)
        deterministic    (validation/check-financial-math sd)]
    (if (= "pass" (:status deterministic))
      {:result deterministic :stats nil}
      ;; Call existing LLM refinement — returns {:result refined :stats llm-stats}
      (run-llm-resolver ctx state deterministic))))

(run-llm-resolver is the existing resolver logic from the old FinancialValidationResolver defrecord; port it as a private fn.)

Commit: git commit -m "feat: migrate financial-validation-resolver to IProcessor v2".

Task B7: uncertain-validations-resolver

Files:

Owns :validations.required-fields, :validations.date-reasonableness, :validations.recipient-identity sub-paths for invoice documents.

It uses the multi-sub-path -diagnostic form, which the engine's write-diagnostic! already handles (see A5). The -compute result has three top-level keys, each matching the first element of one sub-path:

{:result {:required-fields     {:status "pass"}
          :date-reasonableness {:status "warning" ...}
          :recipient-identity  {:status "pass"}}
 :stats nil}

The engine iterates over :sub-paths and writes each one with the value (get result (first sub-path)).
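The iteration above can be written as a pure function that takes a declaration and a -compute result and returns a list of writes. This is an illustration, not the engine's write-diagnostic!. `diagnostic-writes` is a hypothetical name. The sketch assumes that a single :sub-path receives the whole result, as in the financial-math case, and that a declaration with no sub-path means a whole-slice write.

```clojure
(defn diagnostic-writes
  "Hypothetical: turns a diagnostic declaration plus a -compute result
   into [slice sub-path value] triples.
   - :sub-paths -> one write per sub-path, value (get result (first sub-path))
   - :sub-path  -> one write carrying the whole result
   - neither    -> one whole-slice write (empty sub-path)
   - nil decl   -> no writes"
  [{:keys [slice sub-path sub-paths] :as decl} result]
  (cond
    (nil? decl) []
    sub-paths   (mapv (fn [p] [slice p (get result (first p))]) sub-paths)
    sub-path    [[slice sub-path result]]
    :else       [[slice [] result]]))

(diagnostic-writes {:slice     :validations
                    :sub-paths [[:required-fields] [:date-reasonableness]]}
                   {:required-fields     {:status "pass"}
                    :date-reasonableness {:status "warning"}})
;; => [[:validations [:required-fields] {:status "pass"}]
;;     [:validations [:date-reasonableness] {:status "warning"}]]
```

One consequence is worth a test. If a declared sub-path's first segment is missing from the result, the lookup yields nil, so a misspelled sub-path key quietly writes nil instead of failing.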

Declarations:

(-id         [_] :uncertain-validations-resolver)
(-reads      [_ _state] [(reads/read-leaf :issuer :name)
                         (reads/read-leaf :issuer :address)
                         (reads/read-leaf :recipient :name)
                         (reads/read-leaf :recipient :address)
                         (reads/read-leaf :invoice-date)
                         (reads/read-leaf :invoice-number)])
(-writes     [_ _state] [])
(-diagnostic [_ state]
  (when (= "invoice" (get-in state [:document :document/structured-data :document-type]))
    {:slice :validations
     :sub-paths [[:required-fields]
                 [:date-reasonableness]
                 [:recipient-identity]]}))

The multi-sub-path shape was already wired into the engine in Task A5; no engine change needed here.

Port the logic from the old UncertainValidationsResolver defrecord. The old -compute read structured-data.validation-results to find uncertain checks. Under the new model those deterministic statuses no longer live on structured-data, so recompute them from state.structured-data with the pure check fns:

(-compute [_ ctx state]
  (let [sd (:structured-data state)
        le (:legal-entity state)
        det-required  (validation/check-required-fields sd)
        det-dates     (validation/check-date-reasonableness sd)
        det-identity  (validation/check-recipient-identity sd le)
        ;; For each uncertain check, run the existing LLM refinement; else use det as-is.
        result        {:required-fields     (if (= "uncertain" (:status det-required))
                                              (resolve-required-fields ctx state det-required)
                                              det-required)
                       :date-reasonableness (if (= "uncertain" (:status det-dates))
                                              (resolve-dates ctx state det-dates)
                                              det-dates)
                       :recipient-identity  (if (= "uncertain" (:status det-identity))
                                              (resolve-identity ctx state det-identity)
                                              det-identity)}]
    {:result result :stats (merge-llm-stats ...)}))

(Port the existing resolve-* helpers from the old uncertain_validations.clj into private fns in the new ns.)
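Every branch in this -compute follows the same rule: keep deterministic results as they are and send only "uncertain" ones to the LLM. Here is a minimal sketch of that rule. It assumes each resolver takes the deterministic result and returns a refined one. `resolve-uncertain` and the stub resolvers are hypothetical. The real resolve-* helpers also take ctx and state and produce LLM stats.

```clojure
(defn resolve-uncertain
  "Hypothetical: for each check, calls its resolver only when the
   deterministic status is \"uncertain\"; otherwise keeps the result."
  [deterministic resolvers]
  (into {}
        (map (fn [[k det]]
               [k (if (= "uncertain" (:status det))
                    ((get resolvers k) det)
                    det)]))
        deterministic))

;; Records which resolvers were actually invoked.
(def calls (atom []))

(defn stub-resolver [k]
  (fn [det]
    (swap! calls conj k)
    (assoc det :status "pass" :resolved-by :llm)))

(def result
  (resolve-uncertain
    {:required-fields     {:status "pass"}
     :date-reasonableness {:status "uncertain" :message "date far in the future"}
     :recipient-identity  {:status "pass"}}
    {:required-fields     (stub-resolver :required-fields)
     :date-reasonableness (stub-resolver :date-reasonableness)
     :recipient-identity  (stub-resolver :recipient-identity)}))

@calls
;; => [:date-reasonableness]
```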

Commit: git commit -m "feat: migrate uncertain-validations-resolver to IProcessor v2".

Task B8: fraud-detector

Files:

Declarations:

(-id         [_] :fraud-detector)
(-reads      [_ _state] [(reads/read-leaf :issuer :name)
                         (reads/read-leaf :issuer :country)
                         (reads/read-leaf :issuer :vat-id)
                         (reads/read-leaf :issuer :tax-id)
                         (reads/read-leaf :issuer :iban)
                         (reads/read-leaf :issuer :account-number)
                         (reads/read-leaf :issuer :sort-code)
                         (reads/read-leaf :issuer :routing-number)
                         (reads/read-leaf :issuer :bsb)
                         (reads/read-leaf :recipient :country)
                         (reads/read-leaf :invoice-date)
                         (reads/read-leaf :line-items :* :description)])
(-writes     [_ _state] [])
(-diagnostic [_ _state] {:slice :fraud-flags})
(-compute [_ ctx state]
  (let [deterministic (run-deterministic-checks (:db-pool ctx) state)
        llm-out       (run-llm-fraud-analysis ctx state)
        llm-flags     (when (:result llm-out) (llm-result->flags (:result llm-out)))]
    {:result (into deterministic (or llm-flags []))
     :stats  (:stats llm-out)}))

Commit: git commit -m "feat: migrate fraud-detector to IProcessor v2".

Task B9: validations (new processor)

Files:

The validations processor owns the deterministic :validations sub-paths. Its ownership varies per doc type (see spec §12.1):

Always runs in edit mode (-always? true).

In src/com/getorcha/workers/ap/ingestion/validation.clj, remove ^:private from check-tax-id-format, check-iban, check-issuer-country, check-recipient-country, check-large-document-summary-only. (The others — check-financial-math, check-required-fields, check-date-reasonableness, check-recipient-identity, and all check-contract-* helpers — are already public.)

(ns com.getorcha.workers.ap.processors.validations
  (:require [com.getorcha.workers.ap.ingestion.validation :as validation]
            [com.getorcha.workers.ap.processors :as proc]))


(defn ^:private compute-invoice
  [sd _legal-entity]
  {:tax-id-format       (validation/check-tax-id-format sd)
   :iban-format         (validation/check-iban sd)
   :issuer-country      (validation/check-issuer-country sd)
   :recipient-country   (validation/check-recipient-country sd)
   :large-document-summary-only
   (when (:summary-page-range sd)
     (validation/check-large-document-summary-only sd))})


(defn ^:private compute-contract
  [sd]
  {:signature-presence    (validation/check-contract-signature-presence sd)
   :required-fields       (validation/check-contract-required-fields sd)
   :date-validity         (validation/check-contract-date-validity sd)
   :party-identification  (validation/check-contract-party-identification sd)
   :financial-consistency (validation/check-contract-financial-consistency sd)
   :termination-clause    (validation/check-contract-termination-clause sd)})


(defn ^:private compute-purchase-order
  [sd]
  {:required-fields
   (if (and (seq (:po-number sd)) (seq (get-in sd [:supplier :name])))
     {:status "pass"}
     {:status "error"
      :message "Missing required fields: po-number and/or supplier name"})})


(defn ^:private compute-grn
  [sd]
  {:required-fields
   (if (or (seq (:grn-number sd)) (seq (:delivery-note-numbers sd)))
     {:status "pass"}
     {:status "error"
      :message "Missing required field: grn-number or delivery-note-numbers"})})


(def ^:private invoice-sub-paths
  [[:tax-id-format] [:iban-format] [:issuer-country]
   [:recipient-country] [:large-document-summary-only]])


(def ^:private contract-sub-paths
  [[:signature-presence] [:required-fields] [:date-validity]
   [:party-identification] [:financial-consistency] [:termination-clause]])


(defrecord Validations []
  proc/IProcessor
  (-id         [_] :validations)
  (-reads      [_ _state] [])
  (-writes     [_ _state] [])
  (-diagnostic [_ state]
    (let [doc-type (get-in state [:document :document/structured-data :document-type])]
      (case doc-type
        "invoice"              {:slice :validations :sub-paths invoice-sub-paths}
        "contract"             {:slice :validations :sub-paths contract-sub-paths}
        "purchase-order"       {:slice :validations :sub-paths [[:required-fields]]}
        "goods-received-note"  {:slice :validations :sub-paths [[:required-fields]]}
        nil)))
  (-modes      [_] #{:ingestion :edit})
  (-always?    [_] true)
  (-compute    [_ _ctx state]
    (let [sd       (:structured-data state)
          doc-type (:document-type sd)
          le       (:legal-entity state)]
      {:result (case doc-type
                 "invoice"              (compute-invoice sd le)
                 "contract"             (compute-contract sd)
                 "purchase-order"       (compute-purchase-order sd)
                 "goods-received-note"  (compute-grn sd)
                 {})
       :stats nil}))
  (-apply-ops  [_ _state _result] []))


(defn ->validations [] (->Validations))
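Because -diagnostic and -compute dispatch on document type independently, the declared sub-paths and the keys that compute returns can drift apart. A unit test that compares the two for each type is a cheap guard. Below is a minimal sketch with hypothetical stand-ins. `stub-compute` replaces the private compute-* fns, and only the key sets matter. Note that compute-invoice always returns the :large-document-summary-only key, even when its value is nil.

```clojure
(require '[clojure.set :as cset])

(def sub-paths-by-type
  {"invoice"             [[:tax-id-format] [:iban-format] [:issuer-country]
                          [:recipient-country] [:large-document-summary-only]]
   "purchase-order"      [[:required-fields]]
   "goods-received-note" [[:required-fields]]})

(defn stub-compute
  "Hypothetical stand-in for the private compute-* fns; only keys matter."
  [doc-type]
  (case doc-type
    "invoice"             {:tax-id-format {} :iban-format {} :issuer-country {}
                           :recipient-country {} :large-document-summary-only nil}
    "purchase-order"      {:required-fields {:status "pass"}}
    "goods-received-note" {:required-fields {:status "pass"}}
    {}))

(defn mismatches
  "Doc types whose declared sub-path heads differ from the keys returned
   by compute-fn, with the differences on each side."
  [declared-by-type compute-fn]
  (into {}
        (keep (fn [[doc-type sub-paths]]
                (let [declared (set (map first sub-paths))
                      computed (set (keys (compute-fn doc-type)))]
                  (when (not= declared computed)
                    [doc-type {:declared-only (cset/difference declared computed)
                               :computed-only (cset/difference computed declared)}]))))
        declared-by-type))

(mismatches sub-paths-by-type stub-compute)
;; => {}
```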

Commit: git commit -m "feat: validations processor (always-run deterministic checks)".


Phase C: Matching subsystem

Relocate matching files under processors/matching/* and build the unified matching processor that wraps match-document! + reconcile-cluster!. Rename publish-document-ready! to matching.queue/enqueue!.

Task C1: Move matching files to new location

Files:

git mv src/com/getorcha/workers/ap/matching src/com/getorcha/workers/ap/processors/matching
git mv test/com/getorcha/workers/ap/matching test/com/getorcha/workers/ap/processors/matching

Global replace in every moved file:

com.getorcha.workers.ap.matching.core           → com.getorcha.workers.ap.processors.matching.core
com.getorcha.workers.ap.matching.candidates     → com.getorcha.workers.ap.processors.matching.candidates
com.getorcha.workers.ap.matching.evidence       → com.getorcha.workers.ap.processors.matching.evidence
com.getorcha.workers.ap.matching.llm-decision   → com.getorcha.workers.ap.processors.matching.llm-decision
com.getorcha.workers.ap.matching.normalize      → com.getorcha.workers.ap.processors.matching.normalize
com.getorcha.workers.ap.matching.reconciliation → com.getorcha.workers.ap.processors.matching.reconciliation
com.getorcha.workers.ap.matching.searchable-text → com.getorcha.workers.ap.processors.matching.searchable-text
com.getorcha.workers.ap.matching.worker         → com.getorcha.workers.ap.processors.matching.worker

Apply the replacements sed-style, but one file at a time with the Edit tool, to be safe.
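If you want to review the replacement as data before editing, the same mapping fits in a few lines. `rename-ns` and `ns->path` are hypothetical helpers. The regex matches the old prefix only when a `.` follows it. That covers ns names, requires and fully qualified keywords, and it leaves already-migrated names alone, so running it twice is harmless. `ns->path` shows Clojure's hyphen-to-underscore file naming: llm-decision lives in llm_decision.clj.

```clojure
(require '[clojure.string :as str])

(def old-prefix "com.getorcha.workers.ap.matching")
(def new-prefix "com.getorcha.workers.ap.processors.matching")

(defn rename-ns
  "Rewrites the old matching ns prefix wherever it is followed by `.`
   (ns forms, requires, fully qualified keywords)."
  [text]
  (str/replace text
               (re-pattern (str (java.util.regex.Pattern/quote old-prefix) "(?=\\.)"))
               new-prefix))

(defn ns->path
  "Clojure's ns-to-file convention: dots become slashes, hyphens underscores."
  [ns-str]
  (str (-> ns-str (str/replace "." "/") (str/replace "-" "_")) ".clj"))

(rename-ns "[com.getorcha.workers.ap.matching.core :as matching.core]")
;; => "[com.getorcha.workers.ap.processors.matching.core :as matching.core]"

(ns->path "com.getorcha.workers.ap.processors.matching.llm-decision")
;; => "com/getorcha/workers/ap/processors/matching/llm_decision.clj"
```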

Grep for com.getorcha.workers.ap.matching outside the moved dir:

grep -rln "com.getorcha.workers.ap.matching" src test resources dev --include="*.clj" --include="*.edn"

For each match outside the new dir, update the require (or fully qualified reference) to processors.matching.*. Known callers:

clj-kondo --lint src/com/getorcha/workers/ap/processors/matching test
clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.matching.worker-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: clean lint, tests pass.

git add -A
git commit -m "refactor: move matching namespaces under processors.matching"

Task C2: Introduce the matching IProcessor wrapper

Files:

Write src/com/getorcha/workers/ap/processors/matching.clj:

(ns com.getorcha.workers.ap.processors.matching
  "The matching IProcessor. Wraps match-document! + reconcile-cluster!
   (reconciliation is a sub-step of matching under the unified model —
   see spec §12.2)."
  (:require [clojure.tools.logging :as log]
            [com.getorcha.db.document-matching :as db.matching]
            [com.getorcha.db.sql :as db.sql]
            [com.getorcha.workers.ap.processors :as proc]
            [com.getorcha.workers.ap.processors.matching.core :as matching.core]
            [com.getorcha.workers.ap.processors.matching.reconciliation :as reconciliation]
            [com.getorcha.workers.ap.processors.reads :as reads]))


(defn ^:private build-matching-summary
  [db-pool document-id]
  (let [match-rows (db.matching/get-matches-for-document db-pool document-id)]
    {:matches
     (mapv (fn [{:ap-document-match/keys [document-a-id document-b-id
                                          blended-score llm-confidence match-method]}]
             (cond-> {:document-id   (str (if (= document-a-id document-id)
                                            document-b-id
                                            document-a-id))
                      :blended-score (double blended-score)
                      :match-method  match-method}
               llm-confidence (assoc :llm-confidence llm-confidence)))
           match-rows)}))


(defn ^:private get-cluster-id
  [db document-id]
  (:document/cluster-id
   (db.sql/execute-one! db
                        {:select [:cluster-id]
                         :from   [:document]
                         :where  [:= :id document-id]})))


(def ^:private reads-by-doc-type
  {"invoice"
   [(reads/read-leaf :issuer :name)
    (reads/read-leaf :issuer :vat-id)
    (reads/read-leaf :issuer :iban)
    (reads/read-leaf :invoice-number)
    (reads/read-leaf :total)
    (reads/read-leaf :currency)
    (reads/read-leaf :line-items :* :description)
    (reads/read-leaf :line-items :* :quantity)
    (reads/read-leaf :line-items :* :unit)
    (reads/read-leaf :po-references :*)
    (reads/read-leaf :gr-references :*)
    (reads/read-leaf :service-period :start)
    (reads/read-leaf :service-period :end)]

   "purchase-order"
   [(reads/read-leaf :supplier :name)
    (reads/read-leaf :supplier :vat-id)
    (reads/read-leaf :po-number)
    (reads/read-leaf :total-value)
    (reads/read-leaf :currency)
    (reads/read-leaf :line-items :* :description)
    (reads/read-leaf :line-items :* :quantity)
    (reads/read-leaf :line-items :* :unit)
    (reads/read-leaf :contract-references :*)
    (reads/read-leaf :requisition-numbers :*)]

   "contract"
   [(reads/read-leaf :counterparty :name)
    (reads/read-leaf :counterparty :tax-id)
    (reads/read-leaf :contract-number)
    (reads/read-leaf :total-value)
    (reads/read-leaf :currency)
    (reads/read-leaf :deliverables :*)]

   "goods-received-note"
   [(reads/read-leaf :supplier :name)
    (reads/read-leaf :supplier :vat-id)
    (reads/read-leaf :grn-number)
    (reads/read-leaf :line-items :* :description)
    (reads/read-leaf :line-items :* :quantity)
    (reads/read-leaf :line-items :* :unit)
    (reads/read-leaf :po-references :*)
    (reads/read-leaf :delivery-note-numbers :*)]})


(defrecord Matching []
  proc/IProcessor
  (-id         [_] :matching)
  (-reads      [_ state]
    (let [doc-type (get-in state [:document :document/structured-data :document-type])]
      (get reads-by-doc-type doc-type [])))
  (-writes     [_ _state] [])  ; side effects on document_match / cluster; no structured-data ops
  (-diagnostic [_ _state] {:slice :matching})
  ;; NOTE: we declare only :matching as the engine-written slice.
  ;; reconcile-cluster! writes the per-doc :reconciliation slice for
  ;; EACH cluster member itself (side effect outside the engine —
  ;; spec §12.3). Matching's -compute result therefore omits
  ;; reconciliation; the slice is already populated once the call
  ;; returns.
  (-modes      [_] #{:ingestion :edit})
  (-always?    [_] false)
  (-compute    [_ {:keys [db-pool llm-config search-config] :as _ctx} state]
    (let [doc            (:document state)
          document-id    (:document/id doc)
          cluster-before (:document/cluster-id doc)]
      ;; Side effect: persists matches + cluster assignment
      (matching.core/match-document! db-pool search-config (:matching llm-config) doc)
      (let [summary       (build-matching-summary db-pool document-id)
            cluster-after (get-cluster-id db-pool document-id)
            affected      (cond-> #{}
                            cluster-before (conj cluster-before)
                            cluster-after  (conj cluster-after))]
        (doseq [cid affected]
          (try
            (reconciliation/reconcile-cluster! db-pool
                                               (:matching llm-config)
                                               cid
                                               (:document/legal-entity-id doc))
            (catch Exception e
              (when-not (= ::reconciliation/insufficient-documents
                           (:kind (ex-data e)))
                (log/warn e "Reconciliation failed inside matching processor"
                          {:cluster-id cid :document-id document-id})))))
        (db.matching/set-matching-status! db-pool document-id {:status "succeeded"})
        {:result summary :stats nil})))
  (-apply-ops  [_ _state _result] []))


(defn ->matching [] (->Matching))
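You can check the reconciliation fan-out inside -compute without a database. In this minimal sketch the DB calls are replaced by plain functions. `affected-clusters`, `reconcile-affected!` and the stub are hypothetical, and `reconcile!` stands in for reconciliation/reconcile-cluster!. The sketch shows two properties the code above relies on. First, a document that moves between clusters triggers reconciliation of both. Second, an insufficient-documents ex-info is ignored, while other failures are recorded without aborting the loop (the real code logs them).

```clojure
(defn affected-clusters
  "Clusters to reconcile: the document's cluster before and after
   matching, ignoring nils. A set, so an unchanged cluster runs once."
  [cluster-before cluster-after]
  (cond-> #{}
    cluster-before (conj cluster-before)
    cluster-after  (conj cluster-after)))

(defn reconcile-affected!
  "Calls reconcile! for each cluster. Insufficient-documents errors are
   expected and ignored; other exceptions are collected instead of
   aborting the loop. Returns the ids of clusters that failed."
  [reconcile! cluster-ids]
  (reduce (fn [failed cid]
            (try
              (reconcile! cid)
              failed
              (catch Exception e
                (if (= ::insufficient-documents (:kind (ex-data e)))
                  failed
                  (conj failed cid)))))
          []
          cluster-ids))

;; Hypothetical stub: one cluster is too small, one is broken.
(defn stub-reconcile! [cid]
  (case cid
    :small  (throw (ex-info "too few documents" {:kind ::insufficient-documents}))
    :broken (throw (ex-info "boom" {}))
    :ok))

(reconcile-affected! stub-reconcile! [:ok :small :broken])
;; => [:broken]
```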

Write test/com/getorcha/workers/ap/processors/matching_test.clj:

(ns com.getorcha.workers.ap.processors.matching-test
  (:require [clojure.test :refer [deftest is testing]]
            [com.getorcha.workers.ap.processors :as proc]
            [com.getorcha.workers.ap.processors.matching :as matching]
            [com.getorcha.workers.ap.processors.reads :as reads]))


(deftest matching-declarations
  (let [p      (matching/->matching)
        state  {:document {:document/structured-data {:document-type "invoice"}}}]
    (is (= :matching (proc/-id p)))
    (is (= {:slice :matching} (proc/-diagnostic p state)))
    (is (false? (proc/-always? p)))
    (is (some #(= (reads/read-leaf :issuer :name) %) (proc/-reads p state)))))


(deftest matching-reads-dispatch-on-doc-type
  (let [p (matching/->matching)]
    (is (some #(= (reads/read-leaf :counterparty :name) %)
              (proc/-reads p {:document {:document/structured-data {:document-type "contract"}}})))
    (is (some #(= (reads/read-leaf :po-number) %)
              (proc/-reads p {:document {:document/structured-data {:document-type "purchase-order"}}})))))

Keep existing integration tests (the ones in the moved matching/*_test.clj files) untouched; they exercise match-document! + reconcile-cluster! directly.

git add src/com/getorcha/workers/ap/processors/matching.clj test/com/getorcha/workers/ap/processors/matching_test.clj
git commit -m "feat: matching IProcessor (absorbs reconciliation)"

Task C3: Rename publish-document-ready! and move to matching/queue.clj

Files:

Write src/com/getorcha/workers/ap/processors/matching/queue.clj:

(ns com.getorcha.workers.ap.processors.matching.queue
  "SQS enqueue helper for pending matching runs. Called at the tail
   of ingestion to hand off from the ingestion worker to the matching
   worker."
  (:require [clojure.tools.logging :as log]
            [com.getorcha.aws :as aws]
            [com.getorcha.db.document-matching :as db.matching])
  (:import (software.amazon.awssdk.services.sqs SqsClient)))


(defn enqueue!
  "Sets document's matching_status to `pending` and publishes the
   document id to the matching SQS queue."
  [{:keys [^SqsClient sqs-client aws db-pool] :as _ctx} document-id]
  (when-let [queue-url (get-in aws [:queue-urls :matching])]
    (log/info "Enqueuing document for matching" {:document-id document-id})
    (db.matching/set-matching-status! db-pool document-id {:status "pending"})
    (aws/send-message! sqs-client queue-url (str document-id))))
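Note the ordering in enqueue!. The status is written before the send, so a failed send leaves the row in `pending`. Both steps are skipped when no matching queue URL is configured. This sketch injects the side effects as functions to show the shape a unit test could take. `enqueue*`, `set-status!` and `send!` are hypothetical, and the URL is a placeholder.

```clojure
(defn enqueue*
  "Hypothetical dependency-injected version of enqueue!: marks the
   document pending, then sends its id, only if a matching queue URL
   is configured."
  [{:keys [queue-urls set-status! send!]} document-id]
  (when-let [queue-url (:matching queue-urls)]
    (set-status! document-id "pending")
    (send! queue-url (str document-id))))

;; Records side effects in call order.
(def events (atom []))

(def ctx {:set-status! (fn [id status] (swap! events conj [:status id status]))
          :send!       (fn [url body] (swap! events conj [:send url body]))})

(enqueue* (assoc ctx :queue-urls {}) 42)   ; no matching queue configured: no-op
(enqueue* (assoc ctx :queue-urls {:matching "https://sqs.example/matching"}) 42)

@events
;; => [[:status 42 "pending"] [:send "https://sqs.example/matching" "42"]]
```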

In src/com/getorcha/workers/ap/ingestion.clj, replace the publish-document-ready! defn with a require alias and update the call site at line 1031:

Remove the old defn (lines ~582-589). Add to require block:

[com.getorcha.workers.ap.processors.matching.queue :as matching.queue]

Change the call (currently (publish-document-ready! context (:document/id document))) to:

(matching.queue/enqueue! context (:document/id document))
clj-kondo --lint src/com/getorcha/workers/ap src/com/getorcha/workers/ap/ingestion.clj
clj -X:test:silent :nses '[com.getorcha.workers.ap.ingestion-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: pass.

git add src/com/getorcha/workers/ap/processors/matching/queue.clj src/com/getorcha/workers/ap/ingestion.clj
git commit -m "refactor: rename publish-document-ready! to matching.queue/enqueue!"

Phase D: Rewrite call sites

Replace the three legacy orchestrators with calls to run-processors!.

Task D1: Rewrite ingestion to use run-processors!

Files:

Pipeline change to be aware of: today ingestion runs two separate stages, with-validations (deterministic checks that write :validation-results onto structured-data) followed by post-process/run (nine processors that can mutate structured-data). This task collapses both stages into ONE engine call. The new pipeline is:

transcribe → classify → extract →
  run-processors! [validations] [post-procs…] [fraud] →
  complete

The three phases are:

  1. Phase 1: validations (always-run, deterministic).
  2. Phase 2: the bulk of the post-processors.
  3. Phase 3: fraud, which sees the phase-2 corrections.

The old validate multimethod is no longer called. Its contents are distributed across:

  - validations: tax-id-format, iban-format, the country checks and large-document, plus the contract/PO/GRN checks;
  - financial-validation-resolver: invoice financial-math;
  - uncertain-validations-resolver: invoice required-fields, date-reasonableness and recipient-identity.

The pure check-* functions stay in ingestion/validation.clj; only the validate multimethod and the with-validations wrapper go away. There is no separate vat-validation processor: tax-compliance-analyzer writes the :line-items diagnostic slice directly (spec §13.2).
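A toy engine can illustrate the phase ordering. This is not run-processors!. Each processor here is a plain function from structured-data to a map of top-level updates. The processors of one phase run on separate java.lang.Thread instances (the plan itself uses core.async/thread), and a phase's updates are merged before the next phase starts. That is why fraud in phase 3 sees a phase-2 correction, while a processor in the same phase would not. All names are hypothetical.

```clojure
(defn- run-phase
  "Runs every processor of one phase against the same input, each on its
   own thread, and returns their update maps in phase order."
  [phase sd]
  (let [results (object-array (count phase))
        threads (mapv (fn [i f]
                        (doto (Thread. ^Runnable (fn [] (aset results i (f sd))))
                          (.start)))
                      (range) phase)]
    (run! #(.join ^Thread %) threads)
    (vec results)))

(defn run-phases
  "Toy engine: phases run in order; within a phase all processors see the
   same structured-data, and their updates are merged afterwards."
  [structured-data phases]
  (reduce (fn [sd phase] (apply merge sd (run-phase phase sd)))
          structured-data
          phases))

(defn toy-validations [_sd] {:validated? true})
(defn toy-tax-correction [sd]
  (if (= "DE12" (:issuer-tax-id sd)) {:issuer-tax-id "DE123456789"} {}))
(defn toy-fraud [sd] {:fraud-checked-tax-id (:issuer-tax-id sd)})

(:fraud-checked-tax-id
 (run-phases {:issuer-tax-id "DE12"} [[toy-validations] [toy-tax-correction] [toy-fraud]]))
;; => "DE123456789"

(:fraud-checked-tax-id
 (run-phases {:issuer-tax-id "DE12"} [[toy-validations] [toy-tax-correction toy-fraud]]))
;; => "DE12"
```

In this toy, conflicting updates within a phase resolve by vector order. The real engine works with ops and per-processor diagnostic slices instead.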

In ingestion.clj, add require imports:

[com.getorcha.workers.ap.processors.accounts :as accounts]
[com.getorcha.workers.ap.processors.accruals :as accruals]
[com.getorcha.workers.ap.processors.cost-center :as cost-center]
[com.getorcha.workers.ap.processors.engine :as engine]
[com.getorcha.workers.ap.processors.financial-validation :as financial-validation]
[com.getorcha.workers.ap.processors.fraud :as fraud]
[com.getorcha.workers.ap.processors.supplier :as supplier]
[com.getorcha.workers.ap.processors.tax-compliance :as tax-compliance]
[com.getorcha.workers.ap.processors.uncertain-validations :as uncertain-validations]
[com.getorcha.workers.ap.processors.validations :as validations]

Find the call to (post-process/run context ingestion) and the earlier with-validations call, and replace both with a call to this new fn:

(defn ^:private run-diagnostics!
  [context ingestion]
  (let [state (assoc ingestion
                     :mode         :ingestion
                     :trigger-kind :ingestion
                     :ingestion-id (:id ingestion))]
    (engine/run-processors!
      context
      state
      [[(validations/->validations)]
       [(accounts/->accounts)
        (cost-center/->cost-center)
        (accruals/->accruals)
        (supplier/->supplier-matcher)
        (supplier/->supplier-verifier)
        (tax-compliance/->tax-compliance-analyzer)
        (financial-validation/->financial-validation-resolver)
        (uncertain-validations/->uncertain-validations-resolver)]
       [(fraud/->fraud-detector)]])))

Delete with-validations and post-process! functions.

The old code read structured-data keys like :validation-results, :fraud-flags, etc. Those keys no longer exist on structured-data under the new model — they're in diagnostics via per-processor slice writes. Remove the build-diagnostics + stripped-data dance; the diagnostics slice has already been populated by processors during run-diagnostics!.

Before:

(let [vat-validations  (tax-compliance/run-vat-validation structured-data)
      diagnostics      (build-diagnostics {:validation-results ...
                                            :fraud-flags ...
                                            :tax-issues ...
                                            :vat-validations vat-validations})
      stripped-data    (-> structured-data (dissoc ...) (update :line-items ...))
      needs-review?    (compute-needs-human-review diagnostics)]
  (db.sql/execute-one! tx
                       {:update :document
                        :set    {:structured-data [:lift stripped-data]
                                 :diagnostics [:cast (json/generate-string diagnostics) :jsonb]
                                 ...}})
  ...)

After:

;; structured-data is already stripped (no :validation-results / :fraud-flags /
;; :tax-issues / :vat-validation on line items) because processors never wrote
;; them there. diagnostics was written slice-by-slice during run-diagnostics!.
;; Only the document row's structured-data, type, version, updated-at need
;; updating here.
(db.sql/execute-one!
  tx
  {:update :document
   :set    {:structured-data    [:lift structured-data]
            :type               (db.sql/->cast (:document-type structured-data) :document-type)
            :version            [:raw "version + 1"]
            :updated-at         [:now]}
   :where  [:= :id document-id]})

document.needs_human_review is recomputed inside update-diagnostic! after each slice write, so no explicit recomputation is needed here.

rm src/com/getorcha/workers/ap/ingestion/post_process.clj
rm -rf src/com/getorcha/workers/ap/ingestion/post_process
rm -rf test/com/getorcha/workers/ap/ingestion/post_process

The processors (Tasks B6, B7 and B9) call the pure check-* functions directly and own their sub-paths. That leaves the validate multimethod orphaned, along with the annotate-line-items helper if with-validations was its only caller.

In src/com/getorcha/workers/ap/ingestion/validation.clj:

In src/com/getorcha/workers/ap/ingestion.clj, delete the with-validations private fn. It called validation/validate on every ingestion; the new pipeline runs the validations processor as phase 1 instead.

clj-kondo --lint src/com/getorcha/workers/ap/ingestion.clj
clj -X:test:silent :nses '[com.getorcha.workers.ap.ingestion-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: pass.

git add -A
git commit -m "refactor: ingestion uses unified processors engine"

Task D2: Rewrite matching worker to use run-processors!

Files:

The current body does: fetch doc → set matching-status in-progress → insert run row → call match-document! → build summary → update-slice! → complete-run! → reconcile-cluster! per affected → set matching-status succeeded → notify on fail.

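Under the unified engine, most of that is absorbed. The run row, the :matching slice write and failure notification move into the engine. match-document!, the per-cluster reconcile-cluster! calls and the succeeded status move into the matching processor's -compute (Task C2). New body: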

(defn ^:private process-document!
  [{:keys [db-pool] :as context} document-id]
  (let [doc (fetch-document db-pool document-id)]
    (when (nil? doc)
      (throw (ex-info "Document not found for matching"
                      {:document-id document-id :kind ::document-not-found})))
    (when (nil? (:document/structured-data doc))
      (throw (ex-info "Document missing structured data, skipping matching"
                      {:document-id document-id :kind ::missing-structured-data})))
    (mdc/put! :legal-entity-id (:document/legal-entity-id doc))
    (xray/add-annotation! "legal_entity_id" (str (:document/legal-entity-id doc)))
    (if-not (candidates/matchable-type? (:document/type doc))
      (do
        (log/info "Document type does not participate in matching, marking skipped"
                  {:document-id document-id :document-type (:document/type doc)})
        (db.matching/set-matching-status! db-pool document-id {:status "skipped"}))
      (let [ingestion-id (:ap-ingestion/id
                          (db.sql/execute-one! db-pool
                                               {:select [:id]
                                                :from   [:ap-ingestion]
                                                :where  [:= :document-id document-id]
                                                :order-by [[:completed-at :desc :nulls-last]
                                                            [:created-at :desc]]
                                                :limit 1}))
            state        {:mode :ingestion
                          :trigger-kind :ingestion
                          :ingestion-id ingestion-id
                          :document doc
                          :structured-data (:document/structured-data doc)}]
        (engine/run-processors! context state [[(matching/->matching)]])))))
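The :order-by picks the most recently completed ingestion and sorts still-running ones (NULL completed_at) last. Among rows with equal completion status it falls back to the newest creation time. Here is the same ordering over in-memory rows, as a sketch for checking expectations. `latest-ingestion` is hypothetical, and timestamps are plain epoch numbers.

```clojure
(defn latest-ingestion
  "Mirrors ORDER BY completed_at DESC NULLS LAST, created_at DESC LIMIT 1
   over maps whose :completed-at / :created-at are numbers (or nil)."
  [rows]
  (first
    (sort-by (juxt #(if (:completed-at %) 0 1)      ; completed rows first (NULLS LAST)
                   #(- (or (:completed-at %) 0))     ; newest completion first
                   #(- (:created-at %)))             ; then newest creation first
             rows)))

(:id (latest-ingestion [{:id :a :completed-at 100 :created-at 10}
                        {:id :b :completed-at 200 :created-at 5}
                        {:id :c :completed-at nil :created-at 50}]))
;; => :b
```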

Remove notify-failure! and notify-reconciliation-failure!; the engine's unified notification handles both.

Add to the require block:

[com.getorcha.workers.ap.processors.engine :as engine]
[com.getorcha.workers.ap.processors.matching :as matching]

Remove now-dead requires (for reconciliation, db.diagnostics, db.run, etc.) if no other code in this ns uses them.

clj-kondo --lint src/com/getorcha/workers/ap/processors/matching/worker.clj
clj -X:test:silent :nses '[com.getorcha.workers.ap.processors.matching.worker-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: pass.

git add src/com/getorcha/workers/ap/processors/matching/worker.clj
git commit -m "refactor: matching worker uses unified processors engine"

Task D3: Rewrite diagnostics-recompute orchestrator

Files:

Rewrite src/com/getorcha/workers/diagnostics_recompute/orchestrator.clj:

(ns com.getorcha.workers.diagnostics-recompute.orchestrator
  "Edit-triggered recompute entry point: builds an edit-mode state
   map and invokes the processors engine."
  (:require [com.getorcha.db.document-history :as document-history]
            [com.getorcha.db.sql :as db.sql]
            [com.getorcha.json-patch.path :as json-patch.path]
            [com.getorcha.workers.ap.processors.accounts :as accounts]
            [com.getorcha.workers.ap.processors.accruals :as accruals]
            [com.getorcha.workers.ap.processors.cost-center :as cost-center]
            [com.getorcha.workers.ap.processors.engine :as engine]
            [com.getorcha.workers.ap.processors.financial-validation :as financial-validation]
            [com.getorcha.workers.ap.processors.fraud :as fraud]
            [com.getorcha.workers.ap.processors.matching :as matching]
            [com.getorcha.workers.ap.processors.reads :as reads]
            [com.getorcha.workers.ap.processors.supplier :as supplier]
            [com.getorcha.workers.ap.processors.tax-compliance :as tax-compliance]
            [com.getorcha.workers.ap.processors.uncertain-validations :as uncertain-validations]
            [com.getorcha.workers.ap.processors.validations :as validations]))


(defn ^:private fetch-document
  [db-pool document-id]
  (db.sql/execute-one! db-pool
                       {:select [:*]
                        :from   [:document]
                        :where  [:= :id document-id]}))


(defn ^:private fetch-history-row
  [db-pool history-id]
  (db.sql/execute-one! db-pool
                       {:select [:patch :edited-by]
                        :from   [:document-history]
                        :where  [:= :id history-id]}))


(defn ^:private compute-changed-leaves
  "From a patch, produce the set of clj-paths the edit touches.
   Non-leaf ops are expanded to virtual leaves via reads/expand-op-to-leaves."
  [patch document-type pre-patch-sd]
  (into #{}
        (mapcat #(reads/expand-op-to-leaves % pre-patch-sd {:document-type document-type}))
        patch))


(defn recompute-all!
  "Top-level entry-point invoked by the SQS consumer. Builds
   edit-mode state and calls the processors engine."
  [ctx {:keys [document-id history-id document-version] :as _params}]
  (let [db-pool       (:db-pool ctx)
        doc           (fetch-document db-pool document-id)
        hist          (fetch-history-row db-pool history-id)
        document-type (get-in doc [:document/structured-data :document-type])
        changed       (compute-changed-leaves
                        (:document-history/patch hist)
                        document-type
                        (:document/structured-data doc))
        state         {:mode :edit
                       :trigger-kind :edit
                       :history-id history-id
                       :edited-by (:document-history/edited-by hist)
                       :document doc
                       :structured-data (:document/structured-data doc)
                       :changed-leaves changed}]
    (engine/run-processors!
      ctx
      state
      [[(validations/->validations)]
       [(tax-compliance/->tax-compliance-analyzer)
        (fraud/->fraud-detector)
        (matching/->matching)
        (accounts/->accounts)
        (cost-center/->cost-center)
        (accruals/->accruals)
        (supplier/->supplier-matcher)
        (supplier/->supplier-verifier)
        (financial-validation/->financial-validation-resolver)
        (uncertain-validations/->uncertain-validations-resolver)]])
    :done))
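compute-changed-leaves relies on reads/expand-op-to-leaves, whose body is not shown in this task. The sketch below is a minimal, self-contained illustration of leaf expansion and read matching. It rests on these assumptions:

- Patch paths are JSON Pointers.
- All-digit segments are vector indices.
- String map keys in patch values are keywordized.
- Reads patterns are simplified to literal segments plus `:*` and `:**` wildcards. This is a stand-in for the real malli seq-regex patterns, not a reproduction of them.

Every name here (pointer->path, leaf-paths, op->changed-leaves, path-matches?, should-run?) is hypothetical.

```clojure
(ns sketch.changed-leaves
  (:require [clojure.string :as str]))

(defn pointer->path
  "Hypothetical: \"/line-items/0/description\" -> [:line-items 0 :description].
   Unescapes ~1 and ~0 (RFC 6901); all-digit segments become indices."
  [pointer]
  (->> (rest (str/split pointer #"/"))
       (mapv (fn [seg]
               (let [seg (-> seg (str/replace "~1" "/") (str/replace "~0" "~"))]
                 (if (re-matches #"\d+" seg) (parse-long seg) (keyword seg)))))))

(defn- seg-key
  "Patch values decoded from JSON carry string keys; keywordize them so they
   line up with the keyword paths of structured-data (assumption)."
  [k]
  (if (string? k) (keyword k) k))

(defn leaf-paths
  "Every path from `prefix` down to a scalar (or an empty collection) in `v`."
  [prefix v]
  (cond
    (and (coll? v) (empty? v)) [prefix]
    (map? v)        (mapcat (fn [[k x]] (leaf-paths (conj prefix (seg-key k)) x)) v)
    (sequential? v) (mapcat (fn [i x] (leaf-paths (conj prefix i) x)) (range) v)
    :else           [prefix]))

(defn op->changed-leaves
  "Hypothetical stand-in for reads/expand-op-to-leaves. Handles add, replace
   and remove only: the leaves of the new value plus the leaves of the old
   value that the op overwrites or deletes."
  [{:strs [op path value]} pre-patch-sd]
  (let [target (pointer->path path)
        old    (get-in pre-patch-sd target)]
    (set (concat (when (#{"add" "replace"} op) (leaf-paths target value))
                 (when (some? old) (leaf-paths target old))))))

(defn path-matches?
  "Simplified reads pattern: literal segments, :* for any single segment,
   and a trailing :** for any remaining suffix."
  [pattern path]
  (loop [[p & ps :as pat]  pattern
         [s & ss :as segs] path]
    (cond
      (= p :**)             true
      (empty? pat)          (empty? segs)
      (empty? segs)         false
      (or (= p :*) (= p s)) (recur ps ss)
      :else                 false)))

(defn should-run?
  "Edit mode: run a processor only if some changed leaf matches one of its reads."
  [reads-patterns changed-leaves]
  (boolean (some (fn [leaf] (some #(path-matches? % leaf) reads-patterns))
                 changed-leaves)))
```

Replacing /issuer with a map yields both the old and the new leaves. As a result, a processor that reads [:issuer :vat-id] is still scheduled even though the patch never names that key.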

In src/com/getorcha/workers/diagnostics_recompute.clj, the ::consumer init-key currently accepts :aws and :db-pool. Expand it to also accept :llm-config, :search-config, and :notifications, and pass them through in the ctx given to handle-message:

(defmethod ig/init-key ::consumer
  [_ {:keys [aws db-pool llm-config search-config notifications
             max-queue-messages wait-time-seconds] :as _config}]
  (let [executor       (Executors/newVirtualThreadPerTaskExecutor)
        polling?       (atom true)
        ^SqsClient sqs (get-in aws [:clients :sqs])
        queue-url      (get-in aws [:queue-urls :diagnostics-recompute])
        ctx            {:aws aws :db-pool db-pool :llm-config llm-config
                        :search-config search-config :notifications notifications
                        :sqs-client sqs}]
    ...))
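Because the ctx is assembled inline in ig/init-key, a missing config entry would only surface when a processor first reaches for it. One option is to build the ctx in a separate pure function. That function can be unit-tested without starting the poller, and it can fail fast on missing keys. The sketch below assumes exactly that; config->ctx and forwarded-keys are hypothetical names, not existing code.

```clojure
(ns sketch.consumer-ctx)

(def forwarded-keys
  "Consumer config keys passed through to the engine ctx unchanged."
  [:aws :db-pool :llm-config :search-config :notifications])

(defn config->ctx
  "Hypothetical pure helper: build the handle-message ctx from the consumer
   config, throwing at startup if a forwarded key is absent."
  [config]
  (let [missing (remove #(contains? config %) forwarded-keys)]
    (when (seq missing)
      (throw (ex-info "diagnostics-recompute consumer config is missing keys"
                      {:missing (vec missing)})))
    ;; Polling settings such as :max-queue-messages stay out of the ctx.
    (assoc (select-keys config forwarded-keys)
           :sqs-client (get-in config [:aws :clients :sqs]))))
```

To use it in init-key, bind the whole config (rename _config to config) and set ctx to (config->ctx config).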

In resources/com/getorcha/config.edn, the diagnostics-recompute/consumer section gains :llm-config, :search-config, :notifications:

:com.getorcha.workers.diagnostics-recompute/consumer
{:aws                #integrant/ref :com.getorcha.aws/state
 :db-pool            #integrant/ref :com.getorcha.db/pool
 :llm-config         {:matching        #ref [:com.getorcha/llm :fast]
                      :post-processing #ref [:com.getorcha/llm :fast]
                      :tax-compliance  #ref [:com.getorcha/llm :main]
                      :vision          #ref [:com.getorcha/llm :vision]
                      :web-search      #ref [:com.getorcha/llm :web-search]}
 :search-config      #ref [:com.getorcha/search]
 :notifications      #ref [:com.getorcha/notifications]
 :max-queue-messages 10
 :wait-time-seconds  #profile {#{:local-dev :demo} 0 :test 0 :default 20}}

Search config.edn for com.getorcha.workers.ap.matching.worker and replace it with com.getorcha.workers.ap.processors.matching.worker. Do the same in system.clj.

In test/com/getorcha/workers/diagnostics_recompute_test.clj, add an end-to-end test:

(deftest edit-triggers-recompute-and-updates-diagnostics
  (let [document-id   (helpers/create-document! (helpers/create-legal-entity!))
        _             (helpers/seed-completed-ingestion! document-id)
        identity-id   (helpers/create-identity!)
        history-row   (document-history/insert!
                       fixtures/*db*
                       {:document-id document-id :change-type :edit
                        :edited-by identity-id
                        :patch [{"op" "replace" "path" "/issuer/name" "value" "X"}]})]
    (with-redefs [matching/->matching (fn [] (->NoopProcessor :matching))
                  validations/->validations (fn [] (->CountingProcessor :validations (atom [])))
                  ...]  ; stub the rest similarly
      (orchestrator/recompute-all! {...}
                                    {:document-id document-id
                                     :history-id  (:document-history/id history-row)
                                     :document-version 2}))
    (is (pos? (db.run/count-runs fixtures/*db* document-id {:processor-id "validations"})))))
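This plan uses the NoopProcessor and CountingProcessor test helpers but does not define them in this task. The sketch below shows what they could look like. It pairs them with a toy phase runner that mirrors the shape of the phase list passed to engine/run-processors!:

- Phases run in order.
- Processors within a phase run concurrently.
- Later phases see earlier results.

The sketch makes several assumptions:

- ProcessorSketch is a hypothetical, reduced stand-in for IProcessor v2. The real protocol also declares reads, writes, the diagnostic slice and modes.
- run-phases uses a fixed thread pool, whereas the engine uses core.async/thread.
- :prior-results is a hypothetical stand-in for the engine's diagnostics refresh between phases.

```clojure
(ns sketch.processor-stubs
  (:import (java.util.concurrent ExecutorService Executors Future)))

(defprotocol ProcessorSketch
  "Hypothetical, reduced stand-in for IProcessor v2."
  (-id [p] "Processor id, e.g. :validations.")
  (-compute [p ctx state] "Compute and return this processor's result."))

(defrecord NoopProcessor [id]
  ProcessorSketch
  (-id [_] id)
  (-compute [_ _ _] nil))

(defrecord CountingProcessor [id calls]
  ProcessorSketch
  (-id [_] id)
  (-compute [_ _ state]
    ;; Record which processor ran and which earlier results it could see.
    (swap! calls conj [id (set (keys (:prior-results state)))])
    :counted))

(defn run-phases
  "Toy runner: phases run in order; processors within a phase run
   concurrently on a small pool. Returns {processor-id result}."
  [ctx state phases]
  (let [^ExecutorService pool (Executors/newFixedThreadPool 4)]
    (try
      (reduce (fn [acc phase]
                (let [st    (assoc state :prior-results acc)
                      tasks (mapv (fn [p] (fn [] [(-id p) (-compute p ctx st)])) phase)]
                  ;; invokeAll blocks until every task in the phase is done.
                  (into acc (map #(.get ^Future %)) (.invokeAll pool tasks))))
              {}
              phases)
      (finally (.shutdown pool)))))
```

The real stubs must implement whatever IProcessor v2 methods the engine calls, not this reduced protocol. The shape carries over, though: the Noop stub returns nothing, and the counting stub records its invocations in an atom that the test can inspect.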
clj-kondo --lint src/com/getorcha/workers/diagnostics_recompute*
clj -X:test:silent :nses '[com.getorcha.workers.diagnostics-recompute-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: pass.

git add -A
git commit -m "refactor: diagnostics-recompute uses unified processors engine"

Phase E: Cleanup + full-suite verification

Task E1: Delete dead code + rename update-slice! → update-diagnostic!

Files:

grep -rn "update-slice!" src test

Expected: only the alias itself in db/document_diagnostics.clj.

Remove the (def update-slice! update-diagnostic!) line at the bottom of the ns.

clj -X:test:silent 2>&1 | grep -A 5 -E "FAIL|ERROR|Execution error|failed because|Ran .* tests"

Expected: all pass.

git add src/com/getorcha/db/document_diagnostics.clj
git commit -m "cleanup: drop update-slice! alias"

Task E2: Delete old IProcessor v1 protocol + stubs

Files:

grep -rn "with-run-row!\|run-processor-phases\|run-phase\|build-diagnostics" src test

Expected: empty.

git add -A
git commit -m "cleanup: remove legacy post-process orchestrator helpers"

Task E3: Integration test — end-to-end edit recompute

Files:

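Seed a document and its ingestion (so processors have something to read), edit a scalar, call recompute-all!, and assert that the affected slices are updated (at minimum, the validations slice and a completed validations run). Skip matching (expensive LLM calls) and fraud detection by redefining matching/->matching and fraud/->fraud-detector to no-op processors.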

(deftest end-to-end-edit-recompute
  (let [legal-entity-id (helpers/create-legal-entity!)
        document-id     (helpers/create-document! legal-entity-id)
        ingestion-id    (helpers/create-ingestion! document-id)
        _               (helpers/seed-invoice-structured-data! document-id
                                                                 {:issuer {:name "Acme"}
                                                                  :total 100.0
                                                                  :line-items [{:id "l1" :description "d"}]})
        identity-id     (helpers/create-identity!)
        ;; Insert the ingestion row before the edit so history order matches document-version 2.
        _               (document-history/insert!
                         fixtures/*db*
                         {:document-id document-id :change-type :ingestion
                          :ingestion-id ingestion-id :patch []})
        history-row     (document-history/insert!
                         fixtures/*db*
                         {:document-id document-id :change-type :edit
                          :edited-by identity-id
                          :patch [{"op" "replace" "path" "/issuer/name" "value" "Acme-edited"}]})]
    (with-redefs [matching/->matching (fn [] (->NoopProcessor :matching))
                  fraud/->fraud-detector (fn [] (->NoopProcessor :fraud-detector))]
      (orchestrator/recompute-all!
        {:db-pool fixtures/*db*
         :llm-config {...}}
        {:document-id document-id
         :history-id (:document-history/id history-row)
         :document-version 2}))
    (let [doc (db.sql/execute-one! fixtures/*db*
                                   {:select [:diagnostics]
                                    :from [:document]
                                    :where [:= :id document-id]})]
      (is (some? (get-in doc [:document/diagnostics :validations])))
      (is (pos? (db.run/count-runs fixtures/*db* document-id
                                    {:processor-id "validations" :status :completed}))))))
clj -X:test:silent :nses '[com.getorcha.workers.diagnostics-recompute-test]' 2>&1 | grep -E "FAIL|ERROR|Ran"

Expected: pass.

git add test/com/getorcha/workers/diagnostics_recompute_test.clj
git commit -m "test: end-to-end edit recompute"
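The edit in this test (/issuer/name) is exactly what write-protection (§7; tasks A1 and A7) must preserve: a later recompute may not overwrite it. The sketch below shows a minimal op filter under two assumptions:

- History rows are plain maps shaped like the insert! arguments in the test above: :change-type plus a JSON Patch vector with string keys.
- An edited path's ancestors and descendants are protected as well as the path itself.

Segment strings are compared verbatim. user-edited-paths and filter-ops are hypothetical names, not the A1/A7 implementation.

```clojure
(ns sketch.write-protection
  (:require [clojure.string :as str]))

(defn- segments
  "JSON Pointer -> raw segment strings, e.g. \"/issuer/name\" -> [\"issuer\" \"name\"]."
  [pointer]
  (vec (rest (str/split pointer #"/"))))

(defn- prefix?
  "True when path `a` equals `b` or is an ancestor of it."
  [a b]
  (and (<= (count a) (count b))
       (= a (subvec b 0 (count a)))))

(defn user-edited-paths
  "Hypothetical: every path touched by a user edit, taken from history rows
   with :change-type :edit (ingestion and derivation rows are ignored)."
  [history-rows]
  (into #{}
        (comp (filter #(= :edit (:change-type %)))
              (mapcat :patch)
              (map #(segments (get % "path"))))
        history-rows))

(defn filter-ops
  "Drop processor ops that would clobber a user edit: the op targets an
   edited path, one of its ancestors, or one of its descendants."
  [edited-paths ops]
  (remove (fn [op]
            (let [p (segments (get op "path"))]
              (some #(or (prefix? p %) (prefix? % p)) edited-paths)))
          ops))
```

Under this rule, an op that replaces all of /issuer is dropped as well, because it would rewrite the edited name along with the rest of the issuer.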

Task E4: Full lint + full test suite

clj-kondo --lint src test dev 2>&1 | tail -40

Expected: 0 errors, 0 warnings.

clj -X:test:silent 2>&1 | grep -A 5 -E "FAIL|ERROR|Execution error|failed because|Ran .* tests"

Expected: all pass.

The most likely failures come from test files that still reference com.getorcha.workers.ap.ingestion.post-process.* or the old matching ns. Grep for them and fix:

grep -rn "com.getorcha.workers.ap.ingestion.post-process\|com.getorcha.workers.ap.matching" test

Expected: empty.
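If you prefer to run this check from a REPL, or want the hits as data, the sketch below is a rough equivalent of the grep above. stale-references is a hypothetical helper. It scans .clj, .cljc and .edn files and matches the two prefixes literally. grep treats the dots as wildcards instead, which makes no practical difference here.

```clojure
(ns sketch.stale-ns
  (:require [clojure.java.io :as io]
            [clojure.string :as str]))

(def stale-prefixes
  ["com.getorcha.workers.ap.ingestion.post-process"
   "com.getorcha.workers.ap.matching"])

(defn stale-references
  "Hypothetical: [path line-number line] for every line under `root` that
   mentions a stale namespace prefix."
  [root]
  (for [^java.io.File f (file-seq (io/file root))
        :when (and (.isFile f) (re-find #"\.(clj|cljc|edn)$" (.getName f)))
        [n line] (map-indexed vector (str/split-lines (slurp f)))
        :when (some #(str/includes? line %) stale-prefixes)]
    [(.getPath f) (inc n) line]))
```

Calling (stale-references "test") should return an empty seq once the fixups are done. Note that the new com.getorcha.workers.ap.processors.matching ns does not match either prefix.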

git add -A
git commit -m "chore: final lint + test fixups"

Self-review summary

Coverage against the spec:

| Spec section | Tasks |
|---|---|
| §2 Concepts (IProcessor, state, leaves, diagnostic) | A2, A3 |
| §3 IProcessor protocol v2 (state-aware reads/writes/diagnostic) | A2 |
| §3.1 DiagnosticSpec (single / sub-path / sub-paths / multi-slice) | A2, A5 (engine write-diagnostic!) |
| §4 Engine: run-processors! (refresh diagnostics between phases, in-memory apply) | A5 (core), A6 (filter), A7 (op filter) |
| §4.1 Persistence (ingestion: caller writes; edit: derivation row) | A0 (migration), A5 (persist-derivation!) |
| §5 Modes (:ingestion, :edit) | A5, A6, A7 |
| §6 Conditional recomputation | A3 (leaf-expansion, match-any), A6 (filter), D3 (changed-leaves) |
| §7 Write-protection | A1 (provenance ns), A7 (op filter) |
| §8 Notifications | A8 |
| §9 Renames | A4 (update-diagnostic!), C3 (enqueue!), plan-wide |
| §10 Namespace reorg | C1 (matching move), D1 (post-process delete) |
| §11 Phase lists (ingestion / matching-worker / edit) | D1, D2, D3 |
| §12 Migration table | B1–B9 |
| §12.1 validations per-doc-type + always-run | B9 |
| §12.2 Matching per-doc-type reads | C2 |
| §12.3 Reconciliation inside matching | C2 |
| §13 Diagnostic slice co-ownership | A4 (jsonb_set), A5 (write-diagnostic! multi-slice/sub-paths) |
| §13.2 tax-compliance multi-slice | B5 |
| §14 Tax-compliance vision :ingestion-only | B5 |
| §15 Test strategy | Each task includes tests; E3 E2E |
| §16 Rollout (13 steps) | A0–E4 |
| §17 Risks | (design-level; tests cover the concrete risks) |

Known gaps acknowledged: