For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Move AP final-approval export and manual DATEV re-export from app-side direct execution to an SQS-driven document output worker, while removing AP batch export.
Architecture: The app creates document_output_job rows as output intent and sends a plain UUID job id to a new :document-output SQS queue. A new worker atomically claims jobs, extends SQS visibility, dispatches the existing DATEV export connector, and updates job status to dispatched or failed. DATEV task outcome remains in ap_datev_export_audit, linked by dispatch_job_id.
Tech Stack: Clojure, Integrant, HoneySQL via com.getorcha.db.sql, PostgreSQL migrations/triggers, AWS SQS helpers in com.getorcha.aws, HTMX/SSE document UI, clojure.test.
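The job lifecycle described under Architecture can be read as a small state machine. The sketch below is illustrative only: `allowed-transitions` and `transition` are hypothetical names that do not appear in the plan's code, and the transition set is inferred from the helpers this plan defines (`mark-send-failed-if-pending!`, `claim-job!`, `mark-dispatched!`, `mark-failed!`).

```clojure
;; Hypothetical model of the document_output_job status lifecycle.
;; pending -> running     a worker claims the job
;; pending -> failed      the SQS send failed before any worker claimed it
;; running -> running     a stale running job is re-claimed
;; running -> dispatched  the DATEV connector call returned
;; running -> failed      dispatch raised, or the document was not eligible
(def allowed-transitions
  {"pending"    #{"running" "failed"}
   "running"    #{"running" "dispatched" "failed"}
   "dispatched" #{}
   "failed"     #{}})

(defn transition
  "Returns `to` when moving from `from` to `to` is allowed, otherwise nil."
  [from to]
  (when (contains? (get allowed-transitions from #{}) to)
    to))
```

As far as the code in this plan shows, `dispatched` and `failed` are terminal: a manual re-export creates a new job row rather than reviving an old one.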
resources/migrations/20260501120000-add-document-output-jobs.up.sql: adds document_output_job, the partial active-job index, ap_datev_export_audit.dispatch_job_id, and the document_output_job notify trigger.
resources/migrations/20260501120000-add-document-output-jobs.down.sql: drops dispatch_job_id, document_output_job, indexes, and enums.
resources/com/getorcha/config.edn: adds the :document-output queue and worker Integrant config.
scripts/ministack-seed/seed.sh: creates v1-orcha-global-document-output and its DLQ locally.
src/com/getorcha/app/document_output.clj
src/com/getorcha/workers/document_output.clj
src/com/getorcha/integrations/ap/maesn.clj: accepts :dispatch-job-id in context and writes ap_datev_export_audit.dispatch_job_id.
src/com/getorcha/app/http/documents/view/approval.clj: replaces fire-export-async! with output job creation and SQS dispatch after final approval.
src/com/getorcha/app/http/documents/view/invoice.clj
src/com/getorcha/app/http/documents/view/shared.clj: handles :output-dispatch events.
src/com/getorcha/app/ingestion.clj: adds output-dispatch if the publisher validation requires it.
src/com/getorcha/app/http/ap.clj: removes bulk-export-datev.
test/com/getorcha/app/document_output_test.clj
test/com/getorcha/workers/document_output_test.clj
test/com/getorcha/app/http/documents/view/approval_test.clj
test/com/getorcha/app/http/documents/view/invoice_test.clj
test/com/getorcha/app/http/documents_test.clj
Files:
Create: resources/migrations/20260501120000-add-document-output-jobs.up.sql
Create: resources/migrations/20260501120000-add-document-output-jobs.down.sql
Step 1: Verify current export schema before adding new migration
Run:
rg -n "datev_export_status|datev_export_task_id|datev_export_error|datev_exported_at|ap_datev_export_audit|tenant_datev_integration|bulk-export-datev|selection-session-key" resources/migrations src test
Expected:
ap_datev_export_audit is active and must remain.
tenant_datev_integration is active and must remain.
Legacy tenant-level datev_export_* columns appear only in old squashed migration history and are not active source code fields.
AP batch export and selection references exist and will be removed in Task 9.
Step 2: Write the migration
Create resources/migrations/20260501120000-add-document-output-jobs.up.sql:
-- Up: document output jobs
CREATE TYPE document_output_domain AS ENUM ('ap');
--;;
CREATE TYPE document_output_trigger AS ENUM ('approval_completed', 'manual');
--;;
CREATE TYPE document_output_status AS ENUM ('pending', 'running', 'dispatched', 'failed');
--;;
CREATE TABLE document_output_job (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES tenant(id) ON DELETE CASCADE,
document_id UUID NOT NULL REFERENCES document(id) ON DELETE CASCADE,
document_domain document_output_domain NOT NULL,
trigger document_output_trigger NOT NULL,
status document_output_status NOT NULL DEFAULT 'pending',
document_version INTEGER,
requested_by UUID REFERENCES "identity"(id) ON DELETE SET NULL,
started_at TIMESTAMPTZ,
completed_at TIMESTAMPTZ,
last_error TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now()
);
--;;
CREATE INDEX idx_document_output_job_document_created
ON document_output_job (document_id, created_at DESC);
--;;
CREATE INDEX idx_document_output_job_tenant_created
ON document_output_job (tenant_id, created_at DESC);
--;;
CREATE INDEX idx_document_output_job_status_created
ON document_output_job (status, created_at);
--;;
CREATE UNIQUE INDEX idx_document_output_job_document_active
ON document_output_job (document_id)
WHERE status IN ('pending', 'running');
--;;
ALTER TABLE ap_datev_export_audit
ADD COLUMN dispatch_job_id UUID REFERENCES document_output_job(id) ON DELETE SET NULL;
--;;
CREATE INDEX idx_ap_datev_export_audit_dispatch_job
ON ap_datev_export_audit (dispatch_job_id)
WHERE dispatch_job_id IS NOT NULL;
--;;
CREATE OR REPLACE FUNCTION notify_document_output_job_event()
RETURNS trigger
LANGUAGE plpgsql
AS $function$
DECLARE
payload jsonb;
tenant_org_id uuid;
BEGIN
SELECT organization_id INTO tenant_org_id
FROM tenant WHERE id = NEW.tenant_id;
payload := jsonb_build_object(
'event/type', 'output-dispatch',
'document-output-job/id', NEW.id::text,
'document-output-job/status', NEW.status::text,
'document/id', NEW.document_id::text,
'tenant/id', NEW.tenant_id::text,
'organization/id', tenant_org_id::text,
'old-status', CASE WHEN TG_OP = 'INSERT' THEN NULL ELSE OLD.status::text END
);
PERFORM pg_notify('document_events', payload::text);
RETURN NEW;
END;
$function$;
--;;
CREATE TRIGGER document_output_job_after_insert
AFTER INSERT ON document_output_job
FOR EACH ROW EXECUTE FUNCTION notify_document_output_job_event();
--;;
CREATE TRIGGER document_output_job_after_update_status
AFTER UPDATE OF status ON document_output_job
FOR EACH ROW
WHEN (OLD.status IS DISTINCT FROM NEW.status)
EXECUTE FUNCTION notify_document_output_job_event();
Create resources/migrations/20260501120000-add-document-output-jobs.down.sql:
-- Down: document output jobs
DROP TRIGGER IF EXISTS document_output_job_after_update_status ON document_output_job;
--;;
DROP TRIGGER IF EXISTS document_output_job_after_insert ON document_output_job;
--;;
DROP FUNCTION IF EXISTS notify_document_output_job_event();
--;;
DROP INDEX IF EXISTS idx_ap_datev_export_audit_dispatch_job;
--;;
ALTER TABLE ap_datev_export_audit
DROP COLUMN IF EXISTS dispatch_job_id;
--;;
DROP INDEX IF EXISTS idx_document_output_job_document_active;
--;;
DROP INDEX IF EXISTS idx_document_output_job_status_created;
--;;
DROP INDEX IF EXISTS idx_document_output_job_tenant_created;
--;;
DROP INDEX IF EXISTS idx_document_output_job_document_created;
--;;
DROP TABLE IF EXISTS document_output_job;
--;;
DROP TYPE IF EXISTS document_output_status;
--;;
DROP TYPE IF EXISTS document_output_trigger;
--;;
DROP TYPE IF EXISTS document_output_domain;
Run:
clj -X:test:silent :nses '[com.getorcha.db.migrations-test]'
Expected: PASS.
git add resources/migrations/20260501120000-add-document-output-jobs.up.sql resources/migrations/20260501120000-add-document-output-jobs.down.sql
git commit -m "feat(output): add document output job schema"
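The partial unique index idx_document_output_job_document_active allows any number of dispatched or failed rows per document but at most one row that is pending or running. A minimal in-memory sketch of that rule, with `active-statuses` and `active-job-exists?` as hypothetical names used only for illustration:

```clojure
;; Statuses covered by the partial index predicate:
;; WHERE status IN ('pending', 'running')
(def active-statuses #{"pending" "running"})

(defn active-job-exists?
  "True when `jobs` already holds a pending or running job for `doc-id`.
  Mirrors the condition under which a second insert would violate the index."
  [jobs doc-id]
  (boolean
   (some (fn [job]
           (and (= doc-id (:document-id job))
                (contains? active-statuses (:status job))))
         jobs)))

(def history
  [{:document-id 1 :status "failed"}
   {:document-id 1 :status "dispatched"}])

(active-job-exists? history 1)
;; => false, so a new job for document 1 can be inserted
(active-job-exists? (conj history {:document-id 1 :status "pending"}) 1)
;; => true, so a second insert would be rejected
```

In the database the check is enforced by the index itself, so callers see a unique violation rather than having to run a query like this first.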
Files:
Modify: resources/com/getorcha/config.edn
Modify: scripts/ministack-seed/seed.sh
Step 1: Add the queue name to config
In resources/com/getorcha/config.edn, change the queue map at the top from:
:queues {:ingestion "v1-orcha-global-ingest"
:acquisition "v1-orcha-global-email-acquire"
:matching "v1-orcha-global-doc-matching"
:diagnostics-recompute "v1-orcha-global-diagnostics-recompute"}
to:
:queues {:ingestion "v1-orcha-global-ingest"
:acquisition "v1-orcha-global-email-acquire"
:matching "v1-orcha-global-doc-matching"
:diagnostics-recompute "v1-orcha-global-diagnostics-recompute"
:document-output "v1-orcha-global-document-output"}
Near the other worker configs in resources/com/getorcha/config.edn, add:
;; Workers - Document Output
;; -----------------------------------------------------------------------------
:com.getorcha.workers.document-output/worker-pools
{:timeout-minutes 5}
:com.getorcha.workers.document-output/orchestrator
{:aws #integrant/ref :com.getorcha.aws/state
:db-pool #integrant/ref :com.getorcha.db/pool
:heartbeat-extension-seconds 300
:heartbeat-rate-seconds 60
:integrations #ref [:com.getorcha/integrations]
:interrupt-on-halt? #ref [:com.getorcha/dev?]
:max-queue-messages 10
:notifications #ref [:com.getorcha/notifications]
:stale-running-seconds 600
:wait-time-seconds #profile {:test 0 :default 20}
:worker-pools #integrant/ref :com.getorcha.workers.document-output/worker-pools}
In scripts/ministack-seed/seed.sh, add:
DOCUMENT_OUTPUT_QUEUE="v1-orcha-global-document-output"
near the other queue names, then add:
ensure_queue_with_dlq "$DOCUMENT_OUTPUT_QUEUE"
near the existing ensure_queue_with_dlq calls.
Run:
clj -M -e "(require 'com.getorcha.system) (keys (com.getorcha.system/ig-config {:config-file \"com/getorcha/config.edn\" :profile :test}))"
Expected: command exits 0 and includes :com.getorcha.workers.document-output/orchestrator in the printed keys.
git add resources/com/getorcha/config.edn scripts/ministack-seed/seed.sh
git commit -m "feat(output): configure document output queue"
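One way to read the three timing values in the orchestrator config (heartbeat-rate-seconds 60, heartbeat-extension-seconds 300, stale-running-seconds 600) is that the heartbeat should fire several times within each SQS visibility extension, and that a job should only look stale after the heartbeat has been silent for a long while. The check below encodes that reading. It is an assumption drawn from how the worker uses these values, not a rule the plan states, and `timing-consistent?` is a hypothetical name.

```clojure
(defn timing-consistent?
  "Assumed invariant: the heartbeat must fire before the SQS visibility
  extension runs out and before a live job could look stale."
  [{:keys [heartbeat-rate-seconds
           heartbeat-extension-seconds
           stale-running-seconds]}]
  (and (< heartbeat-rate-seconds heartbeat-extension-seconds)
       (< heartbeat-rate-seconds stale-running-seconds)))

(timing-consistent? {:heartbeat-rate-seconds 60
                     :heartbeat-extension-seconds 300
                     :stale-running-seconds 600})
;; => true
```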
Files:
Create: src/com/getorcha/app/document_output.clj
Test: test/com/getorcha/app/document_output_test.clj
Step 1: Write failing tests for job creation, guarded failure, and active-job uniqueness
Create test/com/getorcha/app/document_output_test.clj:
(ns com.getorcha.app.document-output-test
(:require [clojure.test :refer [deftest is use-fixtures]]
[com.getorcha.app.document-output :as document-output]
[com.getorcha.db.sql :as db.sql]
[com.getorcha.test.fixtures :as fixtures]
[com.getorcha.test.notification-helpers :as helpers]))
(use-fixtures :once fixtures/with-running-system)
(use-fixtures :each fixtures/with-db-rollback)
(deftest create-job!-inserts-pending-job
(let [tenant-id (helpers/create-tenant!)
user-id (helpers/create-identity!)
doc-id (helpers/create-completed-document! tenant-id user-id)
job (document-output/create-job!
fixtures/*db*
{:tenant-id tenant-id
:document-id doc-id
:document-domain :ap
:trigger :manual
:document-version 1
:requested-by user-id})]
(is (uuid? (:document-output-job/id job)))
(is (= "pending" (:document-output-job/status job)))
(is (= "manual" (:document-output-job/trigger job)))))
(deftest guarded-mark-failed-does-not-overwrite-running
(let [tenant-id (helpers/create-tenant!)
user-id (helpers/create-identity!)
doc-id (helpers/create-completed-document! tenant-id user-id)
job (document-output/create-job!
fixtures/*db*
{:tenant-id tenant-id
:document-id doc-id
:document-domain :ap
:trigger :manual
:requested-by user-id})]
(db.sql/execute-one!
fixtures/*db*
{:update :document-output-job
:set {:status (db.sql/->cast "running" :document-output-status)}
:where [:= :id (:document-output-job/id job)]})
(is (nil? (document-output/mark-send-failed-if-pending!
fixtures/*db* (:document-output-job/id job) "send failed")))
(is (= "running"
(:document-output-job/status
(db.sql/execute-one!
fixtures/*db*
{:select [:status]
:from [:document-output-job]
:where [:= :id (:document-output-job/id job)]}))))))
(deftest create-job!-rejects-second-active-job-for-document
(let [tenant-id (helpers/create-tenant!)
user-id (helpers/create-identity!)
doc-id (helpers/create-completed-document! tenant-id user-id)]
(document-output/create-job!
fixtures/*db*
{:tenant-id tenant-id
:document-id doc-id
:document-domain :ap
:trigger :manual
:requested-by user-id})
(is (thrown? Exception
(document-output/create-job!
fixtures/*db*
{:tenant-id tenant-id
:document-id doc-id
:document-domain :ap
:trigger :manual
:requested-by user-id})))))
Run:
clj -X:test:silent :nses '[com.getorcha.app.document-output-test]'
Expected: FAIL because com.getorcha.app.document-output does not exist.
Create src/com/getorcha/app/document_output.clj:
(ns com.getorcha.app.document-output
"Document output job creation, dispatch enqueueing, and query helpers."
(:require [clojure.string :as str]
[clojure.tools.logging :as log]
[com.getorcha.aws :as aws]
[com.getorcha.db.sql :as db.sql]))
(defn ^:private enum-name
[x]
(str/replace (name x) "-" "_"))
(defn create-job!
"Creates a pending document output job and returns it."
[db {:keys [tenant-id document-id document-domain trigger document-version requested-by]}]
(db.sql/execute-one!
db
{:insert-into :document-output-job
:values [(cond-> {:tenant-id tenant-id
:document-id document-id
:document-domain (db.sql/->cast (enum-name document-domain) :document-output-domain)
:trigger (db.sql/->cast (enum-name trigger) :document-output-trigger)
:status (db.sql/->cast "pending" :document-output-status)
:requested-by requested-by}
document-version (assoc :document-version document-version))]
:returning [:*]}))
(defn mark-send-failed-if-pending!
"Marks a job failed after an enqueue error only if it is still pending.
Returns the updated row, or nil if another worker already changed state."
[db job-id error-message]
(db.sql/execute-one!
db
{:update :document-output-job
:set {:status (db.sql/->cast "failed" :document-output-status)
:last-error error-message
:completed-at [:now]
:updated-at [:now]}
:where [:and
[:= :id job-id]
[:= :status (db.sql/->cast "pending" :document-output-status)]]
:returning [:*]}))
(defn enqueue-job!
"Sends the job id as a plain UUID string to the document-output queue."
[{:keys [aws] :as _context} job-id]
(aws/send-message!
(get-in aws [:clients :sqs])
(get-in aws [:queue-urls :document-output])
(str job-id)))
(defn create-and-enqueue-job!
"Creates a job in `db`, then sends its id to SQS. If SQS send fails, marks
the job failed only while it is still pending and rethrows the exception."
[{:keys [db-pool] :as context} job-attrs]
(let [job (create-job! db-pool job-attrs)
job-id (:document-output-job/id job)]
(try
(enqueue-job! context job-id)
job
(catch Exception e
(log/warn e "Failed to enqueue document output job" {:job-id job-id})
(mark-send-failed-if-pending! db-pool job-id (ex-message e))
(throw e)))))
(defn latest-job
"Returns the latest document output job for a document, if any."
[db document-id]
(db.sql/execute-one!
db
{:select [:*]
:from [:document-output-job]
:where [:= :document-id document-id]
:order-by [[:created-at :desc]]
:limit 1}))
Run:
clj -X:test:silent :nses '[com.getorcha.app.document-output-test]'
Expected: PASS.
git add src/com/getorcha/app/document_output.clj test/com/getorcha/app/document_output_test.clj
git commit -m "feat(output): add document output job helpers"
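The private enum-name helper is what lets callers pass kebab-case keywords such as :approval-completed while the PostgreSQL enums use snake_case labels. The snippet below copies the helper into a standalone form so it can be tried at a REPL; it is a restatement for illustration, not additional production code.

```clojure
(require '[clojure.string :as str])

;; Same body as the private helper in com.getorcha.app.document-output.
(defn enum-name
  [x]
  (str/replace (name x) "-" "_"))

(enum-name :approval-completed) ;; => "approval_completed"
(enum-name :manual)             ;; => "manual"
(enum-name :ap)                 ;; => "ap"
```

This is why the tests compare trigger values against strings like "approval_completed" rather than against the keyword that was passed in.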
Files:
Modify: src/com/getorcha/integrations/ap/maesn.clj
Create: test/com/getorcha/integrations/ap/maesn_test.clj
Step 1: Add a failing test for dispatch job id propagation
Create test/com/getorcha/integrations/ap/maesn_test.clj:
(ns com.getorcha.integrations.ap.maesn-test
(:require [clojure.test :refer [deftest is testing]]
[com.getorcha.aws :as aws]
[com.getorcha.db.sql :as db.sql]
[com.getorcha.http.client :as http]
[com.getorcha.integrations.ap.cover-page :as cover-page]
[com.getorcha.integrations.ap.maesn :as maesn]))
(deftest create-booking-proposal-writes-dispatch-job-id
(testing "DATEV audit row records the document output job id"
(let [dispatch-job-id (random-uuid)
audit-insert (atom nil)
document-id (random-uuid)
tenant-id (random-uuid)
document {:document/id document-id
:document/tenant-id tenant-id
:document/file-path "documents/invoice.pdf"
:document/file-original-name "invoice.pdf"
:document/structured-data {:invoice-number "INV-1"
:currency "EUR"
:issue-date "2026-05-01"
:issuer {:name "Supplier"}
:recipient {:name "Recipient"}
:line-items []}}]
(with-redefs [maesn/connected-datev-integration
(fn [_ctx requested-tenant-id]
(is (= tenant-id requested-tenant-id))
{:tenant-datev-integration/config {}
:tenant-datev-integration/credentials-encrypted (.getBytes "encrypted")})
maesn/decrypt-account-key (fn [& _] "account-key")
aws/get-object (fn [& _] (.getBytes "pdf"))
cover-page/add-cover-page (fn [_doc _supplier bytes] bytes)
http/request (fn [_request]
{:status 201
:body {:data {:taskId "task-1"
:status "IN_PROGRESS"}}})
maesn/poll-until-complete! (fn [& _] nil)
db.sql/execute-one!
(fn
([_db query]
(cond
(= :ap-datev-export-audit (:insert-into query))
(do
(reset! audit-insert (-> query :values first))
{:ap-datev-export-audit/id (random-uuid)})
(= :ap-datev-export-audit (:update query))
{}
:else
nil))
([_db query _opts]
(cond
(= :ap-datev-export-audit (:insert-into query))
(do
(reset! audit-insert (-> query :values first))
{:ap-datev-export-audit/id (random-uuid)})
(= :ap-datev-export-audit (:update query))
{}
:else
nil)))]
(maesn/create-booking-proposal!
{:aws {:s3-buckets {:storage "bucket"}
:clients {:s3 ::s3}}
:db-pool ::db
:dispatch-job-id dispatch-job-id
:identity {:identity/id (random-uuid)}
:integrations {:datev {:api-key "api-key"}}}
document)
(is (= dispatch-job-id (:dispatch-job-id @audit-insert)))))))
Run:
clj -X:test:silent :nses '[com.getorcha.integrations.ap.maesn-test]'
Expected: FAIL until maesn/create-booking-proposal! writes dispatch-job-id.
In src/com/getorcha/integrations/ap/maesn.clj, update the context destructuring in create-booking-proposal!. Add dispatch-job-id to the existing first-argument :keys vector:
{:keys [aws db-pool identity integrations dispatch-job-id] :as context}
Change the audit insert value from:
{:document-id document-id
:tenant-id tenant-id
:initiated-by (:identity/id identity)
:request-payload [:lift proposal]
:payload-hash payload-hash}
to:
(cond-> {:document-id document-id
:tenant-id tenant-id
:initiated-by (:identity/id identity)
:request-payload [:lift proposal]
:payload-hash payload-hash}
dispatch-job-id (assoc :dispatch-job-id dispatch-job-id))
Run:
clj -X:test:silent :nses '[com.getorcha.integrations.ap.maesn-test]'
Expected: PASS.
git add src/com/getorcha/integrations/ap/maesn.clj test/com/getorcha/integrations/ap/maesn_test.clj
git commit -m "feat(datev): link export audits to output jobs"
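The cond-> form in the audit insert adds :dispatch-job-id only when the context carries one, so callers that never pass a job id insert the same map as before and the new column stays NULL. A small sketch of that behavior, where `audit-row` is a hypothetical stand-in for the map built inside create-booking-proposal!:

```clojure
(defn audit-row
  "Hypothetical stand-in for the audit insert map; only the optional
  :dispatch-job-id handling is shown."
  [base dispatch-job-id]
  (cond-> base
    dispatch-job-id (assoc :dispatch-job-id dispatch-job-id)))

(audit-row {:payload-hash "abc"} nil)
;; => {:payload-hash "abc"}
```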
Files:
Create: src/com/getorcha/workers/document_output.clj
Test: test/com/getorcha/workers/document_output_test.clj
Step 1: Write worker tests
Create test/com/getorcha/workers/document_output_test.clj:
(ns com.getorcha.workers.document-output-test
(:require [clojure.test :refer [deftest is use-fixtures]]
[com.getorcha.app.document-output :as app.output]
[com.getorcha.db.sql :as db.sql]
[com.getorcha.integrations.ap.maesn :as maesn]
[com.getorcha.test.fixtures :as fixtures]
[com.getorcha.test.notification-helpers :as helpers]
[com.getorcha.workers.document-output :as worker]))
(use-fixtures :once fixtures/with-running-system)
(use-fixtures :each fixtures/with-db-rollback)
(defn ^:private output-job!
[status]
(let [tenant-id (helpers/create-tenant!)
user-id (helpers/create-identity!)
doc-id (helpers/create-completed-document! tenant-id user-id)
job (app.output/create-job!
fixtures/*db*
{:tenant-id tenant-id
:document-id doc-id
:document-domain :ap
:trigger :approval-completed
:requested-by user-id})]
(when-not (= status "pending")
(db.sql/execute-one!
fixtures/*db*
{:update :document-output-job
:set {:status (db.sql/->cast status :document-output-status)
:updated-at [:now]}
:where [:= :id (:document-output-job/id job)]}))
(assoc job :tenant-id tenant-id :document-id doc-id :user-id user-id)))
(deftest claim-job!-claims-pending
(let [job (output-job! "pending")
claimed (worker/claim-job! fixtures/*db*
(:document-output-job/id job)
600)]
(is (= (:document-output-job/id job)
(:document-output-job/id claimed)))
(is (= "running" (:document-output-job/status claimed)))))
(deftest claim-job!-does-not-claim-fresh-running
(let [job (output-job! "running")]
(is (nil? (worker/claim-job! fixtures/*db*
(:document-output-job/id job)
600)))))
(deftest claim-job!-reclaims-stale-running
(let [job (output-job! "running")]
(db.sql/execute-one!
fixtures/*db*
{:update :document-output-job
:set {:updated-at [:raw "now() - interval '11 minutes'"]}
:where [:= :id (:document-output-job/id job)]})
(let [claimed (worker/claim-job! fixtures/*db*
(:document-output-job/id job)
600)]
(is (= (:document-output-job/id job)
(:document-output-job/id claimed)))
(is (= "running" (:document-output-job/status claimed))))))
(deftest dispatch-job!-marks-dispatched
(let [calls (atom 0)
job (output-job! "pending")]
(with-redefs [maesn/export-eligible? (constantly true)
maesn/create-booking-proposal! (fn [ctx _doc]
(is (= (:document-output-job/id job)
(:dispatch-job-id ctx)))
(swap! calls inc)
{:task-id "task-1"})]
(worker/dispatch-job!
{:aws {} :db-pool fixtures/*db* :integrations {}}
(:document-output-job/id job)
600)
(is (= 1 @calls))
(is (= "dispatched"
(:document-output-job/status
(db.sql/execute-one!
fixtures/*db*
{:select [:status]
:from [:document-output-job]
:where [:= :id (:document-output-job/id job)]})))))))
(deftest dispatch-job!-marks-failed-on-ineligible-document
(let [job (output-job! "pending")]
(with-redefs [maesn/export-eligible? (constantly false)]
(worker/dispatch-job!
{:aws {} :db-pool fixtures/*db* :integrations {}}
(:document-output-job/id job)
600)
(let [row (db.sql/execute-one!
fixtures/*db*
{:select [:status :last-error]
:from [:document-output-job]
:where [:= :id (:document-output-job/id job)]})]
(is (= "failed" (:document-output-job/status row)))
(is (re-find #"not eligible" (:document-output-job/last-error row)))))))
Run:
clj -X:test:silent :nses '[com.getorcha.workers.document-output-test]'
Expected: FAIL because worker namespace does not exist.
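The three claim-job! tests above pin down the claim rule: a pending job is always claimable, a running job is claimable only when its updated_at is older than the stale threshold, and nothing else is claimable. The sketch below restates that rule as a pure predicate. `claimable?` is a hypothetical name; the real check runs inside a single UPDATE so that claiming is atomic, which this in-memory version does not attempt to show.

```clojure
(import '(java.time Duration Instant))

(defn claimable?
  "Hypothetical in-memory mirror of the claim condition.
  `job` has :status (string) and :updated-at (java.time.Instant)."
  [{:keys [status updated-at]} ^Instant now stale-running-seconds]
  (or (= "pending" status)
      (and (= "running" status)
           ;; stale when updated-at < now - stale-running-seconds
           (.isBefore ^Instant updated-at
                      (.minus now (Duration/ofSeconds (long stale-running-seconds)))))))

(def now (Instant/parse "2026-05-01T12:00:00Z"))

(claimable? {:status "running" :updated-at (.minusSeconds now 660)} now 600)
;; => true, eleven minutes without a heartbeat exceeds the 600 second threshold
```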
Create src/com/getorcha/workers/document_output.clj:
(ns com.getorcha.workers.document-output
"SQS worker for document output dispatch."
(:require [clojure.tools.logging :as log]
[com.getorcha.aws :as aws]
[com.getorcha.app.http.documents.shared :as documents.shared]
[com.getorcha.db.sql :as db.sql]
[com.getorcha.integrations.ap.maesn :as maesn]
[com.getorcha.logging.mdc :as mdc]
[integrant.core :as ig])
(:import (java.util.concurrent Callable Executors ExecutorService Future ScheduledExecutorService ScheduledFuture TimeUnit)
(software.amazon.awssdk.services.sqs SqsClient)
(software.amazon.awssdk.services.sqs.model Message)))
(set! *warn-on-reflection* true)
(defmethod ig/init-key ::worker-pools
[_ {:keys [timeout-minutes]}]
{:executor (Executors/newVirtualThreadPerTaskExecutor)
:heartbeat-pool (Executors/newScheduledThreadPool 2)
:timeout-minutes timeout-minutes})
(defmethod ig/halt-key! ::worker-pools
[_ {:keys [^ExecutorService executor ^ScheduledExecutorService heartbeat-pool timeout-minutes]}]
(log/info "Shutting down document-output worker pools")
(.shutdown executor)
(.shutdown heartbeat-pool)
(when-not (.awaitTermination executor (long timeout-minutes) TimeUnit/MINUTES)
(log/warn "Document-output executor did not terminate in time, forcing shutdown")
(.shutdownNow executor))
(when-not (.awaitTermination heartbeat-pool 30 TimeUnit/SECONDS)
(log/warn "Document-output heartbeat pool did not terminate in time, forcing shutdown")
(.shutdownNow heartbeat-pool)))
(defn claim-job!
"Atomically claims a pending or stale-running job."
[db-pool job-id stale-running-seconds]
(db.sql/execute-one!
db-pool
{:update :document-output-job
:set {:status (db.sql/->cast "running" :document-output-status)
:started-at [:coalesce :started-at [:now]]
:updated-at [:now]}
:where [:and
[:= :id job-id]
[:or
[:= :status (db.sql/->cast "pending" :document-output-status)]
[:and
[:= :status (db.sql/->cast "running" :document-output-status)]
;; Coerce to long so that only a number can reach the raw SQL fragment.
[:< :updated-at [:- [:now] [:raw (str "interval '" (long stale-running-seconds) " seconds'")]]]]]]
:returning [:*]}))
(defn ^:private touch-running!
[db-pool job-id]
(db.sql/execute-one!
db-pool
{:update :document-output-job
:set {:updated-at [:now]}
:where [:and
[:= :id job-id]
[:= :status (db.sql/->cast "running" :document-output-status)]]}))
(defn ^:private mark-dispatched!
[db-pool job-id]
(db.sql/execute-one!
db-pool
{:update :document-output-job
:set {:status (db.sql/->cast "dispatched" :document-output-status)
:completed-at [:now]
:updated-at [:now]}
:where [:and
[:= :id job-id]
[:= :status (db.sql/->cast "running" :document-output-status)]]}))
(defn ^:private mark-failed!
[db-pool job-id error-message]
(db.sql/execute-one!
db-pool
{:update :document-output-job
:set {:status (db.sql/->cast "failed" :document-output-status)
:last-error error-message
:completed-at [:now]
:updated-at [:now]}
:where [:and
[:= :id job-id]
[:= :status (db.sql/->cast "running" :document-output-status)]]}))
(defn ^:private latest-linked-audit
[db-pool job-id]
(db.sql/execute-one!
db-pool
{:select [:*]
:from [:ap-datev-export-audit]
:where [:= :dispatch-job-id job-id]
:order-by [[:initiated-at :desc]]
:limit 1}))
(def ^:private terminal-audit-statuses
#{"SUCCESS"
"FAILED"
"PARTIAL_SUCCESS"
"CANCELLED_SUCCESSFULLY"
"CANCEL_FAILED"})
(defn ^:private audit-dispatched?
[audit]
(or (:ap-datev-export-audit/task-id audit)
(contains? terminal-audit-statuses
(:ap-datev-export-audit/status audit))))
(defn ^:private load-document
[db-pool document-id]
(db.sql/execute-one!
db-pool
{:select [:*]
:from [:document]
:where [:= :id document-id]}
{:builder-fn documents.shared/document-builder-fn}))
(defn dispatch-job!
"Claims and dispatches a document output job. Public for focused tests."
[{:keys [db-pool] :as context} job-id stale-running-seconds]
(if-let [job (claim-job! db-pool job-id stale-running-seconds)]
(let [document-id (:document-output-job/document-id job)
doc (load-document db-pool document-id)
audit (latest-linked-audit db-pool job-id)]
(try
(cond
(nil? doc)
(mark-failed! db-pool job-id "document not found")
(audit-dispatched? audit)
(mark-dispatched! db-pool job-id)
audit
(do (log/warn "Output dispatch found a linked DATEV audit without a task id or terminal status; deferring to operator review"
{:job-id job-id
:audit-id (:ap-datev-export-audit/id audit)})
(mark-failed! db-pool job-id
"Previous DATEV export attempt is in an inconsistent state and needs operator review"))
(not (maesn/export-eligible? db-pool doc))
(mark-failed! db-pool job-id "document not eligible for DATEV export")
:else
(do
(maesn/create-booking-proposal!
(assoc context :dispatch-job-id job-id)
doc)
(mark-dispatched! db-pool job-id)))
(catch Exception e
(log/warn e "Document output dispatch failed" {:job-id job-id :document-id document-id})
(mark-failed! db-pool job-id (ex-message e)))))
(log/info "Document output job was not claimable" {:job-id job-id})))
(defn ^:private handle-message!
[{:keys [aws db-pool heartbeat-extension-seconds heartbeat-rate-seconds stale-running-seconds worker-pools]
:as context}
^Message message]
(let [job-id (parse-uuid (.body message))
sqs-client (get-in aws [:clients :sqs])
queue-url (get-in aws [:queue-urls :document-output])]
(if (nil? job-id)
(do
(log/error "Invalid document output job id" {:message-body (.body message)})
(aws/delete-message! sqs-client queue-url message))
(let [{:keys [^ScheduledExecutorService heartbeat-pool]} worker-pools
_ (aws/extend-visibility! sqs-client queue-url message heartbeat-extension-seconds)
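heartbeat (mdc/wrap-runnable
(fn []
;; scheduleAtFixedRate suppresses all later runs once a run throws,
;; so log and swallow heartbeat errors instead of letting them escape.
(try
(aws/extend-visibility! sqs-client queue-url message heartbeat-extension-seconds)
(touch-running! db-pool job-id)
(catch Exception e
(log/warn e "Document output heartbeat failed" {:job-id job-id})))))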
heartbeat-future ^ScheduledFuture (.scheduleAtFixedRate
heartbeat-pool
^Runnable heartbeat
(long heartbeat-rate-seconds)
(long heartbeat-rate-seconds)
TimeUnit/SECONDS)]
(try
(dispatch-job! context job-id stale-running-seconds)
(let [status (:document-output-job/status
(db.sql/execute-one!
db-pool
{:select [:status]
:from [:document-output-job]
:where [:= :id job-id]}))]
(when (#{"dispatched" "failed"} status)
(aws/delete-message! sqs-client queue-url message)))
(finally
(.cancel heartbeat-future false)))))))
(defmethod ig/init-key ::orchestrator
[_ {:keys [aws interrupt-on-halt? max-queue-messages wait-time-seconds worker-pools] :as config}]
(let [{:keys [^ExecutorService executor]} worker-pools
polling? (atom true)
^SqsClient sqs-client (get-in aws [:clients :sqs])
queue-url (get-in aws [:queue-urls :document-output])
^Future polling-future
(.submit executor
^Callable
(fn []
(while @polling?
(try
(let [messages (aws/receive-messages
sqs-client
queue-url
{:max-messages max-queue-messages
:wait-time-seconds wait-time-seconds})]
(doseq [^Message message messages]
(.submit executor
^Callable
(mdc/wrap-callable
#(handle-message! config message)))))
(catch Exception e
(when @polling?
(log/error e "Error in document-output polling loop, will retry")))))))]
(log/info "Starting document-output orchestrator for queue" (get-in aws [:queues :document-output]))
{:polling? polling?
:polling-future polling-future
:interrupt-on-halt? interrupt-on-halt?}))
(defmethod ig/halt-key! ::orchestrator
[_ {:keys [polling? ^Future polling-future interrupt-on-halt?]}]
(log/info "Stopping document-output orchestrator")
(reset! polling? false)
(when interrupt-on-halt?
(.cancel polling-future true)))
Run:
clj -X:test:silent :nses '[com.getorcha.workers.document-output-test]'
Expected: PASS.
git add src/com/getorcha/workers/document_output.clj test/com/getorcha/workers/document_output_test.clj
git commit -m "feat(output): add document output worker"
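The branch order inside dispatch-job! matters: a missing document fails first, an audit that already shows a dispatched task short-circuits to dispatched without calling DATEV again, an audit in an unclear state fails for operator review, and only then is eligibility checked. The sketch below restates that order as a pure function. `decide` and its input keys are hypothetical and exist only to make the ordering easy to test in isolation.

```clojure
(defn decide
  "Hypothetical pure version of the branch order in dispatch-job!.
  `doc` is the loaded document or nil, `audit` the latest linked audit or nil;
  `audit-dispatched?` and `eligible?` are precomputed booleans."
  [{:keys [doc audit audit-dispatched? eligible?]}]
  (cond
    (nil? doc)        [:failed "document not found"]
    audit-dispatched? [:dispatched nil]
    (some? audit)     [:failed "needs operator review"]
    (not eligible?)   [:failed "document not eligible for DATEV export"]
    :else             [:call-connector nil]))

(decide {:doc {} :audit {} :audit-dispatched? true})
;; => [:dispatched nil]
```

Checking the linked audit before eligibility is what makes a redelivered SQS message safe: a job whose DATEV task already exists is marked dispatched instead of being exported twice.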
Files:
Modify: src/com/getorcha/app/http/documents/view/approval.clj
Modify: test/com/getorcha/app/http/documents/view/approval_test.clj
Step 1: Replace final-approval export tests
In test/com/getorcha/app/http/documents/view/approval_test.clj, replace auto-fire-on-final-approval-test and auto-fire-skipped-when-not-eligible-test with tests that assert job creation and enqueue. Use with-redefs around document-output/enqueue-job!:
(deftest final-approval-creates-and-enqueues-output-job-test
(let [enqueued (atom [])
{:keys [doc-id alice bob approval-rows]} (setup-doc-with-approvers!)]
(with-redefs [com.getorcha.app.document-output/enqueue-job!
(fn [_ctx job-id] (swap! enqueued conj job-id) "msg-1")]
(invoke-handler
#'com.getorcha.app.http.documents.view.approval/approve!
{:identity {:identity/id alice :identity/is-super-admin false}
:parameters {:path {:document-id doc-id
:approval-id (:ap-invoice-approval/id (first approval-rows))}}})
(is (empty? @enqueued))
(invoke-handler
#'com.getorcha.app.http.documents.view.approval/approve!
{:aws {}
:identity {:identity/id bob :identity/is-super-admin false}
:parameters {:path {:document-id doc-id
:approval-id (:ap-invoice-approval/id (second approval-rows))}}})
(is (= 1 (count @enqueued)))
(let [job (db.sql/execute-one!
fixtures/*db*
{:select [:*]
:from [:document-output-job]
:where [:= :document-id doc-id]})]
(is (= (first @enqueued) (:document-output-job/id job)))
(is (= "approval_completed" (:document-output-job/trigger job)))
(is (= "pending" (:document-output-job/status job)))))))
Run:
clj -X:test:silent :nses '[com.getorcha.app.http.documents.view.approval-test]'
Expected: FAIL because approval handler still calls Maesn directly.
In src/com/getorcha/app/http/documents/view/approval.clj:
Remove the clojure.tools.logging and com.getorcha.integrations.ap.maesn requires if they are only used by fire-export-async!.
Add the require [com.getorcha.app.document-output :as document-output].
Remove fire-export-async!.
Change approve! to return [result job] from the transaction instead of [result fully-approved?].
Inside the final approval branch, after deriving updated-rows, fetch the document and create the job only when the document is fully approved (skip the fetch for non-final approvals):
(let [job (when (= :fully-approved (derive-state updated-rows))
(let [document (db.sql/execute-one!
tx
{:select [:id :tenant-id :version]
:from [:document]
:where [:= :id document-id]})]
(document-output/create-job!
tx
{:tenant-id (:document/tenant-id document)
:document-id document-id
:document-domain :ap
:trigger :approval-completed
:document-version (:document/version document)
:requested-by (:identity/id identity)})))]
[(ring.resp/ok "approved")
job])
After the transaction commits:
(when job
(try
(document-output/enqueue-job! request (:document-output-job/id job))
(catch Exception e
(document-output/mark-send-failed-if-pending!
db-pool
(:document-output-job/id job)
(ex-message e))
nil)))
Keep the approval response as 200 approved even if SQS dispatch fails, because the approval itself succeeded and the output job records dispatch failure.
Run:
clj -X:test:silent :nses '[com.getorcha.app.http.documents.view.approval-test]'
Expected: PASS.
git add src/com/getorcha/app/http/documents/view/approval.clj test/com/getorcha/app/http/documents/view/approval_test.clj
git commit -m "feat(approvals): enqueue output after final approval"
Files:
Modify: src/com/getorcha/app/http/documents/view/invoice.clj
Modify: src/com/getorcha/app/http/documents/view/shared.clj (existing
datev-export-section caller in detail-events needs the new sixth arg)
Modify: test/com/getorcha/app/http/documents/view/invoice_test.clj
Step 1: Extend datev-export-section signature and update existing callers
This step lands the signature change without rendering logic so the manual re-export rewrite in Step 4 can pass the new arg without leaving the tree non-compiling between commits. Task 8 fills in the rendering.
In src/com/getorcha/app/http/documents/view/invoice.clj, change the
datev-export-section parameter vector from:
[router document-id export-audit can-export? pending-approval?]
to:
[router document-id export-audit can-export? pending-approval? output-job]
Leave the body unchanged for now; output-job is unused until Task 8.
Update every existing caller of datev-export-section to pass nil (or an
already-fetched output job where one is convenient) as the new sixth argument.
Search:
rg -n "datev-export-section" src test
Expected: every call site passes six args; project still compiles.
Update test/com/getorcha/app/http/documents/view/invoice_test.clj: replace the current export-datev-uses-tenant-datev-integration-credentials test with one that stubs document-output/create-and-enqueue-job! and asserts that maesn/create-booking-proposal! is not called.
Add:
(deftest export-datev-creates-output-job-instead-of-calling-datev
  (testing "manual re-export dispatches through document output"
    (let [document-id #uuid "019dce7b-aefc-709d-b04b-3c989d7e9d95"
          tenant-id #uuid "00000000-0000-0000-0000-000000000001"
          job-attrs (atom nil)]
      (with-redefs [db.sql/execute-one! (fn [& _]
                                          {:document/id document-id
                                           :document/tenant-id tenant-id
                                           :document/version 3})
                    shared/tenant-ids (constantly [tenant-id])
                    com.getorcha.app.document-output/create-and-enqueue-job!
                    (fn [_request attrs]
                      (reset! job-attrs attrs)
                      {:document-output-job/id (random-uuid)
                       :document-output-job/status "pending"})
                    maesn/create-booking-proposal!
                    (fn [& _] (throw (ex-info "should not call DATEV" {})))
                    maesn/get-latest-export-audit (constantly nil)]
        (#'view.invoice/export-datev
         {:aws {}
          :db-pool ::db
          :identity {:identity/id #uuid "00000000-0000-0000-0000-000000000010"}
          :integrations {:datev {}}
          :parameters {:path {:document-id document-id}}
          ::reitit/router test-router}
         (fn [_response])
         (fn [_error]))
        (is (= {:tenant-id tenant-id
                :document-id document-id
                :document-domain :ap
                :trigger :manual
                :document-version 3
                :requested-by #uuid "00000000-0000-0000-0000-000000000010"}
               @job-attrs))))))
Add a second test that asserts a non-uniqueness exception surfaces a synthetic failed-dispatch marker to the UI rather than swallowing it:
(deftest export-datev-surfaces-synthetic-failure-on-non-uniqueness-error
  (testing "non-uniqueness errors render a dispatch-failed UI marker"
    (let [document-id #uuid "019dce7b-aefc-709d-b04b-3c989d7e9d96"
          tenant-id #uuid "00000000-0000-0000-0000-000000000001"
          captured (atom nil)]
      (with-redefs [db.sql/execute-one! (fn [& _]
                                          {:document/id document-id
                                           :document/tenant-id tenant-id
                                           :document/version 3})
                    shared/tenant-ids (constantly [tenant-id])
                    com.getorcha.app.document-output/create-and-enqueue-job!
                    (fn [& _] (throw (ex-info "DB unavailable" {})))
                    com.getorcha.app.document-output/latest-job (constantly nil)
                    maesn/get-latest-export-audit (constantly nil)
                    view.invoice/datev-export-section
                    (fn [_router _doc-id _audit _can-export? _pending? job]
                      (reset! captured job)
                      [:section])]
        (#'view.invoice/export-datev
         {:aws {}
          :db-pool ::db
          :identity {:identity/id #uuid "00000000-0000-0000-0000-000000000010"}
          :integrations {:datev {}}
          :parameters {:path {:document-id document-id}}
          ::reitit/router test-router}
         (fn [_response])
         (fn [_error]))
        (is (= "failed" (:document-output-job/status @captured)))
        (is (= "DB unavailable" (:document-output-job/last-error @captured)))))))
Run:
clj -X:test:silent :nses '[com.getorcha.app.http.documents.view.invoice-test]'
Expected: FAIL because export handler still calls DATEV.
In src/com/getorcha/app/http/documents/view/invoice.clj:
Add require: [com.getorcha.app.document-output :as document-output]
In export-datev, remove the direct maesn/create-booking-proposal!, decrypt, and poll code. The handler dispatches through an output job instead:
(let [job (try
            (document-output/create-and-enqueue-job!
             request
             {:tenant-id (:document/tenant-id document)
              :document-id document-id
              :document-domain :ap
              :trigger :manual
              :document-version (:document/version document)
              :requested-by (:identity/id (:identity request))})
            (catch java.sql.SQLException e
              (if (= "23505" (.getSQLState e))
                ;; Active job already exists for this document; show its state.
                (do (log/info "Manual re-export rejected: dispatch already active"
                              {:document-id document-id})
                    (document-output/latest-job db-pool document-id))
                (do (log/error e "Failed to dispatch DATEV export"
                               {:document-id document-id})
                    {:document-output-job/status "failed"
                     :document-output-job/last-error (ex-message e)})))
            (catch Exception e
              (log/error e "Failed to dispatch DATEV export"
                         {:document-id document-id})
              ;; Synthesize a transient failed-dispatch marker for the UI so the
              ;; user sees an explicit error instead of an unchanged section.
              {:document-output-job/status "failed"
               :document-output-job/last-error (ex-message e)}))
      audit (maesn/get-latest-export-audit db-pool document-id)]
  (respond (ring.resp/ok
            (datev-export-section router document-id audit false false job))))
The two catch branches differentiate uniqueness violations (the user double-clicked or a previous dispatch is still in flight) from other failures (transient DB error, etc.). Uniqueness falls back to the live job; everything else surfaces a synthetic failed marker so the section renders an explicit "dispatch failed" state instead of silently keeping the previous render.
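The branching described above can also be expressed as a small classifier, which makes the rule easy to test without a database. This is a sketch only: classify-dispatch-error and failed-marker are hypothetical helper names that do not appear in the plan, and the only external fact relied on is that PostgreSQL reports unique_violation as SQLSTATE 23505.

```clojure
(import 'java.sql.SQLException)

(def unique-violation-sqlstate
  "PostgreSQL SQLSTATE for unique_violation."
  "23505")

(defn classify-dispatch-error
  "Returns :already-active for a unique-constraint violation and
  :dispatch-failed for any other exception. Hypothetical helper."
  [^Exception e]
  (if (and (instance? SQLException e)
           (= unique-violation-sqlstate (.getSQLState ^SQLException e)))
    :already-active
    :dispatch-failed))

(defn failed-marker
  "Builds the transient, non-persisted job map rendered as a failed dispatch."
  [^Exception e]
  {:document-output-job/status "failed"
   :document-output-job/last-error (ex-message e)})
```

With a classifier like this, the handler would look up the live job for :already-active and call failed-marker otherwise, which avoids repeating the synthetic map in two catch clauses. Whether that refactor is worth it is a judgment call; the inline version above is equivalent.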
Run:
clj -X:test:silent :nses '[com.getorcha.app.http.documents.view.invoice-test]'
Expected: PASS.
git add src/com/getorcha/app/http/documents/view/invoice.clj src/com/getorcha/app/http/documents/view/shared.clj test/com/getorcha/app/http/documents/view/invoice_test.clj
git commit -m "feat(datev): dispatch manual exports through output jobs"
Files:
Modify: src/com/getorcha/app/ingestion.clj
Modify: src/com/getorcha/app/http/documents/view/invoice.clj
Modify: src/com/getorcha/app/http/documents/view/shared.clj
Test: test/com/getorcha/app/http/documents/view/shared_test.clj
Test: test/com/getorcha/app/http/documents/view/invoice_test.clj
Step 1: Add dispatch rendering logic to datev-export-section
The signature was already extended in Task 7 Step 1 and output-job is currently
unused. This step adds the rendering logic that consumes it.
In src/com/getorcha/app/http/documents/view/invoice.clj, add these bindings to
the existing let that computes the DATEV display state:
[dispatch-status (:document-output-job/status output-job)
 dispatch-error (:document-output-job/last-error output-job)
 dispatch-active? (#{"pending" "running"} dispatch-status)
 dispatch-failed? (and (= "failed" dispatch-status)
                       (nil? status))]
Extend the existing badge cond with these branches before the pending-approval? branch:
dispatch-active?
[:span.section-badge.badge-datev-pending "Dispatching..."]
dispatch-failed?
[:span.section-badge.badge-datev-failed "Dispatch failed"]
pending-approval?
[:span.section-badge.badge-datev-pending "Awaiting approval"]
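The precedence of these badge branches can be checked as a pure function before wiring it into Hiccup. The sketch below is illustrative: dispatch-state and badge-label are hypothetical names, and it assumes that status in the existing let is the DATEV audit status, so a failed dispatch is only shown while no audit row exists.

```clojure
(defn dispatch-state
  "Derives the dispatch flags from an output job and the DATEV audit status.
  A failed dispatch only counts while no audit status exists yet."
  [output-job audit-status]
  (let [dispatch-status (:document-output-job/status output-job)]
    {:dispatch-active? (contains? #{"pending" "running"} dispatch-status)
     :dispatch-failed? (and (= "failed" dispatch-status)
                            (nil? audit-status))}))

(defn badge-label
  "Picks the badge text; dispatch branches win over pending approval.
  Returns nil when none of these branches apply."
  [{:keys [dispatch-active? dispatch-failed?]} pending-approval?]
  (cond
    dispatch-active? "Dispatching..."
    dispatch-failed? "Dispatch failed"
    pending-approval? "Awaiting approval"))
```

Placing the dispatch branches first means an in-flight or failed dispatch is never hidden behind the approval badge, which matches the ordering requested for the cond.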
Render dispatch error before DATEV logs:
(when dispatch-failed?
  [:div.datev-logs
   [:div.datev-log-entry.failed
    [:div.log-message (or dispatch-error "Export dispatch failed")]]])
Keep SSE subscription active when dispatch-active? is true:
(when (or is-pending? dispatch-active?)
  [:div {:hx-ext "sse"
         :sse-connect (app.http.routes/path-for router :com.getorcha.app.http.documents.view/detail-events {:document-id document-id})
         :sse-swap "export-status-changed"
         :hx-target "#section-datev-export"
         :hx-swap "outerHTML"}])
After this step, update any caller that previously passed nil as the sixth
argument to pass (document-output/latest-job db-pool document-id) (or an
already-fetched output-job) so dispatch state actually renders. Confirm with:
rg -n "datev-export-section" src test
In src/com/getorcha/app/ingestion.clj, extend the document event schema/parser to allow :output-dispatch with keys emitted by the trigger. Follow the existing :export event shape.
In src/com/getorcha/app/http/documents/view/shared.clj, in detail-events, add:
:output-dispatch
(let [audit (maesn/get-latest-export-audit db-pool document-id)
      output-job (document-output/latest-job db-pool document-id)
      status (:ap-datev-export-audit/status audit)
      can-export? (#{"FAILED" "PARTIAL_SUCCESS" "CANCELLED_SUCCESSFULLY" "CANCEL_FAILED"} status)
      rows (view.approval/fetch-approval-rows db-pool document-id)
      pending-approval? (= :pending (view.approval/derive-state rows))]
  {:event "export-status-changed"
   :data (hiccup/html
          (view.invoice/datev-export-section router document-id audit can-export? pending-approval? output-job))})
Add require:
[com.getorcha.app.document-output :as document-output]
In test/com/getorcha/app/http/documents/view/invoice_test.clj, add a Hiccup string test:
(deftest datev-export-section-shows-dispatch-failure
  (let [html (str (hiccup2.core/html
                   (view.invoice/datev-export-section
                    test-router
                    #uuid "019dce7b-aefc-709d-b04b-3c989d7e9d95"
                    nil
                    true
                    false
                    {:document-output-job/status "failed"
                     :document-output-job/last-error "SQS unavailable"})))]
    (is (clojure.string/includes? html "Dispatch failed"))
    (is (clojure.string/includes? html "SQS unavailable"))))
Add hiccup2.core and clojure.string requires if missing.
Run:
clj -X:test:silent :nses '[com.getorcha.app.http.documents.view.invoice-test com.getorcha.app.http.documents.view.shared-test]'
Expected: PASS.
git add src/com/getorcha/app/ingestion.clj src/com/getorcha/app/http/documents/view/invoice.clj src/com/getorcha/app/http/documents/view/shared.clj test/com/getorcha/app/http/documents/view/invoice_test.clj test/com/getorcha/app/http/documents/view/shared_test.clj
git commit -m "feat(output): surface document output dispatch status"
Files:
Modify: src/com/getorcha/app/http/ap.clj
Modify: test/com/getorcha/app/http/documents_test.clj
Step 1: Identify batch-selection-only symbols
Run:
rg -n "bulk-actions-bar|select-all-cell|bulk-toggle|bulk-toggle-all|bulk-deselect-all|bulk-export-datev|selection-session-key|get-selection|fetch-exportable-ids|selected-count|bulk-export-btn|checkbox-cell" src/com/getorcha/app/http/ap.clj test/com/getorcha/app/http/documents_test.clj
Expected: references are only AP batch selection/export. If a symbol is used for another bulk action, do not remove it without replacing that action.
In src/com/getorcha/app/http/ap.clj:
Strip the batch-selection code involving:
bulk-actions-bar
select-all-cell
document-row
After editing, this search should return no results in src/com/getorcha/app/http/ap.clj:
rg -n "bulk-actions-bar|select-all-cell|selected-count|bulk-export-btn|checkbox-cell" src/com/getorcha/app/http/ap.clj
In src/com/getorcha/app/http/ap.clj, delete:
bulk-toggle
bulk-toggle-all
bulk-deselect-all
fetch-exportable-ids
bulk-export-datev
Remove the route entries whose first route segment is "/toggle/:document-id", "/toggle-all", "/deselect-all", and "/export-datev".
In test/com/getorcha/app/http/documents_test.clj, delete bulk-export-datev-test and any tests that only cover selection toggles. Keep tests for AP list rendering, status filters, and non-bulk behavior.
Run:
clj -X:test:silent :nses '[com.getorcha.app.http.documents-test]'
Expected: PASS.
git add src/com/getorcha/app/http/ap.clj test/com/getorcha/app/http/documents_test.clj
git commit -m "remove(ap): drop batch DATEV export"
Files:
No code changes expected.
Step 1: Run all touched namespaces
Run:
clj -X:test:silent :nses '[com.getorcha.app.document-output-test com.getorcha.workers.document-output-test com.getorcha.app.http.documents.view.approval-test com.getorcha.app.http.documents.view.invoice-test com.getorcha.app.http.documents.view.shared-test com.getorcha.app.http.documents-test]'
Expected: PASS.
Run:
clj-kondo --lint src test dev
Expected: no findings introduced by this work.
Run:
clj -X:test
Expected: PASS.
Run:
rg -n "Thread/startVirtualThread|create-booking-proposal!|bulk-export-datev|bulk-toggle|selected-count|dispatch-job-id|document-output" src test resources/com/getorcha/config.edn
Expected:
approval.clj no longer starts a virtual thread or calls create-booking-proposal!.
export-datev handler no longer calls create-booking-proposal!.
maesn.clj still calls create-booking-proposal! internally for ledger retry.
No bulk-export-datev or selection route symbols remain.
dispatch-job-id appears in Maesn audit insert and worker tests.
document-output queue and worker config appear.
target, active-job uniqueness, and dispatch_job_id: Tasks 1 and 4.
pending, running, dispatched, failed.
approval_completed in SQL/string form and :approval-completed in Clojure keyword form before enum-name conversion.
:document-output, message body is UUID string.
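The naming check between approval_completed and :approval-completed can be illustrated with a round-trip conversion. This is a sketch under assumptions: keyword->enum-name and enum-name->keyword are hypothetical names, and the plan does not say which helper performs the enum-name conversion in the codebase.

```clojure
(require '[clojure.string :as string])

(defn keyword->enum-name
  "Converts a Clojure keyword such as :approval-completed into the SQL enum
  label form, approval_completed. Hypothetical helper."
  [k]
  (string/replace (name k) "-" "_"))

(defn enum-name->keyword
  "Converts an SQL enum label back into its Clojure keyword form."
  [s]
  (keyword (string/replace s "_" "-")))
```

A round trip through both functions should return the original keyword for every trigger and status value used in the plan, which is a cheap consistency check for the self-review.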