Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions codex-rs/acp/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,27 @@ The `pending_tool_calls` state is shared via `Arc<Mutex<HashMap<String, Accumula

Late-arriving tool events that race past the agent's final response are handled at the TUI layer via the `turn_finished` gate (see `@/codex-rs/tui/docs.md`).

**Turn Interrupt Guard** (`submit_and_ops.rs`, `user_input.rs`):

When `Op::Interrupt` fires, the backend emits `TurnLifecycle::Aborted` synchronously and calls `cancel()` on the ACP connection. However, the background tokio task spawned by `handle_user_input()` continues running after cancellation and unconditionally emits `TurnLifecycle::Completed` at the end of its event loop. If the user submits a new message before this stale `Completed` arrives, it races with the next turn's `TurnLifecycle::Started` and prematurely terminates it.

The `turn_interrupted: Arc<AtomicBool>` field on `AcpBackend` prevents this:

```
Op::Interrupt:
  1. turn_interrupted.store(true)    -- flag the current turn as interrupted
  2. connection.cancel()             -- cancel the ACP session

handle_user_input():
  1. turn_interrupted.store(false)   -- reset for new turn
  ...
  spawned task epilogue:
    if !turn_interrupted             -- only emit Completed if not interrupted
      emit TurnLifecycle::Completed
```

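Since `TurnLifecycle::Aborted` already serves as the turn-ending signal for interrupted turns, suppressing the stale `Completed` is safe. Note that the flag is a single value shared by all turns rather than scoped to one turn: `handle_user_input()` resets it to `false` when the next turn starts, so a cancelled task that only reaches its epilogue after that reset will still emit `Completed`. The TUI also has a defense-in-depth counter (`pending_stale_completes`) that ignores stale `Completed` events at the presentation layer, which covers that case (see `@/codex-rs/tui/docs.md`).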

**Tool Classification System:**

| ACP ToolKind | ParsedCommand | TUI Rendering |
Expand Down
5 changes: 5 additions & 0 deletions codex-rs/acp/src/backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;

use anyhow::Result;
use codex_core::config::types::McpServerConfig;
Expand Down Expand Up @@ -323,6 +324,10 @@ pub struct AcpBackend {
client_event_normalizer: Arc<Mutex<ClientEventNormalizer>>,
/// MCP server configuration forwarded to ACP agents at session creation.
mcp_servers: HashMap<String, McpServerConfig>,
/// Set when `Op::Interrupt` fires; checked by the spawned prompt task
/// before emitting `TurnLifecycle::Completed`. Prevents a stale
/// `Completed` from a cancelled task from interfering with the next turn.
turn_interrupted: Arc<AtomicBool>,
}

mod helpers;
Expand Down
1 change: 1 addition & 0 deletions codex-rs/acp/src/backend/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,7 @@ impl AcpBackend {
script_timeout: config.script_timeout,
client_event_normalizer: Arc::clone(&client_event_normalizer),
mcp_servers: config.mcp_servers.clone(),
turn_interrupted: Arc::new(AtomicBool::new(false)),
};

// Execute session_start hooks
Expand Down
1 change: 1 addition & 0 deletions codex-rs/acp/src/backend/spawn_and_relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ impl AcpBackend {
script_timeout: config.script_timeout,
client_event_normalizer: Arc::clone(&client_event_normalizer),
mcp_servers: config.mcp_servers.clone(),
turn_interrupted: Arc::new(AtomicBool::new(false)),
};

// Execute session_start hooks
Expand Down
28 changes: 17 additions & 11 deletions codex-rs/acp/src/backend/submit_and_ops.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::atomic::Ordering;

use super::*;

impl AcpBackend {
Expand All @@ -21,6 +23,7 @@ impl AcpBackend {
self.handle_user_input(items, &id).await?;
}
Op::Interrupt => {
self.turn_interrupted.store(true, Ordering::SeqCst);
self.connection
.cancel(&*self.session_id.read().await)
.await?;
Expand Down Expand Up @@ -289,6 +292,7 @@ impl AcpBackend {
let client_event_normalizer = Arc::clone(&self.client_event_normalizer);
let backend_event_tx = self.backend_event_tx.clone();
let transcript_recorder = self.transcript_recorder.clone();
let turn_interrupted = Arc::clone(&self.turn_interrupted);

// Spawn task to handle the prompt and capture the summary
tokio::spawn(async move {
Expand Down Expand Up @@ -394,17 +398,19 @@ impl AcpBackend {
.await;
}

// Send TaskComplete event
emit_client_event(
&backend_event_tx,
transcript_recorder.as_ref(),
nori_protocol::ClientEvent::TurnLifecycle(
nori_protocol::TurnLifecycle::Completed {
last_agent_message: None,
},
),
)
.await;
// Send TaskComplete event, unless the turn was interrupted.
if !turn_interrupted.load(Ordering::SeqCst) {
emit_client_event(
&backend_event_tx,
transcript_recorder.as_ref(),
nori_protocol::ClientEvent::TurnLifecycle(
nori_protocol::TurnLifecycle::Completed {
last_agent_message: None,
},
),
)
.await;
}

// Start idle timer if configured
if let Some(duration) = notify_after_idle.as_duration() {
Expand Down
33 changes: 22 additions & 11 deletions codex-rs/acp/src/backend/user_input.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
use std::sync::atomic::Ordering;

use super::*;

impl AcpBackend {
/// Handle user input by sending a prompt to the ACP agent.
pub(super) async fn handle_user_input(&self, items: Vec<UserInput>, id: &str) -> Result<()> {
// Reset the interrupt flag so this turn's Completed will be emitted.
self.turn_interrupted.store(false, Ordering::SeqCst);

// Separate text items (needed for hooks, summary, transcript) from
// image items (converted to ACP ContentBlock::Image).
let mut prompt_text = String::new();
Expand Down Expand Up @@ -193,6 +198,7 @@ impl AcpBackend {
let pending_hook_context = Arc::clone(&self.pending_hook_context);
let client_event_normalizer = Arc::clone(&self.client_event_normalizer);
let backend_event_tx = self.backend_event_tx.clone();
let turn_interrupted = Arc::clone(&self.turn_interrupted);

// Spawn task to handle the prompt and translate events
tokio::spawn(async move {
Expand Down Expand Up @@ -531,17 +537,22 @@ impl AcpBackend {
);
}

// Send TaskComplete event (always, to end the turn)
emit_client_event(
&backend_event_tx,
transcript_recorder.as_ref(),
nori_protocol::ClientEvent::TurnLifecycle(
nori_protocol::TurnLifecycle::Completed {
last_agent_message: None,
},
),
)
.await;
// Send TaskComplete event to end the turn, unless this turn was
// interrupted. When Op::Interrupt fires, it emits
// TurnLifecycle::Aborted synchronously; emitting a Completed here
// would race with the next turn and prematurely terminate it.
if !turn_interrupted.load(Ordering::SeqCst) {
emit_client_event(
&backend_event_tx,
transcript_recorder.as_ref(),
nori_protocol::ClientEvent::TurnLifecycle(
nori_protocol::TurnLifecycle::Completed {
last_agent_message: None,
},
),
)
.await;
}

// Start idle timer if configured
if let Some(duration) = notify_after_idle.as_duration() {
Expand Down
11 changes: 11 additions & 0 deletions codex-rs/tui/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,17 @@ The ACP protocol has no end-of-turn synchronization guarantee. Answer deltas, re

The gate is checked both in the legacy exec/mcp handlers and in the normalized ACP tool-snapshot handlers. When `turn_finished` is true, those methods return immediately without rendering any UI. This is complementary to the interrupt queue: the queue handles deferral during streaming within a turn, while `turn_finished` handles events that arrive after the turn ends entirely.

**Stale Completed Guard** (`chatwidget/mod.rs`, `chatwidget/event_handlers.rs`):

When a turn is interrupted (ESC), the ACP backend emits `TurnLifecycle::Aborted` synchronously, but the background task may still emit a stale `TurnLifecycle::Completed` later. If that stale `Completed` arrives after the next turn has started, `on_task_complete()` would set `turn_finished = true` and discard all subsequent tool events for the new turn. The `pending_stale_completes: i32` counter on `ChatWidget` acts as defense-in-depth against this race:

| Action | Method | Effect |
|--------|--------|--------|
| Interrupt received | `on_interrupted_turn()` | Increments `pending_stale_completes` |
| Stale Completed arrives | `on_task_complete()` | If counter > 0, decrements and returns early (skips turn finalization) |

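This is complementary to the ACP backend's `turn_interrupted` flag (`@/codex-rs/acp/docs.md`), which suppresses the stale `Completed` at the source. The TUI counter provides a safety net in case the backend guard is bypassed. Note that the counter is incremented on every interrupt, whether or not a stale `Completed` is actually delivered afterwards: when the backend does suppress it, the counter stays positive and the next `Completed` to arrive is the one that gets skipped.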

**Turn-Boundary Cleanup of Incomplete Tool Cells** (`chatwidget/event_handlers.rs`):

Because the `turn_finished` gate blocks late-arriving End events, tool cells that began but never received their End event would remain stuck in `active_cell` or `pending_exec_cells`, filling the viewport and blocking the agent's text from rendering. Both `on_agent_message()` and `on_task_complete()` now explicitly finalize incomplete cells at turn boundaries:
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/tui/src/chatwidget/constructors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ impl ChatWidget {
#[cfg(feature = "nori-config")]
loop_count_override: None,
turn_finished: false,
pending_stale_completes: 0,
plan_drawer_mode: PlanDrawerMode::Off,
pinned_plan: None,
terminal_title_animation_origin: std::time::Instant::now(),
Expand Down Expand Up @@ -207,6 +208,7 @@ impl ChatWidget {
#[cfg(feature = "nori-config")]
loop_count_override: None,
turn_finished: false,
pending_stale_completes: 0,
plan_drawer_mode: PlanDrawerMode::Off,
pinned_plan: None,
terminal_title_animation_origin: std::time::Instant::now(),
Expand Down
14 changes: 14 additions & 0 deletions codex-rs/tui/src/chatwidget/event_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,15 @@ impl ChatWidget {
}

pub(super) fn on_task_complete(&mut self, last_agent_message: Option<String>) {
// If this Completed is a stale leftover from a cancelled turn, skip it.
// Each on_interrupted_turn increments pending_stale_completes; the
// matching background task will eventually emit Completed which we
// must ignore to avoid prematurely ending the current turn.
if self.pending_stale_completes > 0 {
self.pending_stale_completes -= 1;
return;
}

// If a stream is currently active, finalize it.
self.flush_answer_stream_with_separator();

Expand Down Expand Up @@ -428,6 +437,11 @@ impl ChatWidget {
/// When there are queued user messages, restore them into the composer
/// separated by newlines rather than auto‑submitting the next one.
pub(super) fn on_interrupted_turn(&mut self, _reason: TurnAbortReason) {
// The cancelled background task will eventually emit
// TurnLifecycle::Completed; record that we expect one stale
// Completed so on_task_complete can ignore it.
self.pending_stale_completes += 1;

// Finalize, log a gentle prompt, and clear running state.
self.finalize_turn();
self.cancel_loop();
Expand Down
4 changes: 4 additions & 0 deletions codex-rs/tui/src/chatwidget/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,10 @@ pub(crate) struct ChatWidget {
// Gate: set when AgentMessage is received, cleared on next TaskStarted.
// While true, late-arriving tool events are silently discarded.
turn_finished: bool,
// Number of stale TurnLifecycle::Completed events expected after
// interrupts. Each on_interrupted_turn increments this; each
// on_task_complete decrements and skips processing while > 0.
pending_stale_completes: i32,
/// Whether and how plan updates are rendered in a pinned drawer instead of
/// history cells.
plan_drawer_mode: PlanDrawerMode,
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/tui/src/chatwidget/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ pub(crate) fn make_chatwidget_manual() -> (
#[cfg(feature = "nori-config")]
loop_count_override: None,
turn_finished: false,
pending_stale_completes: 0,
plan_drawer_mode: PlanDrawerMode::Off,
pinned_plan: None,
terminal_title_animation_origin: std::time::Instant::now(),
Expand All @@ -339,3 +340,4 @@ mod part3;
mod part4;
mod part5;
mod part6;
mod part7;
73 changes: 73 additions & 0 deletions codex-rs/tui/src/chatwidget/tests/part7.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use super::*;

/// Regression test: a stale `Completed` left over from an interrupted turn
/// must not cause the following turn's tool events to be dropped.
///
/// Simulated event order:
/// 1. Started(A) → turn A is running
/// 2. Aborted(A) → the user pressed ESC, turn A is stopped
/// 3. Started(B) → the user submitted a new message, turn B is running
/// 4. Completed(A) → late event from turn A's cancelled background task
///
/// Once step 4 has been handled, an exec-begin event that belongs to turn B
/// still has to be processed. Prior to the fix the stale Completed gated tool
/// events too early and they were silently thrown away.
#[test]
fn stale_completed_should_not_block_tool_events_for_next_turn() {
    let (mut widget, mut history_rx, _op_rx) = make_chatwidget_manual();

    // Turn A: begin, then interrupt.
    widget.on_task_started();
    drain_insert_history(&mut history_rx);
    widget.on_interrupted_turn(TurnAbortReason::Interrupted);
    drain_insert_history(&mut history_rx);

    // Turn B begins.
    widget.on_task_started();
    drain_insert_history(&mut history_rx);

    // Turn A's leftover Completed shows up while turn B is active.
    widget.on_task_complete(None);
    drain_insert_history(&mut history_rx);

    // A tool event that belongs to turn B must still be handled.
    begin_exec(&mut widget, "turn-b-call", "echo hello from turn B");

    let exec_cell_created = widget.active_cell.is_some();
    assert!(
        exec_cell_created,
        "ExecCell should be created - stale Completed should not block turn B's tool events"
    );
}

/// Multiple consecutive interrupts should each account for one stale
/// Completed, and all of them must be drained before the real turn's events
/// are handled.
///
/// Sequence: two turns are started and interrupted back to back, a third
/// (real) turn starts, and then two stale Completed events arrive. The test
/// checks both that the stale-complete counter is back at zero and that a
/// subsequent exec-begin event for the real turn still creates an active cell.
#[test]
fn multiple_interrupts_drain_stale_completes_in_order() {
    let (mut chat, mut rx, _op_rx) = make_chatwidget_manual();

    // Interrupt twice in a row: each round starts a turn and aborts it.
    for _ in 0..2 {
        chat.on_task_started();
        drain_insert_history(&mut rx);
        chat.on_interrupted_turn(TurnAbortReason::Interrupted);
        drain_insert_history(&mut rx);
    }

    // Start the real turn
    chat.on_task_started();
    drain_insert_history(&mut rx);

    // Two stale Completeds arrive, one per interrupted turn.
    chat.on_task_complete(None);
    chat.on_task_complete(None);

    // Every expected stale Completed has been consumed, so the real turn's
    // own Completed will not be mistaken for a stale one.
    assert_eq!(
        chat.pending_stale_completes, 0,
        "each stale Completed should consume exactly one pending interrupt"
    );

    // Real tool events should still work
    begin_exec(&mut chat, "real-call", "echo real");
    assert!(
        chat.active_cell.is_some(),
        "ExecCell should be created after draining multiple stale Completeds"
    );
}
Loading