From 3e1f105652e02fe57038495b217910e74a6c5429 Mon Sep 17 00:00:00 2001 From: Amol Kapoor Date: Fri, 3 Apr 2026 11:54:38 -0400 Subject: [PATCH] fix(acp,tui): prevent stale TurnLifecycle::Completed from hiding post-interrupt responses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a user presses ESC to interrupt a running task, the ACP backend emits TurnLifecycle::Aborted immediately. However, the background tokio task that was handling the prompt continues and unconditionally emits TurnLifecycle::Completed. If the user submits a new message before this stale Completed arrives, it races with the new turn's Started event, setting turn_finished=true and causing tool events to be silently discarded. Fix with defense in depth at two levels: Backend (ACP): Add turn_interrupted AtomicBool flag. Set on Op::Interrupt, reset on new handle_user_input. The spawned task checks the flag before emitting Completed and skips it if the turn was interrupted. TUI (ChatWidget): Add pending_stale_completes counter. Each on_interrupted_turn increments it; on_task_complete decrements and skips processing while the counter is positive. 🤖 Generated with [Nori](https://noriagentic.com) Co-Authored-By: Nori --- codex-rs/acp/docs.md | 21 ++++++ codex-rs/acp/src/backend/mod.rs | 5 ++ codex-rs/acp/src/backend/session.rs | 1 + codex-rs/acp/src/backend/spawn_and_relay.rs | 1 + codex-rs/acp/src/backend/submit_and_ops.rs | 28 ++++--- codex-rs/acp/src/backend/user_input.rs | 33 ++++++--- codex-rs/tui/docs.md | 11 +++ codex-rs/tui/src/chatwidget/constructors.rs | 2 + codex-rs/tui/src/chatwidget/event_handlers.rs | 14 ++++ codex-rs/tui/src/chatwidget/mod.rs | 4 + codex-rs/tui/src/chatwidget/tests/mod.rs | 2 + codex-rs/tui/src/chatwidget/tests/part7.rs | 73 +++++++++++++++++++ 12 files changed, 173 insertions(+), 22 deletions(-) create mode 100644 codex-rs/tui/src/chatwidget/tests/part7.rs diff --git a/codex-rs/acp/docs.md b/codex-rs/acp/docs.md index 8dd072228..cf4ca305b 100644 --- a/codex-rs/acp/docs.md +++ b/codex-rs/acp/docs.md @@ -808,6 +808,27 @@ The `pending_tool_calls` state is shared via `Arc` field on `AcpBackend` prevents this: + +``` +Op::Interrupt: + 1. turn_interrupted.store(true) -- flag the current turn as interrupted + 2. connection.cancel() -- cancel the ACP session + +handle_user_input(): + 1. turn_interrupted.store(false) -- reset for new turn + ... + spawned task epilogue: + if !turn_interrupted -- only emit Completed if not interrupted + emit TurnLifecycle::Completed +``` + +Since `TurnLifecycle::Aborted` already serves as the turn-ending signal for interrupted turns, suppressing the stale `Completed` is safe. The TUI also has a defense-in-depth counter (`pending_stale_completes`) that ignores stale `Completed` events at the presentation layer (see `@/codex-rs/tui/docs.md`). + **Tool Classification System:** | ACP ToolKind | ParsedCommand | TUI Rendering | diff --git a/codex-rs/acp/src/backend/mod.rs b/codex-rs/acp/src/backend/mod.rs index c93ff70f8..cd84bad1f 100644 --- a/codex-rs/acp/src/backend/mod.rs +++ b/codex-rs/acp/src/backend/mod.rs @@ -9,6 +9,7 @@ use std::collections::HashMap; use std::path::PathBuf; use std::sync::Arc; +use std::sync::atomic::AtomicBool; use anyhow::Result; use codex_core::config::types::McpServerConfig; @@ -323,6 +324,10 @@ pub struct AcpBackend { client_event_normalizer: Arc>, /// MCP server configuration forwarded to ACP agents at session creation. mcp_servers: HashMap, + /// Set when `Op::Interrupt` fires; checked by the spawned prompt task + /// before emitting `TurnLifecycle::Completed`. Prevents a stale + /// `Completed` from a cancelled task from interfering with the next turn. + turn_interrupted: Arc, } mod helpers; diff --git a/codex-rs/acp/src/backend/session.rs b/codex-rs/acp/src/backend/session.rs index d44a61e56..8a6eee036 100644 --- a/codex-rs/acp/src/backend/session.rs +++ b/codex-rs/acp/src/backend/session.rs @@ -253,6 +253,7 @@ impl AcpBackend { script_timeout: config.script_timeout, client_event_normalizer: Arc::clone(&client_event_normalizer), mcp_servers: config.mcp_servers.clone(), + turn_interrupted: Arc::new(AtomicBool::new(false)), }; // Execute session_start hooks diff --git a/codex-rs/acp/src/backend/spawn_and_relay.rs b/codex-rs/acp/src/backend/spawn_and_relay.rs index a6ebd839c..b225732da 100644 --- a/codex-rs/acp/src/backend/spawn_and_relay.rs +++ b/codex-rs/acp/src/backend/spawn_and_relay.rs @@ -186,6 +186,7 @@ impl AcpBackend { script_timeout: config.script_timeout, client_event_normalizer: Arc::clone(&client_event_normalizer), mcp_servers: config.mcp_servers.clone(), + turn_interrupted: Arc::new(AtomicBool::new(false)), }; // Execute session_start hooks diff --git a/codex-rs/acp/src/backend/submit_and_ops.rs b/codex-rs/acp/src/backend/submit_and_ops.rs index 1c7677b3c..8f3053697 100644 --- a/codex-rs/acp/src/backend/submit_and_ops.rs +++ b/codex-rs/acp/src/backend/submit_and_ops.rs @@ -1,3 +1,5 @@ +use std::sync::atomic::Ordering; + use super::*; impl AcpBackend { @@ -21,6 +23,7 @@ impl AcpBackend { self.handle_user_input(items, &id).await?; } Op::Interrupt => { + self.turn_interrupted.store(true, Ordering::SeqCst); self.connection .cancel(&*self.session_id.read().await) .await?; @@ -289,6 +292,7 @@ impl AcpBackend { let client_event_normalizer = Arc::clone(&self.client_event_normalizer); let backend_event_tx = self.backend_event_tx.clone(); let transcript_recorder = self.transcript_recorder.clone(); + let turn_interrupted = Arc::clone(&self.turn_interrupted); // Spawn task to handle the prompt and capture the summary tokio::spawn(async move { @@ -394,17 +398,19 @@ impl AcpBackend { .await; } - // Send TaskComplete event - emit_client_event( - &backend_event_tx, - transcript_recorder.as_ref(), - nori_protocol::ClientEvent::TurnLifecycle( - nori_protocol::TurnLifecycle::Completed { - last_agent_message: None, - }, - ), - ) - .await; + // Send TaskComplete event, unless the turn was interrupted. + if !turn_interrupted.load(Ordering::SeqCst) { + emit_client_event( + &backend_event_tx, + transcript_recorder.as_ref(), + nori_protocol::ClientEvent::TurnLifecycle( + nori_protocol::TurnLifecycle::Completed { + last_agent_message: None, + }, + ), + ) + .await; + } // Start idle timer if configured if let Some(duration) = notify_after_idle.as_duration() { diff --git a/codex-rs/acp/src/backend/user_input.rs b/codex-rs/acp/src/backend/user_input.rs index a608002f0..bd0b52d13 100644 --- a/codex-rs/acp/src/backend/user_input.rs +++ b/codex-rs/acp/src/backend/user_input.rs @@ -1,8 +1,13 @@ +use std::sync::atomic::Ordering; + use super::*; impl AcpBackend { /// Handle user input by sending a prompt to the ACP agent. pub(super) async fn handle_user_input(&self, items: Vec, id: &str) -> Result<()> { + // Reset the interrupt flag so this turn's Completed will be emitted. + self.turn_interrupted.store(false, Ordering::SeqCst); + // Separate text items (needed for hooks, summary, transcript) from // image items (converted to ACP ContentBlock::Image). let mut prompt_text = String::new(); @@ -193,6 +198,7 @@ impl AcpBackend { let pending_hook_context = Arc::clone(&self.pending_hook_context); let client_event_normalizer = Arc::clone(&self.client_event_normalizer); let backend_event_tx = self.backend_event_tx.clone(); + let turn_interrupted = Arc::clone(&self.turn_interrupted); // Spawn task to handle the prompt and translate events tokio::spawn(async move { @@ -531,17 +537,22 @@ impl AcpBackend { ); } - // Send TaskComplete event (always, to end the turn) - emit_client_event( - &backend_event_tx, - transcript_recorder.as_ref(), - nori_protocol::ClientEvent::TurnLifecycle( - nori_protocol::TurnLifecycle::Completed { - last_agent_message: None, - }, - ), - ) - .await; + // Send TaskComplete event to end the turn, unless this turn was + // interrupted. When Op::Interrupt fires, it emits + // TurnLifecycle::Aborted synchronously; emitting a Completed here + // would race with the next turn and prematurely terminate it. + if !turn_interrupted.load(Ordering::SeqCst) { + emit_client_event( + &backend_event_tx, + transcript_recorder.as_ref(), + nori_protocol::ClientEvent::TurnLifecycle( + nori_protocol::TurnLifecycle::Completed { + last_agent_message: None, + }, + ), + ) + .await; + } // Start idle timer if configured if let Some(duration) = notify_after_idle.as_duration() { diff --git a/codex-rs/tui/docs.md b/codex-rs/tui/docs.md index b34ec5166..07503d41f 100644 --- a/codex-rs/tui/docs.md +++ b/codex-rs/tui/docs.md @@ -142,6 +142,17 @@ The ACP protocol has no end-of-turn synchronization guarantee. Answer deltas, re The gate is checked both in the legacy exec/mcp handlers and in the normalized ACP tool-snapshot handlers. When `turn_finished` is true, those methods return immediately without rendering any UI. This is complementary to the interrupt queue: the queue handles deferral during streaming within a turn, while `turn_finished` handles events that arrive after the turn ends entirely. +**Stale Completed Guard** (`chatwidget/mod.rs`, `chatwidget/event_handlers.rs`): + +When a turn is interrupted (ESC), the ACP backend emits `TurnLifecycle::Aborted` synchronously, but the background task may still emit a stale `TurnLifecycle::Completed` later. If that stale `Completed` arrives after the next turn has started, `on_task_complete()` would set `turn_finished = true` and discard all subsequent tool events for the new turn. The `pending_stale_completes: i32` counter on `ChatWidget` acts as defense-in-depth against this race: + +| Action | Method | Effect | +|--------|--------|--------| +| Interrupt received | `on_interrupted_turn()` | Increments `pending_stale_completes` | +| Stale Completed arrives | `on_task_complete()` | If counter > 0, decrements and returns early (skips turn finalization) | + +This is complementary to the ACP backend's `turn_interrupted` flag (`@/codex-rs/acp/docs.md`), which suppresses the stale `Completed` at the source. The TUI counter provides a safety net in case the backend guard is bypassed. + **Turn-Boundary Cleanup of Incomplete Tool Cells** (`chatwidget/event_handlers.rs`): Because the `turn_finished` gate blocks late-arriving End events, tool cells that began but never received their End event would remain stuck in `active_cell` or `pending_exec_cells`, filling the viewport and blocking the agent's text from rendering. Both `on_agent_message()` and `on_task_complete()` now explicitly finalize incomplete cells at turn boundaries: diff --git a/codex-rs/tui/src/chatwidget/constructors.rs b/codex-rs/tui/src/chatwidget/constructors.rs index ca104bc2e..094f12b62 100644 --- a/codex-rs/tui/src/chatwidget/constructors.rs +++ b/codex-rs/tui/src/chatwidget/constructors.rs @@ -99,6 +99,7 @@ impl ChatWidget { #[cfg(feature = "nori-config")] loop_count_override: None, turn_finished: false, + pending_stale_completes: 0, plan_drawer_mode: PlanDrawerMode::Off, pinned_plan: None, terminal_title_animation_origin: std::time::Instant::now(), @@ -207,6 +208,7 @@ impl ChatWidget { #[cfg(feature = "nori-config")] loop_count_override: None, turn_finished: false, + pending_stale_completes: 0, plan_drawer_mode: PlanDrawerMode::Off, pinned_plan: None, terminal_title_animation_origin: std::time::Instant::now(), diff --git a/codex-rs/tui/src/chatwidget/event_handlers.rs b/codex-rs/tui/src/chatwidget/event_handlers.rs index 64e71fbb6..eba35eb9e 100644 --- a/codex-rs/tui/src/chatwidget/event_handlers.rs +++ b/codex-rs/tui/src/chatwidget/event_handlers.rs @@ -196,6 +196,15 @@ impl ChatWidget { } pub(super) fn on_task_complete(&mut self, last_agent_message: Option) { + // If this Completed is a stale leftover from a cancelled turn, skip it. + // Each on_interrupted_turn increments pending_stale_completes; the + // matching background task will eventually emit Completed which we + // must ignore to avoid prematurely ending the current turn. + if self.pending_stale_completes > 0 { + self.pending_stale_completes -= 1; + return; + } + // If a stream is currently active, finalize it. self.flush_answer_stream_with_separator(); @@ -428,6 +437,11 @@ impl ChatWidget { /// When there are queued user messages, restore them into the composer /// separated by newlines rather than auto‑submitting the next one. pub(super) fn on_interrupted_turn(&mut self, _reason: TurnAbortReason) { + // The cancelled background task will eventually emit + // TurnLifecycle::Completed; record that we expect one stale + // Completed so on_task_complete can ignore it. + self.pending_stale_completes += 1; + // Finalize, log a gentle prompt, and clear running state. self.finalize_turn(); self.cancel_loop(); diff --git a/codex-rs/tui/src/chatwidget/mod.rs b/codex-rs/tui/src/chatwidget/mod.rs index d9c24064f..9fc9c2d94 100644 --- a/codex-rs/tui/src/chatwidget/mod.rs +++ b/codex-rs/tui/src/chatwidget/mod.rs @@ -424,6 +424,10 @@ pub(crate) struct ChatWidget { // Gate: set when AgentMessage is received, cleared on next TaskStarted. // While true, late-arriving tool events are silently discarded. turn_finished: bool, + // Number of stale TurnLifecycle::Completed events expected after + // interrupts. Each on_interrupted_turn increments this; each + // on_task_complete decrements and skips processing while > 0. + pending_stale_completes: i32, /// Whether and how plan updates are rendered in a pinned drawer instead of /// history cells. plan_drawer_mode: PlanDrawerMode, diff --git a/codex-rs/tui/src/chatwidget/tests/mod.rs b/codex-rs/tui/src/chatwidget/tests/mod.rs index 12c673355..af5736b5c 100644 --- a/codex-rs/tui/src/chatwidget/tests/mod.rs +++ b/codex-rs/tui/src/chatwidget/tests/mod.rs @@ -313,6 +313,7 @@ pub(crate) fn make_chatwidget_manual() -> ( #[cfg(feature = "nori-config")] loop_count_override: None, turn_finished: false, + pending_stale_completes: 0, plan_drawer_mode: PlanDrawerMode::Off, pinned_plan: None, terminal_title_animation_origin: std::time::Instant::now(), @@ -339,3 +340,4 @@ mod part3; mod part4; mod part5; mod part6; +mod part7; diff --git a/codex-rs/tui/src/chatwidget/tests/part7.rs b/codex-rs/tui/src/chatwidget/tests/part7.rs new file mode 100644 index 000000000..5748becf4 --- /dev/null +++ b/codex-rs/tui/src/chatwidget/tests/part7.rs @@ -0,0 +1,73 @@ +use super::*; + +/// When the stale Completed arrives during an active turn, tool events for the +/// active turn should NOT be discarded. +/// +/// Race condition sequence: +/// 1. Started(A) → task running +/// 2. Aborted(A) → task stopped (user pressed ESC) +/// 3. Started(B) → new task running (user submitted new message) +/// 4. Completed(A) → stale event from cancelled background task +/// +/// After step 4, turn B's tool events must still be processed. Before the fix, +/// the stale Completed prematurely gated tool events, causing them to be +/// silently discarded. +#[test] +fn stale_completed_should_not_block_tool_events_for_next_turn() { + let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(); + + // Start and interrupt turn A + chat.on_task_started(); + drain_insert_history(&mut rx); + chat.on_interrupted_turn(TurnAbortReason::Interrupted); + drain_insert_history(&mut rx); + + // Start turn B + chat.on_task_started(); + drain_insert_history(&mut rx); + + // Stale Completed from turn A + chat.on_task_complete(None); + drain_insert_history(&mut rx); + + // Tool event for turn B should NOT be discarded + begin_exec(&mut chat, "turn-b-call", "echo hello from turn B"); + + assert!( + chat.active_cell.is_some(), + "ExecCell should be created - stale Completed should not block turn B's tool events" + ); +} + +/// Multiple consecutive interrupts should each produce one stale Completed +/// that is correctly drained before the real turn's events arrive. +#[test] +fn multiple_interrupts_drain_stale_completes_in_order() { + let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(); + + // Interrupt twice in a row + chat.on_task_started(); + drain_insert_history(&mut rx); + chat.on_interrupted_turn(TurnAbortReason::Interrupted); + drain_insert_history(&mut rx); + + chat.on_task_started(); + drain_insert_history(&mut rx); + chat.on_interrupted_turn(TurnAbortReason::Interrupted); + drain_insert_history(&mut rx); + + // Start the real turn + chat.on_task_started(); + drain_insert_history(&mut rx); + + // Two stale Completeds arrive + chat.on_task_complete(None); + chat.on_task_complete(None); + + // Real tool events should still work + begin_exec(&mut chat, "real-call", "echo real"); + assert!( + chat.active_cell.is_some(), + "ExecCell should be created after draining multiple stale Completeds" + ); +}