Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
2b8befd
fix: prevent repeating interrupted turns
swordfish444 Jan 10, 2026
80434a3
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 11, 2026
07b15fe
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 12, 2026
8745d8c
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 12, 2026
03761d1
core: refine interrupted-turn marker
swordfish444 Jan 12, 2026
de8719c
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 12, 2026
a1f39d3
core: document abort test expectations
swordfish444 Jan 12, 2026
bb44a06
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 12, 2026
04730ea
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 12, 2026
2cc4233
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 12, 2026
4e1b1cb
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 13, 2026
344f408
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 13, 2026
3920f4a
Merge branch 'main' into fix/interrupt-aborted-turn-marker
jif-oai Jan 13, 2026
111de08
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 13, 2026
2e6deb5
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 13, 2026
445691a
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 13, 2026
850a4f4
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 13, 2026
b728a8c
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 13, 2026
2c52858
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 14, 2026
06f9a52
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 14, 2026
75816d8
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 14, 2026
c775d57
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 15, 2026
b246b5c
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 15, 2026
d6b1a98
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 16, 2026
67257da
Merge branch 'main' into fix/interrupt-aborted-turn-marker
etraut-openai Jan 16, 2026
e758326
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 16, 2026
e8fb3f2
core: include text_elements in abort task tests
swordfish444 Jan 16, 2026
377583c
core: keep <turn_aborted> during compaction
swordfish444 Jan 17, 2026
80f526f
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 17, 2026
4f2b27b
Merge upstream/main
swordfish444 Jan 18, 2026
b80ac16
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 19, 2026
6a15428
Merge branch 'main' into fix/interrupt-aborted-turn-marker
swordfish444 Jan 19, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 60 additions & 26 deletions codex-rs/core/src/codex.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4218,6 +4218,8 @@ mod tests {

sess.abort_all_tasks(TurnAbortReason::Interrupted).await;

// Interrupts persist a model-visible `<turn_aborted>` marker into history, but there is no
// separate client-visible event for that marker (only `EventMsg::TurnAborted`).
let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("timeout waiting for event")
Expand All @@ -4226,6 +4228,7 @@ mod tests {
EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason),
other => panic!("unexpected event: {other:?}"),
}
// No extra events should be emitted after an abort.
assert!(rx.try_recv().is_err());
}

Expand All @@ -4248,11 +4251,17 @@ mod tests {

sess.abort_all_tasks(TurnAbortReason::Interrupted).await;

let evt = rx.recv().await.expect("event");
// Even if tasks handle cancellation gracefully, interrupts still result in `TurnAborted`
// being the only client-visible signal.
let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("timeout waiting for event")
.expect("event");
match evt.msg {
EventMsg::TurnAborted(e) => assert_eq!(TurnAbortReason::Interrupted, e.reason),
other => panic!("unexpected event: {other:?}"),
}
// No extra events should be emitted after an abort.
assert!(rx.try_recv().is_err());
}

