Jobs Control Plane

Canonical reference for submitting external work into Breyta's jobs control plane, awaiting normalized terminal results, and wiring worker outputs back into the flow.

Quick Answer

Use flow/step :job as the primary authoring surface.

  • :op :submit creates one queued external job
  • :op :submit-batch creates a queued batch
  • :op :get and :op :get-batch read the latest durable state
  • :op :await and :op :await-batch wait for terminal completion

When a flow depends on external workers for installability or operator setup,
declare that dependency in :requires with {:kind :worker ...}. The :job
step describes runtime orchestration; the worker requirement describes the
install/operator contract.

Canonical Step Surface

| Step op | Purpose | Return shape |
| --- | --- | --- |
| :submit | Create one queued job. | Job map with fields such as :job-id, :job-type, :status, :payload. |
| :submit-batch | Create a queued batch of jobs of the same type. | Batch map with :batch-id, aggregate counts, and :jobs. |
| :get | Read the latest state of one job. | Job map. |
| :get-batch | Read the latest state of one batch. | Batch map, optionally with :jobs. |
| :await | Bounded wait for one job to become terminal. | Final job map. |
| :await-batch | Bounded wait for one batch to become terminal. | Final batch map. |

Single Job Example

'(let [{:keys [surface mode timeoutSeconds]} (flow/input)
       queued-job (flow/step :job :submit-agent-review
                    {:op :submit
                     :job-type "agent-review"
                     :payload {:surface (or surface "flows-api")
                               :mode (or mode "succeeded")}
                     :metadata {:campaign "agent-review"}})
       final-job (flow/step :job :await-agent-review
                   {:op :await
                    :job-id (:job-id queued-job)
                    :interval "250ms"
                    :timeout (str (long (or timeoutSeconds 60)) "s")})]
   final-job)

Matching worker command:

export BREYTA_API_URL="https://flows.breyta.ai"
export BREYTA_WORKSPACE="ws-acme"
export BREYTA_API_KEY="<service-account-api-key>"

breyta jobs worker run --type agent-review --handler ./run-agent-review.sh

Recommended operator setup for that worker identity:

breyta service-accounts create \
  --name agent-review-worker \
  --scope jobs.worker \
  --job-type agent-review

breyta service-accounts keys create <service-account-id> --name agent-review-key

--scope accepts repeated flags or comma-separated values. --capability
remains accepted as a compatibility alias.

jobs.worker is the minimum worker scope. If the same machine should also
inspect or mutate broader Breyta state, grant additional service-account
scopes such as flows.read, flows.manage, flows.run,
resources.read, resources.write, or workspace.full according to the
automation boundary you want.

The worker injects the local job directory and the API context that the
handler-side commands use:

  • BREYTA_JOB_DIR
  • BREYTA_JOB_CONTEXT_FILE
  • BREYTA_JOB_FILE
  • BREYTA_JOB_PAYLOAD_FILE
  • BREYTA_JOB_RESULT_FILE
  • BREYTA_JOB_ID
  • BREYTA_JOB_TYPE
  • BREYTA_JOB_WORKSPACE_ID
  • BREYTA_API_URL
  • BREYTA_WORKSPACE
  • BREYTA_API_KEY or BREYTA_TOKEN
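
A handler can fail fast when the injected context is incomplete. The following is a minimal sketch, assuming a handler written in Python rather than bash. `missing_worker_env` is a hypothetical helper, not part of the breyta CLI, and it only checks the variable names listed above.

```python
import os

# Variable names copied from the list above; the worker injects them
# before it invokes the handler.
REQUIRED_VARS = (
    "BREYTA_JOB_DIR",
    "BREYTA_JOB_CONTEXT_FILE",
    "BREYTA_JOB_FILE",
    "BREYTA_JOB_PAYLOAD_FILE",
    "BREYTA_JOB_RESULT_FILE",
    "BREYTA_JOB_ID",
    "BREYTA_JOB_TYPE",
    "BREYTA_JOB_WORKSPACE_ID",
    "BREYTA_API_URL",
    "BREYTA_WORKSPACE",
)
# Either credential variable satisfies the auth requirement.
AUTH_VARS = ("BREYTA_API_KEY", "BREYTA_TOKEN")


def missing_worker_env(env=None):
    """Return the names of expected variables that are unset or empty."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if not any(env.get(name) for name in AUTH_VARS):
        missing.append(" or ".join(AUTH_VARS))
    return missing
```

An empty return value means every listed variable is present; a handler could exit early with the returned names in its error message otherwise.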

Use breyta jobs show <job-id> for the durable job state the flow can await.
Use breyta jobs worker state inside the handler, or with --job-dir <dir>,
to inspect the local job.json, payload.json, and result.json state while
the worker is building the result.
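
For handlers that prefer to read the local files directly, the sketch below loads whatever the injected file variables point at. It assumes each file holds a single JSON document and treats a missing file as "not written yet"; `load_local_state` is a hypothetical helper, and `breyta jobs worker state` remains the documented way to inspect this state.

```python
import json
import os


def load_local_state(env=None):
    """Parse the job, payload, and result files named by the worker env."""
    env = os.environ if env is None else env
    state = {}
    for label, var in (
        ("job", "BREYTA_JOB_FILE"),
        ("payload", "BREYTA_JOB_PAYLOAD_FILE"),
        ("result", "BREYTA_JOB_RESULT_FILE"),
    ):
        path = env.get(var)
        if path and os.path.isfile(path):
            with open(path, "r", encoding="utf-8") as handle:
                state[label] = json.load(handle)
        else:
            # Assumption: an absent file means that state is not available yet.
            state[label] = None
    return state
```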

The handler reads the injected payload.json, emits progress through
breyta jobs worker progress, optionally attaches resource-backed artifacts or
structured resources, and marks terminal state through
breyta jobs worker finish or breyta jobs worker fail. Those helper commands
reuse the worker context plus the API, workspace, and auth environment
injected by breyta jobs worker run.

BREYTA_JOB_CONTEXT_FILE is an opaque worker context for those helper commands;
handlers typically read payload.json and write result state instead of parsing
that file directly.

Minimal handler implementation:

#!/usr/bin/env bash
set -euo pipefail

# Read the target surface from the injected payload, defaulting to "flows-api".
surface="$(python3 - "$BREYTA_JOB_PAYLOAD_FILE" <<'PY'
import json
import sys

with open(sys.argv[1], "r", encoding="utf-8") as handle:
    payload = json.load(handle)

print(payload.get("surface", "flows-api"))
PY
)"

# Write a local report file inside the job directory.
report_path="$BREYTA_JOB_DIR/review-report.md"
cat >"$report_path" <<EOF
# Review

Surface: $surface
EOF

# Report progress on the running job.
breyta jobs worker progress \
  --status running \
  --message "Reviewing $surface"

# Attach the report as a resource-backed artifact and capture its URI.
report_uri="$(breyta jobs worker attach-file \
  --file "$report_path" \
  --label review-report \
  --kind report \
  --print-uri)"

# Persist a structured summary as a KV resource and capture its URI.
summary_uri="$(breyta jobs worker attach-kv \
  --label review-summary \
  --key review-summary \
  --field finding-count=1 \
  --field severity=high \
  --print-uri)"

# Mark terminal success and publish the small, branchable outputs.
breyta jobs worker finish \
  --summary "Reviewed $surface" \
  --output "surface=$surface" \
  --output finding-count=1 \
  --output "report-resource-uri=$report_uri" \
  --output "summary-resource-uri=$summary_uri"

Row-shaped outputs can be persisted directly as table resources:

findings_uri="$(breyta jobs worker attach-table \
  --label findings \
  --table security-findings \
  --rows-file "$BREYTA_JOB_DIR/findings.json" \
  --write-mode upsert \
  --key-field finding_id \
  --index-field severity \
  --print-uri)"

attach-table creates the table resource and its schema on write from the
provided row objects. --key and --table are logical job-local suffixes; the
API persists the actual KV key or table name under a job-scoped namespace and
returns that effective name on the artifact. Add the returned findings_uri to
jobs worker finish outputs when later flow steps should reference that table
directly.
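
The file passed to --rows-file has to exist before attach-table runs. The sketch below writes one, assuming the rows file is a JSON array of row objects and that every row should carry the field named by --key-field. Both points are assumptions based on the example above, and `write_rows_file` is a hypothetical helper.

```python
import json


def write_rows_file(path, rows, key_field="finding_id"):
    """Write rows as a JSON array after checking each row carries the key."""
    for index, row in enumerate(rows):
        if key_field not in row:
            raise ValueError(f"row {index} is missing {key_field!r}")
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(rows, handle, indent=2)
    return path
```

Checking the key field locally surfaces a malformed row before the upsert is attempted, which keeps the failure inside the handler where it is easiest to debug.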

In summary, the minimum handler implementation shape is:

  • read structured input from BREYTA_JOB_PAYLOAD_FILE
  • inspect local worker state with breyta jobs worker state when debugging
  • optionally report progress with breyta jobs worker progress
  • optionally persist resource-backed artifacts with breyta jobs worker attach-file
  • optionally persist structured summaries with breyta jobs worker attach-kv
  • optionally persist row-shaped outputs with breyta jobs worker attach-table
  • mark terminal success or failure with breyta jobs worker finish or breyta jobs worker fail
  • raw BREYTA_JOB_RESULT_FILE writes remain a fallback for advanced handlers
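
The same shape can be sketched in Python for handlers that are not written in bash. This is an illustrative outline only: `read_surface` and `finish_command` are hypothetical helpers, and the block builds the argument list for `breyta jobs worker finish` from the flags shown earlier without running it.

```python
import json


def read_surface(payload_path, default="flows-api"):
    """Read the surface field from the injected payload file."""
    with open(payload_path, "r", encoding="utf-8") as handle:
        payload = json.load(handle)
    return payload.get("surface", default)


def finish_command(summary, outputs):
    """Build the argv for `breyta jobs worker finish` without running it."""
    argv = ["breyta", "jobs", "worker", "finish", "--summary", summary]
    for key, value in outputs.items():
        # Each output becomes one repeated --output key=value flag.
        argv += ["--output", f"{key}={value}"]
    return argv
```

Inside a real handler the returned list could be passed to `subprocess.run(argv, check=True)`, so a non-zero exit from the CLI raises instead of being silently ignored.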

Using Outputs And Artifacts Later In The Flow

Workers should keep branchable facts in :outputs and attach larger reports or
files as artifacts. A common shape is:

  • :outputs carries small fields such as :finding-count, :severity,
    :report-resource-uri, :summary-resource-uri, or
    :findings-table-uri
  • :artifacts carries resource-backed entries such as a markdown report with a
    :resource-uri, a structured summary, or a findings table

Example flow usage:

'(let [queued-job (flow/step :job :submit-agent-review
                    {:op :submit
                     :job-type "agent-review"
                     :payload {:surface "flows-api"}})
       final-job (flow/step :job :await-agent-review
                   {:op :await
                    :job-id (:job-id queued-job)})
       finding-count (get-in final-job [:outputs :finding-count])
       report-uri (get-in final-job [:outputs :report-resource-uri])
       summary-uri (get-in final-job [:outputs :summary-resource-uri])
       findings-uri (get-in final-job [:outputs :findings-table-uri])]
   {:finding-count finding-count
    :report-uri report-uri
    :summary-uri summary-uri
    :findings-uri findings-uri
    :artifacts (:artifacts final-job)})
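
The branching logic that consumes those outputs can be prototyped outside the flow. The sketch below assumes the final job map has been serialized to a dict with string keys such as "status" and "outputs", and that values set through --output may arrive as strings. Neither detail is confirmed by this reference, and the verdict names are hypothetical.

```python
def review_verdict(job, threshold=1):
    """Map a final job map to a hypothetical next action."""
    if job.get("status") != "succeeded":
        return "investigate"
    outputs = job.get("outputs") or {}
    # Assumption: the count may be a string such as "1", so coerce it.
    count = int(outputs.get("finding-count", 0))
    return "escalate" if count >= threshold else "pass"
```

Keeping the decision on a small field like finding-count, rather than on the report body, matches the split between :outputs and :artifacts described above.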

Batch Example

'(let [surfaces ["flows-api" "runtime" "worker"]
       queued-batch (flow/step :job :submit-security-review-batch
                      {:op :submit-batch
                       :job-type "codex-security-review"
                       :metadata {:campaign "weekly-audit"}
                       :jobs (mapv (fn [surface]
                                     {:payload {:surface surface}})
                                   surfaces)})
       final-batch (flow/step :job :await-security-review-batch
                     {:op :await-batch
                      :batch-id (:batch-id queued-batch)
                      :interval "15s"
                      :timeout "45m"
                      :include-jobs? true
                      :limit 50})]
   {:batch-id (:batch-id final-batch)
    :status (:status final-batch)
    :succeeded (:succeeded-count final-batch)
    :failed (:failed-count final-batch)
    :jobs (:jobs final-batch)})
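
A consumer of that result often wants to know which jobs in the batch did not succeed. The sketch below assumes the batch map has been serialized to a dict with string keys mirroring the fields used above ("batch-id", "status", "succeeded-count", "failed-count", "jobs"); `summarize_batch` is a hypothetical helper.

```python
# Terminal job statuses treated as failures for reporting purposes.
FAILURE_STATUSES = frozenset({"failed", "cancelled", "timed_out"})


def summarize_batch(batch):
    """Collect aggregate counts and the ids of jobs that ended in failure."""
    jobs = batch.get("jobs") or []
    return {
        "batch-id": batch.get("batch-id"),
        "status": batch.get("status"),
        "succeeded": batch.get("succeeded-count", 0),
        "failed": batch.get("failed-count", 0),
        "failed-job-ids": [
            job.get("job-id")
            for job in jobs
            if job.get("status") in FAILURE_STATUSES
        ],
    }
```

Because :limit caps how many jobs are returned, the failed-job-ids list can be partial for large batches; the aggregate counts remain the reliable totals.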

Recommended Model

Use the :job step as the primary external execution surface.

  • Flow creates one job or a small explicit batch.
  • External workers claim and execute that work through breyta jobs worker run.
  • Flow awaits the job or batch and branches on the normalized result.

Authoring boundary:

  • use flow/step :job in the flow body to submit and await work
  • use {:kind :worker ...} inside :requires to declare the external worker
    dependency to installers and operators

Step Fields

Common fields:

| Field | Required | Meaning |
| --- | --- | --- |
| :op | Yes | One of :submit, :submit-batch, :get, :get-batch, :await, :await-batch. |

Submit fields:

| Field | Ops | Required | Meaning |
| --- | --- | --- | --- |
| :job-type | :submit, :submit-batch | Yes | Worker capability/job type to target. |
| :payload | :submit | No | Structured job input for the worker. |
| :metadata | :submit, :submit-batch | No | Orchestration/operator metadata. |
| :jobs | :submit-batch | Yes | Vector of per-job configs, typically { :payload ... :metadata ... }. |
| :max-attempts | :submit, :submit-batch | No | Max lease attempts before timeout. |

Read/await fields:

| Field | Ops | Required | Meaning |
| --- | --- | --- | --- |
| :job-id | :get, :await | Yes | Target job id. |
| :batch-id | :get-batch, :await-batch | Yes | Target batch id. |
| :interval | :await, :await-batch | No | Poll interval. Defaults to "10s". |
| :timeout | :await, :await-batch | Conditionally | Duration or ms timeout cap. Defaults to "30m" when no bound is provided. |
| :max-attempts | :await, :await-batch | Conditionally | Attempt cap. |
| :backoff | :await, :await-batch | No | Interval progression map with :constant, :linear, or :exponential. |
| :include-jobs? | :get-batch, :await-batch | No | Include :jobs in the batch result. Defaults to true for terminal results. |
| :limit | :get-batch, :await-batch | No | Max jobs returned when :jobs are included. |

Internal lineage metadata may be attached automatically, but it is not part of
the primary authoring contract.
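
How :interval, :timeout, :max-attempts, and :backoff bound a wait can be illustrated with a small polling loop. This is a sketch of the general technique, not Breyta's implementation: the backoff formulas, the string form of the backoff argument, and all function names are assumptions, and only the "ms", "s", and "m" duration units seen in this reference are parsed.

```python
import time

# Milliseconds per unit; only the units that appear in this reference.
_UNIT_MS = {"ms": 1, "s": 1000, "m": 60000}


def to_seconds(value):
    """Convert "250ms", "15s", "45m", or a bare millisecond number to seconds."""
    if isinstance(value, (int, float)):
        return value / 1000.0
    text = value.strip()
    for unit in ("ms", "s", "m"):  # check "ms" before "s" and "m"
        if text.endswith(unit) and text[: -len(unit)].isdigit():
            return int(text[: -len(unit)]) * _UNIT_MS[unit] / 1000.0
    raise ValueError(f"unsupported duration: {value!r}")


def next_delay(base, attempt, kind):
    """Delay before the next poll; these formulas are illustrative assumptions."""
    if kind == "constant":
        return base
    if kind == "linear":
        return base * attempt
    if kind == "exponential":
        return base * 2 ** (attempt - 1)
    raise ValueError(f"unknown backoff kind: {kind!r}")


def await_terminal(fetch, is_terminal, interval="10s", timeout=None,
                   max_attempts=None, backoff="constant",
                   sleep=time.sleep, clock=time.monotonic):
    """Poll fetch() until is_terminal(state) holds or a bound is exceeded."""
    if timeout is None and max_attempts is None:
        timeout = "30m"  # documented default when no bound is provided
    base = to_seconds(interval)
    deadline = None if timeout is None else clock() + to_seconds(timeout)
    attempt = 0
    while True:
        attempt += 1
        state = fetch()
        if is_terminal(state):
            return state
        if max_attempts is not None and attempt >= max_attempts:
            raise TimeoutError(f"not terminal after {attempt} attempts")
        wait = next_delay(base, attempt, backoff)
        if deadline is not None and clock() + wait > deadline:
            raise TimeoutError("timeout reached before a terminal state")
        sleep(wait)
```

The `sleep` and `clock` parameters exist so the loop can be exercised with a fake clock; in normal use the defaults apply and only `fetch` and `is_terminal` need to be supplied.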

Terminal Statuses

An await on a job stops when the job's :status becomes one of:

  • "succeeded"
  • "no_changes"
  • "failed"
  • "cancelled"
  • "timed_out"

An await on a batch stops when the batch's :status becomes one of:

  • "completed"
  • "failed"
  • "partially_failed"
  • "cancelled"
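
Code that consumes job or batch state outside the flow can encode these lists as sets. A minimal sketch, where the status strings are copied from the lists above and `is_terminal` is a hypothetical helper:

```python
JOB_TERMINAL = frozenset(
    {"succeeded", "no_changes", "failed", "cancelled", "timed_out"}
)
BATCH_TERMINAL = frozenset(
    {"completed", "failed", "partially_failed", "cancelled"}
)


def is_terminal(status, kind="job"):
    """Return True when status is terminal for a job or a batch."""
    if kind == "job":
        return status in JOB_TERMINAL
    if kind == "batch":
        return status in BATCH_TERMINAL
    raise ValueError(f"unknown kind: {kind!r}")
```

Note that the two sets differ: "completed" and "partially_failed" apply only to batches, while "succeeded", "no_changes", and "timed_out" apply only to jobs.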

When To Prefer flow/poll

Use flow/poll instead when:

  • the external async surface is not backed by the Breyta jobs control plane
  • completion depends on a custom predicate instead of standard job/batch terminal states
  • you are polling a third-party API directly

As of Apr 17, 2026