For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking. Before editing any .clj file, the implementer must invoke the orcha-clojure-style skill; use the clojure-eval skill (nREPL port 33905) to verify compiles/tests.
Goal: Bound the document-pipeline fan-out with a global heavy-work semaphore, and stop failing-dependency poll loops from CPU-spinning by adding shared exponential backoff + jitter.
Architecture: One new Integrant component holds a fair java.util.concurrent.Semaphore ("heavy-work gate"); every per-message worker task acquires a permit before running and releases it in a finally. A second new namespace provides one shared run-poll-loop that replaces the five duplicated (while @polling? (try … (catch …))) loops, adding backoff-with-full-jitter on error and reset-on-success. No infra changes.
Tech Stack: Clojure, Integrant, aero config, java.util.concurrent, clojure.test.
Source of truth — measured basis: single-page invoice transient ≈ 170 MB (median), spec §1.1. Default permits = 3, env-tunable via ORCHA_HEAVY_CONCURRENCY.
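As a rough check on this sizing, the arithmetic can be sketched as follows. It assumes every gated task costs about the single-page median (170 MB) and uses the Plan B heap settings quoted in this header (MaxRAMPercentage 60, mem_limit 3200m). The var names are illustrative only, and multi-page documents may well exceed the median, which is why the default is treated as a hypothesis rather than a proven bound.

```clojure
;; Back-of-envelope sizing; all names here are illustrative, not project code.
(def transient-mb 170)    ; median single-page invoice transient (spec §1.1)
(def permits 3)           ; default ORCHA_HEAVY_CONCURRENCY
(def mem-limit-mb 3200)   ; Plan B mem_limit
(def max-ram-pct 60)      ; Plan B MaxRAMPercentage

;; Heap available under Plan B: 60% of 3200 MB.
(def heap-mb (quot (* mem-limit-mb max-ram-pct) 100))

;; Aggregate transient footprint if all permits are held at once,
;; assuming each task matches the median.
(def gated-peak-mb (* permits transient-mb))

(println {:heap-mb heap-mb
          :gated-peak-mb gated-peak-mb
          :headroom-mb (- heap-mb gated-peak-mb)})
```

The headroom figure says nothing about baseline heap usage or larger documents; it only shows the order of magnitude the default is aimed at.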
The ORCHA_HEAVY_CONCURRENCY=3 default is sized against Plan B's lowered heap (MaxRAMPercentage 60 + mem_limit: 3200m ≈ 1.9 GB heap). Plan A is safe to merge and run on its own (the gate strictly reduces concurrency, never increases it), but the permit default is only correct once Plan B is deployed; against the current 75% / no-limit heap, 3 is a different, untested operating point. Treat 3 as a hypothesis until validated.

[…] prod and local, no staging; infra/app.py:38), depending only on this plan's gate + Plan B's lowered heap/mem_limit, not Plan C's alarm. It is the spec §Testing "memory replay" requirement and the Phase 1 acceptance gate; the operator must not deploy until it passes.

Rollback/tuning knob on failure: ORCHA_HEAVY_CONCURRENCY=1 (restart only, no code change), but only because Plan B Task 2 plumbs that env var into deploy/docker-compose.yml (the orcha service otherwise declares no env_file and would silently ignore the host var, pinning permits at the default 3). The knob is inert until Plan B Task 2 is applied; tuning it is documented in Plan B Task 5.

segmentation-gate! is not a separately-gated phase (reviewed; intentional; do not "fix"): the 2026-05-16 OOM was the fan-out, not the split. segmentation-gate! (ingestion.clj:835) re-enqueues each child via aws/send-message! (ingestion.clj:943); every child then arrives as its own SQS message → its own job-handler, which Task 3 Step 2 wraps with with-permit. So the gate bounds the actual fan-out to :permits concurrent. The split itself runs sequentially inside one already-gated parent task: child-pdf-bytes is created and uploaded per doseq iteration, then eligible for GC (not 23 retained). Its heaviest footprint ("parent pdf-bytes retained + N sequential extract-pages") is a single gated task's working set, precisely what the Plan C Step 1 replay sizes :permits against. Adding a second inner semaphore around the split would risk self-deadlock (a permit holder waiting on a permit) for no memory benefit. Do not add one.

File structure:
src/com/getorcha/workers/concurrency.clj: heavy-work gate component + with-permit. One responsibility: global concurrency gating.
src/com/getorcha/workers/poll_loop.clj: run-poll-loop (backoff/jitter/reset). One responsibility: resilient SQS poll loop control flow.
test/com/getorcha/workers/concurrency_test.clj
test/com/getorcha/workers/poll_loop_test.clj
resources/com/getorcha/config.edn: register the gate component; add :heavy-work-gate ref to the 5 orchestrator configs.
src/com/getorcha/workers/ap/ingestion.clj (loop ~1150-1166, inner submit 1163, destructure ~1135-1139)
src/com/getorcha/workers/ap/acquisition.clj (loop ~87-142, inner submit ~122)
src/com/getorcha/workers/ap/processors/matching/worker.clj (loop ~241-288, inner submit ~254)
src/com/getorcha/workers/document_output.clj (loop ~221-238, inner submit ~232)
src/com/getorcha/workers/diagnostics_recompute.clj (loop ~105-127, inner submit ~116)

Ordering rationale: gate component (Task 1-2) → wire gate into workers (Task 3) → shared poll-loop (Task 4) → refactor loops (Task 5) → integration verify (Task 6). The gate is the actual fix; it lands first and is independently valuable even before the poll-loop refactor.
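The self-deadlock risk mentioned for an inner semaphore around the split can be illustrated with a one-permit gate. This is a minimal sketch, not project code: the var names are hypothetical, and tryAcquire is used so the demo returns instead of hanging the way a nested .acquire would.

```clojure
(import '(java.util.concurrent Semaphore))

;; One fair permit, standing in for a gate whose permits are all taken.
(def ^Semaphore gate (Semaphore. 1 true))

;; The "parent" task takes the only permit.
(.acquire gate)

;; A nested acquire from inside the same task cannot succeed. With a plain
;; .acquire it would block forever, because the only release sits in the
;; parent's own finally, which is never reached.
(def inner-granted? (.tryAcquire gate))

(.release gate)

(println {:inner-granted? inner-granted?
          :available-after-release (.availablePermits gate)})
```

With more than one permit the same wedge needs every permit to be held by a parent that is itself waiting, which is less likely but still possible under a large fan-out.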
Task 1: Heavy-work gate component
Files:
Create: src/com/getorcha/workers/concurrency.clj
Test: test/com/getorcha/workers/concurrency_test.clj
Step 1: Write the failing test
(ns com.getorcha.workers.concurrency-test
  (:require [clojure.test :refer [deftest is testing]]
            [com.getorcha.workers.concurrency :as concurrency]
            [integrant.core :as ig])
  (:import (java.util.concurrent CountDownLatch TimeUnit)
           (java.util.concurrent.atomic AtomicInteger)))

(deftest gate-init-coerces-permits
  (testing "permits accepts int or string"
    (let [g1 (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits 3})
          g2 (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits "4"})]
      (is (= 3 (.availablePermits ^java.util.concurrent.Semaphore (:semaphore g1))))
      (is (= 4 (.availablePermits ^java.util.concurrent.Semaphore (:semaphore g2)))))))

(deftest gate-init-rejects-non-positive-permits
  (testing "0 / negative / non-numeric permits fail fast at init, not a silent deadlock"
    ;; A Semaphore with <=0 permits makes every .acquire block forever ->
    ;; all heavy work wedges silently. A bad ORCHA_HEAVY_CONCURRENCY must
    ;; crash startup, not deadlock the workers.
    (doseq [bad [0 -1 "0" "-3" "abc"]]
      (is (thrown? clojure.lang.ExceptionInfo
                   (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate
                                {:permits bad}))
          (str "should reject " (pr-str bad))))))

(deftest with-permit-caps-concurrency
  (testing "no more than :permits tasks run heavy work simultaneously"
    (let [gate (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits 2})
          inside (AtomicInteger. 0)
          observed (AtomicInteger. 0)
          start (CountDownLatch. 1)
          done (CountDownLatch. 10)
          ;; with-permit returns a Callable, so it must be .call'd (workers
          ;; submit it to an executor, which calls .call). Constructing it
          ;; alone acquires nothing and runs nothing.
          task (fn []
                 (.call ^java.util.concurrent.Callable
                        (concurrency/with-permit gate
                          (fn []
                            (let [n (.incrementAndGet inside)]
                              (.getAndUpdate observed (reify java.util.function.IntUnaryOperator
                                                        (applyAsInt [_ cur] (max cur n))))
                              (Thread/sleep 20)
                              (.decrementAndGet inside)
                              (.countDown done))))))
          threads (repeatedly 10 #(Thread. ^Runnable (fn [] (.await start) (task))))]
      (doseq [^Thread t threads] (.start t))
      (.countDown start)
      (is (.await done 10 TimeUnit/SECONDS))
      (is (<= (.get observed) 2)))))

(deftest with-permit-releases-on-exception
  (testing "permit is released even when the task throws"
    (let [gate (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits 1})]
      (is (thrown? RuntimeException
                   (.call ^java.util.concurrent.Callable
                          (concurrency/with-permit
                           gate (fn [] (throw (RuntimeException. "boom")))))))
      (is (= 1 (.availablePermits ^java.util.concurrent.Semaphore (:semaphore gate)))))))

(deftest with-permit-runs-runnable-and-callable
  ;; Regression: acquisition submits (mdc/wrap-runnable …) -> a reify
  ;; Runnable, which is neither Callable nor IFn. with-permit must .run it,
  ;; not call it as a fn (ClassCastException). The other four workers pass
  ;; a reify Callable (mdc/wrap-callable) or a bare fn.
  (testing "a reify Runnable is .run, not invoked as a fn"
    (let [gate (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits 1})
          ran (atom false)
          r (reify Runnable (run [_] (reset! ran true)))]
      (.call ^java.util.concurrent.Callable (concurrency/with-permit gate r))
      (is @ran)
      (is (= 1 (.availablePermits ^java.util.concurrent.Semaphore (:semaphore gate))))))
  (testing "a reify Callable is .call'd and its value returned"
    (let [gate (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits 1})
          c (reify java.util.concurrent.Callable (call [_] :result))]
      (is (= :result (.call ^java.util.concurrent.Callable (concurrency/with-permit gate c))))
      (is (= 1 (.availablePermits ^java.util.concurrent.Semaphore (:semaphore gate))))))
  (testing "a bare fn still works (implements both Callable and IFn)"
    (let [gate (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits 1})]
      (is (= :ok (.call ^java.util.concurrent.Callable
                        (concurrency/with-permit gate (fn [] :ok)))))
      (is (= 1 (.availablePermits ^java.util.concurrent.Semaphore (:semaphore gate)))))))
Run: clj-nrepl-eval -p 33905 "(require 'com.getorcha.workers.concurrency-test :reload)"
Expected: FAIL — com.getorcha.workers.concurrency does not exist (FileNotFoundException).
(ns com.getorcha.workers.concurrency
  "Global heavy-work concurrency gate.

  A single fair semaphore caps how many memory-heavy document tasks run
  simultaneously across ALL workers, bounding aggregate peak heap regardless
  of fan-out size. Sized from measured ~170 MB/invoice (see
  docs/superpowers/specs/2026-05-16-prod-resilience-design.html §1.1)."
  (:require [clojure.tools.logging :as log]
            [integrant.core :as ig])
  (:import (java.util.concurrent Callable Semaphore)))

(defmethod ig/init-key ::heavy-work-gate
  [_ {:keys [permits] :as _config}]
  (let [n (try
            (cond-> permits (string? permits) Integer/parseInt)
            (catch Exception _ ::invalid))]
    ;; Fail fast on a bad ORCHA_HEAVY_CONCURRENCY. A Semaphore with <=0
    ;; permits never grants an acquire -> every heavy task blocks forever
    ;; (silent, total deadlock). Crash startup instead.
    (when-not (and (integer? n) (pos? n))
      (throw (ex-info "heavy-work-gate :permits must be a positive integer"
                      {:permits permits :parsed n})))
    (log/info "Heavy-work gate initialized" {:permits n})
    {:semaphore (Semaphore. (int n) true)}))

(defmethod ig/halt-key! ::heavy-work-gate
  [_ _gate]
  nil)

(defn with-permit
  "Acquire one permit from `gate`, run `task`, release the permit in a
  `finally`. `task` may be a `Callable`, a `Runnable`, or a 0-arg fn;
  the workers submit a mix: `mdc/wrap-callable` returns a `reify Callable`,
  `mdc/wrap-runnable` returns a `reify Runnable` (NOT an `IFn`: calling it
  as a fn throws `ClassCastException`), and some submit a bare `fn`. Returns
  a `Callable` so it can be passed straight to `ExecutorService.submit`."
  ^Callable [{:keys [^Semaphore semaphore] :as _gate} task]
  (reify Callable
    (call [_]
      (.acquire semaphore)
      (try
        (cond
          (instance? Callable task) (.call ^Callable task)
          (instance? Runnable task) (.run ^Runnable task)
          :else (task))
        (finally
          (.release semaphore))))))
Run: clj-nrepl-eval -p 33905 "(require 'com.getorcha.workers.concurrency :reload 'com.getorcha.workers.concurrency-test :reload) (clojure.test/run-tests 'com.getorcha.workers.concurrency-test)"
Expected: PASS — 0 failures, 0 errors.
Run: clj-kondo --lint src/com/getorcha/workers/concurrency.clj test/com/getorcha/workers/concurrency_test.clj
Expected: 0 errors, 0 warnings (fix any, including info-level).
git add src/com/getorcha/workers/concurrency.clj test/com/getorcha/workers/concurrency_test.clj
git commit -m "feat(workers): add global heavy-work concurrency gate"
Task 2: Register the gate component in config.edn
Files:
Modify: resources/com/getorcha/config.edn (the :integrant/config map, starts line 114)
Step 1: Add the component to the :integrant/config map
Insert this entry near the other worker components (anywhere inside the :integrant/config map, e.g. immediately before :com.getorcha.workers.ap.ingestion/worker-pools at line 221):
:com.getorcha.workers.concurrency/heavy-work-gate
{:permits #or [#env ORCHA_HEAVY_CONCURRENCY 3]}
(#or/#env are already used in this file at line 31; if ORCHA_HEAVY_CONCURRENCY is set it arrives as a string and ig/init-key coerces it — covered by Task 1's gate-init-coerces-permits test.)
Run: clj-nrepl-eval -p 33905 --timeout 30000 "(require 'com.getorcha.system :reload) (-> (com.getorcha.system/ig-config {:config-file \"com/getorcha/config.edn\" :profile :prod}) (get :com.getorcha.workers.concurrency/heavy-work-gate))"
Expected: {:permits 3} (no aero exception).
git add resources/com/getorcha/config.edn
git commit -m "feat(config): register heavy-work-gate component (default 3, env ORCHA_HEAVY_CONCURRENCY)"
Task 3: Wire the gate into the five workers
Each worker submits its heavy per-message task with (.submit executor <task>). We pass the gate into each orchestrator via config and wrap the submitted task with concurrency/with-permit. The transformation is identical in shape for all five; exact per-file edits below.
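The shape of that transformation can be sketched in isolation. The snippet below is a self-contained illustration under stated assumptions: `gated` is a hypothetical local stand-in that mirrors the shape of concurrency/with-permit but handles only 0-arg fns, and the pool and semaphore are throwaway objects rather than the workers' real executor or gate.

```clojure
(import '(java.util.concurrent Callable Executors ExecutorService Semaphore TimeUnit))

;; Hypothetical stand-in for concurrency/with-permit (0-arg fns only).
(defn gated
  ^Callable [^Semaphore sem f]
  (reify Callable
    (call [_]
      (.acquire sem)
      (try
        (f)
        (finally
          (.release sem))))))

(def ^Semaphore sem (Semaphore. 2 true))
(def ^ExecutorService pool (Executors/newFixedThreadPool 4))

;; Before: (.submit pool ^Callable (fn [] :done))
;; After:  the same task, wrapped so it runs only while holding a permit.
(def result
  (.get (.submit pool (gated sem (fn [] :done))) 5 TimeUnit/SECONDS))

(.shutdown pool)

(println {:result result :available (.availablePermits sem)})
```

Because the wrapper is itself a Callable, the Future still carries the task's return value and the permit is back in the semaphore by the time `.get` returns.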
Files:
Modify: resources/com/getorcha/config.edn
Modify: src/com/getorcha/workers/ap/ingestion.clj
Modify: src/com/getorcha/workers/ap/acquisition.clj
Modify: src/com/getorcha/workers/ap/processors/matching/worker.clj
Modify: src/com/getorcha/workers/document_output.clj
Modify: src/com/getorcha/workers/diagnostics_recompute.clj
Step 1: Add the gate ref to the 5 orchestrator configs in config.edn
Add this exact line into each of these 5 component maps in :integrant/config (alongside their existing #integrant/ref keys):
:heavy-work-gate #integrant/ref :com.getorcha.workers.concurrency/heavy-work-gate
:com.getorcha.workers.ap.ingestion/orchestrator (line ~226)
:com.getorcha.workers.ap.acquisition/orchestrator (line ~267)
:com.getorcha.workers.ap.processors.matching.worker/orchestrator (line ~292)
:com.getorcha.workers.diagnostics-recompute/consumer (line ~308)
:com.getorcha.workers.document-output/orchestrator (line ~329)
Step 2: ingestion.clj — destructure gate and wrap the submit
Add heavy-work-gate to the ig/init-key ::orchestrator destructuring (the {:keys [...] :as config} arg whose body is at line 1138) and add the require [com.getorcha.workers.concurrency :as concurrency] (alphabetical order) to the ns form.
Replace line 1163:
(.submit executor (job-handler (merge config orchestrator) message))))
with:
(.submit executor
         (concurrency/with-permit
          heavy-work-gate
          (job-handler (merge config orchestrator) message)))))
Step 3: wrap the submit in acquisition.clj
Add [com.getorcha.workers.concurrency :as concurrency] to requires; add heavy-work-gate to the ig/init-key ::orchestrator destructuring. Replace the inner submit at lines 122-138. Change:
(when (not= ::ignored q-message)
  (.submit executor
           (mdc/wrap-runnable
            (fn []
to wrap the mdc/wrap-runnable form:
(when (not= ::ignored q-message)
  (.submit executor
           (concurrency/with-permit
            heavy-work-gate
            (mdc/wrap-runnable
             (fn []
…and add one closing ) to balance the added with-permit call (the form already ends at line 138 with the (finally (AWSXRay/endSegment))))))))) — append exactly one )).
Note (do not regress): acquisition is the only worker that passes (mdc/wrap-runnable …) → a reify Runnable. with-permit (Task 1) dispatches Callable/Runnable/fn via cond, so this is safe. Do not "simplify" with-permit back to a Callable-only check: that is a runtime ClassCastException here that no compile-check catches; covered by with-permit-runs-runnable-and-callable.
Step 4: wrap the submit in matching/worker.clj
Add the require + heavy-work-gate destructuring. Replace lines 254-256:
(.submit executor
         ^Callable
         (mdc/wrap-callable
with:
(.submit executor
         (concurrency/with-permit
          heavy-work-gate
          (mdc/wrap-callable
…and add one closing ) after the existing close of the mdc/wrap-callable form (the ))))))) ending the doseq's submit, near line 285).
Step 5: wrap the submit in document_output.clj
Add the require + heavy-work-gate destructuring (its ig/init-key ::orchestrator arg map is on line 214). Replace lines 232-235:
(.submit executor
         ^Callable
         (mdc/wrap-callable
          #(handle-message! config message)))))
with:
(.submit executor
         (concurrency/with-permit
          heavy-work-gate
          (mdc/wrap-callable
           #(handle-message! config message))))))
Step 6: wrap the submit in diagnostics_recompute.clj
Add the require + add heavy-work-gate to the ig/init-key destructuring arg map (line ~76-77). Replace lines 116-124:
(.submit executor
         ^Callable
         (fn []
           (try
             (handle-message ctx (.body message))
             (aws/delete-message! sqs queue-url message)
             (catch Exception e
               (log/error e "diagnostics-recompute message processing failed"
                          {:message-body (.body message)})))))))
with:
(.submit executor
         (concurrency/with-permit
          heavy-work-gate
          (fn []
            (try
              (handle-message ctx (.body message))
              (aws/delete-message! sqs queue-url message)
              (catch Exception e
                (log/error e "diagnostics-recompute message processing failed"
                           {:message-body (.body message)}))))))))
Run: clj-nrepl-eval -p 33905 --timeout 30000 "(doseq [n '[com.getorcha.workers.concurrency com.getorcha.workers.ap.ingestion com.getorcha.workers.ap.acquisition com.getorcha.workers.ap.processors.matching.worker com.getorcha.workers.document-output com.getorcha.workers.diagnostics-recompute]] (require n :reload)) :ok"
Expected: :ok (no compilation/reflection errors).
Run: clj-kondo --lint src/com/getorcha/workers
Expected: 0 errors, 0 warnings.
git add resources/com/getorcha/config.edn src/com/getorcha/workers
git commit -m "feat(workers): gate all 5 heavy per-message tasks through heavy-work-gate"
Task 4: Shared poll loop
Files:
Create: src/com/getorcha/workers/poll_loop.clj
Test: test/com/getorcha/workers/poll_loop_test.clj
Step 1: Write the failing test
(ns com.getorcha.workers.poll-loop-test
  (:require [clojure.test :refer [deftest is testing]]
            [com.getorcha.workers.poll-loop :as poll-loop]))

(deftest backoff-sequence-is-exponential-capped
  (let [backoff {:base-ms 1000 :max-ms 30000}]
    (testing "cap doubles from base then clamps at max"
      (is (= [1000 2000 4000 8000 16000 30000 30000 30000]
             (map #(poll-loop/backoff-cap % backoff) (range 0 8)))))
    (testing "full-jitter: every delay is within [0, cap] for that attempt"
      (doseq [attempt (range 0 8)]
        (let [cap (poll-loop/backoff-cap attempt backoff)]
          (dotimes [_ 50]
            (is (<= 0 (poll-loop/backoff-ms attempt backoff) cap))))))
    (testing "high attempt is overflow-safe and stays at max (regression)"
      ;; The attempt counter is unbounded under a persistent failure; a
      ;; large attempt must not throw (long overflow) and must clamp to max.
      (is (= 30000 (poll-loop/backoff-cap 1000 backoff)))
      (is (<= 0 (poll-loop/backoff-ms 1000 backoff) 30000)))))

(deftest runs-step-until-stopped-and-resets-on-success
  (testing "loop runs while polling? true; backoff resets after a success"
    (let [polling? (atom true)
          calls (atom 0)
          sleeps (atom [])]
      (with-redefs [poll-loop/sleep! (fn [ms] (swap! sleeps conj ms))]
        (poll-loop/run-poll-loop
         {:polling? polling?
          :label "test"
          :backoff {:base-ms 1000 :max-ms 30000}
          :step-fn (fn []
                     (let [n (swap! calls inc)]
                       (cond
                         (= n 1) (throw (RuntimeException. "fail-1"))
                         (= n 2) (throw (RuntimeException. "fail-2"))
                         (= n 3) :ok
                         :else (do (reset! polling? false) :ok))))}))
      (is (= 4 @calls))
      ;; two failures before the first success -> two backoff sleeps,
      ;; attempt counter reset by the success at call 3.
      (is (= 2 (count @sleeps)))
      (is (every? #(<= 0 % 2000) @sleeps)))))

(deftest exits-cleanly-when-interrupted-during-backoff
  (testing "InterruptedException during the backoff sleep stops the loop quietly"
    ;; run-poll-loop's shutdown contract calls (.interrupt
    ;; (Thread/currentThread)) so the executor sees the interrupt. It must
    ;; therefore run on a DEDICATED thread (as it does in prod: an executor
    ;; worker), never the test/nREPL thread, or it would interrupt the test
    ;; runner itself. We capture the return value and the loop thread's own
    ;; post-run interrupt status from inside that thread (so no `dosync`/STM
    ;; or nREPL machinery can consume the flag first).
    (let [polling? (atom true)
          calls (atom 0)
          result (atom ::unset)
          self-interrupted? (atom ::unset)]
      (with-redefs [poll-loop/sleep! (fn [_]
                                       (reset! polling? false)
                                       (throw (InterruptedException. "shutdown")))]
        (let [t (Thread.
                 ^Runnable
                 (fn []
                   (reset! result
                           (poll-loop/run-poll-loop
                            {:polling? polling?
                             :label "test"
                             :backoff {:base-ms 1000 :max-ms 30000}
                             :step-fn (fn []
                                        (swap! calls inc)
                                        (throw (RuntimeException. "always")))}))
                   (reset! self-interrupted?
                           (.isInterrupted (Thread/currentThread)))))]
          (.start t)
          (.join t 5000)
          (is (not (.isAlive t))
              "loop thread terminated (did not hang in backoff after interrupt)")))
      (is (nil? @result)
          "loop swallows the interrupt and returns, does not propagate")
      (is (= 1 @calls) "no further step-fn calls after the interrupt")
      (is (true? @self-interrupted?)
          "run-poll-loop restored the interrupt flag on its own thread before exiting"))))
Run: clj-nrepl-eval -p 33905 "(require 'com.getorcha.workers.poll-loop-test :reload)"
Expected: FAIL — namespace com.getorcha.workers.poll-loop not found.
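The last test depends on a JVM detail: when Thread/sleep throws InterruptedException, the thread's interrupt status is cleared, so a loop that catches it has to re-arm the flag if the executor is to see the interrupt. A minimal sketch of that behavior, independent of the poll-loop namespace and using illustrative names only:

```clojure
;; Illustrative only: shows why the flag must be restored after the catch.
(def observed (promise))

(let [t (Thread.
         ^Runnable
         (fn []
           (try
             (Thread/sleep 10000)
             (catch InterruptedException _
               (let [me (Thread/currentThread)
                     cleared? (not (.isInterrupted me))]
                 ;; Throwing InterruptedException cleared the status; restore it.
                 (.interrupt me)
                 (deliver observed {:cleared-by-throw? cleared?
                                    :restored? (.isInterrupted me)}))))))]
  (.start t)
  (.interrupt t)
  (.join t 5000))

(println (deref observed 5000 :timeout))
```

The status is read from inside the interrupted thread, which is the same reason the test above captures it there rather than from the test runner.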
(ns com.getorcha.workers.poll-loop
  "Resilient SQS poll-loop control flow shared by all workers.

  Normal path is unchanged: `step-fn` performs one receive+dispatch and
  `aws/receive-messages` long-polls. On a caught exception we sleep with
  exponential backoff + full jitter (instead of spinning), then retry;
  a successful step resets the backoff. Honors the `polling?` stop atom."
  (:require [clojure.tools.logging :as log]))

(defn backoff-cap
  "Capped exponential delay ceiling for `attempt` (0-based):
  min(max-ms, base-ms * 2^attempt).

  The exponent is clamped to 30. `run-poll-loop`'s attempt counter grows
  unbounded under a persistently-failing dependency; without the clamp
  `base-ms * 2^attempt` overflows `long` (~attempt 54, primitive `*`
  throws `ArithmeticException`) and, since that is not an
  `InterruptedException`, escapes the loop and kills the worker forever.
  Beyond ~attempt 25 the value is already `max-ms`, so clamping at 30 is
  exact for any sane base/max and overflow-proof (base-ms * 2^30 fits long)."
  [attempt {:keys [base-ms max-ms] :as _backoff}]
  (min (long max-ms)
       (* (long base-ms) (long (Math/pow 2 (min (long attempt) 30))))))

(defn backoff-ms
  "Full-jitter delay: a random value in [0, (backoff-cap attempt)]."
  [attempt backoff]
  (long (rand-int (inc (backoff-cap attempt backoff)))))

(defn sleep!
  "Sleep indirection so tests can stub it. `Thread/sleep` propagates
  `InterruptedException`; `run-poll-loop` treats that as its stop signal
  so a shutdown mid-backoff exits promptly instead of waiting out the cap."
  [ms]
  (Thread/sleep (long ms)))

(defn run-poll-loop
  "Run `step-fn` repeatedly while `@polling?`. On exception: log once (if
  still polling) and sleep with backoff+jitter, escalating the attempt
  counter; a successful step resets it. Blocks the calling thread; submit
  it to the worker's executor exactly like the old `(fn [] (while …))`.

  Shutdown contract: `halt-key!` does `(reset! polling? false)` then
  `(.cancel polling-future true)`, which interrupts this thread. An
  `InterruptedException`, whether thrown by `sleep!` mid-backoff or by a
  blocking `step-fn`, is the stop signal: the loop exits quietly (no error
  log), and the thread's interrupt flag is restored so the executor sees it.
  Without this, an interrupt during the up-to-30s backoff would either be
  swallowed by the generic `catch` and retried, or unwind as a logged
  stacktrace; neither honors the stop atom during the sleep."
  [{:keys [polling? label backoff step-fn] :as _opts}]
  (try
    (loop [attempt 0]
      (when @polling?
        (let [ok? (try
                    (step-fn)
                    true
                    (catch InterruptedException e
                      (throw e))
                    (catch Exception e
                      (when @polling?
                        (log/error e (str "Error in " label " polling loop, will retry")))
                      false))]
          (if ok?
            (recur 0)
            (do
              (when @polling?
                (sleep! (backoff-ms attempt backoff)))
              (recur (inc attempt)))))))
    (catch InterruptedException _
      ;; Log BEFORE re-arming the interrupt: a logging appender may do an
      ;; interruptible blocking op, which would drop/throw if the thread's
      ;; interrupt flag is already set. Emit the shutdown line first, then
      ;; restore the flag so the executor still sees the interrupt.
      (log/info (str label " poll loop interrupted; stopping"))
      (.interrupt (Thread/currentThread)))))
Run: clj-nrepl-eval -p 33905 "(require 'com.getorcha.workers.poll-loop :reload 'com.getorcha.workers.poll-loop-test :reload) (clojure.test/run-tests 'com.getorcha.workers.poll-loop-test)"
Expected: PASS — 0 failures, 0 errors.
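The overflow reasoning in the backoff-cap docstring can be checked in isolation. The sketch below uses two hypothetical standalone functions (not the project's namespace): one without the exponent clamp and one with it, both fed the plan's base-ms 1000 and max-ms 30000.

```clojure
;; Hypothetical unclamped variant, for comparison only.
(defn unclamped-cap [attempt base-ms max-ms]
  (min (long max-ms)
       (* (long base-ms) (long (Math/pow 2 attempt)))))

;; Same formula with the exponent clamped to 30, as backoff-cap does.
(defn clamped-cap [attempt base-ms max-ms]
  (min (long max-ms)
       (* (long base-ms) (long (Math/pow 2 (min (long attempt) 30))))))

;; True when computing the cap for `attempt` throws on long overflow.
(defn overflows? [f attempt]
  (try
    (f attempt 1000 30000)
    false
    (catch ArithmeticException _ true)))

(println {:unclamped-53 (overflows? unclamped-cap 53)
          :unclamped-54 (overflows? unclamped-cap 54)
          :clamped-1000 (clamped-cap 1000 1000 30000)})
```

With base-ms 1000 the product first exceeds the long range at attempt 54, which is where the unclamped version starts throwing; the clamped version stays at max-ms for any attempt.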
Run: clj-kondo --lint src/com/getorcha/workers/poll_loop.clj test/com/getorcha/workers/poll_loop_test.clj
Expected: 0 errors, 0 warnings.
git add src/com/getorcha/workers/poll_loop.clj test/com/getorcha/workers/poll_loop_test.clj
git commit -m "feat(workers): add shared resilient poll-loop (backoff + full jitter)"
Task 5: Refactor the five poll loops onto run-poll-loop
For each worker, replace the (.submit executor ^Callable (fn [] (while @polling? (try <RECEIVE+DISPATCH> (catch Exception e (when @polling? (log/error e "Error in … polling loop, will retry"))))))) with a call that submits a run-poll-loop. The <RECEIVE+DISPATCH> body becomes the :step-fn. Behavior is preserved (normal long-poll unchanged); only the error path gains backoff.
Files: the same 5 worker files. Add [com.getorcha.workers.poll-loop :as poll-loop] to each ns require (alphabetical).
Step 1: in ingestion.clj, replace the polling loop (~1150-1166), which after Task 3 reads:
^Future polling-future
(.submit executor
         ^Callable
         (fn []
           (while @polling?
             (try
               (let [messages (aws/receive-messages
                               sqs-client
                               queue-url
                               {:max-messages max-queue-messages
                                :wait-time-seconds wait-time-seconds})]
                 (when (seq messages)
                   (log/debug "Received" (count messages) "messages"))
                 (doseq [^Message message messages]
                   (.submit executor
                            (concurrency/with-permit
                             heavy-work-gate
                             (job-handler (merge config orchestrator) message)))))
               (catch Exception e
                 (when @polling?
                   (log/error e "Error in polling loop, will retry")))))))]
with:
^Future polling-future
(.submit executor
         ^Callable
         (fn []
           (poll-loop/run-poll-loop
            {:polling? polling?
             :label "ingestion"
             :backoff {:base-ms 1000 :max-ms 30000}
             :step-fn (fn []
                        (let [messages (aws/receive-messages
                                        sqs-client
                                        queue-url
                                        {:max-messages max-queue-messages
                                         :wait-time-seconds wait-time-seconds})]
                          (when (seq messages)
                            (log/debug "Received" (count messages) "messages"))
                          (doseq [^Message message messages]
                            (.submit executor
                                     (concurrency/with-permit
                                      heavy-work-gate
                                      (job-handler (merge config orchestrator) message))))))})))]
Step 2: acquisition.clj — apply the same transformation to the loop at lines 87-142: keep the (.submit executor ^Callable (fn [] …)) outer wrapper, replace its (while @polling? (try <body> (catch …))) with (poll-loop/run-poll-loop {:polling? polling? :label "acquisition" :backoff {:base-ms 1000 :max-ms 30000} :step-fn (fn [] <body>)}) where <body> is the existing (let [messages (aws/receive-messages sqs-client queue-url {:max-messages max-queue-messages :wait-time-seconds wait-time-seconds})] … (doseq …)) (the gated submit from Task 3 Step 3 is inside <body> unchanged). Delete the old (catch Exception e (when @polling? (log/error e "Error in acquisition polling loop, will retry"))).
Step 3: matching/worker.clj — same transformation on lines 241-288, :label "matching", <body> = the (let [messages …] … (doseq …)) containing the Task 3 Step 4 gated submit. Drop the old catch.
Step 4: document_output.clj — same on lines 221-238, :label "document-output", <body> = the (let [messages …] (doseq …)) with the Task 3 Step 5 gated submit. Drop the old catch.
Step 5: diagnostics_recompute.clj — same on lines 105-127, :label "diagnostics-recompute", <body> = the (let [messages …] … (doseq …)) with the Task 3 Step 6 gated submit. Drop the old catch. (Leave the startup reap-stuck-running! block at lines 92-110 untouched — it is outside the loop.)
Step 6: Compile-check all five
Run: clj-nrepl-eval -p 33905 --timeout 30000 "(doseq [n '[com.getorcha.workers.ap.ingestion com.getorcha.workers.ap.acquisition com.getorcha.workers.ap.processors.matching.worker com.getorcha.workers.document-output com.getorcha.workers.diagnostics-recompute]] (require n :reload)) :ok"
Expected: :ok.
Run: clj-kondo --lint src/com/getorcha/workers
Expected: 0 errors, 0 warnings.
git add src/com/getorcha/workers
git commit -m "refactor(workers): route all 5 poll loops through shared run-poll-loop"
Task 6: Integration verification
Files: none (verification only).
Run: clj-nrepl-eval -p 33905 --timeout 60000 "(require 'integrant.repl) ((resolve 'integrant.repl/reset-all)) (keys integrant.repl.state/system)"
Expected: keys include :com.getorcha.workers.concurrency/heavy-work-gate and all 5 worker orchestrators; no exception.
Run: clj-nrepl-eval -p 33905 "(let [g (:com.getorcha.workers.concurrency/heavy-work-gate integrant.repl.state/system)] {:permits (.availablePermits ^java.util.concurrent.Semaphore (:semaphore g))})"
Expected: {:permits 3}.
Run: clj -X:test:silent :nses '[com.getorcha.workers.concurrency-test com.getorcha.workers.poll-loop-test]'
Expected: Ran N tests … 0 failures, 0 errors.
Run: clj -X:test:silent 2>&1 | grep -A 5 -E "(FAIL in|ERROR in|Execution error|Ran .* tests)"
Expected: existing suite green (no new failures vs. baseline). If pre-existing unrelated failures exist, confirm they match the pre-change baseline.
Run: clj-nrepl-eval -p 33905 --timeout 30000 <<'EOF'
(let [gate (:com.getorcha.workers.concurrency/heavy-work-gate integrant.repl.state/system)
      inside (java.util.concurrent.atomic.AtomicInteger. 0)
      peak (java.util.concurrent.atomic.AtomicInteger. 0)
      done (java.util.concurrent.CountDownLatch. 12)
      ex (java.util.concurrent.Executors/newVirtualThreadPerTaskExecutor)]
  (dotimes [_ 12]
    (.submit ex
             (com.getorcha.workers.concurrency/with-permit
              gate
              (fn []
                (let [n (.incrementAndGet inside)]
                  (.getAndUpdate peak (reify java.util.function.IntUnaryOperator
                                        (applyAsInt [_ c] (max c n))))
                  (Thread/sleep 50)
                  (.decrementAndGet inside)
                  (.countDown done))))))
  (.await done)
  (.shutdown ex)
  {:peak-concurrency (.get peak)})
EOF
Expected: {:peak-concurrency 3} (≤ permits).
git add -A
git commit -m "test(workers): integration verification for Plan A resilience"
[…] Runnable/Callable/fn dispatch + non-positive-permits reject, Task 1) / backoff (incl. interrupt-during-backoff, Task 4) / behavioral max-concurrent (Task 6); the spec's "memory replay" is owned by Plan B Task 5 and "ingestion-regression" is out of this plan's scope. Out-of-scope items (infra, monitoring, auto-recovery, uuid_array) correctly excluded → Plans B/C.

with-permit accepts Callable/Runnable/fn (acquisition submits a reify Runnable via mdc/wrap-runnable; dispatched by cond, regression-tested) and returns a Callable that must be .call'd/.submit'd; every test invokes it (not just constructs it). ig/init-key rejects non-positive/non-numeric :permits with ex-info (fail-fast vs silent semaphore deadlock), covered by gate-init-rejects-non-positive-permits. Gate map key :semaphore is consistent across Task 1 impl, tests, and Task 6 checks. run-poll-loop opts keys (:polling? :label :backoff :step-fn) are consistent between Task 4 impl/tests and all Task 5 callsites. run-poll-loop treats InterruptedException (sleep! or step-fn) as the stop signal, honoring polling? during the backoff. Backoff map keys :base-ms/:max-ms are consistent.