diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 3f26f563f876..225d7f692a72 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2116,6 +2116,7 @@ dependencies = [ "codex-protocol", "codex-responses-api-proxy", "codex-rmcp-client", + "codex-rollout-trace", "codex-sandboxing", "codex-state", "codex-stdio-to-uds", diff --git a/codex-rs/cli/Cargo.toml b/codex-rs/cli/Cargo.toml index d318297f8f6a..2a9c5a6ff7ba 100644 --- a/codex-rs/cli/Cargo.toml +++ b/codex-rs/cli/Cargo.toml @@ -43,6 +43,7 @@ codex-model-provider = { workspace = true } codex-protocol = { workspace = true } codex-responses-api-proxy = { workspace = true } codex-rmcp-client = { workspace = true } +codex-rollout-trace = { workspace = true } codex-sandboxing = { workspace = true } codex-state = { workspace = true } codex-stdio-to-uds = { workspace = true } diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index 3dee811fac3f..f378afad2c0c 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -22,6 +22,8 @@ use codex_exec::Command as ExecCommand; use codex_exec::ReviewArgs; use codex_execpolicy::ExecPolicyCheckCommand; use codex_responses_api_proxy::Args as ResponsesApiProxyArgs; +use codex_rollout_trace::REDUCED_STATE_FILE_NAME; +use codex_rollout_trace::replay_bundle; use codex_state::StateRuntime; use codex_state::state_db_path; use codex_tui::AppExitInfo; @@ -216,6 +218,10 @@ enum DebugSubcommand { /// Render the model-visible prompt input list as JSON. PromptInput(DebugPromptInputCommand), + /// Replay a rollout trace bundle and write reduced state JSON. + #[clap(hide = true)] + TraceReduce(DebugTraceReduceCommand), + /// Internal: reset local memory state for a fresh start. #[clap(hide = true)] ClearMemories, @@ -257,6 +263,17 @@ struct DebugModelsCommand { bundled: bool, } +#[derive(Debug, Parser)] +struct DebugTraceReduceCommand { + /// Trace bundle directory containing manifest.json and trace.jsonl. 
+ #[arg(value_name = "TRACE_BUNDLE")] + trace_bundle: PathBuf, + + /// Output path for reduced RolloutTrace JSON. Defaults to TRACE_BUNDLE/state.json. + #[arg(long = "output", short = 'o', value_name = "FILE")] + output: Option, +} + #[derive(Debug, Parser)] struct ResumeCommand { /// Conversation/session id (UUID) or thread name. UUIDs take precedence if it parses. @@ -1065,6 +1082,14 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> { ) .await?; } + DebugSubcommand::TraceReduce(cmd) => { + reject_remote_mode_for_subcommand( + root_remote.as_deref(), + root_remote_auth_token_env.as_deref(), + "debug trace-reduce", + )?; + run_debug_trace_reduce_command(cmd).await?; + } DebugSubcommand::ClearMemories => { reject_remote_mode_for_subcommand( root_remote.as_deref(), @@ -1265,6 +1290,19 @@ fn maybe_print_under_development_feature_warning( ); } +async fn run_debug_trace_reduce_command(cmd: DebugTraceReduceCommand) -> anyhow::Result<()> { + let output = cmd + .output + .unwrap_or_else(|| cmd.trace_bundle.join(REDUCED_STATE_FILE_NAME)); + + let trace = replay_bundle(&cmd.trace_bundle)?; + let reduced_json = serde_json::to_vec_pretty(&trace)?; + tokio::fs::write(&output, reduced_json).await?; + println!("{}", output.display()); + + Ok(()) +} + async fn run_debug_prompt_input_command( cmd: DebugPromptInputCommand, root_config_overrides: CliConfigOverrides, diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 1a30d3263fbc..977f7829e14d 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -92,7 +92,7 @@ pub(crate) async fn run_codex_thread_interactive( inherited_shell_snapshot: None, user_shell_override: None, inherited_exec_policy: Some(Arc::clone(&parent_session.services.exec_policy)), - inherited_rollout_trace: codex_rollout_trace::RolloutTraceRecorder::disabled(), + parent_rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(), parent_trace: None, 
analytics_events_client: Some(parent_session.services.analytics_events_client.clone()), thread_store: Arc::clone(&parent_session.services.thread_store), diff --git a/codex-rs/core/src/compact_remote.rs b/codex-rs/core/src/compact_remote.rs index 962b3e67216d..0623ceb3b689 100644 --- a/codex-rs/core/src/compact_remote.rs +++ b/codex-rs/core/src/compact_remote.rs @@ -118,8 +118,7 @@ async fn run_remote_compact_task_inner_impl( let context_compaction_item = ContextCompactionItem::new(); // Use the UI compaction item ID as the trace compaction ID so protocol lifecycle events, // endpoint attempts, and the installed history checkpoint all have one join key. - let compaction_trace = sess.services.rollout_trace.compaction_trace_context( - sess.conversation_id, + let compaction_trace = sess.services.rollout_thread_trace.compaction_trace_context( turn_context.sub_id.as_str(), context_compaction_item.id.as_str(), turn_context.model_info.slug.as_str(), diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs index 7656082c0363..1ab51def3668 100644 --- a/codex-rs/core/src/session/handlers.rs +++ b/codex-rs/core/src/session/handlers.rs @@ -977,6 +977,9 @@ pub async fn shutdown(sess: &Arc, sub_id: String) -> bool { msg: EventMsg::ShutdownComplete, }; sess.send_event_raw(event).await; + sess.services + .rollout_thread_trace + .record_ended(codex_rollout_trace::RolloutStatus::Completed); true } diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 22a322b2a3d5..6b17fbc96e44 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -121,8 +121,9 @@ use codex_protocol::request_user_input::RequestUserInputArgs; use codex_protocol::request_user_input::RequestUserInputResponse; use codex_rmcp_client::ElicitationResponse; use codex_rollout::state_db; -use codex_rollout_trace::RolloutTraceRecorder; +use codex_rollout_trace::AgentResultTracePayload; use 
codex_rollout_trace::ThreadStartedTraceMetadata; +use codex_rollout_trace::ThreadTraceContext; use codex_sandboxing::policy_transforms::intersect_permission_profiles; use codex_shell_command::parse_command::parse_command; use codex_terminal_detection::user_agent; @@ -402,8 +403,11 @@ pub(crate) struct CodexSpawnArgs { pub(crate) metrics_service_name: Option, pub(crate) inherited_shell_snapshot: Option>, pub(crate) inherited_exec_policy: Option>, - /// Parent rollout-tree recorder, or a disabled recorder when this spawn has no parent trace. - pub(crate) inherited_rollout_trace: RolloutTraceRecorder, + /// Parent rollout trace used only to derive fresh spawned child traces. + /// + /// Root sessions and non-thread-spawn subagents pass a disabled context; + /// `Session::new` creates the root trace itself when rollout tracing is enabled. + pub(crate) parent_rollout_thread_trace: ThreadTraceContext, pub(crate) user_shell_override: Option, pub(crate) parent_trace: Option, pub(crate) analytics_events_client: Option, @@ -460,7 +464,7 @@ impl Codex { inherited_shell_snapshot, user_shell_override, inherited_exec_policy, - inherited_rollout_trace, + parent_rollout_thread_trace, parent_trace: _, analytics_events_client, thread_store, @@ -667,7 +671,7 @@ impl Codex { environment_manager, analytics_events_client, thread_store, - inherited_rollout_trace, + parent_rollout_thread_trace, ) .await .map_err(|e| { @@ -1448,6 +1452,12 @@ impl Session { /// Persist the event to rollout and send it to clients. 
pub(crate) async fn send_event(&self, turn_context: &TurnContext, msg: EventMsg) { let legacy_source = msg.clone(); + self.services + .rollout_thread_trace + .record_codex_turn_event(&turn_context.sub_id, &legacy_source); + self.services + .rollout_thread_trace + .record_tool_call_event(turn_context.sub_id.clone(), &legacy_source); let event = Event { id: turn_context.sub_id.clone(), msg, @@ -1500,13 +1510,19 @@ impl Session { return; } - self.forward_child_completion_to_parent(*parent_thread_id, child_agent_path, status) - .await; + self.forward_child_completion_to_parent( + turn_context, + *parent_thread_id, + child_agent_path, + status, + ) + .await; } /// Sends the standard completion envelope from a spawned MultiAgentV2 child to its parent. async fn forward_child_completion_to_parent( &self, + turn_context: &TurnContext, parent_thread_id: ThreadId, child_agent_path: &codex_protocol::AgentPath, status: AgentStatus, @@ -1520,6 +1536,13 @@ impl Session { }; let message = format_subagent_notification_message(child_agent_path.as_str(), &status); + // `communication` owns the message. Keep a second copy only when the + // recorder will actually need it after parent delivery succeeds. + let trace_message = self + .services + .rollout_thread_trace + .is_enabled() + .then(|| message.clone()); let communication = InterAgentCommunication::new( child_agent_path.clone(), parent_agent_path, @@ -1534,6 +1557,20 @@ impl Session { .await { debug!("failed to notify parent thread {parent_thread_id}: {err}"); + return; + } + if let Some(message) = trace_message { + self.services + .rollout_thread_trace + .record_agent_result_interaction( + turn_context.sub_id.as_str(), + parent_thread_id, + &AgentResultTracePayload { + child_agent_path: child_agent_path.as_str(), + message: &message, + status: &status, + }, + ); } } @@ -1565,6 +1602,9 @@ impl Session { // Persist the event into rollout storage (the store filters as needed). 
let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())]; self.persist_rollout_items(&rollout_items).await; + self.services + .rollout_thread_trace + .record_protocol_event(&event.msg); self.deliver_event_raw(event).await; } diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 42e98ea5869e..8316a35c4028 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -273,7 +273,7 @@ impl Session { environment_manager: Arc, analytics_events_client: Option, thread_store: Arc, - inherited_rollout_trace: RolloutTraceRecorder, + parent_rollout_thread_trace: ThreadTraceContext, ) -> anyhow::Result> { debug!( "Configuring session: model={}; provider={:?}", @@ -449,18 +449,17 @@ impl Session { approval_policy: session_configuration.approval_policy.value().to_string(), sandbox_policy: format!("{:?}", session_configuration.sandbox_policy.get()), }; - let rollout_trace = if matches!( + let rollout_thread_trace = if matches!( session_configuration.session_source, SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. }) ) { - // Spawned child threads are part of their root rollout tree. If - // the parent had no trace recorder, do not create an orphan child - // bundle that looks like an independent rollout. - inherited_rollout_trace + // Spawned child threads are part of their root rollout tree. If the + // parent had no trace bundle, do not create an orphan child bundle + // that looks like an independent rollout. 
+ parent_rollout_thread_trace.start_child_thread_trace_or_disabled(trace_metadata) } else { - RolloutTraceRecorder::create_root_or_disabled(conversation_id) + ThreadTraceContext::start_root_or_disabled(trace_metadata) }; - rollout_trace.record_thread_started(trace_metadata); let mut post_session_configured_events = Vec::::new(); @@ -739,7 +738,7 @@ impl Session { main_execve_wrapper_exe: config.main_execve_wrapper_exe.clone(), analytics_events_client, hooks, - rollout_trace, + rollout_thread_trace, user_shell: Arc::new(default_shell), shell_snapshot_tx, show_raw_agent_reasoning: config.show_raw_agent_reasoning, diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index b469e3e97123..0f7930907277 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -3150,7 +3150,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() { Arc::new(codex_thread_store::LocalThreadStore::new( codex_rollout::RolloutConfig::from_view(config.as_ref()), )), - RolloutTraceRecorder::disabled(), + codex_rollout_trace::ThreadTraceContext::disabled(), ) .await; @@ -3272,7 +3272,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { legacy_notify_argv: config.notify.clone(), ..HooksConfig::default() }), - rollout_trace: RolloutTraceRecorder::disabled(), + rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(), user_shell: Arc::new(default_user_shell()), shell_snapshot_tx: watch::channel(None).0, show_raw_agent_reasoning: config.show_raw_agent_reasoning, @@ -3472,7 +3472,7 @@ async fn make_session_with_config_and_rx( Arc::new(codex_thread_store::LocalThreadStore::new( codex_rollout::RolloutConfig::from_view(config.as_ref()), )), - RolloutTraceRecorder::disabled(), + codex_rollout_trace::ThreadTraceContext::disabled(), ) .await?; @@ -4588,7 +4588,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( legacy_notify_argv: config.notify.clone(), 
..HooksConfig::default() }), - rollout_trace: RolloutTraceRecorder::disabled(), + rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(), user_shell: Arc::new(default_user_shell()), shell_snapshot_tx: watch::channel(None).0, show_raw_agent_reasoning: config.show_raw_agent_reasoning, diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index e8b0e3b0d621..84865190d207 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -772,7 +772,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { metrics_service_name: None, inherited_shell_snapshot: None, inherited_exec_policy: Some(Arc::new(parent_exec_policy)), - inherited_rollout_trace: RolloutTraceRecorder::disabled(), + parent_rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(), user_shell_override: None, parent_trace: None, analytics_events_client: None, diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 54accc3afe07..db5df955d59c 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1875,8 +1875,7 @@ async fn try_run_sampling_request( auth_mode = sess.services.auth_manager.auth_mode(), features = sess.features.enabled_features(), ); - let inference_trace = sess.services.rollout_trace.inference_trace_context( - sess.conversation_id, + let inference_trace = sess.services.rollout_thread_trace.inference_trace_context( turn_context.sub_id.as_str(), turn_context.model_info.slug.as_str(), turn_context.provider.info().name.as_str(), diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 2c62e04c8cd9..e3086f14a729 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -23,7 +23,7 @@ use codex_mcp::McpConnectionManager; use codex_models_manager::manager::ModelsManager; use 
codex_otel::SessionTelemetry; use codex_rollout::state_db::StateDbHandle; -use codex_rollout_trace::RolloutTraceRecorder; +use codex_rollout_trace::ThreadTraceContext; use codex_thread_store::LiveThread; use codex_thread_store::ThreadStore; use std::path::PathBuf; @@ -43,7 +43,7 @@ pub(crate) struct SessionServices { pub(crate) main_execve_wrapper_exe: Option, pub(crate) analytics_events_client: AnalyticsEventsClient, pub(crate) hooks: Hooks, - pub(crate) rollout_trace: RolloutTraceRecorder, + pub(crate) rollout_thread_trace: ThreadTraceContext, pub(crate) user_shell: Arc, pub(crate) shell_snapshot_tx: watch::Sender>>, pub(crate) show_raw_agent_reasoning: bool, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 55071f19fe3e..1d4bdfe6fe92 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -41,6 +41,7 @@ use codex_protocol::protocol::Op; use codex_protocol::protocol::RolloutItem; use codex_protocol::protocol::SessionConfiguredEvent; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::W3cTraceContext; @@ -964,6 +965,9 @@ impl ThreadManagerState { } Some(_) | None => crate::file_watcher::WatchRegistration::default(), }; + let parent_rollout_thread_trace = self + .parent_rollout_thread_trace_for_source(&session_source, &initial_history) + .await; let CodexSpawnOk { codex, thread_id, .. 
} = Codex::spawn(CodexSpawnArgs { @@ -983,7 +987,7 @@ impl ThreadManagerState { metrics_service_name, inherited_shell_snapshot, inherited_exec_policy, - inherited_rollout_trace: codex_rollout_trace::RolloutTraceRecorder::disabled(), + parent_rollout_thread_trace, user_shell_override, parent_trace, analytics_events_client: self.analytics_events_client.clone(), @@ -1029,6 +1033,36 @@ impl ThreadManagerState { pub(crate) fn notify_thread_created(&self, thread_id: ThreadId) { let _ = self.thread_created_tx.send(thread_id); } + + async fn parent_rollout_thread_trace_for_source( + &self, + session_source: &SessionSource, + initial_history: &InitialHistory, + ) -> codex_rollout_trace::ThreadTraceContext { + // A fresh v2 child belongs to the same rollout tree as its parent, so + // session startup derives its child trace from the parent's thread + // context. Resumed children already have a prior `ThreadStarted` event + // for this thread id; deriving a child trace during resume would write + // that start event again and make the bundle unreplayable. + let SessionSource::SubAgent(SubAgentSource::ThreadSpawn { + parent_thread_id, .. + }) = session_source + else { + return codex_rollout_trace::ThreadTraceContext::disabled(); + }; + if matches!(initial_history, InitialHistory::Resumed(_)) { + return codex_rollout_trace::ThreadTraceContext::disabled(); + } + // Parent lookup can fail if the parent was closed or released between + // spawn preparation and session construction. Tracing is diagnostic, so + // that race should not block child creation; the child simply starts + // without a parent rollout trace. + self.get_thread(*parent_thread_id) + .await + .ok() + .map(|thread| thread.codex.session.services.rollout_thread_trace.clone()) + .unwrap_or_else(codex_rollout_trace::ThreadTraceContext::disabled) + } } /// Return a fork snapshot cut strictly before the nth user message (0-based). 
diff --git a/codex-rs/core/src/tools/code_mode/execute_handler.rs b/codex-rs/core/src/tools/code_mode/execute_handler.rs index 861375470111..6b99e09b56da 100644 --- a/codex-rs/core/src/tools/code_mode/execute_handler.rs +++ b/codex-rs/core/src/tools/code_mode/execute_handler.rs @@ -34,13 +34,16 @@ impl CodeModeExecuteHandler { // Allocate before starting V8 so the trace can create the parent // CodeCell before model-authored JavaScript issues nested tool calls. let runtime_cell_id = exec.session.services.code_mode_service.allocate_cell_id(); - let code_cell_trace = exec.session.services.rollout_trace.start_code_cell_trace( - exec.session.conversation_id, - exec.turn.sub_id.as_str(), - runtime_cell_id.as_str(), - call_id.as_str(), - args.code.as_str(), - ); + let code_cell_trace = exec + .session + .services + .rollout_thread_trace + .start_code_cell_trace( + exec.turn.sub_id.as_str(), + runtime_cell_id.as_str(), + call_id.as_str(), + args.code.as_str(), + ); let started_at = std::time::Instant::now(); let response = exec .session diff --git a/codex-rs/core/src/tools/code_mode/wait_handler.rs b/codex-rs/core/src/tools/code_mode/wait_handler.rs index 4d2b1e42d36f..70fa51251a44 100644 --- a/codex-rs/core/src/tools/code_mode/wait_handler.rs +++ b/codex-rs/core/src/tools/code_mode/wait_handler.rs @@ -85,12 +85,8 @@ impl ToolHandler for CodeModeWaitHandler { }; exec.session .services - .rollout_trace - .code_cell_trace_context( - exec.session.conversation_id, - exec.turn.sub_id.as_str(), - runtime_cell_id, - ) + .rollout_thread_trace + .code_cell_trace_context(exec.turn.sub_id.as_str(), runtime_cell_id) .record_ended(response); } handle_runtime_response(&exec, wait_response.into(), args.max_tokens, started_at) diff --git a/codex-rs/core/src/tools/tool_dispatch_trace.rs b/codex-rs/core/src/tools/tool_dispatch_trace.rs index 89dc71f96048..b95dc1b69fc6 100644 --- a/codex-rs/core/src/tools/tool_dispatch_trace.rs +++ b/codex-rs/core/src/tools/tool_dispatch_trace.rs @@ -26,7 
+26,7 @@ impl ToolDispatchTrace { let context = invocation .session .services - .rollout_trace + .rollout_thread_trace .start_tool_dispatch_trace(|| tool_dispatch_invocation(invocation)); Self { context } } diff --git a/codex-rs/core/src/tools/tool_dispatch_trace_tests.rs b/codex-rs/core/src/tools/tool_dispatch_trace_tests.rs index 5beccd78162c..b2a7cfe977eb 100644 --- a/codex-rs/core/src/tools/tool_dispatch_trace_tests.rs +++ b/codex-rs/core/src/tools/tool_dispatch_trace_tests.rs @@ -5,7 +5,6 @@ use std::sync::Arc; use codex_protocol::protocol::SessionSource; use codex_rollout_trace::ExecutionStatus; -use codex_rollout_trace::RolloutTraceRecorder; use codex_rollout_trace::ThreadStartedTraceMetadata; use codex_rollout_trace::ToolCallRequester; use pretty_assertions::assert_eq; @@ -47,8 +46,7 @@ async fn dispatch_lifecycle_trace_records_direct_and_code_mode_requesters() -> a let temp = TempDir::new()?; let (mut session, turn) = make_session_and_context().await; attach_test_trace(&mut session, &turn, temp.path())?; - session.services.rollout_trace.start_code_cell_trace( - session.conversation_id, + session.services.rollout_thread_trace.start_code_cell_trace( turn.sub_id.as_str(), "cell-1", "call-code", @@ -307,23 +305,26 @@ fn test_invocation_with_payload( fn attach_test_trace(session: &mut Session, turn: &TurnContext, root: &Path) -> anyhow::Result<()> { let thread_id = session.conversation_id; - let recorder = RolloutTraceRecorder::create_in_root_for_test(root, thread_id)?; - recorder.record_thread_started(ThreadStartedTraceMetadata { - thread_id: thread_id.to_string(), - agent_path: "/root".to_string(), - task_name: None, - nickname: None, - agent_role: None, - session_source: SessionSource::Exec, - cwd: PathBuf::from("/workspace"), - rollout_path: None, - model: "gpt-test".to_string(), - provider_name: "test-provider".to_string(), - approval_policy: "never".to_string(), - sandbox_policy: "danger-full-access".to_string(), - }); - 
recorder.record_codex_turn_started(thread_id, turn.sub_id.as_str()); - session.services.rollout_trace = recorder; + let rollout_thread_trace = + codex_rollout_trace::ThreadTraceContext::start_root_in_root_for_test( + root, + ThreadStartedTraceMetadata { + thread_id: thread_id.to_string(), + agent_path: "/root".to_string(), + task_name: None, + nickname: None, + agent_role: None, + session_source: SessionSource::Exec, + cwd: PathBuf::from("/workspace"), + rollout_path: None, + model: "gpt-test".to_string(), + provider_name: "test-provider".to_string(), + approval_policy: "never".to_string(), + sandbox_policy: "danger-full-access".to_string(), + }, + )?; + rollout_thread_trace.record_codex_turn_started(turn.sub_id.as_str()); + session.services.rollout_thread_trace = rollout_thread_trace; Ok(()) } diff --git a/codex-rs/rollout-trace/README.md b/codex-rs/rollout-trace/README.md index 693401f900fd..540a494638d8 100644 --- a/codex-rs/rollout-trace/README.md +++ b/codex-rs/rollout-trace/README.md @@ -1,11 +1,15 @@ # Rollout Trace -> **Privacy:** Rollout tracing does **not** collect, upload, or report user data; -> it only writes local bundles when `CODEX_ROLLOUT_TRACE_ROOT` is set. +> **Privacy:** Rollout tracing is not telemetry. Codex does **not** upload or +> report these traces; it writes local bundles only when +> `CODEX_ROLLOUT_TRACE_ROOT` is set. Those local bundles can contain prompts, +> responses, tool inputs/outputs, terminal output, and paths, so treat them as +> sensitive. Rollout tracing is an opt-in diagnostic path for understanding what happened -during a Codex session. It records raw runtime evidence into a local bundle, then -replays that bundle into a semantic graph that a debugger or UI can inspect. +during a Codex session. It records raw runtime evidence into a local bundle on +disk, then replays that bundle into a semantic graph that a debugger or UI can +inspect. The key design choice is: **observe first, interpret later**. 
@@ -45,7 +49,7 @@ flowchart TD Agents["multi_agent_v2\nspawn, task delivery, result, close"] end - Recorder["RolloutTraceRecorder\nthin best-effort producer"] + Context["ThreadTraceContext\nroot/child no-op-capable producer"] Writer["TraceWriter\nassigns seq and writes payloads before events"] subgraph Bundle["trace bundle"] @@ -64,14 +68,14 @@ flowchart TD RawRefs["raw_payload refs"] end - Protocol --> Recorder - Inference --> Recorder - Tools --> Recorder - CodeMode --> Recorder - Terminal --> Recorder - Agents --> Recorder + Protocol --> Context + Inference --> Context + Tools --> Context + CodeMode --> Context + Terminal --> Context + Agents --> Context - Recorder --> Writer + Context --> Writer Writer --> Manifest Writer --> Payloads Writer --> Events @@ -87,9 +91,15 @@ flowchart TD Reducer --> RawRefs ``` -The recorder is deliberately small. It is enabled by `CODEX_ROLLOUT_TRACE_ROOT` -and must never make a Codex session fail just because tracing failed. Core emits -raw observations; this crate owns the bundle schema, writer API, and reducer. +The thread context is deliberately small and no-op capable. A root session starts +one from `CODEX_ROLLOUT_TRACE_ROOT`; fresh spawned child threads derive their +own context from the parent's context so the whole rollout tree shares one +writer. Disabled contexts accept the same calls and record nothing. + +Trace startup and writes are best-effort. Rollout tracing must never make a +Codex session fail just because diagnostic recording failed. Core emits raw +observations; this crate owns the bundle schema, trace-context APIs, writer, and +reducer. ## Bundle Layout @@ -111,7 +121,8 @@ To reduce a bundle: codex debug trace-reduce <TRACE_BUNDLE> ``` -By default this writes `<TRACE_BUNDLE>/state.json`. +By default this writes `<TRACE_BUNDLE>/state.json`. Rust callers can also call +`codex_rollout_trace::replay_bundle` directly. 
## Raw Evidence vs Reduced Graph diff --git a/codex-rs/rollout-trace/src/lib.rs b/codex-rs/rollout-trace/src/lib.rs index 24d4c9add685..3d9e04f36b4e 100644 --- a/codex-rs/rollout-trace/src/lib.rs +++ b/codex-rs/rollout-trace/src/lib.rs @@ -12,9 +12,10 @@ mod compaction; mod inference; mod model; mod payload; +mod protocol_event; mod raw_event; -mod recorder; mod reducer; +mod thread; mod tool_dispatch; mod writer; @@ -50,14 +51,16 @@ pub use raw_event::RawTraceEvent; pub use raw_event::RawTraceEventContext; /// Typed payload for one raw trace event. pub use raw_event::RawTraceEventPayload; -/// Environment variable that enables local trace-bundle recording. -pub use recorder::CODEX_ROLLOUT_TRACE_ROOT_ENV; -/// Best-effort hot-path recorder for one rollout trace bundle. -pub use recorder::RolloutTraceRecorder; -/// Raw metadata captured when a thread starts. -pub use recorder::ThreadStartedTraceMetadata; /// Replay a raw trace bundle and write/read its reduced `RolloutTrace`. pub use reducer::replay_bundle; +/// Raw payload captured when a child agent reports completion to its parent. +pub use thread::AgentResultTracePayload; +/// Environment variable that enables local trace-bundle recording. +pub use thread::CODEX_ROLLOUT_TRACE_ROOT_ENV; +/// Raw metadata captured when a thread starts. +pub use thread::ThreadStartedTraceMetadata; +/// No-op-capable handle for recording one thread in a rollout bundle. +pub use thread::ThreadTraceContext; /// Request data for the canonical Codex tool boundary. pub use tool_dispatch::ToolDispatchInvocation; /// Tool input observed at the registry boundary. diff --git a/codex-rs/rollout-trace/src/protocol_event.rs b/codex-rs/rollout-trace/src/protocol_event.rs new file mode 100644 index 000000000000..b3267a23ea18 --- /dev/null +++ b/codex-rs/rollout-trace/src/protocol_event.rs @@ -0,0 +1,410 @@ +//! Mapping from Codex protocol events into raw rollout-trace events. +//! +//! 
The session layer already emits protocol events for turn lifecycle, terminal +//! sessions, patch application, MCP calls, and collaboration tools. Rollout +//! tracing reuses those observations instead of adding another set of hooks in +//! `codex-core`: this module translates the protocol surface into the smaller +//! trace vocabulary and keeps the mapping isolated inside `codex-rollout-trace`. +//! +//! The long explicit `EventMsg` matches are intentional. Most protocol events +//! are not trace runtime boundaries, but spelling them out makes new protocol +//! variants a compile-time prompt to decide whether the trace should capture +//! them. + +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::ExecCommandBeginEvent; +use codex_protocol::protocol::ExecCommandEndEvent; +use codex_protocol::protocol::ExecCommandSource; +use codex_protocol::protocol::ExecCommandStatus; +use codex_protocol::protocol::McpToolCallBeginEvent; +use codex_protocol::protocol::McpToolCallEndEvent; +use codex_protocol::protocol::PatchApplyBeginEvent; +use codex_protocol::protocol::PatchApplyEndEvent; +use codex_protocol::protocol::PatchApplyStatus; +use codex_protocol::protocol::TurnAbortReason; +use serde::Serialize; + +use crate::AgentThreadId; +use crate::CodexTurnId; +use crate::ExecutionStatus; +use crate::RawTraceEventPayload; + +pub(crate) struct CodexTurnTraceEvent { + pub context_turn_id: CodexTurnId, + pub payload: RawTraceEventPayload, +} + +pub(crate) fn codex_turn_trace_event( + thread_id: AgentThreadId, + default_turn_id: &str, + event: &EventMsg, +) -> Option { + match event { + EventMsg::TurnStarted(event) => { + let codex_turn_id = event.turn_id.clone(); + Some(CodexTurnTraceEvent { + context_turn_id: codex_turn_id.clone(), + payload: RawTraceEventPayload::CodexTurnStarted { + codex_turn_id, + thread_id, + }, + }) + } + EventMsg::TurnComplete(event) => { + let codex_turn_id = event.turn_id.clone(); + Some(CodexTurnTraceEvent { + context_turn_id: 
codex_turn_id.clone(), + payload: RawTraceEventPayload::CodexTurnEnded { + codex_turn_id, + status: ExecutionStatus::Completed, + }, + }) + } + EventMsg::TurnAborted(event) => { + let codex_turn_id = event + .turn_id + .clone() + .unwrap_or_else(|| default_turn_id.to_string()); + Some(CodexTurnTraceEvent { + context_turn_id: codex_turn_id.clone(), + payload: RawTraceEventPayload::CodexTurnEnded { + codex_turn_id, + status: execution_status_for_abort_reason(&event.reason), + }, + }) + } + _ => None, + } +} + +pub(crate) enum ToolRuntimeTraceEvent<'a> { + Started { + tool_call_id: &'a str, + payload: ToolRuntimePayload<'a>, + }, + Ended { + tool_call_id: &'a str, + status: ExecutionStatus, + payload: ToolRuntimePayload<'a>, + }, +} + +/// Borrowed protocol payload that should be persisted as tool runtime data. +/// +/// The trace wants the exact protocol payload shape for E2E debugging, while +/// reducers consume the surrounding typed trace events. This enum lets the +/// recorder serialize the original event by reference, without first cloning it +/// or converting it through `serde_json::Value`. 
+pub(crate) enum ToolRuntimePayload<'a> { + ExecCommandBegin(&'a ExecCommandBeginEvent), + ExecCommandEnd(&'a ExecCommandEndEvent), + PatchApplyBegin(&'a PatchApplyBeginEvent), + PatchApplyEnd(&'a PatchApplyEndEvent), + McpToolCallBegin(&'a McpToolCallBeginEvent), + McpToolCallEnd(&'a McpToolCallEndEvent), + CollabAgentSpawnBegin(&'a codex_protocol::protocol::CollabAgentSpawnBeginEvent), + CollabAgentSpawnEnd(&'a codex_protocol::protocol::CollabAgentSpawnEndEvent), + CollabAgentInteractionBegin(&'a codex_protocol::protocol::CollabAgentInteractionBeginEvent), + CollabAgentInteractionEnd(&'a codex_protocol::protocol::CollabAgentInteractionEndEvent), + CollabWaitingBegin(&'a codex_protocol::protocol::CollabWaitingBeginEvent), + CollabWaitingEnd(&'a codex_protocol::protocol::CollabWaitingEndEvent), + CollabCloseBegin(&'a codex_protocol::protocol::CollabCloseBeginEvent), + CollabCloseEnd(&'a codex_protocol::protocol::CollabCloseEndEvent), +} + +impl Serialize for ToolRuntimePayload<'_> { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + match self { + ToolRuntimePayload::ExecCommandBegin(event) => event.serialize(serializer), + ToolRuntimePayload::ExecCommandEnd(event) => event.serialize(serializer), + ToolRuntimePayload::PatchApplyBegin(event) => event.serialize(serializer), + ToolRuntimePayload::PatchApplyEnd(event) => event.serialize(serializer), + ToolRuntimePayload::McpToolCallBegin(event) => event.serialize(serializer), + ToolRuntimePayload::McpToolCallEnd(event) => event.serialize(serializer), + ToolRuntimePayload::CollabAgentSpawnBegin(event) => event.serialize(serializer), + ToolRuntimePayload::CollabAgentSpawnEnd(event) => event.serialize(serializer), + ToolRuntimePayload::CollabAgentInteractionBegin(event) => event.serialize(serializer), + ToolRuntimePayload::CollabAgentInteractionEnd(event) => event.serialize(serializer), + ToolRuntimePayload::CollabWaitingBegin(event) => event.serialize(serializer), + 
ToolRuntimePayload::CollabWaitingEnd(event) => event.serialize(serializer), + ToolRuntimePayload::CollabCloseBegin(event) => event.serialize(serializer), + ToolRuntimePayload::CollabCloseEnd(event) => event.serialize(serializer), + } + } +} + +pub(crate) fn tool_runtime_trace_event(event: &EventMsg) -> Option> { + match event { + EventMsg::ExecCommandBegin(event) if event.source != ExecCommandSource::UserShell => { + Some(ToolRuntimeTraceEvent::Started { + tool_call_id: &event.call_id, + payload: ToolRuntimePayload::ExecCommandBegin(event), + }) + } + EventMsg::ExecCommandEnd(event) if event.source != ExecCommandSource::UserShell => { + Some(ToolRuntimeTraceEvent::Ended { + tool_call_id: &event.call_id, + status: event.status.trace_execution_status(), + payload: ToolRuntimePayload::ExecCommandEnd(event), + }) + } + EventMsg::PatchApplyBegin(event) => Some(ToolRuntimeTraceEvent::Started { + tool_call_id: &event.call_id, + payload: ToolRuntimePayload::PatchApplyBegin(event), + }), + EventMsg::PatchApplyEnd(event) => Some(ToolRuntimeTraceEvent::Ended { + tool_call_id: &event.call_id, + status: event.status.trace_execution_status(), + payload: ToolRuntimePayload::PatchApplyEnd(event), + }), + EventMsg::McpToolCallBegin(event) => Some(ToolRuntimeTraceEvent::Started { + tool_call_id: &event.call_id, + payload: ToolRuntimePayload::McpToolCallBegin(event), + }), + EventMsg::McpToolCallEnd(event) => Some(ToolRuntimeTraceEvent::Ended { + tool_call_id: &event.call_id, + status: if event.result.is_ok() { + ExecutionStatus::Completed + } else { + ExecutionStatus::Failed + }, + payload: ToolRuntimePayload::McpToolCallEnd(event), + }), + EventMsg::CollabAgentSpawnBegin(event) => Some(ToolRuntimeTraceEvent::Started { + tool_call_id: &event.call_id, + payload: ToolRuntimePayload::CollabAgentSpawnBegin(event), + }), + EventMsg::CollabAgentSpawnEnd(event) => Some(ToolRuntimeTraceEvent::Ended { + tool_call_id: &event.call_id, + // A spawn end without a child thread id means the 
runtime boundary + // finished without creating the requested child thread. + status: if event.new_thread_id.is_some() { + ExecutionStatus::Completed + } else { + ExecutionStatus::Failed + }, + payload: ToolRuntimePayload::CollabAgentSpawnEnd(event), + }), + EventMsg::CollabAgentInteractionBegin(event) => Some(ToolRuntimeTraceEvent::Started { + tool_call_id: &event.call_id, + payload: ToolRuntimePayload::CollabAgentInteractionBegin(event), + }), + EventMsg::CollabAgentInteractionEnd(event) => Some(ToolRuntimeTraceEvent::Ended { + tool_call_id: &event.call_id, + status: ExecutionStatus::Completed, + payload: ToolRuntimePayload::CollabAgentInteractionEnd(event), + }), + EventMsg::CollabWaitingBegin(event) => Some(ToolRuntimeTraceEvent::Started { + tool_call_id: &event.call_id, + payload: ToolRuntimePayload::CollabWaitingBegin(event), + }), + EventMsg::CollabWaitingEnd(event) => Some(ToolRuntimeTraceEvent::Ended { + tool_call_id: &event.call_id, + status: ExecutionStatus::Completed, + payload: ToolRuntimePayload::CollabWaitingEnd(event), + }), + EventMsg::CollabCloseBegin(event) => Some(ToolRuntimeTraceEvent::Started { + tool_call_id: &event.call_id, + payload: ToolRuntimePayload::CollabCloseBegin(event), + }), + EventMsg::CollabCloseEnd(event) => Some(ToolRuntimeTraceEvent::Ended { + tool_call_id: &event.call_id, + status: ExecutionStatus::Completed, + payload: ToolRuntimePayload::CollabCloseEnd(event), + }), + EventMsg::Error(_) + | EventMsg::Warning(_) + | EventMsg::GuardianWarning(_) + | EventMsg::RealtimeConversationStarted(_) + | EventMsg::RealtimeConversationRealtime(_) + | EventMsg::RealtimeConversationClosed(_) + | EventMsg::RealtimeConversationSdp(_) + | EventMsg::ModelReroute(_) + | EventMsg::ModelVerification(_) + | EventMsg::ContextCompacted(_) + | EventMsg::ThreadRolledBack(_) + | EventMsg::TurnStarted(_) + | EventMsg::TurnComplete(_) + | EventMsg::TokenCount(_) + | EventMsg::AgentMessage(_) + | EventMsg::UserMessage(_) + | EventMsg::AgentMessageDelta(_) 
+ | EventMsg::AgentReasoning(_) + | EventMsg::AgentReasoningDelta(_) + | EventMsg::AgentReasoningRawContent(_) + | EventMsg::AgentReasoningRawContentDelta(_) + | EventMsg::AgentReasoningSectionBreak(_) + | EventMsg::SessionConfigured(_) + | EventMsg::ThreadNameUpdated(_) + | EventMsg::McpStartupUpdate(_) + | EventMsg::McpStartupComplete(_) + | EventMsg::WebSearchBegin(_) + | EventMsg::WebSearchEnd(_) + | EventMsg::ImageGenerationBegin(_) + | EventMsg::ImageGenerationEnd(_) + | EventMsg::ExecCommandBegin(_) + | EventMsg::ExecCommandOutputDelta(_) + | EventMsg::TerminalInteraction(_) + | EventMsg::ExecCommandEnd(_) + | EventMsg::ViewImageToolCall(_) + | EventMsg::ExecApprovalRequest(_) + | EventMsg::RequestPermissions(_) + | EventMsg::RequestUserInput(_) + | EventMsg::DynamicToolCallRequest(_) + | EventMsg::DynamicToolCallResponse(_) + | EventMsg::ElicitationRequest(_) + | EventMsg::ApplyPatchApprovalRequest(_) + | EventMsg::GuardianAssessment(_) + | EventMsg::DeprecationNotice(_) + | EventMsg::BackgroundEvent(_) + | EventMsg::UndoStarted(_) + | EventMsg::UndoCompleted(_) + | EventMsg::StreamError(_) + | EventMsg::PatchApplyUpdated(_) + | EventMsg::TurnDiff(_) + | EventMsg::GetHistoryEntryResponse(_) + | EventMsg::McpListToolsResponse(_) + | EventMsg::ListSkillsResponse(_) + | EventMsg::RealtimeConversationListVoicesResponse(_) + | EventMsg::SkillsUpdateAvailable + | EventMsg::PlanUpdate(_) + | EventMsg::TurnAborted(_) + | EventMsg::ShutdownComplete + | EventMsg::EnteredReviewMode(_) + | EventMsg::ExitedReviewMode(_) + | EventMsg::RawResponseItem(_) + | EventMsg::ItemStarted(_) + | EventMsg::ItemCompleted(_) + | EventMsg::HookStarted(_) + | EventMsg::HookCompleted(_) + | EventMsg::AgentMessageContentDelta(_) + | EventMsg::PlanDelta(_) + | EventMsg::ReasoningContentDelta(_) + | EventMsg::ReasoningRawContentDelta(_) + | EventMsg::CollabResumeBegin(_) + | EventMsg::CollabResumeEnd(_) => None, + } +} + +pub(crate) fn wrapped_protocol_event_type(event: &EventMsg) -> 
Option<&'static str> { + match event { + EventMsg::SessionConfigured(_) => Some("session_configured"), + EventMsg::TurnStarted(_) => Some("turn_started"), + EventMsg::TurnComplete(_) => Some("turn_complete"), + EventMsg::TurnAborted(_) => Some("turn_aborted"), + EventMsg::ThreadNameUpdated(_) => Some("thread_name_updated"), + EventMsg::ThreadRolledBack(_) => Some("thread_rolled_back"), + EventMsg::Error(_) => Some("error"), + EventMsg::Warning(_) => Some("warning"), + EventMsg::ShutdownComplete => Some("shutdown_complete"), + EventMsg::GuardianWarning(_) + | EventMsg::RealtimeConversationStarted(_) + | EventMsg::RealtimeConversationRealtime(_) + | EventMsg::RealtimeConversationClosed(_) + | EventMsg::RealtimeConversationSdp(_) + | EventMsg::ModelReroute(_) + | EventMsg::ModelVerification(_) + | EventMsg::ContextCompacted(_) + | EventMsg::TokenCount(_) + | EventMsg::AgentMessage(_) + | EventMsg::UserMessage(_) + | EventMsg::AgentMessageDelta(_) + | EventMsg::AgentReasoning(_) + | EventMsg::AgentReasoningDelta(_) + | EventMsg::AgentReasoningRawContent(_) + | EventMsg::AgentReasoningRawContentDelta(_) + | EventMsg::AgentReasoningSectionBreak(_) + | EventMsg::McpStartupUpdate(_) + | EventMsg::McpStartupComplete(_) + | EventMsg::McpToolCallBegin(_) + | EventMsg::McpToolCallEnd(_) + | EventMsg::WebSearchBegin(_) + | EventMsg::WebSearchEnd(_) + | EventMsg::ImageGenerationBegin(_) + | EventMsg::ImageGenerationEnd(_) + | EventMsg::ExecCommandBegin(_) + | EventMsg::ExecCommandOutputDelta(_) + | EventMsg::TerminalInteraction(_) + | EventMsg::ExecCommandEnd(_) + | EventMsg::ViewImageToolCall(_) + | EventMsg::ExecApprovalRequest(_) + | EventMsg::RequestPermissions(_) + | EventMsg::RequestUserInput(_) + | EventMsg::DynamicToolCallRequest(_) + | EventMsg::DynamicToolCallResponse(_) + | EventMsg::ElicitationRequest(_) + | EventMsg::ApplyPatchApprovalRequest(_) + | EventMsg::GuardianAssessment(_) + | EventMsg::DeprecationNotice(_) + | EventMsg::BackgroundEvent(_) + | 
EventMsg::UndoStarted(_) + | EventMsg::UndoCompleted(_) + | EventMsg::StreamError(_) + | EventMsg::PatchApplyBegin(_) + | EventMsg::PatchApplyUpdated(_) + | EventMsg::PatchApplyEnd(_) + | EventMsg::TurnDiff(_) + | EventMsg::GetHistoryEntryResponse(_) + | EventMsg::McpListToolsResponse(_) + | EventMsg::ListSkillsResponse(_) + | EventMsg::RealtimeConversationListVoicesResponse(_) + | EventMsg::SkillsUpdateAvailable + | EventMsg::PlanUpdate(_) + | EventMsg::EnteredReviewMode(_) + | EventMsg::ExitedReviewMode(_) + | EventMsg::RawResponseItem(_) + | EventMsg::ItemStarted(_) + | EventMsg::ItemCompleted(_) + | EventMsg::HookStarted(_) + | EventMsg::HookCompleted(_) + | EventMsg::AgentMessageContentDelta(_) + | EventMsg::PlanDelta(_) + | EventMsg::ReasoningContentDelta(_) + | EventMsg::ReasoningRawContentDelta(_) + | EventMsg::CollabAgentSpawnBegin(_) + | EventMsg::CollabAgentSpawnEnd(_) + | EventMsg::CollabAgentInteractionBegin(_) + | EventMsg::CollabAgentInteractionEnd(_) + | EventMsg::CollabWaitingBegin(_) + | EventMsg::CollabWaitingEnd(_) + | EventMsg::CollabCloseBegin(_) + | EventMsg::CollabCloseEnd(_) + | EventMsg::CollabResumeBegin(_) + | EventMsg::CollabResumeEnd(_) => None, + } +} + +trait TraceExecutionStatus { + fn trace_execution_status(&self) -> ExecutionStatus; +} + +impl TraceExecutionStatus for ExecCommandStatus { + fn trace_execution_status(&self) -> ExecutionStatus { + match self { + ExecCommandStatus::Completed => ExecutionStatus::Completed, + ExecCommandStatus::Failed => ExecutionStatus::Failed, + ExecCommandStatus::Declined => ExecutionStatus::Cancelled, + } + } +} + +impl TraceExecutionStatus for PatchApplyStatus { + fn trace_execution_status(&self) -> ExecutionStatus { + match self { + PatchApplyStatus::Completed => ExecutionStatus::Completed, + PatchApplyStatus::Failed => ExecutionStatus::Failed, + PatchApplyStatus::Declined => ExecutionStatus::Cancelled, + } + } +} + +fn execution_status_for_abort_reason(reason: &TurnAbortReason) -> ExecutionStatus 
{ + match reason { + TurnAbortReason::Interrupted | TurnAbortReason::Replaced | TurnAbortReason::ReviewEnded => { + ExecutionStatus::Cancelled + } + } +} diff --git a/codex-rs/rollout-trace/src/recorder.rs b/codex-rs/rollout-trace/src/recorder.rs deleted file mode 100644 index 833355f943ae..000000000000 --- a/codex-rs/rollout-trace/src/recorder.rs +++ /dev/null @@ -1,331 +0,0 @@ -//! Opt-in hot-path producer for rollout trace bundles. - -use std::path::Path; -use std::path::PathBuf; -use std::sync::Arc; - -use codex_protocol::ThreadId; -use codex_protocol::protocol::SessionSource; -use serde::Serialize; -use tracing::debug; -use tracing::warn; -use uuid::Uuid; - -use crate::AgentThreadId; -use crate::CodeCellTraceContext; -use crate::CodexTurnId; -use crate::CompactionId; -use crate::CompactionTraceContext; -use crate::InferenceTraceContext; -use crate::RawPayloadKind; -use crate::RawPayloadRef; -use crate::RawTraceEventPayload; -use crate::ToolDispatchInvocation; -use crate::ToolDispatchTraceContext; -use crate::TraceWriter; - -/// Environment variable that enables local trace-bundle recording. -/// -/// The value is a root directory. Each independent root session gets one child -/// bundle directory. Spawned child threads share their root session's bundle so -/// one reduced `state.json` describes the whole multi-agent rollout tree. -pub const CODEX_ROLLOUT_TRACE_ROOT_ENV: &str = "CODEX_ROLLOUT_TRACE_ROOT"; - -/// Lightweight handle stored in `SessionServices`. -/// -/// Cloning the handle is cheap; all sequencing and file ownership remains -/// inside `TraceWriter`. Disabled handles intentionally accept the same calls -/// as enabled handles so hot-path session code can describe traceable events -/// without repeatedly branching on whether diagnostic recording is enabled. 
-#[derive(Clone, Debug)] -pub struct RolloutTraceRecorder { - state: RolloutTraceRecorderState, -} - -#[derive(Clone, Debug)] -enum RolloutTraceRecorderState { - Disabled, - Enabled(EnabledRolloutTraceRecorder), -} - -#[derive(Clone, Debug)] -struct EnabledRolloutTraceRecorder { - writer: Arc, -} - -/// Metadata captured once at thread/session start. -/// -/// This payload is intentionally operational rather than reduced: it is a raw -/// payload that later reducers can mine as the reduced thread model evolves. -#[derive(Serialize)] -pub struct ThreadStartedTraceMetadata { - pub thread_id: String, - pub agent_path: String, - pub task_name: Option, - pub nickname: Option, - pub agent_role: Option, - pub session_source: SessionSource, - pub cwd: PathBuf, - pub rollout_path: Option, - pub model: String, - pub provider_name: String, - pub approval_policy: String, - pub sandbox_policy: String, -} - -impl RolloutTraceRecorder { - /// Builds a recorder handle that accepts trace calls and records nothing. - pub fn disabled() -> Self { - Self { - state: RolloutTraceRecorderState::Disabled, - } - } - - /// Creates and starts a root trace bundle, or returns a disabled recorder. - /// - /// Trace startup is best-effort. A tracing failure must not make the Codex - /// session unusable, because traces are diagnostic and can be enabled while - /// debugging unrelated production failures. The returned recorder has not - /// emitted `ThreadStarted`; session setup records that event uniformly for - /// root and inherited child recorders. 
- pub fn create_root_or_disabled(thread_id: ThreadId) -> Self { - let Some(root) = std::env::var_os(CODEX_ROLLOUT_TRACE_ROOT_ENV) else { - return Self::disabled(); - }; - let root = PathBuf::from(root); - match Self::create_in_root(root.as_path(), thread_id) { - Ok(recorder) => recorder, - Err(err) => { - warn!("failed to initialize rollout trace recorder: {err:#}"); - Self::disabled() - } - } - } - - /// Creates a trace bundle in a known root directory. - /// - /// This is public so integration tests in downstream crates can replay the - /// exact bundle they produced without mutating process environment. - pub fn create_in_root_for_test(root: &Path, thread_id: ThreadId) -> anyhow::Result { - Self::create_in_root(root, thread_id) - } - - fn create_in_root(root: &Path, thread_id: ThreadId) -> anyhow::Result { - let trace_id = Uuid::new_v4().to_string(); - let thread_id = thread_id.to_string(); - let bundle_dir = root.join(format!("trace-{trace_id}-{thread_id}")); - let writer = TraceWriter::create( - &bundle_dir, - trace_id.clone(), - thread_id.clone(), - thread_id.clone(), - )?; - let recorder = EnabledRolloutTraceRecorder { - writer: Arc::new(writer), - }; - - recorder.append_best_effort(RawTraceEventPayload::RolloutStarted { - trace_id, - root_thread_id: thread_id, - }); - - debug!("recording rollout trace at {}", bundle_dir.display()); - Ok(Self::enabled(recorder)) - } - - fn enabled(inner: EnabledRolloutTraceRecorder) -> Self { - Self { - state: RolloutTraceRecorderState::Enabled(inner), - } - } - - /// Emits the lifecycle event and metadata for one thread in this rollout tree. - /// - /// Root sessions call this immediately after `RolloutStarted`; spawned - /// child sessions call it on the inherited recorder. Keeping children in - /// the root bundle preserves one raw payload namespace and one reduced - /// `RolloutTrace` for the whole multi-agent task. 
- pub fn record_thread_started(&self, metadata: ThreadStartedTraceMetadata) { - let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { - return; - }; - let metadata_payload = - recorder.write_json_payload_best_effort(RawPayloadKind::SessionMetadata, &metadata); - recorder.append_best_effort(RawTraceEventPayload::ThreadStarted { - thread_id: metadata.thread_id, - agent_path: metadata.agent_path, - metadata_payload, - }); - } - - /// Emits a turn-start lifecycle event. - /// - /// Most production turn lifecycle wiring lives outside this PR layer, but - /// trace-focused integration tests need a small explicit hook so reducer - /// inputs remain valid without exercising the full session loop. - pub fn record_codex_turn_started( - &self, - thread_id: impl Into, - codex_turn_id: impl Into, - ) { - let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { - return; - }; - let thread_id = thread_id.into(); - let codex_turn_id = codex_turn_id.into(); - recorder.append_with_context_best_effort( - thread_id.clone(), - codex_turn_id.clone(), - RawTraceEventPayload::CodexTurnStarted { - codex_turn_id, - thread_id, - }, - ); - } - - /// Starts a first-class code-mode cell lifecycle and returns its trace handle. - pub fn start_code_cell_trace( - &self, - thread_id: impl Into, - codex_turn_id: impl Into, - runtime_cell_id: impl Into, - model_visible_call_id: impl Into, - source_js: impl Into, - ) -> CodeCellTraceContext { - let context = self.code_cell_trace_context(thread_id, codex_turn_id, runtime_cell_id); - context.record_started(model_visible_call_id, source_js); - context - } - - /// Builds a trace handle for an already-started code-mode runtime cell. 
- pub fn code_cell_trace_context( - &self, - thread_id: impl Into, - codex_turn_id: impl Into, - runtime_cell_id: impl Into, - ) -> CodeCellTraceContext { - let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { - return CodeCellTraceContext::disabled(); - }; - - CodeCellTraceContext::enabled( - Arc::clone(&recorder.writer), - thread_id, - codex_turn_id, - runtime_cell_id, - ) - } - - /// Starts one dispatch-level tool lifecycle and returns its trace handle. - /// - /// `invocation` is lazy because adapting core tool objects into trace-owned - /// payloads can clone large arguments. Disabled tracing should not pay that - /// cost on the hot tool-dispatch path. - pub fn start_tool_dispatch_trace( - &self, - invocation: impl FnOnce() -> Option, - ) -> ToolDispatchTraceContext { - let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { - return ToolDispatchTraceContext::disabled(); - }; - let Some(invocation) = invocation() else { - return ToolDispatchTraceContext::disabled(); - }; - - ToolDispatchTraceContext::start(Arc::clone(&recorder.writer), invocation) - } - - /// Builds reusable inference trace context for one Codex turn. - /// - /// The returned context is intentionally not "an inference call" yet. - /// Transport code owns retry/fallback attempts and calls `start_attempt` - /// only after it has built the concrete request payload for that attempt. - pub fn inference_trace_context( - &self, - thread_id: impl Into, - codex_turn_id: impl Into, - model: impl Into, - provider_name: impl Into, - ) -> InferenceTraceContext { - let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { - return InferenceTraceContext::disabled(); - }; - - InferenceTraceContext::enabled( - Arc::clone(&recorder.writer), - thread_id.into(), - codex_turn_id.into(), - model.into(), - provider_name.into(), - ) - } - - /// Builds remote-compaction trace context for one checkpoint. 
- /// - /// Rollout tracing currently has a first-class checkpoint model only for remote compaction. - /// The compact endpoint is a model-facing request whose output replaces live history, so it - /// needs both request/response attempt events and a later checkpoint event when processed - /// replacement history is installed. - pub fn compaction_trace_context( - &self, - thread_id: impl Into, - codex_turn_id: impl Into, - compaction_id: impl Into, - model: impl Into, - provider_name: impl Into, - ) -> CompactionTraceContext { - let RolloutTraceRecorderState::Enabled(recorder) = &self.state else { - return CompactionTraceContext::disabled(); - }; - - CompactionTraceContext::enabled( - Arc::clone(&recorder.writer), - thread_id.into(), - codex_turn_id.into(), - compaction_id.into(), - model.into(), - provider_name.into(), - ) - } -} - -impl EnabledRolloutTraceRecorder { - fn write_json_payload_best_effort( - &self, - kind: RawPayloadKind, - payload: &impl Serialize, - ) -> Option { - match self.writer.write_json_payload(kind, payload) { - Ok(payload_ref) => Some(payload_ref), - Err(err) => { - warn!("failed to write rollout trace payload: {err:#}"); - None - } - } - } - - fn append_best_effort(&self, payload: RawTraceEventPayload) { - if let Err(err) = self.writer.append(payload) { - warn!("failed to append rollout trace event: {err:#}"); - } - } - - fn append_with_context_best_effort( - &self, - thread_id: AgentThreadId, - codex_turn_id: CodexTurnId, - payload: RawTraceEventPayload, - ) { - let context = crate::RawTraceEventContext { - thread_id: Some(thread_id), - codex_turn_id: Some(codex_turn_id), - }; - if let Err(err) = self.writer.append_with_context(context, payload) { - warn!("failed to append rollout trace event: {err:#}"); - } - } -} - -#[cfg(test)] -#[path = "recorder_tests.rs"] -mod tests; diff --git a/codex-rs/rollout-trace/src/thread.rs b/codex-rs/rollout-trace/src/thread.rs new file mode 100644 index 000000000000..47526807a17c --- /dev/null +++ 
b/codex-rs/rollout-trace/src/thread.rs @@ -0,0 +1,509 @@ +//! Thread-scoped rollout trace helpers. +//! +//! A rollout bundle can contain a root thread plus spawned child threads. This +//! context owns the stable identity for one thread inside that bundle. Keeping +//! thread-local event methods here avoids repeatedly plumbing `thread_id` +//! through session code. + +use codex_protocol::protocol::AgentStatus; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::SessionSource; +use serde::Serialize; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; +use tracing::debug; +use tracing::warn; +use uuid::Uuid; + +use crate::AgentThreadId; +use crate::CodeCellTraceContext; +use crate::CodexTurnId; +use crate::CompactionId; +use crate::CompactionTraceContext; +use crate::InferenceTraceContext; +use crate::RawPayloadKind; +use crate::RawPayloadRef; +use crate::RawTraceEventContext; +use crate::RawTraceEventPayload; +use crate::RolloutStatus; +use crate::ToolDispatchInvocation; +use crate::ToolDispatchTraceContext; +use crate::TraceWriter; +use crate::protocol_event::codex_turn_trace_event; +use crate::protocol_event::tool_runtime_trace_event; +use crate::protocol_event::wrapped_protocol_event_type; + +/// Environment variable that enables local trace-bundle recording. +/// +/// The value is a root directory. Each independent root session gets one child +/// bundle directory. Spawned child threads share their root session's bundle so +/// one reduced `state.json` describes the whole multi-agent rollout tree. +pub const CODEX_ROLLOUT_TRACE_ROOT_ENV: &str = "CODEX_ROLLOUT_TRACE_ROOT"; + +/// Metadata captured once at thread/session start. +/// +/// This payload is intentionally operational rather than reduced: it is a raw +/// payload that later reducers can mine as the reduced thread model evolves. 
+#[derive(Serialize)] +pub struct ThreadStartedTraceMetadata { + pub thread_id: String, + pub agent_path: String, + pub task_name: Option, + pub nickname: Option, + pub agent_role: Option, + pub session_source: SessionSource, + pub cwd: std::path::PathBuf, + pub rollout_path: Option, + pub model: String, + pub provider_name: String, + pub approval_policy: String, + pub sandbox_policy: String, +} + +/// Trace-only payload for a child completion notification delivered to its parent. +#[derive(Serialize)] +pub struct AgentResultTracePayload<'a> { + pub child_agent_path: &'a str, + pub message: &'a str, + pub status: &'a AgentStatus, +} + +/// No-op capable trace handle for one thread in a rollout bundle. +#[derive(Clone, Debug)] +pub struct ThreadTraceContext { + state: ThreadTraceContextState, +} + +#[derive(Clone, Debug)] +enum ThreadTraceContextState { + Disabled, + Enabled(EnabledThreadTraceContext), +} + +#[derive(Clone, Debug)] +struct EnabledThreadTraceContext { + writer: Arc, + root_thread_id: AgentThreadId, + thread_id: AgentThreadId, +} + +impl ThreadTraceContext { + /// Builds a context that accepts trace calls and records nothing. + pub fn disabled() -> Self { + Self { + state: ThreadTraceContextState::Disabled, + } + } + + /// Starts a root thread trace from `CODEX_ROLLOUT_TRACE_ROOT`, or disables tracing. + /// + /// Trace startup is best-effort. A tracing failure must not make the Codex + /// session unusable, because traces are diagnostic and can be enabled while + /// debugging unrelated production failures. 
+ pub fn start_root_or_disabled(metadata: ThreadStartedTraceMetadata) -> Self { + let Some(root) = std::env::var_os(CODEX_ROLLOUT_TRACE_ROOT_ENV) else { + return Self::disabled(); + }; + let root = PathBuf::from(root); + match start_root_in_root(root.as_path(), metadata) { + Ok(context) => context, + Err(err) => { + warn!("failed to initialize rollout trace bundle: {err:#}"); + Self::disabled() + } + } + } + + /// Starts a root trace in a known directory. + /// + /// This is public for tests that need replayable trace bundles without + /// mutating process environment. + pub fn start_root_in_root_for_test( + root: &Path, + metadata: ThreadStartedTraceMetadata, + ) -> anyhow::Result { + start_root_in_root(root, metadata) + } + + /// Starts one thread lifecycle inside an existing rollout bundle. + pub(crate) fn start( + writer: Arc, + root_thread_id: AgentThreadId, + metadata: ThreadStartedTraceMetadata, + ) -> Self { + let context = EnabledThreadTraceContext { + writer, + root_thread_id, + thread_id: metadata.thread_id.clone(), + }; + record_thread_started(&context, metadata); + Self { + state: ThreadTraceContextState::Enabled(context), + } + } + + /// Returns whether this handle will write trace events. + /// + /// Most methods have their own disabled fast path. Callers should branch on + /// this only when preparing trace payloads would otherwise clone data the + /// production path needs to move elsewhere. + pub fn is_enabled(&self) -> bool { + matches!(self.state, ThreadTraceContextState::Enabled(_)) + } + + /// Starts a fresh child thread in this context's rollout tree. + /// + /// Callers should use [`ThreadTraceContext::disabled`] for resumed children: + /// reusing the parent trace would emit a duplicate `ThreadStarted` event + /// for an existing thread id and make the bundle unreplayable. 
+ pub fn start_child_thread_trace_or_disabled( + &self, + metadata: ThreadStartedTraceMetadata, + ) -> Self { + match &self.state { + ThreadTraceContextState::Disabled => Self::disabled(), + ThreadTraceContextState::Enabled(context) => Self::start( + Arc::clone(&context.writer), + context.root_thread_id.clone(), + metadata, + ), + } + } + + /// Emits terminal trace events for graceful thread shutdown. + /// + /// Spawned child sessions share their root bundle, so only the root + /// thread end closes the rollout. Child thread ends update the child thread + /// execution state without marking the whole bundle complete. + pub fn record_ended(&self, status: RolloutStatus) { + let ThreadTraceContextState::Enabled(context) = &self.state else { + return; + }; + context.append_best_effort(RawTraceEventPayload::ThreadEnded { + thread_id: context.thread_id.clone(), + status: status.clone(), + }); + if context.thread_id == context.root_thread_id { + context.append_best_effort(RawTraceEventPayload::RolloutEnded { status }); + } + } + + /// Wraps selected protocol events as raw trace breadcrumbs. + /// + /// High-volume stream deltas stay out of this wrapper; typed inference, + /// tool, terminal, and code-mode hooks provide the canonical runtime data. + pub fn record_protocol_event(&self, event: &EventMsg) { + let ThreadTraceContextState::Enabled(context) = &self.state else { + return; + }; + let Some(event_type) = wrapped_protocol_event_type(event) else { + return; + }; + let Some(event_payload) = + context.write_json_payload_best_effort(RawPayloadKind::ProtocolEvent, event) + else { + return; + }; + context.append_best_effort(RawTraceEventPayload::ProtocolEventObserved { + event_type: event_type.to_string(), + event_payload, + }); + } + + /// Emits typed Codex turn lifecycle events from protocol lifecycle events. 
+ pub fn record_codex_turn_event(&self, default_turn_id: &str, event: &EventMsg) { + let ThreadTraceContextState::Enabled(context) = &self.state else { + return; + }; + let Some(trace_event) = + codex_turn_trace_event(context.thread_id.clone(), default_turn_id, event) + else { + return; + }; + context.append_with_context_best_effort( + trace_event.context_turn_id.clone(), + trace_event.payload, + ); + } + + /// Emits typed runtime tool events from existing protocol lifecycle events. + /// + /// These events are runtime observations on an already-dispatched tool. The + /// dispatch trace records the caller-facing boundary; these payloads explain + /// what Codex did while executing that boundary. + pub fn record_tool_call_event(&self, codex_turn_id: impl Into, event: &EventMsg) { + let ThreadTraceContextState::Enabled(context) = &self.state else { + return; + }; + let Some(trace_event) = tool_runtime_trace_event(event) else { + return; + }; + let Some(payload) = context.raw_tool_runtime_payload(trace_event) else { + return; + }; + context.append_with_context_best_effort(codex_turn_id.into(), payload); + } + + /// Emits the v2 child-to-parent completion message as an explicit graph edge. + /// + /// The notification is runtime delivery from a completed child turn into + /// the parent's mailbox, not a tool call executed by the child. Recording it + /// directly preserves timing and source without making the reducer infer + /// the edge from a later parent prompt snapshot. 
+ pub fn record_agent_result_interaction( + &self, + child_codex_turn_id: impl Into, + parent_thread_id: impl Into, + payload: &AgentResultTracePayload<'_>, + ) { + let ThreadTraceContextState::Enabled(context) = &self.state else { + return; + }; + let child_codex_turn_id = child_codex_turn_id.into(); + let parent_thread_id = parent_thread_id.into(); + let carried_payload = + context.write_json_payload_best_effort(RawPayloadKind::AgentResult, payload); + context.append_with_context_best_effort( + child_codex_turn_id.clone(), + RawTraceEventPayload::AgentResultObserved { + edge_id: format!( + "edge:agent_result:{}:{child_codex_turn_id}:{parent_thread_id}", + context.thread_id + ), + child_thread_id: context.thread_id.clone(), + child_codex_turn_id, + parent_thread_id, + message: payload.message.to_string(), + carried_payload, + }, + ); + } + + /// Emits a turn-start lifecycle event. + /// + /// Most production turn lifecycle wiring lives outside this PR layer, but + /// trace-focused integration tests need a small explicit hook so reducer + /// inputs remain valid without exercising the full session loop. + pub fn record_codex_turn_started(&self, codex_turn_id: impl Into) { + let ThreadTraceContextState::Enabled(context) = &self.state else { + return; + }; + let codex_turn_id = codex_turn_id.into(); + context.append_with_context_best_effort( + codex_turn_id.clone(), + RawTraceEventPayload::CodexTurnStarted { + codex_turn_id, + thread_id: context.thread_id.clone(), + }, + ); + } + + /// Starts a first-class code-mode cell lifecycle and returns its trace handle. 
+ pub fn start_code_cell_trace( + &self, + codex_turn_id: impl Into, + runtime_cell_id: impl Into, + model_visible_call_id: impl Into, + source_js: impl Into, + ) -> CodeCellTraceContext { + let context = self.code_cell_trace_context(codex_turn_id, runtime_cell_id); + context.record_started(model_visible_call_id, source_js); + context + } + + /// Builds a trace handle for an already-started code-mode runtime cell. + pub fn code_cell_trace_context( + &self, + codex_turn_id: impl Into, + runtime_cell_id: impl Into, + ) -> CodeCellTraceContext { + let ThreadTraceContextState::Enabled(context) = &self.state else { + return CodeCellTraceContext::disabled(); + }; + CodeCellTraceContext::enabled( + Arc::clone(&context.writer), + context.thread_id.clone(), + codex_turn_id, + runtime_cell_id, + ) + } + + /// Starts one dispatch-level tool lifecycle and returns its trace handle. + /// + /// `invocation` is lazy because adapting core tool objects into trace-owned + /// payloads can clone large arguments. Disabled tracing should not pay that + /// cost on the hot tool-dispatch path. + pub fn start_tool_dispatch_trace( + &self, + invocation: impl FnOnce() -> Option, + ) -> ToolDispatchTraceContext { + let ThreadTraceContextState::Enabled(context) = &self.state else { + return ToolDispatchTraceContext::disabled(); + }; + let Some(invocation) = invocation() else { + return ToolDispatchTraceContext::disabled(); + }; + ToolDispatchTraceContext::start(Arc::clone(&context.writer), invocation) + } + + /// Builds reusable inference trace context for one Codex turn. + /// + /// The returned context is intentionally not "an inference call" yet. + /// Transport code owns retry/fallback attempts and calls `start_attempt` + /// only after it has built the concrete request payload for that attempt. 
+ pub fn inference_trace_context( + &self, + codex_turn_id: impl Into, + model: impl Into, + provider_name: impl Into, + ) -> InferenceTraceContext { + let ThreadTraceContextState::Enabled(context) = &self.state else { + return InferenceTraceContext::disabled(); + }; + InferenceTraceContext::enabled( + Arc::clone(&context.writer), + context.thread_id.clone(), + codex_turn_id.into(), + model.into(), + provider_name.into(), + ) + } + + /// Builds remote-compaction trace context for one checkpoint. + /// + /// Rollout tracing currently has a first-class checkpoint model only for remote compaction. + /// The compact endpoint is a model-facing request whose output replaces live history, so it + /// needs both request/response attempt events and a later checkpoint event when processed + /// replacement history is installed. + pub fn compaction_trace_context( + &self, + codex_turn_id: impl Into, + compaction_id: impl Into, + model: impl Into, + provider_name: impl Into, + ) -> CompactionTraceContext { + let ThreadTraceContextState::Enabled(context) = &self.state else { + return CompactionTraceContext::disabled(); + }; + CompactionTraceContext::enabled( + Arc::clone(&context.writer), + context.thread_id.clone(), + codex_turn_id.into(), + compaction_id.into(), + model.into(), + provider_name.into(), + ) + } +} + +fn start_root_in_root( + root: &Path, + metadata: ThreadStartedTraceMetadata, +) -> anyhow::Result { + let trace_id = Uuid::new_v4().to_string(); + let thread_id = metadata.thread_id.clone(); + let bundle_dir = root.join(format!("trace-{trace_id}-{thread_id}")); + let writer = TraceWriter::create( + &bundle_dir, + trace_id.clone(), + thread_id.clone(), + thread_id.clone(), + )?; + let writer = Arc::new(writer); + + if let Err(err) = writer.append(RawTraceEventPayload::RolloutStarted { + trace_id, + root_thread_id: thread_id.clone(), + }) { + warn!("failed to append rollout trace event: {err:#}"); + } + + debug!("recording rollout trace at {}", 
bundle_dir.display()); + Ok(ThreadTraceContext::start(writer, thread_id, metadata)) +} + +fn record_thread_started( + context: &EnabledThreadTraceContext, + metadata: ThreadStartedTraceMetadata, +) { + let metadata_payload = + context.write_json_payload_best_effort(RawPayloadKind::SessionMetadata, &metadata); + context.append_best_effort(RawTraceEventPayload::ThreadStarted { + thread_id: metadata.thread_id, + agent_path: metadata.agent_path, + metadata_payload, + }); +} + +impl EnabledThreadTraceContext { + fn write_json_payload_best_effort( + &self, + kind: RawPayloadKind, + payload: &impl Serialize, + ) -> Option { + match self.writer.write_json_payload(kind, payload) { + Ok(payload_ref) => Some(payload_ref), + Err(err) => { + warn!("failed to write rollout trace payload: {err:#}"); + None + } + } + } + + fn raw_tool_runtime_payload( + &self, + trace_event: crate::protocol_event::ToolRuntimeTraceEvent<'_>, + ) -> Option { + match trace_event { + crate::protocol_event::ToolRuntimeTraceEvent::Started { + tool_call_id, + payload, + } => { + let runtime_payload = self + .write_json_payload_best_effort(RawPayloadKind::ToolRuntimeEvent, &payload)?; + Some(RawTraceEventPayload::ToolCallRuntimeStarted { + tool_call_id: tool_call_id.to_string(), + runtime_payload, + }) + } + crate::protocol_event::ToolRuntimeTraceEvent::Ended { + tool_call_id, + status, + payload, + } => { + let runtime_payload = self + .write_json_payload_best_effort(RawPayloadKind::ToolRuntimeEvent, &payload)?; + Some(RawTraceEventPayload::ToolCallRuntimeEnded { + tool_call_id: tool_call_id.to_string(), + status, + runtime_payload, + }) + } + } + } + + fn append_best_effort(&self, payload: RawTraceEventPayload) { + if let Err(err) = self.writer.append(payload) { + warn!("failed to append rollout trace event: {err:#}"); + } + } + + fn append_with_context_best_effort( + &self, + codex_turn_id: CodexTurnId, + payload: RawTraceEventPayload, + ) { + let event_context = RawTraceEventContext { + thread_id: 
Some(self.thread_id.clone()), + codex_turn_id: Some(codex_turn_id), + }; + if let Err(err) = self.writer.append_with_context(event_context, payload) { + warn!("failed to append rollout trace event: {err:#}"); + } + } +} + +#[cfg(test)] +#[path = "thread_tests.rs"] +mod tests; diff --git a/codex-rs/rollout-trace/src/recorder_tests.rs b/codex-rs/rollout-trace/src/thread_tests.rs similarity index 60% rename from codex-rs/rollout-trace/src/recorder_tests.rs rename to codex-rs/rollout-trace/src/thread_tests.rs index be5f2a4d055d..4d582bbe0f36 100644 --- a/codex-rs/rollout-trace/src/recorder_tests.rs +++ b/codex-rs/rollout-trace/src/thread_tests.rs @@ -5,13 +5,18 @@ use std::path::PathBuf; use codex_protocol::AgentPath; use codex_protocol::ThreadId; +use codex_protocol::protocol::AgentStatus; +use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::SubAgentSource; use tempfile::TempDir; use super::*; +use crate::AgentResultTracePayload; use crate::CompactionCheckpointTracePayload; +use crate::ExecutionStatus; +use crate::RawTraceEventPayload; use crate::RolloutStatus; use crate::replay_bundle; @@ -19,27 +24,30 @@ use crate::replay_bundle; fn create_in_root_writes_replayable_lifecycle_events() -> anyhow::Result<()> { let temp = TempDir::new()?; let thread_id = ThreadId::new(); - let recorder = - RolloutTraceRecorder::create_in_root(temp.path(), thread_id).expect("trace recorder"); - recorder.record_thread_started(ThreadStartedTraceMetadata { - thread_id: thread_id.to_string(), - agent_path: "/root".to_string(), - task_name: None, - nickname: None, - agent_role: None, - session_source: SessionSource::Exec, - cwd: PathBuf::from("/workspace"), - rollout_path: Some(PathBuf::from("/tmp/rollout.jsonl")), - model: "gpt-test".to_string(), - provider_name: "test-provider".to_string(), - approval_policy: "never".to_string(), - sandbox_policy: format!("{:?}", 
SandboxPolicy::DangerFullAccess), - }); + let thread_trace = ThreadTraceContext::start_root_in_root_for_test( + temp.path(), + ThreadStartedTraceMetadata { + thread_id: thread_id.to_string(), + agent_path: "/root".to_string(), + task_name: None, + nickname: None, + agent_role: None, + session_source: SessionSource::Exec, + cwd: PathBuf::from("/workspace"), + rollout_path: Some(PathBuf::from("/tmp/rollout.jsonl")), + model: "gpt-test".to_string(), + provider_name: "test-provider".to_string(), + approval_policy: "never".to_string(), + sandbox_policy: format!("{:?}", SandboxPolicy::DangerFullAccess), + }, + )?; + + thread_trace.record_ended(RolloutStatus::Completed); let bundle_dir = single_bundle_dir(temp.path())?; let replayed = replay_bundle(&bundle_dir)?; - assert_eq!(replayed.status, RolloutStatus::Running); + assert_eq!(replayed.status, RolloutStatus::Completed); assert_eq!(replayed.root_thread_id, thread_id.to_string()); assert_eq!(replayed.threads[&thread_id.to_string()].agent_path, "/root"); assert_eq!(replayed.raw_payloads.len(), 1); @@ -52,11 +60,12 @@ fn spawned_thread_start_appends_to_root_bundle() -> anyhow::Result<()> { let temp = TempDir::new()?; let root_thread_id = ThreadId::new(); let child_thread_id = ThreadId::new(); - let recorder = - RolloutTraceRecorder::create_in_root(temp.path(), root_thread_id).expect("trace recorder"); - recorder.record_thread_started(minimal_metadata(root_thread_id)); + let root_trace = ThreadTraceContext::start_root_in_root_for_test( + temp.path(), + minimal_metadata(root_thread_id), + )?; - recorder.record_thread_started(ThreadStartedTraceMetadata { + let child_trace = root_trace.start_child_thread_trace_or_disabled(ThreadStartedTraceMetadata { thread_id: child_thread_id.to_string(), agent_path: "/root/repo_file_counter".to_string(), task_name: Some("repo_file_counter".to_string()), @@ -78,6 +87,7 @@ fn spawned_thread_start_appends_to_root_bundle() -> anyhow::Result<()> { approval_policy: "never".to_string(), 
sandbox_policy: format!("{:?}", SandboxPolicy::DangerFullAccess), }); + child_trace.record_ended(RolloutStatus::Completed); let bundle_dir = single_bundle_dir(temp.path())?; let replayed = replay_bundle(&bundle_dir)?; @@ -92,7 +102,7 @@ fn spawned_thread_start_appends_to_root_bundle() -> anyhow::Result<()> { replayed.threads[&child_thread_id.to_string()] .execution .status, - crate::ExecutionStatus::Running + ExecutionStatus::Completed ); assert_eq!(replayed.raw_payloads.len(), 2); @@ -100,23 +110,33 @@ fn spawned_thread_start_appends_to_root_bundle() -> anyhow::Result<()> { } #[test] -fn disabled_recorder_accepts_trace_calls_without_writing() -> anyhow::Result<()> { +fn disabled_thread_context_accepts_trace_calls_without_writing() -> anyhow::Result<()> { let temp = TempDir::new()?; - let thread_id = ThreadId::new(); - let recorder = RolloutTraceRecorder::disabled(); + let thread_trace = ThreadTraceContext::disabled(); - recorder.record_thread_started(minimal_metadata(thread_id)); + thread_trace.record_ended(RolloutStatus::Completed); + thread_trace.record_protocol_event(&EventMsg::ShutdownComplete); + thread_trace.record_codex_turn_event("turn-1", &EventMsg::ShutdownComplete); + thread_trace.record_tool_call_event("turn-1", &EventMsg::ShutdownComplete); + thread_trace.record_agent_result_interaction( + "turn-1", + ThreadId::new(), + &AgentResultTracePayload { + child_agent_path: "/root/child", + message: "done", + status: &AgentStatus::Completed(Some("done".to_string())), + }, + ); let inference_trace = - recorder.inference_trace_context(thread_id, "turn-1", "gpt-test", "test-provider"); + thread_trace.inference_trace_context("turn-1", "gpt-test", "test-provider"); let inference_attempt = inference_trace.start_attempt(); inference_attempt.record_started(&serde_json::json!({ "kind": "inference" })); let token_usage: Option = None; inference_attempt.record_completed("response-1", &token_usage, &[]); inference_attempt.record_failed("inference failed"); - let 
compaction_trace = recorder.compaction_trace_context( - thread_id, + let compaction_trace = thread_trace.compaction_trace_context( "turn-1", "compaction-1", "gpt-test", @@ -132,7 +152,7 @@ fn disabled_recorder_accepts_trace_calls_without_writing() -> anyhow::Result<()> }); let built_dispatch_invocation = Cell::new(false); - let dispatch_trace = recorder.start_tool_dispatch_trace(|| { + let dispatch_trace = thread_trace.start_tool_dispatch_trace(|| { built_dispatch_invocation.set(true); None }); @@ -144,6 +164,31 @@ fn disabled_recorder_accepts_trace_calls_without_writing() -> anyhow::Result<()> Ok(()) } +#[test] +fn protocol_wrapper_records_selected_events_as_raw_payloads() -> anyhow::Result<()> { + let temp = TempDir::new()?; + let thread_id = ThreadId::new(); + let thread_trace = + ThreadTraceContext::start_root_in_root_for_test(temp.path(), minimal_metadata(thread_id))?; + + thread_trace.record_protocol_event(&EventMsg::ShutdownComplete); + + let event_log = fs::read_to_string(single_bundle_dir(temp.path())?.join("trace.jsonl"))?; + let protocol_event_seen = event_log.lines().any(|line| { + let event: crate::RawTraceEvent = serde_json::from_str(line).expect("raw trace event"); + matches!( + event.payload, + RawTraceEventPayload::ProtocolEventObserved { + event_type, + .. + } if event_type == "shutdown_complete" + ) + }); + + assert!(protocol_event_seen); + Ok(()) +} + fn minimal_metadata(thread_id: ThreadId) -> ThreadStartedTraceMetadata { ThreadStartedTraceMetadata { thread_id: thread_id.to_string(),