diff --git a/codex-rs/cli/src/main.rs b/codex-rs/cli/src/main.rs index f818b743e919..3b8e76796d51 100644 --- a/codex-rs/cli/src/main.rs +++ b/codex-rs/cli/src/main.rs @@ -1079,6 +1079,12 @@ fn merge_interactive_cli_flags(interactive: &mut TuiCli, subcommand_cli: TuiCli) if !subcommand_cli.add_dir.is_empty() { interactive.add_dir.extend(subcommand_cli.add_dir); } + if subcommand_cli.event_stream.is_some() { + interactive.event_stream = subcommand_cli.event_stream; + } + if subcommand_cli.event_schema_version.is_some() { + interactive.event_schema_version = subcommand_cli.event_schema_version; + } if let Some(prompt) = subcommand_cli.prompt { // Normalize CRLF/CR to LF so CLI-provided text can't leak `\r` into TUI state. interactive.prompt = Some(prompt.replace("\r\n", "\n").replace('\r', "\n")); diff --git a/codex-rs/tui/src/cli.rs b/codex-rs/tui/src/cli.rs index 86bea97abe5b..718a5ffd243c 100644 --- a/codex-rs/tui/src/cli.rs +++ b/codex-rs/tui/src/cli.rs @@ -110,6 +110,18 @@ pub struct Cli { #[arg(long = "no-alt-screen", default_value_t = false)] pub no_alt_screen: bool, + /// Emit structured JSONL lifecycle events to a file path, or `-` for stdout. + #[arg(long = "event-stream", value_name = "PATH_OR_STDOUT")] + pub event_stream: Option, + + /// Pin the event stream schema version for compatibility. + #[arg( + long = "event-schema-version", + value_name = "INT", + requires = "event_stream" + )] + pub event_schema_version: Option, + #[clap(skip)] pub config_overrides: CliConfigOverrides, } diff --git a/codex-rs/tui/src/lib.rs b/codex-rs/tui/src/lib.rs index 8dfdbcd87179..01379bb46cc5 100644 --- a/codex-rs/tui/src/lib.rs +++ b/codex-rs/tui/src/lib.rs @@ -577,8 +577,14 @@ async fn run_ratatui_app( } } - // Initialize high-fidelity session event logging if enabled. - session_log::maybe_init(&initial_config); + // Initialize session event logging if enabled. 
+ if let Err(err) = session_log::maybe_init(&initial_config, &cli) { + restore(); + let _ = tui.terminal.clear(); + return Ok(AppExitInfo::fatal(format!( + "Failed to initialize session event logging: {err}" + ))); + } let auth_manager = AuthManager::shared( initial_config.codex_home.clone(), diff --git a/codex-rs/tui/src/session_log.rs b/codex-rs/tui/src/session_log.rs index e512c2f3cd69..c58c3f0d0504 100644 --- a/codex-rs/tui/src/session_log.rs +++ b/codex-rs/tui/src/session_log.rs @@ -1,5 +1,6 @@ use std::fs::File; use std::fs::OpenOptions; +use std::io::IsTerminal; use std::io::Write; use std::path::PathBuf; use std::sync::LazyLock; @@ -7,26 +8,52 @@ use std::sync::Mutex; use std::sync::OnceLock; use codex_core::config::Config; +use codex_protocol::ThreadId; +use codex_protocol::protocol::Event; use codex_protocol::protocol::Op; use serde::Serialize; +use serde_json::Value; use serde_json::json; +use crate::Cli; use crate::app_event::AppEvent; +const EVENT_SCHEMA_V1: u32 = 1; +const EVENT_SCHEMA_V2: u32 = 2; +const EVENT_SCHEMA_CURRENT: u32 = EVENT_SCHEMA_V2; + static LOGGER: LazyLock = LazyLock::new(SessionLogger::new); +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +enum LogMode { + Legacy, + EventStream, +} + +#[derive(Debug)] +struct EventStreamState { + schema_version: u32, + seq: u64, + session_epoch: u64, + session_id: Option, +} + struct SessionLogger { - file: OnceLock>, + writer: OnceLock>>, + mode: OnceLock, + event_stream_state: OnceLock>, } impl SessionLogger { fn new() -> Self { Self { - file: OnceLock::new(), + writer: OnceLock::new(), + mode: OnceLock::new(), + event_stream_state: OnceLock::new(), } } - fn open(&self, path: PathBuf) -> std::io::Result<()> { + fn open_legacy(&self, path: PathBuf) -> std::io::Result<()> { let mut opts = OpenOptions::new(); opts.create(true).truncate(true).write(true); @@ -37,52 +64,219 @@ impl SessionLogger { } let file = opts.open(path)?; - self.file.get_or_init(|| Mutex::new(file)); + self.writer.get_or_init(|| 
Mutex::new(Box::new(file))); + self.mode.get_or_init(|| LogMode::Legacy); Ok(()) } - fn write_json_line(&self, value: serde_json::Value) { - let Some(mutex) = self.file.get() else { + fn open_event_stream(&self, target: &str, schema_version: u32) -> std::io::Result<()> { + validate_schema_version(schema_version)?; + + if target == "-" { + // Interactive TUI rendering owns stdout when attached to a TTY. + // Route stream records to stderr in that mode to avoid mixing + // JSONL with terminal control output. + let stream: Box = if std::io::stdout().is_terminal() { + Box::new(std::io::stderr()) + } else { + Box::new(std::io::stdout()) + }; + self.writer.get_or_init(|| Mutex::new(stream)); + } else { + let mut opts = OpenOptions::new(); + opts.create(true).truncate(true).write(true); + + #[cfg(unix)] + { + use std::os::unix::fs::OpenOptionsExt; + opts.mode(0o600); + } + + let file = opts.open(PathBuf::from(target))?; + self.writer.get_or_init(|| Mutex::new(Box::new(file))); + } + + self.mode.get_or_init(|| LogMode::EventStream); + self.event_stream_state.get_or_init(|| { + Mutex::new(EventStreamState { + schema_version, + seq: 0, + session_epoch: 1, + session_id: None, + }) + }); + + Ok(()) + } + + fn mode(&self) -> Option { + self.mode.get().copied() + } + + fn write_json_line(&self, value: &Value) { + let Some(mutex) = self.writer.get() else { return; }; + let mut guard = match mutex.lock() { Ok(g) => g, Err(poisoned) => poisoned.into_inner(), }; - match serde_json::to_string(&value) { + + match serde_json::to_string(value) { Ok(serialized) => { - if let Err(e) = guard.write_all(serialized.as_bytes()) { - tracing::warn!("session log write error: {}", e); + if let Err(err) = guard.write_all(serialized.as_bytes()) { + tracing::warn!("session log write error: {err}"); return; } - if let Err(e) = guard.write_all(b"\n") { - tracing::warn!("session log write error: {}", e); + if let Err(err) = guard.write_all(b"\n") { + tracing::warn!("session log write error: {err}"); 
return; } - if let Err(e) = guard.flush() { - tracing::warn!("session log flush error: {}", e); + if let Err(err) = guard.flush() { + tracing::warn!("session log flush error: {err}"); } } - Err(e) => tracing::warn!("session log serialize error: {}", e), + Err(err) => tracing::warn!("session log serialize error: {err}"), } } + fn write_event_stream_record( + &self, + event_type: &str, + payload: Value, + session_id_override: Option, + ) { + let Some(state_mutex) = self.event_stream_state.get() else { + return; + }; + + let mut state = match state_mutex.lock() { + Ok(g) => g, + Err(poisoned) => poisoned.into_inner(), + }; + + if event_type == "session_configured" + && let Some(session_id) = payload.get("session_id").and_then(Value::as_str) + { + state.session_id = Some(session_id.to_string()); + } + + state.seq += 1; + let session_id = session_id_override + .or_else(|| state.session_id.clone()) + .unwrap_or_else(|| "unknown".to_string()); + + let record = match state.schema_version { + EVENT_SCHEMA_V1 => json!({ + "schema_version": EVENT_SCHEMA_V1, + "ts": now_ts(), + "session_id": session_id, + "event_type": event_type, + "payload": payload, + }), + EVENT_SCHEMA_V2 => json!({ + "schema_version": EVENT_SCHEMA_V2, + "ts": now_ts(), + "session_id": session_id, + "seq": state.seq, + "session_epoch": state.session_epoch, + "event_type": event_type, + "payload": payload, + }), + _ => return, + }; + + self.write_json_line(&record); + } + + fn write_protocol_event(&self, event: &Event, thread_id_override: Option<&ThreadId>) { + let mut payload = match serde_json::to_value(&event.msg) { + Ok(value) => value, + Err(err) => { + tracing::warn!("event stream serialize error: {err}"); + return; + } + }; + + let event_type = payload + .get("type") + .and_then(Value::as_str) + .unwrap_or("unknown") + .to_string(); + let event_type = normalize_contract_event_type(&event_type); + + if let Some(obj) = payload.as_object_mut() { + obj.remove("type"); + } + + 
self.write_event_stream_record( + &event_type, + payload, + thread_id_override.map(ToString::to_string), + ); + } + fn is_enabled(&self) -> bool { - self.file.get().is_some() + self.writer.get().is_some() } } fn now_ts() -> String { - // RFC3339 for readability; consumers can parse as needed. chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true) } -pub(crate) fn maybe_init(config: &Config) { +fn validate_schema_version(schema_version: u32) -> std::io::Result<()> { + if matches!(schema_version, EVENT_SCHEMA_V1 | EVENT_SCHEMA_V2) { + return Ok(()); + } + + Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, + format!( + "unsupported event schema version `{schema_version}`; supported versions are {EVENT_SCHEMA_V1} and {EVENT_SCHEMA_V2}" + ), + )) +} + +fn normalize_contract_event_type(event_type: &str) -> String { + match event_type { + "task_started" => "turn_started".to_string(), + "task_complete" => "turn_complete".to_string(), + _ => event_type.to_string(), + } +} + +pub(crate) fn maybe_init(config: &Config, cli: &Cli) -> std::io::Result<()> { + if cli.event_stream.is_some() || cli.event_schema_version.is_some() { + let target = cli.event_stream.clone().ok_or_else(|| { + std::io::Error::new( + std::io::ErrorKind::InvalidInput, + "--event-schema-version requires --event-stream", + ) + })?; + + let schema_version = cli.event_schema_version.unwrap_or(EVENT_SCHEMA_CURRENT); + LOGGER.open_event_stream(&target, schema_version)?; + + LOGGER.write_event_stream_record( + "session_start", + json!({ + "cwd": config.cwd, + "model": config.model, + "model_provider_id": config.model_provider_id, + "model_provider_name": config.model_provider.name, + }), + None, + ); + return Ok(()); + } + let enabled = std::env::var("CODEX_TUI_RECORD_SESSION") .map(|v| matches!(v.as_str(), "1" | "true" | "TRUE" | "yes" | "YES")) .unwrap_or(false); if !enabled { - return; + return Ok(()); } let path = if let Ok(path) = std::env::var("CODEX_TUI_SESSION_LOG_PATH") { @@ 
-100,13 +294,12 @@ pub(crate) fn maybe_init(config: &Config) { p }; - if let Err(e) = LOGGER.open(path.clone()) { - tracing::error!("failed to open session log {:?}: {}", path, e); - return; + if let Err(err) = LOGGER.open_legacy(path.clone()) { + tracing::error!("failed to open session log {:?}: {}", path, err); + return Ok(()); } - // Write a header record so we can attach context. - let header = json!({ + LOGGER.write_json_line(&json!({ "ts": now_ts(), "dir": "meta", "kind": "session_start", @@ -114,74 +307,77 @@ pub(crate) fn maybe_init(config: &Config) { "model": config.model, "model_provider_id": config.model_provider_id, "model_provider_name": config.model_provider.name, - }); - LOGGER.write_json_line(header); + })); + + Ok(()) } pub(crate) fn log_inbound_app_event(event: &AppEvent) { - // Log only if enabled if !LOGGER.is_enabled() { return; } - match event { - AppEvent::CodexEvent(ev) => { - write_record("to_tui", "codex_event", ev); - } - AppEvent::NewSession => { - let value = json!({ - "ts": now_ts(), - "dir": "to_tui", - "kind": "new_session", - }); - LOGGER.write_json_line(value); - } - AppEvent::ClearUi => { - let value = json!({ - "ts": now_ts(), - "dir": "to_tui", - "kind": "clear_ui", - }); - LOGGER.write_json_line(value); - } - AppEvent::InsertHistoryCell(cell) => { - let value = json!({ - "ts": now_ts(), - "dir": "to_tui", - "kind": "insert_history_cell", - "lines": cell.transcript_lines(u16::MAX).len(), - }); - LOGGER.write_json_line(value); - } - AppEvent::StartFileSearch(query) => { - let value = json!({ - "ts": now_ts(), - "dir": "to_tui", - "kind": "file_search_start", - "query": query, - }); - LOGGER.write_json_line(value); - } - AppEvent::FileSearchResult { query, matches } => { - let value = json!({ - "ts": now_ts(), - "dir": "to_tui", - "kind": "file_search_result", - "query": query, - "matches": matches.len(), - }); - LOGGER.write_json_line(value); - } - // Noise or control flow – record variant only - other => { - let value = json!({ 
- "ts": now_ts(), - "dir": "to_tui", - "kind": "app_event", - "variant": format!("{other:?}").split('(').next().unwrap_or("app_event"), - }); - LOGGER.write_json_line(value); - } + match LOGGER.mode() { + Some(LogMode::EventStream) => match event { + AppEvent::CodexEvent(ev) => LOGGER.write_protocol_event(ev, None), + AppEvent::ThreadEvent { thread_id, event } => { + LOGGER.write_protocol_event(event, Some(thread_id)) + } + _ => {} + }, + Some(LogMode::Legacy) => match event { + AppEvent::CodexEvent(ev) => { + write_legacy_record("to_tui", "codex_event", ev); + } + AppEvent::NewSession => { + LOGGER.write_json_line(&json!({ + "ts": now_ts(), + "dir": "to_tui", + "kind": "new_session", + })); + } + AppEvent::ClearUi => { + LOGGER.write_json_line(&json!({ + "ts": now_ts(), + "dir": "to_tui", + "kind": "clear_ui", + })); + } + AppEvent::InsertHistoryCell(cell) => { + LOGGER.write_json_line(&json!({ + "ts": now_ts(), + "dir": "to_tui", + "kind": "insert_history_cell", + "lines": cell.transcript_lines(u16::MAX).len(), + })); + } + AppEvent::StartFileSearch(query) => { + LOGGER.write_json_line(&json!({ + "ts": now_ts(), + "dir": "to_tui", + "kind": "file_search_start", + "query": query, + })); + } + AppEvent::FileSearchResult { query, matches } => { + LOGGER.write_json_line(&json!({ + "ts": now_ts(), + "dir": "to_tui", + "kind": "file_search_result", + "query": query, + "matches": matches.len(), + })); + } + other => { + LOGGER.write_json_line(&json!({ + "ts": now_ts(), + "dir": "to_tui", + "kind": "app_event", + "variant": format!("{other:?}").split('(').next().unwrap_or("app_event"), + })); + } + }, + None => {} } } @@ -189,30 +385,106 @@ pub(crate) fn log_outbound_op(op: &Op) { if !LOGGER.is_enabled() { return; } - write_record("from_tui", "op", op); + + match LOGGER.mode() { + Some(LogMode::EventStream) => { + let payload = match serde_json::to_value(op) { + Ok(value) => value, + Err(err) => { + tracing::warn!("event stream serialize error: {err}"); + return; + } + 
}; + LOGGER.write_event_stream_record("op_submitted", payload, None); + } + Some(LogMode::Legacy) => write_legacy_record("from_tui", "op", op), + None => {} + } } pub(crate) fn log_session_end() { if !LOGGER.is_enabled() { return; } - let value = json!({ - "ts": now_ts(), - "dir": "meta", - "kind": "session_end", - }); - LOGGER.write_json_line(value); + + match LOGGER.mode() { + Some(LogMode::EventStream) => { + LOGGER.write_event_stream_record("session_end", json!({}), None); + } + Some(LogMode::Legacy) => { + LOGGER.write_json_line(&json!({ + "ts": now_ts(), + "dir": "meta", + "kind": "session_end", + })); + } + None => {} + } } -fn write_record(dir: &str, kind: &str, obj: &T) +fn write_legacy_record(dir: &str, kind: &str, obj: &T) where T: Serialize, { - let value = json!({ + LOGGER.write_json_line(&json!({ "ts": now_ts(), "dir": dir, "kind": kind, "payload": obj, - }); - LOGGER.write_json_line(value); + })); +} + +#[cfg(test)] +mod tests { + use serde_json::Value; + + use super::EVENT_SCHEMA_CURRENT; + use super::EVENT_SCHEMA_V1; + use super::EVENT_SCHEMA_V2; + use super::normalize_contract_event_type; + use super::validate_schema_version; + + fn assert_common_fields(record: &Value) { + assert!(record.get("schema_version").and_then(Value::as_u64).is_some()); + assert!(record.get("ts").and_then(Value::as_str).is_some()); + assert!(record.get("session_id").and_then(Value::as_str).is_some()); + assert!(record.get("event_type").and_then(Value::as_str).is_some()); + assert!(record.get("payload").is_some()); + assert!(record.get("payload").is_some_and(Value::is_object)); + } + + #[test] + fn accepts_previous_schema_fixture() { + let raw = include_str!("../tests/fixtures/event_stream/v1_turn_started.json"); + let record: Value = serde_json::from_str(raw).expect("fixture parses"); + assert_common_fields(&record); + assert_eq!(record["schema_version"], Value::from(EVENT_SCHEMA_V1)); + assert!(record.get("seq").is_none()); + 
assert!(record.get("session_epoch").is_none()); + } + + #[test] + fn accepts_current_schema_fixture() { + let raw = include_str!("../tests/fixtures/event_stream/v2_turn_started.json"); + let record: Value = serde_json::from_str(raw).expect("fixture parses"); + assert_common_fields(&record); + assert_eq!(record["schema_version"], Value::from(EVENT_SCHEMA_CURRENT)); + assert!(record.get("seq").and_then(Value::as_u64).is_some()); + assert!(record.get("session_epoch").and_then(Value::as_u64).is_some()); + } + + #[test] + fn rejects_unsupported_schema_versions() { + assert!(validate_schema_version(0).is_err()); + assert!(validate_schema_version(EVENT_SCHEMA_V2 + 1).is_err()); + assert!(validate_schema_version(EVENT_SCHEMA_V1).is_ok()); + assert!(validate_schema_version(EVENT_SCHEMA_V2).is_ok()); + } + + #[test] + fn normalizes_legacy_turn_event_names() { + assert_eq!(normalize_contract_event_type("task_started"), "turn_started"); + assert_eq!(normalize_contract_event_type("task_complete"), "turn_complete"); + assert_eq!(normalize_contract_event_type("exec_command_begin"), "exec_command_begin"); + } } diff --git a/codex-rs/tui/tests/fixtures/event_stream/v1_turn_started.json b/codex-rs/tui/tests/fixtures/event_stream/v1_turn_started.json new file mode 100644 index 000000000000..6be9135387ec --- /dev/null +++ b/codex-rs/tui/tests/fixtures/event_stream/v1_turn_started.json @@ -0,0 +1 @@ +{"schema_version":1,"ts":"2026-03-01T00:00:00.000Z","session_id":"01957117-d96d-7f17-af57-af28ec451245","event_type":"turn_started","payload":{"turn_id":"turn_001","model_context_window":128000,"collaboration_mode_kind":"none"}} diff --git a/codex-rs/tui/tests/fixtures/event_stream/v2_turn_started.json b/codex-rs/tui/tests/fixtures/event_stream/v2_turn_started.json new file mode 100644 index 000000000000..77339717f200 --- /dev/null +++ b/codex-rs/tui/tests/fixtures/event_stream/v2_turn_started.json @@ -0,0 +1 @@ 
+{"schema_version":2,"ts":"2026-03-01T00:00:00.000Z","session_id":"01957117-d96d-7f17-af57-af28ec451245","seq":42,"session_epoch":1,"event_type":"turn_started","payload":{"turn_id":"turn_001","model_context_window":128000,"collaboration_mode_kind":"none"}} diff --git a/docs/event-stream-contract.md b/docs/event-stream-contract.md new file mode 100644 index 000000000000..a1491ca1df28 --- /dev/null +++ b/docs/event-stream-contract.md @@ -0,0 +1,75 @@ +# Interactive Event Stream Contract + +Codex interactive mode supports an explicit JSONL event stream contract intended for machine consumers. + +## CLI Surface + +- `--event-stream <PATH_OR_STDOUT>` + - Writes newline-delimited JSON records to a file path. + - Use `-` to emit records to stdio: + - non-TTY stdout: stream writes to stdout + - interactive TTY stdout: stream writes to stderr to avoid mixing JSONL with TUI control output +- `--event-schema-version <INT>` + - Pins schema parsing/serialization behavior to a supported version. + - Currently supported: `1`, `2`. + - Current default if omitted: `2`. + +## Record Shape + +All schema versions include the following required top-level fields: + +- `schema_version` (number) +- `ts` (RFC3339 timestamp string, UTC) +- `session_id` (string) +- `event_type` (string) +- `payload` (object) + +For protocol events, `event_type` and `payload` are derived from protocol `EventMsg` values: the serialized `type` tag becomes `event_type` (with `task_started` and `task_complete` emitted as `turn_started` and `turn_complete`) and is removed from `payload`. Payload values keep parity with protocol event field names and types. The stream also carries records produced by the TUI itself: `session_start`, `op_submitted`, and `session_end`. Records emitted before a `session_configured` event is seen may carry the placeholder `session_id` value `"unknown"`. 
+ +## Version Details + +### Schema `1` + +```json +{ + "schema_version": 1, + "ts": "2026-03-01T00:00:00.000Z", + "session_id": "01957117-d96d-7f17-af57-af28ec451245", + "event_type": "turn_started", + "payload": { + "turn_id": "turn_001", + "model_context_window": 128000, + "collaboration_mode_kind": "none" + } +} +``` + +### Schema `2` (current) + +Schema `2` adds ordering metadata on top of schema `1`: + +- `seq` (monotonically increasing per-process record counter, starting at `1`) +- `session_epoch` (stream epoch, currently `1`) + +```json +{ + "schema_version": 2, + "ts": "2026-03-01T00:00:00.000Z", + "session_id": "01957117-d96d-7f17-af57-af28ec451245", + "seq": 42, + "session_epoch": 1, + "event_type": "turn_started", + "payload": { + "turn_id": "turn_001", + "model_context_window": 128000, + "collaboration_mode_kind": "none" + } +} +``` + +## Compatibility Policy + +- Additive changes (new fields) are allowed within a schema version. +- Field removals, renames, or type changes require a schema version bump. +- Consumers should pin `--event-schema-version` to guarantee deterministic parsing. +- Compatibility tests cover at least the previous and current schema fixtures.