Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions codex-rs/code-mode/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ pub use description::render_json_schema_to_typescript;
pub use response::DEFAULT_IMAGE_DETAIL;
pub use response::FunctionCallOutputContentItem;
pub use response::ImageDetail;
pub use runtime::CodeModeNestedToolCall;
pub use runtime::DEFAULT_EXEC_YIELD_TIME_MS;
pub use runtime::DEFAULT_MAX_OUTPUT_TOKENS_PER_EXEC_CALL;
pub use runtime::DEFAULT_WAIT_YIELD_TIME_MS;
pub use runtime::ExecuteRequest;
pub use runtime::RuntimeResponse;
pub use runtime::WaitOutcome;
pub use runtime::WaitRequest;
pub use service::CodeModeService;
pub use service::CodeModeTurnHost;
Expand Down
50 changes: 44 additions & 6 deletions codex-rs/code-mode/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::sync::mpsc as std_mpsc;
use std::thread;

use codex_protocol::ToolName;
use serde::Serialize;
use serde_json::Value as JsonValue;
use tokio::sync::mpsc;

Expand All @@ -25,6 +26,11 @@ const EXIT_SENTINEL: &str = "__codex_code_mode_exit__";

#[derive(Clone, Debug)]
pub struct ExecuteRequest {
/// Runtime cell id for this execution.
///
/// Callers allocate this before execution so tracing, waits, and nested tool
/// calls can refer to the cell as soon as JavaScript starts.
pub cell_id: String,
pub tool_call_id: String,
pub enabled_tools: Vec<ToolDefinition>,
pub source: String,
Expand All @@ -40,7 +46,30 @@ pub struct WaitRequest {
pub terminate: bool,
}

/// Result of waiting on a code-mode cell.
///
/// The wrapped [`RuntimeResponse`] is the model-facing wait result. The enum
/// variant carries the extra lifecycle provenance that `RuntimeResponse` cannot:
/// a failed real cell and a missing-cell wait both use
/// `RuntimeResponse::Result { error_text: Some(..), .. }`, but only the former
/// should be treated as a code-cell lifecycle event.
///
/// Callers that only need the model-facing payload can convert with
/// `RuntimeResponse::from(outcome)`, which discards the variant.
#[derive(Debug, PartialEq)]
pub enum WaitOutcome {
    /// The requested code cell was live when the wait command was accepted.
    ///
    /// The payload is the response the cell's session produced for this wait.
    LiveCell(RuntimeResponse),
    /// The requested code cell was not live.
    ///
    /// The payload is a synthesized missing-cell response rather than output
    /// produced by a running cell.
    MissingCell(RuntimeResponse),
}

impl From<WaitOutcome> for RuntimeResponse {
    /// Extracts the model-facing response, dropping the live/missing provenance.
    fn from(outcome: WaitOutcome) -> Self {
        // Both variants wrap the same payload type, so a single irrefutable
        // or-pattern binds it regardless of which variant we were given.
        let (WaitOutcome::LiveCell(payload) | WaitOutcome::MissingCell(payload)) = outcome;
        payload
    }
}

#[derive(Debug, PartialEq, Serialize)]
pub enum RuntimeResponse {
Yielded {
cell_id: String,
Expand All @@ -58,14 +87,22 @@ pub enum RuntimeResponse {
},
}

/// Nested tool request emitted by one code-mode cell.
///
/// Code mode owns the per-cell runtime id. Hosts should preserve it for
/// provenance/debugging, but should still assign their own runtime tool call id
/// if their tool-call graph requires globally unique ids.
#[derive(Debug)]
pub struct CodeModeNestedToolCall {
    /// Id of the code-mode cell that issued this tool call.
    pub cell_id: String,
    /// Id the runtime attached to this call. The service sends the tool's
    /// result or error back to the runtime under this same id.
    pub runtime_tool_call_id: String,
    /// Name of the tool the cell wants to invoke.
    pub tool_name: ToolName,
    /// Tool input as JSON, or `None` when the call carried no input.
    pub input: Option<JsonValue>,
}

#[derive(Debug)]
pub(crate) enum TurnMessage {
ToolCall {
cell_id: String,
id: String,
name: ToolName,
input: Option<JsonValue>,
},
ToolCall(CodeModeNestedToolCall),
Notify {
cell_id: String,
call_id: String,
Expand Down Expand Up @@ -331,6 +368,7 @@ mod tests {

fn execute_request(source: &str) -> ExecuteRequest {
ExecuteRequest {
cell_id: "1".to_string(),
tool_call_id: "call_1".to_string(),
enabled_tools: Vec::new(),
source: source.to_string(),
Expand Down
128 changes: 92 additions & 36 deletions codex-rs/code-mode/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use std::sync::atomic::Ordering;
use std::time::Duration;

use async_trait::async_trait;
use codex_protocol::ToolName;
use serde_json::Value as JsonValue;
use tokio::sync::Mutex;
use tokio::sync::mpsc;
Expand All @@ -14,21 +13,22 @@ use tokio_util::sync::CancellationToken;
use tracing::warn;

use crate::FunctionCallOutputContentItem;
use crate::runtime::CodeModeNestedToolCall;
use crate::runtime::DEFAULT_EXEC_YIELD_TIME_MS;
use crate::runtime::ExecuteRequest;
use crate::runtime::RuntimeCommand;
use crate::runtime::RuntimeEvent;
use crate::runtime::RuntimeResponse;
use crate::runtime::TurnMessage;
use crate::runtime::WaitOutcome;
use crate::runtime::WaitRequest;
use crate::runtime::spawn_runtime;

#[async_trait]
pub trait CodeModeTurnHost: Send + Sync {
async fn invoke_tool(
&self,
tool_name: ToolName,
input: Option<JsonValue>,
invocation: CodeModeNestedToolCall,
cancellation_token: CancellationToken,
) -> Result<JsonValue, String>;

Expand Down Expand Up @@ -76,24 +76,45 @@ impl CodeModeService {
*self.inner.stored_values.lock().await = values;
}

pub async fn execute(&self, request: ExecuteRequest) -> Result<RuntimeResponse, String> {
let cell_id = self
.inner
/// Reserves the runtime cell id for a future `execute` request.
///
/// The runtime can issue nested tool calls before the first `execute`
/// response is returned. Hosts that need a parent trace object for those
/// nested calls should allocate the cell id up front and pass it back on the
/// `ExecuteRequest`.
pub fn allocate_cell_id(&self) -> String {
self.inner
.next_cell_id
.fetch_add(1, Ordering::Relaxed)
.to_string();
.to_string()
}

pub async fn execute(&self, request: ExecuteRequest) -> Result<RuntimeResponse, String> {
let cell_id = request.cell_id.clone();
let initial_yield_time_ms = request.yield_time_ms.unwrap_or(DEFAULT_EXEC_YIELD_TIME_MS);
let (event_tx, event_rx) = mpsc::unbounded_channel();
let (runtime_tx, runtime_terminate_handle) = spawn_runtime(request.clone(), event_tx)?;
let (control_tx, control_rx) = mpsc::unbounded_channel();
let (response_tx, response_rx) = oneshot::channel();
let (runtime_tx, runtime_terminate_handle) = {
let mut sessions = self.inner.sessions.lock().await;
if sessions.contains_key(&cell_id) {
return Err(format!("exec cell {cell_id} already exists"));
}

self.inner.sessions.lock().await.insert(
cell_id.clone(),
SessionHandle {
control_tx: control_tx.clone(),
runtime_tx: runtime_tx.clone(),
},
);
let (runtime_tx, runtime_terminate_handle) = spawn_runtime(request, event_tx)?;

// Keep the session registry locked through insertion so a
// caller-owned cell id cannot race with another execute and replace
// a live runtime.
sessions.insert(
cell_id.clone(),
SessionHandle {
control_tx,
runtime_tx: runtime_tx.clone(),
},
);
(runtime_tx, runtime_terminate_handle)
};

tokio::spawn(run_session_control(
Arc::clone(&self.inner),
Expand All @@ -105,15 +126,15 @@ impl CodeModeService {
event_rx,
control_rx,
response_tx,
request.yield_time_ms.unwrap_or(DEFAULT_EXEC_YIELD_TIME_MS),
initial_yield_time_ms,
));

response_rx
.await
.map_err(|_| "exec runtime ended unexpectedly".to_string())
}

pub async fn wait(&self, request: WaitRequest) -> Result<RuntimeResponse, String> {
pub async fn wait(&self, request: WaitRequest) -> Result<WaitOutcome, String> {
let cell_id = request.cell_id.clone();
let handle = self
.inner
Expand All @@ -123,7 +144,7 @@ impl CodeModeService {
.get(&request.cell_id)
.cloned();
let Some(handle) = handle else {
return Ok(missing_cell_response(cell_id));
return Ok(WaitOutcome::MissingCell(missing_cell_response(cell_id)));
};
let (response_tx, response_rx) = oneshot::channel();
let control_message = if request.terminate {
Expand All @@ -135,11 +156,13 @@ impl CodeModeService {
}
};
if handle.control_tx.send(control_message).is_err() {
return Ok(missing_cell_response(cell_id));
return Ok(WaitOutcome::MissingCell(missing_cell_response(cell_id)));
}
match response_rx.await {
Ok(response) => Ok(response),
Err(_) => Ok(missing_cell_response(request.cell_id)),
Ok(response) => Ok(WaitOutcome::LiveCell(response)),
Err(_) => Ok(WaitOutcome::MissingCell(missing_cell_response(
request.cell_id,
))),
}
}

Expand Down Expand Up @@ -169,18 +192,14 @@ impl CodeModeService {
);
}
}
TurnMessage::ToolCall {
cell_id,
id,
name,
input,
} => {
TurnMessage::ToolCall(invocation) => {
let host = Arc::clone(&host);
let inner = Arc::clone(&inner);
tokio::spawn(async move {
let response = host
.invoke_tool(name, input, CancellationToken::new())
.await;
let cell_id = invocation.cell_id.clone();
let runtime_tool_call_id = invocation.runtime_tool_call_id.clone();
let response =
host.invoke_tool(invocation, CancellationToken::new()).await;
let runtime_tx = inner
.sessions
.lock()
Expand All @@ -191,8 +210,14 @@ impl CodeModeService {
return;
};
let command = match response {
Ok(result) => RuntimeCommand::ToolResponse { id, result },
Err(error_text) => RuntimeCommand::ToolError { id, error_text },
Ok(result) => RuntimeCommand::ToolResponse {
id: runtime_tool_call_id,
result,
},
Err(error_text) => RuntimeCommand::ToolError {
id: runtime_tool_call_id,
error_text,
},
};
let _ = runtime_tx.send(command);
});
Expand Down Expand Up @@ -361,12 +386,16 @@ async fn run_session_control(
}).await;
}
RuntimeEvent::ToolCall { id, name, input } => {
let _ = inner.turn_message_tx.send(TurnMessage::ToolCall {
let tool_call = CodeModeNestedToolCall {
cell_id: cell_id.clone(),
id,
name,
runtime_tool_call_id: id,
tool_name: name,
input,
}).await;
};
let _ = inner
.turn_message_tx
.send(TurnMessage::ToolCall(tool_call))
.await;
}
RuntimeEvent::Result {
stored_values,
Expand Down Expand Up @@ -479,6 +508,8 @@ mod tests {
use super::RuntimeResponse;
use super::SessionControlCommand;
use super::SessionControlContext;
use super::WaitOutcome;
use super::WaitRequest;
use super::run_session_control;
use crate::FunctionCallOutputContentItem;
use crate::runtime::ExecuteRequest;
Expand All @@ -487,6 +518,7 @@ mod tests {

fn execute_request(source: &str) -> ExecuteRequest {
ExecuteRequest {
cell_id: "1".to_string(),
tool_call_id: "call_1".to_string(),
enabled_tools: Vec::new(),
source: source.to_string(),
Expand Down Expand Up @@ -829,6 +861,30 @@ image({
);
}

/// Waiting on an id that was never registered must yield `MissingCell`, so
/// callers can tell it apart from a failure reported by a real cell.
#[tokio::test]
async fn wait_reports_missing_cell_separately_from_runtime_results() {
    let service = CodeModeService::new();
    let request = WaitRequest {
        cell_id: "missing".to_string(),
        yield_time_ms: 1,
        terminate: false,
    };

    let outcome = service.wait(request).await.unwrap();

    // The payload is still the model-facing "not found" result; only the
    // wrapping variant distinguishes it from a live cell's error.
    let expected_response = RuntimeResponse::Result {
        cell_id: "missing".to_string(),
        content_items: Vec::new(),
        stored_values: HashMap::new(),
        error_text: Some("exec cell missing not found".to_string()),
    };
    assert_eq!(outcome, WaitOutcome::MissingCell(expected_response));
}

#[tokio::test]
async fn terminate_waits_for_runtime_shutdown_before_responding() {
let inner = test_inner();
Expand Down
2 changes: 2 additions & 0 deletions codex-rs/core/src/session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6919,6 +6919,7 @@ async fn rejects_escalated_permissions_when_policy_not_on_request() {
tracker: Arc::clone(&turn_diff_tracker),
call_id,
tool_name: codex_tools::ToolName::plain(tool_name),
source: crate::tools::context::ToolCallSource::Direct,
payload: ToolPayload::Function {
arguments: serde_json::json!({
"command": params.command.clone(),
Expand Down Expand Up @@ -6998,6 +6999,7 @@ async fn unified_exec_rejects_escalated_permissions_when_policy_not_on_request()
tracker: Arc::clone(&tracker),
call_id: "exec-call".to_string(),
tool_name: codex_tools::ToolName::plain("exec_command"),
source: crate::tools::context::ToolCallSource::Direct,
payload: ToolPayload::Function {
arguments: serde_json::json!({
"cmd": "echo hi",
Expand Down
Loading
Loading