Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ use codex_core::config_loader::CloudRequirementsLoader;
use codex_core::default_client::set_default_client_residency_requirement;
use codex_core::error::CodexErr;
use codex_core::error::Result as CodexResult;
use codex_core::exec::ExecCapturePolicy;
use codex_core::exec::ExecExpiration;
use codex_core::exec::ExecParams;
use codex_core::exec_env::create_env;
Expand Down Expand Up @@ -1672,11 +1673,17 @@ impl CodexMessageProcessor {
None => ExecExpiration::DefaultTimeout,
}
};
let capture_policy = if disable_output_cap {
ExecCapturePolicy::FullBuffer
} else {
ExecCapturePolicy::ShellTool
};
let sandbox_cwd = self.config.cwd.clone();
let exec_params = ExecParams {
command,
cwd: cwd.clone(),
expiration,
capture_policy,
env,
network: started_network_proxy
.as_ref()
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/app-server/src/command_exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -733,6 +733,7 @@ mod tests {
env: HashMap::new(),
network: None,
expiration: ExecExpiration::DefaultTimeout,
capture_policy: codex_core::exec::ExecCapturePolicy::ShellTool,
sandbox: SandboxType::WindowsRestrictedToken,
windows_sandbox_level: WindowsSandboxLevel::Disabled,
windows_sandbox_private_desktop: false,
Expand Down Expand Up @@ -845,6 +846,7 @@ mod tests {
env: HashMap::new(),
network: None,
expiration: ExecExpiration::Cancellation(CancellationToken::new()),
capture_policy: codex_core::exec::ExecCapturePolicy::ShellTool,
sandbox: SandboxType::None,
windows_sandbox_level: WindowsSandboxLevel::Disabled,
windows_sandbox_private_desktop: false,
Expand Down
3 changes: 3 additions & 0 deletions codex-rs/core/src/codex_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::config_loader::ConfigLayerStackOrdering;
use crate::config_loader::NetworkConstraints;
use crate::config_loader::RequirementSource;
use crate::config_loader::Sourced;
use crate::exec::ExecCapturePolicy;
use crate::exec::ExecToolCallOutput;
use crate::function_tool::FunctionCallError;
use crate::mcp_connection_manager::ToolInfo;
Expand Down Expand Up @@ -4788,6 +4789,7 @@ async fn rejects_escalated_permissions_when_policy_not_on_request() {
},
cwd: turn_context.cwd.clone(),
expiration: timeout_ms.into(),
capture_policy: ExecCapturePolicy::ShellTool,
env: HashMap::new(),
network: None,
sandbox_permissions,
Expand All @@ -4805,6 +4807,7 @@ async fn rejects_escalated_permissions_when_policy_not_on_request() {
command: params.command.clone(),
cwd: params.cwd.clone(),
expiration: timeout_ms.into(),
capture_policy: ExecCapturePolicy::ShellTool,
env: HashMap::new(),
network: None,
windows_sandbox_level: turn_context.windows_sandbox_level,
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/core/src/codex_tests_guardian.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use crate::compact::InitialContextInjection;
use crate::config_loader::ConfigLayerEntry;
use crate::config_loader::ConfigRequirements;
use crate::config_loader::ConfigRequirementsToml;
use crate::exec::ExecCapturePolicy;
use crate::exec::ExecParams;
use crate::exec_policy::ExecPolicyManager;
use crate::features::Feature;
Expand Down Expand Up @@ -124,6 +125,7 @@ async fn guardian_allows_shell_additional_permissions_requests_past_policy_valid
},
cwd: turn_context.cwd.clone(),
expiration: expiration_ms.into(),
capture_policy: ExecCapturePolicy::ShellTool,
env: HashMap::new(),
network: None,
sandbox_permissions: SandboxPermissions::WithAdditionalPermissions,
Expand Down
127 changes: 98 additions & 29 deletions codex-rs/core/src/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub struct ExecParams {
pub command: Vec<String>,
pub cwd: PathBuf,
pub expiration: ExecExpiration,
pub capture_policy: ExecCapturePolicy,
pub env: HashMap<String, String>,
pub network: Option<NetworkProxy>,
pub sandbox_permissions: SandboxPermissions,
Expand All @@ -87,6 +88,16 @@ pub struct ExecParams {
pub arg0: Option<String>,
}

#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ExecCapturePolicy {
/// Shell-like execs keep the historical output cap and timeout behavior.
#[default]
ShellTool,
/// Trusted internal helpers can buffer the full child output in memory
/// without the shell-oriented output cap or exec-expiration behavior.
FullBuffer,
}

fn select_process_exec_tool_sandbox_type(
file_system_sandbox_policy: &FileSystemSandboxPolicy,
network_sandbox_policy: NetworkSandboxPolicy,
Expand Down Expand Up @@ -147,6 +158,26 @@ impl ExecExpiration {
}
}

impl ExecCapturePolicy {
fn retained_bytes_cap(self) -> Option<usize> {
match self {
Self::ShellTool => Some(EXEC_OUTPUT_MAX_BYTES),
Self::FullBuffer => None,
}
}

fn io_drain_timeout(self) -> Duration {
Duration::from_millis(IO_DRAIN_TIMEOUT_MS)
}

fn uses_expiration(self) -> bool {
match self {
Self::ShellTool => true,
Self::FullBuffer => false,
}
}
}

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum SandboxType {
None,
Expand Down Expand Up @@ -230,6 +261,7 @@ pub fn build_exec_request(
cwd,
mut env,
expiration,
capture_policy,
network,
sandbox_permissions,
windows_sandbox_level,
Expand All @@ -253,6 +285,7 @@ pub fn build_exec_request(
cwd,
env,
expiration,
capture_policy,
sandbox_permissions,
additional_permissions: None,
justification,
Expand Down Expand Up @@ -292,6 +325,7 @@ pub(crate) async fn execute_exec_request(
env,
network,
expiration,
capture_policy,
sandbox,
windows_sandbox_level,
windows_sandbox_private_desktop,
Expand All @@ -308,6 +342,7 @@ pub(crate) async fn execute_exec_request(
command,
cwd,
expiration,
capture_policy,
env,
network: network.clone(),
sandbox_permissions,
Expand Down Expand Up @@ -414,6 +449,7 @@ async fn exec_windows_sandbox(
mut env,
network,
expiration,
capture_policy,
windows_sandbox_level,
windows_sandbox_private_desktop,
..
Expand All @@ -424,7 +460,11 @@ async fn exec_windows_sandbox(

// TODO(iceweasel-oai): run_windows_sandbox_capture should support all
// variants of ExecExpiration, not just timeout.
let timeout_ms = expiration.timeout_ms();
let timeout_ms = if capture_policy.uses_expiration() {
expiration.timeout_ms()
} else {
None
};

let policy_str = serde_json::to_string(sandbox_policy).map_err(|err| {
CodexErr::Io(io::Error::other(format!(
Expand Down Expand Up @@ -488,12 +528,16 @@ async fn exec_windows_sandbox(

let exit_status = synthetic_exit_status(capture.exit_code);
let mut stdout_text = capture.stdout;
if stdout_text.len() > EXEC_OUTPUT_MAX_BYTES {
stdout_text.truncate(EXEC_OUTPUT_MAX_BYTES);
if let Some(max_bytes) = capture_policy.retained_bytes_cap()
&& stdout_text.len() > max_bytes
{
stdout_text.truncate(max_bytes);
}
let mut stderr_text = capture.stderr;
if stderr_text.len() > EXEC_OUTPUT_MAX_BYTES {
stderr_text.truncate(EXEC_OUTPUT_MAX_BYTES);
if let Some(max_bytes) = capture_policy.retained_bytes_cap()
&& stderr_text.len() > max_bytes
{
stderr_text.truncate(max_bytes);
}
let stdout = StreamOutput {
text: stdout_text,
Expand All @@ -503,7 +547,7 @@ async fn exec_windows_sandbox(
text: stderr_text,
truncated_after_lines: None,
};
let aggregated_output = aggregate_output(&stdout, &stderr);
let aggregated_output = aggregate_output(&stdout, &stderr, capture_policy.retained_bytes_cap());

Ok(RawExecToolCallOutput {
exit_status,
Expand Down Expand Up @@ -701,9 +745,20 @@ fn append_capped(dst: &mut Vec<u8>, src: &[u8], max_bytes: usize) {
fn aggregate_output(
stdout: &StreamOutput<Vec<u8>>,
stderr: &StreamOutput<Vec<u8>>,
max_bytes: Option<usize>,
) -> StreamOutput<Vec<u8>> {
let Some(max_bytes) = max_bytes else {
let total_len = stdout.text.len().saturating_add(stderr.text.len());
let mut aggregated = Vec::with_capacity(total_len);
aggregated.extend_from_slice(&stdout.text);
aggregated.extend_from_slice(&stderr.text);
return StreamOutput {
text: aggregated,
truncated_after_lines: None,
};
};

let total_len = stdout.text.len().saturating_add(stderr.text.len());
let max_bytes = EXEC_OUTPUT_MAX_BYTES;
let mut aggregated = Vec::with_capacity(total_len.min(max_bytes));

if total_len <= max_bytes {
Expand Down Expand Up @@ -785,6 +840,7 @@ async fn exec(
network,
arg0,
expiration,
capture_policy,
windows_sandbox_level: _,
..
} = params;
Expand Down Expand Up @@ -816,7 +872,7 @@ async fn exec(
if let Some(after_spawn) = after_spawn {
after_spawn();
}
consume_truncated_output(child, expiration, stdout_stream).await
consume_output(child, expiration, capture_policy, stdout_stream).await
}

#[cfg_attr(not(target_os = "windows"), allow(dead_code))]
Expand Down Expand Up @@ -870,11 +926,12 @@ fn windows_restricted_token_sandbox_support(
}
}

/// Consumes the output of a child process, truncating it so it is suitable for
/// use as the output of a `shell` tool call. Also enforces specified timeout.
async fn consume_truncated_output(
/// Consumes the output of a child process according to the configured capture
/// policy.
async fn consume_output(
mut child: Child,
expiration: ExecExpiration,
capture_policy: ExecCapturePolicy,
stdout_stream: Option<StdoutStream>,
) -> Result<RawExecToolCallOutput> {
// Both stdout and stderr were configured with `Stdio::piped()`
Expand All @@ -892,23 +949,34 @@ async fn consume_truncated_output(
))
})?;

let stdout_handle = tokio::spawn(read_capped(
let retained_bytes_cap = capture_policy.retained_bytes_cap();
let stdout_handle = tokio::spawn(read_output(
BufReader::new(stdout_reader),
stdout_stream.clone(),
/*is_stderr*/ false,
retained_bytes_cap,
));
let stderr_handle = tokio::spawn(read_capped(
let stderr_handle = tokio::spawn(read_output(
BufReader::new(stderr_reader),
stdout_stream.clone(),
/*is_stderr*/ true,
retained_bytes_cap,
));

let expiration_wait = async {
if capture_policy.uses_expiration() {
expiration.wait().await;
} else {
std::future::pending::<()>().await;
}
};
tokio::pin!(expiration_wait);
let (exit_status, timed_out) = tokio::select! {
status_result = child.wait() => {
let exit_status = status_result?;
(exit_status, false)
}
_ = expiration.wait() => {
_ = &mut expiration_wait => {
kill_child_process_group(&mut child)?;
child.start_kill()?;
(synthetic_exit_status(EXIT_CODE_SIGNAL_BASE + TIMEOUT_CODE), true)
Expand All @@ -923,7 +991,7 @@ async fn consume_truncated_output(
// We need mutable bindings so we can `abort()` them on timeout.
use tokio::task::JoinHandle;

async fn await_with_timeout(
async fn await_output(
handle: &mut JoinHandle<std::io::Result<StreamOutput<Vec<u8>>>>,
timeout: Duration,
) -> std::io::Result<StreamOutput<Vec<u8>>> {
Expand All @@ -946,17 +1014,9 @@ async fn consume_truncated_output(
let mut stdout_handle = stdout_handle;
let mut stderr_handle = stderr_handle;

let stdout = await_with_timeout(
&mut stdout_handle,
Duration::from_millis(IO_DRAIN_TIMEOUT_MS),
)
.await?;
let stderr = await_with_timeout(
&mut stderr_handle,
Duration::from_millis(IO_DRAIN_TIMEOUT_MS),
)
.await?;
let aggregated_output = aggregate_output(&stdout, &stderr);
let stdout = await_output(&mut stdout_handle, capture_policy.io_drain_timeout()).await?;
let stderr = await_output(&mut stderr_handle, capture_policy.io_drain_timeout()).await?;
Comment on lines +1017 to +1018
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Keep an I/O drain guard for FullBuffer execs

In consume_output, FullBuffer passes None here, so the post-exit pipe drain no longer has the timeout that the comment above IO_DRAIN_TIMEOUT_MS says is needed when grandchildren inherit stdout/stderr. In that scenario the direct child can exit successfully while a background child keeps the pipe open, and a FullBuffer caller will hang forever waiting for EOF instead of returning the captured output.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in d886f18afeb4.

FullBuffer no longer disables the post-exit pipe drain guard: ExecCapturePolicy::io_drain_timeout() now always returns IO_DRAIN_TIMEOUT_MS, while uses_expiration() remains the switch that disables exec expiration for FullBuffer.

I also added a regression test in core/src/exec_tests.rs that runs a FullBuffer exec where a background descendant inherits stdout/stderr and keeps the pipe open after the direct child exits. The test wraps the call in an outer timeout and now proves we return once the drain guard fires instead of hanging forever.

let aggregated_output = aggregate_output(&stdout, &stderr, retained_bytes_cap);

Ok(RawExecToolCallOutput {
exit_status,
Expand All @@ -967,12 +1027,17 @@ async fn consume_truncated_output(
})
}

async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
async fn read_output<R: AsyncRead + Unpin + Send + 'static>(
mut reader: R,
stream: Option<StdoutStream>,
is_stderr: bool,
max_bytes: Option<usize>,
) -> io::Result<StreamOutput<Vec<u8>>> {
let mut buf = Vec::with_capacity(AGGREGATE_BUFFER_INITIAL_CAPACITY.min(EXEC_OUTPUT_MAX_BYTES));
let mut buf = Vec::with_capacity(
max_bytes.map_or(AGGREGATE_BUFFER_INITIAL_CAPACITY, |max_bytes| {
AGGREGATE_BUFFER_INITIAL_CAPACITY.min(max_bytes)
}),
);
let mut tmp = [0u8; READ_CHUNK_SIZE];
let mut emitted_deltas: usize = 0;

Expand Down Expand Up @@ -1004,7 +1069,11 @@ async fn read_capped<R: AsyncRead + Unpin + Send + 'static>(
emitted_deltas += 1;
}

append_capped(&mut buf, &tmp[..n], EXEC_OUTPUT_MAX_BYTES);
if let Some(max_bytes) = max_bytes {
append_capped(&mut buf, &tmp[..n], max_bytes);
} else {
buf.extend_from_slice(&tmp[..n]);
}
// Continue reading to EOF to avoid back-pressure
}

Expand Down
Loading
Loading