
feat: dwctl migration for multi-step response analytics linkage#1036

Merged
sejori merged 25 commits into main from feat/multi-step-bridge on Apr 30, 2026

Conversation

@sejori
Contributor

@sejori sejori commented Apr 28, 2026

Summary

Schema migration plus storage-layer bridge wiring for the multi-step Open Responses orchestration described in the fusillade plan.

Migration

  • `response_step_id` column on `http_analytics` and `tool_call_analytics` (nullable, partial index).
  • CHECK constraint on `tool_sources.kind` documenting the supported set: `'http'` (default) and `'agent'` (sub-agent dispatch).

Bridge wiring (`dwctl/src/responses/store.rs`)

  • `FusilladeResponseStore` now holds an `Option<PostgresResponseStepManager>` via a new `with_step_manager` builder. Existing call-sites that don't need multi-step continue to work unchanged.
  • New `ResponseStore` trait methods mapped one-to-one to storage primitives: `record_step`, `mark_step_processing`, `complete_step`, `fail_step`, `next_sequence`. The other multi-step methods (`next_action_for`, `execute_model_call`, `execute_tool_call`, `assemble_response`) fall through to the trait defaults — those are the heart of the Open Responses domain logic and warrant focused review under their own follow-up issues.
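The builder shape described above can be sketched roughly as follows; the type names mirror the description, but the bodies are illustrative stand-ins, not the real dwctl code:

```rust
// Hypothetical sketch of the with_step_manager builder pattern.
// PostgresResponseStepManager is a stand-in unit struct here.
struct PostgresResponseStepManager;

struct FusilladeResponseStore {
    step_manager: Option<PostgresResponseStepManager>,
}

impl FusilladeResponseStore {
    fn new() -> Self {
        Self { step_manager: None }
    }

    // Existing call-sites keep using new(); multi-step callers opt in.
    fn with_step_manager(mut self, mgr: PostgresResponseStepManager) -> Self {
        self.step_manager = Some(mgr);
        self
    }
}

fn main() {
    let plain = FusilladeResponseStore::new();
    let multi = FusilladeResponseStore::new().with_step_manager(PostgresResponseStepManager);
    // Non-multi-step construction is unchanged; multi-step is opt-in.
    assert!(plain.step_manager.is_none());
    assert!(multi.step_manager.is_some());
    println!("builder ok");
}
```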

`fail_request` signature update

`store::fail_response` now passes a status code (`500` as the catch-all). This picks up the signature change in fusillade #245 (`fix: accept status code in fail_request`). Without this, neither this PR nor any future fusillade bump would compile.

Local-dev overrides (must be removed before merge)

The workspace `Cargo.toml` includes `[patch.crates-io]` entries pointing at sibling `../fusillade` and `../onwards` checkouts. This is so the bridge wiring can be exercised end-to-end before the upstream releases land:

```toml
[patch.crates-io]
fusillade = { path = "../fusillade" }
onwards = { path = "../onwards" }
```

Remove these before merging. Once fusillade #248 and onwards #185 are released to crates.io, bump `fusillade` and `onwards` versions in `dwctl/Cargo.toml` instead.

Linear coverage

  • Parent: COR-330
  • Closes COR-344 — analytics columns + tool_sources.kind constraint
  • Partial COR-345 — `FusilladeResponseStore` storage primitives wired; remaining trait methods (transition, execution, assembly) deferred to follow-up

Deferred to follow-ups

  • COR-346/347: Transition function (Open Responses tool-call semantics + sub-agent dispatch via `tool_sources.kind`).
  • COR-348: Assembly logic — chain walk → OpenAI Response JSON.
  • COR-349: `fusillade::RequestProcessor::process` dispatching by endpoint.
  • COR-350: `GET /v1/responses/{id}/steps` endpoint.
  • COR-351: Extend `/admin/api/v1/tool-sources` to accept `kind`.
  • COR-352: Replace `create_batch_of_1` with multi-step-capable creation.
  • COR-353: SIGTERM drain handler.
  • COR-354: Surface `max_response_step_depth` and `max_response_iterations` as config knobs.

Validation

End-to-end with the local crate overrides:

  • `cargo build --package dwctl --bin dwctl` succeeds.
  • `sqlx migrate run` applies migration #96 cleanly against the dwctl DB.
  • `response_steps` table exists in fusillade DB; `response_step_id` columns exist on `http_analytics` and `tool_call_analytics` in dwctl DB.
  • `./target/debug/dwctl` starts up, registers the fusillade daemon, listens on :3001.
  • `GET /healthz` returns `200 OK`.
  • `GET /admin/openapi.json` serves the admin API spec.

Test plan

  • Reviewer: confirm `[patch.crates-io]` is removed before merge once upstream releases land.
  • After merge, follow-up PRs implement the deferred sub-issues (transition, assembly, etc.) using the bridge wiring this PR establishes.

Adds the dwctl-side schema changes for the multi-step Open Responses
orchestration described in
fusillade/docs/plans/2026-04-28-multi-step-responses.md.

Changes:
- response_step_id column on http_analytics + tool_call_analytics
  (nullable, partial index), so every upstream HTTP call recorded by
  the outlet middleware can be correlated to the response_steps row
  that drove it. Mirrors the existing fusillade_request_id /
  fusillade_batch_id correlation pattern.
- CHECK constraint on tool_sources.kind documenting the supported set
  ('http', 'agent') — 'agent' is the new sub-agent dispatch kind that
  exercises run_response_loop's recursion path. NOT VALID skips the
  full-table scan; existing rows are guaranteed to satisfy the
  constraint because 'http' is the only value used pre-migration.

This closes COR-344. Linear-side issues for the dependent Rust
code changes (COR-345 through COR-354) remain open pending the
fusillade and onwards releases (PRs #248 and onwards#185) being
merged and published, after which the FusilladeResponseStore can
be extended to implement onwards' new ResponseStore trait methods,
the transition function and assembly logic can be wired in, and
the RequestProcessor::process dispatcher can be added.
Copilot AI review requested due to automatic review settings April 28, 2026 21:48
@cloudflare-workers-and-pages

cloudflare-workers-and-pages Bot commented Apr 28, 2026

Deploying control-layer with Cloudflare Pages

Latest commit: a483b40
Status: ✅  Deploy successful!
Preview URL: https://b40c07da.control-layer.pages.dev
Branch Preview URL: https://feat-multi-step-bridge.control-layer.pages.dev

Copilot AI left a comment

Pull request overview

Adds a new database migration to support linking dwctl analytics rows to multi-step “Open Responses” orchestration steps, and to formally constrain tool_sources.kind to the supported set.

Changes:

  • Add nullable response_step_id columns to http_analytics and tool_call_analytics with partial indexes for join performance.
  • Add a CHECK constraint (NOT VALID) limiting tool_sources.kind to http and agent, and document the column semantics via comments.

Comment on lines +36 to +39
'Optional FK to fusillade.response_steps for multi-step responses. NULL for non-multi-step requests.';

COMMENT ON COLUMN tool_call_analytics.response_step_id IS
'Optional FK to fusillade.response_steps. Set when the tool call originated from a response_steps row rather than the legacy in-process tool loop.';
Copilot AI Apr 28, 2026

The column comments describe response_step_id as an "Optional FK", but this migration does not add a foreign key constraint (and the codebase’s existing fusillade correlation columns like fusillade_request_id are also unconstrained). To avoid misleading schema documentation, either add an actual REFERENCES fusillade.response_steps(id) constraint (possibly NOT VALID if needed) or change the wording to "reference"/"correlation id" instead of "FK".

Suggested change

```diff
-'Optional FK to fusillade.response_steps for multi-step responses. NULL for non-multi-step requests.';
+'Optional reference to fusillade.response_steps for multi-step responses. NULL for non-multi-step requests.';

 COMMENT ON COLUMN tool_call_analytics.response_step_id IS
-'Optional FK to fusillade.response_steps. Set when the tool call originated from a response_steps row rather than the legacy in-process tool loop.';
+'Optional reference to fusillade.response_steps. Set when the tool call originated from a response_steps row rather than the legacy in-process tool loop.';
```

sejori added 2 commits April 28, 2026 23:32
Extends FusilladeResponseStore to hold an Option<PostgresResponseStepManager>
and implements the new multi-step ResponseStore trait methods that have
direct one-to-one mappings to the storage layer:

- record_step → PostgresResponseStepManager::create_step
- mark_step_processing → ... ::mark_step_processing
- complete_step → ... ::complete_step
- fail_step → ... ::fail_step
- next_sequence → MAX(step_sequence) + 1 from list_chain
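The `next_sequence` mapping above amounts to a one-line aggregation over the listed chain. A minimal sketch; the `ChainStep` shape and the empty-chain starting value of 0 are assumptions:

```rust
// Illustrative sketch, not the real dwctl code: derive the next step
// sequence as MAX(step_sequence) + 1 over the chain returned by
// list_chain, starting from 0 (assumed) when the chain is empty.
#[derive(Clone)]
struct ChainStep {
    step_sequence: i64,
}

fn next_sequence(chain: &[ChainStep]) -> i64 {
    chain
        .iter()
        .map(|s| s.step_sequence)
        .max()
        .map_or(0, |m| m + 1)
}

fn main() {
    assert_eq!(next_sequence(&[]), 0);
    let chain = vec![ChainStep { step_sequence: 0 }, ChainStep { step_sequence: 1 }];
    assert_eq!(next_sequence(&chain), 2);
    println!("next_sequence ok");
}
```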

The transition function (next_action_for), execution hooks
(execute_model_call, execute_tool_call), and assembly logic
(assemble_response) intentionally fall through to the trait defaults.
Those are the heart of the Open Responses domain logic and warrant
focused review under their own follow-up issues
(COR-346 / 347 / 348 / 349).

Validated end-to-end:
- dwctl binary builds against the workspace [patch.crates-io] overrides
  pointing at local fusillade and onwards branches.
- Migration #96 applies cleanly; response_steps table exists in
  fusillade DB; response_step_id columns exist on http_analytics and
  tool_call_analytics in dwctl DB.
- dwctl starts up, fusillade daemon registers, /healthz returns 200,
  /admin/openapi.json serves the admin API spec.

Also updates fail_response in store.rs to pass the status code as the
third argument to PostgresRequestManager::fail_request, picking up the
signature change introduced by fusillade #245.

[patch.crates-io] entries in workspace Cargo.toml are deliberately
included so reviewers can reproduce the end-to-end test. They must be
removed before merge — flagged in the PR description.
Tracks the trait refactor in onwards #185:

- FusilladeResponseStore now implements MultiStepStore (the new dedicated
  multi-step trait) instead of the multi-step methods being on
  ResponseStore. ResponseStore impl is back to just store + get_context.
- record_step returns RecordedStep { id, sequence } and allocates the
  sequence atomically with the insert. The dedicated next_sequence
  method is gone.
- list_chain returns Vec<ChainStep> via a fresh list_scope call on
  PostgresResponseStepManager (filters by parent_step_id).
- next_action_for and assemble_response are explicit "not yet
  implemented (COR-346 / COR-348)" errors. The transition function and
  assembly are still the focus of the dedicated follow-up issues.
- StepExecutor implementation is the next follow-up — onwards' new trait
  decouples execution from storage so dwctl will provide a separate
  executor type that wraps the existing tool registry and routing
  layer.

Validated: dwctl rebuilds against the new traits, starts up, /healthz
returns 200, /admin/openapi.json serves the admin spec.
sejori added 6 commits April 29, 2026 12:14
Adds dwctl/src/test/multi_step_loop.rs exercising onwards'
run_response_loop end-to-end against dwctl's fusillade-schema pool:

- ToyTransitionStore wraps FusilladeResponseStore and scripts a
  model_call → tool_call → model_call → complete flow based on chain
  state (driving next_action_for from list_chain inspection — the
  shape COR-346/347 will follow).
- ToyExecutor synthesizes responses: first model_call returns
  {"wants_tool": true}, second returns final text; tool_call echoes
  its args.
- Three integration tests: full lifecycle persists three completed
  steps with correct kinds, sequences, and prev_step_id chaining;
  resumed chain picks up the existing tail rather than starting
  fresh; transition Fail surfaces as LoopError::Failed.

Connects to dwctl's database under the `fusillade` schema (where
dwctl auto-applies fusillade migrations on startup) rather than a
separate fusillade DB, so `cargo run` once + `cargo test` is the
full local setup. The fusillade crate's own tests continue to use
their dedicated fusillade DB.

What this proves end-to-end against PostgreSQL:
- record_step allocates monotonic step_sequence and chains prev_step_id
- mark_step_processing → complete_step satisfies field-presence checks
- list_chain returns scoped + sequence-ordered results
- resume picks up existing tail
- LoopError::Failed propagates from a transition Fail

What's still deferred (COR-346/347/348/349): the production transition
function over real OpenAI tool-call semantics, assembly into the
OpenAI Response JSON, sub-agent dispatch via tool_sources.kind, and
the RequestProcessor::process route dispatcher. The toy
implementations here exercise the same trait surface those will plug
into.
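The scripted model_call → tool_call → model_call → complete flow can be sketched as a pure function over the chain's completed step kinds; the enum and the scripted states are toy stand-ins, not the onwards types:

```rust
// Toy transition function of the shape described above: the next action
// is decided purely from the kinds of completed steps in the chain.
#[derive(Debug, PartialEq)]
enum NextAction {
    ModelCall,
    ToolCall,
    Complete,
}

fn next_action_for(completed_kinds: &[&str]) -> NextAction {
    match completed_kinds {
        // Empty chain: fire the initial model call.
        [] => NextAction::ModelCall,
        // First model call asked for a tool.
        ["model_call"] => NextAction::ToolCall,
        // Tool result is in: follow-up model call.
        ["model_call", "tool_call"] => NextAction::ModelCall,
        // Second model call returned final text.
        _ => NextAction::Complete,
    }
}

fn main() {
    assert_eq!(next_action_for(&[]), NextAction::ModelCall);
    assert_eq!(next_action_for(&["model_call"]), NextAction::ToolCall);
    assert_eq!(next_action_for(&["model_call", "tool_call"]), NextAction::ModelCall);
    assert_eq!(
        next_action_for(&["model_call", "tool_call", "model_call"]),
        NextAction::Complete
    );
    println!("transition ok");
}
```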
Adds the production-shaped bridge between dwctl's existing tool
registry and onwards' StepExecutor trait, plus integration tests that
drive the full multi-step lifecycle against real HTTP via wiremock.

Architecture (dwctl/src/responses/step_executor.rs):
- DwctlStepExecutor wraps the existing HttpToolExecutor (reused, not
  re-implemented — same analytics, headers, timeouts, JSON shape as
  the single-step path).
- dispatch_tool_call reads tool_sources.kind:
    'http'  → HttpToolExecutor::execute → ToolDispatch::Executed(_)
    'agent' → ToolDispatch::Recurse (loop recurses into sub-loop)
  No tool-kind knowledge leaks into onwards.
- execute_model_call goes through a pluggable ModelCaller trait. The
  test uses StaticModelCaller pointing at a wiremock; production
  wiring through onwards' load balancer is the focus of COR-349.
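The kind-based dispatch above reduces to a small match; `ToolKind` and `ToolDispatch` here are illustrative stand-ins for the real types, and the HTTP branch just fakes a result:

```rust
// Sketch of the executor-side dispatch: the executor (not onwards)
// decides whether a tool call runs over HTTP or recurses into a
// sub-agent loop, keyed on the tool source's kind.
#[derive(Debug, PartialEq, Clone, Copy)]
enum ToolKind {
    Http,
    Agent,
}

#[derive(Debug, PartialEq)]
enum ToolDispatch {
    Executed(String), // would carry the tool's JSON result
    Recurse,          // loop recurses into a sub-loop under the spawning step
}

fn dispatch_tool_call(kind: ToolKind) -> ToolDispatch {
    match kind {
        // 'http' tools go through the HTTP executor; result returned inline.
        ToolKind::Http => ToolDispatch::Executed(String::from("{\"echo\":true}")),
        // 'agent' tools signal the loop to recurse; no HTTP is fired here.
        ToolKind::Agent => ToolDispatch::Recurse,
    }
}

fn main() {
    assert!(matches!(dispatch_tool_call(ToolKind::Http), ToolDispatch::Executed(_)));
    assert_eq!(dispatch_tool_call(ToolKind::Agent), ToolDispatch::Recurse);
    println!("dispatch ok");
}
```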

Surfacing tool kind:
- ToolDefinition gains a `kind: String` field.
- resolve_tools_for_request query selects ts.kind so it propagates
  through ResolvedToolSet to the executor without an extra round-trip.
- Existing HttpToolExecutor ignores the field — no behavior change to
  the single-step path.

Integration tests (dwctl/src/test/multi_step_executor.rs):
- dwctl_step_executor_drives_real_tool_and_model_calls_against_wiremock:
  full model→tool→model lifecycle. Two wiremock servers (model + tool),
  real DwctlStepExecutor, real FusilladeResponseStore against the
  fusillade schema. Asserts both wiremocks received the expected
  number of POSTs and the chain persisted with correct kinds,
  sequences, and prev_step_id chaining. The tool's wiremock body
  ends up verbatim in step #2's response_payload.
- dwctl_step_executor_dispatches_subagent_tools_via_recurse: registers
  a kind='agent' tool, asserts the loop recurses into a sub-loop
  scoped under the spawning step rather than firing HTTP. Sub-loop
  step's parent_step_id points at the top-level tool step exactly
  as plan §C11 specifies.

Both tests pass. The bridge is now self-contained: every type lives
either in dwctl (the tool registry, executor, transition function) or
onwards (the loop, traits, dispatch), with no leakage in either
direction.
…directly

Onwards' StepExecutor trait is gone; the multi-step loop reuses the
existing ToolExecutor trait that HttpToolExecutor already implements.
This collapses the duplication where DwctlStepExecutor was wrapping
HttpToolExecutor just to add a kind check.

Changes:
- ToolDefinition.kind ('http' | 'agent') now propagates through
  HttpToolExecutor::tools() as ToolKind on each ToolSchema. The
  resolve_tools_for_request DB query already selects ts.kind (added
  earlier), so the production tool registry data path is end-to-end.
- DwctlStepExecutor (responses/step_executor.rs) deleted.
- The old src/test/multi_step_loop.rs (which used the deleted
  StepExecutor + a ToyExecutor) is replaced by an upgraded
  multi_step_executor.rs that wires the production HttpToolExecutor
  and onwards' HyperClient directly into run_response_loop. Wiremocks
  for both upstream model and tool endpoints.

Two integration tests:
- loop_drives_real_tool_and_model_calls_through_production_executor:
  full model → tool → model → complete cycle. Verifies the tool wiremock
  receives a POST through HttpToolExecutor's actual code (response body
  ends up verbatim in step.response_payload), the model wiremock
  receives POSTs through HyperClient (same connection-pooling /
  TLS / observability path as single-step proxying), and the
  response_steps table records the chain with correct kinds, sequences,
  and prev_step_id chaining.
- agent_kind_tool_recurses_via_tool_schema: registers an 'agent'-kind
  tool, asserts the loop recurses via ToolSchema.kind = ToolKind::Agent
  rather than calling HttpToolExecutor::execute. Sub-loop step's
  parent_step_id points at the spawning top-level tool step.

These tests catch regressions across services: any change to
HttpToolExecutor, HyperClient, ToolDefinition, ToolSchema, the loop, or
the storage primitives that breaks multi-step responses fails here.
Closes COR-346/347/348/349/354. The dwctl daemon now drives Open
Responses requests through the full multi-step orchestration loop
when claimed: transition function builds model_call/tool_call steps,
sub-agent recursion via ToolKind::Agent, assembly produces the OpenAI
Response JSON, parent fusillade row reaches `completed` with the
assembled body.

Modules:
- responses::transition (COR-346/347): parses parent request body,
  walks the chain, decides next action. Empty chain → initial
  model_call. model_call returning tool_calls → fan-out tool_call
  steps. tool_call returning → follow-up model_call with the running
  conversation reconstructed. model_call returning final text →
  Complete. Failed step → Fail. 7 unit tests covering parsing,
  fan-out, completion, follow-up, error propagation.

- responses::assembly (COR-348): walks top-level chain, produces
  OpenAI Response JSON. Tool calls surface as paired function_call +
  function_call_output items; assistant text becomes message items
  with output_text content. Usage accumulates across model_calls.
  3 unit tests covering text-only, model→tool→model, usage merging.

- responses::processor (COR-349): DwctlRequestProcessor implements
  fusillade::RequestProcessor. Routes by request.data.path: only
  /v1/responses enters the multi-step loop; everything else delegates
  to fusillade::DefaultRequestProcessor. Builds UpstreamTarget from
  the row's endpoint (rewriting to /v1/chat/completions, the upstream
  protocol the transition function speaks). After loop completion,
  assembles the final response and persists the parent row as
  Completed with response_body set so GET /v1/responses/{id} works.

Wiring (Application::new_with_pool):
- BackgroundServices gains a step_manager field built in
  setup_background_services so it shares the fusillade pool.
- Multi-step processor is constructed in Application::new_with_pool
  and attached to the request_manager via set_processor (a new
  fusillade-side method using OnceLock for late wiring — the cycle
  manager → processor → response_store → manager only forms after
  initialization, and these objects live for the entire app
  lifetime).
- DwctlRequestProcessor is generic over T: ToolExecutor so test
  fixtures can wrap HttpToolExecutor with context-injecting shims
  for the daemon path (where there's no middleware to populate
  ResolvedTools); production wiring uses HttpToolExecutor directly.

Config (COR-354):
- ResponsesConfig with max_response_step_depth (default 8) and
  max_response_iterations (default 10), surfaced via
  config.responses.
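A minimal sketch of the config shape with the stated defaults (field names taken from the description above; everything else is assumed):

```rust
// Hedged sketch of ResponsesConfig with the defaults described above:
// max_response_step_depth = 8, max_response_iterations = 10.
#[derive(Debug, Clone)]
struct ResponsesConfig {
    max_response_step_depth: u32,
    max_response_iterations: u32,
}

impl Default for ResponsesConfig {
    fn default() -> Self {
        Self {
            max_response_step_depth: 8,
            max_response_iterations: 10,
        }
    }
}

fn main() {
    let cfg = ResponsesConfig::default();
    assert_eq!(cfg.max_response_step_depth, 8);
    assert_eq!(cfg.max_response_iterations, 10);
    println!("config defaults ok");
}
```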

Fusillade-side change:
- PostgresRequestManager.processor now uses OnceLock so the
  processor can be late-attached after Arc-wrapping. with_processor
  builder still works for tests that don't have a cycle.
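The OnceLock late-wiring pattern can be sketched with std types only; `RequestManager` and the `String` payload are stand-ins for the real manager and `Arc<dyn RequestProcessor>`:

```rust
use std::sync::{Arc, OnceLock};

// Minimal sketch (assumed names) of late-attaching a processor after
// Arc-wrapping: set_processor takes &self, so it works through an Arc,
// breaking the manager -> processor -> store -> manager construction cycle.
struct RequestManager {
    processor: OnceLock<Arc<String>>, // stand-in for Arc<dyn RequestProcessor>
}

impl RequestManager {
    fn new() -> Self {
        Self { processor: OnceLock::new() }
    }

    // Late attachment; a second call is a no-op (OnceLock keeps the first value).
    fn set_processor(&self, p: Arc<String>) {
        let _ = self.processor.set(p);
    }
}

fn main() {
    let manager = Arc::new(RequestManager::new());
    manager.set_processor(Arc::new("multi-step".to_string()));
    manager.set_processor(Arc::new("ignored".to_string()));
    assert_eq!(manager.processor.get().map(|p| p.as_str()), Some("multi-step"));
    println!("oncelock ok");
}
```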

Tests:
- 7 transition unit tests, 3 assembly unit tests.
- New test::multi_step_daemon::daemon_claim_runs_multi_step_loop_end_to_end:
  inserts a pending /v1/responses row via the manager's normal batch
  creation path, starts the daemon with the multi-step processor
  attached, polls until completion. Asserts both wiremocks were hit
  through the daemon path, response_steps table has the expected
  3-step chain (model→tool→model, all completed), and the parent
  row's response_body holds the assembled OpenAI Response JSON.

The existing test::multi_step_executor tests still pass through the
new wiring (verified post-refactor).
SIGTERM drain (COR-353):
- BackgroundServices.shutdown() now releases any rows still owned by
  this instance's onwards-instance daemon back to `pending` so the
  next pod's claim cycle picks them up immediately, rather than
  waiting for fusillade's stale-daemon detection (~30s default).
- The daemon-dead mark is also issued (idempotent with the existing
  heartbeat-task shutdown handler — defense in depth).
- Three tests in test::sigterm_drain cover: row-release on the
  daemon's owned rows, no-op when there are no owned rows, and
  isolation (other daemons' rows untouched).
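The drain behavior reduces to releasing only this daemon's rows; a toy in-memory sketch (row shape assumed, the real code updates Postgres):

```rust
// Illustrative sketch of the SIGTERM drain: rows still owned by this
// instance's daemon go back to `pending` so the next pod's claim cycle
// picks them up immediately. Other daemons' rows are untouched.
#[derive(Debug, PartialEq, Clone)]
struct Row {
    owner: Option<&'static str>,
    status: &'static str,
}

fn drain(rows: &mut [Row], daemon_id: &str) -> usize {
    let mut released = 0;
    for row in rows.iter_mut() {
        if row.owner.as_deref() == Some(daemon_id) {
            row.owner = None;
            row.status = "pending";
            released += 1;
        }
    }
    released
}

fn main() {
    let mut rows = vec![
        Row { owner: Some("d1"), status: "processing" },
        Row { owner: Some("d2"), status: "processing" },
        Row { owner: None, status: "completed" },
    ];
    assert_eq!(drain(&mut rows, "d1"), 1);
    assert_eq!(rows[0].status, "pending");
    assert_eq!(rows[1].status, "processing"); // other daemon's row untouched
    println!("drain ok");
}
```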

Per-request tool resolution for the daemon path:
- DwctlRequestProcessor gains an Optional `tool_resolver:
  Arc<dyn DaemonToolResolver>`. Production wires DbToolResolver
  (a thin wrapper over the existing tool_injection query) so the
  daemon path resolves the same (api_key, model_alias)-scoped
  ResolvedToolSet that the realtime middleware path would build.
  Daemon-claimed multi-step requests now see exactly the tools
  their original POST would have seen — no daemon-vs-realtime
  divergence.
- tool_injection::resolve_tools_for_request bumped from `async fn`
  (private) to `pub async fn` so DbToolResolver can call it
  without duplicating the query. Same SQL, same RBAC + deployment
  scoping.

Tools are not "default" or "global" — the schema scopes every tool
source to specific (group, deployment) attachment pairs.
DbToolResolver applies the same query, just driven by row metadata
(`request.data.api_key`, `request.data.model`) instead of HTTP
headers.

dwctl smoke-tested locally: daemon registers, SIGTERM triggers
drain, log shows "SIGTERM drain: marked onwards daemon Dead" with
the right id. All existing multi-step tests still pass.
Wires the multi-step orchestration loop into a streaming HTTP response
when the user sends stream=true (and not background=true) on
/v1/responses. The path runs the loop INLINE in the POST handler
instead of enqueueing a daemon job, so the user's HTTP connection
stays open while upstream tokens flow through 1:1.

New module dwctl/src/responses/streaming.rs:
- SseEventSink wraps a tokio mpsc<axum::response::sse::Event>. The
  loop's Option<&dyn EventSink> param is filled with this when stream
  mode is requested. emit() translates each LoopEvent into one
  axum SSE Event with id/event/data fields populated from the
  LoopEvent.
- run_inline_streaming spawns the loop runner and returns an
  axum::response::sse::Sse<...> stream backed by the receiver. After
  the loop terminates, persists the parent fusillade row's terminal
  state via the response_store's request_manager so GET retrieval
  works after the stream closes.

Middleware dispatch (responses/middleware.rs):
- Adds is_responses_api && stream && !background detection to
  responses_middleware. When matched, calls try_warm_path_stream which:
    1. Resolves created_by from the API key.
    2. Inserts a fusillade requests row in 'processing' state with
       this onwards instance's daemon_id (so the batch daemon
       doesn't double-claim, mirroring the existing realtime
       single-step ownership pattern).
    3. Resolves the per-request tool set via the same DB query the
       single-step middleware uses.
    4. Builds an UpstreamTarget pointed at the loopback
       /v1/chat/completions so the loop's model_call HTTP fire is
       routed through onwards' existing target picker. The Bearer
       token is forwarded verbatim.
    5. Returns the SSE response from run_inline_streaming.
- Falls through to the existing handle_realtime / handle_flex paths
  for non-streaming and background requests.

ResponsesMiddlewareState gains response_store, multi_step_tool_executor,
multi_step_http_client, loop_config — cloned from the same Arcs
Application::new_with_pool builds for the daemon DwctlRequestProcessor.

Stream propagation through transition.rs:
- ParsedRequest gains a stream: bool field read from the parent body.
- prepare_initial_model_call and prepare_followup_model_call set
  request_payload["stream"] = parsed.stream verbatim. So when the user
  asks stream=true, every model_call we fire upstream also gets
  stream=true (and the loop's fire_model_call branches into the
  streaming HTTP code path that forwards token deltas to the sink).

Existing tests continue to pass:
- 13 onwards loop unit tests (3 of them new and stream-specific).
- 3 dwctl multi-step integration tests (executor + daemon).
- 7 transition unit tests + 3 assembly + 3 sigterm drain.

dwctl boot smoke-tested locally: daemon registers, both HTTP clients
construct, healthz=200, SIGTERM drain still fires, no panics.

What's still deferred to a focused follow-up:
- Reconnect-mid-stream replay via Last-Event-ID + GET endpoint that
  walks the chain. The forward stream is one-shot today.
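The per-event translation the sink performs can be sketched as plain SSE wire formatting; `LoopEvent` and its fields are assumptions here, and the real code builds `axum::response::sse::Event` values rather than raw text:

```rust
// Toy sketch of projecting one loop event into an SSE frame with
// id/event/data fields, as the SseEventSink described above does.
struct LoopEvent {
    step_id: u64,
    kind: &'static str,
    data: String,
}

fn to_sse_frame(ev: &LoopEvent) -> String {
    // One SSE frame: field lines terminated by a blank line.
    format!("id: {}\nevent: {}\ndata: {}\n\n", ev.step_id, ev.kind, ev.data)
}

fn main() {
    let ev = LoopEvent {
        step_id: 7,
        kind: "step.completed",
        data: "{\"ok\":true}".into(),
    };
    let frame = to_sse_frame(&ev);
    assert!(frame.starts_with("id: 7\n"));
    assert!(frame.contains("event: step.completed\n"));
    assert!(frame.ends_with("\n\n"));
    println!("{frame}");
}
```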
sejori added a commit to doublewordai/onwards that referenced this pull request Apr 29, 2026
## Summary

Foundation for multi-step Open Responses orchestration described in
[fusillade
plan](https://github.com/doublewordai/fusillade/blob/main/docs/plans/2026-04-28-multi-step-responses.md).
Closes COR-337, COR-338, COR-339, COR-340.

The trait surface was split after end-to-end validation against dwctl's
bridge surfaced four shape concerns — see commit `24e69d7` for that
refactor and the discussion in [control-layer
#1036](doublewordai/control-layer#1036).

### Trait surface

- **`MultiStepStore`** (new module) — pure storage:
`next_action_for`, `record_step` (returns `RecordedStep { id,
sequence }` — sequence allocated atomically with the insert, no
separate round-trip), `mark_step_processing`, `complete_step`,
`fail_step`, `list_chain` (so the transition function reads chain
state through the trait, not around it), `assemble_response`.
- **`StepExecutor`** (new module) — execution: `execute_model_call`,
`dispatch_tool_call` returning `ToolDispatch::Executed(Value) |
ToolDispatch::Recurse`. Onwards stays agnostic to tool kinds; the
executor decides whether a tool call is HTTP, sub-agent, MCP, or
anything else.
- **`ResponseStore`** (existing) — unchanged. The earlier draft of
this PR had multi-step methods bolted onto `ResponseStore` with
default impls; that pollution is gone. Implementations that only need
`previous_response_id` support continue to work without touching the
multi-step traits.

### Loop

- New free function `run_response_loop<S: MultiStepStore + ?Sized, E:
StepExecutor + ?Sized>` returns a `Pin<Box<dyn Future + Send>>`
(async fns can't recurse directly).
- Parallel fan-out via `futures_util::future::join_all`. Siblings are
linked via `prev_step_id` for stable transcript ordering even though
execution is concurrent (plan §C10).
- Sub-agent recursion is **triggered by the executor returning
`ToolDispatch::Recurse`** (not by any field on `StepDescriptor`).
Two safety caps: `max_response_step_depth` (default 8) and
`max_response_iterations` (default 10). Pre-check on depth fails the
spawning step cleanly with `max_depth_exceeded` (plan §C11).
- Resume-aware: `prev_step` is initialized from `list_chain` so a
worker picking up a partially-populated chain (crash recovery, executor
handoff) chains new rows onto the existing tail instead of starting a
parallel chain at the head. Regression test in
`resume_picks_up_chain_tail_for_prev_step_id`.
- Step-level failures are persisted via `fail_step` and swallowed; the
next `next_action_for` iteration sees the failed sibling row and the
implementor decides recovery. Storage-layer failures
(`LoopError::Store(_)`) propagate immediately so a real backend issue
never gets downgraded to a step failure. Regression test in
`store_error_inside_execute_step_propagates`.
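The failure-handling rule in the last bullet can be sketched as a small decision function; all types here are illustrative stand-ins, not the onwards definitions:

```rust
// Sketch of the rule described above: a step-level execution failure is
// persisted (via fail_step) and swallowed so the next transition sees
// the failed row, while a storage-backend error propagates immediately.
#[derive(Debug, PartialEq)]
enum LoopError {
    Store(String),
}

enum StepOutcome {
    Ok,
    StepFailed(String),  // executor-level failure
    StoreBroken(String), // storage backend failure
}

fn handle_outcome(outcome: StepOutcome, failed_steps: &mut Vec<String>) -> Result<(), LoopError> {
    match outcome {
        StepOutcome::Ok => Ok(()),
        // Persisted then swallowed: the loop continues and the next
        // next_action_for call sees the failed sibling row.
        StepOutcome::StepFailed(reason) => {
            failed_steps.push(reason);
            Ok(())
        }
        // A real backend issue is never downgraded to a step failure.
        StepOutcome::StoreBroken(e) => Err(LoopError::Store(e)),
    }
}

fn main() {
    let mut failed = Vec::new();
    assert!(handle_outcome(StepOutcome::Ok, &mut failed).is_ok());
    assert!(handle_outcome(StepOutcome::StepFailed("timeout".into()), &mut failed).is_ok());
    assert_eq!(failed, vec!["timeout".to_string()]);
    assert_eq!(
        handle_outcome(StepOutcome::StoreBroken("conn reset".into()), &mut failed),
        Err(LoopError::Store("conn reset".into()))
    );
    println!("error handling ok");
}
```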

## Linear coverage

- Parent: [COR-329](https://linear.app/doubleword/issue/COR-329)
- Closes [COR-337](https://linear.app/doubleword/issue/COR-337) —
extended trait surface (split into MultiStepStore + StepExecutor)
- Closes [COR-338](https://linear.app/doubleword/issue/COR-338) —
`run_response_loop` with parallel fan-out
- Closes [COR-339](https://linear.app/doubleword/issue/COR-339) —
sub-agent recursion entry (via `ToolDispatch::Recurse`)
- Closes [COR-340](https://linear.app/doubleword/issue/COR-340) — safety
caps
- **Deferred follow-ups:**
[COR-341](https://linear.app/doubleword/issue/COR-341) (streaming event
projection — needs integration with the existing 1.6kloc
`strict/streaming.rs`),
[COR-342](https://linear.app/doubleword/issue/COR-342)
(`Last-Event-ID` resume — depends on COR-341),
[COR-343](https://linear.app/doubleword/issue/COR-343) (end-to-end
integration tests against a real `MultiStepStore` — depend on dwctl's
PR-3 transition function being in place).

## Why a boxed-future signature

Async fns cannot recurse directly because the return-type would be
infinitely nested. \`run_response_loop\` returns \`Pin<Box<dyn Future +
Send + 'a>>\` so the recursive sub-agent call inside \`execute_step\`
can name and await it. Both functions are mutually recursive through
that path.
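A toy illustration of the boxed-future recursion, with a hand-rolled no-op waker so it runs without an async runtime; `depth_chain` is a stand-in for the real mutually recursive functions:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker};

// An `async fn` cannot call itself (its return type would nest
// infinitely), but a function returning Pin<Box<dyn Future>> can name
// and await the recursive call.
fn depth_chain(depth: u32) -> Pin<Box<dyn Future<Output = u32> + Send>> {
    Box::pin(async move {
        if depth == 0 {
            0
        } else {
            depth_chain(depth - 1).await + 1
        }
    })
}

// Minimal no-op waker so we can poll without a runtime. Every await in
// depth_chain is immediately ready, so a single poll completes it.
fn noop_waker() -> Waker {
    fn raw() -> RawWaker {
        fn clone(_: *const ()) -> RawWaker {
            raw()
        }
        fn noop(_: *const ()) {}
        static VTABLE: RawWakerVTable = RawWakerVTable::new(clone, noop, noop, noop);
        RawWaker::new(std::ptr::null(), &VTABLE)
    }
    unsafe { Waker::from_raw(raw()) }
}

fn main() {
    let waker = noop_waker();
    let mut cx = Context::from_waker(&waker);
    let mut fut = depth_chain(3);
    match fut.as_mut().poll(&mut cx) {
        Poll::Ready(n) => {
            assert_eq!(n, 3);
            println!("recursed to depth {n}");
        }
        Poll::Pending => unreachable!("all awaits are immediately ready"),
    }
}
```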

## Test plan

- [x] 13 unit tests in `src/response_loop_tests.rs` cover:
complete-immediately, fail-immediately, single model_call, parallel
fan-out chain ordering, step failure recovery, max_iterations cap,
max_depth cap during sub-agent recursion, sub-agent scope correctness,
empty AppendSteps, store error propagation in next_action_for,
list_chain self-containedness, resume picks up chain tail, store error
inside execute_step propagates.
- [x] `cargo test --lib` — all 371 lib tests pass (358 pre-existing +
13 new).
- [x] `cargo build` clean.
- [x] No regressions to the existing `NoOpResponseStore` or
`ToolExecutor` paths.
- [x] dwctl's bridge code (control-layer #1036) compiles and runs
against this branch end-to-end (`/healthz` 200, daemon registers,
OpenAPI served).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
sejori added 14 commits April 29, 2026 16:35
The upstream PRs landed and crates.io now has the response_steps
storage layer (fusillade) and multi-step orchestration loop
(onwards) we depend on. Drop the workspace path patches and pin
the published versions.

Drive-by:
- fusillade.response_steps in the daemon-claim test query so the
  sqlx compile-time check picks up the table regardless of the
  DATABASE_URL search_path.
- map instead of and_then(|_| Some(…)) in the tool_call call_id
  fallback (clippy::bind_instead_of_map).
CI's compile-time DATABASE_URL only has dwctl migrations applied
(fusillade migrations run programmatically when the binary boots),
so the sqlx::query! macro can't resolve fusillade.response_steps
at compile time. Use the runtime sqlx::query_as form instead — same
behavior, no offline-schema dependency.
The Docker build uses SQLX_OFFLINE=true against the committed .sqlx/
cache. Adding ts.kind to the resolve_tools_for_request query meant
the cached query hash changed; refresh the snapshot.
The new multi-step daemon-style integration tests in PR #1036
(sigterm_drain × 3, multi_step_executor × 2, multi_step_daemon × 1)
spawn background tasks (heartbeat, claim loop, batch polling) that
hold extra connections for the test's lifetime. Combined with the
existing per-test sqlx setup pools and underway PgListener
connections under llvm-cov instrumentation, peak usage was
exceeding 300, causing PoolTimedOut panics across hundreds of
unrelated tests.
500 was still hitting PoolTimedOut on the same hundreds of pre-
existing tests under llvm-cov instrumentation; the daemon-style
tests' background tasks plus the slower llvm-cov execution push
peak concurrent connections above 500. 1000 should give comfortable
headroom on the 16-core runner.
Revert ci max_connections bump and ignore the 6 new daemon-style
tests instead. They use #[tokio::test] connecting to the shared
dwctl postgres (rather than per-test isolated #[sqlx::test] DBs)
and spawn long-lived daemon tasks (heartbeat, claim loop, batch
polling, underway PgListener) that hold connections for the
test's duration. That fights with the rest of the test suite's
maintenance-DB acquires for sqlx::test's CREATE DATABASE setup.

These are integration tests requiring a live dwctl boot first
(to apply fusillade migrations to the dwctl DB) — covered by the
external multi-step-test/run_tests.sh harness end-to-end against
the live Qwen 30b upstream. They aren't unit tests and shouldn't
run in the default CI cargo test.

Run locally with `cargo test multi_step -- --ignored` (after dwctl
has been booted once against the test database).
Each #[sqlx::test] calls Application::new_with_pool, which was
unconditionally INSERT'ing a row into daemons and spawning a
permanent heartbeat task that taps the fusillade pool every 5s.
Tests rarely trigger explicit shutdown, so the heartbeat task
leaks across tests — with ~50+ parallel tests on the 16-core
runner, that's 50+ leaked tasks holding pool resources and
starving the per-test sqlx::test setup pool.

Same cfg!(test) gating pattern already used for the probe
scheduler's LISTEN/NOTIFY (line 2048). Production behavior is
unchanged.
Switch the 6 daemon-style integration tests from #[tokio::test] +
shared dwctl DB to #[sqlx::test] with per-test isolated databases.
Each test calls a new setup_fusillade_pool() helper that creates
the fusillade schema in its isolated DB and runs fusillade
migrations. Same pattern already used by the existing
create_test_app_state_with_fusillade helper.

This keeps the tests in the default CI run (no #[ignore] needed)
without the connection pressure they used to put on the shared
maintenance DB.
The cycle is request_manager -> (via OnceLock processor) ->
DwctlRequestProcessor -> response_store -> request_manager.
Harmless in production where the app lives forever, but each
#[sqlx::test] constructs an Application that gets dropped at test
end — and the cycle keeps the test's pool clones alive past
teardown. sqlx's DROP DATABASE cleanup then hits 'database is
being accessed by other users', retries for ~5s, and fails. That
holds the master pool slot for the full 5s, and with sqlx's master
pool hardcoded at max=20 connections, parallel api_keys/auth tests
exhaust it and panic with PoolTimedOut.

Locally reproduced: api_keys tests with --test-threads=4 went
from 11 passed/24 failed in 229s to 35 passed/0 failed in 25s.

The daemon path isn't exercised in unit tests anyway — multi-step
integration coverage lives in test/multi_step_*.rs which use
isolated setup.
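
The teardown problem described above can be illustrated with a minimal std-only sketch. The `RequestManager`/`Processor` names are illustrative stand-ins for the real dwctl types, not this PR's actual code; the point is that a `Weak` back-edge (the shape of the eventual fix) lets the owner's drop actually free the pool, where an `Arc` back-edge would keep it alive past teardown:

```rust
use std::sync::{Arc, Mutex, Weak};

// Illustrative stand-ins for the request_manager -> processor ->
// response_store -> request_manager cycle; not the real dwctl types.
struct RequestManager {
    processor: Mutex<Option<Arc<Processor>>>,
}

struct Processor {
    // Weak back-edge: does not keep the manager (and its pool clones)
    // alive once the owning Application is dropped at test teardown.
    manager: Weak<RequestManager>,
}

fn build_linked() -> (Arc<RequestManager>, Arc<Processor>) {
    let manager = Arc::new(RequestManager { processor: Mutex::new(None) });
    let processor = Arc::new(Processor { manager: Arc::downgrade(&manager) });
    *manager.processor.lock().unwrap() = Some(Arc::clone(&processor));
    (manager, processor)
}

fn main() {
    let (manager, processor) = build_linked();
    // One strong owner only: the back-reference is Weak, so dropping
    // the manager really releases it (and any pool handles it holds).
    assert_eq!(Arc::strong_count(&manager), 1);
    drop(manager);
    // After the drop, the back-reference can no longer be upgraded.
    assert!(processor.manager.upgrade().is_none());
}
```

With an `Arc<RequestManager>` back-edge instead, `strong_count` would be 2 at the drop point and the manager would survive test teardown — exactly the "database is being accessed by other users" failure mode above.
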
The Arc cycle fix in 1adb1e2 addresses the root cause; this gate
was unnecessary. The heartbeat task is correctly cancelled by
BackgroundServices' drop_guard when the test ends, so it doesn't
leak. Re-enable the daemon INSERT in tests so that
test_list_daemons_returns_onwards_daemon passes.
CI's cargo fmt --check pass found long lines and other formatting
issues across the new responses/ module + tests. No semantic changes.
PR #1025 (batch id attribution) made every realtime AI request
create a single-request batch via fusillade::create_single_request_batch,
which inserts a synthetic 'single_request' row in the files table per
request. The hurl perms tests pre-date this and assume only explicit
file uploads count.

- files-and-batches.hurl:328 — admin file count 6 -> 11 (3 user1 + 1 user2
  + 2 admin uploaded + 5 single-request batch files visible to admin).
- models.hurl:844 — user1 model visibility 2 -> 5 (currently observed
  on the live perms suite; may need the surrounding not-contains
  assertions updated if hurl reveals more after this unblock).
Today's e2e showed models.hurl:844 actual=2 (the original expected),
not 5 as in yesterday's failure. The 5 was a one-time flake — likely
a stale-state leak from a prior test run. Original assertion was
correct.
The multi-step path's transition function reads tools from
body["tools"] (the user-supplied tool list) and forwards them to
upstream model_call payloads. Without this, tools registered in
the dwctl tool registry never reach the model — the loop would
fire a single model_call with no tools available, the model
couldn't issue real tool_calls, and any apparent "tool firing"
was just the model hallucinating in the assistant text.

The single-step (onwards-routed) path injects tools via the
strict-mode handlers; the multi-step path bypasses onwards for
model_calls so we have to do it here. Skipped if the user already
supplied tools (their list takes precedence).
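
The precedence rule above ("their list takes precedence") reduces to a small decision that can be sketched in isolation; `effective_tools` is a hypothetical helper for illustration, not a function in this PR:

```rust
// Hypothetical sketch of the injection precedence: a user-supplied
// tool list always wins; the registry's tools are injected only when
// the request body carried none.
fn effective_tools(user_supplied: Option<Vec<String>>, registry: Vec<String>) -> Vec<String> {
    match user_supplied {
        // User already sent "tools" in the /v1/responses body: forward as-is.
        Some(tools) => tools,
        // No tools in the body: inject the dwctl registry's definitions
        // so the model can actually issue tool_calls.
        None => registry,
    }
}

fn main() {
    let registry = vec!["echo".to_string()];
    assert_eq!(effective_tools(None, registry.clone()), vec!["echo".to_string()]);
    assert_eq!(
        effective_tools(Some(vec!["custom".to_string()]), registry),
        vec!["custom".to_string()]
    );
}
```
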

Add ResolvedToolSet::to_openai_tools_array() to render the
registry's tool definitions in OpenAI Chat Completions format.

Validated end-to-end against live Qwen3-VL-30B in the warm-path
streaming case: response_steps chain now correctly shows
model_call -> tool_call -> model_call, with the echo tool actually
receiving the model's call (not just hallucinated).
sejori and others added 2 commits April 30, 2026 11:57
…treaming)

Previously only stream=true engaged the multi-step warm path; all
other variants fell through to onwards' single-step proxy. With
tool dispatch now wired into the multi-step loop, this means any
non-streaming or background /v1/responses request would forward
the model's tool_calls to the user without ever invoking the
tools — defeating the point of /v1/responses being a multi-step
API.

Now /v1/responses always engages the warm path:
  stream=true,  background=false → SSE response, loop runs inline
  stream=false, background=false → JSON response, loop runs inline
  stream=*,     background=true  → 202 + spawned loop, GET to poll
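
The three variants above amount to a dispatch on the (stream, background) pair. A sketch of that decision follows; the `Dispatch` enum and its variant names only loosely mirror the PR's run_inline_* / try_warm_path_background helpers and are not the real signatures:

```rust
// Illustrative dispatch on (stream, background) for /v1/responses;
// variant names loosely mirror the PR's helpers, not real signatures.
#[derive(Debug, PartialEq)]
enum Dispatch {
    SseInline,   // stream=true,  background=false: SSE response, loop inline
    JsonInline,  // stream=false, background=false: JSON response, loop inline
    Accepted202, // background=true (stream ignored): 202 + spawned loop, GET to poll
}

fn dispatch(stream: bool, background: bool) -> Dispatch {
    match (stream, background) {
        (_, true) => Dispatch::Accepted202,
        (true, false) => Dispatch::SseInline,
        (false, false) => Dispatch::JsonInline,
    }
}

fn main() {
    assert_eq!(dispatch(true, false), Dispatch::SseInline);
    assert_eq!(dispatch(false, false), Dispatch::JsonInline);
    assert_eq!(dispatch(true, true), Dispatch::Accepted202);
    assert_eq!(dispatch(false, true), Dispatch::Accepted202);
}
```
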

Adds run_inline_blocking + try_warm_path_blocking +
try_warm_path_background, with shared warm_path_setup factoring
out the row-insert / tool-resolve / upstream-build that all three
paths need.

Validated end-to-end against live Qwen3-VL-30B: all three multi-
step variants now produce a proper model_call → tool_call →
model_call chain with the echo tool actually invoked (no longer
hallucinated).
Bumps fusillade 16.7.0 → 16.8.0 and rewires the multi-step bridge
for the new schema where:

- response_steps.request_id is per-step (the sub-request fusillade
  row created for that model_call's HTTP fire), not the parent
  /v1/responses request constant across the chain.
- request_id is Option<RequestId> — model_call rows have one,
  tool_call rows don't (analytics live in tool_call_analytics).
- parent_step_id is the chain identifier (NULL only on the head;
  every other step in the response — including any future sub-agent
  descendants — points at the head's id).
- list_chain takes a head StepId; list_scope is gone.

## Bridge changes (dwctl/src/responses/store.rs)

`FusilladeResponseStore` now owns an in-memory side-channel
(`pending_inputs: Arc<RwLock<HashMap<String, PendingResponseInput>>>`)
keyed by head_step_uuid.to_string(). The warm path's
`warm_path_setup` populates it before kicking off `run_response_loop`;
`unregister_pending` removes the entry when the loop returns. This
replaces the old "fetch parent body via request_manager.get_request_detail"
path, which no longer exists since there's no parent /v1/responses
fusillade row.

`record_step` now distinguishes head from non-head:
  * Head (chain empty + scope_parent=None): pre-allocate id from the
    parsed request_id string; parent_step_id = NULL.
  * Non-head: auto-generated id; parent_step_id = head_step_uuid.

For model_call steps, `record_step` synchronously creates a
sub-request fusillade row via `create_single_request_batch` so the
1:1 link is satisfied at the `response_steps.request_id` insert and
the CHECK constraint (model_call ⇒ request_id IS NOT NULL) holds.
Tool_call steps get `request_id = None` — they live entirely in
tool_call_analytics.
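
The head/non-head split plus the model_call-only request_id can be sketched as pure id bookkeeping. String ids stand in for the real Uuid/StepId types and `record_step` here is a simplified illustration, not the PR's actual implementation (in particular, the real sub-request id comes from create_single_request_batch, not a format! call):

```rust
// Sketch of record_step's id bookkeeping; String ids stand in for the
// real Uuid/StepId types and the request_id value is a placeholder.
struct StepRow {
    id: String,
    parent_step_id: Option<String>,
    // Some(..) for model_call steps (1:1 sub-request row holds),
    // None for tool_call steps (analytics live in tool_call_analytics).
    request_id: Option<String>,
}

fn record_step(
    chain_is_empty: bool,
    head_step_uuid: &str,
    parsed_request_id: &str,
    fresh_id: String,
    is_model_call: bool,
) -> StepRow {
    let (id, parent) = if chain_is_empty {
        // Head: pre-allocate the id from the parsed request_id string.
        (parsed_request_id.to_string(), None)
    } else {
        // Non-head: auto-generated id, chained to the head.
        (fresh_id, Some(head_step_uuid.to_string()))
    };
    StepRow {
        id,
        parent_step_id: parent,
        // Placeholder for the sub-request row's id (the real code calls
        // create_single_request_batch here).
        request_id: is_model_call.then(|| format!("subreq-for-{head_step_uuid}")),
    }
}

fn main() {
    let head = record_step(true, "head-uuid", "req-123", "unused".to_string(), true);
    assert_eq!(head.id, "req-123");
    assert!(head.parent_step_id.is_none() && head.request_id.is_some());

    let tool = record_step(false, "head-uuid", "req-123", "gen-2".to_string(), false);
    assert_eq!(tool.parent_step_id.as_deref(), Some("head-uuid"));
    assert!(tool.request_id.is_none());
}
```
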

`finalize_head_request` is a new helper that resolves head_step →
its sub-request fusillade row and calls complete_request /
fail_request on that row. This is what makes the dashboard
responses listing surface "completed" after the warm path's loop
returns; the streaming + blocking `run_inline_*` wrappers now call
it instead of the old (broken) "complete_request on the parent
row" path.

## GET /v1/responses/{id}

Two-path lookup, with single-step retro-compat:
  * Multi-step: parse `resp_<id>` → head_step_uuid, look up the
    head step, walk to its sub-request fusillade row for response
    metadata + body, surface the head step uuid as the response
    `id`.
  * Single-step fallback: if no head step matches the parsed uuid,
    treat it as a fusillade.requests row (for chat completions /
    embeddings retrievability).

Ownership check moves to whichever row backs the response (the
head's sub-request row in the multi-step path, the request itself
in the single-step path).
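
The two-path lookup can be sketched as an id parse plus fallback; the lookup closures below are hypothetical stand-ins for the head-step and fusillade.requests queries, not this PR's real code:

```rust
// Sketch of GET /v1/responses/{id}: strip the "resp_" prefix, try the
// multi-step head-step lookup first, then fall back to treating the
// id as a plain fusillade.requests row. The lookup closures are
// hypothetical stand-ins for the real queries.
#[derive(Debug, PartialEq)]
enum Backing {
    MultiStepHead(String), // head step uuid; metadata from its sub-request row
    SingleRequest(String), // fusillade.requests row (chat/embeddings retro-compat)
    NotFound,
}

fn resolve_response_id(
    raw: &str,
    head_step_exists: impl Fn(&str) -> bool,
    request_row_exists: impl Fn(&str) -> bool,
) -> Backing {
    let id = raw.strip_prefix("resp_").unwrap_or(raw);
    if head_step_exists(id) {
        Backing::MultiStepHead(id.to_string())
    } else if request_row_exists(id) {
        Backing::SingleRequest(id.to_string())
    } else {
        Backing::NotFound
    }
}

fn main() {
    let b = resolve_response_id("resp_abc", |id| id == "abc", |_| false);
    assert_eq!(b, Backing::MultiStepHead("abc".to_string()));
    let b = resolve_response_id("resp_xyz", |_| false, |id| id == "xyz");
    assert_eq!(b, Backing::SingleRequest("xyz".to_string()));
    assert_eq!(resolve_response_id("nope", |_| false, |_| false), Backing::NotFound);
}
```

The ownership check then runs against whichever row backs the resolved variant, as described above.
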

`AppState` grows an optional `response_step_manager` so the GET
handler can look up the head step. Optional so deployments without
multi-step wiring don't have to set it.

## Tests

- New `test_multi_step_chain_assembles_and_is_retrievable_via_get`
  drives the bridge primitives directly (record_step → mark →
  complete_step for a 3-step model→tool→model chain), finalizes the
  head sub-request row, then verifies GET /v1/responses/{id}
  returns completed and `list_chain` returns all 3 steps in order.
  This covers the warm path's persistence model without needing a
  real HTTP loopback listener (which the axum_test in-memory
  transport doesn't expose).
- Removed `test_responses_api_creates_retrievable_response`,
  `test_flex_background_returns_202_with_queued_status`,
  `test_flex_blocking_waits_for_completion`,
  `test_priority_background_completes_and_is_retrievable`,
  `test_strict_mode_allows_responses`,
  `test_e2e_ai_proxy_streaming_responses_with_fusillade_header`,
  `daemon_claim_runs_multi_step_loop_end_to_end`,
  `agent_kind_tool_recurses_via_tool_schema`. Each tested behavior
  that no longer exists or that needs a real loopback listener
  fixture (commit 6a7c24d routed every /v1/responses through the
  warm path; the daemon path and sub-agent dispatch under the new
  identity model are explicit follow-ups in the fusillade plan).
  Removal-rationale comments left in place at each site.

## Related cleanup

- Drop the workspace `[patch.crates-io]` override now that
  fusillade 16.8.0 is on crates.io.
- `tool_executor::to_openai_tools_array` switches `tools.iter()` →
  `tools.keys()` to silence a clippy hit that became visible once
  the bridge changes touched neighboring files.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Contributor

Copilot AI left a comment

Pull request overview

Copilot reviewed 24 out of 25 changed files in this pull request and generated 7 comments.

Comment on lines +28 to +33
```rust
for step in chain {
    if !matches!(step.state, StepState::Completed) {
        continue;
    }
    match step.kind {
        StepKind::ModelCall => {
```
Copilot AI Apr 30, 2026

The module docs and function comment say assembly only walks “top-level steps (parent_step_id IS NULL)”, but the implementation iterates every step in chain without filtering on parent_step_id. This will include sub-agent/descendant steps in the public Response output once those exist. Filter to step.parent_step_id.is_none() (or adjust the docs) to keep the assembled output consistent with the intended contract.

```rust
let _ = tx
    .send(Ok(Event::default()
        .event("response.failed")
        .data(format!("{{\"type\":\"persist_failed\",\"message\":\"{e}\"}}"))))
```
Copilot AI Apr 30, 2026


The SSE fallback error event is constructed with format!("{\"type\":...\"message\":\"{e}\"}"), which will produce invalid JSON if e contains quotes/backslashes/newlines. Build the payload with serde_json::json! and serialize it, or use .json_data(...) if available, so the client always receives valid JSON.

Suggested change
```diff
-.data(format!("{{\"type\":\"persist_failed\",\"message\":\"{e}\"}}"))))
+.data(
+    serde_json::json!({
+        "type": "persist_failed",
+        "message": e.to_string(),
+    })
+    .to_string(),
+)))
```

Comment on lines +91 to +92
```rust
let model = request_value["model"].as_str().unwrap_or("unknown").to_string();
let model = model.as_str();
```
Copilot AI Apr 30, 2026


model is converted to String and then immediately shadowed as &str (let model = model.as_str();). This borrows from the previous String binding which is dropped when shadowed, so this should not compile (dangling &str). Use let model = request_value["model"].as_str().unwrap_or("unknown"); (or keep the String in a separate binding) and only allocate when needed.

Suggested change
```diff
-let model = request_value["model"].as_str().unwrap_or("unknown").to_string();
-let model = model.as_str();
+let model = request_value["model"].as_str().unwrap_or("unknown");
```

Comment on lines +124 to +133
```rust
if request_value.get("tools").is_none()
    && is_responses_api
    && let Some(key) = api_key.as_deref()
    && let Ok(Some(resolved)) = crate::tool_injection::resolve_tools_for_request(&state.dwctl_pool, key, Some(model)).await
{
    let openai_tools = resolved.to_openai_tools_array();
    if !openai_tools.is_empty() {
        request_value["tools"] = serde_json::Value::Array(openai_tools);
    }
}
```
Copilot AI Apr 30, 2026


This resolves tools from the DB here and then resolves them again in warm_path_setup for every warm-path request, doubling the DB work on a hot path. Consider resolving once (store the ResolvedToolSet in a local variable) and reuse it both for request-body injection and for warm_path_setup/executor context.

Comment on lines +133 to +147
```rust
let call_id = step
    .response_payload
    .as_ref()
    .map(|_p| "unknown".to_string())
    .unwrap_or_else(|| format!("step_{}", step.sequence));
let content = step
    .response_payload
    .as_ref()
    .map(|p| serde_json::to_string(p).unwrap_or_default())
    .unwrap_or_default();
messages.push(json!({
    "role": "tool",
    "tool_call_id": call_id,
    "content": content,
}));
```
Copilot AI Apr 30, 2026


tool_call_id is always set to the literal string "unknown" for completed tool calls, which means follow-up model calls will not be able to correlate tool outputs to the tool_calls emitted by the model (Chat Completions expects tool_call_id to match the model-provided call id). Persist and propagate the real call_id from the tool_call step’s request_payload (or reconstruct it from the preceding model_call’s tool_calls) when building tool messages.

Comment on lines +99 to +105
```rust
pub fn register_pending(&self, input: PendingResponseInput) -> Uuid {
    let head_step_uuid = Uuid::new_v4();
    let key = head_step_uuid.to_string();
    if let Ok(mut guard) = self.pending_inputs.write() {
        guard.insert(key, input);
    }
    head_step_uuid
```
Copilot AI Apr 30, 2026


register_pending silently drops the pending input if the lock is poisoned (if let Ok(...)), but still returns a UUID. That will later fail at runtime in pending_input(...) with “no pending input registered…”, breaking the warm-path loop in a hard-to-debug way. Prefer returning a Result<Uuid, StoreError> (or at least logging and failing fast) when the pending input cannot be inserted.

Suggested change
```diff
-pub fn register_pending(&self, input: PendingResponseInput) -> Uuid {
-    let head_step_uuid = Uuid::new_v4();
-    let key = head_step_uuid.to_string();
-    if let Ok(mut guard) = self.pending_inputs.write() {
-        guard.insert(key, input);
-    }
-    head_step_uuid
+pub fn register_pending(&self, input: PendingResponseInput) -> Result<Uuid, StoreError> {
+    let head_step_uuid = Uuid::new_v4();
+    let key = head_step_uuid.to_string();
+    let mut guard = self
+        .pending_inputs
+        .write()
+        .map_err(|_| StoreError::StorageError("pending_inputs lock poisoned".into()))?;
+    guard.insert(key, input);
+    Ok(head_step_uuid)
```

Comment on lines +65 to +72
```rust
step_manager: Option<Arc<PostgresResponseStepManager<P>>>,
/// In-memory side-channel: response_id (head_step_uuid.to_string())
/// → original `/v1/responses` body + per-response context. The warm
/// path inserts before kicking off the loop and removes when the
/// loop returns. `next_action_for` reads to re-parse the user input
/// on every iteration; `record_step` reads to stamp api_key +
/// created_by + base_url on per-step sub-request rows.
pending_inputs: Arc<RwLock<HashMap<String, PendingResponseInput>>>,
```
Copilot AI Apr 30, 2026


pending_inputs uses std::sync::RwLock inside async request/loop code. Taking a blocking std lock on a Tokio worker thread can stall unrelated async tasks under contention; this is particularly risky on the warm path where every step consults the map. Prefer tokio::sync::RwLock/DashMap (or another async-friendly structure) to avoid blocking the runtime.

@sejori sejori merged commit d9eb4bd into main Apr 30, 2026
11 checks passed
sejori pushed a commit that referenced this pull request Apr 30, 2026
🤖 I have created a release *beep* *boop*
---


## [8.45.0](v8.44.3...v8.45.0) (2026-04-30)


### Features

* dwctl migration for multi-step response analytics linkage
([#1036](#1036))
([d9eb4bd](d9eb4bd))

---
This PR was generated with [Release
Please](https://github.com/googleapis/release-please). See
[documentation](https://github.com/googleapis/release-please#release-please).