Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 59 additions & 29 deletions codex-rs/core/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ const EXEC_TIMEOUT_EXIT_CODE: i32 = 124; // conventional timeout exit code
const READ_CHUNK_SIZE: usize = 8192; // bytes per read
const AGGREGATE_BUFFER_INITIAL_CAPACITY: usize = 8 * 1024; // 8 KiB

/// Hard cap on bytes retained from exec stdout/stderr/aggregated output.
///
/// This mirrors unified exec's output cap so a single runaway command cannot
/// OOM the process by dumping huge amounts of data to stdout/stderr.
const EXEC_OUTPUT_MAX_BYTES: usize = 1024 * 1024; // 1 MiB

/// Limit the number of ExecCommandOutputDelta events emitted per exec call.
/// Aggregation still collects full output; only the live event stream is capped.
pub(crate) const MAX_EXEC_OUTPUT_DELTAS_PER_CALL: usize = 10_000;
Expand Down Expand Up @@ -290,18 +296,32 @@ async fn exec_windows_sandbox(
};

let exit_status = synthetic_exit_status(capture.exit_code);
let mut stdout_text = capture.stdout;
if stdout_text.len() > EXEC_OUTPUT_MAX_BYTES {
stdout_text.truncate(EXEC_OUTPUT_MAX_BYTES);
}
let mut stderr_text = capture.stderr;
if stderr_text.len() > EXEC_OUTPUT_MAX_BYTES {
stderr_text.truncate(EXEC_OUTPUT_MAX_BYTES);
}
let stdout = StreamOutput {
text: capture.stdout,
text: stdout_text,
truncated_after_lines: None,
};
let stderr = StreamOutput {
text: capture.stderr,
text: stderr_text,
truncated_after_lines: None,
};
// Best-effort aggregate: stdout then stderr
let mut aggregated = Vec::with_capacity(stdout.text.len() + stderr.text.len());
append_all(&mut aggregated, &stdout.text);
append_all(&mut aggregated, &stderr.text);
// Best-effort aggregate: stdout then stderr (capped).
let mut aggregated = Vec::with_capacity(
stdout
.text
.len()
.saturating_add(stderr.text.len())
.min(EXEC_OUTPUT_MAX_BYTES),
);
append_capped(&mut aggregated, &stdout.text, EXEC_OUTPUT_MAX_BYTES);
append_capped(&mut aggregated, &stderr.text, EXEC_OUTPUT_MAX_BYTES);
let aggregated_output = StreamOutput {
text: aggregated,
truncated_after_lines: None,
Expand Down Expand Up @@ -490,8 +510,13 @@ impl StreamOutput<Vec<u8>> {
}

#[inline]
fn append_all(dst: &mut Vec<u8>, src: &[u8]) {
dst.extend_from_slice(src);
fn append_capped(dst: &mut Vec<u8>, src: &[u8], max_bytes: usize) {
if dst.len() >= max_bytes {
return;
}
let remaining = max_bytes.saturating_sub(dst.len());
let take = remaining.min(src.len());
dst.extend_from_slice(&src[..take]);
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -584,19 +609,15 @@ async fn consume_truncated_output(
))
})?;

let (agg_tx, agg_rx) = async_channel::unbounded::<Vec<u8>>();

let stdout_handle = tokio::spawn(read_capped(
BufReader::new(stdout_reader),
stdout_stream.clone(),
false,
Some(agg_tx.clone()),
));
let stderr_handle = tokio::spawn(read_capped(
BufReader::new(stderr_reader),
stdout_stream.clone(),
true,
Some(agg_tx.clone()),
));

let (exit_status, timed_out) = tokio::select! {
Expand Down Expand Up @@ -662,15 +683,18 @@ async fn consume_truncated_output(
Duration::from_millis(IO_DRAIN_TIMEOUT_MS),
)
.await?;

drop(agg_tx);

let mut combined_buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY);
while let Ok(chunk) = agg_rx.recv().await {
append_all(&mut combined_buf, &chunk);
}
// Best-effort aggregate: stdout then stderr (capped).
let mut aggregated = Vec::with_capacity(
stdout
.text
.len()
.saturating_add(stderr.text.len())
.min(EXEC_OUTPUT_MAX_BYTES),
);
append_capped(&mut aggregated, &stdout.text, EXEC_OUTPUT_MAX_BYTES);
append_capped(&mut aggregated, &stderr.text, EXEC_OUTPUT_MAX_BYTES * 2);
let aggregated_output = StreamOutput {
text: combined_buf,
text: aggregated,
truncated_after_lines: None,
};

Expand All @@ -687,14 +711,11 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
mut reader: R,
stream: Option<StdoutStream>,
is_stderr: bool,
aggregate_tx: Option<Sender<Vec<u8>>>,
) -> io::Result<StreamOutput<Vec<u8>>> {
let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY);
let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY.min(EXEC_OUTPUT_MAX_BYTES));
let mut tmp = [0u8; READ_CHUNK_SIZE];
let mut emitted_deltas: usize = 0;

// No caps: append all bytes

loop {
let n = reader.read(&mut tmp).await?;
if n == 0 {
Expand Down Expand Up @@ -723,11 +744,7 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
emitted_deltas += 1;
}

if let Some(tx) = &aggregate_tx {
let _ = tx.send(tmp[..n].to_vec()).await;
}

append_all(&mut buf, &tmp[..n]);
append_capped(&mut buf, &tmp[..n], EXEC_OUTPUT_MAX_BYTES);
// Continue reading to EOF to avoid back-pressure
}

Expand Down Expand Up @@ -755,6 +772,7 @@ fn synthetic_exit_status(code: i32) -> ExitStatus {
mod tests {
use super::*;
use std::time::Duration;
use tokio::io::AsyncWriteExt;

fn make_exec_output(
exit_code: i32,
Expand Down Expand Up @@ -816,6 +834,18 @@ mod tests {
));
}

#[tokio::test]
async fn read_capped_limits_retained_bytes() {
let (mut writer, reader) = tokio::io::duplex(1024);
let bytes = vec![b'a'; EXEC_OUTPUT_MAX_BYTES.saturating_add(128 * 1024)];
tokio::spawn(async move {
writer.write_all(&bytes).await.expect("write");
});

let out = read_capped(reader, None, false).await.expect("read");
assert_eq!(out.text.len(), EXEC_OUTPUT_MAX_BYTES);
}

#[cfg(unix)]
#[test]
fn sandbox_detection_flags_sigsys_exit_code() {
Expand Down
Loading