Expand All @@ -4268,42 +4277,67 @@ mod tests {

sess.abort_all_tasks(TurnAbortReason::Interrupted).await;

// Drain events until we observe ExitedReviewMode; earlier
// RawResponseItem entries (e.g., environment context) may arrive first.
loop {
let evt = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
// Aborting a review task should exit review mode before surfacing the abort to the client.
// We scan for these events (rather than relying on fixed ordering) since unrelated events
// may interleave.
let mut exited_review_mode_idx = None;
let mut turn_aborted_idx = None;
let mut idx = 0usize;
let deadline = tokio::time::Instant::now() + std::time::Duration::from_secs(3);
while tokio::time::Instant::now() < deadline {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let evt = tokio::time::timeout(remaining, rx.recv())
.await
.expect("timeout waiting for first event")
.expect("first event");
.expect("timeout waiting for event")
.expect("event");
let event_idx = idx;
idx = idx.saturating_add(1);
match evt.msg {
EventMsg::ExitedReviewMode(ev) => {
assert!(ev.review_output.is_none());
break;
exited_review_mode_idx = Some(event_idx);
}
// Ignore any non-critical events before exit.
_ => continue,
}
}
loop {
let evt = tokio::time::timeout(std::time::Duration::from_secs(2), rx.recv())
.await
.expect("timeout waiting for next event")
.expect("event");
match evt.msg {
EventMsg::RawResponseItem(_) => continue,
EventMsg::ItemStarted(_) | EventMsg::ItemCompleted(_) => continue,
EventMsg::AgentMessage(_) => continue,
EventMsg::TurnAborted(e) => {
assert_eq!(TurnAbortReason::Interrupted, e.reason);
EventMsg::TurnAborted(ev) => {
assert_eq!(TurnAbortReason::Interrupted, ev.reason);
turn_aborted_idx = Some(event_idx);
break;
}
other => panic!("unexpected second event: {other:?}"),
_ => {}
}
}
assert!(
exited_review_mode_idx.is_some(),
"expected ExitedReviewMode after abort"
);
assert!(
turn_aborted_idx.is_some(),
"expected TurnAborted after abort"
);
assert!(
exited_review_mode_idx.unwrap() < turn_aborted_idx.unwrap(),
"expected ExitedReviewMode before TurnAborted"
);

// TODO(jif) investigate what is this?
let history = sess.clone_history().await;
let _ = history.raw_items();
// The `<turn_aborted>` marker is silent in the event stream, so verify it is still
// recorded in history for the model.
assert!(
history.raw_items().iter().any(|item| {
let ResponseItem::Message { role, content, .. } = item else {
return false;
};
if role != "user" {
return false;
}
content.iter().any(|content_item| {
let ContentItem::InputText { text } = content_item else {
return false;
};
text.contains(crate::session_prefix::TURN_ABORTED_OPEN_TAG)
})
}),
"expected a model-visible turn aborted marker in history after interrupt"
);
}

#[tokio::test]
Expand Down
61 changes: 60 additions & 1 deletion codex-rs/core/src/compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::protocol::EventMsg;
use crate::protocol::TurnContextItem;
use crate::protocol::TurnStartedEvent;
use crate::protocol::WarningEvent;
use crate::session_prefix::TURN_ABORTED_OPEN_TAG;
use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::truncate_text;
Expand Down Expand Up @@ -223,11 +224,31 @@ pub(crate) fn collect_user_messages(items: &[ResponseItem]) -> Vec<String> {
Some(user.message())
}
}
_ => None,
_ => collect_turn_aborted_marker(item),
})
.collect()
}

fn collect_turn_aborted_marker(item: &ResponseItem) -> Option<String> {
let ResponseItem::Message { role, content, .. } = item else {
return None;
};
if role != "user" {
return None;
}

let text = content_items_to_text(content)?;
if text
.trim_start()
.to_ascii_lowercase()
.starts_with(TURN_ABORTED_OPEN_TAG)
{
Some(text)
} else {
None
}
}

pub(crate) fn is_summary_message(message: &str) -> bool {
message.starts_with(format!("{SUMMARY_PREFIX}\n").as_str())
}
Expand Down Expand Up @@ -337,6 +358,7 @@ async fn drain_to_completed(
mod tests {

use super::*;
use crate::session_prefix::TURN_ABORTED_OPEN_TAG;
use pretty_assertions::assert_eq;

#[test]
Expand Down Expand Up @@ -489,4 +511,41 @@ mod tests {
};
assert_eq!(summary, summary_text);
}

#[test]
fn build_compacted_history_preserves_turn_aborted_markers() {
let marker = format!(
"{TURN_ABORTED_OPEN_TAG}\n <turn_id>turn-1</turn_id>\n <reason>interrupted</reason>\n</turn_aborted>"
);
let items = vec![
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: marker.clone(),
}],
},
ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: "real user message".to_string(),
}],
},
];

let user_messages = collect_user_messages(&items);
let history = build_compacted_history(Vec::new(), &user_messages, "SUMMARY");

let found_marker = history.iter().any(|item| match item {
ResponseItem::Message { role, content, .. } if role == "user" => {
content_items_to_text(content).is_some_and(|text| text == marker)
}
_ => false,
});
assert!(
found_marker,
"expected compacted history to retain <turn_aborted> marker"
);
}
}
7 changes: 1 addition & 6 deletions codex-rs/core/src/context_manager/history.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::codex::TurnContext;
use crate::context_manager::normalize;
use crate::instructions::SkillInstructions;
use crate::instructions::UserInstructions;
use crate::session_prefix::is_session_prefix;
use crate::truncate::TruncationPolicy;
use crate::truncate::approx_token_count;
use crate::truncate::approx_tokens_from_byte_count;
Expand Down Expand Up @@ -328,12 +329,6 @@ fn estimate_reasoning_length(encoded_len: usize) -> usize {
.saturating_sub(650)
}

fn is_session_prefix(text: &str) -> bool {
let trimmed = text.trim_start();
let lowered = trimmed.to_ascii_lowercase();
lowered.starts_with("<environment_context>")
}

