diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index 8f2324d8144f..d2808a6262b7 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -3312,6 +3312,10 @@ "ephemeral": { "type": "boolean" }, + "excludeTurns": { + "description": "When true, return only thread metadata and live fork state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after forking.", + "type": "boolean" + }, "model": { "description": "Configuration overrides for the forked thread, if any.", "type": [ @@ -3715,6 +3719,10 @@ "null" ] }, + "excludeTurns": { + "description": "When true, return only thread metadata and live-resume state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after resuming.", + "type": "boolean" + }, "model": { "description": "Configuration overrides for the resumed thread, if any.", "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index 3eaf25b9b2d1..c87a704c0039 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -14356,6 +14356,10 @@ "ephemeral": { "type": "boolean" }, + "excludeTurns": { + "description": "When true, return only thread metadata and live fork state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after forking.", + "type": "boolean" + }, "model": { "description": "Configuration overrides for the forked thread, if any.", "type": [ @@ -15785,6 +15789,10 @@ "null" ] }, + "excludeTurns": { + "description": "When true, return only thread metadata and live-resume state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after resuming.", + "type": "boolean" + }, "model": { "description": "Configuration overrides for the resumed thread, if any.", "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index b1f9935150ca..9f8b3d586efc 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -12243,6 +12243,10 @@ "ephemeral": { "type": "boolean" }, + "excludeTurns": { + "description": "When true, return only thread metadata and live fork state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after forking.", + "type": "boolean" + }, "model": { "description": "Configuration overrides for the forked thread, if any.", "type": [ @@ -13672,6 +13676,10 @@ "null" ] }, + "excludeTurns": { + "description": "When true, return only thread metadata and live-resume state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after resuming.", + "type": "boolean" + }, "model": { "description": "Configuration overrides for the resumed thread, if any.", "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json index 16a41d0d26c7..a603aff1e8d6 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkParams.json @@ -400,6 +400,10 @@ "ephemeral": { "type": "boolean" }, + "excludeTurns": { + "description": "When true, return only thread metadata and live fork state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after forking.", + "type": "boolean" + }, "model": { "description": "Configuration overrides for the forked thread, if any.", "type": [ diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json index 7fb27c08e86f..19ccad14c1f7 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeParams.json @@ -1313,6 +1313,10 @@ "null" ] }, + "excludeTurns": { + "description": "When true, return only thread metadata and live-resume state without populating `thread.turns`. This is useful when the client plans to call `thread/turns/list` immediately after resuming.", + "type": "boolean" + }, "model": { "description": "Configuration overrides for the resumed thread, if any.", "type": [ diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts index bd73b3c3c3dd..f5f3f1878cc7 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadForkParams.ts @@ -33,6 +33,11 @@ approvalsReviewer?: ApprovalsReviewer | null, sandbox?: SandboxMode | null, /** * with `sandbox`. */ permissionProfile?: PermissionProfile | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, ephemeral?: boolean, /** + * When true, return only thread metadata and live fork state without + * populating `thread.turns`. This is useful when the client plans to call + * `thread/turns/list` immediately after forking. + */ +excludeTurns?: boolean, /** * If true, persist additional rollout EventMsg variants required to * reconstruct a richer thread history on subsequent resume/fork/read. */ diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts index d245efe4a2ad..452126be46b4 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadResumeParams.ts @@ -42,6 +42,11 @@ approvalsReviewer?: ApprovalsReviewer | null, sandbox?: SandboxMode | null, /** * with `sandbox`. */ permissionProfile?: PermissionProfile | null, config?: { [key in string]?: JsonValue } | null, baseInstructions?: string | null, developerInstructions?: string | null, personality?: Personality | null, /** + * When true, return only thread metadata and live-resume state without + * populating `thread.turns`. This is useful when the client plans to call + * `thread/turns/list` immediately after resuming. + */ +excludeTurns?: boolean, /** * If true, persist additional rollout EventMsg variants required to * reconstruct a richer thread history on subsequent resume/fork/read. */ diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 937678793a21..f869b5ddbf55 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -3429,6 +3429,11 @@ pub struct ThreadResumeParams { pub developer_instructions: Option, #[ts(optional = nullable)] pub personality: Option, + /// When true, return only thread metadata and live-resume state without + /// populating `thread.turns`. This is useful when the client plans to call + /// `thread/turns/list` immediately after resuming. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub exclude_turns: bool, /// If true, persist additional rollout EventMsg variants required to /// reconstruct a richer thread history on subsequent resume/fork/read. #[experimental("thread/resume.persistFullHistory")] @@ -3521,6 +3526,11 @@ pub struct ThreadForkParams { pub developer_instructions: Option, #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub ephemeral: bool, + /// When true, return only thread metadata and live fork state without + /// populating `thread.turns`. This is useful when the client plans to call + /// `thread/turns/list` immediately after forking. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub exclude_turns: bool, /// If true, persist additional rollout EventMsg variants required to /// reconstruct a richer thread history on subsequent resume/fork/read. #[experimental("thread/fork.persistFullHistory")] diff --git a/codex-rs/app-server/README.md b/codex-rs/app-server/README.md index 0ba0a30ae894..9a6f4c4045bb 100644 --- a/codex-rs/app-server/README.md +++ b/codex-rs/app-server/README.md @@ -138,7 +138,7 @@ Example with notification opt-out: - `thread/start` — create a new thread; emits `thread/started` (including the current `thread.status`) and auto-subscribes you to turn/item events for that thread. When the request includes a `cwd` and the resolved sandbox is `workspace-write` or full access, app-server also marks that project as trusted in the user `config.toml`. Pass `sessionStartSource: "clear"` when starting a replacement thread after clearing the current session so `SessionStart` hooks receive `source: "clear"` instead of the default `"startup"`. For permissions, prefer `permissionProfile`; the legacy `sandbox` shorthand is still accepted but cannot be combined with `permissionProfile`. - `thread/resume` — reopen an existing thread by id so subsequent `turn/start` calls append to it. Accepts the same permission override rules as `thread/start`. -- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; if the source thread is currently mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Accepts the same permission override rules as `thread/start`. +- `thread/fork` — fork an existing thread into a new thread id by copying the stored history; if the source thread is currently mid-turn, the fork records the same interruption marker as `turn/interrupt` instead of inheriting an unmarked partial turn suffix. The returned `thread.forkedFromId` points at the source thread when known. Accepts `ephemeral: true` for an in-memory temporary fork, emits `thread/started` (including the current `thread.status`), and auto-subscribes you to turn/item events for the new thread. Pass `excludeTurns: true` when the client plans to page fork history via `thread/turns/list` instead of receiving the full turn array immediately. Accepts the same permission override rules as `thread/start`. - `thread/list` — page through stored rollouts; supports cursor-based pagination and optional `modelProviders`, `sourceKinds`, `archived`, `cwd`, and `searchTerm` filters. Each returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. - `thread/loaded/list` — list the thread ids currently loaded in memory. - `thread/read` — read a stored thread by id without resuming it; optionally include turns via `includeTurns`. The returned `thread` includes `status` (`ThreadStatus`), defaulting to `notLoaded` when the thread is not currently loaded. @@ -259,6 +259,8 @@ Valid `personality` values are `"friendly"`, `"pragmatic"`, and `"none"`. When ` To continue a stored session, call `thread/resume` with the `thread.id` you previously recorded. The response shape matches `thread/start`. When the stored session includes persisted token usage, the server emits `thread/tokenUsage/updated` immediately after the response so clients can render restored usage before the next turn starts. You can also pass the same configuration overrides supported by `thread/start`, including `approvalsReviewer`. +By default, `thread/resume` includes the reconstructed turn history in `thread.turns`. Pass `excludeTurns: true` to return only thread metadata and live resume state, then call `thread/turns/list` separately if you want to page the turn history over the network. In that mode the server also skips replaying restored `thread/tokenUsage/updated`, which avoids rebuilding turns just to attribute historical usage. + By default, resume uses the latest persisted `model` and `reasoningEffort` values associated with the thread. Supplying any of `model`, `modelProvider`, `config.model`, or `config.model_reasoning_effort` disables that persisted fallback and uses the explicit overrides plus normal config resolution instead. Example: @@ -269,6 +271,12 @@ Example: "personality": "friendly" } } { "id": 11, "result": { "thread": { "id": "thr_123", … } } } + +{ "method": "thread/resume", "id": 12, "params": { + "threadId": "thr_123", + "excludeTurns": true +} } +{ "id": 12, "result": { "thread": { "id": "thr_123", "turns": [], … } } } ``` To branch from a stored session, call `thread/fork` with the `thread.id`. This creates a new thread id and emits a `thread/started` notification for it. When the source history includes persisted token usage, the server also emits `thread/tokenUsage/updated` for the new thread immediately after the response. If the source thread is actively running, the fork snapshots it as if the current turn had been interrupted first. Pass `ephemeral: true` when the fork should stay in-memory only: @@ -279,6 +287,8 @@ To branch from a stored session, call `thread/fork` with the `thread.id`. This c { "method": "thread/started", "params": { "thread": { … } } } ``` +Like `thread/resume`, `thread/fork` also accepts `excludeTurns: true` to return only thread metadata in `thread.turns` and let the client page history with `thread/turns/list`. In that mode the server skips replaying restored `thread/tokenUsage/updated`, which keeps the fork path from rebuilding turns just to attribute historical usage. + Experimental API: `thread/start`, `thread/resume`, and `thread/fork` accept `persistExtendedHistory: true` to persist a richer subset of ThreadItems for non-lossy history when calling `thread/read`, `thread/resume`, and `thread/fork` later. This does not backfill events that were not persisted previously. ### Example: List threads (with pagination & filters) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 75a457acbd54..967329b0bec1 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -4465,8 +4465,10 @@ impl CodexMessageProcessor { base_instructions, developer_instructions, personality, + exclude_turns, persist_extended_history, } = params; + let include_turns = !exclude_turns; let thread_history = if let Some(history) = history { let Some(thread_history) = self @@ -4522,7 +4524,6 @@ impl CodexMessageProcessor { } }; - let fallback_model_provider = config.model_provider_id.clone(); let instruction_sources = Self::instruction_sources_from_config(&config).await; let response_history = thread_history.clone(); @@ -4572,8 +4573,8 @@ impl CodexMessageProcessor { codex_thread.as_ref(), &response_history, rollout_path.as_path(), - fallback_model_provider.as_str(), persisted_resume_metadata.as_ref(), + include_turns, ) .await { @@ -4627,24 +4628,28 @@ impl CodexMessageProcessor { } let connection_id = request_id.connection_id; - let token_usage_thread = response.thread.clone(); - let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items( - &response_history.get_rollout_items(), - &token_usage_thread, - ); + let token_usage_thread = include_turns.then(|| response.thread.clone()); self.outgoing.send_response(request_id, response).await; - // The client needs restored usage before it starts another turn. - // Sending after the response preserves JSON-RPC request ordering while - // still filling the status line before the next turn lifecycle begins. - send_thread_token_usage_update_to_connection( - &self.outgoing, - connection_id, - thread_id, - &token_usage_thread, - codex_thread.as_ref(), - token_usage_turn_id, - ) - .await; + // `excludeTurns` is explicitly the cheap resume path, so avoid + // rebuilding history only to attribute a replayed usage update. + if let Some(token_usage_thread) = token_usage_thread { + let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_items( + &response_history.get_rollout_items(), + token_usage_thread.turns.as_slice(), + ); + // The client needs restored usage before it starts another turn. + // Sending after the response preserves JSON-RPC request ordering while + // still filling the status line before the next turn lifecycle begins. + send_thread_token_usage_update_to_connection( + &self.outgoing, + connection_id, + thread_id, + &token_usage_thread, + codex_thread.as_ref(), + token_usage_turn_id, + ) + .await; + } } Err(err) => { let error = JSONRPCErrorError { @@ -4835,6 +4840,7 @@ impl CodexMessageProcessor { config_snapshot, instruction_sources, thread_summary, + include_turns: !params.exclude_turns, }), ); if listener_command_tx.send(command).is_err() { @@ -4938,22 +4944,22 @@ impl CodexMessageProcessor { thread: &CodexThread, thread_history: &InitialHistory, rollout_path: &Path, - fallback_provider: &str, persisted_resume_metadata: Option<&ThreadMetadata>, + include_turns: bool, ) -> std::result::Result { + let config_snapshot = thread.config_snapshot().await; let thread = match thread_history { InitialHistory::Resumed(resumed) => { load_thread_summary_for_rollout( &self.config, resumed.conversation_id, resumed.rollout_path.as_path(), - fallback_provider, + config_snapshot.model_provider_id.as_str(), persisted_resume_metadata, ) .await } InitialHistory::Forked(items) => { - let config_snapshot = thread.config_snapshot().await; let mut thread = build_thread_from_snapshot( thread_id, &config_snapshot, @@ -4969,13 +4975,15 @@ impl CodexMessageProcessor { let mut thread = thread?; thread.id = thread_id.to_string(); thread.path = Some(rollout_path.to_path_buf()); - let history_items = thread_history.get_rollout_items(); - populate_thread_turns( - &mut thread, - ThreadTurnSource::HistoryItems(&history_items), - /*active_turn*/ None, - ) - .await?; + if include_turns { + let history_items = thread_history.get_rollout_items(); + populate_thread_turns( + &mut thread, + ThreadTurnSource::HistoryItems(&history_items), + /*active_turn*/ None, + ) + .await?; + } self.attach_thread_name(thread_id, &mut thread).await; Ok(thread) } @@ -5002,8 +5010,10 @@ impl CodexMessageProcessor { base_instructions, developer_instructions, ephemeral, + exclude_turns, persist_extended_history, } = params; + let include_turns = !exclude_turns; if sandbox.is_some() && permission_profile.is_some() { self.send_invalid_request_error( request_id, @@ -5223,12 +5233,13 @@ impl CodexMessageProcessor { }) }) .map(|id| id.to_string()); - if let Err(message) = populate_thread_turns( - &mut thread, - ThreadTurnSource::HistoryItems(&history_items), - /*active_turn*/ None, - ) - .await + if include_turns + && let Err(message) = populate_thread_turns( + &mut thread, + ThreadTurnSource::HistoryItems(&history_items), + /*active_turn*/ None, + ) + .await { self.send_internal_error(request_id, message).await; return; @@ -5237,6 +5248,7 @@ impl CodexMessageProcessor { }; if let Some(fork_rollout_path) = session_configured.rollout_path.as_ref() + && include_turns && let Err(message) = populate_thread_turns( &mut thread, ThreadTurnSource::RolloutPath(fork_rollout_path.as_path()), @@ -5287,30 +5299,34 @@ impl CodexMessageProcessor { } let connection_id = request_id.connection_id; - let token_usage_thread = response.thread.clone(); - let token_usage_turn_id = if let Some(turn_id) = - latest_token_usage_turn_id_for_thread_path(&token_usage_thread).await - { - Some(turn_id) - } else { - latest_token_usage_turn_id_from_rollout_path( - rollout_path.as_path(), + let token_usage_thread = include_turns.then(|| response.thread.clone()); + self.outgoing.send_response(request_id, response).await; + // `excludeTurns` is the cheap fork path, so skip restored usage replay + // instead of rebuilding history only to attribute a historical update. + if let Some(token_usage_thread) = token_usage_thread { + let token_usage_turn_id = if let Some(turn_id) = + latest_token_usage_turn_id_for_thread_path(&token_usage_thread).await + { + Some(turn_id) + } else { + latest_token_usage_turn_id_from_rollout_path( + rollout_path.as_path(), + token_usage_thread.turns.as_slice(), + ) + .await + }; + // Mirror the resume contract for forks: the new thread is usable as soon + // as the response arrives, so restored usage must follow immediately. + send_thread_token_usage_update_to_connection( + &self.outgoing, + connection_id, + thread_id, &token_usage_thread, + forked_thread.as_ref(), + token_usage_turn_id, ) - .await - }; - self.outgoing.send_response(request_id, response).await; - // Mirror the resume contract for forks: the new thread is usable as soon - // as the response arrives, so restored usage must follow immediately. - send_thread_token_usage_update_to_connection( - &self.outgoing, - connection_id, - thread_id, - &token_usage_thread, - forked_thread.as_ref(), - token_usage_turn_id, - ) - .await; + .await; + } let notif = ThreadStartedNotification { thread }; self.outgoing @@ -8640,12 +8656,13 @@ async fn handle_pending_thread_resume_request( let request_id = pending.request_id; let connection_id = request_id.connection_id; let mut thread = pending.thread_summary; - if let Err(message) = populate_thread_turns( - &mut thread, - ThreadTurnSource::RolloutPath(pending.rollout_path.as_path()), - active_turn.as_ref(), - ) - .await + if pending.include_turns + && let Err(message) = populate_thread_turns( + &mut thread, + ThreadTurnSource::RolloutPath(pending.rollout_path.as_path()), + active_turn.as_ref(), + ) + .await { outgoing .send_error( @@ -8730,24 +8747,28 @@ async fn handle_pending_thread_resume_request( permission_profile, reasoning_effort, }; - let token_usage_thread = response.thread.clone(); - let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_path( - pending.rollout_path.as_path(), - &token_usage_thread, - ) - .await; + let token_usage_thread = pending.include_turns.then(|| response.thread.clone()); outgoing.send_response(request_id, response).await; - // Rejoining a loaded thread has the same UI contract as a cold resume, but - // uses the live conversation state instead of reconstructing a new session. - send_thread_token_usage_update_to_connection( - outgoing, - connection_id, - conversation_id, - &token_usage_thread, - conversation.as_ref(), - token_usage_turn_id, - ) - .await; + // Match cold resume: metadata-only resume should attach the listener without + // paying the cost of turn reconstruction for historical usage replay. + if let Some(token_usage_thread) = token_usage_thread { + let token_usage_turn_id = latest_token_usage_turn_id_from_rollout_path( + pending.rollout_path.as_path(), + token_usage_thread.turns.as_slice(), + ) + .await; + // Rejoining a loaded thread has the same UI contract as a cold resume, but + // uses the live conversation state instead of reconstructing a new session. + send_thread_token_usage_update_to_connection( + outgoing, + connection_id, + conversation_id, + &token_usage_thread, + conversation.as_ref(), + token_usage_turn_id, + ) + .await; + } outgoing .replay_requests_to_connection_for_thread(connection_id, conversation_id) .await; @@ -10610,6 +10631,7 @@ mod tests { base_instructions: None, developer_instructions: None, personality: None, + exclude_turns: false, persist_extended_history: false, }; let config_snapshot = ThreadConfigSnapshot { diff --git a/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs b/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs index d3c06f5e7cd5..bcd972a47063 100644 --- a/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs +++ b/codex-rs/app-server/src/codex_message_processor/token_usage_replay.rs @@ -17,6 +17,7 @@ use codex_app_server_protocol::Thread; use codex_app_server_protocol::ThreadHistoryBuilder; use codex_app_server_protocol::ThreadTokenUsage; use codex_app_server_protocol::ThreadTokenUsageUpdatedNotification; +use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnStatus; use codex_core::CodexThread; use codex_protocol::ThreadId; @@ -60,15 +61,15 @@ pub(super) async fn send_thread_token_usage_update_to_connection( pub(super) async fn latest_token_usage_turn_id_for_thread_path(thread: &Thread) -> Option { let rollout_path = thread.path.as_deref()?; - latest_token_usage_turn_id_from_rollout_path(rollout_path, thread).await + latest_token_usage_turn_id_from_rollout_path(rollout_path, thread.turns.as_slice()).await } pub(super) async fn latest_token_usage_turn_id_from_rollout_path( rollout_path: &Path, - thread: &Thread, + turns: &[Turn], ) -> Option { let rollout_items = read_rollout_items_from_rollout(rollout_path).await.ok()?; - latest_token_usage_turn_id_from_rollout_items(&rollout_items, thread) + latest_token_usage_turn_id_from_rollout_items(&rollout_items, turns) } /// Identifies the turn that was active when a `TokenCount` record appeared. @@ -82,21 +83,8 @@ struct TokenUsageTurnOwner { pub(super) fn latest_token_usage_turn_id_from_rollout_items( rollout_items: &[RolloutItem], - thread: &Thread, + turns: &[Turn], ) -> Option { - let owner = latest_token_usage_turn_owner_from_rollout_items(rollout_items)?; - if thread.turns.iter().any(|turn| turn.id == owner.id) { - return Some(owner.id); - } - owner - .position - .and_then(|position| thread.turns.get(position)) - .map(|turn| turn.id.clone()) -} - -fn latest_token_usage_turn_owner_from_rollout_items( - rollout_items: &[RolloutItem], -) -> Option { let mut builder = ThreadHistoryBuilder::new(); let mut token_usage_turn_owner = None; @@ -113,7 +101,15 @@ fn latest_token_usage_turn_owner_from_rollout_items( builder.handle_rollout_item(item); } - token_usage_turn_owner + let owner = token_usage_turn_owner?; + if turns.iter().any(|turn| turn.id == owner.id) { + Some(owner.id) + } else { + owner + .position + .and_then(|position| turns.get(position)) + .map(|turn| turn.id.clone()) + } } /// Chooses a fallback turn id that should own a replayed token usage update. diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index 504e59468b8c..323aba19d70e 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -31,6 +31,7 @@ pub(crate) struct PendingThreadResumeRequest { pub(crate) config_snapshot: ThreadConfigSnapshot, pub(crate) instruction_sources: Vec, pub(crate) thread_summary: codex_app_server_protocol::Thread, + pub(crate) include_turns: bool, } // ThreadListenerCommand is used to perform operations in the context of the thread listener, for serialization purposes. diff --git a/codex-rs/app-server/tests/suite/v2/thread_fork.rs b/codex-rs/app-server/tests/suite/v2/thread_fork.rs index 0425ce3e3c9d..7741ced1636c 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_fork.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_fork.rs @@ -241,6 +241,54 @@ async fn thread_fork_emits_restored_token_usage_before_next_turn() -> Result<()> Ok(()) } +#[tokio::test] +async fn thread_fork_can_exclude_turns_and_skip_restored_token_usage() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let conversation_id = create_fake_rollout_with_token_usage( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Some("mock_provider"), + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let fork_id = mcp + .send_thread_fork_request(ThreadForkParams { + thread_id: conversation_id.clone(), + exclude_turns: true, + ..Default::default() + }) + .await?; + let fork_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(fork_id)), + ) + .await??; + let ThreadForkResponse { thread, .. } = to_response::(fork_resp)?; + + assert_eq!(thread.forked_from_id, Some(conversation_id)); + assert_eq!(thread.preview, "Saved user message"); + assert!(thread.turns.is_empty()); + + let note = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/tokenUsage/updated"), + ) + .await; + assert!( + note.is_err(), + "excludeTurns=true should not replay token usage" + ); + + Ok(()) +} + #[tokio::test] async fn thread_fork_tracks_thread_initialized_analytics() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index 7a5f2c543d0b..c86f88825d82 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -285,6 +285,45 @@ async fn thread_resume_returns_rollout_history() -> Result<()> { Ok(()) } +#[tokio::test] +async fn thread_resume_can_skip_turns_for_metadata_only_resume() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let conversation_id = create_fake_rollout_with_text_elements( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Vec::new(), + Some("mock_provider"), + /*git_info*/ None, + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: conversation_id.clone(), + exclude_turns: true, + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { thread, .. } = to_response::(resume_resp)?; + + assert_eq!(thread.id, conversation_id); + assert!(thread.turns.is_empty()); + + Ok(()) +} + #[tokio::test] async fn thread_resume_emits_restored_token_usage_before_next_turn() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; @@ -338,6 +377,80 @@ async fn thread_resume_emits_restored_token_usage_before_next_turn() -> Result<( Ok(()) } +#[tokio::test] +async fn thread_resume_skips_restored_token_usage_when_turns_are_excluded() -> Result<()> { + let server = create_mock_responses_server_repeating_assistant("Done").await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let conversation_id = create_fake_rollout_with_token_usage( + codex_home.path(), + "2025-01-05T12-00-00", + "2025-01-05T12:00:00Z", + "Saved user message", + Some("mock_provider"), + )?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let first_resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: conversation_id.clone(), + ..Default::default() + }) + .await?; + let first_resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(first_resume_id)), + ) + .await??; + let ThreadResumeResponse { thread, .. } = + to_response::(first_resume_resp)?; + let expected_turn_id = thread.turns[0].id.clone(); + + let first_note = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/tokenUsage/updated"), + ) + .await??; + let parsed: ServerNotification = first_note.try_into()?; + let ServerNotification::ThreadTokenUsageUpdated(notification) = parsed else { + panic!("expected thread/tokenUsage/updated notification"); + }; + assert_eq!(notification.turn_id, expected_turn_id); + + let second_resume_id = mcp + .send_thread_resume_request(ThreadResumeParams { + thread_id: conversation_id, + exclude_turns: true, + ..Default::default() + }) + .await?; + let second_resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(second_resume_id)), + ) + .await??; + let ThreadResumeResponse { + thread: resumed_again, + .. + } = to_response::(second_resume_resp)?; + assert!(resumed_again.turns.is_empty()); + + let second_note = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("thread/tokenUsage/updated"), + ) + .await; + assert!( + second_note.is_err(), + "excludeTurns=true should not replay token usage" + ); + + Ok(()) +} + #[tokio::test] async fn thread_resume_token_usage_replay_ignores_stale_interrupted_tail_turn() -> Result<()> { let server = create_mock_responses_server_repeating_assistant("Done").await; @@ -1339,6 +1452,84 @@ async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> R Ok(()) } +#[tokio::test] +async fn thread_resume_can_skip_turns_when_thread_is_running() -> Result<()> { + let server = responses::start_mock_server().await; + let _response_mock = responses::mount_sse_once( + &server, + responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_assistant_message("msg-1", "Done"), + responses::ev_completed("resp-1"), + ]), + ) + .await; + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri())?; + + let mut primary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, primary.initialize()).await??; + + let start_id = primary + .send_thread_start_request(ThreadStartParams { + model: Some("gpt-5.4".to_string()), + ..Default::default() + }) + .await?; + let start_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(start_id)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(start_resp)?; + + let turn_id = primary + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![UserInput::Text { + text: "seed history".to_string(), + text_elements: Vec::new(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_response_message(RequestId::Integer(turn_id)), + ) + .await??; + timeout( + DEFAULT_READ_TIMEOUT, + primary.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + let mut secondary = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, secondary.initialize()).await??; + + let resume_id = secondary + .send_thread_resume_request(ThreadResumeParams { + thread_id: thread.id.clone(), + exclude_turns: true, + ..Default::default() + }) + .await?; + let resume_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + secondary.read_stream_until_response_message(RequestId::Integer(resume_id)), + ) + .await??; + let ThreadResumeResponse { + thread: resumed, .. + } = to_response::(resume_resp)?; + + assert_eq!(resumed.id, thread.id); + assert_eq!(resumed.status, ThreadStatus::Idle); + assert!(resumed.turns.is_empty()); + + Ok(()) +} + #[tokio::test] async fn thread_resume_replays_pending_command_execution_request_approval() -> Result<()> { let responses = vec![