From 9da80493692756876ca02b936a78b1ce891a82cc Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 20 Jan 2026 18:55:56 +0000 Subject: [PATCH 1/2] fix: memory leak issue --- codex-rs/core/src/exec.rs | 88 ++++++++++++++++++++++++++------------- 1 file changed, 59 insertions(+), 29 deletions(-) diff --git a/codex-rs/core/src/exec.rs b/codex-rs/core/src/exec.rs index 2a918732383..e27f213a176 100644 --- a/codex-rs/core/src/exec.rs +++ b/codex-rs/core/src/exec.rs @@ -47,6 +47,12 @@ const EXEC_TIMEOUT_EXIT_CODE: i32 = 124; // conventional timeout exit code const READ_CHUNK_SIZE: usize = 8192; // bytes per read const AGGREGATE_BUFFER_INITIAL_CAPACITY: usize = 8 * 1024; // 8 KiB +/// Hard cap on bytes retained from exec stdout/stderr/aggregated output. +/// +/// This mirrors unified exec's output cap so a single runaway command cannot +/// OOM the process by dumping huge amounts of data to stdout/stderr. +const EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB + /// Limit the number of ExecCommandOutputDelta events emitted per exec call. /// Aggregation still collects full output; only the live event stream is capped. pub(crate) const MAX_EXEC_OUTPUT_DELTAS_PER_CALL: usize = 10_000; @@ -290,18 +296,32 @@ async fn exec_windows_sandbox( }; let exit_status = synthetic_exit_status(capture.exit_code); + let mut stdout_text = capture.stdout; + if stdout_text.len() > EXEC_OUTPUT_MAX_BYTES { + stdout_text.truncate(EXEC_OUTPUT_MAX_BYTES); + } + let mut stderr_text = capture.stderr; + if stderr_text.len() > EXEC_OUTPUT_MAX_BYTES { + stderr_text.truncate(EXEC_OUTPUT_MAX_BYTES); + } let stdout = StreamOutput { - text: capture.stdout, + text: stdout_text, truncated_after_lines: None, }; let stderr = StreamOutput { - text: capture.stderr, + text: stderr_text, truncated_after_lines: None, }; - // Best-effort aggregate: stdout then stderr - let mut aggregated = Vec::with_capacity(stdout.text.len() + stderr.text.len()); - append_all(&mut aggregated, &stdout.text); - append_all(&mut aggregated, &stderr.text); + // Best-effort aggregate: stdout then stderr (capped). + let mut aggregated = Vec::with_capacity( + stdout + .text + .len() + .saturating_add(stderr.text.len()) + .min(EXEC_OUTPUT_MAX_BYTES), + ); + append_capped(&mut aggregated, &stdout.text, EXEC_OUTPUT_MAX_BYTES); + append_capped(&mut aggregated, &stderr.text, EXEC_OUTPUT_MAX_BYTES); let aggregated_output = StreamOutput { text: aggregated, truncated_after_lines: None, @@ -490,8 +510,13 @@ impl StreamOutput> { } #[inline] -fn append_all(dst: &mut Vec, src: &[u8]) { - dst.extend_from_slice(src); +fn append_capped(dst: &mut Vec, src: &[u8], max_bytes: usize) { + if dst.len() >= max_bytes { + return; + } + let remaining = max_bytes.saturating_sub(dst.len()); + let take = remaining.min(src.len()); + dst.extend_from_slice(&src[..take]); } #[derive(Clone, Debug)] @@ -584,19 +609,15 @@ async fn consume_truncated_output( )) })?; - let (agg_tx, agg_rx) = async_channel::unbounded::>(); - let stdout_handle = tokio::spawn(read_capped( BufReader::new(stdout_reader), stdout_stream.clone(), false, - Some(agg_tx.clone()), )); let stderr_handle = tokio::spawn(read_capped( BufReader::new(stderr_reader), stdout_stream.clone(), true, - Some(agg_tx.clone()), )); let (exit_status, timed_out) = tokio::select! { @@ -662,15 +683,18 @@ async fn consume_truncated_output( Duration::from_millis(IO_DRAIN_TIMEOUT_MS), ) .await?; - - drop(agg_tx); - - let mut combined_buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY); - while let Ok(chunk) = agg_rx.recv().await { - append_all(&mut combined_buf, &chunk); - } + // Best-effort aggregate: stdout then stderr (capped). + let mut aggregated = Vec::with_capacity( + stdout + .text + .len() + .saturating_add(stderr.text.len()) + .min(EXEC_OUTPUT_MAX_BYTES), + ); + append_capped(&mut aggregated, &stdout.text, EXEC_OUTPUT_MAX_BYTES); + append_capped(&mut aggregated, &stderr.text, EXEC_OUTPUT_MAX_BYTES); let aggregated_output = StreamOutput { - text: combined_buf, + text: aggregated, truncated_after_lines: None, }; @@ -687,14 +711,11 @@ async fn read_capped( mut reader: R, stream: Option, is_stderr: bool, - aggregate_tx: Option>>, ) -> io::Result>> { - let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY); + let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY.min(EXEC_OUTPUT_MAX_BYTES)); let mut tmp = [0u8; READ_CHUNK_SIZE]; let mut emitted_deltas: usize = 0; - // No caps: append all bytes - loop { let n = reader.read(&mut tmp).await?; if n == 0 { @@ -723,11 +744,7 @@ async fn read_capped( emitted_deltas += 1; } - if let Some(tx) = &aggregate_tx { - let _ = tx.send(tmp[..n].to_vec()).await; - } - - append_all(&mut buf, &tmp[..n]); + append_capped(&mut buf, &tmp[..n], EXEC_OUTPUT_MAX_BYTES); // Continue reading to EOF to avoid back-pressure } @@ -755,6 +772,7 @@ fn synthetic_exit_status(code: i32) -> ExitStatus { mod tests { use super::*; use std::time::Duration; + use tokio::io::AsyncWriteExt; fn make_exec_output( exit_code: i32, @@ -816,6 +834,18 @@ mod tests { )); } + #[tokio::test] + async fn read_capped_limits_retained_bytes() { + let (mut writer, reader) = tokio::io::duplex(1024); + let bytes = vec![b'a'; EXEC_OUTPUT_MAX_BYTES.saturating_add(128 * 1024)]; + tokio::spawn(async move { + writer.write_all(&bytes).await.expect("write"); + }); + + let out = read_capped(reader, None, false).await.expect("read"); + assert_eq!(out.text.len(), EXEC_OUTPUT_MAX_BYTES); + } + #[cfg(unix)] #[test] fn sandbox_detection_flags_sigsys_exit_code() { From dccbf16c1e6f2c8589fe2e4c3d00c4485d9f1a73 Mon Sep 17 00:00:00 2001 From: jif-oai Date: Tue, 20 Jan 2026 19:56:35 +0000 Subject: [PATCH 2/2] Update codex-rs/core/src/exec.rs Co-authored-by: Josh McKinney --- codex-rs/core/src/exec.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codex-rs/core/src/exec.rs b/codex-rs/core/src/exec.rs index e27f213a176..275f2fb8568 100644 --- a/codex-rs/core/src/exec.rs +++ b/codex-rs/core/src/exec.rs @@ -692,7 +692,7 @@ async fn consume_truncated_output( .min(EXEC_OUTPUT_MAX_BYTES), ); append_capped(&mut aggregated, &stdout.text, EXEC_OUTPUT_MAX_BYTES); - append_capped(&mut aggregated, &stderr.text, EXEC_OUTPUT_MAX_BYTES); + append_capped(&mut aggregated, &stderr.text, EXEC_OUTPUT_MAX_BYTES * 2); let aggregated_output = StreamOutput { text: aggregated, truncated_after_lines: None,