Phase 1 Resilience — Plan A: App-Code Resilience Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking. Before editing any .clj file, the implementer must invoke the orcha-clojure-style skill; use the clojure-eval skill (nREPL port 33905) to verify compiles/tests.

Goal: Bound the document-pipeline fan-out with a global heavy-work semaphore, and stop failing-dependency poll loops from CPU-spinning by adding shared exponential backoff + jitter.

Architecture: One new Integrant component holds a fair java.util.concurrent.Semaphore ("heavy-work gate"); every per-message worker task acquires a permit before running and releases it in a finally. A second new namespace provides one shared run-poll-loop that replaces the five duplicated (while @polling? (try … (catch …))) loops, adding backoff-with-full-jitter on error and reset-on-success. No infra changes.

Tech Stack: Clojure, Integrant, aero config, java.util.concurrent, clojure.test.

Source of truth — measured basis: single-page invoice transient ≈ 170 MB (median), spec §1.1. Default permits = 3, env-tunable via ORCHA_HEAVY_CONCURRENCY.
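As a rough check on the default, the permit count multiplied by the measured median gives the transient memory the gate is meant to bound. The sketch below is illustrative only: the names are hypothetical, and because 170 MB is a median, larger documents can push the real peak higher.

```clojure
;; Assumption: each in-flight heavy task costs roughly the measured median.
(def transient-mb-per-task 170)

(defn peak-transient-mb
  "Rough bound on concurrent transient heap, in MB, for `permits` permits."
  [permits]
  (* permits transient-mb-per-task))

(peak-transient-mb 3) ;; => 510  (the default)
(peak-transient-mb 6) ;; => 1020 (what doubling ORCHA_HEAVY_CONCURRENCY would allow)
```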


Cross-plan dependencies (read before executing)


File Structure

Ordering rationale: gate component (Task 1-2) → wire gate into workers (Task 3) → shared poll-loop (Task 4) → refactor loops (Task 5) → integration verify (Task 6). The gate is the actual fix; it lands first and is independently valuable even before the poll-loop refactor.


Task 1: Heavy-work gate component

Files:

(ns com.getorcha.workers.concurrency-test
  (:require [clojure.test :refer [deftest is testing]]
            [com.getorcha.workers.concurrency :as concurrency]
            [integrant.core :as ig])
  (:import (java.util.concurrent CountDownLatch TimeUnit)
           (java.util.concurrent.atomic AtomicInteger)))


(deftest gate-init-coerces-permits
  (testing "permits accepts int or string"
    (let [g1 (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits 3})
          g2 (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits "4"})]
      (is (= 3 (.availablePermits ^java.util.concurrent.Semaphore (:semaphore g1))))
      (is (= 4 (.availablePermits ^java.util.concurrent.Semaphore (:semaphore g2)))))))


(deftest gate-init-rejects-non-positive-permits
  (testing "0 / negative / non-numeric permits fail fast at init, not a silent deadlock"
    ;; A Semaphore with <=0 permits makes every .acquire block forever ->
    ;; all heavy work wedges silently. A bad ORCHA_HEAVY_CONCURRENCY must
    ;; crash startup, not deadlock the workers.
    (doseq [bad [0 -1 "0" "-3" "abc"]]
      (is (thrown? clojure.lang.ExceptionInfo
                   (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate
                                {:permits bad}))
          (str "should reject " (pr-str bad))))))


(deftest with-permit-caps-concurrency
  (testing "no more than :permits tasks run heavy work simultaneously"
    (let [gate     (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits 2})
          inside   (AtomicInteger. 0)
          observed (AtomicInteger. 0)
          start    (CountDownLatch. 1)
          done     (CountDownLatch. 10)
          ;; with-permit returns a Callable — it must be .call'd (workers
          ;; submit it to an executor, which calls .call). Constructing it
          ;; alone acquires nothing and runs nothing.
          task     (fn []
                     (.call ^java.util.concurrent.Callable
                            (concurrency/with-permit gate
                              (fn []
                                (let [n (.incrementAndGet inside)]
                                  (.getAndUpdate observed (reify java.util.function.IntUnaryOperator
                                                            (applyAsInt [_ cur] (max cur n))))
                                  (Thread/sleep 20)
                                  (.decrementAndGet inside)
                                  (.countDown done))))))
          threads  (repeatedly 10 #(Thread. ^Runnable (fn [] (.await start) (task))))]
      (doseq [^Thread t threads] (.start t))
      (.countDown start)
      (is (.await done 10 TimeUnit/SECONDS))
      (is (<= (.get observed) 2)))))


(deftest with-permit-releases-on-exception
  (testing "permit is released even when the task throws"
    (let [gate (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits 1})]
      (is (thrown? RuntimeException
                   (.call ^java.util.concurrent.Callable
                          (concurrency/with-permit
                            gate (fn [] (throw (RuntimeException. "boom")))))))
      (is (= 1 (.availablePermits ^java.util.concurrent.Semaphore (:semaphore gate)))))))


(deftest with-permit-runs-runnable-and-callable
  ;; Regression: acquisition submits (mdc/wrap-runnable …) -> a reify
  ;; Runnable, which is neither Callable nor IFn. with-permit must .run it,
  ;; not call it as a fn (ClassCastException). The other four workers pass
  ;; a reify Callable (mdc/wrap-callable) or a bare fn.
  (testing "a reify Runnable is .run, not invoked as a fn"
    (let [gate (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits 1})
          ran  (atom false)
          r    (reify Runnable (run [_] (reset! ran true)))]
      (.call ^java.util.concurrent.Callable (concurrency/with-permit gate r))
      (is @ran)
      (is (= 1 (.availablePermits ^java.util.concurrent.Semaphore (:semaphore gate))))))
  (testing "a reify Callable is .call'd and its value returned"
    (let [gate (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits 1})
          c    (reify java.util.concurrent.Callable (call [_] :result))]
      (is (= :result (.call ^java.util.concurrent.Callable (concurrency/with-permit gate c))))
      (is (= 1 (.availablePermits ^java.util.concurrent.Semaphore (:semaphore gate))))))
  (testing "a bare fn still works (implements both Callable and IFn)"
    (let [gate (ig/init-key :com.getorcha.workers.concurrency/heavy-work-gate {:permits 1})]
      (is (= :ok (.call ^java.util.concurrent.Callable
                        (concurrency/with-permit gate (fn [] :ok)))))
      (is (= 1 (.availablePermits ^java.util.concurrent.Semaphore (:semaphore gate)))))))

Run: clj-nrepl-eval -p 33905 "(require 'com.getorcha.workers.concurrency-test :reload)" Expected: FAIL — com.getorcha.workers.concurrency does not exist (FileNotFoundException).

(ns com.getorcha.workers.concurrency
  "Global heavy-work concurrency gate.

  A single fair semaphore caps how many memory-heavy document tasks run
  simultaneously across ALL workers, bounding aggregate peak heap regardless
  of fan-out size. Sized from measured ~170 MB/invoice (see
  docs/superpowers/specs/2026-05-16-prod-resilience-design.html §1.1)."
  (:require [clojure.tools.logging :as log]
            [integrant.core :as ig])
  (:import (java.util.concurrent Callable Semaphore)))


(defmethod ig/init-key ::heavy-work-gate
  [_ {:keys [permits] :as _config}]
  (let [n (try
            (cond-> permits (string? permits) Integer/parseInt)
            (catch Exception _ ::invalid))]
    ;; Fail fast on a bad ORCHA_HEAVY_CONCURRENCY. A Semaphore with <=0
    ;; permits never grants an acquire -> every heavy task blocks forever
    ;; (silent, total deadlock). Crash startup instead.
    (when-not (and (integer? n) (pos? n))
      (throw (ex-info "heavy-work-gate :permits must be a positive integer"
                      {:permits permits :parsed n})))
    (log/info "Heavy-work gate initialized" {:permits n})
    {:semaphore (Semaphore. (int n) true)}))


(defmethod ig/halt-key! ::heavy-work-gate
  [_ _gate]
  nil)


(defn with-permit
  "Acquire one permit from `gate`, run `task`, release the permit in a
  `finally`. `task` may be a `Callable`, a `Runnable`, or a 0-arg fn —
  the workers submit a mix: `mdc/wrap-callable` returns a `reify Callable`,
  `mdc/wrap-runnable` returns a `reify Runnable` (NOT an `IFn` — calling it
  as a fn throws `ClassCastException`), and some submit a bare `fn`. Returns
  a `Callable` so it can be passed straight to `ExecutorService.submit`."
  ^Callable [{:keys [^Semaphore semaphore] :as _gate} task]
  (reify Callable
    (call [_]
      (.acquire semaphore)
      (try
        (cond
          (instance? Callable task) (.call ^Callable task)
          (instance? Runnable task) (.run ^Runnable task)
          :else                     (task))
        (finally
          (.release semaphore))))))

Run: clj-nrepl-eval -p 33905 "(require 'com.getorcha.workers.concurrency :reload 'com.getorcha.workers.concurrency-test :reload) (clojure.test/run-tests 'com.getorcha.workers.concurrency-test)" Expected: PASS — 0 failures, 0 errors.

Run: clj-kondo --lint src/com/getorcha/workers/concurrency.clj test/com/getorcha/workers/concurrency_test.clj Expected: 0 errors, 0 warnings (fix any, including info-level).

git add src/com/getorcha/workers/concurrency.clj test/com/getorcha/workers/concurrency_test.clj
git commit -m "feat(workers): add global heavy-work concurrency gate"
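
The fail-fast check matters because java.util.concurrent.Semaphore accepts zero or negative initial permits without complaint. A minimal sketch of that behaviour follows. It uses tryAcquire with a timeout in place of the blocking .acquire so the demo terminates, and acquired-within? is a hypothetical helper, not part of the plan's code.

```clojure
(import '(java.util.concurrent Semaphore TimeUnit))

(defn acquired-within?
  "True if a fresh fair semaphore with `permits` permits grants one permit
  within `timeout-ms` milliseconds."
  [permits timeout-ms]
  (let [sem (Semaphore. (int permits) true)]
    (.tryAcquire sem (long timeout-ms) TimeUnit/MILLISECONDS)))

(acquired-within? 1 50)  ;; => true
(acquired-within? 0 50)  ;; => false, a blocking .acquire would wait forever
(acquired-within? -1 50) ;; => false, negative permits are legal but never grant
```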

Task 2: Register the gate in Integrant config

Files:

Insert this entry near the other worker components (anywhere inside the :integrant/config map, e.g. immediately before :com.getorcha.workers.ap.ingestion/worker-pools at line 221):

 :com.getorcha.workers.concurrency/heavy-work-gate
 {:permits #or [#env ORCHA_HEAVY_CONCURRENCY 3]}

(#or/#env are already used in this file at line 31; if ORCHA_HEAVY_CONCURRENCY is set it arrives as a string and ig/init-key coerces it — covered by Task 1's gate-init-coerces-permits test.)

Run: clj-nrepl-eval -p 33905 --timeout 30000 "(require 'com.getorcha.system :reload) (-> (com.getorcha.system/ig-config {:config-file \"com/getorcha/config.edn\" :profile :prod}) (get :com.getorcha.workers.concurrency/heavy-work-gate))" Expected: {:permits 3} (no aero exception).

git add resources/com/getorcha/config.edn
git commit -m "feat(config): register heavy-work-gate component (default 3, env ORCHA_HEAVY_CONCURRENCY)"

Task 3: Gate the per-message task in all 5 workers

Each worker submits its heavy per-message task with (.submit executor <task>). We pass the gate into each orchestrator via config and wrap the submitted task with concurrency/with-permit. The transformation is identical in shape for all five; exact per-file edits below.
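
The wrapper is lazy: building it takes no permit, and the permit is held only while the wrapped task runs. The sketch below illustrates that property with a trimmed stand-in for concurrency/with-permit (named gated here and handling plain fns only), so it runs without the project on the classpath.

```clojure
(import '(java.util.concurrent Callable Semaphore))

;; Stand-in for concurrency/with-permit, reduced to the fn case.
(defn gated
  ^Callable [^Semaphore sem task]
  (reify Callable
    (call [_]
      (.acquire sem)
      (try
        (task)
        (finally
          (.release sem))))))

(def ^Semaphore sem (Semaphore. 1 true))
(def seen-while-running (atom nil))

(def ^Callable wrapped
  (gated sem (fn []
               ;; Record how many permits are free while the task body runs.
               (reset! seen-while-running (.availablePermits sem))
               :done)))

(def permits-after-wrapping (.availablePermits sem)) ;; 1: wrapping holds nothing
(def result (.call wrapped))                         ;; :done
(def permits-after-call (.availablePermits sem))     ;; 1: released in the finally
;; @seen-while-running is 0: the single permit was held during the call
```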

Files:

Add this exact line into each of these 5 component maps in :integrant/config (alongside their existing #integrant/ref keys):

   :heavy-work-gate #integrant/ref :com.getorcha.workers.concurrency/heavy-work-gate

Add heavy-work-gate to the ig/init-key ::orchestrator destructuring (the {:keys [...] :as config} arg whose body is at line 1138) and add the require [com.getorcha.workers.concurrency :as concurrency] (alphabetical order) to the ns form.

Replace line 1163:

                           (.submit executor (job-handler (merge config orchestrator) message))))

with:

                           (.submit executor
                                    (concurrency/with-permit
                                      heavy-work-gate
                                      (job-handler (merge config orchestrator) message)))))

Add [com.getorcha.workers.concurrency :as concurrency] to requires; add heavy-work-gate to the ig/init-key ::orchestrator destructuring. Replace the inner submit at lines 122-138 — change:

                             (when (not= ::ignored q-message)
                               (.submit executor
                                        (mdc/wrap-runnable
                                         (fn []

to wrap the mdc/wrap-runnable form:

                             (when (not= ::ignored q-message)
                               (.submit executor
                                        (concurrency/with-permit
                                          heavy-work-gate
                                          (mdc/wrap-runnable
                                           (fn []

…and add one closing ) to balance the added with-permit call. The form already ends at line 138 with (finally (AWSXRay/endSegment))))))))) so append exactly one more ) after it.

Note (do not regress): acquisition is the only worker that passes (mdc/wrap-runnable …) → a reify Runnable. with-permit (Task 1) dispatches Callable/Runnable/fn via cond, so this is safe. Do not "simplify" with-permit back to a Callable-only check — it is a runtime ClassCastException here that no compile-check catches; covered by with-permit-runs-runnable-and-callable.
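
The distinction can be checked at a REPL. A minimal sketch (plain-runnable, plain-fn and call-as-fn are illustrative names) showing that a reify Runnable fails only when it is invoked as a fn, while a Clojure fn passes all three checks:

```clojure
(import '(java.util.concurrent Callable))

(def plain-runnable (reify Runnable (run [_] nil)))
(def plain-fn (fn [] :ok))

(instance? Runnable plain-runnable) ;; => true
(instance? Callable plain-runnable) ;; => false
(ifn? plain-runnable)               ;; => false

(instance? Callable plain-fn) ;; => true
(instance? Runnable plain-fn) ;; => true
(ifn? plain-fn)               ;; => true

(defn call-as-fn
  "Invokes `x` as a function and reports whether that worked."
  [x]
  (try
    (x)
    :invoked
    (catch ClassCastException _
      :class-cast-exception)))

(call-as-fn plain-fn)       ;; => :invoked
(call-as-fn plain-runnable) ;; => :class-cast-exception
```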

Add the require + heavy-work-gate destructuring. Replace lines 254-256:

                           (.submit executor
                                    ^Callable
                                    (mdc/wrap-callable

with:

                           (.submit executor
                                    (concurrency/with-permit
                                      heavy-work-gate
                                      (mdc/wrap-callable

…and add one closing ) after the existing close of the mdc/wrap-callable form (the ))))))) ending the doseq's submit, near line 285).

Add the require + heavy-work-gate destructuring (its ig/init-key ::orchestrator arg map is on line 214). Replace lines 232-235:

                           (.submit executor
                                    ^Callable
                                    (mdc/wrap-callable
                                     #(handle-message! config message)))))

with:

                           (.submit executor
                                    (concurrency/with-permit
                                      heavy-work-gate
                                      (mdc/wrap-callable
                                       #(handle-message! config message))))))

Add the require + add heavy-work-gate to the ig/init-key destructuring arg map (line ~76-77). Replace lines 116-124:

                             (.submit executor
                                      ^Callable
                                      (fn []
                                        (try
                                          (handle-message ctx (.body message))
                                          (aws/delete-message! sqs queue-url message)
                                          (catch Exception e
                                            (log/error e "diagnostics-recompute message processing failed"
                                                       {:message-body (.body message)})))))))

with:

                             (.submit executor
                                      (concurrency/with-permit
                                        heavy-work-gate
                                        (fn []
                                          (try
                                            (handle-message ctx (.body message))
                                            (aws/delete-message! sqs queue-url message)
                                            (catch Exception e
                                              (log/error e "diagnostics-recompute message processing failed"
                                                         {:message-body (.body message)}))))))))

Run: clj-nrepl-eval -p 33905 --timeout 30000 "(doseq [n '[com.getorcha.workers.concurrency com.getorcha.workers.ap.ingestion com.getorcha.workers.ap.acquisition com.getorcha.workers.ap.processors.matching.worker com.getorcha.workers.document-output com.getorcha.workers.diagnostics-recompute]] (require n :reload)) :ok" Expected: :ok (no compilation/reflection errors).

Run: clj-kondo --lint src/com/getorcha/workers Expected: 0 errors, 0 warnings.

git add resources/com/getorcha/config.edn src/com/getorcha/workers
git commit -m "feat(workers): gate all 5 heavy per-message tasks through heavy-work-gate"

Task 4: Shared resilient poll-loop helper

Files:

(ns com.getorcha.workers.poll-loop-test
  (:require [clojure.test :refer [deftest is testing]]
            [com.getorcha.workers.poll-loop :as poll-loop]))


(deftest backoff-sequence-is-exponential-capped
  (let [backoff {:base-ms 1000 :max-ms 30000}]
    (testing "cap doubles from base then clamps at max"
      (is (= [1000 2000 4000 8000 16000 30000 30000 30000]
             (map #(poll-loop/backoff-cap % backoff) (range 0 8)))))
    (testing "full-jitter: every delay is within [0, cap] for that attempt"
      (doseq [attempt (range 0 8)]
        (let [cap (poll-loop/backoff-cap attempt backoff)]
          (dotimes [_ 50]
            (is (<= 0 (poll-loop/backoff-ms attempt backoff) cap))))))
    (testing "high attempt is overflow-safe and stays at max (regression)"
      ;; The attempt counter is unbounded under a persistent failure; a
      ;; large attempt must not throw (long overflow) and must clamp to max.
      (is (= 30000 (poll-loop/backoff-cap 1000 backoff)))
      (is (<= 0 (poll-loop/backoff-ms 1000 backoff) 30000)))))


(deftest runs-step-until-stopped-and-resets-on-success
  (testing "loop runs while polling? true; backoff resets after a success"
    (let [polling? (atom true)
          calls    (atom 0)
          sleeps   (atom [])]
      (with-redefs [poll-loop/sleep! (fn [ms] (swap! sleeps conj ms))]
        (poll-loop/run-poll-loop
         {:polling?  polling?
          :label     "test"
          :backoff   {:base-ms 1000 :max-ms 30000}
          :step-fn   (fn []
                       (let [n (swap! calls inc)]
                         (cond
                           (= n 1) (throw (RuntimeException. "fail-1"))
                           (= n 2) (throw (RuntimeException. "fail-2"))
                           (= n 3) :ok
                           :else   (do (reset! polling? false) :ok))))}))
      (is (= 4 @calls))
      ;; two failures before the first success -> two backoff sleeps,
      ;; attempt counter reset by the success at call 3.
      (is (= 2 (count @sleeps)))
      (is (every? #(<= 0 % 2000) @sleeps)))))


(deftest exits-cleanly-when-interrupted-during-backoff
  (testing "InterruptedException during the backoff sleep stops the loop quietly"
    ;; run-poll-loop's shutdown contract calls (.interrupt
    ;; (Thread/currentThread)) so the executor sees the interrupt. It must
    ;; therefore run on a DEDICATED thread (as it does in prod — an executor
    ;; worker), never the test/nREPL thread, or it would interrupt the test
    ;; runner itself. We capture the return value and the loop thread's own
    ;; post-run interrupt status from inside that thread (so no `dosync`/STM
    ;; or nREPL machinery can consume the flag first).
    (let [polling?          (atom true)
          calls             (atom 0)
          result            (atom ::unset)
          self-interrupted? (atom ::unset)]
      (with-redefs [poll-loop/sleep! (fn [_]
                                       (reset! polling? false)
                                       (throw (InterruptedException. "shutdown")))]
        (let [t (Thread.
                 ^Runnable
                 (fn []
                   (reset! result
                           (poll-loop/run-poll-loop
                            {:polling? polling?
                             :label    "test"
                             :backoff  {:base-ms 1000 :max-ms 30000}
                             :step-fn  (fn []
                                         (swap! calls inc)
                                         (throw (RuntimeException. "always")))}))
                   (reset! self-interrupted?
                           (.isInterrupted (Thread/currentThread)))))]
          (.start t)
          (.join t 5000)
          (is (not (.isAlive t))
              "loop thread terminated (did not hang in backoff after interrupt)")))
      (is (nil? @result)
          "loop swallows the interrupt and returns, does not propagate")
      (is (= 1 @calls) "no further step-fn calls after the interrupt")
      (is (true? @self-interrupted?)
          "run-poll-loop restored the interrupt flag on its own thread before exiting"))))

Run: clj-nrepl-eval -p 33905 "(require 'com.getorcha.workers.poll-loop-test :reload)" Expected: FAIL — namespace com.getorcha.workers.poll-loop not found.

(ns com.getorcha.workers.poll-loop
  "Resilient SQS poll-loop control flow shared by all workers.

  Normal path is unchanged: `step-fn` performs one receive+dispatch and
  `aws/receive-messages` long-polls. On a caught exception we sleep with
  exponential backoff + full jitter (instead of spinning), then retry;
  a successful step resets the backoff. Honors the `polling?` stop atom."
  (:require [clojure.tools.logging :as log]))


(defn backoff-cap
  "Capped exponential delay ceiling for `attempt` (0-based):
  min(max-ms, base-ms * 2^attempt).

  The exponent is clamped to 30. `run-poll-loop`'s attempt counter grows
  unbounded under a persistently-failing dependency; without the clamp
  `base-ms * 2^attempt` overflows `long` (~attempt 54, primitive `*`
  throws `ArithmeticException`) and — since that is not an
  `InterruptedException` — escapes the loop and kills the worker forever.
  Beyond ~attempt 25 the value is already `max-ms`, so clamping at 30 is
  exact for any sane base/max and overflow-proof (base-ms * 2^30 fits long)."
  [attempt {:keys [base-ms max-ms] :as _backoff}]
  (min (long max-ms)
       (* (long base-ms) (long (Math/pow 2 (min (long attempt) 30))))))


(defn backoff-ms
  "Full-jitter delay: a random value in [0, (backoff-cap attempt)]."
  [attempt backoff]
  (long (rand-int (inc (backoff-cap attempt backoff)))))


(defn sleep!
  "Sleep indirection so tests can stub it. `Thread/sleep` propagates
  `InterruptedException`; `run-poll-loop` treats that as its stop signal
  so a shutdown mid-backoff exits promptly instead of waiting out the cap."
  [ms]
  (Thread/sleep (long ms)))


(defn run-poll-loop
  "Run `step-fn` repeatedly while `@polling?`. On each exception: log the
  error (if still polling), sleep with backoff+jitter, and increment the
  attempt counter; a successful step resets it. Blocks the calling thread, so
  submit it to the worker's executor exactly like the old `(fn [] (while …))`.

  Shutdown contract: `halt-key!` does `(reset! polling? false)` then
  `(.cancel polling-future true)`, which interrupts this thread. An
  `InterruptedException` — whether thrown by `sleep!` mid-backoff or by a
  blocking `step-fn` — is the stop signal: the loop exits quietly (no error
  log), and the thread's interrupt flag is restored so the executor sees it.
  Without this, an interrupt during the up-to-30s backoff would either be
  swallowed by the generic `catch` and retried, or unwind as a logged
  stacktrace — neither of which honors the stop atom during the sleep."
  [{:keys [polling? label backoff step-fn] :as _opts}]
  (try
    (loop [attempt 0]
      (when @polling?
        (let [ok? (try
                    (step-fn)
                    true
                    (catch InterruptedException e
                      (throw e))
                    (catch Exception e
                      (when @polling?
                        (log/error e (str "Error in " label " polling loop, will retry")))
                      false))]
          (if ok?
            (recur 0)
            (do
              (when @polling?
                (sleep! (backoff-ms attempt backoff)))
              (recur (inc attempt)))))))
    (catch InterruptedException _
      ;; Log BEFORE re-arming the interrupt: a logging appender may do an
      ;; interruptible blocking op, which would drop/throw if the thread's
      ;; interrupt flag is already set. Emit the shutdown line first, then
      ;; restore the flag so the executor still sees the interrupt.
      (log/info (str label " poll loop interrupted; stopping"))
      (.interrupt (Thread/currentThread)))))
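
To see why the exponent clamp is there, the sketch below uses a hypothetical unclamped-cap that matches backoff-cap except for the missing clamp to 30, with the plan's base-ms 1000 and max-ms 30000. With those values the multiplication still fits in a long at attempt 53 and overflows at attempt 54.

```clojure
(defn unclamped-cap
  "Hypothetical variant of backoff-cap WITHOUT the exponent clamp."
  [attempt {:keys [base-ms max-ms]}]
  (min (long max-ms)
       (* (long base-ms) (long (Math/pow 2 (long attempt))))))

(defn try-cap
  "Returns the cap, or :long-overflow if the checked multiply throws."
  [attempt]
  (try
    (unclamped-cap attempt {:base-ms 1000 :max-ms 30000})
    (catch ArithmeticException _
      :long-overflow)))

(try-cap 5)  ;; => 30000 (1000 * 32 = 32000, clamped to max-ms)
(try-cap 53) ;; => 30000 (1000 * 2^53 still fits in a long)
(try-cap 54) ;; => :long-overflow (1000 * 2^54 exceeds Long/MAX_VALUE)
```

Inside run-poll-loop that exception would not be caught by the InterruptedException handler, which is the failure mode the docstring describes.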

Run: clj-nrepl-eval -p 33905 "(require 'com.getorcha.workers.poll-loop :reload 'com.getorcha.workers.poll-loop-test :reload) (clojure.test/run-tests 'com.getorcha.workers.poll-loop-test)" Expected: PASS — 0 failures, 0 errors.

Run: clj-kondo --lint src/com/getorcha/workers/poll_loop.clj test/com/getorcha/workers/poll_loop_test.clj Expected: 0 errors, 0 warnings.

git add src/com/getorcha/workers/poll_loop.clj test/com/getorcha/workers/poll_loop_test.clj
git commit -m "feat(workers): add shared resilient poll-loop (backoff + full jitter)"
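
The shutdown contract in run-poll-loop's docstring can be exercised in isolation. The following is a sketch under assumptions: stop-demo is a hypothetical stand-in, not the real halt-key!, and its loop simply sleeps 30 s per iteration to mimic a long backoff. It shows that reset! followed by (.cancel future true) ends the loop promptly instead of waiting out the sleep.

```clojure
(import '(java.util.concurrent Callable CountDownLatch Executors
                               ExecutorService Future))

(defn stop-demo
  "Starts a stand-in poll loop, stops it the way the shutdown contract
  describes, and returns how the loop thread exited."
  []
  (let [^ExecutorService executor (Executors/newSingleThreadExecutor)
        polling?       (atom true)
        started        (CountDownLatch. 1)
        outcome        (promise)
        ^Callable task (fn []
                         (.countDown started)
                         (deliver outcome
                                  (try
                                    (while @polling?
                                      (Thread/sleep 30000))
                                    :saw-flag
                                    (catch InterruptedException _
                                      :interrupted))))
        ^Future fut    (.submit executor task)]
    (.await started)          ; make sure the task is running before cancelling
    (reset! polling? false)   ; first half of the contract: flip the stop atom
    (.cancel fut true)        ; second half: interrupt the loop thread
    (let [result (deref outcome 5000 :timed-out)]
      (.shutdownNow executor)
      result)))

(stop-demo)
```

The usual result is :interrupted. :saw-flag is also possible when the loop reads the stop atom before its first sleep. Either way the call returns well within the 5 s deref timeout rather than after the 30 s sleep.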

Task 5: Refactor the 5 outer poll loops onto run-poll-loop

For each worker, replace the (.submit executor ^Callable (fn [] (while @polling? (try <RECEIVE+DISPATCH> (catch Exception e (when @polling? (log/error e "Error in … polling loop, will retry"))))))) with a call that submits a run-poll-loop. The <RECEIVE+DISPATCH> body becomes the :step-fn. Behavior is preserved (normal long-poll unchanged); only the error path gains backoff.

Files: the same 5 worker files. Add [com.getorcha.workers.poll-loop :as poll-loop] to each ns require (alphabetical).

In the ingestion worker, replace:

        ^Future polling-future
        (.submit executor
                 ^Callable
                 (fn []
                   (while @polling?
                     (try
                       (let [messages (aws/receive-messages
                                       sqs-client
                                       queue-url
                                       {:max-messages      max-queue-messages
                                        :wait-time-seconds wait-time-seconds})]
                         (when (seq messages)
                           (log/debug "Received" (count messages) "messages"))
                         (doseq [^Message message messages]
                           (.submit executor
                                    (concurrency/with-permit
                                      heavy-work-gate
                                      (job-handler (merge config orchestrator) message)))))
                       (catch Exception e
                         (when @polling?
                           (log/error e "Error in polling loop, will retry")))))))]

with:

        ^Future polling-future
        (.submit executor
                 ^Callable
                 (fn []
                   (poll-loop/run-poll-loop
                    {:polling? polling?
                     :label    "ingestion"
                     :backoff  {:base-ms 1000 :max-ms 30000}
                     :step-fn  (fn []
                                 (let [messages (aws/receive-messages
                                                 sqs-client
                                                 queue-url
                                                 {:max-messages      max-queue-messages
                                                  :wait-time-seconds wait-time-seconds})]
                                   (when (seq messages)
                                     (log/debug "Received" (count messages) "messages"))
                                   (doseq [^Message message messages]
                                     (.submit executor
                                              (concurrency/with-permit
                                                heavy-work-gate
                                                (job-handler (merge config orchestrator) message))))))})))]

Run: clj-nrepl-eval -p 33905 --timeout 30000 "(doseq [n '[com.getorcha.workers.ap.ingestion com.getorcha.workers.ap.acquisition com.getorcha.workers.ap.processors.matching.worker com.getorcha.workers.document-output com.getorcha.workers.diagnostics-recompute]] (require n :reload)) :ok" Expected: :ok.

Run: clj-kondo --lint src/com/getorcha/workers Expected: 0 errors, 0 warnings.

git add src/com/getorcha/workers
git commit -m "refactor(workers): route all 5 poll loops through shared run-poll-loop"

Task 6: Integration verification

Files: none (verification only).

Run: clj-nrepl-eval -p 33905 --timeout 60000 "(require 'integrant.repl) ((resolve 'integrant.repl/reset-all)) (keys integrant.repl.state/system)" Expected: keys include :com.getorcha.workers.concurrency/heavy-work-gate and all 5 worker orchestrators; no exception.

Run: clj-nrepl-eval -p 33905 "(let [g (:com.getorcha.workers.concurrency/heavy-work-gate integrant.repl.state/system)] {:permits (.availablePermits ^java.util.concurrent.Semaphore (:semaphore g))})" Expected: {:permits 3}.

Run: clj -X:test:silent :nses '[com.getorcha.workers.concurrency-test com.getorcha.workers.poll-loop-test]' Expected: Ran N tests … 0 failures, 0 errors.

Run: clj -X:test:silent 2>&1 | grep -A 5 -E "(FAIL in|ERROR in|Execution error|Ran .* tests)" Expected: existing suite green (no new failures vs. baseline). If pre-existing unrelated failures exist, confirm they match the pre-change baseline.

Run:

clj-nrepl-eval -p 33905 --timeout 30000 <<'EOF'
(let [gate   (:com.getorcha.workers.concurrency/heavy-work-gate integrant.repl.state/system)
      inside (java.util.concurrent.atomic.AtomicInteger. 0)
      peak   (java.util.concurrent.atomic.AtomicInteger. 0)
      done   (java.util.concurrent.CountDownLatch. 12)
      ex     (java.util.concurrent.Executors/newVirtualThreadPerTaskExecutor)]
  (dotimes [_ 12]
    (.submit ex
             (com.getorcha.workers.concurrency/with-permit
               gate
               (fn []
                 (let [n (.incrementAndGet inside)]
                   (.getAndUpdate peak (reify java.util.function.IntUnaryOperator
                                         (applyAsInt [_ c] (max c n))))
                   (Thread/sleep 50)
                   (.decrementAndGet inside)
                   (.countDown done))))))
  (.await done)
  (.shutdown ex)
  {:peak-concurrency (.get peak)})
EOF

Expected: {:peak-concurrency 3} (≤ permits).

git add -A
git commit -m "test(workers): integration verification for Plan A resilience"

Self-Review