From e8ffc420ab97a2226219830965e0edd4983a2221 Mon Sep 17 00:00:00 2001 From: David de Regt Date: Wed, 22 Apr 2026 11:21:48 -0700 Subject: [PATCH 1/6] Add includeTurns parameter to thread/resume for skipping to pagination For callers who expect to be paginating the results, they can now call thread/resume with includeTurns:false so it will not fetch any pages of turns, and instead only set up the subscription. That call can be immediately followed by pagination requests to thread/turns/list to fetch pages of turns according to the UI's current interactions. --- .../schema/json/ClientRequest.json | 7 ++ .../codex_app_server_protocol.schemas.json | 7 ++ .../codex_app_server_protocol.v2.schemas.json | 7 ++ .../schema/json/v2/ThreadResumeParams.json | 7 ++ .../typescript/v2/ThreadResumeParams.ts | 6 + .../app-server-protocol/src/protocol/v2.rs | 5 + codex-rs/app-server/README.md | 8 ++ .../app-server/src/codex_message_processor.rs | 35 ++++-- codex-rs/app-server/src/thread_state.rs | 1 + .../tests/suite/v2/thread_resume.rs | 117 ++++++++++++++++++ 10 files changed, 187 insertions(+), 13 deletions(-) diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 8f2324d8144f..2e72bd70b8b4 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -3715,6 +3715,13 @@ "null" ] }, + "includeTurns": { + "description": "When false, return only thread metadata and live-resume state without populating `thread.turns`. 
This is useful when the client plans to call `thread/turns/list` immediately after resuming.", + "type": [ + "boolean", + "null" + ] + }, "model": { "description": "Configuration overrides for the resumed thread, if any.", "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 3eaf25b9b2d1..153eaec1bfac 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -15785,6 +15785,13 @@ "null" ] }, + "includeTurns": { + "description": "When false, return only thread metadata and live-resume state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after resuming.", + "type": [ + "boolean", + "null" + ] + }, "model": { "description": "Configuration overrides for the resumed thread, if any.", "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index b1f9935150ca..e439f2d7540a 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -13672,6 +13672,13 @@ "null" ] }, + "includeTurns": { + "description": "When false, return only thread metadata and live-resume state without populating `thread.turns`. 
This is useful when the client plans to call `thread/turns/list` immediately after resuming.", + "type": [ + "boolean", + "null" + ] + }, "model": { "description": "Configuration overrides for the resumed thread, if any.", "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json index 7fb27c08e86f..8379797bc436 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json @@ -1313,6 +1313,13 @@ "null" ] }, + "includeTurns": { + "description": "When false, return only thread metadata and live-resume state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after resuming.", + "type": [ + "boolean", + "null" + ] + }, "model": { "description": "Configuration overrides for the resumed thread, if any.", "type": [ diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts index d245efe4a2ad..ae493b46354b 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts @@ -37,11 +37,17 @@ model?: string | null, modelProvider?: string | null, serviceTier?: ServiceTier * Override where approval requests are routed for review on this thread * and subsequent turns. */ +<<<<<<< HEAD approvalsReviewer?: ApprovalsReviewer | null, sandbox?: SandboxMode | null, /** * Full permissions override for the resumed thread. Cannot be combined * with `sandbox`. 
*/ permissionProfile?: PermissionProfile | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, /** + * When false, return only thread metadata and live-resume state without + * populating `thread.turns`. This is useful when the client plans to call + * `thread/turns/list` immediately after resuming. + */ +includeTurns?: boolean | null, /** * If true, persist additional rollout EventMsg variants required to * reconstruct a richer thread history on subsequent resume/fork/read. */ diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 937678793a21..b90add9e83cd 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -3429,6 +3429,11 @@ pub struct ThreadResumeParams { pub developer_instructions: Option, #[ts(optional = nullable)] pub personality: Option, + /// When false, return only thread metadata and live-resume state without + /// populating `thread.turns`. This is useful when the client plans to call + /// `thread/turns/list` immediately after resuming. + #[ts(optional = nullable)] + pub include_turns: Option, /// If true, persist additional rollout EventMsg variants required to /// reconstruct a richer thread history on subsequent resume/fork/read. #[experimental("thread/resume.persistFullHistory")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 0ba0a30ae894..a440cabd4750 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -259,6 +259,8 @@ Valid `personality` values are `"friendly"`, `"pragmatic"`, and `"none"`. When ` To continue a stored session, call `thread/resume` with the `thread.id` you previously recorded. The response shape matches `thread/start`. 
When the stored session includes persisted token usage, the server emits `thread/tokenUsage/updated` immediately after the response so clients can render restored usage before the next turn starts. You can also pass the same configuration overrides supported by `thread/start`, including `approvalsReviewer`. +By default, `thread/resume` includes the reconstructed turn history in `thread.turns`. Pass `includeTurns: false` to return only thread metadata and live resume state, then call `thread/turns/list` separately if you want to page the turn history over the network. + By default, resume uses the latest persisted `model` and `reasoningEffort` values associated with the thread. Supplying any of `model`, `modelProvider`, `config.model`, or `config.model_reasoning_effort` disables that persisted fallback and uses the explicit overrides plus normal config resolution instead. Example: @@ -269,6 +271,12 @@ Example: "personality": "friendly" } } { "id": 11, "result": { "thread": { "id": "thr_123", … } } } + +{ "method": "thread/resume", "id": 12, "params": { + "threadId": "thr_123", + "includeTurns": false +} } +{ "id": 12, "result": { "thread": { "id": "thr_123", "turns": [], … } } } ``` To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. When the source history includes persisted token usage, the server also emits `thread/tokenUsage/updated` for the new thread immediately after the response. If the source thread is actively running, the fork snapshots it as if the current turn had been interrupted first. 
Pass `ephemeral: true` when the fork should stay in-memory only: diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 75a457acbd54..347eafa7ac3b 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -4465,8 +4465,10 @@ impl CodexMessageProcessor { base_instructions, developer_instructions, personality, + include_turns, persist_extended_history, } = params; + let include_turns = include_turns.unwrap_or(true); let thread_history = if let Some(history) = history { let Some(thread_history) = self @@ -4574,6 +4576,7 @@ impl CodexMessageProcessor { rollout_path.as_path(), fallback_model_provider.as_str(), persisted_resume_metadata.as_ref(), + include_turns, ) .await { @@ -4835,6 +4838,7 @@ impl CodexMessageProcessor { config_snapshot, instruction_sources, thread_summary, + include_turns: params.include_turns.unwrap_or(true), }), ); if listener_command_tx.send(command).is_err() { @@ -4940,6 +4944,7 @@ impl CodexMessageProcessor { rollout_path: &Path, fallback_provider: &str, persisted_resume_metadata: Option<&ThreadMetadata>, + include_turns: bool, ) -> std::result::Result { let thread = match thread_history { InitialHistory::Resumed(resumed) => { @@ -4969,13 +4974,15 @@ impl CodexMessageProcessor { let mut thread = thread?; thread.id = thread_id.to_string(); thread.path = Some(rollout_path.to_path_buf()); - let history_items = thread_history.get_rollout_items(); - populate_thread_turns( - &mut thread, - ThreadTurnSource::HistoryItems(&history_items), - /*active_turn*/ None, - ) - .await?; + if include_turns { + let history_items = thread_history.get_rollout_items(); + populate_thread_turns( + &mut thread, + ThreadTurnSource::HistoryItems(&history_items), + /*active_turn*/ None, + ) + .await?; + } self.attach_thread_name(thread_id, &mut thread).await; Ok(thread) } @@ -8640,12 +8647,13 @@ async fn handle_pending_thread_resume_request( 
let request_id = pending.request_id; let connection_id = request_id.connection_id; let mut thread = pending.thread_summary; - if let Err(message) = populate_thread_turns( - &mut thread, - ThreadTurnSource::RolloutPath(pending.rollout_path.as_path()), - active_turn.as_ref(), - ) - .await + if pending.include_turns + && let Err(message) = populate_thread_turns( + &mut thread, + ThreadTurnSource::RolloutPath(pending.rollout_path.as_path()), + active_turn.as_ref(), + ) + .await { outgoing .send_error( @@ -10610,6 +10618,7 @@ mod tests { base_instructions: None, developer_instructions: None, personality: None, + include_turns: None, persist_extended_history: false, }; let config_snapshot = ThreadConfigSnapshot { diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 504e59468b8c..323aba19d70e 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -31,6 +31,7 @@ pub(crate) struct PendingThreadResumeRequest { pub(crate) config_snapshot: ThreadConfigSnapshot, pub(crate) instruction_sources: Vec, pub(crate) thread_summary: codex_app_server_protocol::Thread, + pub(crate) include_turns: bool, } // ThreadListenerCommand is used to perform operations in the context of the thread listener, for serialization purposes. 
diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 7a5f2c543d0b..989227c32cb4 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -285,6 +285,45 @@ async fn thread_resume_returns_rollout_history() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_resume_can_skip_turns_for_metadata_only_resume() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let conversation_id = create_fake_rollout_with_text_elements( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Vec::new(), + Some("mock_provider"), + /*git_info*/ None, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: conversation_id.clone(), + include_turns: Some(false), + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { thread, .. 
} = to_response::(resume_resp)?; + + assert_eq!(thread.id, conversation_id); + assert!(thread.turns.is_empty()); + + Ok(()) +} + #[tokio::test] async fn thread_resume_emits_restored_token_usage_before_next_turn() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; @@ -1339,6 +1378,84 @@ async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> R Ok(()) } +#[tokio::test] +async fn thread_resume_can_skip_turns_when_thread_is_running() -> Result<()> { + let server = responses::start_mock_server().await; + let _response_mock = responses::mount_sse_once( + &server, + responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "Done"), + responses::ev_completed("resp-1"), + ]), + ) + .await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut primary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??; + + let start_id = primary + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5.4".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. 
} = to_response::(start_resp)?; + + let turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "seed history".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let mut secondary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, secondary.initialize()).await??; + + let resume_id = secondary + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread.id.clone(), + include_turns: Some(false), + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + secondary.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { + thread: resumed, .. 
+ } = to_response::(resume_resp)?; + + assert_eq!(resumed.id, thread.id); + assert_eq!(resumed.status, ThreadStatus::Idle); + assert!(resumed.turns.is_empty()); + + Ok(()) +} + #[tokio::test] async fn thread_resume_replays_pending_command_execution_request_approval() -> Result<()> { let responses = vec![ From 7dcb6a1c602310c97ef57c81a5f510d9b6224d8b Mon Sep 17 00:00:00 2001 From: David de Regt Date: Wed, 22 Apr 2026 11:30:04 -0700 Subject: [PATCH 2/6] build failure --- codex-rs/app-server/src/codex_message_processor.rs | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 347eafa7ac3b..340f02865112 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -4524,7 +4524,6 @@ impl CodexMessageProcessor { } }; - let fallback_model_provider = config.model_provider_id.clone(); let instruction_sources = Self::instruction_sources_from_config(&config).await; let response_history = thread_history.clone(); @@ -4574,7 +4573,6 @@ impl CodexMessageProcessor { codex_thread.as_ref(), &response_history, rollout_path.as_path(), - fallback_model_provider.as_str(), persisted_resume_metadata.as_ref(), include_turns, ) @@ -4942,23 +4940,22 @@ impl CodexMessageProcessor { thread: &CodexThread, thread_history: &InitialHistory, rollout_path: &Path, - fallback_provider: &str, persisted_resume_metadata: Option<&ThreadMetadata>, include_turns: bool, ) -> std::result::Result { + let config_snapshot = thread.config_snapshot().await; let thread = match thread_history { InitialHistory::Resumed(resumed) => { load_thread_summary_for_rollout( &self.config, resumed.conversation_id, resumed.rollout_path.as_path(), - fallback_provider, + config_snapshot.model_provider_id.as_str(), persisted_resume_metadata, ) .await } InitialHistory::Forked(items) => { - let config_snapshot = thread.config_snapshot().await; 
let mut thread = build_thread_from_snapshot( thread_id, &config_snapshot, From 5c9cb3e3fe8cc990a923cb23d19a6d1f42529198 Mon Sep 17 00:00:00 2001 From: David de Regt Date: Wed, 22 Apr 2026 11:43:35 -0700 Subject: [PATCH 3/6] fixing token_usage_turn_id calc --- .../app-server/src/codex_message_processor.rs | 14 +--- .../token_usage_replay.rs | 11 ++- .../tests/suite/v2/thread_resume.rs | 78 +++++++++++++++++++ 3 files changed, 86 insertions(+), 17 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 340f02865112..2a0c10b47f25 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -4631,7 +4631,6 @@ impl CodexMessageProcessor { let token_usage_thread = response.thread.clone(); let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items( &response_history.get_rollout_items(), - &token_usage_thread, ); self.outgoing.send_response(request_id, response).await; // The client needs restored usage before it starts another turn. 
@@ -5297,11 +5296,7 @@ impl CodexMessageProcessor { { Some(turn_id) } else { - latest_token_usage_turn_id_from_rollout_path( - rollout_path.as_path(), - &token_usage_thread, - ) - .await + latest_token_usage_turn_id_from_rollout_path(rollout_path.as_path()).await }; self.outgoing.send_response(request_id, response).await; // Mirror the resume contract for forks: the new thread is usable as soon @@ -8736,11 +8731,8 @@ async fn handle_pending_thread_resume_request( reasoning_effort, }; let token_usage_thread = response.thread.clone(); - let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_path( - pending.rollout_path.as_path(), - &token_usage_thread, - ) - .await; + let token_usage_turn_id = + latest_token_usage_turn_id_from_rollout_path(pending.rollout_path.as_path()).await; outgoing.send_response(request_id, response).await; // Rejoining a loaded thread has the same UI contract as a cold resume, but // uses the live conversation state instead of reconstructing a new session. 
diff --git a/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs b/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs index d3c06f5e7cd5..ffc572236e6b 100644 --- a/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs +++ b/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs @@ -60,15 +60,14 @@ pub(super) async fn send_thread_token_usage_update_to_connection( pub(super) async fn latest_token_usage_turn_id_for_thread_path(thread: &Thread) -> Option { let rollout_path = thread.path.as_deref()?; - latest_token_usage_turn_id_from_rollout_path(rollout_path, thread).await + latest_token_usage_turn_id_from_rollout_path(rollout_path).await } pub(super) async fn latest_token_usage_turn_id_from_rollout_path( rollout_path: &Path, - thread: &Thread, ) -> Option { let rollout_items = read_rollout_items_from_rollout(rollout_path).await.ok()?; - latest_token_usage_turn_id_from_rollout_items(&rollout_items, thread) + latest_token_usage_turn_id_from_rollout_items(&rollout_items) } /// Identifies the turn that was active when a `TokenCount` record appeared. 
@@ -82,15 +81,15 @@ struct TokenUsageTurnOwner { pub(super) fn latest_token_usage_turn_id_from_rollout_items( rollout_items: &[RolloutItem], - thread: &Thread, ) -> Option { let owner = latest_token_usage_turn_owner_from_rollout_items(rollout_items)?; - if thread.turns.iter().any(|turn| turn.id == owner.id) { + let rebuilt_turns = super::build_turns_from_rollout_items(rollout_items); + if rebuilt_turns.iter().any(|turn| turn.id == owner.id) { return Some(owner.id); } owner .position - .and_then(|position| thread.turns.get(position)) + .and_then(|position| rebuilt_turns.get(position)) .map(|turn| turn.id.clone()) } diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 989227c32cb4..8af9112d248f 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -377,6 +377,84 @@ async fn thread_resume_emits_restored_token_usage_before_next_turn() -> Result<( Ok(()) } +#[tokio::test] +async fn thread_resume_emits_restored_token_usage_when_turns_are_excluded() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let conversation_id = create_fake_rollout_with_token_usage( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Some("mock_provider"), + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let first_resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: conversation_id.clone(), + ..Default::default() + }) + .await?; + let first_resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(first_resume_id)), + ) + .await??; + let ThreadResumeResponse { thread, .. 
} = + to_response::(first_resume_resp)?; + let expected_turn_id = thread.turns[0].id.clone(); + + let first_note = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/tokenUsage/updated"), + ) + .await??; + let parsed: ServerNotification = first_note.try_into()?; + let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else { + panic!("expected thread/tokenUsage/updated notification"); + }; + assert_eq!(notification.turn_id, expected_turn_id); + + let second_resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: conversation_id, + include_turns: Some(false), + ..Default::default() + }) + .await?; + let second_resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(second_resume_id)), + ) + .await??; + let ThreadResumeResponse { + thread: resumed_again, + .. + } = to_response::(second_resume_resp)?; + assert!(resumed_again.turns.is_empty()); + + let second_note = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/tokenUsage/updated"), + ) + .await??; + let parsed: ServerNotification = second_note.try_into()?; + let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else { + panic!("expected thread/tokenUsage/updated notification"); + }; + + assert_eq!(notification.thread_id, resumed_again.id); + assert_eq!(notification.turn_id, expected_turn_id); + assert_eq!(notification.token_usage.total.total_tokens, 150); + + Ok(()) +} + #[tokio::test] async fn thread_resume_token_usage_replay_ignores_stale_interrupted_tail_turn() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; From 69050d3016cca7f6c4caf69959ad6fad64357aa4 Mon Sep 17 00:00:00 2001 From: David de Regt Date: Wed, 22 Apr 2026 14:25:40 -0700 Subject: [PATCH 4/6] pr feedback --- .../typescript/v2/ThreadResumeParams.ts | 1 - .../app-server/src/codex_message_processor.rs | 16 +++++-- 
.../token_usage_replay.rs | 42 +++++++++++-------- 3 files changed, 38 insertions(+), 21 deletions(-) diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts index ae493b46354b..36f8dae692df 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts @@ -37,7 +37,6 @@ model?: string | null, modelProvider?: string | null, serviceTier?: ServiceTier * Override where approval requests are routed for review on this thread * and subsequent turns. */ -<<<<<<< HEAD approvalsReviewer?: ApprovalsReviewer | null, sandbox?: SandboxMode | null, /** * Full permissions override for the resumed thread. Cannot be combined * with `sandbox`. diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 2a0c10b47f25..4f18b17ddd9e 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -4631,6 +4631,8 @@ impl CodexMessageProcessor { let token_usage_thread = response.thread.clone(); let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items( &response_history.get_rollout_items(), + (!token_usage_thread.turns.is_empty()) + .then_some(token_usage_thread.turns.as_slice()), ); self.outgoing.send_response(request_id, response).await; // The client needs restored usage before it starts another turn. 
@@ -5296,7 +5298,12 @@ impl CodexMessageProcessor { { Some(turn_id) } else { - latest_token_usage_turn_id_from_rollout_path(rollout_path.as_path()).await + latest_token_usage_turn_id_from_rollout_path( + rollout_path.as_path(), + (!token_usage_thread.turns.is_empty()) + .then_some(token_usage_thread.turns.as_slice()), + ) + .await }; self.outgoing.send_response(request_id, response).await; // Mirror the resume contract for forks: the new thread is usable as soon @@ -8731,8 +8738,11 @@ async fn handle_pending_thread_resume_request( reasoning_effort, }; let token_usage_thread = response.thread.clone(); - let token_usage_turn_id = - latest_token_usage_turn_id_from_rollout_path(pending.rollout_path.as_path()).await; + let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_path( + pending.rollout_path.as_path(), + (!token_usage_thread.turns.is_empty()).then_some(token_usage_thread.turns.as_slice()), + ) + .await; outgoing.send_response(request_id, response).await; // Rejoining a loaded thread has the same UI contract as a cold resume, but // uses the live conversation state instead of reconstructing a new session. 
diff --git a/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs b/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs index ffc572236e6b..571311a665cb 100644 --- a/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs +++ b/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs @@ -17,6 +17,7 @@ use codex_app_server_protocol::Thread; use codex_app_server_protocol::ThreadHistoryBuilder; use codex_app_server_protocol::ThreadTokenUsage; use codex_app_server_protocol::ThreadTokenUsageUpdatedNotification; +use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnStatus; use codex_core::CodexThread; use codex_protocol::ThreadId; @@ -60,14 +61,19 @@ pub(super) async fn send_thread_token_usage_update_to_connection( pub(super) async fn latest_token_usage_turn_id_for_thread_path(thread: &Thread) -> Option { let rollout_path = thread.path.as_deref()?; - latest_token_usage_turn_id_from_rollout_path(rollout_path).await + latest_token_usage_turn_id_from_rollout_path( + rollout_path, + (!thread.turns.is_empty()).then_some(thread.turns.as_slice()), + ) + .await } pub(super) async fn latest_token_usage_turn_id_from_rollout_path( rollout_path: &Path, + turns: Option<&[Turn]>, ) -> Option { let rollout_items = read_rollout_items_from_rollout(rollout_path).await.ok()?; - latest_token_usage_turn_id_from_rollout_items(&rollout_items) + latest_token_usage_turn_id_from_rollout_items(&rollout_items, turns) } /// Identifies the turn that was active when a `TokenCount` record appeared. 
@@ -81,21 +87,8 @@ struct TokenUsageTurnOwner { pub(super) fn latest_token_usage_turn_id_from_rollout_items( rollout_items: &[RolloutItem], + turns: Option<&[Turn]>, ) -> Option { - let owner = latest_token_usage_turn_owner_from_rollout_items(rollout_items)?; - let rebuilt_turns = super::build_turns_from_rollout_items(rollout_items); - if rebuilt_turns.iter().any(|turn| turn.id == owner.id) { - return Some(owner.id); - } - owner - .position - .and_then(|position| rebuilt_turns.get(position)) - .map(|turn| turn.id.clone()) -} - -fn latest_token_usage_turn_owner_from_rollout_items( - rollout_items: &[RolloutItem], -) -> Option { let mut builder = ThreadHistoryBuilder::new(); let mut token_usage_turn_owner = None; @@ -112,7 +105,22 @@ fn latest_token_usage_turn_owner_from_rollout_items( builder.handle_rollout_item(item); } - token_usage_turn_owner + let owner = token_usage_turn_owner?; + let rebuilt_turns; + let turns = if let Some(turns) = turns { + turns + } else { + rebuilt_turns = builder.finish(); + rebuilt_turns.as_slice() + }; + + if turns.iter().any(|turn| turn.id == owner.id) { + return Some(owner.id); + } + owner + .position + .and_then(|position| turns.get(position)) + .map(|turn| turn.id.clone()) } /// Chooses a fallback turn id that should own a replayed token usage update. 
From 9a6d24cbeffd30a0b655dd065c28618fba589770 Mon Sep 17 00:00:00 2001 From: David de Regt Date: Wed, 22 Apr 2026 20:12:56 -0700 Subject: [PATCH 5/6] PR feedback --- .../schema/json/ClientRequest.json | 9 +- .../codex_app_server_protocol.schemas.json | 9 +- .../codex_app_server_protocol.v2.schemas.json | 9 +- .../schema/json/v2/ThreadResumeParams.json | 9 +- .../typescript/v2/ThreadResumeParams.ts | 4 +- .../app-server-protocol/src/protocol/v2.rs | 6 +- codex-rs/app-server/README.md | 4 +- .../app-server/src/codex_message_processor.rs | 88 ++++++++++--------- .../token_usage_replay.rs | 29 ++---- .../tests/suite/v2/thread_resume.rs | 22 ++--- 10 files changed, 84 insertions(+), 105 deletions(-) diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 2e72bd70b8b4..061267613e39 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -3715,12 +3715,9 @@ "null" ] }, - "includeTurns": { - "description": "When false, return only thread metadata and live-resume state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after resuming.", - "type": [ - "boolean", - "null" - ] + "excludeTurns": { + "description": "When true, return only thread metadata and live-resume state without populating `thread.turns`. 
This is useful when the client plans to call `thread/turns/list` immediately after resuming.", + "type": "boolean" }, "model": { "description": "Configuration overrides for the resumed thread, if any.", diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 153eaec1bfac..3aa90d05c433 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -15785,12 +15785,9 @@ "null" ] }, - "includeTurns": { - "description": "When false, return only thread metadata and live-resume state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after resuming.", - "type": [ - "boolean", - "null" - ] + "excludeTurns": { + "description": "When true, return only thread metadata and live-resume state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after resuming.", + "type": "boolean" }, "model": { "description": "Configuration overrides for the resumed thread, if any.", diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index e439f2d7540a..53f3c1d5cbd2 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -13672,12 +13672,9 @@ "null" ] }, - "includeTurns": { - "description": "When false, return only thread metadata and live-resume state without populating `thread.turns`. 
This is useful when the client plans to call `thread/turns/list` immediately after resuming.", - "type": [ - "boolean", - "null" - ] + "excludeTurns": { + "description": "When true, return only thread metadata and live-resume state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after resuming.", + "type": "boolean" }, "model": { "description": "Configuration overrides for the resumed thread, if any.", diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json index 8379797bc436..19ccad14c1f7 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json @@ -1313,12 +1313,9 @@ "null" ] }, - "includeTurns": { - "description": "When false, return only thread metadata and live-resume state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after resuming.", - "type": [ - "boolean", - "null" - ] + "excludeTurns": { + "description": "When true, return only thread metadata and live-resume state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after resuming.", + "type": "boolean" }, "model": { "description": "Configuration overrides for the resumed thread, if any.", diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts index 36f8dae692df..452126be46b4 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts @@ -42,11 +42,11 @@ approvalsReviewer?: ApprovalsReviewer | null, sandbox?: SandboxMode | null, /** * with `sandbox`. 
*/ permissionProfile?: PermissionProfile | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, /** - * When false, return only thread metadata and live-resume state without + * When true, return only thread metadata and live-resume state without * populating `thread.turns`. This is useful when the client plans to call * `thread/turns/list` immediately after resuming. */ -includeTurns?: boolean | null, /** +excludeTurns?: boolean, /** * If true, persist additional rollout EventMsg variants required to * reconstruct a richer thread history on subsequent resume/fork/read. */ diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index b90add9e83cd..964ce733221b 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -3429,11 +3429,11 @@ pub struct ThreadResumeParams { pub developer_instructions: Option, #[ts(optional = nullable)] pub personality: Option, - /// When false, return only thread metadata and live-resume state without + /// When true, return only thread metadata and live-resume state without /// populating `thread.turns`. This is useful when the client plans to call /// `thread/turns/list` immediately after resuming. - #[ts(optional = nullable)] - pub include_turns: Option, + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub exclude_turns: bool, /// If true, persist additional rollout EventMsg variants required to /// reconstruct a richer thread history on subsequent resume/fork/read. #[experimental("thread/resume.persistFullHistory")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index a440cabd4750..7faf65cb80a7 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -259,7 +259,7 @@ Valid `personality` values are `"friendly"`, `"pragmatic"`, and `"none"`. 
When ` To continue a stored session, call `thread/resume` with the `thread.id` you previously recorded. The response shape matches `thread/start`. When the stored session includes persisted token usage, the server emits `thread/tokenUsage/updated` immediately after the response so clients can render restored usage before the next turn starts. You can also pass the same configuration overrides supported by `thread/start`, including `approvalsReviewer`. -By default, `thread/resume` includes the reconstructed turn history in `thread.turns`. Pass `includeTurns: false` to return only thread metadata and live resume state, then call `thread/turns/list` separately if you want to page the turn history over the network. +By default, `thread/resume` includes the reconstructed turn history in `thread.turns`. Pass `excludeTurns: true` to return only thread metadata and live resume state, then call `thread/turns/list` separately if you want to page the turn history over the network. In that mode the server also skips replaying restored `thread/tokenUsage/updated`, which avoids rebuilding turns just to attribute historical usage. By default, resume uses the latest persisted `model` and `reasoningEffort` values associated with the thread. Supplying any of `model`, `modelProvider`, `config.model`, or `config.model_reasoning_effort` disables that persisted fallback and uses the explicit overrides plus normal config resolution instead. 
@@ -274,7 +274,7 @@ Example: { "method": "thread/resume", "id": 12, "params": { "threadId": "thr_123", - "includeTurns": false + "excludeTurns": true } } { "id": 12, "result": { "thread": { "id": "thr_123", "turns": [], … } } } ``` diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 4f18b17ddd9e..0852c0f90d55 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -4465,10 +4465,10 @@ impl CodexMessageProcessor { base_instructions, developer_instructions, personality, - include_turns, + exclude_turns, persist_extended_history, } = params; - let include_turns = include_turns.unwrap_or(true); + let include_turns = !exclude_turns; let thread_history = if let Some(history) = history { let Some(thread_history) = self @@ -4628,25 +4628,28 @@ impl CodexMessageProcessor { } let connection_id = request_id.connection_id; - let token_usage_thread = response.thread.clone(); - let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items( - &response_history.get_rollout_items(), - (!token_usage_thread.turns.is_empty()) - .then_some(token_usage_thread.turns.as_slice()), - ); + let token_usage_thread = include_turns.then(|| response.thread.clone()); self.outgoing.send_response(request_id, response).await; - // The client needs restored usage before it starts another turn. - // Sending after the response preserves JSON-RPC request ordering while - // still filling the status line before the next turn lifecycle begins. - send_thread_token_usage_update_to_connection( - &self.outgoing, - connection_id, - thread_id, - &token_usage_thread, - codex_thread.as_ref(), - token_usage_turn_id, - ) - .await; + // `excludeTurns` is explicitly the cheap resume path, so avoid + // rebuilding history only to attribute a replayed usage update. 
+ if let Some(token_usage_thread) = token_usage_thread { + let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items( + &response_history.get_rollout_items(), + token_usage_thread.turns.as_slice(), + ); + // The client needs restored usage before it starts another turn. + // Sending after the response preserves JSON-RPC request ordering while + // still filling the status line before the next turn lifecycle begins. + send_thread_token_usage_update_to_connection( + &self.outgoing, + connection_id, + thread_id, + &token_usage_thread, + codex_thread.as_ref(), + token_usage_turn_id, + ) + .await; + } } Err(err) => { let error = JSONRPCErrorError { @@ -4837,7 +4840,7 @@ impl CodexMessageProcessor { config_snapshot, instruction_sources, thread_summary, - include_turns: params.include_turns.unwrap_or(true), + include_turns: !params.exclude_turns, }), ); if listener_command_tx.send(command).is_err() { @@ -5300,8 +5303,7 @@ impl CodexMessageProcessor { } else { latest_token_usage_turn_id_from_rollout_path( rollout_path.as_path(), - (!token_usage_thread.turns.is_empty()) - .then_some(token_usage_thread.turns.as_slice()), + token_usage_thread.turns.as_slice(), ) .await }; @@ -8737,24 +8739,28 @@ async fn handle_pending_thread_resume_request( permission_profile, reasoning_effort, }; - let token_usage_thread = response.thread.clone(); - let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_path( - pending.rollout_path.as_path(), - (!token_usage_thread.turns.is_empty()).then_some(token_usage_thread.turns.as_slice()), - ) - .await; + let token_usage_thread = pending.include_turns.then(|| response.thread.clone()); outgoing.send_response(request_id, response).await; - // Rejoining a loaded thread has the same UI contract as a cold resume, but - // uses the live conversation state instead of reconstructing a new session. 
- send_thread_token_usage_update_to_connection( - outgoing, - connection_id, - conversation_id, - &token_usage_thread, - conversation.as_ref(), - token_usage_turn_id, - ) - .await; + // Match cold resume: metadata-only resume should attach the listener without + // paying the cost of turn reconstruction for historical usage replay. + if let Some(token_usage_thread) = token_usage_thread { + let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_path( + pending.rollout_path.as_path(), + token_usage_thread.turns.as_slice(), + ) + .await; + // Rejoining a loaded thread has the same UI contract as a cold resume, but + // uses the live conversation state instead of reconstructing a new session. + send_thread_token_usage_update_to_connection( + outgoing, + connection_id, + conversation_id, + &token_usage_thread, + conversation.as_ref(), + token_usage_turn_id, + ) + .await; + } outgoing .replay_requests_to_connection_for_thread(connection_id, conversation_id) .await; @@ -10617,7 +10623,7 @@ mod tests { base_instructions: None, developer_instructions: None, personality: None, - include_turns: None, + exclude_turns: false, persist_extended_history: false, }; let config_snapshot = ThreadConfigSnapshot { diff --git a/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs b/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs index 571311a665cb..bcd972a47063 100644 --- a/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs +++ b/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs @@ -61,16 +61,12 @@ pub(super) async fn send_thread_token_usage_update_to_connection( pub(super) async fn latest_token_usage_turn_id_for_thread_path(thread: &Thread) -> Option { let rollout_path = thread.path.as_deref()?; - latest_token_usage_turn_id_from_rollout_path( - rollout_path, - (!thread.turns.is_empty()).then_some(thread.turns.as_slice()), - ) - .await + latest_token_usage_turn_id_from_rollout_path(rollout_path, 
thread.turns.as_slice()).await } pub(super) async fn latest_token_usage_turn_id_from_rollout_path( rollout_path: &Path, - turns: Option<&[Turn]>, + turns: &[Turn], ) -> Option { let rollout_items = read_rollout_items_from_rollout(rollout_path).await.ok()?; latest_token_usage_turn_id_from_rollout_items(&rollout_items, turns) @@ -87,7 +83,7 @@ struct TokenUsageTurnOwner { pub(super) fn latest_token_usage_turn_id_from_rollout_items( rollout_items: &[RolloutItem], - turns: Option<&[Turn]>, + turns: &[Turn], ) -> Option { let mut builder = ThreadHistoryBuilder::new(); let mut token_usage_turn_owner = None; @@ -106,21 +102,14 @@ pub(super) fn latest_token_usage_turn_id_from_rollout_items( } let owner = token_usage_turn_owner?; - let rebuilt_turns; - let turns = if let Some(turns) = turns { - turns - } else { - rebuilt_turns = builder.finish(); - rebuilt_turns.as_slice() - }; - if turns.iter().any(|turn| turn.id == owner.id) { - return Some(owner.id); + Some(owner.id) + } else { + owner + .position + .and_then(|position| turns.get(position)) + .map(|turn| turn.id.clone()) } - owner - .position - .and_then(|position| turns.get(position)) - .map(|turn| turn.id.clone()) } /// Chooses a fallback turn id that should own a replayed token usage update. 
diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 8af9112d248f..c86f88825d82 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -307,7 +307,7 @@ async fn thread_resume_can_skip_turns_for_metadata_only_resume() -> Result<()> { let resume_id = mcp .send_thread_resume_request(ThreadResumeParams { thread_id: conversation_id.clone(), - include_turns: Some(false), + exclude_turns: true, ..Default::default() }) .await?; @@ -378,7 +378,7 @@ async fn thread_resume_emits_restored_token_usage_before_next_turn() -> Result<( } #[tokio::test] -async fn thread_resume_emits_restored_token_usage_when_turns_are_excluded() -> Result<()> { +async fn thread_resume_skips_restored_token_usage_when_turns_are_excluded() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; let codex_home = TempDir::new()?; create_config_toml(codex_home.path(), &server.uri())?; @@ -423,7 +423,7 @@ async fn thread_resume_emits_restored_token_usage_when_turns_are_excluded() -> R let second_resume_id = mcp .send_thread_resume_request(ThreadResumeParams { thread_id: conversation_id, - include_turns: Some(false), + exclude_turns: true, ..Default::default() }) .await?; @@ -442,15 +442,11 @@ async fn thread_resume_emits_restored_token_usage_when_turns_are_excluded() -> R DEFAULT_READ_TIMEOUT, mcp.read_stream_until_notification_message("thread/tokenUsage/updated"), ) - .await??; - let parsed: ServerNotification = second_note.try_into()?; - let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else { - panic!("expected thread/tokenUsage/updated notification"); - }; - - assert_eq!(notification.thread_id, resumed_again.id); - assert_eq!(notification.turn_id, expected_turn_id); - assert_eq!(notification.token_usage.total.total_tokens, 150); + .await; + assert!( + second_note.is_err(), + "excludeTurns=true should not 
replay token usage" + ); Ok(()) } @@ -1514,7 +1510,7 @@ async fn thread_resume_can_skip_turns_when_thread_is_running() -> Result<()> { let resume_id = secondary .send_thread_resume_request(ThreadResumeParams { thread_id: thread.id.clone(), - include_turns: Some(false), + exclude_turns: true, ..Default::default() }) .await?; From ebea39365a4c9ce635ae63aba51a9efde35c8b28 Mon Sep 17 00:00:00 2001 From: David de Regt Date: Thu, 23 Apr 2026 00:28:02 -0700 Subject: [PATCH 6/6] add same excludeTurns parameter for thread/fork --- .../schema/json/ClientRequest.json | 4 ++ .../codex_app_server_protocol.schemas.json | 4 ++ .../codex_app_server_protocol.v2.schemas.json | 4 ++ .../schema/json/v2/ThreadForkParams.json | 4 ++ .../schema/typescript/v2/ThreadForkParams.ts | 5 ++ .../app-server-protocol/src/protocol/v2.rs | 5 ++ codex-rs/app-server/README.md | 4 +- .../app-server/src/codex_message_processor.rs | 66 +++++++++++-------- .../app-server/tests/suite/v2/thread_fork.rs | 48 ++++++++++++++ 9 files changed, 114 insertions(+), 30 deletions(-) diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 061267613e39..d2808a6262b7 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -3312,6 +3312,10 @@ "ephemeral": { "type": "boolean" }, + "excludeTurns": { + "description": "When true, return only thread metadata and live fork state without populating `thread.turns`. 
This is useful when the client plans to call `thread/turns/list` immediately after forking.", + "type": "boolean" + }, "model": { "description": "Configuration overrides for the forked thread, if any.", "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 3aa90d05c433..c87a704c0039 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -14356,6 +14356,10 @@ "ephemeral": { "type": "boolean" }, + "excludeTurns": { + "description": "When true, return only thread metadata and live fork state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after forking.", + "type": "boolean" + }, "model": { "description": "Configuration overrides for the forked thread, if any.", "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index 53f3c1d5cbd2..9f8b3d586efc 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -12243,6 +12243,10 @@ "ephemeral": { "type": "boolean" }, + "excludeTurns": { + "description": "When true, return only thread metadata and live fork state without populating `thread.turns`. 
This is useful when the client plans to call `thread/turns/list` immediately after forking.", + "type": "boolean" + }, "model": { "description": "Configuration overrides for the forked thread, if any.", "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json index 16a41d0d26c7..a603aff1e8d6 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json @@ -400,6 +400,10 @@ "ephemeral": { "type": "boolean" }, + "excludeTurns": { + "description": "When true, return only thread metadata and live fork state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after forking.", + "type": "boolean" + }, "model": { "description": "Configuration overrides for the forked thread, if any.", "type": [ diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts index bd73b3c3c3dd..f5f3f1878cc7 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts @@ -33,6 +33,11 @@ approvalsReviewer?: ApprovalsReviewer | null, sandbox?: SandboxMode | null, /** * with `sandbox`. */ permissionProfile?: PermissionProfile | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, ephemeral?: boolean, /** + * When true, return only thread metadata and live fork state without + * populating `thread.turns`. This is useful when the client plans to call + * `thread/turns/list` immediately after forking. + */ +excludeTurns?: boolean, /** * If true, persist additional rollout EventMsg variants required to * reconstruct a richer thread history on subsequent resume/fork/read. 
*/ diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 964ce733221b..f869b5ddbf55 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -3526,6 +3526,11 @@ pub struct ThreadForkParams { pub developer_instructions: Option, #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub ephemeral: bool, + /// When true, return only thread metadata and live fork state without + /// populating `thread.turns`. This is useful when the client plans to call + /// `thread/turns/list` immediately after forking. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub exclude_turns: bool, /// If true, persist additional rollout EventMsg variants required to /// reconstruct a richer thread history on subsequent resume/fork/read. #[experimental("thread/fork.persistFullHistory")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 7faf65cb80a7..9a6f4c4045bb 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -138,7 +138,7 @@ Example with notification opt-out: - `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread. When the request includes a `cwd` and the resolved sandbox is `workspace-write` or full access, app-server also marks that project as trusted in the user `config.toml`. Pass `sessionStartSource: "clear"` when starting a replacement thread after clearing the current session so `SessionStart` hooks receive `source: "clear"` instead of the default `"startup"`. For permissions, prefer `permissionProfile`; the legacy `sandbox` shorthand is still accepted but cannot be combined with `permissionProfile`. - `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it. Accepts the same permission override rules as `thread/start`. 
-- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; if the source thread is currently mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Accepts the same permission override rules as `thread/start`. +- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; if the source thread is currently mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Pass `excludeTurns: true` when the client plans to page fork history via `thread/turns/list` instead of receiving the full turn array immediately. Accepts the same permission override rules as `thread/start`. - `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. - `thread/loaded/list` — list the thread ids currently loaded in memory. - `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. 
@@ -287,6 +287,8 @@ To branch from a stored session, call `thread/fork` with the `thread.id`. This c { "method": "thread/started", "params": { "thread": { … } } } ``` +Like `thread/resume`, `thread/fork` also accepts `excludeTurns: true` to return only thread metadata, leaving `thread.turns` empty so the client can page history with `thread/turns/list`. In that mode the server skips replaying the restored `thread/tokenUsage/updated` notification, which keeps the fork path from rebuilding turns just to attribute historical usage. + Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `persistExtendedHistory: true` to persist a richer subset of ThreadItems for non-lossy history when calling `thread/read`, `thread/resume`, and `thread/fork` later. This does not backfill events that were not persisted previously. ### Example: List threads (with pagination & filters) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 0852c0f90d55..967329b0bec1 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -5010,8 +5010,10 @@ impl CodexMessageProcessor { base_instructions, developer_instructions, ephemeral, + exclude_turns, persist_extended_history, } = params; + let include_turns = !exclude_turns; if sandbox.is_some() && permission_profile.is_some() { self.send_invalid_request_error( request_id, @@ -5231,12 +5233,13 @@ impl CodexMessageProcessor { }) }) .map(|id| id.to_string()); - if let Err(message) = populate_thread_turns( - &mut thread, - ThreadTurnSource::HistoryItems(&history_items), - /*active_turn*/ None, - ) - .await + if include_turns + && let Err(message) = populate_thread_turns( + &mut thread, + ThreadTurnSource::HistoryItems(&history_items), + /*active_turn*/ None, + ) + .await { self.send_internal_error(request_id, message).await; return; @@ -5245,6 +5248,7 @@ impl CodexMessageProcessor { }; if let Some(fork_rollout_path) = 
session_configured.rollout_path.as_ref() + && include_turns && let Err(message) = populate_thread_turns( &mut thread, ThreadTurnSource::RolloutPath(fork_rollout_path.as_path()), @@ -5295,30 +5299,34 @@ impl CodexMessageProcessor { } let connection_id = request_id.connection_id; - let token_usage_thread = response.thread.clone(); - let token_usage_turn_id = if let Some(turn_id) = - latest_token_usage_turn_id_for_thread_path(&token_usage_thread).await - { - Some(turn_id) - } else { - latest_token_usage_turn_id_from_rollout_path( - rollout_path.as_path(), - token_usage_thread.turns.as_slice(), - ) - .await - }; + let token_usage_thread = include_turns.then(|| response.thread.clone()); self.outgoing.send_response(request_id, response).await; - // Mirror the resume contract for forks: the new thread is usable as soon - // as the response arrives, so restored usage must follow immediately. - send_thread_token_usage_update_to_connection( - &self.outgoing, - connection_id, - thread_id, - &token_usage_thread, - forked_thread.as_ref(), - token_usage_turn_id, - ) - .await; + // `excludeTurns` is the cheap fork path, so skip restored usage replay + // instead of rebuilding history only to attribute a historical update. + if let Some(token_usage_thread) = token_usage_thread { + let token_usage_turn_id = if let Some(turn_id) = + latest_token_usage_turn_id_for_thread_path(&token_usage_thread).await + { + Some(turn_id) + } else { + latest_token_usage_turn_id_from_rollout_path( + rollout_path.as_path(), + token_usage_thread.turns.as_slice(), + ) + .await + }; + // Mirror the resume contract for forks: the new thread is usable as soon + // as the response arrives, so restored usage must follow immediately. 
+ send_thread_token_usage_update_to_connection( + &self.outgoing, + connection_id, + thread_id, + &token_usage_thread, + forked_thread.as_ref(), + token_usage_turn_id, + ) + .await; + } let notif = ThreadStartedNotification { thread }; self.outgoing diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index 0425ce3e3c9d..7741ced1636c 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -241,6 +241,54 @@ async fn thread_fork_emits_restored_token_usage_before_next_turn() -> Result<()> Ok(()) } +#[tokio::test] +async fn thread_fork_can_exclude_turns_and_skip_restored_token_usage() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let conversation_id = create_fake_rollout_with_token_usage( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Some("mock_provider"), + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: conversation_id.clone(), + exclude_turns: true, + ..Default::default() + }) + .await?; + let fork_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(fork_id)), + ) + .await??; + let ThreadForkResponse { thread, .. 
} = to_response::<ThreadForkResponse>(fork_resp)?; + + assert_eq!(thread.forked_from_id, Some(conversation_id)); + assert_eq!(thread.preview, "Saved user message"); + assert!(thread.turns.is_empty()); + + let note = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/tokenUsage/updated"), + ) + .await; + assert!( + note.is_err(), + "excludeTurns=true should not replay token usage" + ); + + Ok(()) +} + #[tokio::test] async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await;