Jobs Control Plane
Canonical reference for submitting external work into Breyta's jobs control plane, awaiting normalized terminal results, and wiring worker outputs back into the flow.
Quick Answer
Use flow/step :job as the primary authoring surface.
:op :submitcreates one queued external job:op :submit-batchcreates a queued batch:op :getand:op :get-batchread the latest durable state:op :awaitand:op :await-batchwait for terminal completion
When a flow depends on external workers for installability or operator setup,
declare that dependency in :requires with {:kind :worker ...}. The :job
step describes runtime orchestration; the worker requirement describes the
install/operator contract.
Canonical Step Surface
| Step op | Purpose | Return shape |
|---|---|---|
:submit | Create one queued job. | Job map with fields such as :job-id, :job-type, :status, :payload. |
:submit-batch | Create a queued batch of jobs of the same type. | Batch map with :batch-id, aggregate counts, and :jobs. |
:get | Read the latest state of one job. | Job map. |
:get-batch | Read the latest state of one batch. | Batch map, optionally with :jobs. |
:await | Bounded wait for one job to become terminal. | Final job map. |
:await-batch | Bounded wait for one batch to become terminal. | Final batch map. |
Single Job Example
'(let [{:keys [surface mode timeoutSeconds]} (flow/input)
queued-job (flow/step :job :submit-agent-review
{:op :submit
:job-type "agent-review"
:payload {:surface (or surface "flows-api")
:mode (or mode "succeeded")}
:metadata {:campaign "agent-review"}})
final-job (flow/step :job :await-agent-review
{:op :await
:job-id (:job-id queued-job)
:interval "250ms"
:timeout (str (long (or timeoutSeconds 60)) "s")})]
final-job)
Matching worker command:
export BREYTA_API_URL="https://flows.breyta.ai"
export BREYTA_WORKSPACE="ws-acme"
export BREYTA_API_KEY="<service-account-api-key>"
breyta jobs worker run --type agent-review --handler ./run-agent-review.sh
Recommended operator setup for that worker identity:
breyta service-accounts create \
--name agent-review-worker \
--scope jobs.worker \
--job-type agent-review
breyta service-accounts keys create <service-account-id> --name agent-review-key
--scope accepts repeated flags or comma-separated values. --capability
remains accepted as a compatibility alias.
jobs.worker is the minimum worker scope. If the same machine should also
inspect or mutate broader Breyta state, grant additional service-account
scopes such as flows.read, flows.manage, flows.run,
resources.read, resources.write, or workspace.full according to the
automation boundary you want.
That worker injects the local job directory and the API context the handler-side
commands use:
BREYTA_JOB_DIRBREYTA_JOB_CONTEXT_FILEBREYTA_JOB_FILEBREYTA_JOB_PAYLOAD_FILEBREYTA_JOB_RESULT_FILEBREYTA_JOB_IDBREYTA_JOB_TYPEBREYTA_JOB_WORKSPACE_IDBREYTA_API_URLBREYTA_WORKSPACEBREYTA_API_KEYorBREYTA_TOKEN
Use breyta jobs show <job-id> for the durable job state the flow can await.
Use breyta jobs worker state inside the handler, or with --job-dir <dir>,
to inspect the local job.json, payload.json, and result.json state while
the worker is building the result.
That handler reads the injected payload.json, emits progress through breyta jobs worker progress, optionally attaches resource-backed artifacts or
structured resources, and marks terminal state through breyta jobs worker finish or breyta jobs worker fail.
Those helper commands reuse the worker context plus the worker API/workspace/auth
env injected by breyta jobs worker run.
BREYTA_JOB_CONTEXT_FILE is an opaque worker context for those helper commands;
handlers typically read payload.json and write result state instead of parsing
that file directly.
Minimal handler implementation:
#!/usr/bin/env bash
set -euo pipefail
surface="$(python3 - "$BREYTA_JOB_PAYLOAD_FILE" <<'PY'
import json
import sys
with open(sys.argv[1], "r", encoding="utf-8") as handle:
payload = json.load(handle)
print(payload.get("surface", "flows-api"))
PY
)"
report_path="$BREYTA_JOB_DIR/review-report.md"
cat >"$report_path" <<EOF
# Review
Surface: $surface
EOF
breyta jobs worker progress \
--status running \
--message "Reviewing $surface"
report_uri="$(breyta jobs worker attach-file \
--file "$report_path" \
--label review-report \
--kind report \
--print-uri)"
summary_uri="$(breyta jobs worker attach-kv \
--label review-summary \
--key review-summary \
--field finding-count=1 \
--field severity=high \
--print-uri)"
breyta jobs worker finish \
--summary "Reviewed $surface" \
--output "surface=$surface" \
--output finding-count=1 \
--output "report-resource-uri=$report_uri" \
--output "summary-resource-uri=$summary_uri"
Row-shaped outputs can be persisted directly as table resources:
findings_uri="$(breyta jobs worker attach-table \
--label findings \
--table security-findings \
--rows-file "$BREYTA_JOB_DIR/findings.json" \
--write-mode upsert \
--key-field finding_id \
--index-field severity \
--print-uri)"
attach-table creates the table resource and its schema on write from the
provided row objects. --key and --table are logical job-local suffixes; the
API persists the actual KV key or table name under a job-scoped namespace and
returns that effective name on the artifact. Add the returned findings_uri to
jobs worker finish outputs when later flow steps should reference that table
directly.
That is the minimum implementation shape:
- read structured input from
BREYTA_JOB_PAYLOAD_FILE - inspect local worker state with
breyta jobs worker statewhen debugging - optionally report progress with
breyta jobs worker progress - optionally persist resource-backed artifacts with
breyta jobs worker attach-file - optionally persist structured summaries with
breyta jobs worker attach-kv - optionally persist row-shaped outputs with
breyta jobs worker attach-table - mark terminal success or failure with
breyta jobs worker finishorbreyta jobs worker fail - raw
BREYTA_JOB_RESULT_FILEwrites remain a fallback for advanced handlers
Using Outputs And Artifacts Later In The Flow
Workers should keep branchable facts in :outputs and attach larger reports or
files as artifacts. A common shape is:
:outputscarries small fields such as:finding-count,:severity,
:report-resource-uri,:summary-resource-uri, or
:findings-table-uri:artifactscarries resource-backed entries such as a markdown report with a
:resource-uri, a structured summary, or a findings table
Example flow usage:
'(let [queued-job (flow/step :job :submit-agent-review
{:op :submit
:job-type "agent-review"
:payload {:surface "flows-api"}})
final-job (flow/step :job :await-agent-review
{:op :await
:job-id (:job-id queued-job)})
finding-count (get-in final-job [:outputs :finding-count])
report-uri (get-in final-job [:outputs :report-resource-uri])
summary-uri (get-in final-job [:outputs :summary-resource-uri])
findings-uri (get-in final-job [:outputs :findings-table-uri])]
{:finding-count finding-count
:report-uri report-uri
:summary-uri summary-uri
:findings-uri findings-uri
:artifacts (:artifacts final-job)})
Batch Example
'(let [surfaces ["flows-api" "runtime" "worker"]
queued-batch (flow/step :job :submit-security-review-batch
{:op :submit-batch
:job-type "codex-security-review"
:metadata {:campaign "weekly-audit"}
:jobs (mapv (fn [surface]
{:payload {:surface surface}})
surfaces)})
final-batch (flow/step :job :await-security-review-batch
{:op :await-batch
:batch-id (:batch-id queued-batch)
:interval "15s"
:timeout "45m"
:include-jobs? true
:limit 50})]
{:batch-id (:batch-id final-batch)
:status (:status final-batch)
:succeeded (:succeeded-count final-batch)
:failed (:failed-count final-batch)
:jobs (:jobs final-batch)})
Recommended Model
Use the :job step as the primary external execution surface.
- Flow creates one job or a small explicit batch.
- External workers claim and execute that work through
breyta jobs worker run. - Flow awaits the job or batch and branches on the normalized result.
Authoring boundary:
- use
flow/step :jobin the flow body to submit and await work - use
{:kind :worker ...}inside:requiresto declare the external worker
dependency to installers and operators
Step Fields
Common fields:
| Field | Required | Meaning |
|---|---|---|
:op | Yes | One of :submit, :submit-batch, :get, :get-batch, :await, :await-batch. |
Submit fields:
| Field | Ops | Required | Meaning |
|---|---|---|---|
:job-type | :submit, :submit-batch | Yes | Worker capability/job type to target. |
:payload | :submit | No | Structured job input for the worker. |
:metadata | :submit, :submit-batch | No | Orchestration/operator metadata. |
:jobs | :submit-batch | Yes | Vector of per-job configs, typically { :payload ... :metadata ... }. |
:max-attempts | :submit, :submit-batch | No | Max lease attempts before timeout. |
Read/await fields:
| Field | Ops | Required | Meaning |
|---|---|---|---|
:job-id | :get, :await | Yes | Target job id. |
:batch-id | :get-batch, :await-batch | Yes | Target batch id. |
:interval | :await, :await-batch | No | Poll interval. Defaults to "10s". |
:timeout | :await, :await-batch | Conditionally | Duration or ms timeout cap. Defaults to "30m" when no bound is provided. |
:max-attempts | :await, :await-batch | Conditionally | Attempt cap. |
:backoff | :await, :await-batch | No | Interval progression map with :constant, :linear, or :exponential. |
:include-jobs? | :get-batch, :await-batch | No | Include :jobs in the batch result. Defaults to true for terminal results. |
:limit | :get-batch, :await-batch | No | Max jobs returned when :jobs are included. |
Internal lineage metadata may be attached automatically, but it is not part of
the primary authoring contract.
Terminal Statuses
Jobs stop awaiting when :status becomes one of:
"succeeded""no_changes""failed""cancelled""timed_out"
Batches stop awaiting when :status becomes one of:
"completed""failed""partially_failed""cancelled"
When To Prefer flow/poll
Use flow/poll instead when:
- the external async surface is not backed by the Breyta jobs control plane
- completion depends on a custom predicate instead of standard job/batch terminal states
- you are polling a third-party API directly