diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 5ca971c121d..e6201d530c5 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -4218,6 +4218,8 @@ mod tests { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; + // Interrupts persist a model-visible `<turn_aborted>` marker into history, but there is no + // separate client-visible event for that marker (only `EventMsg::TurnAborted`). let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) .await .expect("timeout waiting for event") @@ -4226,6 +4228,7 @@ mod tests { EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason), other => panic!("unexpected event: {other:?}"), } + // No extra events should be emitted after an abort. assert!(rx.try_recv().is_err()); } @@ -4248,11 +4251,17 @@ mod tests { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; - let evt = rx.recv().await.expect("event"); + // Even if tasks handle cancellation gracefully, interrupts still result in `TurnAborted` + // being the only client-visible signal. + let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) + .await + .expect("timeout waiting for event") + .expect("event"); match evt.msg { EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason), other => panic!("unexpected event: {other:?}"), } + // No extra events should be emitted after an abort. assert!(rx.try_recv().is_err()); } @@ -4268,42 +4277,67 @@ mod tests { sess.abort_all_tasks(TurnAbortReason::Interrupted).await; - // Drain events until we observe ExitedReviewMode; earlier - // RawResponseItem entries (e.g., environment context) may arrive first. - loop { - let evt = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv()) + // Aborting a review task should exit review mode before surfacing the abort to the client. + // We scan for these events (rather than relying on fixed ordering) since unrelated events + // may interleave. 
+ let mut exited_review_mode_idx = None; + let mut turn_aborted_idx = None; + let mut idx = 0usize; + let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(3); + while tokio::time::Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + let evt = tokio::time::timeout(remaining, rx.recv()) .await - .expect("timeout waiting for first event") - .expect("first event"); + .expect("timeout waiting for event") + .expect("event"); + let event_idx = idx; + idx = idx.saturating_add(1); match evt.msg { EventMsg::ExitedReviewMode(ev) => { assert!(ev.review_output.is_none()); - break; + exited_review_mode_idx = Some(event_idx); } - // Ignore any non-critical events before exit. - _ => continue, - } - } - loop { - let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv()) - .await - .expect("timeout waiting for next event") - .expect("event"); - match evt.msg { - EventMsg::RawResponseItem(_) => continue, - EventMsg::ItemStarted(_) | EventMsg::ItemCompleted(_) => continue, - EventMsg::AgentMessage(_) => continue, - EventMsg::TurnAborted(e) => { - assert_eq!(TurnAbortReason::Interrupted, e.reason); + EventMsg::TurnAborted(ev) => { + assert_eq!(TurnAbortReason::Interrupted, ev.reason); + turn_aborted_idx = Some(event_idx); break; } - other => panic!("unexpected second event: {other:?}"), + _ => {} } } + assert!( + exited_review_mode_idx.is_some(), + "expected ExitedReviewMode after abort" + ); + assert!( + turn_aborted_idx.is_some(), + "expected TurnAborted after abort" + ); + assert!( + exited_review_mode_idx.unwrap() < turn_aborted_idx.unwrap(), + "expected ExitedReviewMode before TurnAborted" + ); - // TODO(jif) investigate what is this? let history = sess.clone_history().await; - let _ = history.raw_items(); + // The `<turn_aborted>` marker is silent in the event stream, so verify it is still + // recorded in history for the model. 
+ assert!( + history.raw_items().iter().any(|item| { + let ResponseItem::Message { role, content, .. } = item else { + return false; + }; + if role != "user" { + return false; + } + content.iter().any(|content_item| { + let ContentItem::InputText { text } = content_item else { + return false; + }; + text.contains(crate::session_prefix::TURN_ABORTED_OPEN_TAG) + }) + }), + "expected a model-visible turn aborted marker in history after interrupt" + ); } #[tokio::test] diff --git a/codex-rs/core/src/compact.rs b/codex-rs/core/src/compact.rs index 40e6284debb..0e519256d4e 100644 --- a/codex-rs/core/src/compact.rs +++ b/codex-rs/core/src/compact.rs @@ -15,6 +15,7 @@ use crate::protocol::EventMsg; use crate::protocol::TurnContextItem; use crate::protocol::TurnStartedEvent; use crate::protocol::WarningEvent; +use crate::session_prefix::TURN_ABORTED_OPEN_TAG; use crate::truncate::TruncationPolicy; use crate::truncate::approx_token_count; use crate::truncate::truncate_text; @@ -223,11 +224,31 @@ pub(crate) fn collect_user_messages(items: &[ResponseItem]) -> Vec { Some(user.message()) } } - _ => None, + _ => collect_turn_aborted_marker(item), }) .collect() } +fn collect_turn_aborted_marker(item: &ResponseItem) -> Option { + let ResponseItem::Message { role, content, .. 
} = item else { + return None; + }; + if role != "user" { + return None; + } + + let text = content_items_to_text(content)?; + if text + .trim_start() + .to_ascii_lowercase() + .starts_with(TURN_ABORTED_OPEN_TAG) + { + Some(text) + } else { + None + } +} + pub(crate) fn is_summary_message(message: &str) -> bool { message.starts_with(format!("{SUMMARY_PREFIX}\n").as_str()) } @@ -337,6 +358,7 @@ async fn drain_to_completed( mod tests { use super::*; + use crate::session_prefix::TURN_ABORTED_OPEN_TAG; use pretty_assertions::assert_eq; #[test] @@ -489,4 +511,41 @@ mod tests { }; assert_eq!(summary, summary_text); } + + #[test] + fn build_compacted_history_preserves_turn_aborted_markers() { + let marker = format!( + "{TURN_ABORTED_OPEN_TAG}\n turn-1\n interrupted\n" + ); + let items = vec![ + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: marker.clone(), + }], + }, + ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: "real user message".to_string(), + }], + }, + ]; + + let user_messages = collect_user_messages(&items); + let history = build_compacted_history(Vec::new(), &user_messages, "SUMMARY"); + + let found_marker = history.iter().any(|item| match item { + ResponseItem::Message { role, content, .. 
} if role == "user" => { + content_items_to_text(content).is_some_and(|text| text == marker) + } + _ => false, + }); + assert!( + found_marker, + "expected compacted history to retain marker" + ); + } } diff --git a/codex-rs/core/src/context_manager/history.rs b/codex-rs/core/src/context_manager/history.rs index 4feeddc29fa..354b3c2d145 100644 --- a/codex-rs/core/src/context_manager/history.rs +++ b/codex-rs/core/src/context_manager/history.rs @@ -2,6 +2,7 @@ use crate::codex::TurnContext; use crate::context_manager::normalize; use crate::instructions::SkillInstructions; use crate::instructions::UserInstructions; +use crate::session_prefix::is_session_prefix; use crate::truncate::TruncationPolicy; use crate::truncate::approx_token_count; use crate::truncate::approx_tokens_from_byte_count; @@ -328,12 +329,6 @@ fn estimate_reasoning_length(encoded_len: usize) -> usize { .saturating_sub(650) } -fn is_session_prefix(text: &str) -> bool { - let trimmed = text.trim_start(); - let lowered = trimmed.to_ascii_lowercase(); - lowered.starts_with("") -} - pub(crate) fn is_user_turn_boundary(item: &ResponseItem) -> bool { let ResponseItem::Message { role, content, .. 
} = item else { return false; diff --git a/codex-rs/core/src/event_mapping.rs b/codex-rs/core/src/event_mapping.rs index eaa477b0852..087b554cd51 100644 --- a/codex-rs/core/src/event_mapping.rs +++ b/codex-rs/core/src/event_mapping.rs @@ -19,14 +19,9 @@ use uuid::Uuid; use crate::instructions::SkillInstructions; use crate::instructions::UserInstructions; +use crate::session_prefix::is_session_prefix; use crate::user_shell_command::is_user_shell_command_text; -fn is_session_prefix(text: &str) -> bool { - let trimmed = text.trim_start(); - let lowered = trimmed.to_ascii_lowercase(); - lowered.starts_with("") -} - fn parse_user_message(message: &[ContentItem]) -> Option { if UserInstructions::is_user_instructions(message) || SkillInstructions::is_skill_instructions(message) diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index a23ae55aa85..6630d22a767 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -46,6 +46,7 @@ pub mod parse_command; pub mod path_utils; pub mod powershell; pub mod sandboxing; +mod session_prefix; mod stream_events_utils; mod text_encoding; pub mod token_data; diff --git a/codex-rs/core/src/session_prefix.rs b/codex-rs/core/src/session_prefix.rs new file mode 100644 index 00000000000..99283082b6f --- /dev/null +++ b/codex-rs/core/src/session_prefix.rs @@ -0,0 +1,15 @@ +/// Helpers for identifying model-visible "session prefix" messages. +/// +/// A session prefix is a user-role message that carries configuration or state needed by +/// follow-up turns (e.g. `<environment_context>`, `<turn_aborted>`). These items are persisted in +/// history so the model can see them, but they are not user intent and must not create user-turn +/// boundaries. +pub(crate) const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = ""; +pub(crate) const TURN_ABORTED_OPEN_TAG: &str = ""; + +/// Returns true if `text` starts with a session prefix marker (case-insensitive). 
+pub(crate) fn is_session_prefix(text: &str) -> bool { + let trimmed = text.trim_start(); + let lowered = trimmed.to_ascii_lowercase(); + lowered.starts_with(ENVIRONMENT_CONTEXT_OPEN_TAG) || lowered.starts_with(TURN_ABORTED_OPEN_TAG) +} diff --git a/codex-rs/core/src/tasks/mod.rs b/codex-rs/core/src/tasks/mod.rs index 83326b1f05a..91303a3bdcd 100644 --- a/codex-rs/core/src/tasks/mod.rs +++ b/codex-rs/core/src/tasks/mod.rs @@ -24,9 +24,13 @@ use crate::protocol::EventMsg; use crate::protocol::TurnAbortReason; use crate::protocol::TurnAbortedEvent; use crate::protocol::TurnCompleteEvent; +use crate::session_prefix::TURN_ABORTED_OPEN_TAG; use crate::state::ActiveTurn; use crate::state::RunningTask; use crate::state::TaskKind; +use codex_protocol::models::ContentItem; +use codex_protocol::models::ResponseItem; +use codex_protocol::protocol::RolloutItem; use codex_protocol::user_input::UserInput; pub(crate) use compact::CompactTask; @@ -37,6 +41,7 @@ pub(crate) use undo::UndoTask; pub(crate) use user_shell::UserShellCommandTask; const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100; +const TURN_ABORTED_INTERRUPTED_GUIDANCE: &str = "The user interrupted the previous turn. Do not continue or repeat work from that turn unless the user explicitly asks. If any tools/commands were aborted, they may have partially executed; verify current state before retrying."; /// Thin wrapper that exposes the parts of [`Session`] task runners need. 
#[derive(Clone)] @@ -242,6 +247,25 @@ impl Session { .abort(session_ctx, Arc::clone(&task.turn_context)) .await; + if reason == TurnAbortReason::Interrupted { + let marker = ResponseItem::Message { + id: None, + role: "user".to_string(), + content: vec![ContentItem::InputText { + text: format!( + "{TURN_ABORTED_OPEN_TAG}\n {sub_id}\n interrupted\n {TURN_ABORTED_INTERRUPTED_GUIDANCE}\n" + ), + }], + }; + self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref()) + .await; + self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)]) + .await; + // Ensure the marker is durably visible before emitting TurnAborted: some clients + // synchronously re-read the rollout on receipt of the abort event. + self.flush_rollout().await; + } + let event = EventMsg::TurnAborted(TurnAbortedEvent { reason }); self.send_event(task.turn_context.as_ref(), event).await; } diff --git a/codex-rs/core/tests/suite/abort_tasks.rs b/codex-rs/core/tests/suite/abort_tasks.rs index c020fa00cbf..53b5a26b022 100644 --- a/codex-rs/core/tests/suite/abort_tasks.rs +++ b/codex-rs/core/tests/suite/abort_tasks.rs @@ -163,3 +163,79 @@ async fn interrupt_tool_records_history_entries() { "expected at least one tenth of a second of elapsed time, got {secs}" ); } + +/// After an interrupt we persist a model-visible `<turn_aborted>` marker in the conversation +/// history. This test asserts that the marker is included in the next `/responses` request. 
+#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn interrupt_persists_turn_aborted_marker_in_next_request() { + let command = "sleep 60"; + let call_id = "call-turn-aborted-marker"; + + let args = json!({ + "command": command, + "timeout_ms": 60_000 + }) + .to_string(); + let first_body = sse(vec![ + ev_response_created("resp-marker"), + ev_function_call(call_id, "shell_command", &args), + ev_completed("resp-marker"), + ]); + let follow_up_body = sse(vec![ + ev_response_created("resp-followup"), + ev_completed("resp-followup"), + ]); + + let server = start_mock_server().await; + let response_mock = mount_sse_sequence(&server, vec![first_body, follow_up_body]).await; + + let fixture = test_codex() + .with_model("gpt-5.1") + .build(&server) + .await + .unwrap(); + let codex = Arc::clone(&fixture.codex); + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "start interrupt marker".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await + .unwrap(); + + wait_for_event(&codex, |ev| matches!(ev, EventMsg::ExecCommandBegin(_))).await; + + tokio::time::sleep(Duration::from_secs_f32(0.1)).await; + codex.submit(Op::Interrupt).await.unwrap(); + + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnAborted(_))).await; + + codex + .submit(Op::UserInput { + items: vec![UserInput::Text { + text: "follow up".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + }) + .await + .unwrap(); + + wait_for_event(&codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let requests = response_mock.requests(); + assert_eq!(requests.len(), 2, "expected two calls to the responses API"); + + let follow_up_request = &requests[1]; + let user_texts = follow_up_request.message_input_texts("user"); + assert!( + user_texts + .iter() + .any(|text| text.contains("")), + "expected marker in follow-up request" + ); +}