From 3aaacfc79a5fd1b5bc821f1f5e65470e82403bfc Mon Sep 17 00:00:00 2001 From: Albin Cassirer Date: Thu, 23 Apr 2026 09:43:50 -0700 Subject: [PATCH] Trace tool and code-mode runtime boundaries --- codex-rs/Cargo.lock | 1 + codex-rs/code-mode/src/lib.rs | 2 + codex-rs/code-mode/src/runtime/mod.rs | 50 +- codex-rs/code-mode/src/service.rs | 128 +++-- codex-rs/core/src/session/tests.rs | 2 + .../core/src/session/tests/guardian_tests.rs | 5 + .../src/tools/code_mode/execute_handler.rs | 25 +- codex-rs/core/src/tools/code_mode/mod.rs | 38 +- .../core/src/tools/code_mode/wait_handler.rs | 25 +- codex-rs/core/src/tools/context.rs | 12 +- .../src/tools/handlers/apply_patch_tests.rs | 1 + codex-rs/core/src/tools/handlers/mcp.rs | 3 + .../src/tools/handlers/multi_agents_tests.rs | 1 + .../handlers/request_user_input_tests.rs | 1 + .../core/src/tools/handlers/shell_tests.rs | 4 + .../src/tools/handlers/unified_exec_tests.rs | 4 + codex-rs/core/src/tools/mod.rs | 1 + codex-rs/core/src/tools/registry.rs | 40 +- codex-rs/core/src/tools/registry_tests.rs | 6 +- codex-rs/core/src/tools/router.rs | 3 +- .../core/src/tools/tool_dispatch_trace.rs | 139 ++++++ .../src/tools/tool_dispatch_trace_tests.rs | 337 +++++++++++++ codex-rs/rollout-trace/Cargo.toml | 1 + codex-rs/rollout-trace/src/code_cell.rs | 185 +++++++ codex-rs/rollout-trace/src/compaction.rs | 6 +- codex-rs/rollout-trace/src/inference.rs | 6 +- codex-rs/rollout-trace/src/lib.rs | 14 + codex-rs/rollout-trace/src/recorder.rs | 103 ++++ codex-rs/rollout-trace/src/recorder_tests.rs | 9 + codex-rs/rollout-trace/src/tool_dispatch.rs | 471 ++++++++++++++++++ 30 files changed, 1553 insertions(+), 70 deletions(-) create mode 100644 codex-rs/core/src/tools/tool_dispatch_trace.rs create mode 100644 codex-rs/core/src/tools/tool_dispatch_trace_tests.rs create mode 100644 codex-rs/rollout-trace/src/code_cell.rs create mode 100644 codex-rs/rollout-trace/src/tool_dispatch.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index d1787fdfe30a..60b8309c43f2 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -3202,6 +3202,7 @@ name = "codex-rollout-trace" version = "0.0.0" dependencies = [ "anyhow", + "codex-code-mode", "codex-protocol", "pretty_assertions", "serde", diff --git a/codex-rs/code-mode/src/lib.rs b/codex-rs/code-mode/src/lib.rs index bb27d999601a..bf0ce699ccc3 100644 --- a/codex-rs/code-mode/src/lib.rs +++ b/codex-rs/code-mode/src/lib.rs @@ -18,11 +18,13 @@ pub use description::render_json_schema_to_typescript; pub use response::DEFAULT_IMAGE_DETAIL; pub use response::FunctionCallOutputContentItem; pub use response::ImageDetail; +pub use runtime::CodeModeNestedToolCall; pub use runtime::DEFAULT_EXEC_YIELD_TIME_MS; pub use runtime::DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL; pub use runtime::DEFAULT_WAIT_YIELD_TIME_MS; pub use runtime::ExecuteRequest; pub use runtime::RuntimeResponse; +pub use runtime::WaitOutcome; pub use runtime::WaitRequest; pub use service::CodeModeService; pub use service::CodeModeTurnHost; diff --git a/codex-rs/code-mode/src/runtime/mod.rs b/codex-rs/code-mode/src/runtime/mod.rs index 0f50edd329c7..200a47c98970 100644 --- a/codex-rs/code-mode/src/runtime/mod.rs +++ b/codex-rs/code-mode/src/runtime/mod.rs @@ -10,6 +10,7 @@ use std::sync::mpsc as std_mpsc; use std::thread; use codex_protocol::ToolName; +use serde::Serialize; use serde_json::Value as JsonValue; use tokio::sync::mpsc; @@ -25,6 +26,11 @@ const EXIT_SENTINEL: &str = "__codex_code_mode_exit__"; #[derive(Clone, Debug)] pub struct ExecuteRequest { + /// Runtime cell id for this execution. + /// + /// Callers allocate this before execution so tracing, waits, and nested tool + /// calls can refer to the cell as soon as JavaScript starts. + pub cell_id: String, pub tool_call_id: String, pub enabled_tools: Vec, pub source: String, @@ -40,7 +46,30 @@ pub struct WaitRequest { pub terminate: bool, } +/// Result of waiting on a code-mode cell. +/// +/// The wrapped `RuntimeResponse` is the model-facing wait result. The enum +/// variant carries the extra lifecycle provenance that `RuntimeResponse` cannot: +/// a failed real cell and a missing-cell wait both use +/// `RuntimeResponse::Result { error_text: Some(..), .. }`, but only the former +/// should be treated as a code-cell lifecycle event. #[derive(Debug, PartialEq)] +pub enum WaitOutcome { + /// The requested code cell was live when the wait command was accepted. + LiveCell(RuntimeResponse), + /// The requested code cell was not live. + MissingCell(RuntimeResponse), +} + +impl From for RuntimeResponse { + fn from(outcome: WaitOutcome) -> Self { + match outcome { + WaitOutcome::LiveCell(response) | WaitOutcome::MissingCell(response) => response, + } + } +} + +#[derive(Debug, PartialEq, Serialize)] pub enum RuntimeResponse { Yielded { cell_id: String, @@ -58,14 +87,22 @@ pub enum RuntimeResponse { }, } +/// Nested tool request emitted by one code-mode cell. +/// +/// Code mode owns the per-cell runtime id. Hosts should preserve it for +/// provenance/debugging, but should still assign their own runtime tool call id +/// if their tool-call graph requires globally unique ids. +#[derive(Debug)] +pub struct CodeModeNestedToolCall { + pub cell_id: String, + pub runtime_tool_call_id: String, + pub tool_name: ToolName, + pub input: Option, +} + #[derive(Debug)] pub(crate) enum TurnMessage { - ToolCall { - cell_id: String, - id: String, - name: ToolName, - input: Option, - }, + ToolCall(CodeModeNestedToolCall), Notify { cell_id: String, call_id: String, @@ -331,6 +368,7 @@ mod tests { fn execute_request(source: &str) -> ExecuteRequest { ExecuteRequest { + cell_id: "1".to_string(), tool_call_id: "call_1".to_string(), enabled_tools: Vec::new(), source: source.to_string(), diff --git a/codex-rs/code-mode/src/service.rs b/codex-rs/code-mode/src/service.rs index 4a46d36b41ba..7326c834e2a9 100644 --- a/codex-rs/code-mode/src/service.rs +++ b/codex-rs/code-mode/src/service.rs @@ -5,7 +5,6 @@ use std::sync::atomic::Ordering; use std::time::Duration; use async_trait::async_trait; -use codex_protocol::ToolName; use serde_json::Value as JsonValue; use tokio::sync::Mutex; use tokio::sync::mpsc; @@ -14,12 +13,14 @@ use tokio_util::sync::CancellationToken; use tracing::warn; use crate::FunctionCallOutputContentItem; +use crate::runtime::CodeModeNestedToolCall; use crate::runtime::DEFAULT_EXEC_YIELD_TIME_MS; use crate::runtime::ExecuteRequest; use crate::runtime::RuntimeCommand; use crate::runtime::RuntimeEvent; use crate::runtime::RuntimeResponse; use crate::runtime::TurnMessage; +use crate::runtime::WaitOutcome; use crate::runtime::WaitRequest; use crate::runtime::spawn_runtime; @@ -27,8 +28,7 @@ use crate::runtime::spawn_runtime; pub trait CodeModeTurnHost: Send + Sync { async fn invoke_tool( &self, - tool_name: ToolName, - input: Option, + invocation: CodeModeNestedToolCall, cancellation_token: CancellationToken, ) -> Result; @@ -76,24 +76,45 @@ impl CodeModeService { *self.inner.stored_values.lock().await = values; } - pub async fn execute(&self, request: ExecuteRequest) -> Result { - let cell_id = self - .inner + /// Reserves the runtime cell id for a future `execute` request. + /// + /// The runtime can issue nested tool calls before the first `execute` + /// response is returned. Hosts that need a parent trace object for those + /// nested calls should allocate the cell id up front and pass it back on the + /// `ExecuteRequest`. + pub fn allocate_cell_id(&self) -> String { + self.inner .next_cell_id .fetch_add(1, Ordering::Relaxed) - .to_string(); + .to_string() + } + + pub async fn execute(&self, request: ExecuteRequest) -> Result { + let cell_id = request.cell_id.clone(); + let initial_yield_time_ms = request.yield_time_ms.unwrap_or(DEFAULT_EXEC_YIELD_TIME_MS); let (event_tx, event_rx) = mpsc::unbounded_channel(); - let (runtime_tx, runtime_terminate_handle) = spawn_runtime(request.clone(), event_tx)?; let (control_tx, control_rx) = mpsc::unbounded_channel(); let (response_tx, response_rx) = oneshot::channel(); + let (runtime_tx, runtime_terminate_handle) = { + let mut sessions = self.inner.sessions.lock().await; + if sessions.contains_key(&cell_id) { + return Err(format!("exec cell {cell_id} already exists")); + } - self.inner.sessions.lock().await.insert( - cell_id.clone(), - SessionHandle { - control_tx: control_tx.clone(), - runtime_tx: runtime_tx.clone(), - }, - ); + let (runtime_tx, runtime_terminate_handle) = spawn_runtime(request, event_tx)?; + + // Keep the session registry locked through insertion so a + // caller-owned cell id cannot race with another execute and replace + // a live runtime. + sessions.insert( + cell_id.clone(), + SessionHandle { + control_tx, + runtime_tx: runtime_tx.clone(), + }, + ); + (runtime_tx, runtime_terminate_handle) + }; tokio::spawn(run_session_control( Arc::clone(&self.inner), @@ -105,7 +126,7 @@ impl CodeModeService { event_rx, control_rx, response_tx, - request.yield_time_ms.unwrap_or(DEFAULT_EXEC_YIELD_TIME_MS), + initial_yield_time_ms, )); response_rx @@ -113,7 +134,7 @@ impl CodeModeService { .map_err(|_| "exec runtime ended unexpectedly".to_string()) } - pub async fn wait(&self, request: WaitRequest) -> Result { + pub async fn wait(&self, request: WaitRequest) -> Result { let cell_id = request.cell_id.clone(); let handle = self .inner @@ -123,7 +144,7 @@ impl CodeModeService { .get(&request.cell_id) .cloned(); let Some(handle) = handle else { - return Ok(missing_cell_response(cell_id)); + return Ok(WaitOutcome::MissingCell(missing_cell_response(cell_id))); }; let (response_tx, response_rx) = oneshot::channel(); let control_message = if request.terminate { @@ -135,11 +156,13 @@ impl CodeModeService { } }; if handle.control_tx.send(control_message).is_err() { - return Ok(missing_cell_response(cell_id)); + return Ok(WaitOutcome::MissingCell(missing_cell_response(cell_id))); } match response_rx.await { - Ok(response) => Ok(response), - Err(_) => Ok(missing_cell_response(request.cell_id)), + Ok(response) => Ok(WaitOutcome::LiveCell(response)), + Err(_) => Ok(WaitOutcome::MissingCell(missing_cell_response( + request.cell_id, + ))), } } @@ -169,18 +192,14 @@ impl CodeModeService { ); } } - TurnMessage::ToolCall { - cell_id, - id, - name, - input, - } => { + TurnMessage::ToolCall(invocation) => { let host = Arc::clone(&host); let inner = Arc::clone(&inner); tokio::spawn(async move { - let response = host - .invoke_tool(name, input, CancellationToken::new()) - .await; + let cell_id = invocation.cell_id.clone(); + let runtime_tool_call_id = invocation.runtime_tool_call_id.clone(); + let response = + host.invoke_tool(invocation, CancellationToken::new()).await; let runtime_tx = inner .sessions .lock() @@ -191,8 +210,14 @@ impl CodeModeService { return; }; let command = match response { - Ok(result) => RuntimeCommand::ToolResponse { id, result }, - Err(error_text) => RuntimeCommand::ToolError { id, error_text }, + Ok(result) => RuntimeCommand::ToolResponse { + id: runtime_tool_call_id, + result, + }, + Err(error_text) => RuntimeCommand::ToolError { + id: runtime_tool_call_id, + error_text, + }, }; let _ = runtime_tx.send(command); }); @@ -361,12 +386,16 @@ async fn run_session_control( }).await; } RuntimeEvent::ToolCall { id, name, input } => { - let _ = inner.turn_message_tx.send(TurnMessage::ToolCall { + let tool_call = CodeModeNestedToolCall { cell_id: cell_id.clone(), - id, - name, + runtime_tool_call_id: id, + tool_name: name, input, - }).await; + }; + let _ = inner + .turn_message_tx + .send(TurnMessage::ToolCall(tool_call)) + .await; } RuntimeEvent::Result { stored_values, @@ -479,6 +508,8 @@ mod tests { use super::RuntimeResponse; use super::SessionControlCommand; use super::SessionControlContext; + use super::WaitOutcome; + use super::WaitRequest; use super::run_session_control; use crate::FunctionCallOutputContentItem; use crate::runtime::ExecuteRequest; @@ -487,6 +518,7 @@ mod tests { fn execute_request(source: &str) -> ExecuteRequest { ExecuteRequest { + cell_id: "1".to_string(), tool_call_id: "call_1".to_string(), enabled_tools: Vec::new(), source: source.to_string(), @@ -829,6 +861,30 @@ image({ ); } + #[tokio::test] + async fn wait_reports_missing_cell_separately_from_runtime_results() { + let service = CodeModeService::new(); + + let response = service + .wait(WaitRequest { + cell_id: "missing".to_string(), + yield_time_ms: 1, + terminate: false, + }) + .await + .unwrap(); + + assert_eq!( + response, + WaitOutcome::MissingCell(RuntimeResponse::Result { + cell_id: "missing".to_string(), + content_items: Vec::new(), + stored_values: HashMap::new(), + error_text: Some("exec cell missing not found".to_string()), + }) + ); + } + #[tokio::test] async fn terminate_waits_for_runtime_shutdown_before_responding() { let inner = test_inner(); diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 4e47596b8411..4327cb3878e6 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -6919,6 +6919,7 @@ async fn rejects_escalated_permissions_when_policy_not_on_request() { tracker: Arc::clone(&turn_diff_tracker), call_id, tool_name: codex_tools::ToolName::plain(tool_name), + source: crate::tools::context::ToolCallSource::Direct, payload: ToolPayload::Function { arguments: serde_json::json!({ "command": params.command.clone(), @@ -6998,6 +6999,7 @@ async fn unified_exec_rejects_escalated_permissions_when_policy_not_on_request() tracker: Arc::clone(&tracker), call_id: "exec-call".to_string(), tool_name: codex_tools::ToolName::plain("exec_command"), + source: crate::tools::context::ToolCallSource::Direct, payload: ToolPayload::Function { arguments: serde_json::json!({ "cmd": "echo hi", diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index 22db9f393729..af678215f7ed 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -9,6 +9,7 @@ use crate::exec_policy::ExecPolicyManager; use crate::guardian::GUARDIAN_REVIEWER_NAME; use crate::sandboxing::SandboxPermissions; use crate::tools::context::FunctionToolOutput; +use crate::tools::context::ToolCallSource; use crate::turn_diff_tracker::TurnDiffTracker; use codex_app_server_protocol::ConfigLayerSource; use codex_exec_server::EnvironmentManager; @@ -341,6 +342,7 @@ async fn guardian_allows_shell_additional_permissions_requests_past_policy_valid tracker: Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new())), call_id: "test-call".to_string(), tool_name: codex_tools::ToolName::plain("shell"), + source: crate::tools::context::ToolCallSource::Direct, payload: ToolPayload::Function { arguments: serde_json::json!({ "command": params.command.clone(), @@ -476,6 +478,7 @@ async fn strict_auto_review_turn_grant_forces_guardian_for_shell_policy_skip() { tracker: Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new())), call_id: "strict-shell-call".to_string(), tool_name: codex_tools::ToolName::plain("shell"), + source: ToolCallSource::Direct, payload: ToolPayload::Function { arguments: serde_json::json!({ "command": command, @@ -521,6 +524,7 @@ async fn guardian_allows_unified_exec_additional_permissions_requests_past_polic tracker: Arc::clone(&tracker), call_id: "exec-call".to_string(), tool_name: codex_tools::ToolName::plain("exec_command"), + source: crate::tools::context::ToolCallSource::Direct, payload: ToolPayload::Function { arguments: serde_json::json!({ "cmd": "echo hi", @@ -639,6 +643,7 @@ async fn shell_handler_allows_sticky_turn_permissions_without_inline_request_per tracker: Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new())), call_id: "sticky-turn-grant".to_string(), tool_name: codex_tools::ToolName::plain("shell"), + source: crate::tools::context::ToolCallSource::Direct, payload: ToolPayload::Function { arguments: serde_json::json!({ "command": [ diff --git a/codex-rs/core/src/tools/code_mode/execute_handler.rs b/codex-rs/core/src/tools/code_mode/execute_handler.rs index ac452dc9f2b4..861375470111 100644 --- a/codex-rs/core/src/tools/code_mode/execute_handler.rs +++ b/codex-rs/core/src/tools/code_mode/execute_handler.rs @@ -9,6 +9,7 @@ use super::ExecContext; use super::PUBLIC_TOOL_NAME; use super::build_enabled_tools; use super::handle_runtime_response; +use super::is_exec_tool_name; pub struct CodeModeExecuteHandler; @@ -30,12 +31,23 @@ impl CodeModeExecuteHandler { .code_mode_service .stored_values() .await; + // Allocate before starting V8 so the trace can create the parent + // CodeCell before model-authored JavaScript issues nested tool calls. + let runtime_cell_id = exec.session.services.code_mode_service.allocate_cell_id(); + let code_cell_trace = exec.session.services.rollout_trace.start_code_cell_trace( + exec.session.conversation_id, + exec.turn.sub_id.as_str(), + runtime_cell_id.as_str(), + call_id.as_str(), + args.code.as_str(), + ); let started_at = std::time::Instant::now(); let response = exec .session .services .code_mode_service .execute(codex_code_mode::ExecuteRequest { + cell_id: runtime_cell_id, tool_call_id: call_id, enabled_tools, source: args.code, @@ -45,6 +57,15 @@ impl CodeModeExecuteHandler { }) .await .map_err(FunctionCallError::RespondToModel)?; + // Record the raw runtime boundary. The model-visible custom-tool output + // is produced by `handle_runtime_response` and later linked through + // `CodeCell.output_item_ids` in the reduced trace. + code_cell_trace.record_initial_response(&response); + // Yielded cells keep running, so terminal lifecycle is only emitted + // here when the first response also ended the runtime. + if !matches!(response, codex_code_mode::RuntimeResponse::Yielded { .. }) { + code_cell_trace.record_ended(&response); + } handle_runtime_response(&exec, response, args.max_output_tokens, started_at) .await .map_err(FunctionCallError::RespondToModel) @@ -73,9 +94,7 @@ impl ToolHandler for CodeModeExecuteHandler { } = invocation; match payload { - ToolPayload::Custom { input } - if tool_name.namespace.is_none() && tool_name.name.as_str() == PUBLIC_TOOL_NAME => - { + ToolPayload::Custom { input } if is_exec_tool_name(&tool_name) => { self.execute(session, turn, call_id, input).await } _ => Err(FunctionCallError::RespondToModel(format!( diff --git a/codex-rs/core/src/tools/code_mode/mod.rs b/codex-rs/core/src/tools/code_mode/mod.rs index 575d3f9a89d0..8032b9f31863 100644 --- a/codex-rs/core/src/tools/code_mode/mod.rs +++ b/codex-rs/core/src/tools/code_mode/mod.rs @@ -7,6 +7,7 @@ use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use codex_code_mode::CodeModeNestedToolCall; use codex_code_mode::CodeModeTurnHost; use codex_code_mode::RuntimeResponse; use codex_protocol::models::FunctionCallOutputContentItem; @@ -45,6 +46,11 @@ pub(crate) const PUBLIC_TOOL_NAME: &str = codex_code_mode::PUBLIC_TOOL_NAME; pub(crate) const WAIT_TOOL_NAME: &str = codex_code_mode::WAIT_TOOL_NAME; pub(crate) const DEFAULT_WAIT_YIELD_TIME_MS: u64 = codex_code_mode::DEFAULT_WAIT_YIELD_TIME_MS; +/// Returns true for the un-namespaced code-mode `exec` tool. +pub(crate) fn is_exec_tool_name(tool_name: &ToolName) -> bool { + tool_name.namespace.is_none() && tool_name.name == PUBLIC_TOOL_NAME +} + #[derive(Clone)] pub(crate) struct ExecContext { pub(super) session: Arc, @@ -73,6 +79,10 @@ impl CodeModeService { self.inner.replace_stored_values(values).await; } + pub(crate) fn allocate_cell_id(&self) -> String { + self.inner.allocate_cell_id() + } + pub(crate) async fn execute( &self, request: codex_code_mode::ExecuteRequest, @@ -83,7 +93,7 @@ impl CodeModeService { pub(crate) async fn wait( &self, request: codex_code_mode::WaitRequest, - ) -> Result { + ) -> Result { self.inner.wait(request).await } @@ -118,15 +128,13 @@ struct CoreTurnHost { impl CodeModeTurnHost for CoreTurnHost { async fn invoke_tool( &self, - tool_name: ToolName, - input: Option, + invocation: CodeModeNestedToolCall, cancellation_token: CancellationToken, ) -> Result { call_nested_tool( self.exec.clone(), self.tool_runtime.clone(), - tool_name, - input, + invocation, cancellation_token, ) .await @@ -302,11 +310,16 @@ async fn build_nested_router(exec: &ExecContext) -> ToolRouter { async fn call_nested_tool( exec: ExecContext, tool_runtime: ToolCallRuntime, - tool_name: ToolName, - input: Option, + invocation: CodeModeNestedToolCall, cancellation_token: CancellationToken, ) -> Result { - if tool_name.namespace.is_none() && tool_name.name == PUBLIC_TOOL_NAME { + let CodeModeNestedToolCall { + cell_id, + runtime_tool_call_id, + tool_name, + input, + } = invocation; + if is_exec_tool_name(&tool_name) { return Err(FunctionCallError::RespondToModel(format!( "{PUBLIC_TOOL_NAME} cannot invoke itself" ))); @@ -339,7 +352,14 @@ async fn call_nested_tool( payload, }; let result = tool_runtime - .handle_tool_call_with_source(call, ToolCallSource::CodeMode, cancellation_token) + .handle_tool_call_with_source( + call, + ToolCallSource::CodeMode { + cell_id, + runtime_tool_call_id, + }, + cancellation_token, + ) .await?; Ok(result.code_mode_result()) } diff --git a/codex-rs/core/src/tools/code_mode/wait_handler.rs b/codex-rs/core/src/tools/code_mode/wait_handler.rs index dcc416b8329b..4d2b1e42d36f 100644 --- a/codex-rs/core/src/tools/code_mode/wait_handler.rs +++ b/codex-rs/core/src/tools/code_mode/wait_handler.rs @@ -61,7 +61,7 @@ impl ToolHandler for CodeModeWaitHandler { let args: ExecWaitArgs = parse_arguments(&arguments)?; let exec = ExecContext { session, turn }; let started_at = std::time::Instant::now(); - let response = exec + let wait_response = exec .session .services .code_mode_service @@ -72,7 +72,28 @@ impl ToolHandler for CodeModeWaitHandler { }) .await .map_err(FunctionCallError::RespondToModel)?; - handle_runtime_response(&exec, response, args.max_tokens, started_at) + if let codex_code_mode::WaitOutcome::LiveCell(response) = &wait_response + && !matches!(response, codex_code_mode::RuntimeResponse::Yielded { .. }) + { + // Only a live-cell wait can close a CodeCell. A missing + // cell is still an ordinary `wait` tool result, but there + // is no runtime object for the reducer to complete. + let runtime_cell_id = match response { + codex_code_mode::RuntimeResponse::Yielded { cell_id, .. } + | codex_code_mode::RuntimeResponse::Terminated { cell_id, .. } + | codex_code_mode::RuntimeResponse::Result { cell_id, .. } => cell_id, + }; + exec.session + .services + .rollout_trace + .code_cell_trace_context( + exec.session.conversation_id, + exec.turn.sub_id.as_str(), + runtime_cell_id, + ) + .record_ended(response); + } + handle_runtime_response(&exec, wait_response.into(), args.max_tokens, started_at) .await .map_err(FunctionCallError::RespondToModel) } diff --git a/codex-rs/core/src/tools/context.rs b/codex-rs/core/src/tools/context.rs index 63bb641ef879..89aef248a67a 100644 --- a/codex-rs/core/src/tools/context.rs +++ b/codex-rs/core/src/tools/context.rs @@ -30,11 +30,18 @@ use tokio_util::sync::CancellationToken; pub type SharedTurnDiffTracker = Arc>; -#[derive(Clone, Copy, Debug, Eq, PartialEq)] +#[derive(Clone, Debug, Eq, PartialEq)] pub enum ToolCallSource { Direct, JsRepl, - CodeMode, + CodeMode { + /// Runtime cell that issued the nested tool request. + cell_id: String, + /// Code-mode's per-cell tool invocation id. This is useful for + /// debugging the JS/runtime bridge, but it is not the Codex tool call id + /// because the runtime id only needs to be unique within one cell. + runtime_tool_call_id: String, + }, } #[derive(Clone)] @@ -45,6 +52,7 @@ pub struct ToolInvocation { pub tracker: SharedTurnDiffTracker, pub call_id: String, pub tool_name: ToolName, + pub source: ToolCallSource, pub payload: ToolPayload, } diff --git a/codex-rs/core/src/tools/handlers/apply_patch_tests.rs b/codex-rs/core/src/tools/handlers/apply_patch_tests.rs index b1b9d0cbbe44..f3d9bdd7de4f 100644 --- a/codex-rs/core/src/tools/handlers/apply_patch_tests.rs +++ b/codex-rs/core/src/tools/handlers/apply_patch_tests.rs @@ -37,6 +37,7 @@ async fn invocation_for_payload(payload: ToolPayload) -> ToolInvocation { tracker: Arc::new(Mutex::new(TurnDiffTracker::new())), call_id: "call-apply-patch".to_string(), tool_name: codex_tools::ToolName::plain("apply_patch"), + source: crate::tools::context::ToolCallSource::Direct, payload, } } diff --git a/codex-rs/core/src/tools/handlers/mcp.rs b/codex-rs/core/src/tools/handlers/mcp.rs index 0de45ea6bcda..d828119d9195 100644 --- a/codex-rs/core/src/tools/handlers/mcp.rs +++ b/codex-rs/core/src/tools/handlers/mcp.rs @@ -112,6 +112,7 @@ fn mcp_hook_tool_input(raw_arguments: &str) -> Value { mod tests { use super::*; use crate::session::tests::make_session_and_context; + use crate::tools::context::ToolCallSource; use crate::turn_diff_tracker::TurnDiffTracker; use pretty_assertions::assert_eq; use serde_json::json; @@ -141,6 +142,7 @@ mod tests { tracker: Arc::new(Mutex::new(TurnDiffTracker::new())), call_id: "call-mcp-pre".to_string(), tool_name: codex_tools::ToolName::namespaced("mcp__memory__", "create_entities"), + source: ToolCallSource::Direct, payload, }), Some(PreToolUsePayload { @@ -188,6 +190,7 @@ mod tests { tracker: Arc::new(Mutex::new(TurnDiffTracker::new())), call_id: "call-mcp-post".to_string(), tool_name: codex_tools::ToolName::namespaced("mcp__filesystem__", "read_file"), + source: ToolCallSource::Direct, payload, }; assert_eq!( diff --git a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs index d7117dcc4dfc..eefee4678fa0 100644 --- a/codex-rs/core/src/tools/handlers/multi_agents_tests.rs +++ b/codex-rs/core/src/tools/handlers/multi_agents_tests.rs @@ -72,6 +72,7 @@ fn invocation( tracker: Arc::new(Mutex::new(TurnDiffTracker::default())), call_id: "call-1".to_string(), tool_name: codex_tools::ToolName::plain(tool_name), + source: crate::tools::context::ToolCallSource::Direct, payload, } } diff --git a/codex-rs/core/src/tools/handlers/request_user_input_tests.rs b/codex-rs/core/src/tools/handlers/request_user_input_tests.rs index e0a0907668bd..3c21d66702a1 100644 --- a/codex-rs/core/src/tools/handlers/request_user_input_tests.rs +++ b/codex-rs/core/src/tools/handlers/request_user_input_tests.rs @@ -31,6 +31,7 @@ async fn multi_agent_v2_request_user_input_rejects_subagent_threads() { tracker: Arc::new(Mutex::new(TurnDiffTracker::default())), call_id: "call-1".to_string(), tool_name: codex_tools::ToolName::plain(REQUEST_USER_INPUT_TOOL_NAME), + source: crate::tools::context::ToolCallSource::Direct, payload: ToolPayload::Function { arguments: json!({ "questions": [{ diff --git a/codex-rs/core/src/tools/handlers/shell_tests.rs b/codex-rs/core/src/tools/handlers/shell_tests.rs index 83f52d0ea5e1..49e2cf8f75d1 100644 --- a/codex-rs/core/src/tools/handlers/shell_tests.rs +++ b/codex-rs/core/src/tools/handlers/shell_tests.rs @@ -13,6 +13,7 @@ use crate::shell::Shell; use crate::shell::ShellType; use crate::shell_snapshot::ShellSnapshot; use crate::tools::context::FunctionToolOutput; +use crate::tools::context::ToolCallSource; use crate::tools::context::ToolInvocation; use crate::tools::context::ToolPayload; use crate::tools::handlers::ShellCommandHandler; @@ -230,6 +231,7 @@ async fn shell_pre_tool_use_payload_uses_joined_command() { tracker: Arc::new(Mutex::new(TurnDiffTracker::new())), call_id: "call-41".to_string(), tool_name: codex_tools::ToolName::plain("shell"), + source: crate::tools::context::ToolCallSource::Direct, payload, }), Some(crate::tools::registry::PreToolUsePayload { @@ -257,6 +259,7 @@ async fn shell_command_pre_tool_use_payload_uses_raw_command() { tracker: Arc::new(Mutex::new(TurnDiffTracker::new())), call_id: "call-42".to_string(), tool_name: codex_tools::ToolName::plain("shell_command"), + source: crate::tools::context::ToolCallSource::Direct, payload, }), Some(crate::tools::registry::PreToolUsePayload { @@ -287,6 +290,7 @@ async fn build_post_tool_use_payload_uses_tool_output_wire_value() { tracker: Arc::new(Mutex::new(TurnDiffTracker::new())), call_id: "call-42".to_string(), tool_name: codex_tools::ToolName::plain("shell_command"), + source: ToolCallSource::Direct, payload, }; assert_eq!( diff --git a/codex-rs/core/src/tools/handlers/unified_exec_tests.rs b/codex-rs/core/src/tools/handlers/unified_exec_tests.rs index 0dd656bd6571..e0a0af5edc7e 100644 --- a/codex-rs/core/src/tools/handlers/unified_exec_tests.rs +++ b/codex-rs/core/src/tools/handlers/unified_exec_tests.rs @@ -15,6 +15,7 @@ use tempfile::tempdir; use crate::session::tests::make_session_and_context; use crate::tools::context::ExecCommandToolOutput; +use crate::tools::context::ToolCallSource; use crate::tools::context::ToolInvocation; use crate::tools::context::ToolPayload; use crate::tools::hook_names::HookToolName; @@ -35,6 +36,7 @@ async fn invocation_for_payload( tracker: Arc::new(Mutex::new(TurnDiffTracker::new())), call_id: call_id.to_string(), tool_name: codex_tools::ToolName::plain(tool_name), + source: ToolCallSource::Direct, payload, } } @@ -232,6 +234,7 @@ async fn exec_command_pre_tool_use_payload_uses_raw_command() { tracker: Arc::new(Mutex::new(TurnDiffTracker::new())), call_id: "call-43".to_string(), tool_name: codex_tools::ToolName::plain("exec_command"), + source: crate::tools::context::ToolCallSource::Direct, payload, }), Some(crate::tools::registry::PreToolUsePayload { @@ -257,6 +260,7 @@ async fn exec_command_pre_tool_use_payload_skips_write_stdin() { tracker: Arc::new(Mutex::new(TurnDiffTracker::new())), call_id: "call-44".to_string(), tool_name: codex_tools::ToolName::plain("write_stdin"), + source: crate::tools::context::ToolCallSource::Direct, payload, }), None diff --git a/codex-rs/core/src/tools/mod.rs b/codex-rs/core/src/tools/mod.rs index ebee03b03123..a6e31dc8b8e1 100644 --- a/codex-rs/core/src/tools/mod.rs +++ b/codex-rs/core/src/tools/mod.rs @@ -12,6 +12,7 @@ pub(crate) mod router; pub(crate) mod runtimes; pub(crate) mod sandboxing; pub(crate) mod spec; +pub(crate) mod tool_dispatch_trace; pub(crate) mod tool_search_entry; use codex_protocol::exec_output::ExecToolCallOutput; diff --git a/codex-rs/core/src/tools/registry.rs b/codex-rs/core/src/tools/registry.rs index 523d8ed65042..08bd21548cfe 100644 --- a/codex-rs/core/src/tools/registry.rs +++ b/codex-rs/core/src/tools/registry.rs @@ -15,6 +15,7 @@ use crate::tools::context::ToolInvocation; use crate::tools::context::ToolOutput; use crate::tools::context::ToolPayload; use crate::tools::hook_names::HookToolName; +use crate::tools::tool_dispatch_trace::ToolDispatchTrace; use codex_hooks::HookEvent; use codex_hooks::HookEventAfterToolUse; use codex_hooks::HookPayload; @@ -219,6 +220,19 @@ impl ToolRegistry { Self { handlers } } + #[cfg(test)] + pub(crate) fn empty_for_test() -> Self { + Self::new(HashMap::new()) + } + + #[cfg(test)] + pub(crate) fn with_handler_for_test(name: ToolName, handler: Arc) -> Self + where + T: ToolHandler + 'static, + { + Self::new(HashMap::from([(name, handler as Arc)])) + } + fn handler(&self, name: &ToolName) -> Option> { self.handlers.get(name).map(Arc::clone) } @@ -294,6 +308,8 @@ impl ToolRegistry { } } + let dispatch_trace = ToolDispatchTrace::start(&invocation); + let handler = match self.handler(&tool_name) { Some(handler) => handler, None => { @@ -309,7 +325,9 @@ impl ToolRegistry { mcp_server_ref, mcp_server_origin_ref, ); - return Err(FunctionCallError::RespondToModel(message)); + let err = FunctionCallError::RespondToModel(message); + dispatch_trace.record_failed(&err); + return Err(err); } }; @@ -326,7 +344,9 @@ impl ToolRegistry { mcp_server_ref, mcp_server_origin_ref, ); - return Err(FunctionCallError::Fatal(message)); + let err = FunctionCallError::Fatal(message); + dispatch_trace.record_failed(&err); + return Err(err); } if let Some(pre_tool_use_payload) = handler.pre_tool_use_payload(&invocation) @@ -339,7 +359,9 @@ impl ToolRegistry { ) .await { - return Err(FunctionCallError::RespondToModel(message)); + let err = FunctionCallError::RespondToModel(message); + dispatch_trace.record_failed(&err); + return Err(err); } let is_mutating = handler.is_mutating(&invocation).await; @@ -420,6 +442,7 @@ impl ToolRegistry { .await; if let Some(err) = hook_abort_error { + dispatch_trace.record_failed(&err); return Err(err); } @@ -459,9 +482,18 @@ impl ToolRegistry { let result = guard.take().ok_or_else(|| { FunctionCallError::Fatal("tool produced no output".to_string()) })?; + dispatch_trace.record_completed( + &invocation, + &result.call_id, + &result.payload, + result.result.as_ref(), + ); Ok(result) } - Err(err) => Err(err), + Err(err) => { + dispatch_trace.record_failed(&err); + Err(err) + } } } } diff --git a/codex-rs/core/src/tools/registry_tests.rs b/codex-rs/core/src/tools/registry_tests.rs index f4fb72e791ce..d44c3d0f9b8b 100644 --- a/codex-rs/core/src/tools/registry_tests.rs +++ b/codex-rs/core/src/tools/registry_tests.rs @@ -1,6 +1,7 @@ use super::*; use pretty_assertions::assert_eq; +#[derive(Default)] struct TestHandler; impl ToolHandler for TestHandler { @@ -11,7 +12,10 @@ impl ToolHandler for TestHandler { } async fn handle(&self, _invocation: ToolInvocation) -> Result { - unreachable!("test handler should not be invoked") + Ok(crate::tools::context::FunctionToolOutput::from_text( + "ok".to_string(), + Some(true), + )) } } diff --git a/codex-rs/core/src/tools/router.rs b/codex-rs/core/src/tools/router.rs index c3b4b86d368e..ad635126cf83 100644 --- a/codex-rs/core/src/tools/router.rs +++ b/codex-rs/core/src/tools/router.rs @@ -281,7 +281,7 @@ impl ToolRouter { let direct_js_repl_call = tool_name.namespace.is_none() && matches!(tool_name.name.as_str(), "js_repl" | "js_repl_reset"); - if source == ToolCallSource::Direct + if matches!(&source, ToolCallSource::Direct) && turn.tools_config.js_repl_tools_only && !direct_js_repl_call { @@ -298,6 +298,7 @@ impl ToolRouter { tracker, call_id, tool_name, + source, payload, }; diff --git a/codex-rs/core/src/tools/tool_dispatch_trace.rs b/codex-rs/core/src/tools/tool_dispatch_trace.rs new file mode 100644 index 000000000000..89dc71f96048 --- /dev/null +++ b/codex-rs/core/src/tools/tool_dispatch_trace.rs @@ -0,0 +1,139 @@ +//! Adapter between core tool dispatch objects and rollout-trace events. +//! +//! `codex-rollout-trace` owns the event schema and writer behavior. This module +//! keeps the core-specific mapping from registry invocations/results out of the +//! registry control flow. + +use crate::function_tool::FunctionCallError; +use crate::tools::context::ToolCallSource; +use crate::tools::context::ToolInvocation; +use crate::tools::context::ToolOutput; +use crate::tools::context::ToolPayload; +use codex_rollout_trace::ExecutionStatus; +use codex_rollout_trace::ToolDispatchInvocation; +use codex_rollout_trace::ToolDispatchPayload; +use codex_rollout_trace::ToolDispatchRequester; +use codex_rollout_trace::ToolDispatchResult; +use codex_rollout_trace::ToolDispatchTraceContext; + +/// Keeps registry early-return paths paired with trace end events. +pub(crate) struct ToolDispatchTrace { + context: ToolDispatchTraceContext, +} + +impl ToolDispatchTrace { + pub(crate) fn start(invocation: &ToolInvocation) -> Self { + let context = invocation + .session + .services + .rollout_trace + .start_tool_dispatch_trace(|| tool_dispatch_invocation(invocation)); + Self { context } + } + + pub(crate) fn record_completed( + &self, + invocation: &ToolInvocation, + call_id: &str, + payload: &ToolPayload, + result: &dyn ToolOutput, + ) { + if !self.context.is_enabled() { + return; + } + + let Some(result_payload) = tool_dispatch_result(invocation, call_id, payload, result) + else { + return; + }; + let status = if result.success_for_logging() { + ExecutionStatus::Completed + } else { + ExecutionStatus::Failed + }; + self.context.record_completed(status, result_payload); + } + + pub(crate) fn record_failed(&self, error: &FunctionCallError) { + self.context.record_failed(error); + } +} + +fn tool_dispatch_invocation(invocation: &ToolInvocation) -> Option { + let requester = match &invocation.source { + ToolCallSource::Direct => ToolDispatchRequester::Model { + model_visible_call_id: invocation.call_id.clone(), + }, + ToolCallSource::CodeMode { + cell_id, + runtime_tool_call_id, + } => ToolDispatchRequester::CodeCell { + runtime_cell_id: cell_id.clone(), + runtime_tool_call_id: runtime_tool_call_id.clone(), + }, + ToolCallSource::JsRepl => return None, + }; + + Some(ToolDispatchInvocation { + thread_id: invocation.session.conversation_id.to_string(), + codex_turn_id: invocation.turn.sub_id.clone(), + tool_call_id: invocation.call_id.clone(), + tool_name: invocation.tool_name.name.clone(), + tool_namespace: invocation.tool_name.namespace.clone(), + requester, + payload: tool_dispatch_payload(&invocation.payload), + }) +} + +fn tool_dispatch_result( + invocation: &ToolInvocation, + call_id: &str, + payload: &ToolPayload, + result: &dyn ToolOutput, +) -> Option { + match invocation.source { + ToolCallSource::Direct => Some(ToolDispatchResult::DirectResponse { + response_item: result.to_response_item(call_id, payload), + }), + ToolCallSource::CodeMode { .. } => Some(ToolDispatchResult::CodeModeResponse { + value: result.code_mode_result(payload), + }), + ToolCallSource::JsRepl => None, + } +} + +fn tool_dispatch_payload(payload: &ToolPayload) -> ToolDispatchPayload { + match payload { + ToolPayload::Function { arguments } => ToolDispatchPayload::Function { + arguments: arguments.clone(), + }, + ToolPayload::ToolSearch { arguments } => ToolDispatchPayload::ToolSearch { + arguments: arguments.clone(), + }, + ToolPayload::Custom { input } => ToolDispatchPayload::Custom { + input: input.clone(), + }, + ToolPayload::LocalShell { params } => ToolDispatchPayload::LocalShell { + command: params.command.clone(), + workdir: params.workdir.clone(), + timeout_ms: params.timeout_ms, + sandbox_permissions: params.sandbox_permissions, + prefix_rule: params.prefix_rule.clone(), + additional_permissions: params.additional_permissions.clone(), + justification: params.justification.clone(), + }, + ToolPayload::Mcp { + server, + tool, + raw_arguments, + } => ToolDispatchPayload::Mcp { + server: server.clone(), + tool: tool.clone(), + raw_arguments: raw_arguments.clone(), + }, + } +} + +#[cfg(test)] +#[path = "tool_dispatch_trace_tests.rs"] +mod tests; diff --git a/codex-rs/core/src/tools/tool_dispatch_trace_tests.rs b/codex-rs/core/src/tools/tool_dispatch_trace_tests.rs new file mode 100644 index 000000000000..5beccd78162c --- /dev/null +++ b/codex-rs/core/src/tools/tool_dispatch_trace_tests.rs @@ -0,0 +1,337 @@ +use std::fs; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; + +use codex_protocol::protocol::SessionSource; +use codex_rollout_trace::ExecutionStatus; +use codex_rollout_trace::RolloutTraceRecorder; +use codex_rollout_trace::ThreadStartedTraceMetadata; +use codex_rollout_trace::ToolCallRequester; +use pretty_assertions::assert_eq; +use tempfile::TempDir; +use tokio_util::sync::CancellationToken; + +use crate::function_tool::FunctionCallError; +use crate::session::session::Session; +use crate::session::tests::make_session_and_context; +use crate::session::turn_context::TurnContext; +use crate::tools::code_mode::CodeModeWaitHandler; +use crate::tools::code_mode::WAIT_TOOL_NAME; +use crate::tools::context::FunctionToolOutput; +use crate::tools::context::ToolCallSource; +use crate::tools::context::ToolInvocation; +use crate::tools::context::ToolPayload; +use crate::tools::registry::ToolHandler; +use crate::tools::registry::ToolKind; +use crate::tools::registry::ToolRegistry; +use crate::turn_diff_tracker::TurnDiffTracker; + +#[derive(Default)] +struct TestHandler; + +impl ToolHandler for TestHandler { + type Output = FunctionToolOutput; + + fn kind(&self) -> ToolKind { + ToolKind::Function + } + + async fn handle(&self, _invocation: ToolInvocation) -> Result { + Ok(FunctionToolOutput::from_text("ok".to_string(), Some(true))) + } +} + +#[tokio::test] +async fn dispatch_lifecycle_trace_records_direct_and_code_mode_requesters() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let (mut session, turn) = make_session_and_context().await; + attach_test_trace(&mut session, &turn, temp.path())?; + session.services.rollout_trace.start_code_cell_trace( + session.conversation_id, + turn.sub_id.as_str(), + "cell-1", + "call-code", + "await tools.test_tool({})", + ); + + let registry = ToolRegistry::with_handler_for_test( + codex_tools::ToolName::plain("test_tool"), + Arc::new(TestHandler), + ); + let session = Arc::new(session); + let turn = Arc::new(turn); + + registry + .dispatch_any(test_invocation( + Arc::clone(&session), + Arc::clone(&turn), + "direct-call", + "test_tool", + ToolCallSource::Direct, + "{}", + )) + .await?; + registry + .dispatch_any(test_invocation( + session, + turn, + "code-mode-call", + "test_tool", + ToolCallSource::CodeMode { + cell_id: "cell-1".to_string(), + runtime_tool_call_id: "tool-1".to_string(), + }, + "{}", + )) + .await?; + + let replayed = codex_rollout_trace::replay_bundle(single_bundle_dir(temp.path())?)?; + assert_eq!( + replayed.tool_calls["direct-call"].model_visible_call_id, + Some("direct-call".to_string()), + ); + assert_eq!( + replayed.tool_calls["direct-call"].requester, + ToolCallRequester::Model, + ); + assert!( + replayed.tool_calls["direct-call"] + .raw_invocation_payload_id + .is_some(), + "dispatch tracing should keep the tool invocation payload", + ); + assert!( + replayed.tool_calls["direct-call"] + .raw_result_payload_id + .is_some(), + "direct calls should keep the model-facing result payload", + ); + assert_eq!( + replayed.tool_calls["code-mode-call"].model_visible_call_id, + None, + ); + assert_eq!( + replayed.tool_calls["code-mode-call"].code_mode_runtime_tool_id, + Some("tool-1".to_string()), + ); + assert_eq!( + replayed.tool_calls["code-mode-call"].requester, + ToolCallRequester::CodeCell { + code_cell_id: "code_cell:call-code".to_string(), + }, + ); + assert!( + replayed.tool_calls["code-mode-call"] + .raw_result_payload_id + .is_some(), + "code-mode calls should keep the result returned to JavaScript", + ); + + Ok(()) +} + +#[tokio::test] +async fn dispatch_lifecycle_trace_skips_noncanonical_boundaries() -> anyhow::Result<()> { + assert_dispatch_trace_skips(ToolCallSource::JsRepl).await +} + +#[tokio::test] +async fn dispatch_lifecycle_trace_records_unsupported_tool_failures() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let (mut session, turn) = make_session_and_context().await; + attach_test_trace(&mut session, &turn, temp.path())?; + + let registry = ToolRegistry::empty_for_test(); + let session = Arc::new(session); + let turn = Arc::new(turn); + + let result = registry + .dispatch_any(test_invocation( + session, + turn, + "unsupported-call", + "missing_tool", + ToolCallSource::Direct, + "{}", + )) + .await; + + assert!(matches!(result, Err(FunctionCallError::RespondToModel(_)))); + let replayed = codex_rollout_trace::replay_bundle(single_bundle_dir(temp.path())?)?; + let tool_call = &replayed.tool_calls["unsupported-call"]; + assert_eq!(tool_call.execution.status, ExecutionStatus::Failed); + assert!(tool_call.raw_result_payload_id.is_some()); + + Ok(()) +} + +#[tokio::test] +async fn dispatch_lifecycle_trace_records_incompatible_payload_failures() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let (mut session, turn) = make_session_and_context().await; + attach_test_trace(&mut session, &turn, temp.path())?; + + let registry = ToolRegistry::with_handler_for_test( + codex_tools::ToolName::plain("test_tool"), + Arc::new(TestHandler), + ); + let session = Arc::new(session); + let turn = Arc::new(turn); + + let result = registry + .dispatch_any(test_invocation_with_payload( + session, + turn, + "incompatible-call", + codex_tools::ToolName::plain("test_tool"), + ToolCallSource::Direct, + ToolPayload::Custom { + input: "{}".to_string(), + }, + )) + .await; + + assert!(matches!(result, Err(FunctionCallError::Fatal(_)))); + let replayed = codex_rollout_trace::replay_bundle(single_bundle_dir(temp.path())?)?; + let tool_call = &replayed.tool_calls["incompatible-call"]; + assert_eq!(tool_call.execution.status, ExecutionStatus::Failed); + assert!(tool_call.raw_result_payload_id.is_some()); + + Ok(()) +} + +#[tokio::test] +async fn missing_code_mode_wait_traces_only_the_wait_tool_call() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let (mut session, turn) = make_session_and_context().await; + attach_test_trace(&mut session, &turn, temp.path())?; + + let registry = ToolRegistry::with_handler_for_test( + codex_tools::ToolName::plain(WAIT_TOOL_NAME), + Arc::new(CodeModeWaitHandler), + ); + let session = Arc::new(session); + let turn = Arc::new(turn); + + registry + .dispatch_any(test_invocation( + session, + turn, + "wait-call", + WAIT_TOOL_NAME, + ToolCallSource::Direct, + r#"{"cell_id":"noop","terminate":true}"#, + )) + .await?; + + let replayed = codex_rollout_trace::replay_bundle(single_bundle_dir(temp.path())?)?; + assert_eq!(replayed.code_cells.len(), 0); + assert!( + replayed.tool_calls["wait-call"] + .raw_result_payload_id + .is_some() + ); + + Ok(()) +} + +async fn assert_dispatch_trace_skips(source: ToolCallSource) -> anyhow::Result<()> { + let temp = TempDir::new()?; + let (mut session, turn) = make_session_and_context().await; + attach_test_trace(&mut session, &turn, temp.path())?; + + let registry = ToolRegistry::with_handler_for_test( + codex_tools::ToolName::plain("test_tool"), + Arc::new(TestHandler), + ); + let session = Arc::new(session); + let turn = Arc::new(turn); + + registry + .dispatch_any(test_invocation( + session, + turn, + "skipped-call", + "test_tool", + source, + "{}", + )) + .await?; + + let replayed = codex_rollout_trace::replay_bundle(single_bundle_dir(temp.path())?)?; + assert_eq!(replayed.tool_calls, Default::default()); + + Ok(()) +} + +fn test_invocation( + session: Arc, + turn: Arc, + call_id: &str, + tool_name: &str, + source: ToolCallSource, + arguments: &str, +) -> ToolInvocation { + test_invocation_with_payload( + session, + turn, + call_id, + codex_tools::ToolName::plain(tool_name), + source, + ToolPayload::Function { + arguments: arguments.to_string(), + }, + ) +} + +fn test_invocation_with_payload( + session: Arc, + turn: Arc, + call_id: &str, + tool_name: codex_tools::ToolName, + source: ToolCallSource, + payload: ToolPayload, +) -> ToolInvocation { + ToolInvocation { + session, + turn, + cancellation_token: CancellationToken::new(), + tracker: Arc::new(tokio::sync::Mutex::new(TurnDiffTracker::new())), + call_id: call_id.to_string(), + tool_name, + source, + payload, + } +} + +fn attach_test_trace(session: &mut Session, turn: &TurnContext, root: &Path) -> anyhow::Result<()> { + let thread_id = session.conversation_id; + let recorder = RolloutTraceRecorder::create_in_root_for_test(root, thread_id)?; + recorder.record_thread_started(ThreadStartedTraceMetadata { + thread_id: thread_id.to_string(), + agent_path: "/root".to_string(), + task_name: None, + nickname: None, + agent_role: None, + session_source: SessionSource::Exec, + cwd: PathBuf::from("/workspace"), + rollout_path: None, + model: "gpt-test".to_string(), + provider_name: "test-provider".to_string(), + approval_policy: "never".to_string(), + sandbox_policy: "danger-full-access".to_string(), + }); + recorder.record_codex_turn_started(thread_id, turn.sub_id.as_str()); + session.services.rollout_trace = recorder; + Ok(()) +} + +fn single_bundle_dir(root: &Path) -> anyhow::Result { + let mut entries = fs::read_dir(root)? + .map(|entry| entry.map(|entry| entry.path())) + .collect::, _>>()?; + entries.sort(); + assert_eq!(entries.len(), 1); + Ok(entries.remove(0)) +} diff --git a/codex-rs/rollout-trace/Cargo.toml b/codex-rs/rollout-trace/Cargo.toml index 540f794fb7a2..a74304611544 100644 --- a/codex-rs/rollout-trace/Cargo.toml +++ b/codex-rs/rollout-trace/Cargo.toml @@ -14,6 +14,7 @@ workspace = true [dependencies] anyhow = { workspace = true } +codex-code-mode = { workspace = true } codex-protocol = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } diff --git a/codex-rs/rollout-trace/src/code_cell.rs b/codex-rs/rollout-trace/src/code_cell.rs new file mode 100644 index 000000000000..5f2603b70adf --- /dev/null +++ b/codex-rs/rollout-trace/src/code_cell.rs @@ -0,0 +1,185 @@ +//! Hot-path helpers for recording code-mode runtime cell lifecycles. +//! +//! The public `exec` tool is reduced as a first-class `CodeCell` instead of a +//! generic tool call. This module keeps the runtime response serialization and +//! lifecycle event policy inside the trace crate while core carries a compact, +//! no-op capable handle through execution and waits. + +use std::sync::Arc; + +use codex_code_mode::RuntimeResponse; +use serde::Serialize; +use tracing::warn; + +use crate::model::AgentThreadId; +use crate::model::CodeCellRuntimeStatus; +use crate::model::CodexTurnId; +use crate::model::ModelVisibleCallId; +use crate::payload::RawPayloadKind; +use crate::payload::RawPayloadRef; +use crate::raw_event::RawTraceEventContext; +use crate::raw_event::RawTraceEventPayload; +use crate::writer::TraceWriter; + +/// No-op capable trace handle for one code-mode runtime cell. +#[derive(Clone, Debug)] +pub struct CodeCellTraceContext { + state: CodeCellTraceContextState, +} + +#[derive(Clone, Debug)] +enum CodeCellTraceContextState { + Disabled, + Enabled(EnabledCodeCellTraceContext), +} + +#[derive(Clone, Debug)] +struct EnabledCodeCellTraceContext { + writer: Arc, + thread_id: AgentThreadId, + codex_turn_id: CodexTurnId, + runtime_cell_id: String, +} + +/// Raw code-mode response captured at the runtime boundary. +/// +/// This is not the model-visible custom-tool output. The reducer links that +/// output through `CodeCell.output_item_ids` once the conversation item appears. +/// Keeping the raw runtime payload here preserves stored-value and lifecycle +/// evidence without duplicating the model-facing transcript. +#[derive(Serialize)] +struct CodeCellResponseTracePayload<'a> { + response: &'a RuntimeResponse, +} + +impl CodeCellTraceContext { + /// Builds a context that accepts trace calls and records nothing. + pub(crate) fn disabled() -> Self { + Self { + state: CodeCellTraceContextState::Disabled, + } + } + + /// Builds a context for an already-known code-mode runtime cell. + pub(crate) fn enabled( + writer: Arc, + thread_id: impl Into, + codex_turn_id: impl Into, + runtime_cell_id: impl Into, + ) -> Self { + Self { + state: CodeCellTraceContextState::Enabled(EnabledCodeCellTraceContext { + writer, + thread_id: thread_id.into(), + codex_turn_id: codex_turn_id.into(), + runtime_cell_id: runtime_cell_id.into(), + }), + } + } + + /// Records the parent runtime object before JavaScript can issue nested tool calls. + pub fn record_started( + &self, + model_visible_call_id: impl Into, + source_js: impl Into, + ) { + let CodeCellTraceContextState::Enabled(context) = &self.state else { + return; + }; + append_with_context_best_effort( + context, + RawTraceEventPayload::CodeCellStarted { + runtime_cell_id: context.runtime_cell_id.clone(), + model_visible_call_id: model_visible_call_id.into(), + source_js: source_js.into(), + }, + ); + } + + /// Records the first response returned by the public code-mode `exec` tool. + /// + /// A yielded response returns control to the model while the cell keeps + /// running. Terminal initial responses should be followed by `record_ended` + /// by the caller so the reducer can distinguish model-visible output from + /// runtime completion. + pub fn record_initial_response(&self, response: &RuntimeResponse) { + let CodeCellTraceContextState::Enabled(context) = &self.state else { + return; + }; + append_with_context_best_effort( + context, + RawTraceEventPayload::CodeCellInitialResponse { + runtime_cell_id: context.runtime_cell_id.clone(), + status: code_cell_status_for_runtime_response(response), + response_payload: code_cell_response_payload(context, response), + }, + ); + } + + /// Records the terminal lifecycle point for a code-mode runtime cell. + pub fn record_ended(&self, response: &RuntimeResponse) { + let CodeCellTraceContextState::Enabled(context) = &self.state else { + return; + }; + append_with_context_best_effort( + context, + RawTraceEventPayload::CodeCellEnded { + runtime_cell_id: context.runtime_cell_id.clone(), + status: code_cell_status_for_runtime_response(response), + response_payload: code_cell_response_payload(context, response), + }, + ); + } +} + +fn code_cell_status_for_runtime_response(response: &RuntimeResponse) -> CodeCellRuntimeStatus { + match response { + RuntimeResponse::Yielded { .. } => CodeCellRuntimeStatus::Yielded, + RuntimeResponse::Terminated { .. } => CodeCellRuntimeStatus::Terminated, + RuntimeResponse::Result { error_text, .. } => { + if error_text.is_some() { + CodeCellRuntimeStatus::Failed + } else { + CodeCellRuntimeStatus::Completed + } + } + } +} + +fn code_cell_response_payload( + context: &EnabledCodeCellTraceContext, + response: &RuntimeResponse, +) -> Option { + write_json_payload_best_effort( + &context.writer, + RawPayloadKind::ToolResult, + &CodeCellResponseTracePayload { response }, + ) +} + +fn write_json_payload_best_effort( + writer: &TraceWriter, + kind: RawPayloadKind, + payload: &impl Serialize, +) -> Option { + match writer.write_json_payload(kind, payload) { + Ok(payload_ref) => Some(payload_ref), + Err(err) => { + warn!("failed to write rollout trace payload: {err:#}"); + None + } + } +} + +fn append_with_context_best_effort( + context: &EnabledCodeCellTraceContext, + payload: RawTraceEventPayload, +) { + let event_context = RawTraceEventContext { + thread_id: Some(context.thread_id.clone()), + codex_turn_id: Some(context.codex_turn_id.clone()), + }; + if let Err(err) = context.writer.append_with_context(event_context, payload) { + warn!("failed to append rollout trace event: {err:#}"); + } +} diff --git a/codex-rs/rollout-trace/src/compaction.rs b/codex-rs/rollout-trace/src/compaction.rs index 01608974ac6c..5dffad2462d7 100644 --- a/codex-rs/rollout-trace/src/compaction.rs +++ b/codex-rs/rollout-trace/src/compaction.rs @@ -206,12 +206,12 @@ impl CompactionTraceAttempt { /// inference streams: traces are evidence, while normal ResponseItem /// serialization is shaped for future request construction. pub fn record_completed(&self, output_items: &[ResponseItem]) { - let response_payload = TracedCompactionCompleted { - output_items: output_items.iter().map(trace_response_item_json).collect(), - }; let CompactionTraceAttemptState::Enabled(attempt) = &self.state else { return; }; + let response_payload = TracedCompactionCompleted { + output_items: output_items.iter().map(trace_response_item_json).collect(), + }; let Some(response_payload) = write_json_payload_best_effort( &attempt.context.writer, RawPayloadKind::CompactionResponse, diff --git a/codex-rs/rollout-trace/src/inference.rs b/codex-rs/rollout-trace/src/inference.rs index 506be96ad9c1..935a2af6c920 100644 --- a/codex-rs/rollout-trace/src/inference.rs +++ b/codex-rs/rollout-trace/src/inference.rs @@ -174,14 +174,14 @@ impl InferenceTraceAttempt { token_usage: &Option, output_items: &[ResponseItem], ) { + let InferenceTraceAttemptState::Enabled(attempt) = &self.state else { + return; + }; let response_payload = TracedResponseStreamCompleted { response_id, token_usage, output_items: output_items.iter().map(trace_response_item_json).collect(), }; - let InferenceTraceAttemptState::Enabled(attempt) = &self.state else { - return; - }; let Some(response_payload) = write_json_payload_best_effort( &attempt.context.writer, RawPayloadKind::InferenceResponse, diff --git a/codex-rs/rollout-trace/src/lib.rs b/codex-rs/rollout-trace/src/lib.rs index 0ca522c9a204..24d4c9add685 100644 --- a/codex-rs/rollout-trace/src/lib.rs +++ b/codex-rs/rollout-trace/src/lib.rs @@ -7,6 +7,7 @@ //! See `README.md` for the system diagram and reducer model. mod bundle; +mod code_cell; mod compaction; mod inference; mod model; @@ -14,10 +15,13 @@ mod payload; mod raw_event; mod recorder; mod reducer; +mod tool_dispatch; mod writer; /// Conventional reduced-state cache name written next to a raw trace bundle. pub use bundle::REDUCED_STATE_FILE_NAME; +/// No-op-capable handle for recording one code-mode runtime cell. +pub use code_cell::CodeCellTraceContext; /// Raw checkpoint payload for a remote compaction install event. pub use compaction::CompactionCheckpointTracePayload; /// No-op-capable handle for recording remote-compaction requests. @@ -54,5 +58,15 @@ pub use recorder::RolloutTraceRecorder; pub use recorder::ThreadStartedTraceMetadata; /// Replay a raw trace bundle and write/read its reduced `RolloutTrace`. pub use reducer::replay_bundle; +/// Request data for the canonical Codex tool boundary. +pub use tool_dispatch::ToolDispatchInvocation; +/// Tool input observed at the registry boundary. +pub use tool_dispatch::ToolDispatchPayload; +/// Runtime source that caused a dispatch-level tool call. +pub use tool_dispatch::ToolDispatchRequester; +/// Result data returned from a dispatch-level tool call. +pub use tool_dispatch::ToolDispatchResult; +/// No-op-capable handle for recording one resolved tool dispatch. +pub use tool_dispatch::ToolDispatchTraceContext; /// Append-only writer used by hot-path Codex instrumentation. pub use writer::TraceWriter; diff --git a/codex-rs/rollout-trace/src/recorder.rs b/codex-rs/rollout-trace/src/recorder.rs index d17951fda59b..833355f943ae 100644 --- a/codex-rs/rollout-trace/src/recorder.rs +++ b/codex-rs/rollout-trace/src/recorder.rs @@ -12,6 +12,7 @@ use tracing::warn; use uuid::Uuid; use crate::AgentThreadId; +use crate::CodeCellTraceContext; use crate::CodexTurnId; use crate::CompactionId; use crate::CompactionTraceContext; @@ -19,6 +20,8 @@ use crate::InferenceTraceContext; use crate::RawPayloadKind; use crate::RawPayloadRef; use crate::RawTraceEventPayload; +use crate::ToolDispatchInvocation; +use crate::ToolDispatchTraceContext; use crate::TraceWriter; /// Environment variable that enables local trace-bundle recording. @@ -99,6 +102,14 @@ impl RolloutTraceRecorder { } } + /// Creates a trace bundle in a known root directory. + /// + /// This is public so integration tests in downstream crates can replay the + /// exact bundle they produced without mutating process environment. + pub fn create_in_root_for_test(root: &Path, thread_id: ThreadId) -> anyhow::Result { + Self::create_in_root(root, thread_id) + } + fn create_in_root(root: &Path, thread_id: ThreadId) -> anyhow::Result { let trace_id = Uuid::new_v4().to_string(); let thread_id = thread_id.to_string(); @@ -147,6 +158,83 @@ impl RolloutTraceRecorder { }); } + /// Emits a turn-start lifecycle event. + /// + /// Most production turn lifecycle wiring lives outside this PR layer, but + /// trace-focused integration tests need a small explicit hook so reducer + /// inputs remain valid without exercising the full session loop. + pub fn record_codex_turn_started( + &self, + thread_id: impl Into, + codex_turn_id: impl Into, + ) { + let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { + return; + }; + let thread_id = thread_id.into(); + let codex_turn_id = codex_turn_id.into(); + recorder.append_with_context_best_effort( + thread_id.clone(), + codex_turn_id.clone(), + RawTraceEventPayload::CodexTurnStarted { + codex_turn_id, + thread_id, + }, + ); + } + + /// Starts a first-class code-mode cell lifecycle and returns its trace handle. + pub fn start_code_cell_trace( + &self, + thread_id: impl Into, + codex_turn_id: impl Into, + runtime_cell_id: impl Into, + model_visible_call_id: impl Into, + source_js: impl Into, + ) -> CodeCellTraceContext { + let context = self.code_cell_trace_context(thread_id, codex_turn_id, runtime_cell_id); + context.record_started(model_visible_call_id, source_js); + context + } + + /// Builds a trace handle for an already-started code-mode runtime cell. + pub fn code_cell_trace_context( + &self, + thread_id: impl Into, + codex_turn_id: impl Into, + runtime_cell_id: impl Into, + ) -> CodeCellTraceContext { + let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { + return CodeCellTraceContext::disabled(); + }; + + CodeCellTraceContext::enabled( + Arc::clone(&recorder.writer), + thread_id, + codex_turn_id, + runtime_cell_id, + ) + } + + /// Starts one dispatch-level tool lifecycle and returns its trace handle. + /// + /// `invocation` is lazy because adapting core tool objects into trace-owned + /// payloads can clone large arguments. Disabled tracing should not pay that + /// cost on the hot tool-dispatch path. + pub fn start_tool_dispatch_trace( + &self, + invocation: impl FnOnce() -> Option, + ) -> ToolDispatchTraceContext { + let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { + return ToolDispatchTraceContext::disabled(); + }; + let Some(invocation) = invocation() else { + return ToolDispatchTraceContext::disabled(); + }; + + ToolDispatchTraceContext::start(Arc::clone(&recorder.writer), invocation) + } + /// Builds reusable inference trace context for one Codex turn. /// /// The returned context is intentionally not "an inference call" yet. @@ -221,6 +309,21 @@ impl EnabledRolloutTraceRecorder { warn!("failed to append rollout trace event: {err:#}"); } } + + fn append_with_context_best_effort( + &self, + thread_id: AgentThreadId, + codex_turn_id: CodexTurnId, + payload: RawTraceEventPayload, + ) { + let context = crate::RawTraceEventContext { + thread_id: Some(thread_id), + codex_turn_id: Some(codex_turn_id), + }; + if let Err(err) = self.writer.append_with_context(context, payload) { + warn!("failed to append rollout trace event: {err:#}"); + } + } } #[cfg(test)] diff --git a/codex-rs/rollout-trace/src/recorder_tests.rs b/codex-rs/rollout-trace/src/recorder_tests.rs index 2bd05c77479c..be5f2a4d055d 100644 --- a/codex-rs/rollout-trace/src/recorder_tests.rs +++ b/codex-rs/rollout-trace/src/recorder_tests.rs @@ -1,3 +1,4 @@ +use std::cell::Cell; use std::fs; use std::path::Path; use std::path::PathBuf; @@ -130,6 +131,14 @@ fn disabled_recorder_accepts_trace_calls_without_writing() -> anyhow::Result<()> replacement_history: &[], }); + let built_dispatch_invocation = Cell::new(false); + let dispatch_trace = recorder.start_tool_dispatch_trace(|| { + built_dispatch_invocation.set(true); + None + }); + assert!(!built_dispatch_invocation.get()); + assert!(!dispatch_trace.is_enabled()); + assert_eq!(fs::read_dir(temp.path())?.count(), 0); Ok(()) diff --git a/codex-rs/rollout-trace/src/tool_dispatch.rs b/codex-rs/rollout-trace/src/tool_dispatch.rs new file mode 100644 index 000000000000..0389814cbaa3 --- /dev/null +++ b/codex-rs/rollout-trace/src/tool_dispatch.rs @@ -0,0 +1,471 @@ +//! Hot-path helpers for recording canonical tool dispatch boundaries. +//! +//! Core owns tool routing and result conversion. The trace crate owns the raw +//! event schema, payload shape, and no-op behavior, so core only adapts its +//! domain objects into the small request/result structs defined here. + +use std::fmt::Display; +use std::sync::Arc; + +use codex_protocol::models::PermissionProfile; +use codex_protocol::models::ResponseInputItem; +use codex_protocol::models::SandboxPermissions; +use codex_protocol::models::SearchToolCallParams; +use serde::Serialize; +use serde_json::Value as JsonValue; +use serde_json::json; +use tracing::warn; + +use crate::model::AgentThreadId; +use crate::model::CodeModeRuntimeToolId; +use crate::model::CodexTurnId; +use crate::model::ExecutionStatus; +use crate::model::ModelVisibleCallId; +use crate::model::ToolCallId; +use crate::model::ToolCallKind; +use crate::model::ToolCallSummary; +use crate::payload::RawPayloadKind; +use crate::payload::RawPayloadRef; +use crate::raw_event::RawToolCallRequester; +use crate::raw_event::RawTraceEventContext; +use crate::raw_event::RawTraceEventPayload; +use crate::writer::TraceWriter; + +/// No-op capable trace handle for one resolved tool dispatch. +#[derive(Clone, Debug)] +pub struct ToolDispatchTraceContext { + state: ToolDispatchTraceContextState, +} + +#[derive(Clone, Debug)] +enum ToolDispatchTraceContextState { + Disabled, + Enabled(EnabledToolDispatchTraceContext), +} + +#[derive(Clone, Debug)] +struct EnabledToolDispatchTraceContext { + writer: Arc, + thread_id: AgentThreadId, + codex_turn_id: CodexTurnId, + tool_call_id: ToolCallId, +} + +/// Core-facing request data for the canonical Codex tool boundary. +pub struct ToolDispatchInvocation { + pub thread_id: AgentThreadId, + pub codex_turn_id: CodexTurnId, + pub tool_call_id: ToolCallId, + pub tool_name: String, + pub tool_namespace: Option, + pub requester: ToolDispatchRequester, + pub payload: ToolDispatchPayload, +} + +/// Runtime source that caused a dispatch-level tool call. +pub enum ToolDispatchRequester { + Model { + model_visible_call_id: ModelVisibleCallId, + }, + CodeCell { + runtime_cell_id: String, + runtime_tool_call_id: CodeModeRuntimeToolId, + }, +} + +/// Tool input observed at the registry boundary. +pub enum ToolDispatchPayload { + Function { + arguments: String, + }, + ToolSearch { + arguments: SearchToolCallParams, + }, + Custom { + input: String, + }, + LocalShell { + command: Vec, + workdir: Option, + timeout_ms: Option, + sandbox_permissions: Option, + prefix_rule: Option>, + additional_permissions: Option, + justification: Option, + }, + Mcp { + server: String, + tool: String, + raw_arguments: String, + }, +} + +/// Result data returned from a dispatch-level tool call. +#[derive(Serialize)] +#[serde(rename_all = "snake_case", tag = "type")] +pub enum ToolDispatchResult { + DirectResponse { response_item: ResponseInputItem }, + CodeModeResponse { value: JsonValue }, +} + +/// Raw invocation payload for the canonical Codex tool boundary. +#[derive(Serialize)] +struct DispatchedToolTraceRequest<'a> { + tool_name: &'a str, + tool_namespace: Option<&'a str>, + payload: &'a JsonValue, +} + +/// Raw response payload for dispatch-level tool trace events. +#[derive(Serialize)] +#[serde(rename_all = "snake_case", tag = "type")] +enum DispatchedToolTraceResponse<'a> { + DirectResponse { + response_item: &'a ResponseInputItem, + }, + CodeModeResponse { + value: &'a JsonValue, + }, + Error { + error: String, + }, +} + +impl ToolDispatchTraceContext { + /// Builds a context that accepts trace calls and records nothing. + pub(crate) fn disabled() -> Self { + Self { + state: ToolDispatchTraceContextState::Disabled, + } + } + + /// Returns whether caller-side result conversion would be recorded. + /// + /// Core uses this to avoid formatting or cloning tool outputs when the + /// dispatch lifecycle is suppressed or tracing is disabled. + pub fn is_enabled(&self) -> bool { + matches!(self.state, ToolDispatchTraceContextState::Enabled(_)) + } + + /// Starts one dispatch-level lifecycle and returns the handle for its result. + pub(crate) fn start(writer: Arc, invocation: ToolDispatchInvocation) -> Self { + if suppresses_tool_dispatch_trace(&invocation) { + return Self::disabled(); + } + + let context = EnabledToolDispatchTraceContext { + writer, + thread_id: invocation.thread_id.clone(), + codex_turn_id: invocation.codex_turn_id.clone(), + tool_call_id: invocation.tool_call_id.clone(), + }; + record_started(&context, invocation); + Self { + state: ToolDispatchTraceContextState::Enabled(context), + } + } + + /// Records the caller-facing successful or failed tool result. + pub fn record_completed(&self, status: ExecutionStatus, result: ToolDispatchResult) { + let ToolDispatchTraceContextState::Enabled(context) = &self.state else { + return; + }; + let response = match &result { + ToolDispatchResult::DirectResponse { response_item } => { + DispatchedToolTraceResponse::DirectResponse { response_item } + } + ToolDispatchResult::CodeModeResponse { value } => { + DispatchedToolTraceResponse::CodeModeResponse { value } + } + }; + append_tool_call_ended(context, status, &response); + } + + /// Records a dispatch failure before the tool produced a normal result payload. + pub fn record_failed(&self, error: impl Display) { + let ToolDispatchTraceContextState::Enabled(context) = &self.state else { + return; + }; + append_tool_call_ended( + context, + ExecutionStatus::Failed, + &DispatchedToolTraceResponse::Error { + error: error.to_string(), + }, + ); + } +} + +fn suppresses_tool_dispatch_trace(invocation: &ToolDispatchInvocation) -> bool { + matches!(invocation.payload, ToolDispatchPayload::Custom { .. }) + && invocation.tool_namespace.is_none() + && invocation.tool_name == codex_code_mode::PUBLIC_TOOL_NAME +} + +fn record_started(context: &EnabledToolDispatchTraceContext, invocation: ToolDispatchInvocation) { + let tool_name = invocation.tool_name; + let tool_namespace = invocation.tool_namespace; + let kind = dispatched_tool_kind(&tool_name, &invocation.payload); + let label = dispatched_tool_label(&tool_name, tool_namespace.as_deref(), &invocation.payload); + let input_preview = Some(invocation.payload.log_payload_preview()); + let payload = invocation.payload.into_json_payload(); + let request = DispatchedToolTraceRequest { + tool_name: tool_name.as_str(), + tool_namespace: tool_namespace.as_deref(), + payload: &payload, + }; + let request_payload = + write_json_payload_best_effort(&context.writer, RawPayloadKind::ToolInvocation, &request); + let (model_visible_call_id, code_mode_runtime_tool_id, requester) = + requester_fields(invocation.requester); + + append_with_context_best_effort( + context, + RawTraceEventPayload::ToolCallStarted { + tool_call_id: context.tool_call_id.clone(), + model_visible_call_id, + code_mode_runtime_tool_id, + requester, + kind, + summary: ToolCallSummary::Generic { + label, + input_preview, + output_preview: None, + }, + invocation_payload: request_payload, + }, + ); +} + +fn requester_fields( + requester: ToolDispatchRequester, +) -> ( + Option, + Option, + RawToolCallRequester, +) { + match requester { + ToolDispatchRequester::Model { + model_visible_call_id, + } => ( + Some(model_visible_call_id), + None, + RawToolCallRequester::Model, + ), + ToolDispatchRequester::CodeCell { + runtime_cell_id, + runtime_tool_call_id, + } => ( + None, + Some(runtime_tool_call_id), + RawToolCallRequester::CodeCell { runtime_cell_id }, + ), + } +} + +fn dispatched_tool_kind(tool_name: &str, payload: &ToolDispatchPayload) -> ToolCallKind { + if let ToolDispatchPayload::Mcp { server, tool, .. } = payload { + return ToolCallKind::Mcp { + server: server.clone(), + tool: tool.clone(), + }; + } + + match tool_name { + "exec_command" | "local_shell" | "shell" | "shell_command" => ToolCallKind::ExecCommand, + "write_stdin" => ToolCallKind::WriteStdin, + "apply_patch" => ToolCallKind::ApplyPatch, + "web_search" | "web_search_preview" => ToolCallKind::Web, + "image_generation" | "image_query" => ToolCallKind::ImageGeneration, + "spawn_agent" => ToolCallKind::SpawnAgent, + "send_message" => ToolCallKind::SendMessage, + "followup_task" => ToolCallKind::AssignAgentTask, + "wait_agent" => ToolCallKind::WaitAgent, + "close_agent" => ToolCallKind::CloseAgent, + other => ToolCallKind::Other { + name: other.to_string(), + }, + } +} + +fn dispatched_tool_label( + tool_name: &str, + tool_namespace: Option<&str>, + payload: &ToolDispatchPayload, +) -> String { + if let ToolDispatchPayload::Mcp { server, tool, .. } = payload { + return format!("mcp:{server}:{tool}"); + } + + match tool_namespace { + Some(namespace) => format!("{namespace}.{tool_name}"), + None => tool_name.to_string(), + } +} + +impl ToolDispatchPayload { + fn log_payload_preview(&self) -> String { + match self { + ToolDispatchPayload::Function { arguments } => truncate_preview(arguments), + ToolDispatchPayload::ToolSearch { arguments } => truncate_preview(&arguments.query), + ToolDispatchPayload::Custom { input } => truncate_preview(input), + ToolDispatchPayload::LocalShell { command, .. } => truncate_preview(&command.join(" ")), + ToolDispatchPayload::Mcp { raw_arguments, .. } => truncate_preview(raw_arguments), + } + } + + fn into_json_payload(self) -> JsonValue { + match self { + ToolDispatchPayload::Function { arguments } => json!({ + "type": "function", + "arguments": arguments, + }), + ToolDispatchPayload::ToolSearch { arguments } => json!({ + "type": "tool_search", + "arguments": arguments, + }), + ToolDispatchPayload::Custom { input } => json!({ + "type": "custom", + "input": input, + }), + ToolDispatchPayload::LocalShell { + command, + workdir, + timeout_ms, + sandbox_permissions, + prefix_rule, + additional_permissions, + justification, + } => json!({ + "type": "local_shell", + "command": command, + "workdir": workdir, + "timeout_ms": timeout_ms, + "sandbox_permissions": sandbox_permissions, + "prefix_rule": prefix_rule, + "additional_permissions": additional_permissions, + "justification": justification, + }), + ToolDispatchPayload::Mcp { + server, + tool, + raw_arguments, + } => json!({ + "type": "mcp", + "server": server, + "tool": tool, + "raw_arguments": raw_arguments, + }), + } + } +} + +fn truncate_preview(value: &str) -> String { + const MAX_PREVIEW_CHARS: usize = 160; + let mut chars = value.chars(); + let mut preview = chars.by_ref().take(MAX_PREVIEW_CHARS).collect::(); + if chars.next().is_some() { + preview.push_str("..."); + } + preview +} + +fn append_tool_call_ended( + context: &EnabledToolDispatchTraceContext, + status: ExecutionStatus, + response: &DispatchedToolTraceResponse<'_>, +) { + let response_payload = + write_json_payload_best_effort(&context.writer, RawPayloadKind::ToolResult, response); + append_with_context_best_effort( + context, + RawTraceEventPayload::ToolCallEnded { + tool_call_id: context.tool_call_id.clone(), + status, + result_payload: response_payload, + }, + ); +} + +fn write_json_payload_best_effort( + writer: &TraceWriter, + kind: RawPayloadKind, + payload: &impl Serialize, +) -> Option { + match writer.write_json_payload(kind, payload) { + Ok(payload_ref) => Some(payload_ref), + Err(err) => { + warn!("failed to write rollout trace payload: {err:#}"); + None + } + } +} + +fn append_with_context_best_effort( + context: &EnabledToolDispatchTraceContext, + payload: RawTraceEventPayload, +) { + let event_context = RawTraceEventContext { + thread_id: Some(context.thread_id.clone()), + codex_turn_id: Some(context.codex_turn_id.clone()), + }; + if let Err(err) = context.writer.append_with_context(event_context, payload) { + warn!("failed to append rollout trace event: {err:#}"); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn suppresses_only_noncanonical_dispatch_boundaries() { + assert!(suppresses_tool_dispatch_trace(&invocation( + codex_code_mode::PUBLIC_TOOL_NAME, + /*tool_namespace*/ None, + ToolDispatchRequester::Model { + model_visible_call_id: "call-exec".to_string(), + }, + ToolDispatchPayload::Custom { + input: "1 + 1".to_string(), + }, + ))); + assert!(!suppresses_tool_dispatch_trace(&invocation( + "custom_tool", + /*tool_namespace*/ None, + ToolDispatchRequester::Model { + model_visible_call_id: "call-custom".to_string(), + }, + ToolDispatchPayload::Custom { + input: "payload".to_string(), + }, + ))); + assert!(!suppresses_tool_dispatch_trace(&invocation( + codex_code_mode::PUBLIC_TOOL_NAME, + Some("mcp__server".to_string()), + ToolDispatchRequester::Model { + model_visible_call_id: "call-namespaced".to_string(), + }, + ToolDispatchPayload::Custom { + input: "payload".to_string(), + }, + ))); + } + + fn invocation( + tool_name: &str, + tool_namespace: Option, + requester: ToolDispatchRequester, + payload: ToolDispatchPayload, + ) -> ToolDispatchInvocation { + ToolDispatchInvocation { + thread_id: "thread-1".to_string(), + codex_turn_id: "turn-1".to_string(), + tool_call_id: "tool-call-1".to_string(), + tool_name: tool_name.to_string(), + tool_namespace, + requester, + payload, + } + } +}