Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
265 changes: 247 additions & 18 deletions codex-rs/exec-server/src/local_process.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::collections::VecDeque;
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;

Expand Down Expand Up @@ -628,16 +629,7 @@ async fn watch_exit(
.await;
}

maybe_emit_closed(process_id.clone(), Arc::clone(&inner)).await;

tokio::time::sleep(EXITED_PROCESS_RETENTION).await;
let mut processes = inner.processes.lock().await;
if matches!(
processes.get(&process_id),
Some(ProcessEntry::Running(process)) if process.exit_code == Some(exit_code)
) {
processes.remove(&process_id);
}
maybe_emit_closed(process_id, Arc::clone(&inner)).await;
}

async fn finish_output_stream(process_id: ProcessId, inner: Arc<Inner>) {
Expand All @@ -656,7 +648,7 @@ async fn finish_output_stream(process_id: ProcessId, inner: Arc<Inner>) {
}

async fn maybe_emit_closed(process_id: ProcessId, inner: Arc<Inner>) {
let notification = {
let (notification, output_notify) = {
let mut processes = inner.processes.lock().await;
let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) else {
return;
Expand All @@ -671,15 +663,30 @@ async fn maybe_emit_closed(process_id: ProcessId, inner: Arc<Inner>) {
process.next_seq += 1;
let _ = process.wake_tx.send(seq);
process.events.publish(ExecProcessEvent::Closed { seq });
Some(ExecClosedNotification {
process_id: process_id.clone(),
seq,
})
(
ExecClosedNotification {
process_id: process_id.clone(),
seq,
},
Arc::clone(&process.output_notify),
)
};

let Some(notification) = notification else {
return;
};
output_notify.notify_waiters();
let cleanup_process_id = process_id.clone();
let cleanup_inner = Arc::clone(&inner);
tokio::spawn(async move {
tokio::time::sleep(EXITED_PROCESS_RETENTION).await;
let mut processes = cleanup_inner.processes.lock().await;
match processes.entry(cleanup_process_id) {
Entry::Occupied(entry) => {
if matches!(entry.get(), ProcessEntry::Running(process) if process.closed) {
entry.remove();
}
}
Entry::Vacant(_) => {}
}
});

if let Some(notifications) = notification_sender(&inner) {
let _ = notifications
Expand All @@ -700,6 +707,10 @@ fn notification_sender(inner: &Inner) -> Option<RpcNotificationSender> {
mod tests {
use super::*;
use codex_config::types::ShellEnvironmentPolicyInherit;
use codex_utils_pty::ProcessDriver;
use pretty_assertions::assert_eq;
use tokio::sync::oneshot;
use tokio::time::timeout;

fn test_exec_params(env: HashMap<String, String>) -> ExecParams {
ExecParams {
Expand Down Expand Up @@ -748,4 +759,222 @@ mod tests {

assert_eq!(child_env(&params), expected);
}

/// Checks that stdout written after `EXITED_PROCESS_RETENTION` has elapsed
/// since the exit was reported is still returned by a read, provided the
/// output senders have not been dropped yet.
///
/// The test then drops both senders and expects the process to report
/// `closed` within one second.
#[tokio::test]
async fn exited_process_retains_late_output_past_retention() {
    const LATE_OUTPUT: &[u8] = b"late output after retention\n";

    let backend = LocalProcess::default();
    let mut fake = spawn_test_process(&backend, "proc-late-output").await;

    // Report the exit while stdout/stderr are still open: the read must show
    // the exit code but not `closed`.
    fake.exit(/*exit_code*/ 0);
    let after_exit =
        read_process_until_change(&backend, &fake.process_id, /*after_seq*/ None).await;
    let expected_after_exit = ReadResponse {
        chunks: Vec::new(),
        next_seq: 2,
        exited: true,
        exit_code: Some(0),
        closed: false,
        failure: None,
    };
    assert_eq!(after_exit, expected_after_exit);

    // Wait until just past the retention window, then emit output.
    let past_retention = EXITED_PROCESS_RETENTION + Duration::from_millis(10);
    tokio::time::sleep(past_retention).await;
    fake.stdout_tx
        .send(LATE_OUTPUT.to_vec())
        .await
        .expect("send late stdout");

    let after_late_output =
        read_process_until_change(&backend, &fake.process_id, /*after_seq*/ Some(1)).await;
    let expected_chunks = vec![ProcessOutputChunk {
        seq: 2,
        stream: ExecOutputStream::Stdout,
        chunk: LATE_OUTPUT.to_vec().into(),
    }];
    assert_eq!(after_late_output.chunks, expected_chunks);
    assert_eq!(after_late_output.exit_code, Some(0));
    assert!(!after_late_output.closed);

    // Dropping both senders ends the output streams; the process should then
    // be reported as closed.
    drop(fake.stdout_tx);
    drop(fake.stderr_tx);
    let wait_for_close = read_process_until_closed(&backend, &fake.process_id);
    let _closed_response = timeout(Duration::from_secs(1), wait_for_close)
        .await
        .expect("process should close");
    backend.shutdown().await;
}

/// Checks that once a process has exited and both output senders are
/// dropped, a read reports `closed` and the entry subsequently disappears
/// from `backend.inner.processes` (polled for up to one second).
#[tokio::test]
async fn closed_process_is_evicted_after_retention() {
    let backend = LocalProcess::default();
    let mut fake = spawn_test_process(&backend, "proc-closed-eviction").await;
    let process_id = fake.process_id.clone();

    // Exit and end both output streams so the process can close.
    fake.exit(/*exit_code*/ 0);
    drop(fake.stdout_tx);
    drop(fake.stderr_tx);

    let wait_for_close = read_process_until_closed(&backend, &process_id);
    let closed_response = timeout(Duration::from_secs(1), wait_for_close)
        .await
        .expect("process should close");
    assert!(closed_response.closed);

    // Poll the process map every 5 ms; the lock guard is a temporary of the
    // `let` statement, so it is released before each sleep.
    let wait_for_eviction = async {
        loop {
            let still_tracked = backend
                .inner
                .processes
                .lock()
                .await
                .contains_key(&process_id);
            if !still_tracked {
                break;
            }
            tokio::time::sleep(Duration::from_millis(5)).await;
        }
    };
    timeout(Duration::from_secs(1), wait_for_eviction)
        .await
        .expect("closed process should be evicted");
    backend.shutdown().await;
}

/// Handles returned by `spawn_test_process` for driving a fake process from
/// a test: its id, the sending ends of its output channels, and the sender
/// used to report its exit code.
struct TestProcess {
/// Key under which the fake process was inserted into the backend's
/// process map.
process_id: ProcessId,
/// Sending end of the channel read by the `stream_output` task spawned
/// for `ExecOutputStream::Stdout`.
stdout_tx: mpsc::Sender<Vec<u8>>,
/// Sending end of the channel read by the `stream_output` task spawned
/// for `ExecOutputStream::Stderr`.
stderr_tx: mpsc::Sender<Vec<u8>>,
/// One-shot sender whose receiver is handed to `watch_exit`; `exit` takes
/// it out, leaving `None`.
exit_tx: Option<oneshot::Sender<i32>>,
}

impl TestProcess {
    /// Reports `exit_code` on the one-shot exit channel, consuming the
    /// stored sender.
    ///
    /// # Panics
    ///
    /// Panics if the sender was already taken by an earlier call, or if the
    /// send fails.
    fn exit(&mut self, exit_code: i32) {
        let exit_sender = self
            .exit_tx
            .take()
            .expect("process should not have exited");
        exit_sender.send(exit_code).expect("send process exit");
    }
}

/// Registers a fake running process named `process_id` in `backend` and
/// spawns the tasks that service it.
///
/// A `ProcessEntry::Running` entry (two open streams, not exited, not
/// closed) is inserted into `backend.inner.processes`; the insert is
/// asserted not to replace an existing entry. Two `stream_output` tasks
/// (stdout, then stderr) and one `watch_exit` task are spawned on channels
/// whose sending ends are returned in the `TestProcess`.
async fn spawn_test_process(backend: &LocalProcess, process_id: &str) -> TestProcess {
    let process_id = ProcessId::from(process_id);

    // Channels the test uses to feed output and the exit code.
    let (stdout_tx, stdout_rx) = mpsc::channel(16);
    let (stderr_tx, stderr_rx) = mpsc::channel(16);
    let (exit_tx, exit_rx) = oneshot::channel();

    let notify = Arc::new(Notify::new());
    let (wake_tx, _wake_rx) = watch::channel(0);
    let event_log = ExecProcessEventLog::new(
        PROCESS_EVENT_CHANNEL_CAPACITY,
        RETAINED_OUTPUT_BYTES_PER_PROCESS,
    );

    let entry = ProcessEntry::Running(Box::new(RunningProcess {
        session: dummy_session(),
        tty: false,
        pipe_stdin: false,
        output: VecDeque::new(),
        retained_bytes: 0,
        next_seq: 1,
        exit_code: None,
        wake_tx: wake_tx.clone(),
        events: event_log.clone(),
        output_notify: Arc::clone(&notify),
        open_streams: 2,
        closed: false,
    }));

    // Hold the map lock only for the insert; it is released at the end of
    // this block, before any task is spawned.
    {
        let mut registry = backend.inner.processes.lock().await;
        let displaced = registry.insert(process_id.clone(), entry);
        assert!(displaced.is_none());
    }

    // One `stream_output` task per stream, stdout first.
    let streams = [
        (ExecOutputStream::Stdout, stdout_rx),
        (ExecOutputStream::Stderr, stderr_rx),
    ];
    for (stream, receiver) in streams {
        tokio::spawn(stream_output(
            process_id.clone(),
            stream,
            receiver,
            Arc::clone(&backend.inner),
            Arc::clone(&notify),
        ));
    }
    tokio::spawn(watch_exit(
        process_id.clone(),
        exit_rx,
        Arc::clone(&backend.inner),
        notify,
    ));

    TestProcess {
        process_id,
        stdout_tx,
        stderr_tx,
        exit_tx: Some(exit_tx),
    }
}

/// Builds an `ExecCommandSession` via `codex_utils_pty::spawn_from_driver`
/// from a `ProcessDriver` wired to freshly created channels, with no
/// terminator, writer handle or resizer.
///
/// The opposite end of each channel is only bound to a local here and is
/// dropped when this function returns.
fn dummy_session() -> ExecCommandSession {
    let (writer_tx, _unused_writer_rx) = mpsc::channel(1);
    let (_unused_stdout_tx, stdout_rx) = tokio::sync::broadcast::channel(1);
    let (_unused_stderr_tx, stderr_rx) = tokio::sync::broadcast::channel(1);
    let (_unused_exit_tx, exit_rx) = oneshot::channel();

    let driver = ProcessDriver {
        writer_tx,
        stdout_rx,
        stderr_rx: Some(stderr_rx),
        exit_rx,
        terminator: None,
        writer_handle: None,
        resizer: None,
    };

    codex_utils_pty::spawn_from_driver(driver).session
}

/// Issues one `exec_read` for `process_id` starting after `after_seq`, with
/// no byte limit and a 1000 ms `wait_ms`, and returns its response.
///
/// # Panics
///
/// Panics if the read does not complete within one second or returns an
/// error.
async fn read_process_until_change(
    backend: &LocalProcess,
    process_id: &ProcessId,
    after_seq: Option<u64>,
) -> ReadResponse {
    let params = ReadParams {
        process_id: process_id.clone(),
        after_seq,
        max_bytes: None,
        wait_ms: Some(1_000),
    };

    let outcome = timeout(Duration::from_secs(1), backend.exec_read(params)).await;
    outcome
        .expect("process read should finish")
        .expect("process read")
}

/// Reads `process_id` repeatedly until a response has `closed` set, and
/// returns that response.
///
/// Between reads the cursor advances to `next_seq - 1`; if that subtraction
/// underflows it falls back to the last chunk's sequence number, and failing
/// that keeps the previous cursor.
async fn read_process_until_closed(
    backend: &LocalProcess,
    process_id: &ProcessId,
) -> ReadResponse {
    let mut cursor = None;
    loop {
        let response = read_process_until_change(backend, process_id, cursor).await;
        if response.closed {
            return response;
        }

        let last_chunk_seq = response.chunks.last().map(|chunk| chunk.seq);
        cursor = response
            .next_seq
            .checked_sub(1)
            .or(last_chunk_seq)
            .or(cursor);
    }
}
}
7 changes: 3 additions & 4 deletions codex-rs/exec-server/src/server/handler/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,12 @@ async fn long_poll_read_fails_after_session_resume() {
.expect("initialize");
first_handler.initialized().expect("initialized");

// Keep the process quiet and alive so the pending read can only complete
// after session resume, not because the process produced output or exited.
first_handler
.exec(exec_params_with_argv(
"proc-long-poll",
shell_argv(
"sleep 0.1; printf resumed",
"ping -n 2 127.0.0.1 >NUL && echo resumed",
),
shell_argv("sleep 5", "ping -n 6 127.0.0.1 >NUL"),
))
.await
.expect("start process");
Expand Down
Loading
Loading