Remediation: heavy-work gate must not block before SQS message protection

Status: ready to execute. Branch: master (same as the Phase-1 prod-resilience work). Origin: discovered by the Plan B Task 5 nREPL 23-invoice replay — 16/22 ingestion children drained to the DLQ before claim-ingestion! ever ran. Root-caused below.

Root cause (confirmed by code + config, not hypothesis)

Plan A Task 3/5 wrap each poll-loop message in concurrency/with-permit at the .submit site. with-permit (src/com/getorcha/workers/concurrency.clj:42) calls a blocking (.acquire semaphore) at the very start of the executor task — i.e. before any per-message SQS protection (claim-ingestion! / extend-visibility! / heartbeat) runs. The executor is Executors/newVirtualThreadPerTaskExecutor (unbounded, immediate start), so the only blocking wait is that semaphore acquire.

The unprotected window (receive → first extend-visibility) is permit-wait + claim-query. With Semaphore(N=3) and a 22-child fan-out at ~40–60 s of heavy work each, a child in wave ≥2 waits >60 s for a permit. Ingest queue base VisibilityTimeout: local 60 s (volumes/ministack/sqs.json, scripts/ministack-seed/seed.sh:72), prod 600 s (infra/stacks/foundation_stack.py:298); maxReceiveCount = 3 in both. So the message is redelivered every VT while still permit-waiting, and once ApproximateReceiveCount exceeds 3, SQS moves it to the DLQ before it is ever processed. Pre-gate, work ran immediately on receive (the OOM incident), so this window was ≈0; this is a regression introduced by the gate's placement. Prod is not safe by construction, only by margin: a larger PDF, slower LLM, or bigger fan-out reproduces it at VT=600 s.
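The arithmetic above can be sketched with a deliberately simplified model. It assumes every child takes the same heavy-work time (50 s here, an assumed midpoint of the ~40–60 s range), that permits are handed out in strict waves, and that claim-query time and the effect of redelivered copies queueing for permits are ignored. The function names are illustrative and do not exist in the codebase.

```clojure
;; Simplified model (assumption): uniform work time, strict waves,
;; no claim-query time, redelivered copies ignored.
(defn permit-wait-s
  "Seconds the i-th child (0-based) waits for a permit."
  [permits work-s i]
  (* work-s (quot i permits)))

(defn receives-while-waiting
  "Deliveries seen while the task still waits: the initial receive plus
   one redelivery per fully elapsed visibility timeout."
  [wait-s visibility-timeout-s]
  (inc (quot wait-s visibility-timeout-s)))

(defn dlq-before-processing?
  "True when the receive count exceeds maxReceiveCount during the wait."
  [permits work-s visibility-timeout-s max-receive-count i]
  (> (receives-while-waiting (permit-wait-s permits work-s i)
                             visibility-timeout-s)
     max-receive-count))

;; Local settings from the text: 3 permits, VT = 60 s, maxReceiveCount = 3.
(count (filter #(dlq-before-processing? 3 50 60 3 %) (range 22)))
;; => 10

;; Prod VT = 600 s with the same fan-out and work time.
(count (filter #(dlq-before-processing? 3 50 600 3 %) (range 22)))
;; => 0
```

Because of the simplifications, this model should not be expected to reproduce the observed 16/22; it only illustrates why the local VT fails at this fan-out while the prod VT survives it on margin alone.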

Fix principle: the permit must gate only the heavy work, acquired after the message is protected (claim/extend-visibility/heartbeat started). Because the executor starts tasks immediately, protection then runs within ms of receive and the heartbeat (or an up-front extend) holds visibility across the permit wait.
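A minimal sketch of this principle, assuming a scheduled heartbeat similar in spirit to the workers' heartbeat-future. Everything here is a stand-in: `extends` counts calls that would be aws/extend-visibility!, the gate is a bare Semaphore that starts saturated, and the intervals are shortened to milliseconds so the effect is visible quickly.

```clojure
(import '(java.util.concurrent Executors ScheduledExecutorService Semaphore TimeUnit))

;; Stand-ins (assumptions, not the workers' real code).
(def extends (atom 0))               ; counts simulated extend-visibility calls
(def ^Semaphore gate (Semaphore. 0)) ; saturated: no permit available yet

(defn handle!
  [^ScheduledExecutorService scheduler]
  (swap! extends inc)                                ; up-front extend
  (let [heartbeat (.scheduleAtFixedRate scheduler
                                        ^Runnable (fn [] (swap! extends inc))
                                        20 20 TimeUnit/MILLISECONDS)]
    (try
      (.acquire gate)                                ; permit wait, already protected
      (try
        :heavy-work-done                             ; placeholder for the heavy section
        (finally (.release gate)))
      (finally (.cancel heartbeat false)))))

(def result (promise))
(def ^ScheduledExecutorService scheduler (Executors/newSingleThreadScheduledExecutor))
(def ^Thread worker
  (doto (Thread. ^Runnable (fn [] (deliver result (handle! scheduler))))
    (.start)))

(Thread/sleep 200)                   ; worker is parked on the saturated gate
(def extends-while-waiting @extends)
(.release gate)                      ; a permit frees up
(.join worker)
(.shutdown scheduler)
(println "extends while permit-waiting:" extends-while-waiting "result:" @result)
```

The count printed for the waiting period is greater than one, which is the point: visibility keeps being extended while the task waits, so the wait no longer consumes receive attempts.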

Per-worker shape (all 5 share the defect; lifecycles differ)

| Worker | Protection it has | Fix |
|---|---|---|
| ap/ingestion.clj | claim + up-front extend + heartbeat | move permit inside job-handler, gate only the heavy try (after heartbeat) |
| document_output.clj | claim-job + up-front extend + heartbeat | move permit inside handle-message!, gate only dispatch-job!+status/delete (after heartbeat) |
| ap/processors/matching/worker.clj | none (receive-count backoff only) | up-front extend-visibility! before the permit; gate only process-document-with-output! |
| diagnostics_recompute.clj | none (handler idempotent/superseded-safe) | up-front extend-visibility! before the permit; gate only handle-message |
| ap/acquisition.clj | none; deletes msg immediately after submit (different bug: crash-loss, widened by permit) | idempotency-review the re-pull path, then move delete-message! into the task after success + up-front extend + gate only the work |

Task R1: concurrency/call-with-permit inline helper

File: src/com/getorcha/workers/concurrency.clj, test/com/getorcha/workers/concurrency_test.clj

with-permit returns a ^Callable for ExecutorService.submit. The fix needs to gate a section already running on its own virtual thread. Add a sibling:

(defn call-with-permit
  "Acquire one permit from `gate`, run thunk `f` (0-arg), release in a finally.
   Inline form for gating a heavy section that is already executing on its own
   thread — unlike `with-permit`, which wraps a task for ExecutorService.submit."
  [{:keys [^Semaphore semaphore] :as _gate} f]
  (.acquire semaphore)
  (try (f) (finally (.release semaphore))))

TDD: write tests first (RED → GREEN): (a) runs the thunk and returns its value; (b) caps concurrency — with a 1-permit gate, two overlapping calls never run the thunk body simultaneously (latch/counter, peak == 1); (c) releases on exception (a throwing thunk still frees the permit — a following call proceeds). Reuse the patterns in the existing concurrency_test.clj. Do not change with-permit.
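The three cases could look roughly like the following clojure.test sketch. It carries a local copy of the helper so it runs on its own; the `new-gate` constructor and the test names are illustrative, and the existing concurrency_test.clj patterns (not shown in this plan) should take precedence where they differ.

```clojure
(require '[clojure.test :refer [deftest is run-tests]])
(import '(java.util.concurrent Semaphore))

;; Local copy of the helper so the sketch is self-contained.
(defn call-with-permit
  [{:keys [^Semaphore semaphore]} f]
  (.acquire semaphore)
  (try (f) (finally (.release semaphore))))

;; Hypothetical gate constructor: the real gate is built elsewhere.
(defn new-gate [permits] {:semaphore (Semaphore. (int permits))})

;; (a) runs the thunk and returns its value
(deftest returns-thunk-value
  (is (= 42 (call-with-permit (new-gate 1) (fn [] 42)))))

;; (b) with one permit, two overlapping calls never run simultaneously
(deftest caps-concurrency
  (let [gate    (new-gate 1)
        running (atom 0)
        peak    (atom 0)
        body    (fn []
                  (swap! peak max (swap! running inc))
                  (Thread/sleep 50)
                  (swap! running dec))
        threads (mapv (fn [_]
                        (doto (Thread. ^Runnable (fn [] (call-with-permit gate body)))
                          (.start)))
                      (range 2))]
    (run! (fn [^Thread t] (.join t)) threads)
    (is (= 1 @peak))))

;; (c) a throwing thunk still frees the permit
(deftest releases-on-exception
  (let [gate (new-gate 1)]
    (is (thrown? IllegalStateException
                 (call-with-permit gate
                                   (fn [] (throw (IllegalStateException. "boom"))))))
    (is (= 1 (.availablePermits ^Semaphore (:semaphore gate))))
    (is (= :ok (call-with-permit gate (fn [] :ok))))))

(run-tests)
```

Checking `availablePermits` before the follow-up call keeps case (c) from hanging if the release is ever broken: the failure shows up as an assertion instead of a blocked test run.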

Commit: feat(concurrency): add call-with-permit inline gate helper


Task R2: ingestion — gate only the heavy pipeline (TDD, the reproduced case)

File: src/com/getorcha/workers/ap/ingestion.clj (+ test)

RED first. Add a regression test (test/com/getorcha/workers/ap/ingestion_test.clj or the existing ingestion test ns) proving the ordering: build the job-handler context with a heavy-work-gate of 1 permit held by a separate blocked thread, with-redefs stub claim-ingestion! (returns a fake pipeline-state), aws/extend-visibility! (records a timestamp/flag), and the first heavy stage (fetch-files-from-s3!) to block on a latch. Assert: aws/extend-visibility! is invoked before the heavy stage is reached, even though the gate is saturated (today it is not — the test is RED because the permit is acquired before claim-ingestion!).
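The ordering assertion can be illustrated outside the codebase with stand-in functions. In the sketch below, `claim-ingestion!`, `extend-visibility!`, and `fetch-files-from-s3!` are local stubs with simplified, assumed signatures (the real test would stub the actual vars via with-redefs), and the two handlers are reduced models of today's ordering and the intended one.

```clojure
(import '(java.util.concurrent Semaphore))

(def events (atom []))

;; Stand-ins with simplified, hypothetical signatures.
(defn claim-ingestion! [_message] (swap! events conj :claim) {:state :claimed})
(defn extend-visibility! [_message] (swap! events conj :extend))
(defn fetch-files-from-s3! [_state] (swap! events conj :heavy))

(defn gated-first-handler
  "Model of today's ordering: the permit is acquired before protection."
  [^Semaphore sem message]
  (.acquire sem)
  (try
    (let [state (claim-ingestion! message)]
      (extend-visibility! message)
      (fetch-files-from-s3! state))
    (finally (.release sem))))

(defn protected-first-handler
  "Model of the intended ordering: protect, then wait for the permit."
  [^Semaphore sem message]
  (let [state (claim-ingestion! message)]
    (extend-visibility! message)
    (.acquire sem)
    (try (fetch-files-from-s3! state)
         (finally (.release sem)))))

(defn events-while-saturated
  "Runs `handler` against a gate whose only permit is held elsewhere and
   returns the events recorded before that permit is released."
  [handler]
  (reset! events [])
  (let [sem    (Semaphore. 1)
        _      (.acquire sem)                       ; saturate the gate
        worker (doto (Thread. ^Runnable (fn [] (handler sem {:id "m-1"})))
                 (.start))]
    (Thread/sleep 100)                              ; let the handler reach its block point
    (let [seen @events]
      (.release sem)
      (.join worker)
      seen)))

(events-while-saturated gated-first-handler)      ;; => []
(events-while-saturated protected-first-handler)  ;; => [:claim :extend]
```

The empty vector for the first handler corresponds to the RED state: nothing protects the message while the gate is saturated. The second result is what the regression test should observe once the permit moves inside job-handler.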

GREEN. Then:

  1. Poll loop (ingestion.clj ~:step-fn doseq): change (.submit executor (concurrency/with-permit heavy-work-gate (job-handler (merge config orchestrator) message))) → (.submit executor (job-handler (merge config orchestrator) message)).
  2. job-handler: add heavy-work-gate to its context :keys destructure (the poll loop already passes (merge config orchestrator); config carries :heavy-work-gate). Wrap only the heavy (try (let [classified …] …) (catch Throwable …) (finally …)) block (currently ~lines 1021–1129) in (concurrency/call-with-permit heavy-work-gate (fn [] <that try block>)). Leave claim-ingestion!, the up-front extend-visibility!, and the heartbeat-future schedule outside / before the permit. Keep (.cancel heartbeat-future false) in the existing finally.

Verify: the new test passes; clj-kondo clean; existing ingestion tests pass.

Commit: fix(ingestion): acquire heavy-work permit after SQS protection (DLQ regression)


Task R3: document_output — same fix

File: src/com/getorcha/workers/document_output.clj (+ test mirroring R2)

  1. Poll loop (~237): drop the concurrency/with-permit wrapper; submit (mdc/wrap-callable #(handle-message! config message)) directly (config carries :heavy-work-gate).
  2. handle-message!: add heavy-work-gate to its :keys. Wrap only (dispatch-job! …) + the status-check delete (currently ~lines 202–211) in call-with-permit, placed after the up-front extend-visibility! (~191) and the heartbeat-future schedule (~196). Keep heartbeat cancel in finally.

TDD: an ordering test analogous to R2 (saturated 1-permit gate; assert up-front extend-visibility! runs before dispatch-job!). Verify kondo + existing tests.

Commit: fix(document-output): acquire heavy-work permit after SQS protection


Task R4: matching — up-front extend before the permit

File: src/com/getorcha/workers/ap/processors/matching/worker.clj (+ test), config

matching has no heartbeat. It is fed a burst of ~23 messages by the ingestion fan-out (ingestion.clj enqueues matching on completion), so it has the same DLQ-before-processing exposure.

  1. Add a config-driven extension, e.g. :visibility-extension-seconds (default 300) to the matching orchestrator config in resources/com/getorcha/config.edn and its :keys destructure. No new Integrant component — plain config value (matches the "no needless components" convention).
  2. Poll loop (~260): drop the with-permit wrapper at .submit.
  3. In the handler, after document-id is parsed and validated, before any heavy work: (aws/extend-visibility! sqs-client queue-url message visibility-extension-seconds). Then wrap only (process-document-with-output! config document-id) + its success delete-message! in (concurrency/call-with-permit heavy-work-gate …). Keep the existing handle-failure! catch (its receive-count/visibility-backoff retry path is unchanged) outside or around the gated body as today.
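The handler shape described in these steps might look like the sketch below. All collaborators are local stand-ins with assumed signatures, the message is modelled as a plain map that already carries a validated :document-id, and `handle-matching-message!` is a hypothetical name; the real handler parses the SQS message body and keeps its existing failure logic.

```clojure
(import '(java.util.concurrent Semaphore))

(def calls (atom []))

;; Hypothetical stand-ins; the real functions live in the codebase.
(defn extend-visibility! [_sqs _queue-url message seconds]
  (swap! calls conj [:extend (:id message) seconds]))
(defn delete-message! [_sqs _queue-url message]
  (swap! calls conj [:delete (:id message)]))
(defn process-document-with-output! [_config document-id]
  (swap! calls conj [:process document-id]))
(defn handle-failure! [_config message ^Throwable t]
  (swap! calls conj [:failure (:id message) (.getMessage t)]))

;; Local copy of the R1 helper.
(defn call-with-permit [{:keys [^Semaphore semaphore]} f]
  (.acquire semaphore)
  (try (f) (finally (.release semaphore))))

(defn handle-matching-message!
  [{:keys [sqs-client queue-url heavy-work-gate visibility-extension-seconds]
    :or   {visibility-extension-seconds 300}   ; default named in step 1
    :as   config}
   {:keys [document-id] :as message}]
  (try
    ;; Protect first: runs immediately, even when the gate is saturated.
    (extend-visibility! sqs-client queue-url message visibility-extension-seconds)
    ;; Only the heavy call and its success delete wait for a permit.
    (call-with-permit heavy-work-gate
                      (fn []
                        (process-document-with-output! config document-id)
                        (delete-message! sqs-client queue-url message)))
    (catch Throwable t
      (handle-failure! config message t))))

(handle-matching-message! {:heavy-work-gate {:semaphore (Semaphore. 3)}}
                          {:id "m-1" :document-id "doc-1"})
@calls
;; => [[:extend "m-1" 300] [:process "doc-1"] [:delete "m-1"]]
```

A single up-front extension only covers a permit wait shorter than the configured value, so the default is a sizing decision against the expected burst rather than a guarantee.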

TDD: ordering test — saturated 1-permit gate; assert aws/extend-visibility! is called before process-document-with-output!. Verify kondo + existing matching tests.

Commit: fix(matching): extend message visibility before heavy-work permit


Task R5: diagnostics_recompute — up-front extend before the permit

File: src/com/getorcha/workers/diagnostics_recompute.clj (+ test), config

handle-message is idempotent and drops superseded messages, so severity is low, but the fix is cheap and keeps the pattern consistent.

  1. Add :visibility-extension-seconds (default 300) to the diagnostics-recompute consumer config + :keys.
  2. Poll loop (~122): drop the with-permit wrapper. In the task: (aws/extend-visibility! sqs queue-url message visibility-extension-seconds) then (concurrency/call-with-permit heavy-work-gate (fn [] (handle-message ctx (.body message)) (aws/delete-message! sqs queue-url message))), keeping the existing catch.

TDD: ordering test as above. Verify kondo + existing tests.

Commit: fix(diagnostics-recompute): extend message visibility before heavy-work permit


Task R6: acquisition — idempotency review, then move delete into the task

File: src/com/getorcha/workers/ap/acquisition.clj, acquisition.multi (+ test)

acquisition's bug is different: it calls aws/delete-message! immediately after .submit, before the gated work runs (acquisition.clj:147), so a crash while the task is permit-waiting or running silently loses the work. The permit widens that window.

Step 1 — idempotency review (do this first; report findings). Read acquisition.multi/handle-queue-message and the SES/S3 pull path. Determine whether re-processing the same message / S3 object on SQS redelivery is safe (no duplicate doc-source / document / re-ingestion). If it is not idempotent, STOP and report BLOCKED with the specific non-idempotent step and a proposed dedupe key (e.g. S3 object key or message id) — do not proceed to Step 2 without a safe redelivery story.

Step 2 — fix (only if Step 1 shows redelivery is safe, possibly with a guard).

  1. Add :visibility-extension-seconds (default 300) to the acquisition orchestrator config + :keys.
  2. For a real (non-::ignored) message: (aws/extend-visibility! sqs-client queue-url message visibility-extension-seconds) before submitting, then submit (concurrency/with-permit heavy-work-gate (mdc/wrap-runnable (fn [] <work> ))) where <work> ends by calling (aws/delete-message! sqs-client queue-url message) only on success (move the delete from line 147 to the end of the task body, inside the success path; on Throwable do not delete — let visibility expire for redelivery). The ::ignored / s3:TestEvent path keeps deleting immediately (no work, safe).
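The delete placement in step 2 can be sketched as follows. This is a reduced model under stated assumptions: `with-permit` is a local stand-in for concurrency/with-permit (the real implementation may differ), `delete-message!` and `work!` are placeholders, the mdc/wrap-runnable wrapper and the up-front extend are omitted, and a fixed thread pool replaces the virtual-thread executor so the sketch runs on older JDKs.

```clojure
(import '(java.util.concurrent Callable ExecutionException Executors
                               ExecutorService Semaphore))

(def deleted (atom #{}))

;; Stand-in for aws/delete-message! (simplified, assumed signature).
(defn delete-message! [message] (swap! deleted conj (:id message)))

(defn with-permit
  "Local stand-in for concurrency/with-permit: wraps `f` in a Callable that
   holds one permit while it runs."
  ^Callable [{:keys [^Semaphore semaphore]} f]
  (reify Callable
    (call [_]
      (.acquire semaphore)
      (try (f) (finally (.release semaphore))))))

(defn acquisition-task
  "`work!` is a placeholder for the real acquisition work."
  [gate message work!]
  (with-permit gate
    (fn []
      (work! message)
      ;; Reached only when work! returned normally. A throw skips the delete,
      ;; so visibility expires and SQS redelivers the message.
      (delete-message! message))))

(defn run-task!
  "Submits the task and reports whether it completed or threw."
  [^ExecutorService executor ^Callable task]
  (try
    (.get (.submit executor task))
    :succeeded
    (catch ExecutionException _ :failed)))

(def ^ExecutorService executor (Executors/newFixedThreadPool 2))
(def gate {:semaphore (Semaphore. 1)})

(def outcomes
  [(run-task! executor (acquisition-task gate {:id "ok"} (fn [_] :done)))
   (run-task! executor (acquisition-task gate {:id "boom"}
                                         (fn [_] (throw (ex-info "pull failed" {})))))])
(.shutdown executor)

outcomes   ;; => [:succeeded :failed]
@deleted   ;; => #{"ok"}
```

Relying on redelivery for the failed case is only sound if Step 1 confirms the re-pull path is idempotent, which is why the review gates this change.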

TDD: (a) ordering test — saturated 1-permit gate, assert extend-visibility! before the gated work; (b) a test asserting the message is not deleted when the work throws, and is deleted on success.

Commit: fix(acquisition): delete SQS message only after gated work succeeds


Task R7: re-run the 23-invoice nREPL replay — acceptance gate (supersedes Task 15)

Re-run Plan B Task 5's replay (running nREPL, port from the user, the real incident PDF) with all R1–R6 fixes in place. All must hold:

Synchronous nREPL evals only (same constraint as the original Task 5). If children still DLQ, the fix is incomplete — return to systematic debugging, do not paper over.


After R7

Dispatch a final whole-implementation code reviewer (Plans A/B/C + this remediation), then superpowers:finishing-a-development-branch. The PR body must document: the gate-ordering regression, its root cause, the per-worker fix, the acquisition idempotency finding, and the final replay numbers.