
cortex: implement phase 2 health supervision#305

Merged
jamiepine merged 5 commits into spacedriveapp:main from vsumner:phase2/cortex-health-supervision
Mar 5, 2026

Conversation

Contributor

@vsumner commented Mar 4, 2026

Overview

This PR implements Phase 2: System Health supervision for Cortex, as defined in
docs/design-docs/cortex-implementation.md and the execution plan
docs/design-docs/cortex-phase2-health-supervision-plan.md.

It depends on, and should merge after, PR #301 ("feat(cortex): Implement Phase 1 ('The Tick Loop')").

What Phase 2 includes

  1. Control plane for cancellation convergence

    • Adds src/agent/process_control.rs with ProcessControlRegistry and detached-worker control records.
    • Adds channel handle indirection (ChannelControlHandle / WeakChannelControlHandle) so cortex can safely request cancellation without owning live tasks.
    • Registry operations are idempotent by design: missing entries are non-fatal (NotFound), already-terminal entries return AlreadyTerminal.
  2. Detached ready-task lifecycle hardening

    • Updates pickup_one_ready_task in src/agent/cortex.rs to supervise with a cancel channel.
    • Uses single-winner terminal transitions with atomic states:
      • ACTIVE (0) -> COMPLETING (1) -> TERMINAL (3)
      • ACTIVE (0) -> KILLING (2) -> TERMINAL (3)
    • Completion and timeout branches both require CAS ownership before terminal side-effects.
    • Ensures control entry rollback on bootstrap/update failure and panic-safe cleanup.
  3. Deterministic bounded cancellation

    • Adds lag-aware tick behavior (lagged_control_since_last_tick): if control bus lag is detected, kill pass is skipped for that tick.
    • Adds bounded cancellation via supervisor_kill_budget_per_tick to avoid mass cancellation bursts.
    • Enforces deterministic oldest-first kill ordering.
  4. Timeout/retry policy for detached workers

    • Implements supervisor_timeout_count and supervisor_timeout_exhausted metadata tracking.
    • Applies retry-to-ready then backlog quarantine behavior using detached_worker_timeout_retry_limit.
    • Updates status and metadata in one TaskStore::update call.
  5. Observe-only breaker behavior

    • Tracks structured failures only:
      • WorkerComplete with success=false
      • structured tool outputs with {"success": false} or {"ok": false}
    • Emits circuit_breaker_tripped observation events with action_taken="observe_only".
    • Resets breaker state on subsequent structured success for the same key.
  6. Channel/branch cancel convergence updates

    • Worker cancel now emits a single terminal ProcessEvent::WorkerComplete payload (result, notify, success).
    • Branch cancel removes the branch status entry, logs the terminal run state, and emits a synthetic terminal branch result.
  7. Wiring + runtime config exposure

    • Propagates process_control_registry through AgentDeps.
    • Exposes phase-2 controls in runtime config/API wiring.
    • Adds integration coverage for bootstrap cleanup and regression scenarios.
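The single-winner terminalization in item 2 can be sketched as an atomic state machine. This is a minimal illustration using hypothetical names (`DetachedWorkerControl`, `try_claim`); the actual control records in src/agent/process_control.rs may be shaped differently:

```rust
use std::sync::atomic::{AtomicU8, Ordering};

// State constants mirroring the transitions described above.
const ACTIVE: u8 = 0;
const COMPLETING: u8 = 1;
const KILLING: u8 = 2;
const TERMINAL: u8 = 3;

struct DetachedWorkerControl {
    state: AtomicU8,
}

impl DetachedWorkerControl {
    fn new() -> Self {
        Self { state: AtomicU8::new(ACTIVE) }
    }

    /// Claim the right to run terminal side-effects via `intermediate`
    /// (COMPLETING or KILLING). Only one caller can win the CAS from ACTIVE,
    /// so completion/timeout races cannot both run terminal side-effects.
    fn try_claim(&self, intermediate: u8) -> bool {
        self.state
            .compare_exchange(ACTIVE, intermediate, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Mark the record terminal after side-effects have run.
    fn finish(&self) {
        self.state.store(TERMINAL, Ordering::Release);
    }
}

fn main() {
    let control = DetachedWorkerControl::new();
    // The completion branch wins the race...
    assert!(control.try_claim(COMPLETING));
    // ...so the timeout/kill branch loses and must skip its side-effects.
    assert!(!control.try_claim(KILLING));
    control.finish();
}
```

With this shape, "already-terminal entries return AlreadyTerminal" falls out naturally: a failed CAS tells the caller someone else owns (or finished) the transition.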

Verification and test guidance

Commands run for this PR

  • cargo test -q --lib detached_worker_completion_takes_priority_when_cancel_signal_and_worker_finish_simultaneously
  • cargo test -q --lib register_detached_worker_for_pickup_unregisters_control_on_task_update_error
  • cargo test -q --test detached_worker_bootstrap
  • cargo test -q --lib
  • cargo fmt --all
  • just preflight
  • just gate-pr

Recommended ongoing validation (aligned with the Phase 2 plan)

  • cargo test --lib agent::cortex::tests::
  • cargo test --lib agent::channel::tests::
  • cargo test --lib agent::status::tests::
  • cargo test --test bulletin
  • cargo test --test context_dump
  • just preflight
  • just gate-pr

Acceptance Criteria

  • Timeout cancellation is deterministic and idempotent with single-winner terminalization.
  • Detached workers are cancellable and do not leak control entries on bootstrap/update failures.
  • Supervisor health enforcement is bounded and deterministic.
  • Kill enforcement is skipped safely under control bus lag to avoid false kills.
  • Retry policy returns timed-out tasks to ready until the retry limit is reached, then quarantines them in the backlog, with metadata counters enforcing the limit exactly.
  • Breaker is observe-only (no destructive disablement) and tracks failures only from structured signals.
  • No migration changes are introduced.
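The retry-to-ready-then-backlog criterion can be sketched as a pure decision function. The names here (`NextStatus`, `on_detached_worker_timeout`) are illustrative assumptions, not the actual API; only the counter semantics follow the description above:

```rust
#[derive(Debug, PartialEq)]
enum NextStatus {
    Ready,   // retry: return the task to the ready queue
    Backlog, // retries exhausted: quarantine in the backlog
}

// Returns (next status, updated supervisor_timeout_count, supervisor_timeout_exhausted).
fn on_detached_worker_timeout(timeout_count: u8, retry_limit: u8) -> (NextStatus, u8, bool) {
    let new_count = timeout_count.saturating_add(1);
    if new_count > retry_limit {
        (NextStatus::Backlog, new_count, true)
    } else {
        (NextStatus::Ready, new_count, false)
    }
}

fn main() {
    // With the default retry limit of 2, the first two timeouts re-queue...
    assert_eq!(on_detached_worker_timeout(0, 2), (NextStatus::Ready, 1, false));
    assert_eq!(on_detached_worker_timeout(1, 2), (NextStatus::Ready, 2, false));
    // ...and the third quarantines the task and marks it exhausted.
    assert_eq!(on_detached_worker_timeout(2, 2), (NextStatus::Backlog, 3, true));
}
```

Keeping the decision pure like this makes it easy to write the status and metadata in one TaskStore::update call, as the PR describes.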

Definition of Done

  • Detached control-plane lifecycle supports cancel and unregister semantics with idempotent status.
  • Supervisor loop executes lag-aware, budgeted kill enforcement.
  • Single-winner detached lifecycle prevents double terminal side-effects on completion/timeout races.
  • Timeout count/exhausted metadata and status transitions align with limits in config.
  • Observe-only breaker emits circuit_breaker_tripped events and resets on success.
  • Validation commands and the full gate suite pass.
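A minimal sketch of the observe-only breaker behavior, assuming hypothetical types (`Breaker`, a `consecutive_failures` counter) rather than the actual `BreakerState` in src/agent/cortex.rs:

```rust
use std::collections::HashMap;

#[derive(Default)]
struct KeyState {
    consecutive_failures: u8,
}

struct Breaker {
    states: HashMap<String, KeyState>,
    threshold: u8,
}

impl Breaker {
    fn new(threshold: u8) -> Self {
        // Clamp to 1 so a zero threshold cannot make the breaker trip vacuously.
        Self { states: HashMap::new(), threshold: threshold.max(1) }
    }

    /// Record one structured outcome for `key`. Returns true exactly when the
    /// failure streak reaches the threshold; the caller would then emit a
    /// `circuit_breaker_tripped` event with action_taken="observe_only".
    fn observe(&mut self, key: &str, failure: bool) -> bool {
        let state = self.states.entry(key.to_string()).or_default();
        if failure {
            state.consecutive_failures = state.consecutive_failures.saturating_add(1);
            state.consecutive_failures == self.threshold
        } else {
            // Structured success for the same key resets the breaker.
            state.consecutive_failures = 0;
            false
        }
    }
}

fn main() {
    let mut breaker = Breaker::new(3);
    assert!(!breaker.observe("worker_type:compaction", true));
    assert!(!breaker.observe("worker_type:compaction", true));
    assert!(breaker.observe("worker_type:compaction", true));  // trips, observe-only
    assert!(!breaker.observe("worker_type:compaction", false)); // success resets
    assert!(!breaker.observe("worker_type:compaction", true));  // counting restarts
}
```

Note the breaker only observes and reports; nothing in this path disables the worker type, matching the "no destructive disablement" criterion.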

Risks / Rollback

  • Residual risk (low): concurrency-heavy paths are mostly covered by unit/integration tests, but full-system fault-injection (high churn + induced DB/controller faults) is not yet exercised end-to-end.
  • Rollback strategy: this change can be reverted at the commit granularity (cb5a60c) or superseded by a follow-up phase PR; since there are no schema changes, rollback does not require data migration.

Files changed in this PR

  • src/agent/process_control.rs (new)
  • src/agent/cortex.rs
  • src/agent/channel.rs
  • src/agent/status.rs
  • src/agent.rs
  • src/lib.rs
  • src/main.rs
  • src/config.rs
  • src/api/agents.rs
  • src/api/channels.rs
  • src/api/config.rs
  • src/tools/cancel.rs
  • tests/bulletin.rs
  • tests/context_dump.rs
  • tests/detached_worker_bootstrap.rs
  • docs/design-docs/cortex-implementation.md
  • docs/design-docs/cortex-history.md

Notes

  • No schema migration introduced.
  • Phase 2 breaker behavior is intentionally observe-only (no destructive disablement).

Contributor

coderabbitai bot commented Mar 4, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Introduces a distributed supervision control plane for managing channel and detached worker cancellation/lifecycle across agents, implements comprehensive health supervision tracking in Cortex (Phase 2), adds configuration parameters for health policies (timeout retries, kill budgets), and updates design documentation accordingly.

Changes

Cohort / File(s) Summary
Documentation
docs/content/docs/(configuration)/config.mdx, docs/content/docs/(core)/architecture.mdx, docs/design-docs/cortex-history.md, docs/design-docs/cortex-implementation.md
Updated event schema documentation, added MemorySaved and inter-agent message event types, documented Phase 2 health supervision as implemented with observe-only circuit breakers, kill budgets, and timeout retry logic; updated phase ordering and open questions.
Process Control Plane
src/agent/process_control.rs
New file introducing ProcessControlRegistry for centralized supervision of channels and detached workers; exports ControlActionResult enum, DetachedWorkerControl struct with lifecycle state machine, and registry methods for cancellation with single-winner semantics and handle pruning.
Agent Control Utilities
src/agent.rs
Added panic_payload_to_string helper and new process_control module declaration.
Channel Supervision
src/agent/channel.rs, src/agent/channel_dispatch.rs, src/agent/status.rs
Added ChannelControlHandle and WeakChannelControlHandle for external cancellation; implemented cancel_worker_with_reason and cancel_branch_with_reason on ChannelState; replaced panic payload extraction with centralized helper; added remove_branch method to StatusBlock.
Cortex Health Supervision
src/agent/cortex.rs
Introduced HealthRuntimeState with WorkerTracker, BranchTracker, BreakerState, and KillTarget types; added observe_health_event and run_health_tick methods; implemented detached worker lifecycle tracking via register_detached_worker_for_pickup; expanded Signal enum with BranchStarted, WorkerStarted, WorkerCompleted, ToolStarted, AgentMessageSent/Received, and other health-related variants; tracks latency windows and enforces deterministic kill budgets per tick.
Configuration Types
src/config/types.rs, src/config/toml_schema.rs, src/config/load.rs, src/api/config.rs
Added detached_worker_timeout_retry_limit (u8, default 2) and supervisor_kill_budget_per_tick (usize, default 8) to CortexConfig, TomlCortexConfig, and CortexSection; wired defaults and per-agent overrides through configuration loading paths.
API Integration
src/api/agents.rs, src/api/channels.rs, src/tools/cancel.rs
Added process_control_registry initialization in agent creation; updated cancel endpoints to use reason-aware cancellation methods (cancel_worker_with_reason, cancel_branch_with_reason); added fallback path for detached worker reconciliation.
Runtime Wiring
src/lib.rs, src/main.rs, src/conversation/history.rs
Extended AgentDeps with process_control_registry field; wired channel registration/unregistration in channel lifecycle; enforced minimum event bus capacities; added cancel_running_detached_worker method to ProcessRunLogger for database-level detached worker reconciliation.
Test Coverage
tests/bulletin.rs, tests/context_dump.rs, tests/detached_worker_bootstrap.rs
Updated bootstrap_deps to initialize process_control_registry; added integration test for detached worker bootstrap failure handling and registry cleanup.
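The lag-aware, budgeted kill enforcement summarized above can be sketched as a selection step. `KillTarget`'s fields and `select_kill_targets` are illustrative assumptions; only the skip-on-lag, oldest-first, and budget-truncation behavior follows the description:

```rust
#[derive(Clone)]
struct KillTarget {
    id: u64,
    started_at: u64, // monotonic tick at which the process started
}

fn select_kill_targets(
    mut candidates: Vec<KillTarget>,
    lagged_control_since_last_tick: bool,
    kill_budget_per_tick: usize,
) -> Vec<KillTarget> {
    // Under control-bus lag the supervisor's view of live processes may be
    // stale, so the kill pass is skipped entirely for this tick (no false kills).
    if lagged_control_since_last_tick {
        return Vec::new();
    }
    // Deterministic oldest-first ordering, bounded by the per-tick budget.
    candidates.sort_by_key(|t| t.started_at);
    candidates.truncate(kill_budget_per_tick);
    candidates
}

fn main() {
    let candidates = vec![
        KillTarget { id: 1, started_at: 30 },
        KillTarget { id: 2, started_at: 10 },
        KillTarget { id: 3, started_at: 20 },
    ];
    // Lag detected: nothing is killed this tick.
    assert!(select_kill_targets(candidates.clone(), true, 8).is_empty());
    // No lag, budget 2: the two oldest targets are selected, oldest first.
    let picked = select_kill_targets(candidates, false, 2);
    assert_eq!(picked.iter().map(|t| t.id).collect::<Vec<_>>(), vec![2, 3]);
}
```

Bounding the pass this way avoids mass-cancellation bursts while keeping the kill order reproducible across ticks.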

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes


Suggested reviewers

  • jamiepine
🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name | Status | Explanation
Title check | ✅ Passed | The title 'cortex: implement phase 2 health supervision' clearly and concisely describes the main objective of the PR: implementing Phase 2 health supervision for the Cortex system as detailed in the description.
Description check | ✅ Passed | The description is comprehensive and directly related to the changeset, detailing Phase 2 implementation including control plane, detached worker lifecycle, bounded cancellation, timeout policy, circuit breaker behavior, and configuration updates.
Docstring Coverage | ✅ Passed | Docstring coverage is 99.19%, which is sufficient. The required threshold is 80.00%.



Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 11

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/agent/compactor.rs (1)

226-236: ⚠️ Potential issue | 🟠 Major

Compactor still executes an LLM turn directly instead of delegating.

This path builds a Rig agent and runs prompt completion inside compactor flow. The compactor should only monitor thresholds and trigger worker-based compaction execution.

Based on learnings: Applies to src/agent/compactor.rs : The Compactor must NOT be an LLM process; it is a programmatic monitor per channel that watches context size and triggers compaction using tiered thresholds: >80% background, >85% aggressive, >95% emergency truncation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/compactor.rs` around lines 226 - 236, Current compactor creates a
Rig LLM agent (calls create_cortex_tool_server, constructs ToolServerHandle,
then
AgentBuilder::new(...).preamble(compactor_prompt).default_max_turns(...).tool_server_handle(...).build())
which runs prompt completion; instead, remove the AgentBuilder/LLM construction
and replace it with a programmatic monitor that watches channel context sizes
and triggers worker-based compaction calls using tiered thresholds (>80%
background, >85% aggressive, >95% emergency truncation). Concretely, delete the
create_cortex_tool_server and AgentBuilder usage (compactor_prompt,
default_max_turns, tool_server_handle, build) in this flow and implement a loop
or async task that inspects memory/search metrics via memory_search or
memory_event_tx, applies the three thresholds, and sends compact requests to the
compaction worker (via existing worker API) rather than calling the LLM.
🧹 Nitpick comments (2)
docs/content/docs/(core)/architecture.mdx (1)

104-104: Clarify MemorySaved producer wording to avoid topology ambiguity.

The producer text says memory_save runs in branch/compactor/cortex contexts. Consider clarifying whether this is truly direct compactor emission or compaction-worker mediated, so it stays consistent with your tool-server topology docs.

Based on learnings: "ToolServer topology: per-channel (no memory tools), per-branch (with memory tools), per-worker (task-specific tools), per-cortex (memory_save only)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/content/docs/(core)/architecture.mdx` at line 104, The Producer entry
"MemorySaved" currently states "memory_save tool (branch/compactor/cortex
contexts)" which is ambiguous about whether emission comes directly from
compactor or via compaction-worker; update the wording to match the ToolServer
topology: indicate that memory_save is emitted only from per-cortex ToolServer
(not per-channel), available in per-branch (with memory tools) and per-worker
for task-specific tools, and clarify that compaction results are sent via the
compaction-worker (not the compactor process itself) if that is how your system
implements it; specifically edit the `MemorySaved` row to reference
`memory_save` and the correct topology nodes (per-cortex, per-branch,
per-worker) and mention compaction-worker-mediated emission where applicable so
it aligns with the ToolServer topology description.
src/config.rs (1)

4792-4797: Add a small smoke regression test for these two new Cortex fields.

Please add one test for defaults propagation and one for agent override propagation so future config refactors don’t silently drop either field.

Based on learnings, in src/config.rs tests prefer smoke tests that do not panic and avoid tight coupling to implementation details.

Also applies to: 5004-5009

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/config.rs` around lines 4792 - 4797, Add two non-panicking smoke tests in
src/config.rs that exercise the new Cortex fields
detached_worker_timeout_retry_limit and supervisor_kill_budget_per_tick: one
test (e.g., test_defaults_propagation_for_cortex_fields) loads a minimal config
that omits these two fields and asserts the parsed Config (or cortex sub-struct)
returns the values from base_defaults.cortex for both fields, and another test
(e.g., test_agent_override_for_cortex_fields) loads a config string that sets
explicit values for detached_worker_timeout_retry_limit and
supervisor_kill_budget_per_tick and asserts those values are preserved after
parsing; keep tests high-level (use public parse/load helper used elsewhere in
this file) and avoid panicking assertions or coupling to private implementation
details.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/content/docs/(configuration)/config.mdx`:
- Around line 502-507: The table is missing two new Phase 2 supervision knobs;
add rows for detached_worker_timeout_retry_limit and
supervisor_kill_budget_per_tick to the [defaults.cortex] config table: ensure
each row lists the key name, Type (integer), the actual Default value taken from
the implementation (don’t invent defaults), and a concise Description—e.g.,
detached_worker_timeout_retry_limit: retry limit for re-queuing/quarantining
detached workers after timeout; supervisor_kill_budget_per_tick: max number of
worker kills the supervisor may perform per maintenance tick—so operators can
discover and tune these behaviors.

In `@docs/design-docs/cortex-history.md`:
- Around line 71-74: The document contains conflicting rollout wording: update
the "Health supervision (Phase 2, implemented)" section and any earlier sentence
that says "only bulletin events are emitted initially" so they consistently
state Phase 2 is implemented and emits the new supervision events
(`worker_killed`, `branch_killed`, `circuit_breaker_tripped`, `health_check`);
replace or remove the earlier phrase about only bulletin events being emitted
initially (or change it to note that Phase 2 now also emits supervision events)
so the design doc has a single, consistent rollout state.

In `@src/agent/channel.rs`:
- Around line 1688-1693: The early return after checking active_branches skips
cleanup for memory_persistence_branches and branch_reply_targets, leaving stale
entries when terminal events are duplicated or canceled; always remove the
branch from those maps regardless of whether active_branches contained it.
Modify the logic around active_branches.remove(branch_id) so you do not return
early — instead, record whether removal happened but proceed to acquire the
write locks for self.state.memory_persistence_branches and
self.state.branch_reply_targets and call .remove(branch_id) on both (using
.write().await as appropriate), then drop locks and return; use the same
branch_id and the state field names (active_branches,
memory_persistence_branches, branch_reply_targets) to locate the code.
- Around line 152-165: The code is silently discarding send errors from
self.deps.event_tx (e.g., where you call
self.deps.event_tx.send(ProcessEvent::WorkerComplete { ... }).ok()), which
violates the guideline to not swallow errors; update these sends (including the
similar send in the WorkerCanceled/other block around the 201-210 range) to
explicitly handle the Result: check for Err(e) and log or propagate it (for
example, using the module logger or tracing with a clear message mentioning
agent_id/channel_id/worker_id and the ProcessEvent variant), rather than calling
.ok(), so failures to deliver synthetic terminal events are visible for
diagnostics.

In `@src/agent/cortex.rs`:
- Around line 271-281: In track_worker_complete, avoid treating missing trackers
as "unknown" and updating the breaker: change the logic around
self.worker_trackers.remove(&worker_id) in track_worker_complete so you
early-return if remove returns None (i.e., no tracker found) and only call
self.update_breaker when you have Some(tracker) to extract tracker.worker_type;
keep the existing !success and threshold.max(1) behavior unchanged. Also
document in the PR summary the race/terminal-state reasoning for this
async/stateful change and run targeted lifecycle tests for WorkerId/worker
lifecycle paths.
- Around line 892-897: The cancel-result handling currently treats only
ControlActionResult::Cancelled and ControlActionResult::AlreadyTerminal as
terminal; update the match in the block that checks `result` (and the analogous
block around the other handling at the second location) to also consider
ControlActionResult::NotFound as terminal so the tracker is pruned instead of
continued; locate the code using the `result` variable and the match against
ControlActionResult variants and add `ControlActionResult::NotFound` to the
terminal set so the tracker cleanup path runs and stale targets are removed.

In `@src/agent/process_control.rs`:
- Around line 102-112: The code removes a channel by key after a failed
Weak::upgrade, which risks TOCTOU if the channel was re-registered; instead,
when upgrade fails compare the stored weak handle you originally read with the
current map value and only remove if they are the same instance (i.e. perform a
conditional/remove-if-equal using the same Weak handle read earlier); locate the
logic around channels.read().await.get(channel_id).cloned(), the local handle
variable, the upgrade() call, and the subsequent
self.channels.write().await.remove(channel_id) and replace unconditional remove
with a compare-and-remove that only deletes when the map still holds the
identical Weak handle you observed.

In `@src/api/config.rs`:
- Around line 588-590: The current direct cast "v as i64" for
cortex.supervisor_kill_budget_per_tick can wrap if usize > i64::MAX; change this
to a checked conversion like i64::try_from(v) (or v.try_into::<i64>()) and only
write table["supervisor_kill_budget_per_tick"] when the conversion succeeds
(matching the pattern used in update_warmup_table), otherwise handle the
overflow case (e.g., skip setting the field or return an error) to avoid
persisting corrupted negative values.

In `@src/config.rs`:
- Around line 4792-4797: The Cortex-related fields (e.g.,
detached_worker_timeout_retry_limit and supervisor_kill_budget_per_tick) are
being merged inline and duplicated; instead implement a single per-subsystem
resolver on the Cortex config struct (add/extend a Cortex::resolve(...) method)
that performs the merge/precedence logic (env > DB > default) and validation
using base_defaults.cortex, and replace all inline merges (including the other
occurrences noted) to call Cortex::resolve so precedence/validation is
centralized and consistent.

In `@src/lib.rs`:
- Around line 267-277: The public factory
create_process_event_buses_with_capacity can panic if control_event_capacity or
memory_event_capacity is 0 because tokio::sync::broadcast::channel(0) is
invalid; clamp or validate both inputs to a minimum of 1 (e.g., let control = if
control_event_capacity == 0 {1} else {control_event_capacity} and same for
memory) and use those adjusted values when constructing the channels so event_tx
and memory_event_tx are always created with capacity >= 1.

In `@src/tools/cancel.rs`:
- Around line 86-87: Normalize blank cancellation reasons by trimming and
treating empty/whitespace-only strings as unset before dispatch: replace the
existing let reason = args.reason.as_deref().unwrap_or("cancelled by tool")
logic with a trimmed-and-filtered lookup that uses "cancelled by tool" when
args.reason is None or consists only of whitespace; apply the same change to the
second occurrence referenced around lines 111-118 so both uses of reason (the
variable derived from args.reason and any downstream match on args.process_type)
uniformly default to "cancelled by tool" when the provided reason is blank.

---

Outside diff comments:
In `@src/agent/compactor.rs`:
- Around line 226-236: Current compactor creates a Rig LLM agent (calls
create_cortex_tool_server, constructs ToolServerHandle, then
AgentBuilder::new(...).preamble(compactor_prompt).default_max_turns(...).tool_server_handle(...).build())
which runs prompt completion; instead, remove the AgentBuilder/LLM construction
and replace it with a programmatic monitor that watches channel context sizes
and triggers worker-based compaction calls using tiered thresholds (>80%
background, >85% aggressive, >95% emergency truncation). Concretely, delete the
create_cortex_tool_server and AgentBuilder usage (compactor_prompt,
default_max_turns, tool_server_handle, build) in this flow and implement a loop
or async task that inspects memory/search metrics via memory_search or
memory_event_tx, applies the three thresholds, and sends compact requests to the
compaction worker (via existing worker API) rather than calling the LLM.

---

Nitpick comments:
In `@docs/content/docs/(core)/architecture.mdx`:
- Line 104: The Producer entry "MemorySaved" currently states "memory_save tool
(branch/compactor/cortex contexts)" which is ambiguous about whether emission
comes directly from compactor or via compaction-worker; update the wording to
match the ToolServer topology: indicate that memory_save is emitted only from
per-cortex ToolServer (not per-channel), available in per-branch (with memory
tools) and per-worker for task-specific tools, and clarify that compaction
results are sent via the compaction-worker (not the compactor process itself) if
that is how your system implements it; specifically edit the `MemorySaved` row
to reference `memory_save` and the correct topology nodes (per-cortex,
per-branch, per-worker) and mention compaction-worker-mediated emission where
applicable so it aligns with the ToolServer topology description.

In `@src/config.rs`:
- Around line 4792-4797: Add two non-panicking smoke tests in src/config.rs that
exercise the new Cortex fields detached_worker_timeout_retry_limit and
supervisor_kill_budget_per_tick: one test (e.g.,
test_defaults_propagation_for_cortex_fields) loads a minimal config that omits
these two fields and asserts the parsed Config (or cortex sub-struct) returns
the values from base_defaults.cortex for both fields, and another test (e.g.,
test_agent_override_for_cortex_fields) loads a config string that sets explicit
values for detached_worker_timeout_retry_limit and
supervisor_kill_budget_per_tick and asserts those values are preserved after
parsing; keep tests high-level (use public parse/load helper used elsewhere in
this file) and avoid panicking assertions or coupling to private implementation
details.

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 7b5bb90 and cb5a60c.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (26)
  • docs/content/docs/(configuration)/config.mdx
  • docs/content/docs/(core)/architecture.mdx
  • docs/design-docs/cortex-history.md
  • docs/design-docs/cortex-implementation.md
  • src/agent.rs
  • src/agent/channel.rs
  • src/agent/channel_dispatch.rs
  • src/agent/channel_history.rs
  • src/agent/compactor.rs
  • src/agent/cortex.rs
  • src/agent/ingestion.rs
  • src/agent/process_control.rs
  • src/agent/status.rs
  • src/api/agents.rs
  • src/api/channels.rs
  • src/api/config.rs
  • src/config.rs
  • src/lib.rs
  • src/main.rs
  • src/telemetry/registry.rs
  • src/tools.rs
  • src/tools/cancel.rs
  • src/tools/memory_save.rs
  • tests/bulletin.rs
  • tests/context_dump.rs
  • tests/detached_worker_bootstrap.rs

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

♻️ Duplicate comments (7)
src/api/config.rs (1)

588-590: ⚠️ Potential issue | 🟠 Major

Use checked conversion for supervisor_kill_budget_per_tick write

Line 589 uses v as i64 for a usize value; oversized values can overflow/wrap and persist an invalid negative TOML integer.

💡 Proposed fix
     if let Some(v) = cortex.supervisor_kill_budget_per_tick {
-        table["supervisor_kill_budget_per_tick"] = toml_edit::value(v as i64);
+        table["supervisor_kill_budget_per_tick"] =
+            toml_edit::value(i64::try_from(v).map_err(|_| StatusCode::BAD_REQUEST)?);
     }

Verification (read-only): confirm the field type is Option<usize> and the current write path uses an unchecked cast.

#!/bin/bash
set -euo pipefail

rg -n -C2 'supervisor_kill_budget_per_tick' --type rust
rg -n -C2 'table\["supervisor_kill_budget_per_tick"\].*as i64' --type rust
rg -n -C2 'i64::try_from\(.*supervisor_kill_budget_per_tick|try_from\(v\)' --type rust
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/config.rs` around lines 588 - 590, The code writes a usize into TOML
using an unchecked cast (v as i64) which can overflow; change the write to
perform a checked conversion from usize to i64 (e.g. use i64::try_from(v) or
similar) and handle the failure path instead of casting: in the block
referencing cortex.supervisor_kill_budget_per_tick and
table["supervisor_kill_budget_per_tick"], attempt a try_from conversion, and on
Err either return an error/result or skip/log the invalid value so you never
insert a wrapped negative integer into the TOML table.
src/agent/cortex.rs (2)

271-281: ⚠️ Potential issue | 🟡 Minor

Avoid counting missing worker trackers as failures.

At Line 272, falling back to worker_type:unknown means late/duplicate WorkerComplete events can still increment breaker failures and produce false trip signals. Early-return when no tracker is present.

Proposed fix
 fn track_worker_complete(&mut self, worker_id: WorkerId, success: bool, threshold: u8) {
-    let worker_type = self
-        .worker_trackers
-        .remove(&worker_id)
-        .map(|tracker| tracker.worker_type)
-        .unwrap_or_else(|| "unknown".to_string());
+    let Some(worker_type) = self
+        .worker_trackers
+        .remove(&worker_id)
+        .map(|tracker| tracker.worker_type)
+    else {
+        return;
+    };
     self.update_breaker(
         format!("worker_type:{worker_type}"),
         !success,
         threshold.max(1),
     );
 }

Based on learnings "For changes in async/stateful paths (worker lifecycle, cancellation, retrigger, recall cache behavior), include explicit race/terminal-state reasoning in the PR summary and run targeted tests in addition to just gate-pr".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/cortex.rs` around lines 271 - 281, In track_worker_complete, avoid
treating missing tracker entries as failures: change the logic that calls
self.worker_trackers.remove(&worker_id).map(...).unwrap_or_else(...) to
early-return when remove returns None (i.e., if no tracker exists for the given
WorkerId), so you do not call update_breaker with a fabricated "unknown"
worker_type; ensure the function returns immediately for missing trackers and
only calls update_breaker when a real tracker (and its worker_type) was removed.

892-897: ⚠️ Potential issue | 🟠 Major

Treat ControlActionResult::NotFound as terminal for local tracker cleanup.

At Line 892, NotFound currently continues and leaves the local tracker intact, so the same stale target is retried on later ticks and keeps consuming kill budget.

Proposed fix
-            if !matches!(
-                result,
-                ControlActionResult::Cancelled | ControlActionResult::AlreadyTerminal
-            ) {
+            if matches!(result, ControlActionResult::NotFound) {
+                match &target {
+                    KillTarget::Worker(tracker) => cancelled_worker_ids.push(tracker.worker_id),
+                    KillTarget::Branch(tracker) => cancelled_branch_ids.push(tracker.branch_id),
+                }
+                continue;
+            }
+
+            if !matches!(
+                result,
+                ControlActionResult::Cancelled | ControlActionResult::AlreadyTerminal
+            ) {
                 continue;
             }

Also applies to: 930-938

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/cortex.rs` around lines 892 - 897, The current checks treat
ControlActionResult::NotFound as non-terminal and thus skip local tracker
cleanup; update the conditional(s) that currently match only Cancelled |
AlreadyTerminal to also include ControlActionResult::NotFound so NotFound is
handled as terminal and triggers the same cleanup logic; apply this change for
both occurrences of this pattern (the block using the matches! check around
ControlActionResult and the similar block at the other location mentioned) so
the local tracker is cleared and stale targets stop being retried.
src/lib.rs (1)

267-277: ⚠️ Potential issue | 🟡 Minor

Guard zero capacities in bus factory to avoid runtime panic.

At Line 274 and Line 276, tokio::sync::broadcast::channel will panic if capacity is 0. Since this helper is public, clamp/validate both capacities to at least 1.

Suggested patch
 pub fn create_process_event_buses_with_capacity(
     control_event_capacity: usize,
     memory_event_capacity: usize,
 ) -> (
     tokio::sync::broadcast::Sender<ProcessEvent>,
     tokio::sync::broadcast::Sender<ProcessEvent>,
 ) {
-    let (event_tx, _event_rx) = tokio::sync::broadcast::channel(control_event_capacity);
-    let (memory_event_tx, _memory_event_rx) =
-        tokio::sync::broadcast::channel(memory_event_capacity);
+    let control_event_capacity = control_event_capacity.max(1);
+    let memory_event_capacity = memory_event_capacity.max(1);
+    let (event_tx, _event_rx) = tokio::sync::broadcast::channel(control_event_capacity);
+    let (memory_event_tx, _memory_event_rx) =
+        tokio::sync::broadcast::channel(memory_event_capacity);
     (event_tx, memory_event_tx)
 }
#!/bin/bash
# Verify direct zero-capacity call sites and similar raw broadcast usage.
rg -nP --type rust 'create_process_event_buses_with_capacity\s*\(\s*0\s*,|create_process_event_buses_with_capacity\s*\([^,]+,\s*0\s*\)'
rg -nP --type rust 'broadcast::channel\s*\(\s*0\s*\)'
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/lib.rs` around lines 267 - 277, Public function
create_process_event_buses_with_capacity passes capacities directly to
tokio::sync::broadcast::channel which panics on 0; clamp or validate both
control_event_capacity and memory_event_capacity to be at least 1 (e.g., compute
local_control = control_event_capacity.max(1) and local_memory =
memory_event_capacity.max(1)) and use those when calling
tokio::sync::broadcast::channel to prevent runtime panic.
src/agent/channel.rs (2)

1689-1691: ⚠️ Potential issue | 🟡 Minor

Don’t return before branch-side maps are cleaned up.

The early return on Line 1689-Line 1691 skips cleanup of memory_persistence_branches and branch_reply_targets, leaving stale entries on duplicate/cancel convergence paths.

Suggested fix
                 let mut branches = self.state.active_branches.write().await;
-                if branches.remove(branch_id).is_none() {
-                    return Ok(());
-                };
+                let removed_active = branches.remove(branch_id).is_some();
                 drop(branches);
 
+                if !removed_active {
+                    self.memory_persistence_branches.remove(branch_id);
+                    self.branch_reply_targets.remove(branch_id);
+                    return Ok(());
+                }
+
                 run_logger.log_branch_completed(*branch_id, conclusion);
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/channel.rs` around lines 1689 - 1691, The code currently returns
immediately when branches.remove(branch_id).is_none(), which skips cleanup of
memory_persistence_branches and branch_reply_targets; instead, call
branches.remove(branch_id) and capture its result (e.g., let removed =
branches.remove(branch_id)), then always perform
memory_persistence_branches.remove(branch_id) and
branch_reply_targets.remove(branch_id) to clear stale entries, and finally
return Ok(()) or proceed based on whether removed was Some. Ensure you reference
the existing branches.remove(branch_id) call and the maps
memory_persistence_branches and branch_reply_targets when making the change.

155-164: ⚠️ Potential issue | 🟡 Minor

Handle synthetic event send failures instead of dropping them silently.

event_tx.send(...).ok() on Line 164 and Line 209 suppresses cancellation-convergence failures, which makes diagnostics hard when receivers are unavailable.

Suggested fix
-        self.deps
-            .event_tx
-            .send(ProcessEvent::WorkerComplete {
+        if let Err(error) = self.deps.event_tx.send(ProcessEvent::WorkerComplete {
                 agent_id: self.deps.agent_id.clone(),
                 worker_id,
                 channel_id: Some(self.channel_id.clone()),
                 result,
                 notify: true,
                 success: false,
-            })
-            .ok();
+            }) {
+            tracing::warn!(
+                %error,
+                channel_id = %self.channel_id,
+                worker_id = %worker_id,
+                "failed to emit synthetic worker completion event"
+            );
+        }
@@
-        self.deps
-            .event_tx
-            .send(ProcessEvent::BranchResult {
+        if let Err(error) = self.deps.event_tx.send(ProcessEvent::BranchResult {
                 agent_id: self.deps.agent_id.clone(),
                 branch_id,
                 channel_id: self.channel_id.clone(),
                 conclusion,
-            })
-            .ok();
+            }) {
+            tracing::warn!(
+                %error,
+                channel_id = %self.channel_id,
+                branch_id = %branch_id,
+                "failed to emit synthetic branch result event"
+            );
+        }
As per coding guidelines "`src/**/*.rs`: Don't silently discard errors; use `let _ =` only on channel sends where the receiver may be dropped; handle, log, or propagate all other errors".

Also applies to: 201-209

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/channel.rs` around lines 155 - 164, The send of
ProcessEvent::WorkerComplete via event_tx currently calls .ok(), silently
dropping send errors; replace that with explicit error handling: capture the
Result from event_tx.send(...) and on Err(e) log or otherwise surface the
failure (include agent_id self.deps.agent_id, worker_id,
channel_id/self.channel_id, and result in the log) so diagnostics show why the
synthetic event failed; do the same change for the other send site that
currently uses .ok() (the duplicate at the other WorkerComplete send) and ensure
you use the crate's logging/tracing facility consistently.
src/agent/process_control.rs (1)

107-112: ⚠️ Potential issue | 🟠 Major

Avoid key-based removal after failed weak upgrade (TOCTOU).

On Line 110 and Line 129, removing by channel_id after upgrade() fails can delete a newly re-registered live handle if registration races with this path.

Suggested fix
         if let Some(handle) = handle.upgrade() {
             handle.cancel_worker_with_reason(worker_id, reason).await
         } else {
-            self.channels.write().await.remove(channel_id);
             ControlActionResult::NotFound
         }
@@
         if let Some(handle) = handle.upgrade() {
             handle.cancel_branch_with_reason(branch_id, reason).await
         } else {
-            self.channels.write().await.remove(channel_id);
             ControlActionResult::NotFound
         }

Also applies to: 126-130

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/process_control.rs` around lines 107 - 112, The code removes the
channel by channel_id after a failed handle.upgrade(), which creates a TOCTOU
race that can remove a newly-registered handle; change the removal to only
remove when the stored entry still corresponds to the same (stale) weak handle:
read the map entry (self.channels) and compare the stored Weak/Arc identity with
the Weak handle you attempted to upgrade (e.g., compare pointer or use
Arc::ptr_eq after upgrading a clone or store a unique id), and only call
remove(channel_id) if they match; otherwise return
ControlActionResult::NotFound. Apply the same guarded-removal logic around
cancel_worker_with_reason/upgrade failure sites instead of unconditionally
calling self.channels.write().await.remove(channel_id).
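
The identity-guarded removal suggested above can be sketched with `std::sync::Weak::ptr_eq`; `Handle` and the plain `HashMap` stand in for the real `ChannelControlHandle` registry behind its async lock.

```rust
// Hedged sketch: `Handle` and the map shape stand in for the real
// ChannelControlHandle registry; the async RwLock is replaced by a plain
// HashMap so the identity check itself is runnable.
use std::collections::HashMap;
use std::sync::{Arc, Weak};

struct Handle;

fn remove_if_same_stale(
    channels: &mut HashMap<String, Weak<Handle>>,
    channel_id: &str,
    stale: &Weak<Handle>,
) -> bool {
    match channels.get(channel_id) {
        // Only drop the entry when it is still the exact weak we failed to
        // upgrade; a newly re-registered live handle has a different
        // allocation, so Weak::ptr_eq is false and it is left untouched.
        Some(current) if Weak::ptr_eq(current, stale) => {
            channels.remove(channel_id);
            true
        }
        _ => false,
    }
}
```

Because `Weak::ptr_eq` compares allocation pointers, the check holds even after the original `Arc` has been dropped, which is exactly the stale-handle case the TOCTOU fix targets.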
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/content/docs/`(core)/architecture.mdx:
- Line 104: The table row for MemorySaved incorrectly lists the compactor
context for the `memory_save` tool; remove "compactor" and restrict producers to
the actual tool-hosting contexts per guidelines (e.g., per-branch and per-cortex
where `memory_save` runs), leaving only `branch`/`cortex` (and/or `worker` if
applicable) as producers for `MemorySaved`; update the table entry for
`MemorySaved` (`memory_save`) accordingly and ensure this aligns with the
ToolServer topology described in `src/tools/**/*.rs` (per-channel: no memory
tools, per-branch: memory tools, per-worker: task-specific, per-cortex:
memory_save only).

In `@docs/design-docs/cortex-implementation.md`:
- Around line 86-89: The fenced code block containing the state transitions "0
active -> 1 completing -> 3 terminal / 0 active -> 2 killing    -> 3 terminal"
is unlabeled and triggers markdownlint MD040; fix it by adding a language
identifier (for example "text") to the opening triple backticks of that code
fence so it becomes ```text, ensuring the block is explicitly labeled.

In `@src/agent/cortex.rs`:
- Around line 2333-2388: The code calls task_store.update(...) and assigns
db_updated but proceeds to emit terminal side-effects
(run_logger.log_worker_completed, logger.log "task_pickup_completed",
notify_delegation_completion, and event_tx.send(ProcessEvent::WorkerComplete))
unconditionally; change the flow so that these terminal actions only run when
db_updated.is_ok() (i.e., move run_logger.log_worker_completed, logger.log(...
"task_pickup_completed"), notify_delegation_completion(...).await, and the
WorkerComplete event send into the db_updated.is_ok() branch), and when
db_updated.is_err() emit a non-terminal/failure event or compensating rollback
path (e.g., send a TaskUpdated/WorkerFailed event or log and avoid publishing
terminal completion) so DB state and runtime events remain consistent; locate
the change around the task_store.update(...) call, the db_updated variable,
run_logger.log_worker_completed, notify_delegation_completion, logger.log, and
the event_tx.send(ProcessEvent::WorkerComplete) calls.

---

Duplicate comments:
In `@src/agent/channel.rs`:
- Around line 1689-1691: The code currently returns immediately when
branches.remove(branch_id).is_none(), which skips cleanup of
memory_persistence_branches and branch_reply_targets; instead, call
branches.remove(branch_id) and capture its result (e.g., let removed =
branches.remove(branch_id)), then always perform
memory_persistence_branches.remove(branch_id) and
branch_reply_targets.remove(branch_id) to clear stale entries, and finally
return Ok(()) or proceed based on whether removed was Some. Ensure you reference
the existing branches.remove(branch_id) call and the maps
memory_persistence_branches and branch_reply_targets when making the change.
- Around line 155-164: The send of ProcessEvent::WorkerComplete via event_tx
currently calls .ok(), silently dropping send errors; replace that with explicit
error handling: capture the Result from event_tx.send(...) and on Err(e) log or
otherwise surface the failure (include agent_id self.deps.agent_id, worker_id,
channel_id/self.channel_id, and result in the log) so diagnostics show why the
synthetic event failed; do the same change for the other send site that
currently uses .ok() (the duplicate at the other WorkerComplete send) and ensure
you use the crate's logging/tracing facility consistently.

In `@src/agent/cortex.rs`:
- Around line 271-281: In track_worker_complete, avoid treating missing tracker
entries as failures: change the logic that calls
self.worker_trackers.remove(&worker_id).map(...).unwrap_or_else(...) to
early-return when remove returns None (i.e., if no tracker exists for the given
WorkerId), so you do not call update_breaker with a fabricated "unknown"
worker_type; ensure the function returns immediately for missing trackers and
only calls update_breaker when a real tracker (and its worker_type) was removed.
- Around line 892-897: The current checks treat ControlActionResult::NotFound as
non-terminal and thus skip local tracker cleanup; update the conditional(s) that
currently match only Cancelled | AlreadyTerminal to also include
ControlActionResult::NotFound so NotFound is handled as terminal and triggers
the same cleanup logic; apply this change for both occurrences of this pattern
(the block using the matches! check around ControlActionResult and the similar
block at the other location mentioned) so the local tracker is cleared and stale
targets stop being retried.

In `@src/agent/process_control.rs`:
- Around line 107-112: The code removes the channel by channel_id after a failed
handle.upgrade(), which creates a TOCTOU race that can remove a newly-registered
handle; change the removal to only remove when the stored entry still
corresponds to the same (stale) weak handle: read the map entry (self.channels)
and compare the stored Weak/Arc identity with the Weak handle you attempted to
upgrade (e.g., compare pointer or use Arc::ptr_eq after upgrading a clone or
store a unique id), and only call remove(channel_id) if they match; otherwise
return ControlActionResult::NotFound. Apply the same guarded-removal logic
around cancel_worker_with_reason/upgrade failure sites instead of
unconditionally calling self.channels.write().await.remove(channel_id).

In `@src/api/config.rs`:
- Around line 588-590: The code writes a usize into TOML using an unchecked cast
(v as i64) which can overflow; change the write to perform a checked conversion
from usize to i64 (e.g. use i64::try_from(v) or similar) and handle the failure
path instead of casting: in the block referencing
cortex.supervisor_kill_budget_per_tick and
table["supervisor_kill_budget_per_tick"], attempt a try_from conversion, and on
Err either return an error/result or skip/log the invalid value so you never
insert a wrapped negative integer into the TOML table.

In `@src/lib.rs`:
- Around line 267-277: Public function create_process_event_buses_with_capacity
passes capacities directly to tokio::sync::broadcast::channel which panics on 0;
clamp or validate both control_event_capacity and memory_event_capacity to be at
least 1 (e.g., compute local_control = control_event_capacity.max(1) and
local_memory = memory_event_capacity.max(1)) and use those when calling
tokio::sync::broadcast::channel to prevent runtime panic.
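
The checked usize-to-i64 conversion called for in the `src/api/config.rs` item above can be sketched as a small helper; `toml_value_for_kill_budget` is a hypothetical name, not the real config.rs function, and the TOML write is modeled as a `Result` so the overflow path is visible without `toml_edit`.

```rust
// Hedged sketch of the checked write path; `toml_value_for_kill_budget` is a
// hypothetical helper, not the real config.rs function.
fn toml_value_for_kill_budget(v: usize) -> Result<i64, String> {
    // i64::try_from fails on overflow instead of wrapping, unlike `v as i64`.
    i64::try_from(v)
        .map_err(|_| format!("supervisor_kill_budget_per_tick {v} does not fit in i64"))
}
```

The caller can then map the `Err` to a `BAD_REQUEST`-style response or skip the write, so a wrapped negative integer never lands in the TOML table.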

ℹ️ Review info

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between cb5a60c and 2165f99.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (26)
  • docs/content/docs/(configuration)/config.mdx
  • docs/content/docs/(core)/architecture.mdx
  • docs/design-docs/cortex-history.md
  • docs/design-docs/cortex-implementation.md
  • src/agent.rs
  • src/agent/channel.rs
  • src/agent/channel_dispatch.rs
  • src/agent/channel_history.rs
  • src/agent/compactor.rs
  • src/agent/cortex.rs
  • src/agent/ingestion.rs
  • src/agent/process_control.rs
  • src/agent/status.rs
  • src/api/agents.rs
  • src/api/channels.rs
  • src/api/config.rs
  • src/config.rs
  • src/lib.rs
  • src/main.rs
  • src/telemetry/registry.rs
  • src/tools.rs
  • src/tools/cancel.rs
  • src/tools/memory_save.rs
  • tests/bulletin.rs
  • tests/context_dump.rs
  • tests/detached_worker_bootstrap.rs
🚧 Files skipped from review as they are similar to previous changes (9)
  • src/agent.rs
  • tests/detached_worker_bootstrap.rs
  • docs/content/docs/(configuration)/config.mdx
  • src/config.rs
  • src/api/channels.rs
  • src/agent/status.rs
  • src/tools/cancel.rs
  • docs/design-docs/cortex-history.md
  • src/agent/channel_dispatch.rs


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/agent/channel.rs (1)

1705-1746: ⚠️ Potential issue | 🟠 Major

Preserve branch_reply_targets value before using retrigger metadata.

At Line 1706, the entry is removed unconditionally, so the later lookup at Line 1742 always returns None. This drops reply-thread metadata for regular branch completions.

🐛 Suggested fix
-                let was_memory_persistence = self.memory_persistence_branches.remove(branch_id);
-                let _ = self.branch_reply_targets.remove(branch_id);
+                let was_memory_persistence = self.memory_persistence_branches.remove(branch_id);
+                let branch_reply_target = self.branch_reply_targets.remove(branch_id);
@@
-                    if let Some(message_id) = self.branch_reply_targets.remove(branch_id) {
+                    if let Some(message_id) = branch_reply_target {
                         retrigger_metadata.insert(
                             crate::metadata_keys::REPLY_TO_MESSAGE_ID.to_string(),
                             serde_json::Value::from(message_id),
                         );
                     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/channel.rs` around lines 1705 - 1746, The code removes the
branch_reply_targets entry unconditionally
(self.branch_reply_targets.remove(branch_id)) before it’s later needed for
retrigger metadata, causing the lookup to return None; change the logic in the
branch completion path to first retrieve/clone the reply target (e.g., let
reply_target = self.branch_reply_targets.get(branch_id).cloned()) and use that
value to insert REPLY_TO_MESSAGE_ID into retrigger_metadata for regular branches
(identifiers: branch_reply_targets, branch_id, pending_results,
retrigger_metadata), then remove the map entry only after it has been consumed
(or remove it conditionally using the cloned value).
🧹 Nitpick comments (1)
src/api/config.rs (1)

839-863: Make the overflow test architecture-aware.

At Line 854, usize::MAX only overflows i64 on 64-bit targets. On 32-bit targets, this test can fail even though the implementation is correct.

♻️ Suggested tweak
 #[test]
 fn test_update_cortex_table_rejects_large_usize_value() {
+    // On 32-bit targets, usize always fits into i64.
+    if usize::BITS <= 63 {
+        return;
+    }
+
     let mut doc: toml_edit::DocumentMut = r#"
 [[agents]]
 id = "main"
 "#
@@
+    let overflowing = usize::try_from(i64::MAX)
+        .expect("usize should hold i64::MAX on 64-bit targets")
+        .checked_add(1)
+        .expect("expected overflowing usize value");
+
     let update = CortexUpdate {
@@
-        supervisor_kill_budget_per_tick: Some(usize::MAX),
+        supervisor_kill_budget_per_tick: Some(overflowing),
@@
     let result = update_cortex_table(&mut doc, agent_idx, &update);
     assert_eq!(result, Err(StatusCode::BAD_REQUEST));
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/api/config.rs` around lines 839 - 863, The test
test_update_cortex_table_rejects_large_usize_value assumes usize::MAX overflows
i64 on all targets; make it architecture-aware by skipping or gating the test on
targets where usize is narrower than 64 bits. Update the test (involving
CortexUpdate and the call to update_cortex_table) to either use
#[cfg(target_pointer_width = "64")] or perform a runtime check like if
usize::BITS < 64 { return; } before constructing the overflow value (usize::MAX)
so the assertion only runs on platforms where usize can exceed i64::MAX.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/agent/cortex.rs`:
- Around line 2651-2692: In the Ok(None) and Err(update_error) branches of the
timeout update handling, emit a WorkerComplete event with success=false
(matching how other error/panic paths do) so health tracking remains consistent;
call the same event-emission logic used elsewhere to create WorkerComplete for
the task (include task.task_number, worker_id, success=false and any relevant
metadata like supervisor_timeout_count/ supervisor_timeout_exhausted/
retry_limit) immediately before or after the logger.log calls in those branches
to mirror error handling.
- Around line 2433-2450: When requeue_result is Err in the worker-failure
handling (the block that currently logs task_pickup_failed_to_persist and
tracing::warn with update_error), also emit a WorkerComplete event with
success=false for that worker/task so HealthRuntimeState.worker_trackers is
updated immediately; use the same event shape/codepath as the success/failure
persistence branch (the WorkerComplete emission used in the success path around
task pickup failure handling) and include the worker_id and task_number and the
error info. Apply the same change in the panic handling path (the branch around
requeue on panic where lines ~2511-2527 log the failure) so both requeue failure
and panic-requeue failure always send WorkerComplete(success=false) before
returning. Ensure the emitted event is consistent with existing WorkerComplete
usage so downstream cleanup logic sees the terminal state.

---

Outside diff comments:
In `@src/agent/channel.rs`:
- Around line 1705-1746: The code removes the branch_reply_targets entry
unconditionally (self.branch_reply_targets.remove(branch_id)) before it’s later
needed for retrigger metadata, causing the lookup to return None; change the
logic in the branch completion path to first retrieve/clone the reply target
(e.g., let reply_target = self.branch_reply_targets.get(branch_id).cloned()) and
use that value to insert REPLY_TO_MESSAGE_ID into retrigger_metadata for regular
branches (identifiers: branch_reply_targets, branch_id, pending_results,
retrigger_metadata), then remove the map entry only after it has been consumed
(or remove it conditionally using the cloned value).

---

Nitpick comments:
In `@src/api/config.rs`:
- Around line 839-863: The test
test_update_cortex_table_rejects_large_usize_value assumes usize::MAX overflows
i64 on all targets; make it architecture-aware by skipping or gating the test on
targets where usize is narrower than 64 bits. Update the test (involving
CortexUpdate and the call to update_cortex_table) to either use
#[cfg(target_pointer_width = "64")] or perform a runtime check like if
usize::BITS < 64 { return; } before constructing the overflow value (usize::MAX)
so the assertion only runs on platforms where usize can exceed i64::MAX.

ℹ️ Review info
Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5e3f432e-9f12-4106-8108-5b1ab0c5a4ea

📥 Commits

Reviewing files that changed from the base of the PR and between 2165f99 and 454774a.

📒 Files selected for processing (10)
  • docs/content/docs/(configuration)/config.mdx
  • docs/content/docs/(core)/architecture.mdx
  • docs/design-docs/cortex-history.md
  • docs/design-docs/cortex-implementation.md
  • src/agent/channel.rs
  • src/agent/cortex.rs
  • src/agent/process_control.rs
  • src/api/config.rs
  • src/lib.rs
  • src/tools/cancel.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/tools/cancel.rs
  • docs/content/docs/(core)/architecture.mdx

@vsumner vsumner force-pushed the phase2/cortex-health-supervision branch from 454774a to 152a387 on March 5, 2026 02:29

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/agent/channel.rs`:
- Around line 2164-2173: The code removes branch_reply_targets immediately then
later reads it when building retrigger metadata, so preserve the reply target
before removal: in the block handling branch removal (the one that computes
was_active, was_memory_persistence and calls
self.branch_reply_targets.remove(branch_id)), fetch and clone or take the reply
target from self.branch_reply_targets (e.g., via get or remove_into_temp) into a
local variable (using branch_id as key) before any removal, use that preserved
value when constructing retrigger metadata, and only remove from
branch_reply_targets once you no longer need the value; apply the same fix to
the other removal site around where was_active is computed (the similar block at
the other location mentioned).

In `@src/api/channels.rs`:
- Around line 294-297: The code currently collapses the Result from
channel_state.cancel_worker_with_reason(worker_id, "cancelled via API").await
using .is_ok(), which discards error details; change this to explicitly match or
use .map_err/.inspect_err so you handle different error cases (e.g., NotFound vs
transient failure), log or return the error from cancel_worker_with_reason when
it fails, and only proceed to fallback reconciliation when the cancel call
returns a NotFound/worker-missing variant; reference cancel_worker_with_reason,
channel_state, and worker_id when updating the control flow to propagate or log
errors instead of using .is_ok().
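
The control-flow split described in the `src/api/channels.rs` item can be sketched as an explicit match; `CancelError`, its variants, and `Next` are hypothetical stand-ins for whatever `cancel_worker_with_reason` actually returns.

```rust
// Hedged sketch: `CancelError` and `Next` are hypothetical stand-ins for the
// real error type returned by cancel_worker_with_reason.
#[derive(Debug, PartialEq)]
enum CancelError {
    NotFound,
    Transient(String),
}

#[derive(Debug, PartialEq)]
enum Next {
    Done,
    Reconcile,
    Fail(String),
}

fn dispatch_cancel_result(result: Result<(), CancelError>) -> Next {
    match result {
        Ok(()) => Next::Done,
        // Only a missing worker should fall through to fallback reconciliation.
        Err(CancelError::NotFound) => Next::Reconcile,
        // Transient failures are surfaced instead of being swallowed by .is_ok().
        Err(CancelError::Transient(msg)) => Next::Fail(msg),
    }
}
```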

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 8d14a55e-6397-482c-a9d4-2731f80fee23

📥 Commits

Reviewing files that changed from the base of the PR and between 454774a and 152a387.

📒 Files selected for processing (31)
  • docs/content/docs/(configuration)/config.mdx
  • docs/content/docs/(core)/architecture.mdx
  • docs/design-docs/cortex-history.md
  • docs/design-docs/cortex-implementation.md
  • src/agent.rs
  • src/agent/channel.rs
  • src/agent/channel_dispatch.rs
  • src/agent/channel_history.rs
  • src/agent/compactor.rs
  • src/agent/cortex.rs
  • src/agent/ingestion.rs
  • src/agent/invariant_harness.rs
  • src/agent/process_control.rs
  • src/agent/status.rs
  • src/api/agents.rs
  • src/api/channels.rs
  • src/api/config.rs
  • src/api/state.rs
  • src/api/system.rs
  • src/config/load.rs
  • src/config/toml_schema.rs
  • src/config/types.rs
  • src/lib.rs
  • src/main.rs
  • src/telemetry/registry.rs
  • src/tools.rs
  • src/tools/cancel.rs
  • src/tools/memory_save.rs
  • tests/bulletin.rs
  • tests/context_dump.rs
  • tests/detached_worker_bootstrap.rs
🚧 Files skipped from review as they are similar to previous changes (5)
  • tests/bulletin.rs
  • src/agent/status.rs
  • docs/content/docs/(configuration)/config.mdx
  • tests/detached_worker_bootstrap.rs
  • src/agent/compactor.rs

@vsumner vsumner force-pushed the phase2/cortex-health-supervision branch from 152a387 to 2627c2d on March 5, 2026 02:55

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (2)
src/agent/cortex.rs (1)

2461-2487: ⚠️ Potential issue | 🟠 Major

Unify terminal-event behavior when task persistence fails.

The success path persistence failure (Line 2461+) emits WorkerComplete(success=false), but other persistence-failure branches (Line 2545+, Line 2626+, Line 2766+) do not emit terminal completion. This creates split-brain semantics between runtime events and DB state.

Please pick one consistent policy (recommended: only emit terminal completion after a persisted terminal/requeue transition, with rollback/compensation on failure).

Based on learnings "For changes in async/stateful paths (worker lifecycle, cancellation, retrigger, recall cache behavior), include explicit race/terminal-state reasoning in the PR summary and run targeted tests in addition to just gate-pr".

Also applies to: 2545-2563, 2626-2643, 2766-2807

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/cortex.rs` around lines 2461 - 2487, The code currently sends a
terminal ProcessEvent::WorkerComplete (via event_tx.send(...)) when db_updated
Err occurs in the success-path block (using db_updated,
run_logger.log_worker_completed, logger.log, agent_id, worker_id, result_text),
but other persistence-failure branches do not emit the terminal event; unify
behavior by changing all persistence-failure branches to NOT emit WorkerComplete
immediately and instead only send ProcessEvent::WorkerComplete after a
successful persisted terminal or requeue transition; implement
rollback/compensation logic on persistence failure (e.g., revert in-memory
success markers, undo run_logger.log_worker_completed side-effects, and log the
failure with logger.log) so the system retains a single source-of-truth and only
emits terminal events when persistence succeeds, updating the blocks that
currently call event_tx.send(ProcessEvent::WorkerComplete { ... }) to follow
this unified policy.
src/agent/channel.rs (1)

2164-2173: ⚠️ Potential issue | 🟠 Major

Preserve reply target before branch cleanup.

At Line 2172, branch_reply_targets.remove(branch_id) runs before the active-branch branch completes, then Line 2208 tries to remove again for retrigger metadata. The second lookup is always None, so reply-target metadata is lost for normal branch completions.

💡 Suggested fix
-                let was_memory_persistence = self.memory_persistence_branches.remove(branch_id);
-                let _ = self.branch_reply_targets.remove(branch_id);
+                let was_memory_persistence = self.memory_persistence_branches.remove(branch_id);
+                let reply_target_message_id = self.branch_reply_targets.remove(branch_id);
@@
-                    if let Some(message_id) = self.branch_reply_targets.remove(branch_id) {
+                    if let Some(message_id) = reply_target_message_id {
                         retrigger_metadata.insert(
                             crate::metadata_keys::REPLY_TO_MESSAGE_ID.to_string(),
                             serde_json::Value::from(message_id),
                         );
                     }

Also applies to: 2208-2213

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/channel.rs` around lines 2164 - 2173: the branch reply-target is
removed too early. Calling self.branch_reply_targets.remove(branch_id) during
active-branch cleanup means the later removal on the retrigger/metadata path
always finds None. Instead, capture the entry once (remove it into a local
variable, or get/clone it and defer the remove) in the block that manipulates
self.state.active_branches and self.memory_persistence_branches, then use that
preserved value for the retrigger/metadata logic, so both the normal branch
completion and the retrigger path can access the reply-target. Adjust both the
early removal site and the later removal in the retrigger/metadata handling so
the reply-target isn't lost.
🧹 Nitpick comments (1)
src/agent/process_control.rs (1)

96-120: Collapse the weak-handle double-upgrade into a single upgrade flow.

Lines 108/116 and 135/143 each perform two separate upgrade() calls. If the handle is dropped between them, cancellation can return NotFound even though the liveness check already passed. A single-upgrade path is simpler and tighter.

Refactor sketch
 pub async fn cancel_channel_worker(
@@
-        let handle = {
+        let handle = {
             let mut channels = self.channels.write().await;
-            let Some(handle) = channels.get(channel_id).cloned() else {
+            let Some(handle) = channels.get(channel_id).cloned() else {
                 return ControlActionResult::NotFound;
             };
-
-            if handle.upgrade().is_none() {
-                channels.remove(channel_id);
-                return ControlActionResult::NotFound;
-            }
-
-            handle
+            match handle.upgrade() {
+                Some(handle) => handle,
+                None => {
+                    channels.remove(channel_id);
+                    return ControlActionResult::NotFound;
+                }
+            }
         };
-
-        if let Some(handle) = handle.upgrade() {
-            handle.cancel_worker_with_reason(worker_id, reason).await
-        } else {
-            ControlActionResult::NotFound
-        }
+        handle.cancel_worker_with_reason(worker_id, reason).await
     }

 pub async fn cancel_channel_branch(
@@
-        let handle = {
+        let handle = {
             let mut channels = self.channels.write().await;
             let Some(handle) = channels.get(channel_id).cloned() else {
                 return ControlActionResult::NotFound;
             };
-
-            if handle.upgrade().is_none() {
-                channels.remove(channel_id);
-                return ControlActionResult::NotFound;
-            }
-
-            handle
+            match handle.upgrade() {
+                Some(handle) => handle,
+                None => {
+                    channels.remove(channel_id);
+                    return ControlActionResult::NotFound;
+                }
+            }
         };
-
-        let Some(handle) = handle.upgrade() else {
-            return ControlActionResult::NotFound;
-        };
-
         handle.cancel_branch_with_reason(branch_id, reason).await
     }

Also applies to: 123-148
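
The race the refactor avoids can be seen in a minimal standalone sketch (illustrative names, not the PR's types): a `Weak` is upgraded exactly once and the resulting `Arc` is kept, rather than probing liveness with one `upgrade()` and fetching the handle with a second one later.

```rust
use std::sync::{Arc, Weak};

// Hedged illustration (not the PR's code): upgrade a Weak exactly once
// and hold on to the resulting Arc, instead of checking liveness with
// one upgrade() call and fetching the handle with a second call that
// can observe a freshly-dropped target.
fn single_upgrade(weak: &Weak<String>) -> Option<Arc<String>> {
    // One upgrade: either we now hold a strong handle, or the target is
    // gone as far as this caller is concerned.
    weak.upgrade()
}
```

Because the single `upgrade()` either yields a strong reference or definitively reports the target gone, there is no window in which a liveness check succeeds but the later fetch fails.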

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/agent/process_control.rs` around lines 96 - 120: the double call to
Weak::upgrade() in cancel_channel_worker can race. upgrade() is called inside
the write lock to check liveness and again after the lock, so the handle can
drop between the two calls. Perform a single upgrade while holding the lock and
keep the resulting strong handle for use outside it. Concretely: fetch the Weak
from self.channels.write().await and call weak.upgrade() once; if it returns
None, remove the map entry and return ControlActionResult::NotFound; otherwise
move the resulting strong handle out of the lock scope and call
handle.cancel_worker_with_reason(worker_id, reason).await. Apply the same
single-upgrade refactor to the similar code in the other region mentioned
(lines 123-148).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/agent/cortex.rs`:
- Around lines 941-979: the current block handles ControlActionResult::Cancelled,
::AlreadyTerminal, and ::NotFound identically, emitting
"worker_killed"/"branch_killed" logs and incrementing kill_actions even when
nothing was actually cancelled. Change the logic so that only when result ==
ControlActionResult::Cancelled do you push IDs into
cancelled_worker_ids/cancelled_branch_ids, call
logger.log("worker_killed"/"branch_killed", ...), and saturating_add to
kill_actions. For ControlActionResult::AlreadyTerminal and ::NotFound, skip the
logging, ID collection, and increment (no "killed" audit) and continue, keeping
the match on target (KillTarget::Worker/KillTarget::Branch) intact and still
using the existing variables (result, target, cancelled_worker_ids,
cancelled_branch_ids, kill_actions, logger.log, worker_timeout, branch_timeout).
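
The intended separation can be sketched as follows. `ControlActionResult` and its variants come from the PR description; `record_kill` and its parameters are illustrative stand-ins for the surrounding cortex.rs code, not the actual implementation.

```rust
// Hedged sketch of the review's suggestion: only a genuine Cancelled
// outcome is audited and counted; AlreadyTerminal and NotFound are
// treated as idempotent no-ops with no "killed" audit entry.
#[derive(Debug, PartialEq)]
enum ControlActionResult {
    Cancelled,
    AlreadyTerminal,
    NotFound,
}

fn record_kill(
    result: ControlActionResult,
    cancelled_ids: &mut Vec<u64>,
    kill_actions: &mut u32,
    id: u64,
) {
    match result {
        ControlActionResult::Cancelled => {
            cancelled_ids.push(id); // audit only real cancellations
            *kill_actions = kill_actions.saturating_add(1);
            // logger.log("worker_killed", ...) would fire here
        }
        // Idempotent outcomes: no audit entry, no counter bump.
        ControlActionResult::AlreadyTerminal | ControlActionResult::NotFound => {}
    }
}
```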

---

Duplicate comments:
In `@src/agent/cortex.rs`:
- Around lines 2461-2487: the code currently sends a terminal
ProcessEvent::WorkerComplete (via event_tx.send(...)) when db_updated is Err in
the success-path block (using db_updated, run_logger.log_worker_completed,
logger.log, agent_id, worker_id, result_text), but other persistence-failure
branches do not emit the terminal event. Unify the behavior: no
persistence-failure branch should emit WorkerComplete immediately; send
ProcessEvent::WorkerComplete only after a successfully persisted terminal or
requeue transition. On persistence failure, implement rollback/compensation
(revert in-memory success markers, undo run_logger.log_worker_completed
side-effects, and log the failure with logger.log) so the system retains a
single source of truth and only emits terminal events when persistence
succeeds, updating the blocks that currently call
event_tx.send(ProcessEvent::WorkerComplete { ... }) to follow this unified
policy.
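
The persist-then-emit ordering this asks for can be reduced to a tiny sketch. `finish_worker`, its closure parameter, and the event vector are illustrative only; the real code works with event_tx and the database layer.

```rust
// Hedged sketch of the unified policy: the terminal WorkerComplete
// event is emitted only after the terminal state has been persisted.
// A persistence failure propagates as an error with no event sent.
fn finish_worker(
    persist: impl Fn() -> Result<(), String>,
    events: &mut Vec<&'static str>,
) -> Result<(), String> {
    persist()?; // single source of truth: persist the terminal state first
    events.push("WorkerComplete"); // emit only after persistence succeeds
    Ok(())
}
```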


ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d6c0a42a-cf6e-4936-8365-a5e6a53a830a

📥 Commits

Reviewing files that changed from the base of the PR and between 152a387 and 2627c2d.

📒 Files selected for processing (23)
  • docs/content/docs/(configuration)/config.mdx
  • docs/content/docs/(core)/architecture.mdx
  • docs/design-docs/cortex-history.md
  • docs/design-docs/cortex-implementation.md
  • src/agent.rs
  • src/agent/channel.rs
  • src/agent/channel_dispatch.rs
  • src/agent/cortex.rs
  • src/agent/process_control.rs
  • src/agent/status.rs
  • src/api/agents.rs
  • src/api/channels.rs
  • src/api/config.rs
  • src/config/load.rs
  • src/config/toml_schema.rs
  • src/config/types.rs
  • src/conversation/history.rs
  • src/lib.rs
  • src/main.rs
  • src/tools/cancel.rs
  • tests/bulletin.rs
  • tests/context_dump.rs
  • tests/detached_worker_bootstrap.rs
🚧 Files skipped from review as they are similar to previous changes (10)
  • tests/context_dump.rs
  • src/config/types.rs
  • src/agent.rs
  • src/api/agents.rs
  • docs/content/docs/(configuration)/config.mdx
  • src/config/toml_schema.rs
  • docs/content/docs/(core)/architecture.mdx
  • src/tools/cancel.rs
  • tests/bulletin.rs
  • src/api/channels.rs

@jamiepine jamiepine left a comment

This is better than anything I would have built, incredible PR.

@jamiepine jamiepine merged commit 7f22fcb into spacedriveapp:main Mar 5, 2026
4 checks passed
