Status: ready to execute. Branch: master (same as the Phase-1 prod-resilience work).
Origin: discovered by the Plan B Task 5 nREPL 23-invoice replay, in which 16 of 22 ingestion
children drained to the DLQ before claim-ingestion! ever ran. Root cause below.
Plan A Tasks 3/5 wrap each poll-loop message in concurrency/with-permit at the
.submit site. with-permit (src/com/getorcha/workers/concurrency.clj:42) calls a
blocking (.acquire semaphore) at the very start of the executor task, that is, before
any per-message SQS protection (claim-ingestion! / extend-visibility! / heartbeat)
runs. The executor is Executors/newVirtualThreadPerTaskExecutor (unbounded, immediate
start), so the only blocking wait is that semaphore acquire.
The unprotected window, from receive to the first extend-visibility, is permit-wait + claim-query.
With Semaphore(N=3) and a 22-child fan-out at ~40–60 s of heavy work each, a child in wave ≥2
waits >60 s for a permit. Ingest queue base VisibilityTimeout: local 60 s
(volumes/ministack/sqs.json, scripts/ministack-seed/seed.sh:72), prod 600 s
(infra/stacks/foundation_stack.py:298); maxReceiveCount = 3 in both. So the message is
redelivered every VT while it is still waiting for a permit, and once ApproximateReceiveCount
exceeds 3, SQS moves it to the DLQ before it is ever processed. Before the gate, work ran
immediately on receive (the OOM incident), so this window was ≈0; this is a regression
introduced by the gate's placement. Prod is not safe by construction, only by margin: a larger
PDF, a slower LLM, or a bigger fan-out reproduces it at VT = 600 s.
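A back-of-envelope model makes the arithmetic above concrete. It is a sketch under simplifying assumptions, not repo code: every child takes the same heavy-work time (50 s, the midpoint of 40–60 s), waves are counted from 0, the poll loop re-receives the message promptly at every VT boundary, and a message is dead-lettered once its permit wait exceeds maxReceiveCount × VT (the receive at t = 3·VT would be the fourth). It also ignores that each redelivery submits another permit-waiting task, so it likely understates the damage. permit-wait-model and its keys are hypothetical names.

```clojure
;; Hypothetical back-of-envelope model (not repo code): how long each child of a
;; fan-out waits for a heavy-work permit, and whether SQS dead-letters it first.
(defn permit-wait-model
  [{:keys [permits children work-s vt-s max-receive-count]}]
  (vec
   (for [i (range children)
         :let [wave   (quot i permits)   ; waves counted from 0
               wait-s (* wave work-s)]]  ; time spent blocked in .acquire
     {:child  i
      :wave   wave
      :wait-s wait-s
      ;; Receives happen at t = 0, VT, 2VT, ...; the receive that would push the
      ;; count past maxReceiveCount (at t = max-receive-count * VT) moves the
      ;; message to the DLQ instead, so any longer wait loses the message.
      :dlq?   (> wait-s (* max-receive-count vt-s))})))

(def local-run (permit-wait-model {:permits 3 :children 22 :work-s 50
                                   :vt-s 60 :max-receive-count 3}))
(def prod-run  (permit-wait-model {:permits 3 :children 22 :work-s 50
                                   :vt-s 600 :max-receive-count 3}))

(count (filter :dlq? local-run)) ;; => 10 (waves 4-7 wait 200-350 s > 180 s)
(count (filter :dlq? prod-run))  ;; => 0  (longest wait 350 s < 1800 s)
```

Even this optimistic model dead-letters almost half the fan-out locally. At VT = 600 s the same fan-out fits, but scaling work-s or children up eventually crosses the 1800 s line too, which is why prod is safe only by margin.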
Fix principle: the permit must gate only the heavy work and must be acquired after the message is protected (claim, extend-visibility, heartbeat started). Because the executor starts tasks immediately, protection then runs within ms of receive, and the heartbeat (or an up-front extend) holds visibility across the permit wait.
| Worker | Protection it has | Fix |
|---|---|---|
| ap/ingestion.clj | claim + up-front extend + heartbeat | move permit inside job-handler, gate only the heavy try (after heartbeat) |
| document_output.clj | claim-job + up-front extend + heartbeat | move permit inside handle-message!, gate only dispatch-job!+status/delete (after heartbeat) |
| ap/processors/matching/worker.clj | none (receive-count backoff only) | up-front extend-visibility! before the permit; gate only process-document-with-output! |
| diagnostics_recompute.clj | none (handler idempotent/superseded-safe) | up-front extend-visibility! before the permit; gate only handle-message |
| ap/acquisition.clj | none; deletes msg immediately after submit (different bug: crash-loss, widened by permit) | idempotency-review the re-pull path, then move delete-message! into the task after success + up-front extend + gate only the work |
R1: concurrency/call-with-permit inline helper
Files: src/com/getorcha/workers/concurrency.clj, test/com/getorcha/workers/concurrency_test.clj
with-permit returns a ^Callable for ExecutorService.submit. The fix needs to gate
a section already running on its own virtual thread. Add a sibling:
```clojure
(defn call-with-permit
  "Acquire one permit from `gate`, run thunk `f` (0-arg), release in a finally.
  Inline form for gating a heavy section that is already executing on its own
  thread — unlike `with-permit`, which wraps a task for ExecutorService.submit."
  [{:keys [^Semaphore semaphore] :as _gate} f]
  (.acquire semaphore)
  (try (f) (finally (.release semaphore))))
```
TDD: write the tests first (RED → GREEN): (a) runs the thunk and returns its value;
(b) caps concurrency: with a 1-permit gate, two overlapping calls never run the thunk
body simultaneously (latch/counter, peak == 1); (c) releases on exception: a throwing
thunk still frees the permit, so a following call proceeds. Reuse the patterns in the
existing concurrency_test.clj. Do not change with-permit.
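A sketch of the three RED tests with clojure.test, written standalone so it runs on its own: it repeats call-with-permit from above and builds the gate as a plain {:semaphore …} map, matching the helper's destructure. The gate constructor and test names are hypothetical; the real tests should follow the existing concurrency_test.clj patterns.

```clojure
(require '[clojure.test :refer [deftest is run-tests]])
(import '(java.util.concurrent Semaphore CountDownLatch))

(defn call-with-permit
  [{:keys [^Semaphore semaphore] :as _gate} f]
  (.acquire semaphore)
  (try (f) (finally (.release semaphore))))

;; Hypothetical gate constructor: a map carrying the semaphore, as the helper expects.
(defn gate [n] {:semaphore (Semaphore. (int n))})

;; (a) returns the thunk's value
(deftest returns-thunk-value
  (is (= 42 (call-with-permit (gate 1) (fn [] 42)))))

;; (b) with one permit, two overlapping calls never run the body at the same time
(deftest caps-concurrency
  (let [g      (gate 1)
        start  (CountDownLatch. 1)
        active (atom 0)
        peak   (atom 0)
        body   (fn []
                 (let [n (swap! active inc)]
                   (swap! peak max n))   ; record the highest concurrency seen
                 (Thread/sleep 50)       ; hold the permit long enough to overlap
                 (swap! active dec))
        run    (fn [] (.await start) (call-with-permit g body))
        a      (future (run))
        b      (future (run))]
    (.countDown start)                   ; release both callers at once
    @a @b
    (is (= 1 @peak))))

;; (c) a throwing thunk still frees the permit
(deftest releases-on-exception
  (let [g (gate 1)]
    (is (thrown? clojure.lang.ExceptionInfo
                 (call-with-permit g #(throw (ex-info "boom" {})))))
    (is (= 1 (.availablePermits ^Semaphore (:semaphore g))))
    (is (= :next (call-with-permit g (fn [] :next))))))
```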
Commit: feat(concurrency): add call-with-permit inline gate helper
R2: ap/ingestion.clj
File: src/com/getorcha/workers/ap/ingestion.clj (+ test)
RED first. Add a regression test (test/com/getorcha/workers/ap/ingestion_test.clj
or the existing ingestion test ns) proving the ordering. Build the job-handler
context with a heavy-work-gate of 1 permit held by a separate blocked thread, and use
with-redefs to stub claim-ingestion! (returns a fake pipeline-state),
aws/extend-visibility! (records a timestamp/flag), and the first heavy stage
(fetch-files-from-s3!, which blocks on a latch). Assert that aws/extend-visibility! is
invoked before the heavy stage is reached, even though the gate is saturated.
Today it is not: the test is RED because the permit is acquired before
claim-ingestion!.
GREEN. Then:
In ingestion.clj (poll loop, ~:step-fn doseq), change
(.submit executor (concurrency/with-permit heavy-work-gate (job-handler (merge config orchestrator) message)))
→ (.submit executor (job-handler (merge config orchestrator) message)).
In job-handler, add heavy-work-gate to its context :keys destructure (the poll
loop already passes (merge config orchestrator), and config carries
:heavy-work-gate). Wrap only the heavy (try (let [classified …] …) (catch Throwable …) (finally …)) block (currently ~lines 1021–1129) in
(concurrency/call-with-permit heavy-work-gate (fn [] <that try block>)). Leave
claim-ingestion!, the up-front extend-visibility!, and the
heartbeat-future schedule outside and before the permit. Keep
(.cancel heartbeat-future false) in the existing finally.
Verify: the new test passes; clj-kondo is clean; existing ingestion tests pass.
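The reordered shape and its ordering test can be sketched with every side effect injected as a plain function, so the check runs without with-redefs or AWS. This is a hypothetical stand-in, not the real job-handler: claim!, extend-visibility!, start-heartbeat! and heavy! are illustrative keys, and start-heartbeat! returns a cancel thunk in place of the real heartbeat-future.

```clojure
(import '(java.util.concurrent Semaphore))

;; Same inline helper as R1.
(defn call-with-permit
  [{:keys [^Semaphore semaphore] :as _gate} f]
  (.acquire semaphore)
  (try (f) (finally (.release semaphore))))

;; Hypothetical stand-in for the reordered job-handler: every side effect is
;; injected through ctx so the ordering can be checked without AWS.
(defn job-handler
  [{:keys [heavy-work-gate claim! extend-visibility! start-heartbeat! heavy!]} message]
  (fn []
    (when-let [state (claim! message)]                   ; 1. claim, no permit needed
      (extend-visibility! message)                       ; 2. up-front extend
      (let [cancel-heartbeat (start-heartbeat! message)] ; 3. heartbeat running
        (try
          ;; 4. Only now queue for a permit; visibility is already protected.
          (call-with-permit heavy-work-gate #(heavy! state))
          (finally
            (cancel-heartbeat)))))))
```

One design difference from the plan: this sketch cancels the heartbeat in an outer finally, so it is also cancelled if .acquire is interrupted. Keeping (.cancel heartbeat-future false) only in the finally inside the gated block means it runs only once the permit is held, which may be worth a second look.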
Commit: fix(ingestion): acquire heavy-work permit after SQS protection (DLQ regression)
R3: document_output.clj
File: src/com/getorcha/workers/document_output.clj (+ test mirroring R2)
In the poll loop, remove the concurrency/with-permit wrapper and submit
(mdc/wrap-callable #(handle-message! config message)) directly (config carries
:heavy-work-gate). In handle-message!, add heavy-work-gate to its :keys. Wrap only
(dispatch-job! …) + the status-check delete (currently ~lines 202–211) in
call-with-permit, placed after the up-front extend-visibility! (~191) and
the heartbeat-future schedule (~196). Keep the heartbeat cancel in finally.
TDD: an ordering test analogous to R2 (saturated 1-permit gate; assert the up-front
extend-visibility! runs before dispatch-job!). Verify kondo + existing tests.
Commit: fix(document-output): acquire heavy-work permit after SQS protection
R4: ap/processors/matching/worker.clj
File: src/com/getorcha/workers/ap/processors/matching/worker.clj (+ test), config
The matching worker has no heartbeat. It is fed a burst of ~23 messages by the ingestion fan-out
(ingestion.clj enqueues matching on completion), so it has the same
DLQ-before-processing exposure.
Add :visibility-extension-seconds (default 300)
to the matching orchestrator config in resources/com/getorcha/config.edn and to its
:keys destructure. No new Integrant component, just a plain config value (matches the
"no needless components" convention). Remove the with-permit wrapper at .submit.
In the task, after document-id is parsed and validated and before any heavy
work, call (aws/extend-visibility! sqs-client queue-url message visibility-extension-seconds).
Then wrap only (process-document-with-output! config document-id) +
its success delete-message! in (concurrency/call-with-permit heavy-work-gate …).
Keep the existing handle-failure! catch (its receive-count/visibility-backoff
retry path is unchanged) outside or around the gated body, as today.
TDD: ordering test with a saturated 1-permit gate; assert aws/extend-visibility! is
called before process-document-with-output!. Verify kondo + existing matching tests.
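Without a heartbeat, the single up-front extend is all that bridges the permit wait. ChangeMessageVisibility counts the new timeout from the moment of the call, so :visibility-extension-seconds has to cover the expected permit wait plus the work itself. A hypothetical sketch with injected side effects (parse-document-id, extend-visibility!, process!, delete-message! and handle-failure! are illustrative keys, not the worker's real signatures); the failure path leaves the message undeleted for the existing receive-count backoff.

```clojure
(import '(java.util.concurrent Semaphore))

(defn call-with-permit
  [{:keys [^Semaphore semaphore] :as _gate} f]
  (.acquire semaphore)
  (try (f) (finally (.release semaphore))))

;; Hypothetical stand-in for the matching worker's per-message task.
(defn matching-task
  [{:keys [heavy-work-gate visibility-extension-seconds parse-document-id
           extend-visibility! process! delete-message! handle-failure!]}
   message]
  (fn []
    (try
      (let [document-id (parse-document-id message)] ; parse/validate first
        ;; No heartbeat here, so this single extend must outlast permit wait + work.
        (extend-visibility! message visibility-extension-seconds)
        (call-with-permit heavy-work-gate
                          (fn []
                            (process! document-id)
                            (delete-message! message) ; success path only
                            :processed)))
      (catch Throwable t
        ;; Unchanged failure path: no delete; receive-count backoff decides retry.
        (handle-failure! message t)
        :failed))))
```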
Commit: fix(matching): extend message visibility before heavy-work permit
R5: diagnostics_recompute.clj
File: src/com/getorcha/workers/diagnostics_recompute.clj (+ test), config
handle-message is idempotent and drops superseded messages, so severity is low, but
the fix is cheap and keeps the pattern consistent.
Add :visibility-extension-seconds (default 300) to the diagnostics-recompute
consumer config and its :keys. Remove the with-permit wrapper. In the task, call
(aws/extend-visibility! sqs queue-url message visibility-extension-seconds), then
(concurrency/call-with-permit heavy-work-gate (fn [] (handle-message ctx (.body message)) (aws/delete-message! sqs queue-url message))),
keeping the existing catch.
TDD: ordering test as above. Verify kondo + existing tests.
Commit: fix(diagnostics-recompute): extend message visibility before heavy-work permit
R6: ap/acquisition.clj
File: src/com/getorcha/workers/ap/acquisition.clj, acquisition.multi (+ test)
acquisition's bug is different: it calls (aws/delete-message!) immediately after
.submit, before the gated work runs (acquisition.clj:147), so a crash while the
task is waiting for a permit or running silently loses the work. The permit widens that window.
Step 1 — idempotency review (do this first; report findings). Read
acquisition.multi/handle-queue-message and the SES/S3 pull path. Determine whether
re-processing the same message / S3 object on SQS redelivery is safe (no duplicate
doc-source / document / re-ingestion). If it is not idempotent, STOP and report
BLOCKED with the specific non-idempotent step and a proposed dedupe key (e.g. S3
object key or message id) — do not proceed to Step 2 without a safe redelivery story.
Step 2 — fix (only if Step 1 shows redelivery is safe, possibly with a guard).
Add :visibility-extension-seconds (default 300) to the acquisition orchestrator
config and its :keys. For each (non-::ignored) message, call
(aws/extend-visibility! sqs-client queue-url message visibility-extension-seconds)
before submitting, then submit
(concurrency/with-permit heavy-work-gate (mdc/wrap-runnable (fn [] <work>)))
where <work> ends by calling (aws/delete-message! sqs-client queue-url message)
only on success (move the delete from line 147 to the end of the task body,
inside the success path; on Throwable do not delete, and let visibility expire for
redelivery). The ::ignored / s3:TestEvent path keeps deleting immediately
(no work, safe).
TDD: (a) ordering test: saturated 1-permit gate, assert extend-visibility! runs before
the gated work; (b) a test asserting the message is not deleted when the work
throws, and is deleted on success.
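Step 2's delete-on-success flow can be sketched the same way. This is hypothetical: the ctx keys (ignored?, work!, on-failure!, and so on) are illustrative, and to stay self-contained the sketch gates inside the submitted task with call-with-permit instead of wrapping it with with-permit, so the delete runs just after the permit is released. The ordering is the same: extend, queue for a permit, work, then delete only on success.

```clojure
(import '(java.util.concurrent Callable ExecutorService Semaphore))

(defn call-with-permit
  [{:keys [^Semaphore semaphore] :as _gate} f]
  (.acquire semaphore)
  (try (f) (finally (.release semaphore))))

;; Hypothetical sketch of the R6 poll-loop step; every key in ctx is illustrative.
(defn handle-acquisition-message!
  [{:keys [^ExecutorService executor heavy-work-gate visibility-extension-seconds
           ignored? extend-visibility! delete-message! work! on-failure!]}
   message]
  (if (ignored? message)
    ;; No work to do (e.g. s3:TestEvent): deleting immediately is still safe.
    (do (delete-message! message) nil)
    (do
      ;; Protect the message before it can sit in the permit queue.
      (extend-visibility! message visibility-extension-seconds)
      (.submit executor
               ^Callable (fn []
                           (try
                             (call-with-permit heavy-work-gate #(work! message))
                             ;; Delete only after the gated work succeeded.
                             (delete-message! message)
                             :deleted
                             (catch Throwable t
                               ;; No delete: visibility expires and SQS redelivers.
                               (on-failure! message t)
                               :left-for-redelivery)))))))
```

Whether leaving a failed message for redelivery is acceptable depends on the Step 1 idempotency finding; if Step 1 reports BLOCKED, this step does not apply.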
Commit: fix(acquisition): delete SQS message only after gated work succeeds
Re-run Plan B Task 5's replay (running nREPL, port from the user, the real incident PDF) with all R1–R6 fixes in place. All must hold:
- No *.hprof: this proves the permit still bounds heavy concurrency (the original Phase-1 property is not regressed by the reorder).
- Every ingestion child reaches completed (terminal, not orphaned in-progress).
- Record :repl-max-heap-mib, :peak-used-mib, :post-gc-used-mib, the per-status child breakdown, and the chosen ORCHA_HEAVY_CONCURRENCY for the PR body.
Synchronous nREPL evals only (same constraint as the original Task 5). If children still DLQ, the fix is incomplete: return to systematic debugging, do not paper over.
Dispatch a final whole-implementation code reviewer (Plans A/B/C + this remediation),
then superpowers:finishing-a-development-branch. The PR body must document: the
gate-ordering regression, its root cause, the per-worker fix, the acquisition
idempotency finding, and the final replay numbers.