From 2b8befdb7d96f12d39ba8afa4ab9d474c63081bd Mon Sep 17 00:00:00 2001 From: Skylar Graika Date: Sat, 10 Jan 2026 14:43:43 -0800 Subject: [PATCH 1/5] fix: prevent repeating interrupted turns Record a model-visible marker in history when a turn is interrupted, and treat it as a session prefix so it doesn't change user-turn boundaries. Add a regression test to ensure follow-up turns don't repeat side effects from the aborted turn. --- codex-rs/core/src/codex.rs | 37 ++- codex-rs/core/src/context_manager/history.rs | 2 +- codex-rs/core/src/event_mapping.rs | 2 +- codex-rs/core/src/tasks/mod.rs | 17 ++ .../interrupt_does_not_repeat_aborted_turn.rs | 248 ++++++++++++++++++ codex-rs/core/tests/suite/mod.rs | 1 + 6 files changed, 292 insertions(+), 15 deletions(-) create mode 100644 codex-rs/core/tests/suite/interrupt_does_not_repeat_aborted_turn.rs diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 00ed4fba0d6..cccd9076461 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -3738,14 +3738,18 @@ mod tests { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; - let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) - .await - .expect("timeout waiting for event") - .expect("event"); - match evt.msg { - EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason), - other => panic!("unexpected event: {other:?}"), - } + let aborted = loop { + let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) + .await + .expect("timeout waiting for event") + .expect("event"); + match evt.msg { + EventMsg::RawResponseItem(_) => continue, + EventMsg::TurnAborted(e) => break e, + other => panic!("unexpected event: {other:?}"), + } + }; + assert_eq!(TurnAbortReason::Interrupted, aborted.reason); assert!(rx.try_recv().is_err()); } @@ -3767,11 +3771,18 @@ mod tests { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; - let evt = rx.recv().await.expect("event"); - match evt.msg { - EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason), - other => panic!("unexpected event: {other:?}"), - } + let aborted = loop { + let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) + .await + .expect("timeout waiting for event") + .expect("event"); + match evt.msg { + EventMsg::RawResponseItem(_) => continue, + EventMsg::TurnAborted(e) => break e, + other => panic!("unexpected event: {other:?}"), + } + }; + assert_eq!(TurnAbortReason::Interrupted, aborted.reason); assert!(rx.try_recv().is_err()); } diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index 3e0428c86d7..1bd2b19df9b 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ b/codex-rs/core/src/context_manager/history.rs @@ -319,7 +319,7 @@ fn estimate_reasoning_length(encoded_len: usize) -> usize { fn is_session_prefix(text: &str) -> bool { let trimmed = text.trim_start(); let lowered = trimmed.to_ascii_lowercase(); - lowered.starts_with("") + lowered.starts_with("") || lowered.starts_with("") } pub(crate) fn is_user_turn_boundary(item: &ResponseItem) -> bool { diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index fe592236c11..4dd20a58361 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -24,7 +24,7 @@ use crate::user_shell_command::is_user_shell_command_text; fn is_session_prefix(text: &str) -> bool { let trimmed = text.trim_start(); let lowered = trimmed.to_ascii_lowercase(); - lowered.starts_with("") + lowered.starts_with("") || lowered.starts_with("") } fn parse_user_message(message: &[ContentItem]) -> Option { diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index d6754c23c8a..163b3d39c6f 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -27,6 +27,8 @@ use crate::protocol::TurnCompleteEvent; use crate::state::ActiveTurn; use crate::state::RunningTask; use crate::state::TaskKind; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; use codex_protocol::user_input::UserInput; pub(crate) use compact::CompactTask; @@ -235,6 +237,21 @@ impl Session { .abort(session_ctx, Arc::clone(&task.turn_context)) .await; + if reason == TurnAbortReason::Interrupted { + let marker = ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format!( + "\n {sub_id}\n interrupted\n The user interrupted the previous turn. Do not continue or repeat work from that turn unless the user explicitly asks. If any tools/commands were aborted, they may have partially executed; verify current state before retrying.\n" + ), + }], + }; + self.record_conversation_items(task.turn_context.as_ref(), &[marker]) + .await; + self.flush_rollout().await; + } + let event = EventMsg::TurnAborted(TurnAbortedEvent { reason }); self.send_event(task.turn_context.as_ref(), event).await; } diff --git a/codex-rs/core/tests/suite/interrupt_does_not_repeat_aborted_turn.rs b/codex-rs/core/tests/suite/interrupt_does_not_repeat_aborted_turn.rs new file mode 100644 index 00000000000..cf1ae68f864 --- /dev/null +++ b/codex-rs/core/tests/suite/interrupt_does_not_repeat_aborted_turn.rs @@ -0,0 +1,248 @@ +#![allow(clippy::expect_used, clippy::unwrap_used)] + +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::Mutex; +use std::time::Duration; + +use codex_core::protocol::AskForApproval; +use codex_core::protocol::EventMsg; +use codex_core::protocol::Op; +use codex_core::protocol::SandboxPolicy; +use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::user_input::UserInput; +use core_test_support::responses::ev_assistant_message; +use core_test_support::responses::ev_completed; +use core_test_support::responses::ev_function_call; +use core_test_support::responses::ev_response_created; +use core_test_support::responses::sse; +use core_test_support::responses::start_mock_server; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; +use pretty_assertions::assert_eq; +use serde_json::Value; +use wiremock::Match; +use wiremock::Mock; +use wiremock::Request; +use wiremock::Respond; +use wiremock::ResponseTemplate; +use wiremock::matchers::method; +use wiremock::matchers::path_regex; + +#[derive(Clone, Default)] +struct RequestsCapture(Arc>>); + +impl RequestsCapture { + fn requests(&self) -> Vec { + self.0.lock().unwrap().clone() + } +} + +impl Match for RequestsCapture { + fn matches(&self, request: &Request) -> bool { + self.0.lock().unwrap().push(request.clone()); + true + } +} + +#[derive(Debug, Default)] +struct ResponderState { + call_count: usize, +} + +#[derive(Clone)] +struct RepeatUnlessTurnAbortedResponder { + state: Arc>, + side_effect_path: PathBuf, +} + +impl RepeatUnlessTurnAbortedResponder { + fn request_contains_turn_aborted_marker(request: &Request) -> bool { + let Ok(body) = request.body_json::() else { + return false; + }; + let Some(input) = body.get("input").and_then(Value::as_array) else { + return false; + }; + input.iter().any(|item| { + item.get("type").and_then(Value::as_str) == Some("message") + && item.get("role").and_then(Value::as_str) == Some("user") + && item + .get("content") + .and_then(Value::as_array) + .into_iter() + .flatten() + .filter(|span| span.get("type").and_then(Value::as_str) == Some("input_text")) + .any(|span| { + span.get("text") + .and_then(Value::as_str) + .is_some_and(|text| text.contains("")) + }) + }) + } + + fn sse_response(body: String) -> ResponseTemplate { + ResponseTemplate::new(200) + .insert_header("content-type", "text/event-stream") + .set_body_string(body) + } +} + +impl Respond for RepeatUnlessTurnAbortedResponder { + fn respond(&self, request: &Request) -> ResponseTemplate { + let mut state = self.state.lock().unwrap(); + let call_num = state.call_count; + state.call_count += 1; + + // First request: return a long-running tool call that performs an immediate side-effect. + if call_num == 0 { + let script = format!( + "echo run >> \"{}\"; sleep 60", + self.side_effect_path.display() + ); + let args = serde_json::json!({ + "command": script, + "timeout_ms": 60_000 + }) + .to_string(); + + return Self::sse_response(sse(vec![ + ev_response_created("resp-1"), + ev_function_call("call-1", "shell_command", &args), + ev_completed("resp-1"), + ])); + } + + // Follow-up after the repeated tool call. + if call_num == 2 { + return Self::sse_response(sse(vec![ + ev_response_created("resp-3"), + ev_assistant_message("msg-3", "ok"), + ev_completed("resp-3"), + ])); + } + + // Second request: if Codex includes a turn-aborted marker in the prompt, + // behave and do not repeat the previous tool call. + if Self::request_contains_turn_aborted_marker(request) { + return Self::sse_response(sse(vec![ + ev_response_created("resp-2"), + ev_assistant_message("msg-2", "moving on"), + ev_completed("resp-2"), + ])); + } + + // Otherwise, simulate the model mistakenly repeating the earlier action. + let command = format!( + "bash -lc 'echo run >> \"{}\"'", + self.side_effect_path.display() + ); + let args = serde_json::json!({ "command": command }).to_string(); + Self::sse_response(sse(vec![ + ev_response_created("resp-2"), + ev_function_call("call-2", "shell_command", &args), + ev_completed("resp-2"), + ])) + } +} + +async fn wait_for_side_effect(path: &std::path::Path) { + let check = async { + loop { + if let Ok(contents) = std::fs::read_to_string(path) + && contents.lines().any(|line| line == "run") + { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }; + + tokio::time::timeout(Duration::from_secs(5), check) + .await + .expect("side effect should be written before interrupt"); +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn interrupt_should_prevent_repeating_aborted_work() { + let server = start_mock_server().await; + + let fixture = test_codex() + .with_model("gpt-5.1") + .build(&server) + .await + .unwrap(); + + let side_effect_path = fixture.workspace_path("side_effect.txt"); + let capture = RequestsCapture::default(); + let responder = RepeatUnlessTurnAbortedResponder { + state: Arc::new(Mutex::new(ResponderState::default())), + side_effect_path: side_effect_path.clone(), + }; + + Mock::given(method("POST")) + .and(path_regex(".*/responses$")) + .and(capture.clone()) + .respond_with(responder) + .up_to_n_times(10) + .mount(&server) + .await; + + let session_model = fixture.session_configured.model.clone(); + let cwd = fixture.cwd_path().to_path_buf(); + let codex = fixture.codex.clone(); + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "start first task".into(), + }], + final_output_json_schema: None, + cwd: cwd.clone(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model.clone(), + effort: None, + summary: ReasoningSummary::Auto, + }) + .await + .unwrap(); + + wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await; + wait_for_side_effect(&side_effect_path).await; + codex.submit(Op::Interrupt).await.unwrap(); + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await; + + codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "do something else".into(), + }], + final_output_json_schema: None, + cwd, + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: ReasoningSummary::Auto, + }) + .await + .unwrap(); + + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let contents = std::fs::read_to_string(&side_effect_path).unwrap_or_default(); + let lines: Vec<&str> = contents.lines().collect(); + assert_eq!( + lines, + vec!["run"], + "Codex should not repeat work from an aborted turn" + ); + + // Sanity check request count so we do not accidentally introduce infinite follow-ups. + assert_eq!( + capture.requests().len(), + 2, + "expected one aborted request + one follow-up request" + ); +} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 44093778d38..4ffd4be6850 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -31,6 +31,7 @@ mod exec_policy; mod fork_thread; mod grep_files; mod hierarchical_agents; +mod interrupt_does_not_repeat_aborted_turn; mod items; mod json_result; mod list_dir; From 03761d1be61fc38b2f367ee503758821efb58031 Mon Sep 17 00:00:00 2001 From: Skylar Graika Date: Mon, 12 Jan 2026 11:37:44 -0800 Subject: [PATCH 2/5] core: refine interrupted-turn marker --- codex-rs/core/src/codex.rs | 110 ++++---- codex-rs/core/src/context_manager/history.rs | 7 +- codex-rs/core/src/event_mapping.rs | 7 +- codex-rs/core/src/lib.rs | 1 + codex-rs/core/src/session_prefix.rs | 15 ++ codex-rs/core/src/tasks/mod.rs | 11 +- codex-rs/core/tests/suite/abort_tasks.rs | 74 ++++++ .../interrupt_does_not_repeat_aborted_turn.rs | 248 ------------------ codex-rs/core/tests/suite/mod.rs | 1 - 9 files changed, 162 insertions(+), 312 deletions(-) create mode 100644 codex-rs/core/src/session_prefix.rs delete mode 100644 codex-rs/core/tests/suite/interrupt_does_not_repeat_aborted_turn.rs diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 7f16c02dcd3..8e5a13c7619 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -3736,18 +3736,14 @@ mod tests { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; - let aborted = loop { - let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) - .await - .expect("timeout waiting for event") - .expect("event"); - match evt.msg { - EventMsg::RawResponseItem(_) => continue, - EventMsg::TurnAborted(e) => break e, - other => panic!("unexpected event: {other:?}"), - } - }; - assert_eq!(TurnAbortReason::Interrupted, aborted.reason); + let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) + .await + .expect("timeout waiting for event") + .expect("event"); + match evt.msg { + EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason), + other => panic!("unexpected event: {other:?}"), + } assert!(rx.try_recv().is_err()); } @@ -3769,18 +3765,14 @@ mod tests { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; - let aborted = loop { - let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) - .await - .expect("timeout waiting for event") - .expect("event"); - match evt.msg { - EventMsg::RawResponseItem(_) => continue, - EventMsg::TurnAborted(e) => break e, - other => panic!("unexpected event: {other:?}"), - } - }; - assert_eq!(TurnAbortReason::Interrupted, aborted.reason); + let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) + .await + .expect("timeout waiting for event") + .expect("event"); + match evt.msg { + EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason), + other => panic!("unexpected event: {other:?}"), + } assert!(rx.try_recv().is_err()); } @@ -3795,42 +3787,62 @@ mod tests { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; - // Drain events until we observe ExitedReviewMode; earlier - // RawResponseItem entries (e.g., environment context) may arrive first. - loop { - let evt = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()) + let mut exited_review_mode_idx = None; + let mut turn_aborted_idx = None; + let mut idx = 0usize; + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(3); + while tokio::time::Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + let evt = tokio::time::timeout(remaining, rx.recv()) .await - .expect("timeout waiting for first event") - .expect("first event"); + .expect("timeout waiting for event") + .expect("event"); + let event_idx = idx; + idx = idx.saturating_add(1); match evt.msg { EventMsg::ExitedReviewMode(ev) => { assert!(ev.review_output.is_none()); - break; + exited_review_mode_idx = Some(event_idx); } - // Ignore any non-critical events before exit. - _ => continue, - } - } - loop { - let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) - .await - .expect("timeout waiting for next event") - .expect("event"); - match evt.msg { - EventMsg::RawResponseItem(_) => continue, - EventMsg::ItemStarted(_) | EventMsg::ItemCompleted(_) => continue, - EventMsg::AgentMessage(_) => continue, - EventMsg::TurnAborted(e) => { - assert_eq!(TurnAbortReason::Interrupted, e.reason); + EventMsg::TurnAborted(ev) => { + assert_eq!(TurnAbortReason::Interrupted, ev.reason); + turn_aborted_idx = Some(event_idx); break; } - other => panic!("unexpected second event: {other:?}"), + _ => {} } } + assert!( + exited_review_mode_idx.is_some(), + "expected ExitedReviewMode after abort" + ); + assert!( + turn_aborted_idx.is_some(), + "expected TurnAborted after abort" + ); + assert!( + exited_review_mode_idx.unwrap() < turn_aborted_idx.unwrap(), + "expected ExitedReviewMode before TurnAborted" + ); - // TODO(jif) investigate what is this? let history = sess.clone_history().await; - let _ = history.raw_items(); + assert!( + history.raw_items().iter().any(|item| { + let ResponseItem::Message { role, content, .. } = item else { + return false; + }; + if role != "user" { + return false; + } + content.iter().any(|content_item| { + let ContentItem::InputText { text } = content_item else { + return false; + }; + text.contains(crate::session_prefix::TURN_ABORTED_OPEN_TAG) + }) + }), + "expected a model-visible turn aborted marker in history after interrupt" + ); } #[tokio::test] diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index 1bd2b19df9b..b1614e65d5d 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ b/codex-rs/core/src/context_manager/history.rs @@ -1,5 +1,6 @@ use crate::codex::TurnContext; use crate::context_manager::normalize; +use crate::session_prefix::is_session_prefix; use crate::truncate::TruncationPolicy; use crate::truncate::approx_token_count; use crate::truncate::approx_tokens_from_byte_count; @@ -316,12 +317,6 @@ fn estimate_reasoning_length(encoded_len: usize) -> usize { .saturating_sub(650) } -fn is_session_prefix(text: &str) -> bool { - let trimmed = text.trim_start(); - let lowered = trimmed.to_ascii_lowercase(); - lowered.starts_with("") || lowered.starts_with("") -} - pub(crate) fn is_user_turn_boundary(item: &ResponseItem) -> bool { let ResponseItem::Message { role, content, .. } = item else { return false; diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index 4dd20a58361..362260d76cd 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -17,16 +17,11 @@ use codex_protocol::user_input::UserInput; use tracing::warn; use uuid::Uuid; +use crate::session_prefix::is_session_prefix; use crate::user_instructions::SkillInstructions; use crate::user_instructions::UserInstructions; use crate::user_shell_command::is_user_shell_command_text; -fn is_session_prefix(text: &str) -> bool { - let trimmed = text.trim_start(); - let lowered = trimmed.to_ascii_lowercase(); - lowered.starts_with("") || lowered.starts_with("") -} - fn parse_user_message(message: &[ContentItem]) -> Option { if UserInstructions::is_user_instructions(message) || SkillInstructions::is_skill_instructions(message) diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 1fb25ebc138..ef42e25dcd3 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -45,6 +45,7 @@ pub mod parse_command; pub mod path_utils; pub mod powershell; pub mod sandboxing; +mod session_prefix; mod stream_events_utils; mod text_encoding; pub mod token_data; diff --git a/codex-rs/core/src/session_prefix.rs b/codex-rs/core/src/session_prefix.rs new file mode 100644 index 00000000000..99283082b6f --- /dev/null +++ b/codex-rs/core/src/session_prefix.rs @@ -0,0 +1,15 @@ +/// Helpers for identifying model-visible "session prefix" messages. +/// +/// A session prefix is a user-role message that carries configuration or state needed by +/// follow-up turns (e.g. ``, ``). These items are persisted in +/// history so the model can see them, but they are not user intent and must not create user-turn +/// boundaries. +pub(crate) const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = ""; +pub(crate) const TURN_ABORTED_OPEN_TAG: &str = ""; + +/// Returns true if `text` starts with a session prefix marker (case-insensitive). +pub(crate) fn is_session_prefix(text: &str) -> bool { + let trimmed = text.trim_start(); + let lowered = trimmed.to_ascii_lowercase(); + lowered.starts_with(ENVIRONMENT_CONTEXT_OPEN_TAG) || lowered.starts_with(TURN_ABORTED_OPEN_TAG) +} diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 163b3d39c6f..e55cf44c269 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -24,11 +24,13 @@ use crate::protocol::EventMsg; use crate::protocol::TurnAbortReason; use crate::protocol::TurnAbortedEvent; use crate::protocol::TurnCompleteEvent; +use crate::session_prefix::TURN_ABORTED_OPEN_TAG; use crate::state::ActiveTurn; use crate::state::RunningTask; use crate::state::TaskKind; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::RolloutItem; use codex_protocol::user_input::UserInput; pub(crate) use compact::CompactTask; @@ -39,6 +41,7 @@ pub(crate) use undo::UndoTask; pub(crate) use user_shell::UserShellCommandTask; const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100; +const TURN_ABORTED_INTERRUPTED_GUIDANCE: &str = "The user interrupted the previous turn. Do not continue or repeat work from that turn unless the user explicitly asks. If any tools/commands were aborted, they may have partially executed; verify current state before retrying."; /// Thin wrapper that exposes the parts of [`Session`] task runners need. #[derive(Clone)] @@ -243,12 +246,16 @@ impl Session { role: "user".to_string(), content: vec![ContentItem::InputText { text: format!( - "\n {sub_id}\n interrupted\n The user interrupted the previous turn. Do not continue or repeat work from that turn unless the user explicitly asks. If any tools/commands were aborted, they may have partially executed; verify current state before retrying.\n" + "{TURN_ABORTED_OPEN_TAG}\n {sub_id}\n interrupted\n {TURN_ABORTED_INTERRUPTED_GUIDANCE}\n" ), }], }; - self.record_conversation_items(task.turn_context.as_ref(), &[marker]) + self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref()) .await; + self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)]) + .await; + // Ensure the marker is durably visible before emitting TurnAborted: some clients + // synchronously re-read the rollout on receipt of the abort event. self.flush_rollout().await; } diff --git a/codex-rs/core/tests/suite/abort_tasks.rs b/codex-rs/core/tests/suite/abort_tasks.rs index 094c10c7786..63bdbb1569d 100644 --- a/codex-rs/core/tests/suite/abort_tasks.rs +++ b/codex-rs/core/tests/suite/abort_tasks.rs @@ -160,3 +160,77 @@ async fn interrupt_tool_records_history_entries() { "expected at least one tenth of a second of elapsed time, got {secs}" ); } + +/// After an interrupt we persist a model-visible `` marker in the conversation +/// history. This test asserts that the marker is included in the next `/responses` request. +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn interrupt_persists_turn_aborted_marker_in_next_request() { + let command = "sleep 60"; + let call_id = "call-turn-aborted-marker"; + + let args = json!({ + "command": command, + "timeout_ms": 60_000 + }) + .to_string(); + let first_body = sse(vec![ + ev_response_created("resp-marker"), + ev_function_call(call_id, "shell_command", &args), + ev_completed("resp-marker"), + ]); + let follow_up_body = sse(vec![ + ev_response_created("resp-followup"), + ev_completed("resp-followup"), + ]); + + let server = start_mock_server().await; + let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await; + + let fixture = test_codex() + .with_model("gpt-5.1") + .build(&server) + .await + .unwrap(); + let codex = Arc::clone(&fixture.codex); + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "start interrupt marker".into(), + }], + final_output_json_schema: None, + }) + .await + .unwrap(); + + wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await; + + tokio::time::sleep(Duration::from_secs_f32(0.1)).await; + codex.submit(Op::Interrupt).await.unwrap(); + + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "follow up".into(), + }], + final_output_json_schema: None, + }) + .await + .unwrap(); + + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let requests = response_mock.requests(); + assert_eq!(requests.len(), 2, "expected two calls to the responses API"); + + let follow_up_request = &requests[1]; + let user_texts = follow_up_request.message_input_texts("user"); + assert!( + user_texts + .iter() + .any(|text| text.contains("")), + "expected marker in follow-up request" + ); +} diff --git a/codex-rs/core/tests/suite/interrupt_does_not_repeat_aborted_turn.rs b/codex-rs/core/tests/suite/interrupt_does_not_repeat_aborted_turn.rs deleted file mode 100644 index cf1ae68f864..00000000000 --- a/codex-rs/core/tests/suite/interrupt_does_not_repeat_aborted_turn.rs +++ /dev/null @@ -1,248 +0,0 @@ -#![allow(clippy::expect_used, clippy::unwrap_used)] - -use std::path::PathBuf; -use std::sync::Arc; -use std::sync::Mutex; -use std::time::Duration; - -use codex_core::protocol::AskForApproval; -use codex_core::protocol::EventMsg; -use codex_core::protocol::Op; -use codex_core::protocol::SandboxPolicy; -use codex_protocol::config_types::ReasoningSummary; -use codex_protocol::user_input::UserInput; -use core_test_support::responses::ev_assistant_message; -use core_test_support::responses::ev_completed; -use core_test_support::responses::ev_function_call; -use core_test_support::responses::ev_response_created; -use core_test_support::responses::sse; -use core_test_support::responses::start_mock_server; -use core_test_support::test_codex::test_codex; -use core_test_support::wait_for_event; -use pretty_assertions::assert_eq; -use serde_json::Value; -use wiremock::Match; -use wiremock::Mock; -use wiremock::Request; -use wiremock::Respond; -use wiremock::ResponseTemplate; -use wiremock::matchers::method; -use wiremock::matchers::path_regex; - -#[derive(Clone, Default)] -struct RequestsCapture(Arc>>); - -impl RequestsCapture { - fn requests(&self) -> Vec { - self.0.lock().unwrap().clone() - } -} - -impl Match for RequestsCapture { - fn matches(&self, request: &Request) -> bool { - self.0.lock().unwrap().push(request.clone()); - true - } -} - -#[derive(Debug, Default)] -struct ResponderState { - call_count: usize, -} - -#[derive(Clone)] -struct RepeatUnlessTurnAbortedResponder { - state: Arc>, - side_effect_path: PathBuf, -} - -impl RepeatUnlessTurnAbortedResponder { - fn request_contains_turn_aborted_marker(request: &Request) -> bool { - let Ok(body) = request.body_json::() else { - return false; - }; - let Some(input) = body.get("input").and_then(Value::as_array) else { - return false; - }; - input.iter().any(|item| { - item.get("type").and_then(Value::as_str) == Some("message") - && item.get("role").and_then(Value::as_str) == Some("user") - && item - .get("content") - .and_then(Value::as_array) - .into_iter() - .flatten() - .filter(|span| span.get("type").and_then(Value::as_str) == Some("input_text")) - .any(|span| { - span.get("text") - .and_then(Value::as_str) - .is_some_and(|text| text.contains("")) - }) - }) - } - - fn sse_response(body: String) -> ResponseTemplate { - ResponseTemplate::new(200) - .insert_header("content-type", "text/event-stream") - .set_body_string(body) - } -} - -impl Respond for RepeatUnlessTurnAbortedResponder { - fn respond(&self, request: &Request) -> ResponseTemplate { - let mut state = self.state.lock().unwrap(); - let call_num = state.call_count; - state.call_count += 1; - - // First request: return a long-running tool call that performs an immediate side-effect. - if call_num == 0 { - let script = format!( - "echo run >> \"{}\"; sleep 60", - self.side_effect_path.display() - ); - let args = serde_json::json!({ - "command": script, - "timeout_ms": 60_000 - }) - .to_string(); - - return Self::sse_response(sse(vec![ - ev_response_created("resp-1"), - ev_function_call("call-1", "shell_command", &args), - ev_completed("resp-1"), - ])); - } - - // Follow-up after the repeated tool call. - if call_num == 2 { - return Self::sse_response(sse(vec![ - ev_response_created("resp-3"), - ev_assistant_message("msg-3", "ok"), - ev_completed("resp-3"), - ])); - } - - // Second request: if Codex includes a turn-aborted marker in the prompt, - // behave and do not repeat the previous tool call. - if Self::request_contains_turn_aborted_marker(request) { - return Self::sse_response(sse(vec![ - ev_response_created("resp-2"), - ev_assistant_message("msg-2", "moving on"), - ev_completed("resp-2"), - ])); - } - - // Otherwise, simulate the model mistakenly repeating the earlier action. - let command = format!( - "bash -lc 'echo run >> \"{}\"'", - self.side_effect_path.display() - ); - let args = serde_json::json!({ "command": command }).to_string(); - Self::sse_response(sse(vec![ - ev_response_created("resp-2"), - ev_function_call("call-2", "shell_command", &args), - ev_completed("resp-2"), - ])) - } -} - -async fn wait_for_side_effect(path: &std::path::Path) { - let check = async { - loop { - if let Ok(contents) = std::fs::read_to_string(path) - && contents.lines().any(|line| line == "run") - { - return; - } - tokio::time::sleep(Duration::from_millis(10)).await; - } - }; - - tokio::time::timeout(Duration::from_secs(5), check) - .await - .expect("side effect should be written before interrupt"); -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn interrupt_should_prevent_repeating_aborted_work() { - let server = start_mock_server().await; - - let fixture = test_codex() - .with_model("gpt-5.1") - .build(&server) - .await - .unwrap(); - - let side_effect_path = fixture.workspace_path("side_effect.txt"); - let capture = RequestsCapture::default(); - let responder = RepeatUnlessTurnAbortedResponder { - state: Arc::new(Mutex::new(ResponderState::default())), - side_effect_path: side_effect_path.clone(), - }; - - Mock::given(method("POST")) - .and(path_regex(".*/responses$")) - .and(capture.clone()) - .respond_with(responder) - .up_to_n_times(10) - .mount(&server) - .await; - - let session_model = fixture.session_configured.model.clone(); - let cwd = fixture.cwd_path().to_path_buf(); - let codex = fixture.codex.clone(); - - codex - .submit(Op::UserTurn { - items: vec![UserInput::Text { - text: "start first task".into(), - }], - final_output_json_schema: None, - cwd: cwd.clone(), - approval_policy: AskForApproval::Never, - sandbox_policy: SandboxPolicy::DangerFullAccess, - model: session_model.clone(), - effort: None, - summary: ReasoningSummary::Auto, - }) - .await - .unwrap(); - - wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await; - wait_for_side_effect(&side_effect_path).await; - codex.submit(Op::Interrupt).await.unwrap(); - wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await; - - codex - .submit(Op::UserTurn { - items: vec![UserInput::Text { - text: "do something else".into(), - }], - final_output_json_schema: None, - cwd, - approval_policy: AskForApproval::Never, - sandbox_policy: SandboxPolicy::DangerFullAccess, - model: session_model, - effort: None, - summary: ReasoningSummary::Auto, - }) - .await - .unwrap(); - - wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; - - let contents = std::fs::read_to_string(&side_effect_path).unwrap_or_default(); - let lines: Vec<&str> = contents.lines().collect(); - assert_eq!( - lines, - vec!["run"], - "Codex should not repeat work from an aborted turn" - ); - - // Sanity check request count so we do not accidentally introduce infinite follow-ups. - assert_eq!( - capture.requests().len(), - 2, - "expected one aborted request + one follow-up request" - ); -} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 4ffd4be6850..44093778d38 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -31,7 +31,6 @@ mod exec_policy; mod fork_thread; mod grep_files; mod hierarchical_agents; -mod interrupt_does_not_repeat_aborted_turn; mod items; mod json_result; mod list_dir; From a1f39d366a752b82eb05d3047fcf082af27803d8 Mon Sep 17 00:00:00 2001 From: Skylar Graika Date: Mon, 12 Jan 2026 13:35:56 -0800 Subject: [PATCH 3/5] core: document abort test expectations --- codex-rs/core/src/codex.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 751778a7ae2..8984e59487f 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -3872,6 +3872,8 @@ mod tests { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; + // Interrupts persist a model-visible `` marker into history, but there is no + // separate client-visible event for that marker (only `EventMsg::TurnAborted`). let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) .await .expect("timeout waiting for event") @@ -3880,6 +3882,7 @@ mod tests { EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason), other => panic!("unexpected event: {other:?}"), } + // No extra events should be emitted after an abort. assert!(rx.try_recv().is_err()); } @@ -3901,6 +3904,8 @@ mod tests { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; + // Even if tasks handle cancellation gracefully, interrupts still result in `TurnAborted` + // being the only client-visible signal. let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) .await .expect("timeout waiting for event") @@ -3909,6 +3914,7 @@ mod tests { EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason), other => panic!("unexpected event: {other:?}"), } + // No extra events should be emitted after an abort. assert!(rx.try_recv().is_err()); } @@ -3923,6 +3929,9 @@ mod tests { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; + // Aborting a review task should exit review mode before surfacing the abort to the client. + // We scan for these events (rather than relying on fixed ordering) since unrelated events + // may interleave. let mut exited_review_mode_idx = None; let mut turn_aborted_idx = None; let mut idx = 0usize; @@ -3962,6 +3971,8 @@ mod tests { ); let history = sess.clone_history().await; + // The `` marker is silent in the event stream, so verify it is still + // recorded in history for the model. assert!( history.raw_items().iter().any(|item| { let ResponseItem::Message { role, content, .. } = item else { From e8fb3f2760e5c572d86231a1d2e0e73fc17fce6f Mon Sep 17 00:00:00 2001 From: Skylar Graika Date: Fri, 16 Jan 2026 15:22:45 -0800 Subject: [PATCH 4/5] core: include text_elements in abort task tests --- codex-rs/core/tests/suite/abort_tasks.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/codex-rs/core/tests/suite/abort_tasks.rs b/codex-rs/core/tests/suite/abort_tasks.rs index af95b361747..53b5a26b022 100644 --- a/codex-rs/core/tests/suite/abort_tasks.rs +++ b/codex-rs/core/tests/suite/abort_tasks.rs @@ -200,6 +200,7 @@ async fn interrupt_persists_turn_aborted_marker_in_next_request() { .submit(Op::UserInput { items: vec![UserInput::Text { text: "start interrupt marker".into(), + text_elements: Vec::new(), }], final_output_json_schema: None, }) @@ -217,6 +218,7 @@ async fn interrupt_persists_turn_aborted_marker_in_next_request() { .submit(Op::UserInput { items: vec![UserInput::Text { text: "follow up".into(), + text_elements: Vec::new(), }], final_output_json_schema: None, }) From 377583c5c7496ff236672bcc11a867fde22efdb3 Mon Sep 17 00:00:00 2001 From: Skylar Graika Date: Sat, 17 Jan 2026 07:38:27 -0800 Subject: [PATCH 5/5] core: keep during compaction --- codex-rs/core/src/compact.rs | 61 +++++++++++++++++++++++++++++++++++- 1 file changed, 60 insertions(+), 1 deletion(-) diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 4dc56f10d25..10947aa435c 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -15,6 +15,7 @@ use crate::protocol::EventMsg; use crate::protocol::TurnContextItem; use crate::protocol::TurnStartedEvent; use crate::protocol::WarningEvent; +use crate::session_prefix::TURN_ABORTED_OPEN_TAG; use crate::truncate::TruncationPolicy; use crate::truncate::approx_token_count; use crate::truncate::truncate_text; @@ -223,11 +224,31 @@ pub(crate) fn collect_user_messages(items: &[ResponseItem]) -> Vec { Some(user.message()) } } - _ => None, + _ => collect_turn_aborted_marker(item), }) .collect() } +fn collect_turn_aborted_marker(item: &ResponseItem) -> Option { + let ResponseItem::Message { role, content, .. } = item else { + return None; + }; + if role != "user" { + return None; + } + + let text = content_items_to_text(content)?; + if text + .trim_start() + .to_ascii_lowercase() + .starts_with(TURN_ABORTED_OPEN_TAG) + { + Some(text) + } else { + None + } +} + pub(crate) fn is_summary_message(message: &str) -> bool { message.starts_with(format!("{SUMMARY_PREFIX}\n").as_str()) } @@ -334,6 +355,7 @@ async fn drain_to_completed( mod tests { use super::*; + use crate::session_prefix::TURN_ABORTED_OPEN_TAG; use pretty_assertions::assert_eq; #[test] @@ -486,4 +508,41 @@ mod tests { }; assert_eq!(summary, summary_text); } + + #[test] + fn build_compacted_history_preserves_turn_aborted_markers() { + let marker = format!( + "{TURN_ABORTED_OPEN_TAG}\n turn-1\n interrupted\n" + ); + let items = vec![ + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: marker.clone(), + }], + }, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "real user message".to_string(), + }], + }, + ]; + + let user_messages = collect_user_messages(&items); + let history = build_compacted_history(Vec::new(), &user_messages, "SUMMARY"); + + let found_marker = history.iter().any(|item| match item { + ResponseItem::Message { role, content, .. } if role == "user" => { + content_items_to_text(content).is_some_and(|text| text == marker) + } + _ => false, + }); + assert!( + found_marker, + "expected compacted history to retain marker" + ); + } }