44 changes: 36 additions & 8 deletions codex-rs/acp/docs.md
Original file line number Diff line number Diff line change
@@ -808,26 +808,54 @@ The `pending_tool_calls` state is shared via `Arc<Mutex<HashMap<String, Accumula

Late-arriving tool events that race past the agent's final response are handled at the TUI layer via the `turn_finished` gate (see `@/codex-rs/tui/docs.md`).

**Turn Interrupt Guard** (`submit_and_ops.rs`, `user_input.rs`):
**Prompt Update Channel Lifecycle** (`sacp_connection.rs`, `user_input.rs`, `submit_and_ops.rs`):

When `Op::Interrupt` fires, the backend emits `TurnLifecycle::Aborted` synchronously and calls `cancel()` on the ACP connection. However, the background tokio task spawned by `handle_user_input()` continues running after cancellation and unconditionally emits `TurnLifecycle::Completed` at the end of its event loop. If the user submits a new message before this stale `Completed` arrives, it races with the next turn's `TurnLifecycle::Started` and prematurely terminates it.
Each prompt/load_session call gets a dedicated `mpsc` channel (`update_tx`/`update_rx`) for receiving `SessionUpdate` notifications from the ACP agent. The connection layer routes notifications through a shared `active_update_tx` slot (an `Arc<Mutex<Option<(u64, Sender)>>>`) that pairs the sender with a monotonic generation counter. The routing logic in the notification handler uses `try_send` with fallthrough: if the per-prompt channel cannot accept the notification (its receiver was dropped, or it is full), the notification falls through to the `persistent_tx` channel instead of being silently dropped.

The `turn_interrupted: Arc<AtomicBool>` field on `AcpBackend` prevents this:
The critical invariant is that `prompt()` does **not** clear `active_update_tx` when it returns. This is because `block_task()` (the SACP request/response mechanism) can return before all `SessionNotification` events have been delivered. Instead, callers use a `done_tx`/`done_rx` oneshot to signal the `update_handler` task:

```
prompt() returns
|
v
done_tx.send(()) -- signals update_handler that prompt is done
|
v
update_handler enters drain mode:
tokio::select! switches from waiting on (update_rx OR done_rx)
to waiting on update_rx with a 500ms timeout
|
v
After timeout or channel close, update_handler exits
(dropping update_rx, which causes future try_send to fail)
|
v
Next prompt() overwrites active_update_tx slot with a fresh sender
```
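The drain phase can be approximated with a std-threaded sketch (std `mpsc` in place of tokio, and `drain_updates` is a hypothetical helper; the real handler also processes each update as it arrives rather than collecting them):

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

// Mirrors POST_PROMPT_DRAIN_TIMEOUT in backend/mod.rs.
const POST_PROMPT_DRAIN_TIMEOUT: Duration = Duration::from_millis(500);

/// Once the done signal has fired, keep receiving late-arriving updates
/// until the channel stays quiet for the full timeout or the sender side
/// is dropped.
fn drain_updates<T>(update_rx: &Receiver<T>) -> Vec<T> {
    let mut drained = Vec::new();
    loop {
        match update_rx.recv_timeout(POST_PROMPT_DRAIN_TIMEOUT) {
            Ok(update) => drained.push(update),
            // Timeout or disconnect both end the drain; dropping update_rx
            // afterwards is what makes future try_send calls fail.
            Err(RecvTimeoutError::Timeout) | Err(RecvTimeoutError::Disconnected) => break,
        }
    }
    drained
}
```

The timeout is what bounds the drain when the agent keeps the sender alive; a dropped sender ends it immediately.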

The generation counter on `active_update_tx` prevents stale cleanup: `close_update_channel(generation)` only clears the slot if the generation matches, so `load_session` (which runs sequentially) can safely clear its own channel without clobbering a concurrent prompt's channel. `prompt()` callers never call `close_update_channel`; they rely on the done/drain pattern instead.
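A minimal single-threaded sketch of the slot-plus-generation routing, using std `mpsc` in place of tokio channels (the `Router` type and its method names are illustrative, not the actual `sacp_connection.rs` API):

```rust
use std::sync::mpsc::{SyncSender, TrySendError};
use std::sync::{Arc, Mutex};

type Update = String;

/// Illustrative stand-in for the connection's notification router.
struct Router {
    /// Shared slot pairing a per-prompt sender with its generation.
    active_update_tx: Arc<Mutex<Option<(u64, SyncSender<Update>)>>>,
    /// Fallback channel; notifications are never silently dropped.
    persistent_tx: SyncSender<Update>,
    generation: u64,
}

impl Router {
    /// Install a fresh per-prompt channel and return its generation.
    fn open_update_channel(&mut self, tx: SyncSender<Update>) -> u64 {
        self.generation += 1;
        *self.active_update_tx.lock().unwrap() = Some((self.generation, tx));
        self.generation
    }

    /// Clear the slot only if `generation` still matches, so a stale
    /// cleanup can never clobber a newer prompt's channel.
    fn close_update_channel(&self, generation: u64) {
        let mut slot = self.active_update_tx.lock().unwrap();
        if matches!(*slot, Some((g, _)) if g == generation) {
            *slot = None;
        }
    }

    /// try_send with fallthrough: per-prompt channel first, then persistent.
    fn route(&self, update: Update) {
        let slot = self.active_update_tx.lock().unwrap();
        if let Some((_, tx)) = slot.as_ref() {
            match tx.try_send(update) {
                Ok(()) => {}
                // Full or disconnected: recover the update from the error
                // and fall through to the persistent channel.
                Err(TrySendError::Full(u)) | Err(TrySendError::Disconnected(u)) => {
                    let _ = self.persistent_tx.try_send(u);
                }
            }
        } else {
            let _ = self.persistent_tx.try_send(update);
        }
    }
}
```

The generation check in `close_update_channel` is what makes a stale close a no-op rather than a race.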

**Turn Interrupt Guard — Monotonic Turn Counter** (`submit_and_ops.rs`, `user_input.rs`):

When `Op::Interrupt` fires, the backend emits `TurnLifecycle::Aborted` synchronously and calls `cancel()` on the ACP connection. However, the background tokio task spawned by `handle_user_input()` (and `handle_compact()`) continues running after cancellation and may emit a stale `TurnLifecycle::Completed` or `ErrorEvent` at the end of its event loop. If the user submits a new message before these stale events arrive, they race with the next turn's lifecycle events and can prematurely terminate it.

The `turn_id: Arc<AtomicU64>` field on `AcpBackend` is a monotonic counter that eliminates this race. It is incremented on every `Op::Interrupt` and on every new turn (`handle_user_input()`, `handle_compact()`). Each spawned task captures its own turn ID at spawn time and only emits tail events (errors, warnings, `Completed`) if the counter still matches:

```
 Op::Interrupt:
-  1. turn_interrupted.store(true)  -- flag the current turn as interrupted
+  1. turn_id.fetch_add(1)          -- advance the counter, invalidating the current task
   2. connection.cancel()           -- cancel the ACP session

-handle_user_input():
-  1. turn_interrupted.store(false)           -- reset for new turn
+handle_user_input() / handle_compact():
+  1. my_turn_id = turn_id.fetch_add(1) + 1   -- advance counter, capture this turn's ID
   ...
   spawned task epilogue:
-    if !turn_interrupted               -- only emit Completed if not interrupted
+    if turn_id.load() == my_turn_id    -- only emit tail events if still current
+      emit ErrorEvent (if error)
       emit TurnLifecycle::Completed
```

Since `TurnLifecycle::Aborted` already serves as the turn-ending signal for interrupted turns, suppressing the stale `Completed` is safe. The TUI also has a defense-in-depth counter (`pending_stale_completes`) that ignores stale `Completed` events at the presentation layer (see `@/codex-rs/tui/docs.md`).
Because the counter is monotonic and never reset, turn IDs are never reused: an interrupt always invalidates any previously spawned task, and a new turn always gets a fresh ID that cannot collide with prior tasks. The TUI does not need a complementary guard; stale events are fully suppressed at the backend layer.
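Distilled to its essentials, the guard can be sketched as a small wrapper (the `TurnGuard` type and method names are illustrative; in the backend the `AtomicU64` field lives directly on `AcpBackend`):

```rust
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};

/// Illustrative wrapper around the monotonic turn counter.
struct TurnGuard {
    turn_id: Arc<AtomicU64>,
}

impl TurnGuard {
    /// Start of handle_user_input()/handle_compact(): advance the
    /// counter and capture this turn's ID for the spawned task.
    fn begin_turn(&self) -> u64 {
        self.turn_id.fetch_add(1, Ordering::SeqCst) + 1
    }

    /// Op::Interrupt: advance the counter, invalidating whatever
    /// turn is currently running.
    fn interrupt(&self) {
        self.turn_id.fetch_add(1, Ordering::SeqCst);
    }

    /// Task epilogue: tail events (errors, warnings, Completed) are
    /// emitted only when the counter still matches the captured ID.
    fn is_current(&self, my_turn_id: u64) -> bool {
        self.turn_id.load(Ordering::SeqCst) == my_turn_id
    }
}
```

Because both interrupts and new turns advance the same counter, a stale task can never observe a matching ID again.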

**Tool Classification System:**

4 changes: 2 additions & 2 deletions codex-rs/acp/src/backend/hooks.rs
@@ -50,8 +50,8 @@ pub(super) async fn run_prompt_summary(
drop(connection);

match prompt_result {
Ok(Ok(_)) => {}
Ok(Err(e)) => return Err(e),
Ok((Ok(_), _gen)) => {}
Ok((Err(e), _gen)) => return Err(e),
Err(_) => {
debug!("Prompt summary timed out");
return Ok(());
18 changes: 13 additions & 5 deletions codex-rs/acp/src/backend/mod.rs
@@ -9,7 +9,7 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU64;

use anyhow::Result;
use codex_core::config::types::McpServerConfig;
@@ -50,6 +50,12 @@ use crate::transcript::TranscriptRecorder;
use crate::translator;
use crate::undo::GhostSnapshotStack;

/// Maximum time to wait for late-arriving `SessionNotification` events after
/// `block_task()` returns. Empirically most arrive within ~50 ms; 500 ms
/// provides generous headroom without noticeably delaying the turn lifecycle.
pub(super) const POST_PROMPT_DRAIN_TIMEOUT: std::time::Duration =
std::time::Duration::from_millis(500);

// =============================================================================
// Error Categorization
// =============================================================================
@@ -324,10 +330,12 @@ pub struct AcpBackend {
client_event_normalizer: Arc<Mutex<ClientEventNormalizer>>,
/// MCP server configuration forwarded to ACP agents at session creation.
mcp_servers: HashMap<String, McpServerConfig>,
/// Set when `Op::Interrupt` fires; checked by the spawned prompt task
/// before emitting `TurnLifecycle::Completed`. Prevents a stale
/// `Completed` from a cancelled task from interfering with the next turn.
turn_interrupted: Arc<AtomicBool>,
/// Monotonic turn counter incremented on every new turn and on every
/// interrupt. Each spawned prompt task captures its turn ID at spawn
/// time and only emits `TurnLifecycle::Completed` if the counter still
/// matches — guaranteeing that stale tasks from cancelled turns never
/// emit a Completed that could interfere with a subsequent turn.
turn_id: Arc<AtomicU64>,
}

mod helpers;
6 changes: 1 addition & 5 deletions codex-rs/acp/src/backend/session.rs
@@ -81,10 +81,6 @@ impl AcpBackend {

match connection.load_session(sid, &cwd, update_tx).await {
Ok(session_id) => {
// Wait for all updates to be collected. This is safe
// because the collect task buffers into a Vec (no
// backpressure) and update_rx closes when load_session
// completes (the worker thread drops update_tx).
let buffered_client_events = collect_handle.await.unwrap_or_default();
if !buffered_client_events.is_empty() {
debug!(
@@ -253,7 +249,7 @@ impl AcpBackend {
script_timeout: config.script_timeout,
client_event_normalizer: Arc::clone(&client_event_normalizer),
mcp_servers: config.mcp_servers.clone(),
turn_interrupted: Arc::new(AtomicBool::new(false)),
turn_id: Arc::new(AtomicU64::new(0)),
};

// Execute session_start hooks
2 changes: 1 addition & 1 deletion codex-rs/acp/src/backend/spawn_and_relay.rs
@@ -186,7 +186,7 @@ impl AcpBackend {
script_timeout: config.script_timeout,
client_event_normalizer: Arc::clone(&client_event_normalizer),
mcp_servers: config.mcp_servers.clone(),
turn_interrupted: Arc::new(AtomicBool::new(false)),
turn_id: Arc::new(AtomicU64::new(0)),
};

// Execute session_start hooks
165 changes: 93 additions & 72 deletions codex-rs/acp/src/backend/submit_and_ops.rs
@@ -23,7 +23,7 @@ impl AcpBackend {
self.handle_user_input(items, &id).await?;
}
Op::Interrupt => {
self.turn_interrupted.store(true, Ordering::SeqCst);
self.turn_id.fetch_add(1, Ordering::SeqCst);
self.connection
.cancel(&*self.session_id.read().await)
.await?;
@@ -276,6 +276,7 @@

// Create channel for receiving session updates
let (update_tx, mut update_rx) = mpsc::channel(32);
let (done_tx, mut done_rx) = tokio::sync::oneshot::channel::<()>();

// Clone what we need for capturing the response
let event_tx = self.event_tx.clone();
@@ -292,7 +293,8 @@
let client_event_normalizer = Arc::clone(&self.client_event_normalizer);
let backend_event_tx = self.backend_event_tx.clone();
let transcript_recorder = self.transcript_recorder.clone();
let turn_interrupted = Arc::clone(&self.turn_interrupted);
let turn_id = Arc::clone(&self.turn_id);
let my_turn_id = turn_id.fetch_add(1, Ordering::SeqCst) + 1;

// Spawn task to handle the prompt and capture the summary
tokio::spawn(async move {
@@ -316,8 +318,31 @@

let update_handler = tokio::spawn(async move {
let mut summary_text = String::new();

while let Some(update) = update_rx.recv().await {
let mut done = false;

loop {
let update = if done {
match tokio::time::timeout(
super::POST_PROMPT_DRAIN_TIMEOUT,
update_rx.recv(),
)
.await
{
Ok(Some(u)) => u,
_ => break,
}
} else {
tokio::select! {
msg = update_rx.recv() => match msg {
Some(u) => u,
None => break,
},
_ = &mut done_rx => {
done = true;
continue;
}
}
};
let client_events =
normalize_session_update(&client_event_normalizer, &update).await;
forward_client_events(&backend_event_tx_for_updates, &client_events).await;
Expand All @@ -338,68 +363,63 @@ impl AcpBackend {

// Send the summarization prompt
let session_id_for_timer = session_id.to_string();
let result = connection.prompt(session_id, prompt, update_tx).await;
let (result, _update_gen) = connection.prompt(session_id, prompt, update_tx).await;

// Signal the update_handler to drain remaining events and stop.
let _ = done_tx.send(());

// Wait for all updates to be processed
let _ = update_handler.await;

// If prompt failed, send error event and clear any partial summary
if let Err(ref e) = result {
warn!("Compact prompt failed: {e}");
// Clear any partial summary that may have been stored
*pending_compact_summary.lock().await = None;
let _ = event_tx
.send(Event {
id: id_clone.clone(),
msg: EventMsg::Error(ErrorEvent {
message: format!("Compact failed: {e}"),
codex_error_info: None,
}),
})
.await;
} else {
// Create a new session to clear the agent's conversation history.
// The summary we captured will be prepended to the next user prompt,
// giving the agent context about the previous conversation.
match connection.create_session(&cwd, mcp_servers).await {
Ok(new_session_id) => {
debug!("Created new session after compact: {:?}", new_session_id);
*session_id_lock.write().await = new_session_id;
}
Err(e) => {
warn!("Failed to create new session after compact: {e}");
// Continue anyway - summary will still be prepended but agent
// will retain its full history, which is suboptimal but functional
// Only emit tail events if this is still the active turn. When
// the turn_id has advanced, this task is stale and all its late
// events (errors, warnings, Completed) must be suppressed.
if turn_id.load(Ordering::SeqCst) == my_turn_id {
if let Err(ref e) = result {
warn!("Compact prompt failed: {e}");
*pending_compact_summary.lock().await = None;
let _ = event_tx
.send(Event {
id: id_clone.clone(),
msg: EventMsg::Error(ErrorEvent {
message: format!("Compact failed: {e}"),
codex_error_info: None,
}),
})
.await;
} else {
match connection.create_session(&cwd, mcp_servers).await {
Ok(new_session_id) => {
debug!("Created new session after compact: {:?}", new_session_id);
*session_id_lock.write().await = new_session_id;
}
Err(e) => {
warn!("Failed to create new session after compact: {e}");
}
}
}

// Send ContextCompacted event to notify TUI, including the
// summary text so the TUI can reprint it under a new session header.
let compact_summary = pending_compact_summary.lock().await.clone();
emit_client_event(
&backend_event_tx,
transcript_recorder.as_ref(),
nori_protocol::ClientEvent::TurnLifecycle(
nori_protocol::TurnLifecycle::ContextCompacted {
summary: compact_summary.clone(),
},
),
)
.await;

// Send warning about long conversations
let _ = event_tx
.send(Event {
id: id_clone.clone(),
msg: EventMsg::Warning(WarningEvent {
message: "Heads up: Long conversations and multiple compactions can cause the model to be less accurate. Start a new conversation when possible to keep conversations small and targeted.".to_string(),
}),
})
let compact_summary = pending_compact_summary.lock().await.clone();
emit_client_event(
&backend_event_tx,
transcript_recorder.as_ref(),
nori_protocol::ClientEvent::TurnLifecycle(
nori_protocol::TurnLifecycle::ContextCompacted {
summary: compact_summary.clone(),
},
),
)
.await;
}

// Send TaskComplete event, unless the turn was interrupted.
if !turn_interrupted.load(Ordering::SeqCst) {
let _ = event_tx
.send(Event {
id: id_clone.clone(),
msg: EventMsg::Warning(WarningEvent {
message: "Heads up: Long conversations and multiple compactions can cause the model to be less accurate. Start a new conversation when possible to keep conversations small and targeted.".to_string(),
}),
})
.await;
}

emit_client_event(
&backend_event_tx,
transcript_recorder.as_ref(),
Expand All @@ -410,21 +430,22 @@ impl AcpBackend {
),
)
.await;
}

// Start idle timer if configured
if let Some(duration) = notify_after_idle.as_duration() {
let idle_secs = duration.as_secs();
let user_notifier_for_timer = Arc::clone(&user_notifier);
let idle_task = tokio::spawn(async move {
tokio::time::sleep(duration).await;
user_notifier_for_timer.notify(&codex_core::UserNotification::Idle {
session_id: session_id_for_timer,
idle_duration_secs: idle_secs,
// Start idle timer if configured
if let Some(duration) = notify_after_idle.as_duration() {
let idle_secs = duration.as_secs();
let user_notifier_for_timer = Arc::clone(&user_notifier);
let idle_task = tokio::spawn(async move {
tokio::time::sleep(duration).await;
user_notifier_for_timer.notify(&codex_core::UserNotification::Idle {
session_id: session_id_for_timer,
idle_duration_secs: idle_secs,
});
});
});
// Store the abort handle so the timer can be cancelled on new activity
*idle_timer_abort.lock().await = Some(idle_task.abort_handle());
*idle_timer_abort.lock().await = Some(idle_task.abort_handle());
}
} else if let Err(ref e) = result {
warn!("Compact prompt failed (stale turn, suppressed): {e}");
*pending_compact_summary.lock().await = None;
}
});
