Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions codex-rs/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ codex-model-provider = { workspace = true }
codex-protocol = { workspace = true }
codex-responses-api-proxy = { workspace = true }
codex-rmcp-client = { workspace = true }
codex-rollout-trace = { workspace = true }
codex-sandboxing = { workspace = true }
codex-state = { workspace = true }
codex-stdio-to-uds = { workspace = true }
Expand Down
38 changes: 38 additions & 0 deletions codex-rs/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use codex_exec::Command as ExecCommand;
use codex_exec::ReviewArgs;
use codex_execpolicy::ExecPolicyCheckCommand;
use codex_responses_api_proxy::Args as ResponsesApiProxyArgs;
use codex_rollout_trace::REDUCED_STATE_FILE_NAME;
use codex_rollout_trace::replay_bundle;
use codex_state::StateRuntime;
use codex_state::state_db_path;
use codex_tui::AppExitInfo;
Expand Down Expand Up @@ -216,6 +218,10 @@ enum DebugSubcommand {
/// Render the model-visible prompt input list as JSON.
PromptInput(DebugPromptInputCommand),

/// Replay a rollout trace bundle and write reduced state JSON.
#[clap(hide = true)]
TraceReduce(DebugTraceReduceCommand),
Comment thread
cassirer-openai marked this conversation as resolved.

/// Internal: reset local memory state for a fresh start.
#[clap(hide = true)]
ClearMemories,
Expand Down Expand Up @@ -257,6 +263,17 @@ struct DebugModelsCommand {
bundled: bool,
}

// Arguments for the hidden `debug trace-reduce` subcommand; consumed by
// `run_debug_trace_reduce_command`.
//
// Maintainer notes here use plain `//` comments on purpose: clap's derive
// turns `///` doc comments into user-facing help text.
#[derive(Debug, Parser)]
struct DebugTraceReduceCommand {
// Required positional argument: the bundle directory handed to `replay_bundle`.
/// Trace bundle directory containing manifest.json and trace.jsonl.
#[arg(value_name = "TRACE_BUNDLE")]
trace_bundle: PathBuf,

// Optional `--output` / `-o` override. When absent, the handler writes to
// `REDUCED_STATE_FILE_NAME` joined onto `trace_bundle`.
// NOTE(review): the help text below assumes that constant is "state.json" — confirm.
/// Output path for reduced RolloutTrace JSON. Defaults to TRACE_BUNDLE/state.json.
#[arg(long = "output", short = 'o', value_name = "FILE")]
output: Option<PathBuf>,
}

#[derive(Debug, Parser)]
struct ResumeCommand {
/// Conversation/session id (UUID) or thread name. UUIDs take precedence if it parses.
Expand Down Expand Up @@ -1065,6 +1082,14 @@ async fn cli_main(arg0_paths: Arg0DispatchPaths) -> anyhow::Result<()> {
)
.await?;
}
DebugSubcommand::TraceReduce(cmd) => {
reject_remote_mode_for_subcommand(
root_remote.as_deref(),
root_remote_auth_token_env.as_deref(),
"debug trace-reduce",
)?;
run_debug_trace_reduce_command(cmd).await?;
}
DebugSubcommand::ClearMemories => {
reject_remote_mode_for_subcommand(
root_remote.as_deref(),
Expand Down Expand Up @@ -1265,6 +1290,19 @@ fn maybe_print_under_development_feature_warning(
);
}

async fn run_debug_trace_reduce_command(cmd: DebugTraceReduceCommand) -> anyhow::Result<()> {
let output = cmd
.output
.unwrap_or_else(|| cmd.trace_bundle.join(REDUCED_STATE_FILE_NAME));

let trace = replay_bundle(&cmd.trace_bundle)?;
let reduced_json = serde_json::to_vec_pretty(&trace)?;
tokio::fs::write(&output, reduced_json).await?;
println!("{}", output.display());

Ok(())
}

async fn run_debug_prompt_input_command(
cmd: DebugPromptInputCommand,
root_config_overrides: CliConfigOverrides,
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/src/codex_delegate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ pub(crate) async fn run_codex_thread_interactive(
inherited_shell_snapshot: None,
user_shell_override: None,
inherited_exec_policy: Some(Arc::clone(&parent_session.services.exec_policy)),
inherited_rollout_trace: codex_rollout_trace::RolloutTraceRecorder::disabled(),
parent_rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(),
parent_trace: None,
analytics_events_client: Some(parent_session.services.analytics_events_client.clone()),
thread_store: Arc::clone(&parent_session.services.thread_store),
Expand Down
3 changes: 1 addition & 2 deletions codex-rs/core/src/compact_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,7 @@ async fn run_remote_compact_task_inner_impl(
let context_compaction_item = ContextCompactionItem::new();
// Use the UI compaction item ID as the trace compaction ID so protocol lifecycle events,
// endpoint attempts, and the installed history checkpoint all have one join key.
let compaction_trace = sess.services.rollout_trace.compaction_trace_context(
sess.conversation_id,
let compaction_trace = sess.services.rollout_thread_trace.compaction_trace_context(
turn_context.sub_id.as_str(),
context_compaction_item.id.as_str(),
turn_context.model_info.slug.as_str(),
Expand Down
3 changes: 3 additions & 0 deletions codex-rs/core/src/session/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -977,6 +977,9 @@ pub async fn shutdown(sess: &Arc<Session>, sub_id: String) -> bool {
msg: EventMsg::ShutdownComplete,
};
sess.send_event_raw(event).await;
sess.services
.rollout_thread_trace
.record_ended(codex_rollout_trace::RolloutStatus::Completed);
true
}

Expand Down
54 changes: 47 additions & 7 deletions codex-rs/core/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,9 @@ use codex_protocol::request_user_input::RequestUserInputArgs;
use codex_protocol::request_user_input::RequestUserInputResponse;
use codex_rmcp_client::ElicitationResponse;
use codex_rollout::state_db;
use codex_rollout_trace::RolloutTraceRecorder;
use codex_rollout_trace::AgentResultTracePayload;
use codex_rollout_trace::ThreadStartedTraceMetadata;
use codex_rollout_trace::ThreadTraceContext;
use codex_sandboxing::policy_transforms::intersect_permission_profiles;
use codex_shell_command::parse_command::parse_command;
use codex_terminal_detection::user_agent;
Expand Down Expand Up @@ -402,8 +403,11 @@ pub(crate) struct CodexSpawnArgs {
pub(crate) metrics_service_name: Option<String>,
pub(crate) inherited_shell_snapshot: Option<Arc<ShellSnapshot>>,
pub(crate) inherited_exec_policy: Option<Arc<ExecPolicyManager>>,
/// Parent rollout-tree recorder, or a disabled recorder when this spawn has no parent trace.
pub(crate) inherited_rollout_trace: RolloutTraceRecorder,
/// Parent rollout trace used only to derive fresh spawned child traces.
///
/// Root sessions and non-thread-spawn subagents pass a disabled context;
/// `Session::new` creates the root trace itself when rollout tracing is enabled.
pub(crate) parent_rollout_thread_trace: ThreadTraceContext,
pub(crate) user_shell_override: Option<shell::Shell>,
pub(crate) parent_trace: Option<W3cTraceContext>,
pub(crate) analytics_events_client: Option<AnalyticsEventsClient>,
Expand Down Expand Up @@ -460,7 +464,7 @@ impl Codex {
inherited_shell_snapshot,
user_shell_override,
inherited_exec_policy,
inherited_rollout_trace,
parent_rollout_thread_trace,
parent_trace: _,
analytics_events_client,
thread_store,
Expand Down Expand Up @@ -667,7 +671,7 @@ impl Codex {
environment_manager,
analytics_events_client,
thread_store,
inherited_rollout_trace,
parent_rollout_thread_trace,
)
.await
.map_err(|e| {
Expand Down Expand Up @@ -1448,6 +1452,12 @@ impl Session {
/// Persist the event to rollout and send it to clients.
pub(crate) async fn send_event(&self, turn_context: &TurnContext, msg: EventMsg) {
let legacy_source = msg.clone();
self.services
.rollout_thread_trace
.record_codex_turn_event(&turn_context.sub_id, &legacy_source);
self.services
.rollout_thread_trace
.record_tool_call_event(turn_context.sub_id.clone(), &legacy_source);
let event = Event {
id: turn_context.sub_id.clone(),
msg,
Expand Down Expand Up @@ -1500,13 +1510,19 @@ impl Session {
return;
}

self.forward_child_completion_to_parent(*parent_thread_id, child_agent_path, status)
.await;
self.forward_child_completion_to_parent(
turn_context,
*parent_thread_id,
child_agent_path,
status,
)
.await;
}

/// Sends the standard completion envelope from a spawned MultiAgentV2 child to its parent.
async fn forward_child_completion_to_parent(
&self,
turn_context: &TurnContext,
parent_thread_id: ThreadId,
child_agent_path: &codex_protocol::AgentPath,
status: AgentStatus,
Expand All @@ -1520,6 +1536,13 @@ impl Session {
};

let message = format_subagent_notification_message(child_agent_path.as_str(), &status);
// `communication` owns the message. Keep a second copy only when the
// recorder will actually need it after parent delivery succeeds.
let trace_message = self
.services
.rollout_thread_trace
.is_enabled()
.then(|| message.clone());
let communication = InterAgentCommunication::new(
child_agent_path.clone(),
parent_agent_path,
Expand All @@ -1534,6 +1557,20 @@ impl Session {
.await
{
debug!("failed to notify parent thread {parent_thread_id}: {err}");
return;
}
if let Some(message) = trace_message {
self.services
.rollout_thread_trace
.record_agent_result_interaction(
turn_context.sub_id.as_str(),
parent_thread_id,
&AgentResultTracePayload {
child_agent_path: child_agent_path.as_str(),
message: &message,
status: &status,
},
);
}
}

Expand Down Expand Up @@ -1565,6 +1602,9 @@ impl Session {
// Persist the event into rollout storage (the store filters as needed).
let rollout_items = vec![RolloutItem::EventMsg(event.msg.clone())];
self.persist_rollout_items(&rollout_items).await;
self.services
.rollout_thread_trace
.record_protocol_event(&event.msg);
self.deliver_event_raw(event).await;
}

Expand Down
17 changes: 8 additions & 9 deletions codex-rs/core/src/session/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ impl Session {
environment_manager: Arc<EnvironmentManager>,
analytics_events_client: Option<AnalyticsEventsClient>,
thread_store: Arc<dyn ThreadStore>,
inherited_rollout_trace: RolloutTraceRecorder,
parent_rollout_thread_trace: ThreadTraceContext,
) -> anyhow::Result<Arc<Self>> {
debug!(
"Configuring session: model={}; provider={:?}",
Expand Down Expand Up @@ -449,18 +449,17 @@ impl Session {
approval_policy: session_configuration.approval_policy.value().to_string(),
sandbox_policy: format!("{:?}", session_configuration.sandbox_policy.get()),
};
let rollout_trace = if matches!(
let rollout_thread_trace = if matches!(
session_configuration.session_source,
SessionSource::SubAgent(SubAgentSource::ThreadSpawn { .. })
) {
// Spawned child threads are part of their root rollout tree. If
// the parent had no trace recorder, do not create an orphan child
// bundle that looks like an independent rollout.
inherited_rollout_trace
// Spawned child threads are part of their root rollout tree. If the
// parent had no trace bundle, do not create an orphan child bundle
// that looks like an independent rollout.
parent_rollout_thread_trace.start_child_thread_trace_or_disabled(trace_metadata)
} else {
RolloutTraceRecorder::create_root_or_disabled(conversation_id)
ThreadTraceContext::start_root_or_disabled(trace_metadata)
};
rollout_trace.record_thread_started(trace_metadata);

let mut post_session_configured_events = Vec::<Event>::new();

Expand Down Expand Up @@ -739,7 +738,7 @@ impl Session {
main_execve_wrapper_exe: config.main_execve_wrapper_exe.clone(),
analytics_events_client,
hooks,
rollout_trace,
rollout_thread_trace,
user_shell: Arc::new(default_shell),
shell_snapshot_tx,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
Expand Down
8 changes: 4 additions & 4 deletions codex-rs/core/src/session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3150,7 +3150,7 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() {
Arc::new(codex_thread_store::LocalThreadStore::new(
codex_rollout::RolloutConfig::from_view(config.as_ref()),
)),
RolloutTraceRecorder::disabled(),
codex_rollout_trace::ThreadTraceContext::disabled(),
)
.await;

Expand Down Expand Up @@ -3272,7 +3272,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
legacy_notify_argv: config.notify.clone(),
..HooksConfig::default()
}),
rollout_trace: RolloutTraceRecorder::disabled(),
rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(),
user_shell: Arc::new(default_user_shell()),
shell_snapshot_tx: watch::channel(None).0,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
Expand Down Expand Up @@ -3472,7 +3472,7 @@ async fn make_session_with_config_and_rx(
Arc::new(codex_thread_store::LocalThreadStore::new(
codex_rollout::RolloutConfig::from_view(config.as_ref()),
)),
RolloutTraceRecorder::disabled(),
codex_rollout_trace::ThreadTraceContext::disabled(),
)
.await?;

Expand Down Expand Up @@ -4588,7 +4588,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx(
legacy_notify_argv: config.notify.clone(),
..HooksConfig::default()
}),
rollout_trace: RolloutTraceRecorder::disabled(),
rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(),
user_shell: Arc::new(default_user_shell()),
shell_snapshot_tx: watch::channel(None).0,
show_raw_agent_reasoning: config.show_raw_agent_reasoning,
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/core/src/session/tests/guardian_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -772,7 +772,7 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() {
metrics_service_name: None,
inherited_shell_snapshot: None,
inherited_exec_policy: Some(Arc::new(parent_exec_policy)),
inherited_rollout_trace: RolloutTraceRecorder::disabled(),
parent_rollout_thread_trace: codex_rollout_trace::ThreadTraceContext::disabled(),
user_shell_override: None,
parent_trace: None,
analytics_events_client: None,
Expand Down
3 changes: 1 addition & 2 deletions codex-rs/core/src/session/turn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1875,8 +1875,7 @@ async fn try_run_sampling_request(
auth_mode = sess.services.auth_manager.auth_mode(),
features = sess.features.enabled_features(),
);
let inference_trace = sess.services.rollout_trace.inference_trace_context(
sess.conversation_id,
let inference_trace = sess.services.rollout_thread_trace.inference_trace_context(
turn_context.sub_id.as_str(),
turn_context.model_info.slug.as_str(),
turn_context.provider.info().name.as_str(),
Expand Down
4 changes: 2 additions & 2 deletions codex-rs/core/src/state/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use codex_mcp::McpConnectionManager;
use codex_models_manager::manager::ModelsManager;
use codex_otel::SessionTelemetry;
use codex_rollout::state_db::StateDbHandle;
use codex_rollout_trace::RolloutTraceRecorder;
use codex_rollout_trace::ThreadTraceContext;
use codex_thread_store::LiveThread;
use codex_thread_store::ThreadStore;
use std::path::PathBuf;
Expand All @@ -43,7 +43,7 @@ pub(crate) struct SessionServices {
pub(crate) main_execve_wrapper_exe: Option<PathBuf>,
pub(crate) analytics_events_client: AnalyticsEventsClient,
pub(crate) hooks: Hooks,
pub(crate) rollout_trace: RolloutTraceRecorder,
pub(crate) rollout_thread_trace: ThreadTraceContext,
pub(crate) user_shell: Arc<crate::shell::Shell>,
pub(crate) shell_snapshot_tx: watch::Sender<Option<Arc<crate::shell_snapshot::ShellSnapshot>>>,
pub(crate) show_raw_agent_reasoning: bool,
Expand Down
Loading
Loading