pub(crate) fn is_user_turn_boundary(item: &ResponseItem) -> bool {
let ResponseItem::Message { role, content, .. } = item else {
return false;
Expand Down
7 changes: 1 addition & 6 deletions codex-rs/core/src/event_mapping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,9 @@ use uuid::Uuid;

use crate::instructions::SkillInstructions;
use crate::instructions::UserInstructions;
use crate::session_prefix::is_session_prefix;
use crate::user_shell_command::is_user_shell_command_text;

fn is_session_prefix(text: &str) -> bool {
let trimmed = text.trim_start();
let lowered = trimmed.to_ascii_lowercase();
lowered.starts_with("<environment_context>")
}

fn parse_user_message(message: &[ContentItem]) -> Option<UserMessageItem> {
if UserInstructions::is_user_instructions(message)
|| SkillInstructions::is_skill_instructions(message)
Expand Down
1 change: 1 addition & 0 deletions codex-rs/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub mod parse_command;
pub mod path_utils;
pub mod powershell;
pub mod sandboxing;
mod session_prefix;
mod stream_events_utils;
mod text_encoding;
pub mod token_data;
Expand Down
15 changes: 15 additions & 0 deletions codex-rs/core/src/session_prefix.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/// Helpers for identifying model-visible "session prefix" messages.
///
/// A session prefix is a user-role message that carries configuration or state needed by
/// follow-up turns (e.g. `<environment_context>`, `<turn_aborted>`). These items are persisted in
/// history so the model can see them, but they are not user intent and must not create user-turn
/// boundaries.
pub(crate) const ENVIRONMENT_CONTEXT_OPEN_TAG: &str = "<environment_context>";
pub(crate) const TURN_ABORTED_OPEN_TAG: &str = "<turn_aborted>";

/// Returns true if `text` starts with a session prefix marker (case-insensitive).
pub(crate) fn is_session_prefix(text: &str) -> bool {
let trimmed = text.trim_start();
let lowered = trimmed.to_ascii_lowercase();
lowered.starts_with(ENVIRONMENT_CONTEXT_OPEN_TAG) || lowered.starts_with(TURN_ABORTED_OPEN_TAG)
}
24 changes: 24 additions & 0 deletions codex-rs/core/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ use crate::protocol::EventMsg;
use crate::protocol::TurnAbortReason;
use crate::protocol::TurnAbortedEvent;
use crate::protocol::TurnCompleteEvent;
use crate::session_prefix::TURN_ABORTED_OPEN_TAG;
use crate::state::ActiveTurn;
use crate::state::RunningTask;
use crate::state::TaskKind;
use codex_protocol::models::ContentItem;
use codex_protocol::models::ResponseItem;
use codex_protocol::protocol::RolloutItem;
use codex_protocol::user_input::UserInput;

pub(crate) use compact::CompactTask;
Expand All @@ -37,6 +41,7 @@ pub(crate) use undo::UndoTask;
pub(crate) use user_shell::UserShellCommandTask;

const GRACEFULL_INTERRUPTION_TIMEOUT_MS: u64 = 100;
const TURN_ABORTED_INTERRUPTED_GUIDANCE: &str = "The user interrupted the previous turn. Do not continue or repeat work from that turn unless the user explicitly asks. If any tools/commands were aborted, they may have partially executed; verify current state before retrying.";

/// Thin wrapper that exposes the parts of [`Session`] task runners need.
#[derive(Clone)]
Expand Down Expand Up @@ -242,6 +247,25 @@ impl Session {
.abort(session_ctx, Arc::clone(&task.turn_context))
.await;

if reason == TurnAbortReason::Interrupted {
let marker = ResponseItem::Message {
id: None,
role: "user".to_string(),
content: vec![ContentItem::InputText {
text: format!(
"{TURN_ABORTED_OPEN_TAG}\n <turn_id>{sub_id}</turn_id>\n <reason>interrupted</reason>\n <guidance>{TURN_ABORTED_INTERRUPTED_GUIDANCE}</guidance>\n</turn_aborted>"
),
}],
};
self.record_into_history(std::slice::from_ref(&marker), task.turn_context.as_ref())
.await;
self.persist_rollout_items(&[RolloutItem::ResponseItem(marker)])
.await;
// Ensure the marker is durably visible before emitting TurnAborted: some clients
// synchronously re-read the rollout on receipt of the abort event.
self.flush_rollout().await;
}

let event = EventMsg::TurnAborted(TurnAbortedEvent { reason });
self.send_event(task.turn_context.as_ref(), event).await;
}
Expand Down
Loading
Loading