Note (2026-04-24): After this document was written, legal_entity was renamed to tenant and the old tenant was renamed to organization. Read references to these terms with the pre-rename meaning.
For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
Goal: Make the matching pipeline reliable: retry transient failures, track state in the DB, and notify admins on permanent failure.
Architecture: Fix the root bug (delete-message! fires in finally before processing even starts), add in-process LLM retry with error classification, wrap the match-write phase in a DB transaction, track matching_status on the document row, and route admin notifications through the existing notifications/notify-admins! facility.
Tech Stack: Clojure, next.jdbc (db.sql/with-transaction), SQS (ChangeMessageVisibility for backoff), com.getorcha.notifications/notify-admins!, AWS SES + Slack webhook.
Files:
resources/migrations/20260226100000-add-matching-status.up.sql
resources/migrations/20260226100000-add-matching-status.down.sql
Step 1: Write the up migration
-- resources/migrations/20260226100000-add-matching-status.up.sql
ALTER TABLE document ADD COLUMN matching_status text;
--;;
ALTER TABLE document ADD COLUMN matching_error text;
--;;
ALTER TABLE document ADD COLUMN matching_attempts integer NOT NULL DEFAULT 0;
--;;
ALTER TABLE document ADD COLUMN matching_failed_at timestamptz;
--;;
CREATE INDEX idx_document_matching_status ON document(matching_status)
WHERE matching_status IS NOT NULL;
Step 2: Write the down migration
-- resources/migrations/20260226100000-add-matching-status.down.sql
DROP INDEX IF EXISTS idx_document_matching_status;
--;;
ALTER TABLE document DROP COLUMN IF EXISTS matching_failed_at;
--;;
ALTER TABLE document DROP COLUMN IF EXISTS matching_attempts;
--;;
ALTER TABLE document DROP COLUMN IF EXISTS matching_error;
--;;
ALTER TABLE document DROP COLUMN IF EXISTS matching_status;
Step 3: Apply migration in dev REPL
bb migrate migrate
Step 4: Verify in REPL
(require '[com.getorcha.db.sql :as db.sql])
(db.sql/execute! (:com.getorcha.db/pool integrant.repl.state/system)
{:select [:matching-status :matching-error :matching-attempts :matching-failed-at]
:from [:document]
:limit 1})
;; => [] (empty result is fine — just confirms column exists)
Step 5: Commit
git add resources/migrations/20260226100000-add-matching-status.up.sql \
resources/migrations/20260226100000-add-matching-status.down.sql
git commit -m "feat(matching): add matching_status tracking columns to document"
Files:
src/com/getorcha/db/document_matching.clj
test/com/getorcha/db/document_matching_test.clj
Step 1: Write failing tests
Add to test/com/getorcha/db/document_matching_test.clj:
;; At the top, require the notification helpers already used in core_test:
;; [com.getorcha.test.notification-helpers :as helpers]
;; (check if it's already required; add if not)
(deftest set-matching-status-test
(testing "set-matching-status! updates all status fields"
(let [le-id (helpers/create-legal-entity!)
doc-id (helpers/create-document! le-id)]
(db.matching/set-matching-status! fixtures/*db* doc-id
{:status "pending"
:error nil
:failed-at nil})
(let [doc (db.sql/execute-one! fixtures/*db*
{:select [:matching-status :matching-error :matching-attempts]
:from [:document]
:where [:= :id doc-id]})]
(is (= "pending" (:document/matching-status doc)))
(is (nil? (:document/matching-error doc))))))
(testing "set-matching-status! increments attempts when requested"
(let [le-id (helpers/create-legal-entity!)
doc-id (helpers/create-document! le-id)]
(db.matching/set-matching-status! fixtures/*db* doc-id
{:status "in-progress"
:increment-attempts true})
(let [doc (db.sql/execute-one! fixtures/*db*
{:select [:matching-attempts]
:from [:document]
:where [:= :id doc-id]})]
(is (= 1 (:document/matching-attempts doc))))))
(testing "set-matching-status! with :failed sets matching_failed_at"
(let [le-id (helpers/create-legal-entity!)
doc-id (helpers/create-document! le-id)]
(db.matching/set-matching-status! fixtures/*db* doc-id
{:status "failed"
:error "LLM context window exceeded"
:failed-at (java.time.Instant/now)})
(let [doc (db.sql/execute-one! fixtures/*db*
{:select [:matching-status :matching-error :matching-failed-at]
:from [:document]
:where [:= :id doc-id]})]
(is (= "failed" (:document/matching-status doc)))
(is (= "LLM context window exceeded" (:document/matching-error doc)))
(is (some? (:document/matching-failed-at doc)))))))
Step 2: Run tests to verify they fail
clj -X:test:silent :nses '[com.getorcha.db.document-matching-test]'
Expected: FAIL with a compile error such as "No such var: db.matching/set-matching-status!"
Step 3: Implement set-matching-status!
Add to src/com/getorcha/db/document_matching.clj (after set-normalized-fields!):
(defn set-matching-status!
"Update matching status fields on a document.
Options map keys (all optional):
:status - Status string (\"pending\", \"in-progress\", \"succeeded\", \"failed\")
:error - Error message string, or nil to clear
:failed-at - java.time.Instant, set when transitioning to failed
:increment-attempts - When truthy, increments matching_attempts by 1"
[db document-id {:keys [status error failed-at increment-attempts] :as opts}]
(db.sql/execute! db
{:update :document
:set (cond-> {}
status (assoc :matching-status status)
;; contains? lets callers pass :error nil to clear a previous error
(contains? opts :error) (assoc :matching-error error)
failed-at (assoc :matching-failed-at failed-at)
increment-attempts (assoc :matching-attempts [:+ :matching-attempts 1]))
:where [:= :id document-id]}))
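To make the option semantics concrete, the sketch below isolates the map-building logic in a pure function. status-set-map is a hypothetical name used only for illustration and is not part of the plan's code. It mirrors the cond-> above, so the difference between an absent :error key and an explicit nil can be checked at the REPL without a database.

```clojure
;; Hypothetical helper for illustration only: mirrors the cond-> used in
;; set-matching-status!, but returns the :set map instead of running SQL.
(defn status-set-map
  [{:keys [status error failed-at increment-attempts] :as opts}]
  (cond-> {}
    status                  (assoc :matching-status status)
    ;; contains? distinguishes "clear the error" (nil) from "leave it alone" (absent)
    (contains? opts :error) (assoc :matching-error error)
    failed-at               (assoc :matching-failed-at failed-at)
    increment-attempts      (assoc :matching-attempts [:+ :matching-attempts 1])))

(status-set-map {:status "succeeded"})
;; => {:matching-status "succeeded"}

(status-set-map {:status "pending" :error nil})
;; => {:matching-status "pending", :matching-error nil}

(status-set-map {})
;; => {}
```

The last case is worth keeping in mind: an empty options map yields an empty :set map, which would most likely not produce a valid UPDATE statement, so callers should pass at least one key.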
Step 4: Run tests to verify they pass
clj -X:test:silent :nses '[com.getorcha.db.document-matching-test]'
Expected: All set-matching-status-test cases pass.
Step 5: Commit
git add src/com/getorcha/db/document_matching.clj \
test/com/getorcha/db/document_matching_test.clj
git commit -m "feat(matching): add set-matching-status! DB function"
Files:
src/com/getorcha/workers/matching/llm_decision.clj
test/com/getorcha/workers/matching/llm_decision_test.clj
Context: parse-llm-response currently catches all errors and returns {:matches []}. This hides failures. After this task, it throws on any failure so callers can distinguish a real "no matches" result from an LLM error.
Step 1: Write failing tests
Create test/com/getorcha/workers/matching/llm_decision_test.clj:
(ns com.getorcha.workers.matching.llm-decision-test
(:require [clojure.test :refer [deftest is testing]]
[com.getorcha.workers.matching.llm-decision :as llm-decision]))
(deftest parse-llm-response-test
(testing "returns valid response with matches"
(let [result (llm-decision/parse-llm-response
"{\"matches\": [{\"candidate\": 1, \"confidence\": \"high\", \"reasoning\": \"ok\"}]}")]
(is (= 1 (count (:matches result))))))
(testing "returns valid response with empty matches"
(let [result (llm-decision/parse-llm-response "{\"matches\": []}")]
(is (= [] (:matches result)))))
(testing "throws on invalid JSON"
(is (thrown-with-msg? clojure.lang.ExceptionInfo #"not valid JSON"
(llm-decision/parse-llm-response "not json at all"))))
(testing "throws on schema validation failure (missing matches key)"
(is (thrown-with-msg? clojure.lang.ExceptionInfo #"schema validation"
(llm-decision/parse-llm-response "{\"wrong\": \"field\"}"))))
(testing "throws on schema validation failure (invalid confidence value)"
(is (thrown-with-msg? clojure.lang.ExceptionInfo #"schema validation"
(llm-decision/parse-llm-response
"{\"matches\": [{\"candidate\": 1, \"confidence\": \"certain\", \"reasoning\": \"x\"}]}")))))
Step 2: Run tests to verify they fail
clj -X:test:silent :nses '[com.getorcha.workers.matching.llm-decision-test]'
Expected: The "throws on invalid JSON" and "throws on schema validation" tests FAIL (currently they don't throw).
Step 3: Update parse-llm-response to throw
In src/com/getorcha/workers/matching/llm_decision.clj, replace parse-llm-response (lines 112–128):
(defn parse-llm-response
"Parse and validate LLM JSON response for match decisions.
Throws on malformed JSON or schema validation failure."
[response]
(let [parsed (llm/parse-json-response response)] ;; throws on bad JSON
(when-not (m/validate schema.matching/LlmMatchResponse parsed)
(throw
(ex-info "LLM match response failed schema validation"
{:kind ::schema-validation-failure
:errors (m/explain schema.matching/LlmMatchResponse parsed)
:response-preview (subs response 0 (min 200 (count response)))})))
parsed))
Note: llm/parse-json-response already throws ex-info with ::llm/json-parse-error on bad JSON. This removes the catch that was silencing it.
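With the catch removed, a caller can tell the two situations apart. The following is a minimal sketch, not the plan's code: fake-parse and match-outcome are hypothetical stand-ins (the real parser delegates to llm/parse-json-response and Malli), and the :kind value is a placeholder.

```clojure
;; Hypothetical stand-in for parse-llm-response: returns the parsed map or
;; throws ex-info, which is the contract this task establishes.
(defn fake-parse [parsed]
  (if (vector? (:matches parsed))
    parsed
    (throw (ex-info "LLM match response failed schema validation"
                    {:kind :example/schema-validation-failure}))))

;; Hypothetical caller: an empty :matches vector is a real answer,
;; an exception is a failure that can be retried or reported.
(defn match-outcome [parsed]
  (try
    (if (seq (:matches (fake-parse parsed)))
      :matched
      :no-matches)
    (catch clojure.lang.ExceptionInfo e
      [:error (:kind (ex-data e))])))

(match-outcome {:matches [{:candidate 1}]}) ;; => :matched
(match-outcome {:matches []})               ;; => :no-matches
(match-outcome {:wrong "field"})            ;; => [:error :example/schema-validation-failure]
```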
Step 4: Run tests to verify they pass
clj -X:test:silent :nses '[com.getorcha.workers.matching.llm-decision-test]'
Expected: All pass.
Step 5: Run core tests to verify no regressions
clj -X:test:silent :nses '[com.getorcha.workers.matching.core-test]'
Expected: All pass. (Tests that stub llm-match-decision are unaffected; parse-llm-response is called internally.)
Step 6: Commit
git add src/com/getorcha/workers/matching/llm_decision.clj \
test/com/getorcha/workers/matching/llm_decision_test.clj
git commit -m "fix(matching): llm-match-decision throws on failure instead of silently returning no matches"
Files:
src/com/getorcha/workers/matching/llm_decision.clj
Context: llm-match-decision calls llm/generate (which can fail transiently). We add in-process retry: up to 3 attempts, sleeping 2s after the first failure and 4s after the second. Non-transient errors (context too long, auth failures) skip retries and throw immediately.
Step 1: Write failing tests
Add to test/com/getorcha/workers/matching/llm_decision_test.clj:
;; Add to the ns :require if it is not already there:
;; [com.getorcha.workers.llm :as llm]
(deftest llm-match-decision-retries-transient-errors-test
(testing "retries on transient error (HTTP 429) and succeeds on second attempt"
(let [call-count (atom 0)
fake-generate (fn [_ _]
(swap! call-count inc)
(if (= 1 @call-count)
(throw (ex-info "Rate limited"
{:kind :com.getorcha.workers.llm/api-error
:status 429}))
{:text "{\"matches\": []}"
:input-tokens 10
:output-tokens 5
:model "test"}))]
(with-redefs [llm/generate fake-generate]
(let [result (llm-decision/llm-match-decision
{:provider :anthropic :api-key "k" :model "m"}
{:type :invoice :structured-data {}}
[{:doc {:type :purchase-order :structured-data {}} :score 0.6 :evidence []}])]
(is (= [] (:matches result)))
(is (= 2 @call-count))))))
(testing "fails immediately on non-transient error (context too long)"
(let [call-count (atom 0)
fake-generate (fn [_ _]
(swap! call-count inc)
(throw (ex-info "Too long"
{:kind :com.getorcha.workers.llm/truncation
:status 400})))]
(with-redefs [llm/generate fake-generate]
(is (thrown? clojure.lang.ExceptionInfo
(llm-decision/llm-match-decision
{:provider :anthropic :api-key "k" :model "m"}
{:type :invoice :structured-data {}}
[{:doc {:type :purchase-order :structured-data {}} :score 0.6 :evidence []}])))
(is (= 1 @call-count)))))
(testing "exhausts retries on persistent transient error"
(let [call-count (atom 0)
fake-generate (fn [_ _]
(swap! call-count inc)
(throw (ex-info "Server error"
{:kind :com.getorcha.workers.llm/api-error
:status 503})))]
(with-redefs [llm/generate fake-generate]
(is (thrown? clojure.lang.ExceptionInfo
(llm-decision/llm-match-decision
{:provider :anthropic :api-key "k" :model "m"}
{:type :invoice :structured-data {}}
[{:doc {:type :purchase-order :structured-data {}} :score 0.6 :evidence []}])))
(is (= 3 @call-count))))))
Step 2: Run tests to verify they fail
clj -X:test:silent :nses '[com.getorcha.workers.matching.llm-decision-test]'
Expected: The retry tests FAIL (no retry logic yet).
Step 3: Add retry logic to llm_decision.clj
Add to src/com/getorcha/workers/matching/llm_decision.clj, before the llm-match-decision function:
(defn ^:private transient-llm-error?
"Returns true if the LLM error is likely transient and worth retrying."
[e]
(let [{:keys [kind status]} (ex-data e)]
(case kind
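;; A missing status is treated as transient, consistent with the default branch;
;; the nil check also keeps (<= 500 status 599) from throwing on nil
:com.getorcha.workers.llm/api-error (or (nil? status) (= 429 status) (<= 500 status 599))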
:com.getorcha.workers.llm/truncation false
:com.getorcha.workers.llm/empty-response true
:com.getorcha.workers.llm/json-parse-error true
::schema-validation-failure true
true)))
(defn ^:private with-llm-retry
"Calls f, retrying up to max-attempts times on transient errors.
Sleeps backoff-ms * 2^(attempt-1) between attempts."
[f max-attempts backoff-ms]
(loop [attempt 1]
(let [result (try {:ok (f)} (catch Exception e {:error e}))]
;; contains? rather than truthiness: a nil or false return value is still a success
(if (contains? result :ok)
(:ok result)
(let [e (:error result)]
(if (or (not (transient-llm-error? e)) (>= attempt max-attempts))
(throw e)
(do
(log/warn "LLM call failed, retrying"
{:attempt attempt
:max max-attempts
:error-kind (:kind (ex-data e))
:error (ex-message e)})
(Thread/sleep ^long (* backoff-ms (long (Math/pow 2 (dec attempt)))))
(recur (inc attempt)))))))))
Then update llm-match-decision to wrap llm/generate in retry:
(defn llm-match-decision
"Ask LLM to decide which candidates match the source document.
Makes up to 3 attempts on transient errors (2s, then 4s backoff between attempts).
Throws on non-transient errors or when all retries are exhausted."
[llm-config source-doc candidates]
(let [{:keys [system user]} (build-match-prompt source-doc candidates)
prompt (str system "\n\n" user)
{:keys [text input-tokens output-tokens model]}
(with-llm-retry #(llm/generate llm-config prompt) 3 2000)]
(assoc (parse-llm-response text)
:input-tokens input-tokens
:output-tokens output-tokens
:model model)))
No import is needed for Math/pow: classes in java.lang are always available in Clojure, so (Math/pow ...) works without adding anything to the ns form.
Step 4: Run tests to verify they pass
clj -X:test:silent :nses '[com.getorcha.workers.matching.llm-decision-test]'
Expected: All pass. Note: the retry tests stub llm/generate, but the real with-llm-retry still calls Thread/sleep between attempts, so they are slow. The exhaust-retries test sleeps 2s + 4s = 6s, and the retry-then-succeed test sleeps 2s. In other tests that go through llm-match-decision, the (with-redefs [llm-decision/with-llm-retry (fn [f _ _] (f))] ...) pattern avoids the sleep if needed.
Because llm-match-decision hardcodes 3 2000, removing the delay from the retry tests themselves would require either redefining with-llm-retry in tests to use a 0ms backoff, or making the backoff configurable.
Thread/sleep itself cannot be stubbed with with-redefs, because it is a static Java method rather than a Clojure var. One way to make the backoff configurable is a dynamic var (internal detail):
(def ^:dynamic ^:private *retry-backoff-ms* 2000)
Then use *retry-backoff-ms* instead of the backoff-ms parameter, or pass it as a test override.
For the purposes of the plan, keep it simple: accept the slow tests for now and note that they could be optimized later by making the sleep duration configurable. The test suite will have one 6s test and one 2s test.
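If the sleep is made configurable later, one possible shape is sketched below. This is an assumption-laden illustration, not the plan's implementation: retry-with-backoff, the options map, and the :retry? and :sleep-fn keys are all hypothetical names. The loop follows the same structure as with-llm-retry, but the sleep is injected so a test can record the requested delays instead of blocking.

```clojure
;; Sketch only: same loop shape as with-llm-retry, but the predicate and the
;; sleep function are parameters (both hypothetical) so tests never block.
(defn retry-with-backoff
  [f {:keys [max-attempts backoff-ms retry? sleep-fn]}]
  (loop [attempt 1]
    (let [result (try {:ok (f)} (catch Exception e {:error e}))]
      (if (contains? result :ok)
        (:ok result)
        (let [e (:error result)]
          (if (or (not (retry? e)) (>= attempt max-attempts))
            (throw e)
            (do
              ;; 2^(attempt-1) multiplier: backoff-ms, then 2x, then 4x, ...
              (sleep-fn (* backoff-ms (bit-shift-left 1 (dec attempt))))
              (recur (inc attempt)))))))))

;; Record the requested sleeps instead of sleeping.
(def sleeps (atom []))

(def outcome
  (try
    (retry-with-backoff #(throw (ex-info "Server error" {:status 503}))
                        {:max-attempts 3
                         :backoff-ms   2000
                         :retry?       (constantly true)
                         :sleep-fn     #(swap! sleeps conj %)})
    (catch clojure.lang.ExceptionInfo e
      (ex-message e))))

outcome  ;; => "Server error"
@sleeps  ;; => [2000 4000]
```

In production code the :sleep-fn would be something like #(Thread/sleep (long %)). Three attempts produce only two sleeps, which is why the total delay in the exhaust-retries test is 6s.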
Step 5: Run full matching tests
clj -X:test:silent :nses '[com.getorcha.workers.matching.core-test
com.getorcha.workers.matching.llm-decision-test]'
Expected: All pass.
Step 6: Commit
git add src/com/getorcha/workers/matching/llm_decision.clj \
test/com/getorcha/workers/matching/llm_decision_test.clj
git commit -m "feat(matching): add LLM retry with transient/non-transient error classification"
Files:
src/com/getorcha/workers/matching/core.clj
test/com/getorcha/workers/matching/core_test.clj
Context: Currently, match-document! runs the match-write phase (clear old matches → create new matches → update clusters) without a transaction. A failure mid-way leaves inconsistent state. The fix: wrap that phase in db.sql/with-transaction. The pre-processing writes (set-normalized-fields!) stay outside the transaction since they're safe to overwrite on retry.
Step 1: Write a failing test
Add to test/com/getorcha/workers/matching/core_test.clj:
(deftest match-document-atomicity-test
(testing "cluster update failure rolls back match edge creation"
(let [le-id (helpers/create-legal-entity!)
inv-id (create-document-with-type! le-id :invoice
{:issuer {:vat-id "DE999"}
:po-reference "PO-ATOMIC"})
_po-id (create-document-with-type! le-id :purchase-order
{:supplier {:vat-id "DE999"}
:po-number "PO-ATOMIC"})
source {:id inv-id
:type :invoice
:legal-entity-id le-id
:structured-data {:issuer {:vat-id "DE999"}
:po-reference "PO-ATOMIC"}}
;; Simulate a failure during cluster assignment after match edges are created
call-count (atom 0)]
(with-redefs [com.getorcha.db.document-matching/set-cluster-id!
(fn [db ids cluster-id]
(swap! call-count inc)
;; Fail on the second call (first call clears cluster, second sets it)
(when (= 2 @call-count)
(throw (ex-info "Simulated cluster update failure" {})))
(db.sql/execute! db
{:update :document
:set {:cluster-id cluster-id}
:where [:in :id ids]}))]
(is (thrown? clojure.lang.ExceptionInfo
(matching/match-document! fixtures/*db* {} nil source)))
;; No match edges should exist — rolled back
(let [matches (db.matching/get-matches-for-document fixtures/*db* inv-id)]
(is (empty? matches)))))))
Step 2: Run test to verify it fails
clj -X:test:silent :nses '[com.getorcha.workers.matching.core-test]'
Expected: match-document-atomicity-test FAILS — without a transaction, match edges exist even though cluster update failed.
Step 3: Wrap match-write phase in a transaction
In src/com/getorcha/workers/matching/core.clj, update the match-document! function. Currently the function structure is:
(set-normalized-fields! ...) ; write 1 — outside transaction
(delete-matches-for-document! ...) ; write 2 — currently outside
(set-cluster-id! ... nil) ; write 3 — currently outside
... (decide-matches ...) ; LLM call — must stay outside transaction
... (doseq match (create-match! ...)) ; write 4 — currently outside
Restructure so only the LLM-free writes are in the transaction. The LLM call must stay outside because long-running ops inside a transaction hold a DB connection:
(defn match-document!
[db search-config llm-config doc]
{:pre [(m/validate schema.matching/SourceDocument doc)]}
(let [counterparty (normalize/extract-counterparty doc)
references (normalize/extract-references doc)
searchable-text (searchable-text/build-searchable-text doc)
embedding (when (and (seq searchable-text) (seq search-config))
(search/embed-document search-config searchable-text))
doc (assoc doc
:normalized-counterparty counterparty
:normalized-references references
:searchable-text searchable-text)]
;; Pre-processing writes — safe to commit independently, will be overwritten on retry
(db.matching/set-normalized-fields! db (:id doc)
{:counterparty counterparty
:references references
:searchable-text searchable-text
:embedding embedding})
(if (nil? counterparty)
(log/warn "No counterparty extracted, skipping matching"
{:document-id (:id doc) :type (:type doc)})
(let [candidate-rows (candidates/find-candidates db search-config doc)]
(if (empty? candidate-rows)
(log/debug "No candidates found"
{:document-id (:id doc)
:matchable-types (candidates/get-matchable-types (:type doc))})
(let [all-scored (score-all-candidates doc candidate-rows)
above-threshold (filter-and-sort-candidates all-scored)]
(log/debug "Candidate scoring complete" ...)
;; Decide matches — may call LLM, must be OUTSIDE the transaction
(let [by-type (group-by #(keyword (:document/type (:doc %))) above-threshold)
decided (into {} (map (fn [[doc-type type-candidates]]
[doc-type (decide-matches llm-config doc type-candidates)])
by-type))]
;; Atomic write phase: clear → insert → cluster
(db.sql/with-transaction [tx db]
(db.matching/delete-matches-for-document! tx (:id doc))
(db.matching/set-cluster-id! tx [(:id doc)] nil)
(doseq [[_doc-type matches] decided
:when (seq matches)]
(log/debug "Matches decided" ...)
(doseq [match matches]
(create-match! tx doc match))))))))))
Key changes:
Wrap delete-matches-for-document!, set-cluster-id! (clear), and all create-match! calls in db.sql/with-transaction
create-match! calls assign-cluster!, which also does DB ops; those will use the transaction tx
Important: create-match! and assign-cluster! accept db as first arg. Pass tx instead of db when calling them inside the transaction.
Step 4: Run tests to verify they pass
clj -X:test:silent :nses '[com.getorcha.workers.matching.core-test]'
Expected: All pass including the new atomicity test.
Step 5: Run linter
clj-kondo --lint src test dev --fail-level warning
Expected: No warnings.
Step 6: Commit
git add src/com/getorcha/workers/matching/core.clj \
test/com/getorcha/workers/matching/core_test.clj
git commit -m "fix(matching): wrap match write phase in DB transaction for atomicity"
Files:
src/com/getorcha/aws.clj
Context: The worker needs to know how many times a message has been delivered to implement the SQS-level backoff (attempt 1 → 5min, attempt 2 → 30min, attempt 3 → DLQ). SQS provides this as a system message attribute that must be explicitly requested.
Step 1: No failing test needed — this is a thin wrapper over the SQS SDK. The integration test in Task 7 will validate end-to-end behavior.
Step 2: Update receive-messages to request the attribute
In src/com/getorcha/aws.clj:
Add MessageSystemAttributeName to the imports:
(software.amazon.awssdk.services.sqs.model
ChangeMessageVisibilityRequest DeleteMessageRequest GetQueueUrlRequest
Message MessageSystemAttributeName ReceiveMessageRequest SendMessageRequest)
Update receive-messages to request the attribute:
(defn receive-messages
"Receives messages from the queue with long polling.
Options:
:max-messages - Maximum number of messages to receive (1-10)
:wait-time-seconds - Long polling wait time in seconds
Returned messages include the ApproximateReceiveCount system attribute."
[^SqsClient client queue-url {:keys [max-messages wait-time-seconds]}]
(-> (.receiveMessage
client
^ReceiveMessageRequest
(.build (doto (ReceiveMessageRequest/builder)
(.queueUrl queue-url)
(.maxNumberOfMessages (int max-messages))
(.waitTimeSeconds (int wait-time-seconds))
(.messageSystemAttributeNames
^java.util.List [MessageSystemAttributeName/APPROXIMATE_RECEIVE_COUNT]))))
.messages))
Add a receive-count helper:
(defn receive-count
  "Returns the ApproximateReceiveCount for a message, or 1 if unavailable."
  [^Message message]
  ;; In the AWS SDK v2, Message.attributes() maps MessageSystemAttributeName
  ;; to plain String values, so the count is parsed directly from the string.
  (if-let [^String v (.get (.attributes message)
                           MessageSystemAttributeName/APPROXIMATE_RECEIVE_COUNT)]
    (Integer/parseInt v)
    1))
Step 3: Lint
clj-kondo --lint src test dev --fail-level warning
Step 4: Verify in REPL (optional)
In the dev REPL, after (reset), send a test message to the matching queue and verify the receive count attribute is present. Only needed if you want extra confidence before Task 7.
Step 5: Commit
git add src/com/getorcha/aws.clj
git commit -m "feat(matching): request ApproximateReceiveCount in SQS receive-messages"
Files:
src/com/getorcha/workers/matching/worker.clj
Context: The root bug: delete-message! is currently called in the doseq body, immediately after .submit executor, before the submitted task even starts. This must move inside the Callable, after successful processing.
Additionally:
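Set matching_status to :in-progress when processing starts, :succeeded on success, :failed on permanent failure
On a transient failure with SQS attempts remaining, call ChangeMessageVisibility with backoff instead of deleting
On the final failed attempt, set :failed, call notify-admins!, and let the message go to the DLQ
Step 1: Write failing tests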
The worker logic is hard to unit-test in isolation (it submits to an executor). Write a test that verifies the message lifecycle via the DB state. Add a new test namespace:
Create test/com/getorcha/workers/matching/worker_test.clj:
(ns com.getorcha.workers.matching.worker-test
(:require [clojure.test :refer [deftest is testing use-fixtures]]
[com.getorcha.db.sql :as db.sql]
[com.getorcha.test.fixtures :as fixtures]
[com.getorcha.test.notification-helpers :as helpers]
[com.getorcha.workers.matching.worker :as worker]))
(use-fixtures :once fixtures/with-running-system)
(use-fixtures :each fixtures/with-db-rollback)
(defn ^:private get-document-status [doc-id]
(db.sql/execute-one!
fixtures/*db*
{:select [:matching-status :matching-error :matching-attempts]
:from [:document]
:where [:= :id doc-id]}))
(deftest process-document-sets-in-progress-test
(testing "sets matching_status to in-progress at start of processing"
(let [le-id (helpers/create-legal-entity!)
doc-id (helpers/create-document! le-id)
config {:db-pool fixtures/*db*
:llm-config nil
:search-config {}}
;; Stub the actual matching to be a no-op
calls (atom [])]
(with-redefs [com.getorcha.workers.matching.core/match-document!
(fn [_ _ _ _]
;; At this point, status should already be in-progress
(let [status (:document/matching-status
(get-document-status doc-id))]
(swap! calls conj status)))]
;; Need a document with type+structured-data to get past the early guards
(db.sql/execute-one! fixtures/*db*
{:update :document
:set {:type (db.sql/->cast :invoice :document-type)
:structured-data [:lift {:issuer {:name "X"}}]}
:where [:= :id doc-id]})
;; process-document! is private, so call it through its var
(#'worker/process-document! config doc-id)
(is (= ["in-progress"] @calls))))))
(deftest process-document-sets-succeeded-on-success-test
(testing "sets matching_status to succeeded after successful processing"
(let [le-id (helpers/create-legal-entity!)
doc-id (helpers/create-document! le-id)
config {:db-pool fixtures/*db*
:llm-config nil
:search-config {}}]
(db.sql/execute-one! fixtures/*db*
{:update :document
:set {:type (db.sql/->cast :invoice :document-type)
:structured-data [:lift {:issuer {:name "X"}}]}
:where [:= :id doc-id]})
(with-redefs [com.getorcha.workers.matching.core/match-document! (fn [_ _ _ _] nil)]
;; process-document! is private, so call it through its var
(#'worker/process-document! config doc-id))
(is (= "succeeded" (:document/matching-status (get-document-status doc-id)))))))
Step 2: Run tests to verify they fail
clj -X:test:silent :nses '[com.getorcha.workers.matching.worker-test]'
Expected: FAIL — process-document! does not currently set matching status.
Step 3: Rewrite worker.clj
The key changes to src/com/getorcha/workers/matching/worker.clj:
Add [com.getorcha.db.document-matching :as db.matching] and [com.getorcha.notifications :as notifications] to the requires
Update process-document! to set :in-progress at start and :succeeded at end
Move delete-message! inside the Callable, to the success path
Add transient-matching-error? predicate
Add handle-failure! function
Add notifications to orchestrator config destructuring
New process-document! (replace existing):
(defn ^:private process-document!
"Processes a single document through the matching pipeline.
Sets matching_status to :in-progress at start and :succeeded on completion."
[{:keys [db-pool llm-config search-config] :as _context} document-id]
(let [doc (fetch-document db-pool document-id)]
(cond
(nil? doc)
(log/warn "Document not found for matching" {:document-id document-id})
(nil? (:document/structured-data doc))
(log/warn "Document missing structured data, skipping matching"
{:document-id document-id})
:else
(do
(log/info "Processing document for matching"
{:document-id document-id
:document-type (:document/type doc)
:legal-entity-id (:document/legal-entity-id doc)})
(mdc/put! :legal-entity-id (:document/legal-entity-id doc))
(xray/add-annotation! "legal_entity_id" (str (:document/legal-entity-id doc)))
;; Mark in-progress, increment attempt counter
(db.matching/set-matching-status! db-pool document-id
{:status "in-progress"
:increment-attempts true})
(matching/match-document! db-pool search-config (:matching llm-config)
{:id (:document/id doc)
:type (keyword (:document/type doc))
:legal-entity-id (:document/legal-entity-id doc)
:structured-data (:document/structured-data doc)
:cluster-id (:document/cluster-id doc)})
(db.matching/set-matching-status! db-pool document-id {:status "succeeded"})
(log/info "Document matching complete" {:document-id document-id})))))
New helper constants and functions (add before the ::orchestrator init-key):
(def ^:private max-receive-count
"Maximum SQS delivery attempts before sending to DLQ."
3)
(defn ^:private transient-matching-error?
"Returns true if the error is worth retrying at the SQS level.
Non-transient errors (LLM context too long, auth failures) should not be retried."
[e]
(let [{:keys [kind status]} (ex-data e)]
(case kind
:com.getorcha.workers.llm/api-error (not (#{400 401 403} status))
:com.getorcha.workers.llm/truncation false
true)))
(defn ^:private backoff-seconds
"Returns visibility timeout extension in seconds based on receive count."
[receive-count]
(case receive-count
1 300 ;; 5 minutes
2 1800 ;; 30 minutes
300)) ;; default
(defn ^:private notify-failure!
"Sends admin notification about a permanent matching failure. Fire-and-forget."
[{:keys [db-pool aws notifications] :as _config} document-id error-message]
(try
(let [le-id (some-> (db.sql/execute-one! db-pool
{:select [:legal-entity-id]
:from [:document]
:where [:= :id document-id]})
:document/legal-entity-id)]
(when le-id
(notifications/notify-admins!
{:db-pool db-pool
:aws aws
:notifications notifications}
{:legal-entity/id le-id}
{:kind :matching/permanent-failure
:title (str "Document matching failed: " document-id)
:body (str "Document ID: " document-id
"\nError: " error-message)})))
(catch Exception e
(log/warn e "Failed to send matching failure notification"
{:document-id document-id}))))
(defn ^:private handle-failure!
"Handles a matching failure after in-process retries are exhausted.
- Non-transient error: mark failed, notify admins, delete message
- Transient, early attempt (< max): extend visibility with backoff
- Transient, final attempt: mark failed, notify admins, let go to DLQ"
[{:keys [aws db-pool] :as config} ^Message message document-id ^Throwable t]
(let [sqs-client (get-in aws [:clients :sqs])
queue-url (get-in aws [:queue-urls :matching])
receive-count (aws/receive-count message)
transient? (transient-matching-error? t)
error-msg (ex-message t)]
(log/error t "Document matching failed"
{:document-id document-id
:receive-count receive-count
:transient? transient?})
(cond
;; Non-transient: give up immediately
(not transient?)
(do
(db.matching/set-matching-status! db-pool document-id
{:status "failed"
:error error-msg
:failed-at (java.time.Instant/now)})
(notify-failure! config document-id error-msg)
(aws/delete-message! sqs-client queue-url message))
;; Transient, more SQS attempts available: extend visibility for backoff
(< receive-count max-receive-count)
(do
(log/info "Extending message visibility for retry"
{:document-id document-id
:receive-count receive-count
:delay-seconds (backoff-seconds receive-count)})
(aws/extend-visibility! sqs-client queue-url message
(backoff-seconds receive-count)))
;; Transient, final SQS attempt: give up, let message go to DLQ
:else
(do
(db.matching/set-matching-status! db-pool document-id
{:status "failed"
:error error-msg
:failed-at (java.time.Instant/now)})
(notify-failure! config document-id error-msg)))))
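The three branches of handle-failure! can also be read as a small decision table. The sketch below restates them as a pure function so the policy can be checked without SQS or a database. failure-action and the shape of its return value are hypothetical and only for illustration; max-receive-count and backoff-seconds are copied from the definitions above.

```clojure
;; Hypothetical pure version of the branching in handle-failure!.
;; It returns a description of what to do instead of performing side effects.
(def max-receive-count 3)

(defn backoff-seconds [receive-count]
  (case receive-count
    1 300     ; 5 minutes
    2 1800    ; 30 minutes
    300))     ; default

(defn failure-action
  [{:keys [transient? receive-count]}]
  (cond
    ;; Non-transient: give up immediately and remove the message
    (not transient?)
    {:action :fail :notify? true :delete-message? true}

    ;; Transient with attempts left: hide the message for the backoff period
    (< receive-count max-receive-count)
    {:action :retry-later :delay-seconds (backoff-seconds receive-count)}

    ;; Transient on the final attempt: give up, leave the message for the DLQ
    :else
    {:action :fail :notify? true :delete-message? false}))

(failure-action {:transient? false :receive-count 1})
;; => {:action :fail, :notify? true, :delete-message? true}
(failure-action {:transient? true :receive-count 2})
;; => {:action :retry-later, :delay-seconds 1800}
(failure-action {:transient? true :receive-count 3})
;; => {:action :fail, :notify? true, :delete-message? false}
```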
Updated orchestrator init-key — move delete-message! inside the Callable, add :notifications to config destructuring:
(defmethod ig/init-key ::orchestrator
  [_ {:keys [aws
             max-queue-messages
             notifications
             wait-time-seconds
             worker-pools]
      :as config}]
  (let [{:keys [^ExecutorService executor]} worker-pools
        polling? (atom true)
        ^SqsClient sqs-client (get-in aws [:clients :sqs])
        queue-url (get-in aws [:queue-urls :matching])]
    (log/info "Starting matching orchestrator for queue" (get-in aws [:queues :matching]))
    (.submit executor
             ^Callable
             (fn []
               (while @polling?
                 (try
                   (let [messages (aws/receive-messages
                                   sqs-client
                                   queue-url
                                   {:max-messages max-queue-messages
                                    :wait-time-seconds wait-time-seconds})]
                     (when (pos? (count messages))
                       (log/debug "Received" (count messages) "matching messages"))
                     (doseq [^Message message messages]
                       (.submit executor
                                ^Callable
                                (mdc/wrap-callable
                                 (fn []
                                   (let [document-id (parse-uuid (.body message))]
                                     (if (nil? document-id)
                                       (do
                                         (log/error "Invalid document ID in matching message"
                                                    {:message-body (.body message)})
                                         (aws/delete-message! sqs-client queue-url message))
                                       (mdc/with-context {:document-id document-id}
                                         (AWSXRay/beginSegment "document-matching")
                                         (xray/add-annotation! "document_id" (str document-id))
                                         (try
                                           (process-document! config document-id)
                                           (aws/delete-message! sqs-client queue-url message)
                                           (catch Throwable t
                                             (xray/add-exception! t)
                                             (xray/set-error!)
                                             (handle-failure! config message document-id t))
                                           (finally
                                             (AWSXRay/endSegment)))))))))))
                   (catch Exception e
                     (when @polling?
                       (log/error e "Error in matching polling loop, will retry")))))))
    {:polling? polling?}))
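The key change in the Callable is that the delete call sits after the processing call inside `try`, so it runs only when processing returns normally. A minimal sketch of that control flow, with the SQS and processing calls replaced by injected functions, is shown here. `handle-message!`, `handlers`, and the two atoms are hypothetical names used only for illustration.

```clojure
(defn handle-message!
  "Illustrative stand-in for the per-message Callable body.
   process!, delete!, and on-failure! are injected so the flow runs without SQS."
  [{:keys [process! delete! on-failure!]} message]
  (try
    (process! message)
    ;; Reached only if process! did not throw.
    (delete! message)
    (catch Throwable t
      ;; The message is NOT deleted here; the failure handler decides its fate.
      (on-failure! message t))))

(def deleted (atom []))
(def failed (atom []))

(def handlers
  {:process!    (fn [m] (when (= m "bad") (throw (ex-info "boom" {}))))
   :delete!     (fn [m] (swap! deleted conj m))
   :on-failure! (fn [m _t] (swap! failed conj m))})

(handle-message! handlers "good")
(handle-message! handlers "bad")
```

After both calls, only "good" has been deleted and only "bad" has reached the failure handler. One consequence of this shape, in the sketch and in the code above: an exception thrown by the delete call itself is also routed to the failure handler, even though processing succeeded.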
Step 4: Run tests
clj -X:test:silent :nses '[com.getorcha.workers.matching.worker-test]'
Expected: All pass.
Step 5: Run linter
clj-kondo --lint src test dev --fail-level warning
Step 6: Commit
git add src/com/getorcha/workers/matching/worker.clj \
test/com/getorcha/workers/matching/worker_test.clj
git commit -m "fix(matching): fix message deletion, add status tracking and failure handling"
Files:
src/com/getorcha/workers/ingestion.clj
Context: When ingestion finishes and publishes the document ID to the matching queue, it should also mark matching_status = pending on the document. This gives the admin panel visibility into which documents are queued but not yet processed.
Step 1: No failing test needed. The publish function only performs side effects, so its behavior is verified by checking matching_status after document ingestion in integration tests.
Step 2: Update publish-document-ready!
In src/com/getorcha/workers/ingestion.clj, find publish-document-ready! (around line 399):
(defn ^:private publish-document-ready!
  "Publishes a document-ready event to the matching queue and marks the document
  as pending for matching."
  [{:keys [^SqsClient sqs-client aws db-pool] :as _context} document-id]
  (when-let [matching-queue-url (get-in aws [:queue-urls :matching])]
    (log/info "Publishing document-ready event for matching" {:document-id document-id})
    (db.matching/set-matching-status! db-pool document-id {:status "pending"})
    (aws/send-message! sqs-client matching-queue-url (str document-id))))
Note: you need to add [com.getorcha.db.document-matching :as db.matching] to the namespace requires if it isn't already there. Check the existing requires in ingestion.clj first. Also verify the context map already has :db-pool — check how publish-document-ready! is called to confirm.
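If a lighter check than an integration test is wanted, the ordering of the two side effects (status write first, then enqueue) can be exercised with stubs. The sketch below is a dependency-injected analogue, not the real function: `publish-ready!`, `set-status!`, `send!`, and `calls` are hypothetical names, and the real code calls db.matching/set-matching-status! and aws/send-message! directly.

```clojure
(defn publish-ready!
  "Illustrative analogue of publish-document-ready! with injected side effects."
  [{:keys [set-status! send!]} document-id]
  ;; Write the pending status before the message becomes visible to the worker.
  (set-status! document-id {:status "pending"})
  (send! (str document-id)))

;; Records each side effect in the order it happened.
(def calls (atom []))

(def example-id #uuid "00000000-0000-0000-0000-000000000001")

(publish-ready!
 {:set-status! (fn [id m] (swap! calls conj [:status id (:status m)]))
  :send!       (fn [body] (swap! calls conj [:send body]))}
 example-id)
```

Writing the status first presumably avoids a worker picking up the message and then having its own status update overwritten by a late "pending" write; that rationale is an assumption, since the plan does not state it.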
Step 3: Lint
clj-kondo --lint src test dev --fail-level warning
Step 4: Commit
git add src/com/getorcha/workers/ingestion.clj
git commit -m "feat(matching): set matching_status=pending when publishing to matching queue"
Files:
resources/com/getorcha/config.edn
Context: The matching orchestrator now uses notifications/notify-admins!, which requires a notifications context map. This map is already defined globally in the config, so the orchestrator's config only needs a reference to it.
Step 1: Update config.edn
In resources/com/getorcha/config.edn, find the matching orchestrator config (around line 269) and add :notifications:
:com.getorcha.workers.matching.worker/orchestrator
{:aws #integrant/ref :com.getorcha.aws/state
 :db-pool #integrant/ref :com.getorcha.db/pool
 :llm-config {:matching #ref [:com.getorcha/llm :fast]}
 :max-queue-messages 10
 :notifications #ref [:com.getorcha/notifications]
 :search-config #ref [:com.getorcha/search]
 :wait-time-seconds #profile {:local-dev 0 :test 0 :default 20}
 :worker-pools #integrant/ref :com.getorcha.workers.matching.worker/worker-pools}
Step 2: Run full matching test suite
clj -X:test:silent :nses '[com.getorcha.db.document-matching-test
com.getorcha.workers.matching.core-test
com.getorcha.workers.matching.llm-decision-test
com.getorcha.workers.matching.worker-test]'
Expected: All pass.
Step 3: Run full test suite
clj -X:test:silent 2>&1 | grep -A 5 -E "(FAIL in|ERROR in|Execution error|failed because|Ran .* tests)"
Expected: 0 failures, 0 errors.
Step 4: Lint
clj-kondo --lint src test dev --fail-level warning
Step 5: Commit
git add resources/com/getorcha/config.edn
git commit -m "feat(matching): wire notifications config to matching orchestrator"
After all tasks, the matching pipeline will:
delete-message! only fires on success or non-transient failure
matching_status on document gives visibility into pending/in-progress/succeeded/failed