diff --git a/codex-rs/exec-server/src/local_process.rs b/codex-rs/exec-server/src/local_process.rs index 6446c365075a..bc9b2ba2042d 100644 --- a/codex-rs/exec-server/src/local_process.rs +++ b/codex-rs/exec-server/src/local_process.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; use std::collections::VecDeque; +use std::collections::hash_map::Entry; use std::sync::Arc; use std::time::Duration; @@ -628,16 +629,7 @@ async fn watch_exit( .await; } - maybe_emit_closed(process_id.clone(), Arc::clone(&inner)).await; - - tokio::time::sleep(EXITED_PROCESS_RETENTION).await; - let mut processes = inner.processes.lock().await; - if matches!( - processes.get(&process_id), - Some(ProcessEntry::Running(process)) if process.exit_code == Some(exit_code) - ) { - processes.remove(&process_id); - } + maybe_emit_closed(process_id, Arc::clone(&inner)).await; } async fn finish_output_stream(process_id: ProcessId, inner: Arc) { @@ -656,7 +648,7 @@ async fn finish_output_stream(process_id: ProcessId, inner: Arc) { } async fn maybe_emit_closed(process_id: ProcessId, inner: Arc) { - let notification = { + let (notification, output_notify) = { let mut processes = inner.processes.lock().await; let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) else { return; @@ -671,15 +663,30 @@ async fn maybe_emit_closed(process_id: ProcessId, inner: Arc) { process.next_seq += 1; let _ = process.wake_tx.send(seq); process.events.publish(ExecProcessEvent::Closed { seq }); - Some(ExecClosedNotification { - process_id: process_id.clone(), - seq, - }) + ( + ExecClosedNotification { + process_id: process_id.clone(), + seq, + }, + Arc::clone(&process.output_notify), + ) }; - let Some(notification) = notification else { - return; - }; + output_notify.notify_waiters(); + let cleanup_process_id = process_id.clone(); + let cleanup_inner = Arc::clone(&inner); + tokio::spawn(async move { + tokio::time::sleep(EXITED_PROCESS_RETENTION).await; + let mut processes = cleanup_inner.processes.lock().await; + match processes.entry(cleanup_process_id) { + Entry::Occupied(entry) => { + if matches!(entry.get(), ProcessEntry::Running(process) if process.closed) { + entry.remove(); + } + } + Entry::Vacant(_) => {} + } + }); if let Some(notifications) = notification_sender(&inner) { let _ = notifications @@ -700,6 +707,10 @@ fn notification_sender(inner: &Inner) -> Option { mod tests { use super::*; use codex_config::types::ShellEnvironmentPolicyInherit; + use codex_utils_pty::ProcessDriver; + use pretty_assertions::assert_eq; + use tokio::sync::oneshot; + use tokio::time::timeout; fn test_exec_params(env: HashMap) -> ExecParams { ExecParams { @@ -748,4 +759,222 @@ mod tests { assert_eq!(child_env(¶ms), expected); } + + #[tokio::test] + async fn exited_process_retains_late_output_past_retention() { + let backend = LocalProcess::default(); + let mut process = spawn_test_process(&backend, "proc-late-output").await; + + process.exit(/*exit_code*/ 0); + let exit_response = + read_process_until_change(&backend, &process.process_id, /*after_seq*/ None).await; + assert_eq!( + exit_response, + ReadResponse { + chunks: Vec::new(), + next_seq: 2, + exited: true, + exit_code: Some(0), + closed: false, + failure: None, + } + ); + + tokio::time::sleep(EXITED_PROCESS_RETENTION + Duration::from_millis(10)).await; + process + .stdout_tx + .send(b"late output after retention\n".to_vec()) + .await + .expect("send late stdout"); + + let late_response = + read_process_until_change(&backend, &process.process_id, /*after_seq*/ Some(1)).await; + assert_eq!( + late_response.chunks, + vec![ProcessOutputChunk { + seq: 2, + stream: ExecOutputStream::Stdout, + chunk: b"late output after retention\n".to_vec().into(), + }] + ); + assert_eq!(late_response.exit_code, Some(0)); + assert!(!late_response.closed); + + drop(process.stdout_tx); + drop(process.stderr_tx); + let _closed_response = timeout( + Duration::from_secs(1), + read_process_until_closed(&backend, &process.process_id), + ) + .await + .expect("process should close"); + backend.shutdown().await; + } + + #[tokio::test] + async fn closed_process_is_evicted_after_retention() { + let backend = LocalProcess::default(); + let mut process = spawn_test_process(&backend, "proc-closed-eviction").await; + let process_id = process.process_id.clone(); + + process.exit(/*exit_code*/ 0); + drop(process.stdout_tx); + drop(process.stderr_tx); + + let closed_response = timeout( + Duration::from_secs(1), + read_process_until_closed(&backend, &process_id), + ) + .await + .expect("process should close"); + assert!(closed_response.closed); + + timeout(Duration::from_secs(1), async { + loop { + { + let processes = backend.inner.processes.lock().await; + if !processes.contains_key(&process_id) { + break; + } + } + tokio::time::sleep(Duration::from_millis(5)).await; + } + }) + .await + .expect("closed process should be evicted"); + backend.shutdown().await; + } + + struct TestProcess { + process_id: ProcessId, + stdout_tx: mpsc::Sender>, + stderr_tx: mpsc::Sender>, + exit_tx: Option>, + } + + impl TestProcess { + fn exit(&mut self, exit_code: i32) { + self.exit_tx + .take() + .expect("process should not have exited") + .send(exit_code) + .expect("send process exit"); + } + } + + async fn spawn_test_process(backend: &LocalProcess, process_id: &str) -> TestProcess { + let process_id = ProcessId::from(process_id); + let (stdout_tx, stdout_rx) = mpsc::channel(16); + let (stderr_tx, stderr_rx) = mpsc::channel(16); + let (exit_tx, exit_rx) = oneshot::channel(); + let output_notify = Arc::new(Notify::new()); + let (wake_tx, _wake_rx) = watch::channel(0); + let events = ExecProcessEventLog::new( + PROCESS_EVENT_CHANNEL_CAPACITY, + RETAINED_OUTPUT_BYTES_PER_PROCESS, + ); + + let mut processes = backend.inner.processes.lock().await; + let previous = processes.insert( + process_id.clone(), + ProcessEntry::Running(Box::new(RunningProcess { + session: dummy_session(), + tty: false, + pipe_stdin: false, + output: VecDeque::new(), + retained_bytes: 0, + next_seq: 1, + exit_code: None, + wake_tx: wake_tx.clone(), + events: events.clone(), + output_notify: Arc::clone(&output_notify), + open_streams: 2, + closed: false, + })), + ); + assert!(previous.is_none()); + drop(processes); + + tokio::spawn(stream_output( + process_id.clone(), + ExecOutputStream::Stdout, + stdout_rx, + Arc::clone(&backend.inner), + Arc::clone(&output_notify), + )); + tokio::spawn(stream_output( + process_id.clone(), + ExecOutputStream::Stderr, + stderr_rx, + Arc::clone(&backend.inner), + Arc::clone(&output_notify), + )); + tokio::spawn(watch_exit( + process_id.clone(), + exit_rx, + Arc::clone(&backend.inner), + output_notify, + )); + + TestProcess { + process_id, + stdout_tx, + stderr_tx, + exit_tx: Some(exit_tx), + } + } + + fn dummy_session() -> ExecCommandSession { + let (writer_tx, _writer_rx) = mpsc::channel(1); + let (_stdout_tx, stdout_rx) = tokio::sync::broadcast::channel(1); + let (_stderr_tx, stderr_rx) = tokio::sync::broadcast::channel(1); + let (_exit_tx, exit_rx) = oneshot::channel(); + + codex_utils_pty::spawn_from_driver(ProcessDriver { + writer_tx, + stdout_rx, + stderr_rx: Some(stderr_rx), + exit_rx, + terminator: None, + writer_handle: None, + resizer: None, + }) + .session + } + + async fn read_process_until_change( + backend: &LocalProcess, + process_id: &ProcessId, + after_seq: Option, + ) -> ReadResponse { + timeout( + Duration::from_secs(1), + backend.exec_read(ReadParams { + process_id: process_id.clone(), + after_seq, + max_bytes: None, + wait_ms: Some(1_000), + }), + ) + .await + .expect("process read should finish") + .expect("process read") + } + + async fn read_process_until_closed( + backend: &LocalProcess, + process_id: &ProcessId, + ) -> ReadResponse { + let mut after_seq = None; + loop { + let response = read_process_until_change(backend, process_id, after_seq).await; + if response.closed { + return response; + } + for chunk in &response.chunks { + after_seq = Some(chunk.seq); + } + after_seq = response.next_seq.checked_sub(1).or(after_seq); + } + } } diff --git a/codex-rs/exec-server/src/server/handler/tests.rs b/codex-rs/exec-server/src/server/handler/tests.rs index e4c743fa2bbc..6b632fe8909b 100644 --- a/codex-rs/exec-server/src/server/handler/tests.rs +++ b/codex-rs/exec-server/src/server/handler/tests.rs @@ -170,13 +170,12 @@ async fn long_poll_read_fails_after_session_resume() { .expect("initialize"); first_handler.initialized().expect("initialized"); + // Keep the process quiet and alive so the pending read can only complete + // after session resume, not because the process produced output or exited. first_handler .exec(exec_params_with_argv( "proc-long-poll", - shell_argv( - "sleep 0.1; printf resumed", - "ping -n 2 127.0.0.1 >NUL && echo resumed", - ), + shell_argv("sleep 5", "ping -n 6 127.0.0.1 >NUL"), )) .await .expect("start process"); diff --git a/codex-rs/exec-server/tests/common/mod.rs b/codex-rs/exec-server/tests/common/mod.rs index c206d8b97274..387edf36db77 100644 --- a/codex-rs/exec-server/tests/common/mod.rs +++ b/codex-rs/exec-server/tests/common/mod.rs @@ -1,5 +1,10 @@ use std::env; +use std::io::Write; +use std::path::Path; use std::path::PathBuf; +use std::process::Command; +use std::process::Stdio; +use std::time::Duration; use codex_exec_server::CODEX_FS_HELPER_ARG1; use codex_exec_server::ExecServerRuntimePaths; @@ -11,6 +16,11 @@ use ctor::ctor; pub(crate) mod exec_server; +pub(crate) const DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG: &str = + "--codex-test-delayed-output-after-exit-parent"; + +const DELAYED_OUTPUT_AFTER_EXIT_CHILD_ARG: &str = "--codex-test-delayed-output-after-exit-child"; + #[ctor] pub static TEST_BINARY_DISPATCH_GUARD: Option = { let guard = configure_test_binary_dispatch("codex-exec-server-tests", |exe_name, argv1| { @@ -22,6 +32,7 @@ pub static TEST_BINARY_DISPATCH_GUARD: Option = { } TestBinaryDispatchMode::InstallAliases }); + maybe_run_delayed_output_after_exit_from_test_binary(); maybe_run_exec_server_from_test_binary(guard.as_ref()); guard }; @@ -39,6 +50,82 @@ pub(crate) fn current_test_binary_helper_paths() -> anyhow::Result<(PathBuf, Opt Ok((current_exe, codex_linux_sandbox_exe)) } +fn maybe_run_delayed_output_after_exit_from_test_binary() { + let mut args = env::args(); + let _program = args.next(); + let Some(command) = args.next() else { + return; + }; + match command.as_str() { + DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG => { + let release_path = next_release_path_arg(args); + run_delayed_output_after_exit_parent(&release_path); + } + DELAYED_OUTPUT_AFTER_EXIT_CHILD_ARG => { + let release_path = next_release_path_arg(args); + run_delayed_output_after_exit_child(&release_path); + } + _ => {} + } +} + +fn next_release_path_arg(mut args: impl Iterator) -> PathBuf { + let Some(release_path) = args.next() else { + eprintln!("expected release path"); + std::process::exit(1); + }; + if args.next().is_some() { + eprintln!("unexpected extra arguments"); + std::process::exit(1); + } + PathBuf::from(release_path) +} + +fn run_delayed_output_after_exit_parent(release_path: &Path) { + let current_exe = match env::current_exe() { + Ok(current_exe) => current_exe, + Err(error) => { + eprintln!("failed to resolve current test binary: {error}"); + std::process::exit(1); + } + }; + match Command::new(current_exe) + .arg(DELAYED_OUTPUT_AFTER_EXIT_CHILD_ARG) + .arg(release_path) + .stdin(Stdio::null()) + .spawn() + { + Ok(_) => std::process::exit(0), + Err(error) => { + eprintln!("failed to spawn delayed output child: {error}"); + std::process::exit(1); + } + } +} + +fn run_delayed_output_after_exit_child(release_path: &Path) { + for _ in 0..1_000 { + if release_path.exists() { + let mut stdout = std::io::stdout().lock(); + if let Err(error) = writeln!(stdout, "late output after exit") { + eprintln!("failed to write delayed output: {error}"); + std::process::exit(1); + } + if let Err(error) = stdout.flush() { + eprintln!("failed to flush delayed output: {error}"); + std::process::exit(1); + } + std::process::exit(0); + } + std::thread::sleep(Duration::from_millis(10)); + } + eprintln!( + "timed out waiting for release path {}", + release_path.display() + ); + std::process::exit(1); +} + fn maybe_run_exec_server_from_test_binary(guard: Option<&TestBinaryDispatchGuard>) { let mut args = env::args(); let _program = args.next(); diff --git a/codex-rs/exec-server/tests/exec_process.rs b/codex-rs/exec-server/tests/exec_process.rs index 9972cc004a78..e1f330fc4ed0 100644 --- a/codex-rs/exec-server/tests/exec_process.rs +++ b/codex-rs/exec-server/tests/exec_process.rs @@ -17,11 +17,14 @@ use codex_exec_server::ReadResponse; use codex_exec_server::StartedExecProcess; use codex_exec_server::WriteStatus; use pretty_assertions::assert_eq; +use tempfile::TempDir; use test_case::test_case; use tokio::sync::watch; use tokio::time::Duration; use tokio::time::timeout; +use common::DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG; +use common::current_test_binary_helper_paths; use common::exec_server::ExecServerHarness; use common::exec_server::exec_server; @@ -320,6 +323,81 @@ async fn assert_exec_process_replays_events_after_close(use_remote: bool) -> Res Ok(()) } +async fn assert_exec_process_retains_output_after_exit_until_streams_close( + use_remote: bool, +) -> Result<()> { + let context = create_process_context(use_remote).await?; + let (helper_binary, _) = current_test_binary_helper_paths()?; + let release_dir = TempDir::new()?; + let release_path = release_dir.path().join("release-delayed-output"); + let process_id = "proc-output-after-exit".to_string(); + let session = context + .backend + .start(ExecParams { + process_id: process_id.clone().into(), + argv: vec![ + helper_binary.to_string_lossy().into_owned(), + DELAYED_OUTPUT_AFTER_EXIT_PARENT_ARG.to_string(), + release_path.to_string_lossy().into_owned(), + ], + cwd: std::env::current_dir()?, + env_policy: /*env_policy*/ None, + env: Default::default(), + tty: false, + pipe_stdin: false, + arg0: None, + }) + .await?; + assert_eq!(session.process.process_id().as_str(), process_id); + + let StartedExecProcess { process } = session; + + let exit_response = timeout( + Duration::from_secs(2), + process.read( + /*after_seq*/ None, + /*max_bytes*/ None, + /*wait_ms*/ Some(2_000), + ), + ) + .await??; + assert!( + exit_response.chunks.is_empty(), + "parent should exit before child writes delayed output" + ); + assert_eq!(exit_response.exit_code, Some(0)); + assert!(!exit_response.closed); + let exit_seq = exit_response + .next_seq + .checked_sub(1) + .context("exit response should advance next_seq")?; + std::fs::write(&release_path, b"go")?; + + let late_response = timeout( + Duration::from_secs(2), + process.read( + /*after_seq*/ Some(exit_seq), + /*max_bytes*/ None, + /*wait_ms*/ Some(2_000), + ), + ) + .await??; + let mut late_output = String::new(); + for chunk in late_response.chunks { + assert_eq!(chunk.stream, ExecOutputStream::Stdout); + late_output.push_str(&String::from_utf8_lossy(&chunk.chunk.into_inner())); + } + assert_eq!(late_output, "late output after exit\n"); + + let wake_rx = process.subscribe_wake(); + let actual = collect_process_output_from_reads(process, wake_rx).await?; + assert_eq!( + actual, + ("late output after exit\n".to_string(), Some(0), true) + ); + Ok(()) +} + async fn assert_exec_process_write_then_read(use_remote: bool) -> Result<()> { let context = create_process_context(use_remote).await?; let process_id = "proc-stdin".to_string(); @@ -586,6 +664,17 @@ async fn exec_process_replays_events_after_close(use_remote: bool) -> Result<()> assert_exec_process_replays_events_after_close(use_remote).await } +#[test_case(false ; "local")] +#[test_case(true ; "remote")] +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +// Serialize tests that launch a real exec-server process through the full CLI. +#[serial_test::serial(remote_exec_server)] +async fn exec_process_retains_output_after_exit_until_streams_close( + use_remote: bool, +) -> Result<()> { + assert_exec_process_retains_output_after_exit_until_streams_close(use_remote).await +} + #[test_case(false ; "local")] #[test_case(true ; "remote")] #[tokio::test(flavor = "multi_thread", worker_threads = 2)]