Document Output Dispatcher Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Move AP final-approval export and manual DATEV re-export from app-side direct execution to an SQS-driven document output worker, while removing AP batch export.

Architecture: The app creates document_output_job rows as output intent and sends a plain UUID job id to a new :document-output SQS queue. A new worker atomically claims jobs, extends SQS visibility, dispatches the existing DATEV export connector, and updates job status to dispatched or failed. DATEV task outcome remains in ap_datev_export_audit, linked by dispatch_job_id.

Tech Stack: Clojure, Integrant, HoneySQL via com.getorcha.db.sql, PostgreSQL migrations/triggers, AWS SQS helpers in com.getorcha.aws, HTMX/SSE document UI, clojure.test.
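
The job lifecycle described under Architecture can be sketched as a small transition table. This is an illustrative in-memory model only: `allowed-transitions` and `transition` are hypothetical names, and the real rules are enforced by the guarded SQL updates defined in the tasks below.

```clojure
;; Hypothetical in-memory model of document_output_job status transitions.
;; The real guards live in the WHERE clauses of the SQL updates in this plan.
(def allowed-transitions
  {"pending"    #{"running" "failed"}              ; failed = SQS send error before any claim
   "running"    #{"running" "dispatched" "failed"} ; running -> running = stale reclaim
   "dispatched" #{}                                ; terminal
   "failed"     #{}})                              ; terminal

(defn transition
  "Returns `to` if moving from `from` to `to` is allowed, otherwise nil."
  [from to]
  (when (contains? (get allowed-transitions from #{}) to)
    to))

(transition "pending" "running")    ;; => "running"
(transition "dispatched" "running") ;; => nil
```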


File Structure

Task 1: Schema and Redundancy Audit

Files:

Run:

rg -n "datev_export_status|datev_export_task_id|datev_export_error|datev_exported_at|ap_datev_export_audit|tenant_datev_integration|bulk-export-datev|selection-session-key" resources/migrations src test

Expected:

Create resources/migrations/20260501120000-add-document-output-jobs.up.sql:

-- Up: document output jobs

CREATE TYPE document_output_domain AS ENUM ('ap');

--;;

CREATE TYPE document_output_trigger AS ENUM ('approval_completed', 'manual');

--;;

CREATE TYPE document_output_status AS ENUM ('pending', 'running', 'dispatched', 'failed');

--;;

CREATE TABLE document_output_job (
  id               UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  tenant_id        UUID NOT NULL REFERENCES tenant(id) ON DELETE CASCADE,
  document_id      UUID NOT NULL REFERENCES document(id) ON DELETE CASCADE,
  document_domain  document_output_domain NOT NULL,
  trigger          document_output_trigger NOT NULL,
  status           document_output_status NOT NULL DEFAULT 'pending',
  document_version INTEGER,
  requested_by     UUID REFERENCES "identity"(id) ON DELETE SET NULL,
  started_at       TIMESTAMPTZ,
  completed_at     TIMESTAMPTZ,
  last_error       TEXT,
  created_at       TIMESTAMPTZ NOT NULL DEFAULT now(),
  updated_at       TIMESTAMPTZ NOT NULL DEFAULT now()
);

--;;

CREATE INDEX idx_document_output_job_document_created
  ON document_output_job (document_id, created_at DESC);

--;;

CREATE INDEX idx_document_output_job_tenant_created
  ON document_output_job (tenant_id, created_at DESC);

--;;

CREATE INDEX idx_document_output_job_status_created
  ON document_output_job (status, created_at);

--;;

CREATE UNIQUE INDEX idx_document_output_job_document_active
  ON document_output_job (document_id)
  WHERE status IN ('pending', 'running');

--;;

ALTER TABLE ap_datev_export_audit
  ADD COLUMN dispatch_job_id UUID REFERENCES document_output_job(id) ON DELETE SET NULL;

--;;

CREATE INDEX idx_ap_datev_export_audit_dispatch_job
  ON ap_datev_export_audit (dispatch_job_id)
  WHERE dispatch_job_id IS NOT NULL;

--;;

CREATE OR REPLACE FUNCTION notify_document_output_job_event()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
DECLARE
  payload jsonb;
  tenant_org_id uuid;
BEGIN
  SELECT organization_id INTO tenant_org_id
  FROM tenant WHERE id = NEW.tenant_id;

  payload := jsonb_build_object(
    'event/type', 'output-dispatch',
    'document-output-job/id', NEW.id::text,
    'document-output-job/status', NEW.status::text,
    'document/id', NEW.document_id::text,
    'tenant/id', NEW.tenant_id::text,
    'organization/id', tenant_org_id::text,
    'old-status', CASE WHEN TG_OP = 'INSERT' THEN NULL ELSE OLD.status::text END
  );
  PERFORM pg_notify('document_events', payload::text);
  RETURN NEW;
END;
$function$;

--;;

CREATE TRIGGER document_output_job_after_insert
  AFTER INSERT ON document_output_job
  FOR EACH ROW EXECUTE FUNCTION notify_document_output_job_event();

--;;

CREATE TRIGGER document_output_job_after_update_status
  AFTER UPDATE OF status ON document_output_job
  FOR EACH ROW
  WHEN (OLD.status IS DISTINCT FROM NEW.status)
  EXECUTE FUNCTION notify_document_output_job_event();
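
For orientation, the payload built by notify_document_output_job_event() could look like the map below once a listener has decoded it. This is a sketch under assumptions: the JSON decoding step is not shown, string keys are assumed, the UUID values are placeholders, and `terminal-event?` is a hypothetical helper rather than existing code.

```clojure
;; Assumes the NOTIFY payload was already decoded from JSON into a map with
;; string keys. The ids below are placeholders, not real data.
(def example-payload
  {"event/type"                 "output-dispatch"
   "document-output-job/id"     "00000000-0000-0000-0000-000000000001"
   "document-output-job/status" "dispatched"
   "document/id"                "00000000-0000-0000-0000-000000000002"
   "tenant/id"                  "00000000-0000-0000-0000-000000000003"
   "organization/id"            "00000000-0000-0000-0000-000000000004"
   "old-status"                 "running"}) ; JSON null on INSERT events

(defn terminal-event?
  "True when the event reports a job that reached dispatched or failed."
  [payload]
  (and (= "output-dispatch" (get payload "event/type"))
       (contains? #{"dispatched" "failed"}
                  (get payload "document-output-job/status"))))

(terminal-event? example-payload) ;; => true
```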

Create resources/migrations/20260501120000-add-document-output-jobs.down.sql:

-- Down: document output jobs

DROP TRIGGER IF EXISTS document_output_job_after_update_status ON document_output_job;

--;;

DROP TRIGGER IF EXISTS document_output_job_after_insert ON document_output_job;

--;;

DROP FUNCTION IF EXISTS notify_document_output_job_event();

--;;

DROP INDEX IF EXISTS idx_ap_datev_export_audit_dispatch_job;

--;;

ALTER TABLE ap_datev_export_audit
  DROP COLUMN IF EXISTS dispatch_job_id;

--;;

DROP INDEX IF EXISTS idx_document_output_job_document_active;

--;;

DROP INDEX IF EXISTS idx_document_output_job_status_created;

--;;

DROP INDEX IF EXISTS idx_document_output_job_tenant_created;

--;;

DROP INDEX IF EXISTS idx_document_output_job_document_created;

--;;

DROP TABLE IF EXISTS document_output_job;

--;;

DROP TYPE IF EXISTS document_output_status;

--;;

DROP TYPE IF EXISTS document_output_trigger;

--;;

DROP TYPE IF EXISTS document_output_domain;

Run:

clj -X:test:silent :nses '[com.getorcha.db.migrations-test]'

Expected: PASS.

git add resources/migrations/20260501120000-add-document-output-jobs.up.sql resources/migrations/20260501120000-add-document-output-jobs.down.sql
git commit -m "feat(output): add document output job schema"

Task 2: Queue Configuration

Files:

In resources/com/getorcha/config.edn, change the queue map at the top from:

:queues     {:ingestion             "v1-orcha-global-ingest"
             :acquisition           "v1-orcha-global-email-acquire"
             :matching              "v1-orcha-global-doc-matching"
             :diagnostics-recompute "v1-orcha-global-diagnostics-recompute"}

to:

:queues     {:ingestion             "v1-orcha-global-ingest"
             :acquisition           "v1-orcha-global-email-acquire"
             :matching              "v1-orcha-global-doc-matching"
             :diagnostics-recompute "v1-orcha-global-diagnostics-recompute"
             :document-output       "v1-orcha-global-document-output"}

Near the other worker configs in resources/com/getorcha/config.edn, add:

  ;; Workers - Document Output
  ;; -----------------------------------------------------------------------------

  :com.getorcha.workers.document-output/worker-pools
  {:timeout-minutes 5}

  :com.getorcha.workers.document-output/orchestrator
  {:aws                         #integrant/ref :com.getorcha.aws/state
   :db-pool                     #integrant/ref :com.getorcha.db/pool
   :heartbeat-extension-seconds 300
   :heartbeat-rate-seconds      60
   :integrations                #ref [:com.getorcha/integrations]
   :interrupt-on-halt?          #ref [:com.getorcha/dev?]
   :max-queue-messages          10
   :notifications               #ref [:com.getorcha/notifications]
   :stale-running-seconds       600
   :wait-time-seconds           #profile {:test 0 :default 20}
   :worker-pools                #integrant/ref :com.getorcha.workers.document-output/worker-pools}
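
The three timing values in the orchestrator config depend on each other. The heartbeat both extends SQS visibility and refreshes updated_at, and the worker reclaims a running job once updated_at is older than the stale threshold. A minimal sketch of that relationship, assuming a hypothetical `timing-problems` check that is not part of the codebase:

```clojure
;; Hypothetical sanity check for the orchestrator timing settings.
(defn timing-problems
  "Returns a vector of problems; empty when the settings are consistent."
  [{:keys [heartbeat-extension-seconds heartbeat-rate-seconds stale-running-seconds]}]
  (cond-> []
    (>= heartbeat-rate-seconds heartbeat-extension-seconds)
    (conj "heartbeat must fire before the SQS visibility extension lapses")

    (<= stale-running-seconds heartbeat-rate-seconds)
    (conj "stale threshold must exceed the heartbeat rate, or live jobs get reclaimed")))

(timing-problems {:heartbeat-extension-seconds 300
                  :heartbeat-rate-seconds      60
                  :stale-running-seconds       600})
;; => []
```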

In scripts/ministack-seed/seed.sh, add:

DOCUMENT_OUTPUT_QUEUE="v1-orcha-global-document-output"

near the other queue names, then add:

ensure_queue_with_dlq "$DOCUMENT_OUTPUT_QUEUE"

near the existing ensure_queue_with_dlq calls.

Run:

clj -M -e "(require 'com.getorcha.system) (keys (com.getorcha.system/ig-config {:config-file \"com/getorcha/config.edn\" :profile :test}))"

Expected: command exits 0 and includes :com.getorcha.workers.document-output/orchestrator in the printed keys.

git add resources/com/getorcha/config.edn scripts/ministack-seed/seed.sh
git commit -m "feat(output): configure document output queue"

Task 3: App-Side Document Output Helper

Files:

Create test/com/getorcha/app/document_output_test.clj:

(ns com.getorcha.app.document-output-test
  (:require [clojure.test :refer [deftest is use-fixtures]]
            [com.getorcha.app.document-output :as document-output]
            [com.getorcha.db.sql :as db.sql]
            [com.getorcha.test.fixtures :as fixtures]
            [com.getorcha.test.notification-helpers :as helpers]))


(use-fixtures :once fixtures/with-running-system)
(use-fixtures :each fixtures/with-db-rollback)


(deftest create-job!-inserts-pending-job
  (let [tenant-id (helpers/create-tenant!)
        user-id   (helpers/create-identity!)
        doc-id    (helpers/create-completed-document! tenant-id user-id)
        job       (document-output/create-job!
                   fixtures/*db*
                   {:tenant-id        tenant-id
                    :document-id      doc-id
                    :document-domain  :ap
                    :trigger          :manual
                    :document-version 1
                    :requested-by     user-id})]
    (is (uuid? (:document-output-job/id job)))
    (is (= "pending" (:document-output-job/status job)))
    (is (= "manual" (:document-output-job/trigger job)))))


(deftest guarded-mark-failed-does-not-overwrite-running
  (let [tenant-id (helpers/create-tenant!)
        user-id   (helpers/create-identity!)
        doc-id    (helpers/create-completed-document! tenant-id user-id)
        job       (document-output/create-job!
                   fixtures/*db*
                   {:tenant-id       tenant-id
                    :document-id     doc-id
                    :document-domain :ap
                    :trigger         :manual
                    :requested-by    user-id})]
    (db.sql/execute-one!
     fixtures/*db*
     {:update :document-output-job
      :set    {:status (db.sql/->cast "running" :document-output-status)}
      :where  [:= :id (:document-output-job/id job)]})
    (is (nil? (document-output/mark-send-failed-if-pending!
               fixtures/*db* (:document-output-job/id job) "send failed")))
    (is (= "running"
           (:document-output-job/status
            (db.sql/execute-one!
             fixtures/*db*
             {:select [:status]
              :from   [:document-output-job]
              :where  [:= :id (:document-output-job/id job)]})))))))


(deftest create-job!-rejects-second-active-job-for-document
  (let [tenant-id (helpers/create-tenant!)
        user-id   (helpers/create-identity!)
        doc-id    (helpers/create-completed-document! tenant-id user-id)]
    (document-output/create-job!
     fixtures/*db*
     {:tenant-id       tenant-id
      :document-id     doc-id
      :document-domain :ap
      :trigger         :manual
      :requested-by    user-id})
    (is (thrown? Exception
                 (document-output/create-job!
                  fixtures/*db*
                  {:tenant-id       tenant-id
                   :document-id     doc-id
                   :document-domain :ap
                   :trigger         :manual
                   :requested-by    user-id})))))

Run:

clj -X:test:silent :nses '[com.getorcha.app.document-output-test]'

Expected: FAIL because com.getorcha.app.document-output does not exist.

Create src/com/getorcha/app/document_output.clj:

(ns com.getorcha.app.document-output
  "Document output job creation, dispatch enqueueing, and query helpers."
  (:require [clojure.string :as str]
            [clojure.tools.logging :as log]
            [com.getorcha.aws :as aws]
            [com.getorcha.db.sql :as db.sql]))


(defn ^:private enum-name
  [x]
  (str/replace (name x) "-" "_"))


(defn create-job!
  "Creates a pending document output job and returns it."
  [db {:keys [tenant-id document-id document-domain trigger document-version requested-by]}]
  (db.sql/execute-one!
   db
   {:insert-into :document-output-job
    :values      [(cond-> {:tenant-id       tenant-id
                           :document-id     document-id
                           :document-domain (db.sql/->cast (enum-name document-domain) :document-output-domain)
                           :trigger         (db.sql/->cast (enum-name trigger) :document-output-trigger)
                           :status          (db.sql/->cast "pending" :document-output-status)
                           :requested-by    requested-by}
                    document-version (assoc :document-version document-version))]
    :returning   [:*]}))


(defn mark-send-failed-if-pending!
  "Marks a job failed after an enqueue error only if it is still pending.
   Returns the updated row, or nil if another worker already changed state."
  [db job-id error-message]
  (db.sql/execute-one!
   db
   {:update    :document-output-job
    :set       {:status       (db.sql/->cast "failed" :document-output-status)
                :last-error   error-message
                :completed-at [:now]
                :updated-at   [:now]}
    :where     [:and
                [:= :id job-id]
                [:= :status (db.sql/->cast "pending" :document-output-status)]]
    :returning [:*]}))


(defn enqueue-job!
  "Sends the job id as a plain UUID string to the document-output queue."
  [{:keys [aws] :as _context} job-id]
  (aws/send-message!
   (get-in aws [:clients :sqs])
   (get-in aws [:queue-urls :document-output])
   (str job-id)))


(defn create-and-enqueue-job!
  "Creates a job in `db`, then sends its id to SQS. If SQS send fails, marks
   the job failed only while it is still pending and rethrows the exception."
  [{:keys [db-pool] :as context} job-attrs]
  (let [job    (create-job! db-pool job-attrs)
        job-id (:document-output-job/id job)]
    (try
      (enqueue-job! context job-id)
      job
      (catch Exception e
        (log/warn e "Failed to enqueue document output job" {:job-id job-id})
        (mark-send-failed-if-pending! db-pool job-id (ex-message e))
        (throw e)))))


(defn latest-job
  "Returns the latest document output job for a document, if any."
  [db document-id]
  (db.sql/execute-one!
   db
   {:select   [:*]
    :from     [:document-output-job]
    :where    [:= :document-id document-id]
    :order-by [[:created-at :desc]]
    :limit    1}))
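
The failure handling in create-and-enqueue-job! can be exercised without a database or SQS. The sketch below is an in-memory stand-in, not the real implementation: the `jobs` atom replaces the table, `send!` replaces the SQS call, and all function names are hypothetical. Only the control flow mirrors the helper above.

```clojure
;; Hypothetical in-memory stand-ins for the table and the queue.
(defn mark-failed-if-pending!
  "Flips a job to failed only while it is still pending."
  [jobs job-id message]
  (swap! jobs update job-id
         (fn [job]
           (if (= "pending" (:status job))
             (assoc job :status "failed" :last-error message)
             job))))

(defn create-and-enqueue!
  "Creates a pending job, sends its id, and marks it failed if the send throws."
  [jobs send! job-id]
  (swap! jobs assoc job-id {:status "pending"})
  (try
    (send! (str job-id))
    (get @jobs job-id)
    (catch Exception e
      (mark-failed-if-pending! jobs job-id (ex-message e))
      (throw e))))

(def jobs (atom {}))
(def job-id (random-uuid))

;; Simulate an unavailable queue; the exception is rethrown to the caller.
(try
  (create-and-enqueue! jobs (fn [_body] (throw (ex-info "queue unavailable" {}))) job-id)
  (catch Exception _ nil))

(get @jobs job-id)
;; => {:status "failed", :last-error "queue unavailable"}
```

The guard on "pending" matters because a worker may already have claimed the job if the send partially succeeded; in that case the sketch, like the helper, leaves the row untouched.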

Run:

clj -X:test:silent :nses '[com.getorcha.app.document-output-test]'

Expected: PASS.

git add src/com/getorcha/app/document_output.clj test/com/getorcha/app/document_output_test.clj
git commit -m "feat(output): add document output job helpers"

Task 4: Link DATEV Export Audits to Output Jobs

Files:

Create test/com/getorcha/integrations/ap/maesn_test.clj:

(ns com.getorcha.integrations.ap.maesn-test
  (:require [clojure.test :refer [deftest is testing]]
            [com.getorcha.aws :as aws]
            [com.getorcha.db.sql :as db.sql]
            [com.getorcha.http.client :as http]
            [com.getorcha.integrations.ap.cover-page :as cover-page]
            [com.getorcha.integrations.ap.maesn :as maesn]))


(deftest create-booking-proposal-writes-dispatch-job-id
  (testing "DATEV audit row records the document output job id"
    (let [dispatch-job-id (random-uuid)
          audit-insert    (atom nil)
          document-id     (random-uuid)
          tenant-id       (random-uuid)
          document        {:document/id                 document-id
                           :document/tenant-id          tenant-id
                           :document/file-path          "documents/invoice.pdf"
                           :document/file-original-name "invoice.pdf"
                           :document/structured-data    {:invoice-number "INV-1"
                                                         :currency       "EUR"
                                                         :issue-date     "2026-05-01"
                                                         :issuer         {:name "Supplier"}
                                                         :recipient      {:name "Recipient"}
                                                         :line-items     []}}]
      (with-redefs [maesn/connected-datev-integration
                    (fn [_ctx requested-tenant-id]
                      (is (= tenant-id requested-tenant-id))
                      {:tenant-datev-integration/config {}
                       :tenant-datev-integration/credentials-encrypted (.getBytes "encrypted")})
                    maesn/decrypt-account-key (fn [& _] "account-key")
                    aws/get-object            (fn [& _] (.getBytes "pdf"))
                    cover-page/add-cover-page  (fn [_doc _supplier bytes] bytes)
                    http/request              (fn [_request]
                                                {:status 201
                                                 :body   {:data {:taskId "task-1"
                                                                 :status "IN_PROGRESS"}}})
                    maesn/poll-until-complete! (fn [& _] nil)
                    db.sql/execute-one!
                    ;; Variadic stub covers both the 2-arity and 3-arity call shapes.
                    (fn [_db query & _opts]
                      (cond
                        (= :ap-datev-export-audit (:insert-into query))
                        (do
                          (reset! audit-insert (-> query :values first))
                          {:ap-datev-export-audit/id (random-uuid)})

                        (= :ap-datev-export-audit (:update query))
                        {}

                        :else
                        nil))]
        (maesn/create-booking-proposal!
         {:aws             {:s3-buckets {:storage "bucket"}
                            :clients    {:s3 ::s3}}
          :db-pool         ::db
          :dispatch-job-id dispatch-job-id
          :identity        {:identity/id (random-uuid)}
          :integrations    {:datev {:api-key "api-key"}}}
         document)
        (is (= dispatch-job-id (:dispatch-job-id @audit-insert)))))))

Run:

clj -X:test:silent :nses '[com.getorcha.integrations.ap.maesn-test]'

Expected: FAIL until maesn/create-booking-proposal! writes dispatch-job-id.

In src/com/getorcha/integrations/ap/maesn.clj, update the context destructuring in create-booking-proposal!. Add dispatch-job-id to the existing first-argument :keys vector:

{:keys [aws db-pool identity integrations dispatch-job-id] :as context}

Change the audit insert value from:

{:document-id     document-id
 :tenant-id       tenant-id
 :initiated-by    (:identity/id identity)
 :request-payload [:lift proposal]
 :payload-hash    payload-hash}

to:

(cond-> {:document-id     document-id
         :tenant-id       tenant-id
         :initiated-by    (:identity/id identity)
         :request-payload [:lift proposal]
         :payload-hash    payload-hash}
  dispatch-job-id (assoc :dispatch-job-id dispatch-job-id))
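
The cond-> form keeps the existing insert unchanged for callers that pass no dispatch-job-id. A minimal sketch of that behavior, using a hypothetical `audit-row` helper that isolates the conditional assoc:

```clojure
;; `audit-row` is a hypothetical helper, not part of maesn.clj.
(defn audit-row
  [base dispatch-job-id]
  (cond-> base
    dispatch-job-id (assoc :dispatch-job-id dispatch-job-id)))

;; Without a job id the key is omitted entirely, so the column stays NULL.
(audit-row {:payload-hash "abc"} nil)
;; => {:payload-hash "abc"}

;; With a job id the key is added.
(contains? (audit-row {:payload-hash "abc"} (random-uuid)) :dispatch-job-id)
;; => true
```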

Run:

clj -X:test:silent :nses '[com.getorcha.integrations.ap.maesn-test]'

Expected: PASS.

git add src/com/getorcha/integrations/ap/maesn.clj test/com/getorcha/integrations/ap/maesn_test.clj
git commit -m "feat(datev): link export audits to output jobs"

Task 5: Document Output Worker

Files:

Create test/com/getorcha/workers/document_output_test.clj:

(ns com.getorcha.workers.document-output-test
  (:require [clojure.test :refer [deftest is use-fixtures]]
            [com.getorcha.app.document-output :as app.output]
            [com.getorcha.db.sql :as db.sql]
            [com.getorcha.integrations.ap.maesn :as maesn]
            [com.getorcha.test.fixtures :as fixtures]
            [com.getorcha.test.notification-helpers :as helpers]
            [com.getorcha.workers.document-output :as worker]))


(use-fixtures :once fixtures/with-running-system)
(use-fixtures :each fixtures/with-db-rollback)


(defn ^:private output-job!
  [status]
  (let [tenant-id (helpers/create-tenant!)
        user-id   (helpers/create-identity!)
        doc-id    (helpers/create-completed-document! tenant-id user-id)
        job       (app.output/create-job!
                   fixtures/*db*
                   {:tenant-id       tenant-id
                    :document-id     doc-id
                    :document-domain :ap
                    :trigger         :approval-completed
                    :requested-by    user-id})]
    (when-not (= status "pending")
      (db.sql/execute-one!
       fixtures/*db*
       {:update :document-output-job
        :set    {:status     (db.sql/->cast status :document-output-status)
                 :updated-at [:now]}
        :where  [:= :id (:document-output-job/id job)]}))
    (assoc job :tenant-id tenant-id :document-id doc-id :user-id user-id)))


(deftest claim-job!-claims-pending
  (let [job (output-job! "pending")
        claimed (worker/claim-job! fixtures/*db*
                                   (:document-output-job/id job)
                                   600)]
    (is (= (:document-output-job/id job)
           (:document-output-job/id claimed)))
    (is (= "running" (:document-output-job/status claimed)))))


(deftest claim-job!-does-not-claim-fresh-running
  (let [job (output-job! "running")]
    (is (nil? (worker/claim-job! fixtures/*db*
                                 (:document-output-job/id job)
                                 600)))))


(deftest claim-job!-reclaims-stale-running
  (let [job (output-job! "running")]
    (db.sql/execute-one!
     fixtures/*db*
     {:update :document-output-job
      :set    {:updated-at [:raw "now() - interval '11 minutes'"]}
      :where  [:= :id (:document-output-job/id job)]})
    (let [claimed (worker/claim-job! fixtures/*db*
                                     (:document-output-job/id job)
                                     600)]
      (is (= (:document-output-job/id job)
             (:document-output-job/id claimed)))
      (is (= "running" (:document-output-job/status claimed))))))


(deftest dispatch-job!-marks-dispatched
  (let [calls (atom 0)
        job   (output-job! "pending")]
    (with-redefs [maesn/export-eligible?          (constantly true)
                  maesn/create-booking-proposal! (fn [ctx _doc]
                                                   (is (= (:document-output-job/id job)
                                                          (:dispatch-job-id ctx)))
                                                   (swap! calls inc)
                                                   {:task-id "task-1"})]
      (worker/dispatch-job!
       {:aws {} :db-pool fixtures/*db* :integrations {}}
       (:document-output-job/id job)
       600)
      (is (= 1 @calls))
      (is (= "dispatched"
             (:document-output-job/status
              (db.sql/execute-one!
               fixtures/*db*
               {:select [:status]
                :from   [:document-output-job]
                :where  [:= :id (:document-output-job/id job)]})))))))


(deftest dispatch-job!-marks-failed-on-ineligible-document
  (let [job (output-job! "pending")]
    (with-redefs [maesn/export-eligible? (constantly false)]
      (worker/dispatch-job!
       {:aws {} :db-pool fixtures/*db* :integrations {}}
       (:document-output-job/id job)
       600)
      (let [row (db.sql/execute-one!
                 fixtures/*db*
                 {:select [:status :last-error]
                  :from   [:document-output-job]
                  :where  [:= :id (:document-output-job/id job)]})]
        (is (= "failed" (:document-output-job/status row)))
        (is (re-find #"not eligible" (:document-output-job/last-error row)))))))

Run:

clj -X:test:silent :nses '[com.getorcha.workers.document-output-test]'

Expected: FAIL because com.getorcha.workers.document-output does not exist.

Create src/com/getorcha/workers/document_output.clj:

(ns com.getorcha.workers.document-output
  "SQS worker for document output dispatch."
  (:require [clojure.tools.logging :as log]
            [com.getorcha.app.http.documents.shared :as documents.shared]
            [com.getorcha.aws :as aws]
            [com.getorcha.db.sql :as db.sql]
            [com.getorcha.integrations.ap.maesn :as maesn]
            [com.getorcha.logging.mdc :as mdc]
            [integrant.core :as ig])
  (:import (java.util.concurrent Callable Executors ExecutorService Future ScheduledExecutorService ScheduledFuture TimeUnit)
           (software.amazon.awssdk.services.sqs SqsClient)
           (software.amazon.awssdk.services.sqs.model Message)))


(set! *warn-on-reflection* true)


(defmethod ig/init-key ::worker-pools
  [_ {:keys [timeout-minutes]}]
  {:executor        (Executors/newVirtualThreadPerTaskExecutor)
   :heartbeat-pool  (Executors/newScheduledThreadPool 2)
   :timeout-minutes timeout-minutes})


(defmethod ig/halt-key! ::worker-pools
  [_ {:keys [^ExecutorService executor ^ScheduledExecutorService heartbeat-pool timeout-minutes]}]
  (log/info "Shutting down document-output worker pools")
  (.shutdown executor)
  (.shutdown heartbeat-pool)
  (when-not (.awaitTermination executor (long timeout-minutes) TimeUnit/MINUTES)
    (log/warn "Document-output executor did not terminate in time, forcing shutdown")
    (.shutdownNow executor))
  (when-not (.awaitTermination heartbeat-pool 30 TimeUnit/SECONDS)
    (log/warn "Document-output heartbeat pool did not terminate in time, forcing shutdown")
    (.shutdownNow heartbeat-pool)))


(defn claim-job!
  "Atomically claims a pending or stale-running job."
  [db-pool job-id stale-running-seconds]
  (db.sql/execute-one!
   db-pool
   {:update    :document-output-job
    :set       {:status     (db.sql/->cast "running" :document-output-status)
                :started-at [:coalesce :started-at [:now]]
                :updated-at [:now]}
    :where     [:and
                [:= :id job-id]
                [:or
                 [:= :status (db.sql/->cast "pending" :document-output-status)]
                 [:and
                  [:= :status (db.sql/->cast "running" :document-output-status)]
                  ;; (long ...) rejects non-numeric input before it reaches the raw SQL fragment.
                  [:< :updated-at [:- [:now] [:raw (str "interval '" (long stale-running-seconds) " seconds'")]]]]]]
    :returning [:*]}))


(defn ^:private touch-running!
  [db-pool job-id]
  (db.sql/execute-one!
   db-pool
   {:update :document-output-job
    :set    {:updated-at [:now]}
    :where  [:and
             [:= :id job-id]
             [:= :status (db.sql/->cast "running" :document-output-status)]]}))


(defn ^:private mark-dispatched!
  [db-pool job-id]
  (db.sql/execute-one!
   db-pool
   {:update :document-output-job
    :set    {:status       (db.sql/->cast "dispatched" :document-output-status)
             :completed-at [:now]
             :updated-at   [:now]}
    :where  [:and
             [:= :id job-id]
             [:= :status (db.sql/->cast "running" :document-output-status)]]}))


(defn ^:private mark-failed!
  [db-pool job-id error-message]
  (db.sql/execute-one!
   db-pool
   {:update :document-output-job
    :set    {:status       (db.sql/->cast "failed" :document-output-status)
             :last-error   error-message
             :completed-at [:now]
             :updated-at   [:now]}
    :where  [:and
             [:= :id job-id]
             [:= :status (db.sql/->cast "running" :document-output-status)]]}))


(defn ^:private latest-linked-audit
  [db-pool job-id]
  (db.sql/execute-one!
   db-pool
   {:select   [:*]
    :from     [:ap-datev-export-audit]
    :where    [:= :dispatch-job-id job-id]
    :order-by [[:initiated-at :desc]]
    :limit    1}))


(def ^:private terminal-audit-statuses
  #{"SUCCESS"
    "FAILED"
    "PARTIAL_SUCCESS"
    "CANCELLED_SUCCESSFULLY"
    "CANCEL_FAILED"})


(defn ^:private audit-dispatched?
  [audit]
  (or (:ap-datev-export-audit/task-id audit)
      (contains? terminal-audit-statuses
                 (:ap-datev-export-audit/status audit))))


(defn ^:private load-document
  [db-pool document-id]
  (db.sql/execute-one!
   db-pool
   {:select [:*]
    :from   [:document]
    :where  [:= :id document-id]}
   {:builder-fn documents.shared/document-builder-fn}))


(defn dispatch-job!
  "Claims and dispatches a document output job. Public for focused tests."
  [{:keys [db-pool] :as context} job-id stale-running-seconds]
  (if-let [job (claim-job! db-pool job-id stale-running-seconds)]
    (let [document-id (:document-output-job/document-id job)
          doc         (load-document db-pool document-id)
          audit       (latest-linked-audit db-pool job-id)]
      (try
        (cond
          (nil? doc)
          (mark-failed! db-pool job-id "document not found")

          (audit-dispatched? audit)
          (mark-dispatched! db-pool job-id)

          audit
          (do (log/warn "Output dispatch found a linked DATEV audit without a task id or terminal status; deferring to operator review"
                        {:job-id   job-id
                         :audit-id (:ap-datev-export-audit/id audit)})
              (mark-failed! db-pool job-id
                            "Previous DATEV export attempt is in an inconsistent state and needs operator review"))

          (not (maesn/export-eligible? db-pool doc))
          (mark-failed! db-pool job-id "document not eligible for DATEV export")

          :else
          (do
            (maesn/create-booking-proposal!
             (assoc context :dispatch-job-id job-id)
             doc)
            (mark-dispatched! db-pool job-id)))
        (catch Exception e
          (log/warn e "Document output dispatch failed" {:job-id job-id :document-id document-id})
          (mark-failed! db-pool job-id (ex-message e)))))
    (log/info "Document output job was not claimable" {:job-id job-id})))


(defn ^:private handle-message!
  [{:keys [aws db-pool heartbeat-extension-seconds heartbeat-rate-seconds stale-running-seconds worker-pools]
    :as   context}
   ^Message message]
  (let [job-id     (parse-uuid (.body message))
        sqs-client (get-in aws [:clients :sqs])
        queue-url  (get-in aws [:queue-urls :document-output])]
    (if (nil? job-id)
      (do
        (log/error "Invalid document output job id" {:message-body (.body message)})
        (aws/delete-message! sqs-client queue-url message))
      (let [{:keys [^ScheduledExecutorService heartbeat-pool]} worker-pools
            _                (aws/extend-visibility! sqs-client queue-url message heartbeat-extension-seconds)
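            heartbeat        (mdc/wrap-runnable
                               (fn []
                                 ;; scheduleAtFixedRate suppresses all later runs once one throws,
                                 ;; so log and swallow heartbeat failures.
                                 (try
                                   (aws/extend-visibility! sqs-client queue-url message heartbeat-extension-seconds)
                                   (touch-running! db-pool job-id)
                                   (catch Exception e
                                     (log/warn e "Document output heartbeat failed" {:job-id job-id})))))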
            heartbeat-future ^ScheduledFuture (.scheduleAtFixedRate
                                               heartbeat-pool
                                               ^Runnable heartbeat
                                               (long heartbeat-rate-seconds)
                                               (long heartbeat-rate-seconds)
                                               TimeUnit/SECONDS)]
        (try
          (dispatch-job! context job-id stale-running-seconds)
          (let [status (:document-output-job/status
                        (db.sql/execute-one!
                         db-pool
                         {:select [:status]
                          :from   [:document-output-job]
                          :where  [:= :id job-id]}))]
            (when (#{"dispatched" "failed"} status)
              (aws/delete-message! sqs-client queue-url message)))
          (finally
            (.cancel heartbeat-future false)))))))


(defmethod ig/init-key ::orchestrator
  [_ {:keys [aws interrupt-on-halt? max-queue-messages wait-time-seconds worker-pools] :as config}]
  (let [{:keys [^ExecutorService executor]} worker-pools
        polling?                            (atom true)
        ^SqsClient sqs-client               (get-in aws [:clients :sqs])
        queue-url                           (get-in aws [:queue-urls :document-output])
        ^Future polling-future
        (.submit executor
                 ^Callable
                 (fn []
                   (while @polling?
                     (try
                       (let [messages (aws/receive-messages
                                       sqs-client
                                       queue-url
                                       {:max-messages      max-queue-messages
                                        :wait-time-seconds wait-time-seconds})]
                         (doseq [^Message message messages]
                           (.submit executor
                                    ^Callable
                                    (mdc/wrap-callable
                                     #(handle-message! config message)))))
                       (catch Exception e
                         (when @polling?
                           (log/error e "Error in document-output polling loop, will retry")))))))]
    (log/info "Starting document-output orchestrator for queue" queue-url)
    {:polling?           polling?
     :polling-future     polling-future
     :interrupt-on-halt? interrupt-on-halt?}))


(defmethod ig/halt-key! ::orchestrator
  [_ {:keys [polling? ^Future polling-future interrupt-on-halt?]}]
  (log/info "Stopping document-output orchestrator")
  (reset! polling? false)
  (when interrupt-on-halt?
    (.cancel polling-future true)))
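
A note on the heartbeat in handle-message!: a task scheduled with ScheduledExecutorService.scheduleAtFixedRate is not run again after its first uncaught exception. A single failed aws/extend-visibility! or touch-running! call would therefore silently stop visibility extension for that message. If that is not the intended behavior, a guard along these lines could wrap the heartbeat fn before it is passed to mdc/wrap-runnable. This is only a sketch under that assumption; guarded-heartbeat and on-error are hypothetical names, not existing helpers:

```clojure
;; Hypothetical helper: keeps a periodic task alive across failures.
(defn guarded-heartbeat
  "Wraps f so that an exception is handed to on-error instead of escaping.
   An escaping exception would suppress all later scheduled runs."
  [f on-error]
  (fn []
    (try
      (f)
      (catch Exception e
        (on-error e)))))
```

In the worker, on-error could be something like #(log/warn % "document-output heartbeat failed"), so the next tick still fires and the failure is visible in the logs.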

Run:

clj -X:test:silent :nses '[com.getorcha.workers.document-output-test]'

Expected: PASS.

git add src/com/getorcha/workers/document_output.clj test/com/getorcha/workers/document_output_test.clj
git commit -m "feat(output): add document output worker"

Task 6: Final Approval Enqueues Output Job

Files:

In test/com/getorcha/app/http/documents/view/approval_test.clj, replace auto-fire-on-final-approval-test and auto-fire-skipped-when-not-eligible-test with tests that assert the output job is created and enqueued. Use with-redefs around document-output/enqueue-job!:

(deftest final-approval-creates-and-enqueues-output-job-test
  (let [enqueued (atom [])
        {:keys [doc-id alice bob approval-rows]} (setup-doc-with-approvers!)]
    (with-redefs [com.getorcha.app.document-output/enqueue-job!
                  (fn [_ctx job-id] (swap! enqueued conj job-id) "msg-1")]
      (invoke-handler
       #'com.getorcha.app.http.documents.view.approval/approve!
       {:identity   {:identity/id alice :identity/is-super-admin false}
        :parameters {:path {:document-id doc-id
                            :approval-id (:ap-invoice-approval/id (first approval-rows))}}})
      (is (empty? @enqueued))
      (invoke-handler
       #'com.getorcha.app.http.documents.view.approval/approve!
       {:aws        {}
        :identity   {:identity/id bob :identity/is-super-admin false}
        :parameters {:path {:document-id doc-id
                            :approval-id (:ap-invoice-approval/id (second approval-rows))}}})
      (is (= 1 (count @enqueued)))
      (let [job (db.sql/execute-one!
                 fixtures/*db*
                 {:select [:*]
                  :from   [:document-output-job]
                  :where  [:= :document-id doc-id]})]
        (is (= (first @enqueued) (:document-output-job/id job)))
        (is (= "approval_completed" (:document-output-job/trigger job)))
        (is (= "pending" (:document-output-job/status job)))))))

Run:

clj -X:test:silent :nses '[com.getorcha.app.http.documents.view.approval-test]'

Expected: FAIL because approval handler still calls Maesn directly.

In src/com/getorcha/app/http/documents/view/approval.clj:

[com.getorcha.app.document-output :as document-output]

Inside the final approval branch, after deriving updated-rows, only fetch the document and create the job when the document is fully approved (skip the fetch for non-final approvals):

(let [job (when (= :fully-approved (derive-state updated-rows))
            (let [document (db.sql/execute-one!
                            tx
                            {:select [:id :tenant-id :version]
                             :from   [:document]
                             :where  [:= :id document-id]})]
              (document-output/create-job!
               tx
               {:tenant-id        (:document/tenant-id document)
                :document-id      document-id
                :document-domain  :ap
                :trigger          :approval-completed
                :document-version (:document/version document)
                :requested-by     (:identity/id identity)})))]
  [(ring.resp/ok "approved")
   job])

After the transaction commits:

(when job
  (try
    (document-output/enqueue-job! request (:document-output-job/id job))
    (catch Exception e
      (document-output/mark-send-failed-if-pending!
       db-pool
       (:document-output-job/id job)
       (ex-message e))
      nil)))

Keep the approval response as 200 approved even if the SQS send fails: the approval itself succeeded, and the output job row records the send failure.

Run:

clj -X:test:silent :nses '[com.getorcha.app.http.documents.view.approval-test]'

Expected: PASS.

git add src/com/getorcha/app/http/documents/view/approval.clj test/com/getorcha/app/http/documents/view/approval_test.clj
git commit -m "feat(approvals): enqueue output after final approval"

Task 7: Manual Re-Export Routes Through Output Job

Files:

This step lands the signature change without rendering logic so the manual re-export rewrite in Step 4 can pass the new arg without leaving the tree non-compiling between commits. Task 8 fills in the rendering.

In src/com/getorcha/app/http/documents/view/invoice.clj, change the datev-export-section parameter vector from:

[router document-id export-audit can-export? pending-approval?]

to:

[router document-id export-audit can-export? pending-approval? output-job]

Leave the body unchanged for now; output-job is unused until Task 8.

Update every existing caller of datev-export-section to pass nil (or an already-fetched output job where one is convenient) as the new sixth argument. Search:

rg -n "datev-export-section" src test

Expected: every call site passes six args; project still compiles.

In test/com/getorcha/app/http/documents/view/invoice_test.clj, replace the current export-datev-uses-tenant-datev-integration-credentials test with one that stubs document-output/create-and-enqueue-job! and asserts that maesn/create-booking-proposal! is not called.

Add:

(deftest export-datev-creates-output-job-instead-of-calling-datev
  (testing "manual re-export dispatches through document output"
    (let [document-id #uuid "019dce7b-aefc-709d-b04b-3c989d7e9d95"
          tenant-id   #uuid "00000000-0000-0000-0000-000000000001"
          job-attrs   (atom nil)]
      (with-redefs [db.sql/execute-one! (fn [& _]
                                          {:document/id        document-id
                                           :document/tenant-id tenant-id
                                           :document/version   3})
                    shared/tenant-ids   (constantly [tenant-id])
                    com.getorcha.app.document-output/create-and-enqueue-job!
                    (fn [_request attrs]
                      (reset! job-attrs attrs)
                      {:document-output-job/id (random-uuid)
                       :document-output-job/status "pending"})
                    maesn/create-booking-proposal!
                    (fn [& _] (throw (ex-info "should not call DATEV" {})))
                    maesn/get-latest-export-audit (constantly nil)]
        (#'view.invoice/export-datev
         {:aws           {}
          :db-pool       ::db
          :identity      {:identity/id #uuid "00000000-0000-0000-0000-000000000010"}
          :integrations  {:datev {}}
          :parameters    {:path {:document-id document-id}}
          ::reitit/router test-router}
         (fn [_response])
         (fn [_error]))
        (is (= {:tenant-id        tenant-id
                :document-id      document-id
                :document-domain  :ap
                :trigger          :manual
                :document-version 3
                :requested-by     #uuid "00000000-0000-0000-0000-000000000010"}
               @job-attrs))))))

Add a second test asserting that an exception other than a uniqueness violation reaches the UI as a synthetic failed-dispatch marker instead of being swallowed:

(deftest export-datev-surfaces-synthetic-failure-on-non-uniqueness-error
  (testing "non-uniqueness errors render a dispatch-failed UI marker"
    (let [document-id #uuid "019dce7b-aefc-709d-b04b-3c989d7e9d96"
          tenant-id   #uuid "00000000-0000-0000-0000-000000000001"
          captured    (atom nil)]
      (with-redefs [db.sql/execute-one! (fn [& _]
                                          {:document/id        document-id
                                           :document/tenant-id tenant-id
                                           :document/version   3})
                    shared/tenant-ids   (constantly [tenant-id])
                    com.getorcha.app.document-output/create-and-enqueue-job!
                    (fn [& _] (throw (ex-info "DB unavailable" {})))
                    com.getorcha.app.document-output/latest-job (constantly nil)
                    maesn/get-latest-export-audit (constantly nil)
                    view.invoice/datev-export-section
                    (fn [_router _doc-id _audit _can-export? _pending? job]
                      (reset! captured job)
                      [:section])]
        (#'view.invoice/export-datev
         {:aws            {}
          :db-pool        ::db
          :identity       {:identity/id #uuid "00000000-0000-0000-0000-000000000010"}
          :integrations   {:datev {}}
          :parameters     {:path {:document-id document-id}}
          ::reitit/router test-router}
         (fn [_response])
         (fn [_error]))
        (is (= "failed" (:document-output-job/status @captured)))
        (is (= "DB unavailable" (:document-output-job/last-error @captured)))))))

Run:

clj -X:test:silent :nses '[com.getorcha.app.http.documents.view.invoice-test]'

Expected: FAIL because export handler still calls DATEV.

In src/com/getorcha/app/http/documents/view/invoice.clj:

[com.getorcha.app.document-output :as document-output]

(let [job (try
            (document-output/create-and-enqueue-job!
             request
             {:tenant-id        (:document/tenant-id document)
              :document-id      document-id
              :document-domain  :ap
              :trigger          :manual
              :document-version (:document/version document)
              :requested-by     (:identity/id (:identity request))})
            (catch java.sql.SQLException e
              (if (= "23505" (.getSQLState e))
                ;; Active job already exists for this document; show its state.
                (do (log/info "Manual re-export rejected: dispatch already active"
                              {:document-id document-id})
                    (document-output/latest-job db-pool document-id))
                (do (log/error e "Failed to dispatch DATEV export"
                               {:document-id document-id})
                    {:document-output-job/status     "failed"
                     :document-output-job/last-error (ex-message e)})))
            (catch Exception e
              (log/error e "Failed to dispatch DATEV export"
                         {:document-id document-id})
              ;; Synthesize a transient failed-dispatch marker for the UI so the
              ;; user sees an explicit error instead of an unchanged section.
              {:document-output-job/status     "failed"
               :document-output-job/last-error (ex-message e)}))
      audit (maesn/get-latest-export-audit db-pool document-id)]
  (respond (ring.resp/ok
            (datev-export-section router document-id audit false false job))))

The two catch branches separate uniqueness violations (SQLSTATE 23505: the user double-clicked, or a previous dispatch is still in flight) from all other failures (a transient DB error, for example). A uniqueness violation falls back to the live job. Every other failure, including a SQLException with a different SQLSTATE, produces a synthetic failed marker so the section renders an explicit "dispatch failed" state instead of silently keeping the previous render.
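
The classification can also be read as a small pure function, which may be easier to unit test than the handler. A minimal sketch, assuming the uniqueness violation arrives as a java.sql.SQLException (not wrapped in another exception) carrying PostgreSQL's unique_violation SQLSTATE; classify-dispatch-error and failed-marker are hypothetical names, not part of the plan:

```clojure
(import 'java.sql.SQLException)

;; PostgreSQL SQLSTATE for unique_violation.
(def unique-violation-sqlstate "23505")

(defn classify-dispatch-error
  "Returns :already-active for a uniqueness violation, :failed for anything else."
  [e]
  (if (and (instance? SQLException e)
           (= unique-violation-sqlstate (.getSQLState ^SQLException e)))
    :already-active
    :failed))

(defn failed-marker
  "Builds the transient map the UI renders as a failed dispatch."
  [e]
  {:document-output-job/status     "failed"
   :document-output-job/last-error (ex-message e)})
```

With this shape, :already-active maps to (document-output/latest-job db-pool document-id) and :failed maps to (failed-marker e), matching the two outcomes in the handler.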

Run:

clj -X:test:silent :nses '[com.getorcha.app.http.documents.view.invoice-test]'

Expected: PASS.

git add src/com/getorcha/app/http/documents/view/invoice.clj src/com/getorcha/app/http/documents/view/shared.clj test/com/getorcha/app/http/documents/view/invoice_test.clj
git commit -m "feat(datev): dispatch manual exports through output jobs"

Task 8: UI, SSE, and Output Dispatch Events

Files:

The signature was already extended in Task 7 Step 1 and output-job is currently unused. This step adds the rendering logic that consumes it.

In src/com/getorcha/app/http/documents/view/invoice.clj, add these bindings to the existing let that computes the DATEV display state:

[dispatch-status (:document-output-job/status output-job)
 dispatch-error  (:document-output-job/last-error output-job)
 dispatch-active? (#{"pending" "running"} dispatch-status)
 dispatch-failed? (and (= "failed" dispatch-status)
                       (nil? status))]

Extend the existing badge cond with these branches before the pending-approval? branch:

dispatch-active?
[:span.section-badge.badge-datev-pending "Dispatching..."]

dispatch-failed?
[:span.section-badge.badge-datev-failed "Dispatch failed"]

pending-approval?
[:span.section-badge.badge-datev-pending "Awaiting approval"]
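
Branch order matters because cond returns the first match: an active dispatch hides a stale failure, and a failed dispatch is only shown while no DATEV audit status exists. A minimal sketch of that precedence as a pure function, assuming status in the bindings above is the DATEV audit status; badge-kind and its return keywords are hypothetical and only illustrate the ordering:

```clojure
(defn badge-kind
  "Returns which badge family wins for the given dispatch and audit state."
  [output-job audit-status pending-approval?]
  (let [dispatch-status (:document-output-job/status output-job)]
    (cond
      ;; Job is queued or claimed by the worker.
      (contains? #{"pending" "running"} dispatch-status) :dispatching
      ;; Dispatch failed and DATEV never produced an audit row.
      (and (= "failed" dispatch-status) (nil? audit-status)) :dispatch-failed
      pending-approval? :awaiting-approval
      ;; Fall through to the badge branches that already exist.
      :else :existing-audit-badge)))
```

For example, a failed job combined with an audit status of "FAILED" falls through to the existing audit badge, so the DATEV failure is reported rather than the dispatch failure.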

Render dispatch error before DATEV logs:

(when dispatch-failed?
  [:div.datev-logs
   [:div.datev-log-entry.failed
    [:div.log-message (or dispatch-error "Export dispatch failed")]]])

Keep SSE subscription active when dispatch-active? is true:

(when (or is-pending? dispatch-active?)
  [:div {:hx-ext      "sse"
         :sse-connect (app.http.routes/path-for router :com.getorcha.app.http.documents.view/detail-events {:document-id document-id})
         :sse-swap    "export-status-changed"
         :hx-target   "#section-datev-export"
         :hx-swap     "outerHTML"}])

After this step, update any caller that previously passed nil as the sixth argument to pass (document-output/latest-job db-pool document-id) (or an already-fetched output-job) so dispatch state actually renders. Confirm with:

rg -n "datev-export-section" src test

In src/com/getorcha/app/ingestion.clj, extend the document event schema/parser to allow :output-dispatch with keys emitted by the trigger. Follow the existing :export event shape.

In src/com/getorcha/app/http/documents/view/shared.clj, in detail-events, add:

:output-dispatch
(let [audit       (maesn/get-latest-export-audit db-pool document-id)
      output-job  (document-output/latest-job db-pool document-id)
      status      (:ap-datev-export-audit/status audit)
      can-export? (#{"FAILED" "PARTIAL_SUCCESS" "CANCELLED_SUCCESSFULLY" "CANCEL_FAILED"} status)
      rows        (view.approval/fetch-approval-rows db-pool document-id)
      pending-approval? (= :pending (view.approval/derive-state rows))]
  {:event "export-status-changed"
   :data  (hiccup/html
           (view.invoice/datev-export-section router document-id audit can-export? pending-approval? output-job))})

Add require:

[com.getorcha.app.document-output :as document-output]

In test/com/getorcha/app/http/documents/view/invoice_test.clj, add a Hiccup string test:

(deftest datev-export-section-shows-dispatch-failure
  (let [html (str (hiccup2.core/html
                   (view.invoice/datev-export-section
                    test-router
                    #uuid "019dce7b-aefc-709d-b04b-3c989d7e9d95"
                    nil
                    true
                    false
                    {:document-output-job/status "failed"
                     :document-output-job/last-error "SQS unavailable"})))]
    (is (clojure.string/includes? html "Dispatch failed"))
    (is (clojure.string/includes? html "SQS unavailable"))))

Add hiccup2.core and clojure.string requires if missing.

Run:

clj -X:test:silent :nses '[com.getorcha.app.http.documents.view.invoice-test com.getorcha.app.http.documents.view.shared-test]'

Expected: PASS.

git add src/com/getorcha/app/ingestion.clj src/com/getorcha/app/http/documents/view/invoice.clj src/com/getorcha/app/http/documents/view/shared.clj test/com/getorcha/app/http/documents/view/invoice_test.clj test/com/getorcha/app/http/documents/view/shared_test.clj
git commit -m "feat(output): surface document output dispatch status"

Task 9: Remove AP Batch Export and Selection UI

Files:

Run:

rg -n "bulk-actions-bar|select-all-cell|bulk-toggle|bulk-toggle-all|bulk-deselect-all|bulk-export-datev|selection-session-key|get-selection|fetch-exportable-ids|selected-count|bulk-export-btn|checkbox-cell" src/com/getorcha/app/http/ap.clj test/com/getorcha/app/http/documents_test.clj

Expected: every match belongs to AP batch selection or export. If a symbol is also used by another bulk action, do not remove it without replacing that action.

In src/com/getorcha/app/http/ap.clj:

After editing, this search should return no results in src/com/getorcha/app/http/ap.clj:

rg -n "bulk-actions-bar|select-all-cell|selected-count|bulk-export-btn|checkbox-cell" src/com/getorcha/app/http/ap.clj

In src/com/getorcha/app/http/ap.clj, delete:

Remove the route entries whose first route segment is "/toggle/:document-id", "/toggle-all", "/deselect-all", and "/export-datev".

In test/com/getorcha/app/http/documents_test.clj, delete bulk-export-datev-test and any tests that only cover selection toggles. Keep tests for AP list rendering, status filters, and non-bulk behavior.

Run:

clj -X:test:silent :nses '[com.getorcha.app.http.documents-test]'

Expected: PASS.

git add src/com/getorcha/app/http/ap.clj test/com/getorcha/app/http/documents_test.clj
git commit -m "remove(ap): drop batch DATEV export"

Task 10: Integration Verification

Files:

Run:

clj -X:test:silent :nses '[com.getorcha.app.document-output-test com.getorcha.workers.document-output-test com.getorcha.app.http.documents.view.approval-test com.getorcha.app.http.documents.view.invoice-test com.getorcha.app.http.documents.view.shared-test com.getorcha.app.http.documents-test]'

Expected: PASS.

Run:

clj-kondo --lint src test dev

Expected: no findings introduced by this work.

Run:

clj -X:test

Expected: PASS.

Run:

rg -n "Thread/startVirtualThread|create-booking-proposal!|bulk-export-datev|bulk-toggle|selected-count|dispatch-job-id|document-output" src test resources/com/getorcha/config.edn

Expected:

Self-Review Checklist