From 952b7212b36da77b738dbf82fcbd25680f1f0d9c Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 14:18:10 -0700 Subject: [PATCH 01/10] Introduce exec process capability traits Add a narrow ExecProcess trait for process lifecycle RPCs and expose it from Environment behind an ExecutorEnvironment trait. Keep the first cut behavior-preserving by delegating remote mode to the existing ExecServerClient and returning an unavailable process stub for default local Environment values. Co-authored-by: Codex --- codex-rs/exec-server/src/client.rs | 29 ++++++++++++ codex-rs/exec-server/src/client_api.rs | 33 +++++++++++++ codex-rs/exec-server/src/environment.rs | 62 +++++++++++++++++++++++++ codex-rs/exec-server/src/lib.rs | 2 + 4 files changed, 126 insertions(+) diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index a7680e73e8db..6a5bbbf7a246 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -1,6 +1,7 @@ use std::sync::Arc; use std::time::Duration; +use async_trait::async_trait; use codex_app_server_protocol::FsCopyParams; use codex_app_server_protocol::FsCopyResponse; use codex_app_server_protocol::FsCreateDirectoryParams; @@ -26,6 +27,7 @@ use tracing::warn; use crate::client_api::ExecServerClientConnectOptions; use crate::client_api::ExecServerEvent; +use crate::client_api::ExecProcess; use crate::client_api::RemoteExecServerConnectArgs; use crate::connection::JsonRpcConnection; use crate::protocol::EXEC_EXITED_METHOD; @@ -504,6 +506,33 @@ impl ExecServerClient { } } +#[async_trait] +impl ExecProcess for ExecServerClient { + async fn start(&self, params: ExecParams) -> Result { + self.exec(params).await + } + + async fn read(&self, params: ReadParams) -> Result { + self.read(params).await + } + + async fn write( + &self, + process_id: &str, + chunk: Vec, + ) -> Result { + self.write(process_id, chunk).await + } + + async fn terminate(&self, process_id: &str) -> Result { + self.terminate(process_id).await + } + + fn subscribe_events(&self) -> broadcast::Receiver { + self.event_receiver() + } +} + impl From for ExecServerError { fn from(value: RpcCallError) -> Self { match value { diff --git a/codex-rs/exec-server/src/client_api.rs b/codex-rs/exec-server/src/client_api.rs index 962d3ba36483..852ade566a38 100644 --- a/codex-rs/exec-server/src/client_api.rs +++ b/codex-rs/exec-server/src/client_api.rs @@ -1,7 +1,17 @@ use std::time::Duration; +use async_trait::async_trait; +use tokio::sync::broadcast; + +use crate::client::ExecServerError; use crate::protocol::ExecExitedNotification; use crate::protocol::ExecOutputDeltaNotification; +use crate::protocol::ExecParams; +use crate::protocol::ExecResponse; +use crate::protocol::ReadParams; +use crate::protocol::ReadResponse; +use crate::protocol::TerminateResponse; +use crate::protocol::WriteResponse; /// Connection options for any exec-server client transport. #[derive(Debug, Clone, PartialEq, Eq)] @@ -25,3 +35,26 @@ pub enum ExecServerEvent { OutputDelta(ExecOutputDeltaNotification), Exited(ExecExitedNotification), } + +/// Process lifecycle capability for an execution environment. +#[async_trait] +pub trait ExecProcess: Send + Sync { + async fn start(&self, params: ExecParams) -> Result; + + async fn read(&self, params: ReadParams) -> Result; + + async fn write( + &self, + process_id: &str, + chunk: Vec, + ) -> Result; + + async fn terminate(&self, process_id: &str) -> Result; + + fn subscribe_events(&self) -> broadcast::Receiver; +} + +/// Capability bundle exposed by an execution environment. +pub trait ExecutorEnvironment: Send + Sync { + fn process(&self) -> &(dyn ExecProcess + '_); +} diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index c8635ec03a0b..a91c355fff75 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -1,8 +1,24 @@ use crate::ExecServerClient; use crate::ExecServerError; +use crate::ExecServerEvent; +use crate::ExecProcess; +use crate::ExecutorEnvironment; use crate::RemoteExecServerConnectArgs; +use crate::protocol::ExecParams; +use crate::protocol::ExecResponse; +use crate::protocol::ReadParams; +use crate::protocol::ReadResponse; +use crate::protocol::TerminateResponse; +use crate::protocol::WriteResponse; use crate::fs; use crate::fs::ExecutorFileSystem; +use async_trait::async_trait; +use tokio::sync::broadcast; + +#[derive(Debug)] +struct UnavailableExecProcess; + +static UNAVAILABLE_EXEC_PROCESS: UnavailableExecProcess = UnavailableExecProcess; #[derive(Clone, Default)] pub struct Environment { @@ -56,11 +72,57 @@ impl Environment { self.remote_exec_server_client.as_ref() } + pub fn process(&self) -> &(dyn ExecProcess + '_) { + self.remote_exec_server_client + .as_ref() + .map_or(&UNAVAILABLE_EXEC_PROCESS as &dyn ExecProcess, |client| client) + } + pub fn get_filesystem(&self) -> impl ExecutorFileSystem + use<> { fs::LocalFileSystem } } +impl ExecutorEnvironment for Environment { + fn process(&self) -> &(dyn ExecProcess + '_) { + self.process() + } +} + +#[async_trait] +impl ExecProcess for UnavailableExecProcess { + async fn start(&self, _params: ExecParams) -> Result { + Err(unavailable_exec_process_error()) + } + + async fn read(&self, _params: ReadParams) -> Result { + Err(unavailable_exec_process_error()) + } + + async fn write( + &self, + _process_id: &str, + _chunk: Vec, + ) -> Result { + Err(unavailable_exec_process_error()) + } + + async fn terminate(&self, _process_id: &str) -> Result { + Err(unavailable_exec_process_error()) + } + + fn subscribe_events(&self) -> broadcast::Receiver { + let (_tx, rx) = broadcast::channel(1); + rx + } +} + +fn unavailable_exec_process_error() -> ExecServerError { + ExecServerError::Protocol( + "exec process capability is unavailable for a local default Environment".to_string(), + ) +} + #[cfg(test)] mod tests { use super::Environment; diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 3c50d0ec5911..19f0642ad061 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -11,6 +11,8 @@ pub use client::ExecServerClient; pub use client::ExecServerError; pub use client_api::ExecServerClientConnectOptions; pub use client_api::ExecServerEvent; +pub use client_api::ExecProcess; +pub use client_api::ExecutorEnvironment; pub use client_api::RemoteExecServerConnectArgs; pub use codex_app_server_protocol::FsCopyParams; pub use codex_app_server_protocol::FsCopyResponse; From 7eebdbd3a83f5dfac586e6053c50f4c756f65f62 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 14:23:58 -0700 Subject: [PATCH 02/10] Use local exec process by default in Environment::create Fall back to an in-process exec-server client when no remote URL is configured, so constructed Environment values have a real process capability in both local and remote modes. Co-authored-by: Codex --- codex-rs/exec-server/src/environment.rs | 37 ++++++++++++++----------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index a91c355fff75..024c02ef57bb 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -1,4 +1,5 @@ use crate::ExecServerClient; +use crate::ExecServerClientConnectOptions; use crate::ExecServerError; use crate::ExecServerEvent; use crate::ExecProcess; @@ -23,7 +24,7 @@ static UNAVAILABLE_EXEC_PROCESS: UnavailableExecProcess = UnavailableExecProcess #[derive(Clone, Default)] pub struct Environment { experimental_exec_server_url: Option, - remote_exec_server_client: Option, + exec_server_client: Option, } impl std::fmt::Debug for Environment { @@ -34,8 +35,8 @@ impl std::fmt::Debug for Environment { &self.experimental_exec_server_url, ) .field( - "has_remote_exec_server_client", - &self.remote_exec_server_client.is_some(), + "has_exec_server_client", + &self.exec_server_client.is_some(), ) .finish() } @@ -45,22 +46,22 @@ impl Environment { pub async fn create( experimental_exec_server_url: Option, ) -> Result { - let remote_exec_server_client = + let exec_server_client = Some( if let Some(websocket_url) = experimental_exec_server_url.as_deref() { - Some( - ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new( - websocket_url.to_string(), - "codex-core".to_string(), - )) - .await?, - ) + ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new( + websocket_url.to_string(), + "codex-core".to_string(), + )) + .await? } else { - None - }; + ExecServerClient::connect_in_process(ExecServerClientConnectOptions::default()) + .await? + }, + ); Ok(Self { experimental_exec_server_url, - remote_exec_server_client, + exec_server_client, }) } @@ -69,11 +70,15 @@ impl Environment { } pub fn remote_exec_server_client(&self) -> Option<&ExecServerClient> { - self.remote_exec_server_client.as_ref() + if self.experimental_exec_server_url.is_some() { + self.exec_server_client.as_ref() + } else { + None + } } pub fn process(&self) -> &(dyn ExecProcess + '_) { - self.remote_exec_server_client + self.exec_server_client .as_ref() .map_or(&UNAVAILABLE_EXEC_PROCESS as &dyn ExecProcess, |client| client) } From 2d52d35c0999cf8b6745d220447f45f72b3c1d96 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 14:31:47 -0700 Subject: [PATCH 03/10] Replace unavailable exec stub with local ExecProcess Give Environment a real local ExecProcess implementation by default and keep remote mode selecting the websocket-backed client when configured. The local implementation lazily initializes an in-process exec-server client on first use. Co-authored-by: Codex --- codex-rs/exec-server/src/environment.rs | 97 +++++++++++++++---------- 1 file changed, 57 insertions(+), 40 deletions(-) diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index 024c02ef57bb..5372de9fdfed 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -1,5 +1,4 @@ use crate::ExecServerClient; -use crate::ExecServerClientConnectOptions; use crate::ExecServerError; use crate::ExecServerEvent; use crate::ExecProcess; @@ -14,17 +13,30 @@ use crate::protocol::WriteResponse; use crate::fs; use crate::fs::ExecutorFileSystem; use async_trait::async_trait; +use std::sync::Arc; use tokio::sync::broadcast; - -#[derive(Debug)] -struct UnavailableExecProcess; - -static UNAVAILABLE_EXEC_PROCESS: UnavailableExecProcess = UnavailableExecProcess; +use tokio::sync::OnceCell; #[derive(Clone, Default)] +struct LocalExecProcess { + client: Arc>, +} + +#[derive(Clone)] pub struct Environment { experimental_exec_server_url: Option, - exec_server_client: Option, + remote_exec_server_client: Option, + local_exec_process: LocalExecProcess, +} + +impl Default for Environment { + fn default() -> Self { + Self { + experimental_exec_server_url: None, + remote_exec_server_client: None, + local_exec_process: LocalExecProcess::default(), + } + } } impl std::fmt::Debug for Environment { @@ -35,8 +47,8 @@ impl std::fmt::Debug for Environment { &self.experimental_exec_server_url, ) .field( - "has_exec_server_client", - &self.exec_server_client.is_some(), + "has_remote_exec_server_client", + &self.remote_exec_server_client.is_some(), ) .finish() } @@ -46,22 +58,23 @@ impl Environment { pub async fn create( experimental_exec_server_url: Option, ) -> Result { - let exec_server_client = Some( + let remote_exec_server_client = if let Some(websocket_url) = experimental_exec_server_url.as_deref() { - ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new( + Some( + ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new( websocket_url.to_string(), "codex-core".to_string(), )) - .await? + .await?, + ) } else { - ExecServerClient::connect_in_process(ExecServerClientConnectOptions::default()) - .await? - }, - ); + None + }; Ok(Self { experimental_exec_server_url, - exec_server_client, + remote_exec_server_client, + local_exec_process: LocalExecProcess::default(), }) } @@ -70,17 +83,13 @@ impl Environment { } pub fn remote_exec_server_client(&self) -> Option<&ExecServerClient> { - if self.experimental_exec_server_url.is_some() { - self.exec_server_client.as_ref() - } else { - None - } + self.remote_exec_server_client.as_ref() } pub fn process(&self) -> &(dyn ExecProcess + '_) { - self.exec_server_client + self.remote_exec_server_client .as_ref() - .map_or(&UNAVAILABLE_EXEC_PROCESS as &dyn ExecProcess, |client| client) + .map_or(&self.local_exec_process as &dyn ExecProcess, |client| client) } pub fn get_filesystem(&self) -> impl ExecutorFileSystem + use<> { @@ -95,37 +104,45 @@ impl ExecutorEnvironment for Environment { } #[async_trait] -impl ExecProcess for UnavailableExecProcess { - async fn start(&self, _params: ExecParams) -> Result { - Err(unavailable_exec_process_error()) +impl ExecProcess for LocalExecProcess { + async fn start(&self, params: ExecParams) -> Result { + self.client().await?.start(params).await } - async fn read(&self, _params: ReadParams) -> Result { - Err(unavailable_exec_process_error()) + async fn read(&self, params: ReadParams) -> Result { + self.client().await?.read(params).await } async fn write( &self, - _process_id: &str, - _chunk: Vec, + process_id: &str, + chunk: Vec, ) -> Result { - Err(unavailable_exec_process_error()) + self.client().await?.write(process_id, chunk).await } - async fn terminate(&self, _process_id: &str) -> Result { - Err(unavailable_exec_process_error()) + async fn terminate(&self, process_id: &str) -> Result { + self.client().await?.terminate(process_id).await } fn subscribe_events(&self) -> broadcast::Receiver { - let (_tx, rx) = broadcast::channel(1); - rx + if let Some(client) = self.client.get() { + client.event_receiver() + } else { + let (_tx, rx) = broadcast::channel(1); + rx + } } } -fn unavailable_exec_process_error() -> ExecServerError { - ExecServerError::Protocol( - "exec process capability is unavailable for a local default Environment".to_string(), - ) +impl LocalExecProcess { + async fn client(&self) -> Result<&ExecServerClient, ExecServerError> { + self.client + .get_or_try_init(|| async { + ExecServerClient::connect_in_process(Default::default()).await + }) + .await + } } #[cfg(test)] From 7bbc0a4e15734c2bf8e0b66589ab53a686811151 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 14:36:47 -0700 Subject: [PATCH 04/10] Use a direct local ExecProcess implementation Replace the in-process client wrapper with a local ExecProcess that owns ExecServerHandler directly and forwards process notifications to the trait event stream. This keeps the default Environment process path entirely local instead of routing through ExecServerClient. Co-authored-by: Codex --- codex-rs/exec-server/src/environment.rs | 118 ++++++++++++++++++++---- 1 file changed, 101 insertions(+), 17 deletions(-) diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index 5372de9fdfed..2336941ee21f 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -4,22 +4,57 @@ use crate::ExecServerEvent; use crate::ExecProcess; use crate::ExecutorEnvironment; use crate::RemoteExecServerConnectArgs; +use crate::protocol::EXEC_EXITED_METHOD; +use crate::protocol::EXEC_OUTPUT_DELTA_METHOD; +use crate::protocol::ExecExitedNotification; +use crate::protocol::ExecOutputDeltaNotification; use crate::protocol::ExecParams; use crate::protocol::ExecResponse; use crate::protocol::ReadParams; use crate::protocol::ReadResponse; +use crate::protocol::TerminateParams; use crate::protocol::TerminateResponse; +use crate::protocol::WriteParams; use crate::protocol::WriteResponse; +use crate::rpc::RpcNotificationSender; +use crate::rpc::RpcServerOutboundMessage; +use crate::server::ExecServerHandler; use crate::fs; use crate::fs::ExecutorFileSystem; use async_trait::async_trait; +use serde_json::Value; use std::sync::Arc; use tokio::sync::broadcast; +use tokio::sync::mpsc; +use tokio::sync::Mutex; use tokio::sync::OnceCell; -#[derive(Clone, Default)] +#[derive(Clone)] struct LocalExecProcess { - client: Arc>, + handler: Arc, + events_tx: broadcast::Sender, + outgoing_rx: Arc>>>, + reader_task_started: Arc>, +} + +impl Default for LocalExecProcess { + fn default() -> Self { + let (outgoing_tx, outgoing_rx) = mpsc::channel(256); + let handler = Arc::new(ExecServerHandler::new(RpcNotificationSender::new(outgoing_tx))); + let (_initialize_response) = handler + .initialize() + .expect("new local exec process should initialize once"); + handler + .initialized() + .expect("new local exec process should accept initialized notification"); + + Self { + handler, + events_tx: broadcast::channel(256).0, + outgoing_rx: Arc::new(Mutex::new(Some(outgoing_rx))), + reader_task_started: Arc::new(OnceCell::new()), + } + } } #[derive(Clone)] @@ -106,11 +141,13 @@ impl ExecutorEnvironment for Environment { #[async_trait] impl ExecProcess for LocalExecProcess { async fn start(&self, params: ExecParams) -> Result { - self.client().await?.start(params).await + self.ensure_reader_task().await; + self.handler.exec(params).await.map_err(local_server_error) } async fn read(&self, params: ReadParams) -> Result { - self.client().await?.read(params).await + self.ensure_reader_task().await; + self.handler.exec_read(params).await.map_err(local_server_error) } async fn write( @@ -118,30 +155,77 @@ impl ExecProcess for LocalExecProcess { process_id: &str, chunk: Vec, ) -> Result { - self.client().await?.write(process_id, chunk).await + self.ensure_reader_task().await; + self.handler + .exec_write(WriteParams { + process_id: process_id.to_string(), + chunk: chunk.into(), + }) + .await + .map_err(local_server_error) } async fn terminate(&self, process_id: &str) -> Result { - self.client().await?.terminate(process_id).await + self.ensure_reader_task().await; + self.handler + .terminate(TerminateParams { + process_id: process_id.to_string(), + }) + .await + .map_err(local_server_error) } fn subscribe_events(&self) -> broadcast::Receiver { - if let Some(client) = self.client.get() { - client.event_receiver() - } else { - let (_tx, rx) = broadcast::channel(1); - rx - } + self.events_tx.subscribe() } } impl LocalExecProcess { - async fn client(&self) -> Result<&ExecServerClient, ExecServerError> { - self.client - .get_or_try_init(|| async { - ExecServerClient::connect_in_process(Default::default()).await + async fn ensure_reader_task(&self) { + let _ = self + .reader_task_started + .get_or_init(|| async { + let mut outgoing_rx = self.outgoing_rx.lock().await; + let Some(mut outgoing_rx) = outgoing_rx.take() else { + return; + }; + let events_tx = self.events_tx.clone(); + tokio::spawn(async move { + while let Some(message) = outgoing_rx.recv().await { + if let RpcServerOutboundMessage::Notification(notification) = message { + match notification.method.as_str() { + EXEC_OUTPUT_DELTA_METHOD => { + if let Ok(params) = serde_json::from_value::< + ExecOutputDeltaNotification, + >( + notification.params.unwrap_or(Value::Null) + ) { + let _ = events_tx.send(ExecServerEvent::OutputDelta(params)); + } + } + EXEC_EXITED_METHOD => { + if let Ok(params) = + serde_json::from_value::( + notification.params.unwrap_or(Value::Null), + ) + { + let _ = events_tx.send(ExecServerEvent::Exited(params)); + } + } + _ => {} + } + } + } + }); }) - .await + .await; + } +} + +fn local_server_error(error: codex_app_server_protocol::JSONRPCErrorError) -> ExecServerError { + ExecServerError::Server { + code: error.code, + message: error.message, } } From bdfd5abcca4778e3d4244251c99dc942c2806837 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 14:44:29 -0700 Subject: [PATCH 05/10] Reuse in-process ExecServerClient for local ExecProcess Remove the duplicate direct-handler local process implementation and make the local ExecProcess path delegate to the existing in-process ExecServerClient. Keep lazy initialization only to support sync Environment::default(). Co-authored-by: Codex --- codex-rs/exec-server/src/environment.rs | 115 +++++------------------- 1 file changed, 20 insertions(+), 95 deletions(-) diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index 2336941ee21f..f79084e4115a 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -1,58 +1,32 @@ use crate::ExecServerClient; +use crate::ExecServerClientConnectOptions; use crate::ExecServerError; use crate::ExecServerEvent; use crate::ExecProcess; use crate::ExecutorEnvironment; use crate::RemoteExecServerConnectArgs; -use crate::protocol::EXEC_EXITED_METHOD; -use crate::protocol::EXEC_OUTPUT_DELTA_METHOD; -use crate::protocol::ExecExitedNotification; -use crate::protocol::ExecOutputDeltaNotification; use crate::protocol::ExecParams; use crate::protocol::ExecResponse; use crate::protocol::ReadParams; use crate::protocol::ReadResponse; -use crate::protocol::TerminateParams; use crate::protocol::TerminateResponse; -use crate::protocol::WriteParams; use crate::protocol::WriteResponse; -use crate::rpc::RpcNotificationSender; -use crate::rpc::RpcServerOutboundMessage; -use crate::server::ExecServerHandler; use crate::fs; use crate::fs::ExecutorFileSystem; use async_trait::async_trait; -use serde_json::Value; use std::sync::Arc; use tokio::sync::broadcast; -use tokio::sync::mpsc; -use tokio::sync::Mutex; use tokio::sync::OnceCell; #[derive(Clone)] struct LocalExecProcess { - handler: Arc, - events_tx: broadcast::Sender, - outgoing_rx: Arc>>>, - reader_task_started: Arc>, + client: Arc>, } impl Default for LocalExecProcess { fn default() -> Self { - let (outgoing_tx, outgoing_rx) = mpsc::channel(256); - let handler = Arc::new(ExecServerHandler::new(RpcNotificationSender::new(outgoing_tx))); - let (_initialize_response) = handler - .initialize() - .expect("new local exec process should initialize once"); - handler - .initialized() - .expect("new local exec process should accept initialized notification"); - Self { - handler, - events_tx: broadcast::channel(256).0, - outgoing_rx: Arc::new(Mutex::new(Some(outgoing_rx))), - reader_task_started: Arc::new(OnceCell::new()), + client: Arc::new(OnceCell::new()), } } } @@ -97,9 +71,9 @@ impl Environment { if let Some(websocket_url) = experimental_exec_server_url.as_deref() { Some( ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new( - websocket_url.to_string(), - "codex-core".to_string(), - )) + websocket_url.to_string(), + "codex-core".to_string(), + )) .await?, ) } else { @@ -141,13 +115,11 @@ impl ExecutorEnvironment for Environment { #[async_trait] impl ExecProcess for LocalExecProcess { async fn start(&self, params: ExecParams) -> Result { - self.ensure_reader_task().await; - self.handler.exec(params).await.map_err(local_server_error) + self.client().await?.start(params).await } async fn read(&self, params: ReadParams) -> Result { - self.ensure_reader_task().await; - self.handler.exec_read(params).await.map_err(local_server_error) + self.client().await?.read(params).await } async fn write( @@ -155,80 +127,33 @@ impl ExecProcess for LocalExecProcess { process_id: &str, chunk: Vec, ) -> Result { - self.ensure_reader_task().await; - self.handler - .exec_write(WriteParams { - process_id: process_id.to_string(), - chunk: chunk.into(), - }) - .await - .map_err(local_server_error) + self.client().await?.write(process_id, chunk).await } async fn terminate(&self, process_id: &str) -> Result { - self.ensure_reader_task().await; - self.handler - .terminate(TerminateParams { - process_id: process_id.to_string(), - }) - .await - .map_err(local_server_error) + self.client().await?.terminate(process_id).await } fn subscribe_events(&self) -> broadcast::Receiver { - self.events_tx.subscribe() + if let Some(client) = self.client.get() { + client.event_receiver() + } else { + let (_tx, rx) = broadcast::channel(1); + rx + } } } impl LocalExecProcess { - async fn ensure_reader_task(&self) { - let _ = self - .reader_task_started - .get_or_init(|| async { - let mut outgoing_rx = self.outgoing_rx.lock().await; - let Some(mut outgoing_rx) = outgoing_rx.take() else { - return; - }; - let events_tx = self.events_tx.clone(); - tokio::spawn(async move { - while let Some(message) = outgoing_rx.recv().await { - if let RpcServerOutboundMessage::Notification(notification) = message { - match notification.method.as_str() { - EXEC_OUTPUT_DELTA_METHOD => { - if let Ok(params) = serde_json::from_value::< - ExecOutputDeltaNotification, - >( - notification.params.unwrap_or(Value::Null) - ) { - let _ = events_tx.send(ExecServerEvent::OutputDelta(params)); - } - } - EXEC_EXITED_METHOD => { - if let Ok(params) = - serde_json::from_value::( - notification.params.unwrap_or(Value::Null), - ) - { - let _ = events_tx.send(ExecServerEvent::Exited(params)); - } - } - _ => {} - } - } - } - }); + async fn client(&self) -> Result<&ExecServerClient, ExecServerError> { + self.client + .get_or_try_init(|| async { + ExecServerClient::connect_in_process(ExecServerClientConnectOptions::default()).await }) .await; } } -fn local_server_error(error: codex_app_server_protocol::JSONRPCErrorError) -> ExecServerError { - ExecServerError::Server { - code: error.code, - message: error.message, - } -} - #[cfg(test)] mod tests { use super::Environment; From e60eeec4bf1be56777caf69243fa0fc66cab1e5f Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 14:56:20 -0700 Subject: [PATCH 06/10] Make Environment construction async-only for process setup Remove sync Default from Environment, construct the local process client with connect_in_process() in create(None), and switch the fs-only app server default to a local filesystem constructor. Co-authored-by: Codex --- codex-rs/app-server/src/fs_api.rs | 2 +- codex-rs/exec-server/src/environment.rs | 120 +++++------------------- 2 files changed, 24 insertions(+), 98 deletions(-) diff --git a/codex-rs/app-server/src/fs_api.rs b/codex-rs/app-server/src/fs_api.rs index 9baa2b1dcec7..7ba4b28b9ee3 100644 --- a/codex-rs/app-server/src/fs_api.rs +++ b/codex-rs/app-server/src/fs_api.rs @@ -34,7 +34,7 @@ pub(crate) struct FsApi { impl Default for FsApi { fn default() -> Self { Self { - file_system: Arc::new(Environment::default().get_filesystem()), + file_system: Arc::new(Environment::local_filesystem()), } } } diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index f79084e4115a..68217002131c 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -1,51 +1,16 @@ use crate::ExecServerClient; use crate::ExecServerClientConnectOptions; use crate::ExecServerError; -use crate::ExecServerEvent; use crate::ExecProcess; use crate::ExecutorEnvironment; use crate::RemoteExecServerConnectArgs; -use crate::protocol::ExecParams; -use crate::protocol::ExecResponse; -use crate::protocol::ReadParams; -use crate::protocol::ReadResponse; -use crate::protocol::TerminateResponse; -use crate::protocol::WriteResponse; use crate::fs; use crate::fs::ExecutorFileSystem; -use async_trait::async_trait; -use std::sync::Arc; -use tokio::sync::broadcast; -use tokio::sync::OnceCell; - -#[derive(Clone)] -struct LocalExecProcess { - client: Arc>, -} - -impl Default for LocalExecProcess { - fn default() -> Self { - Self { - client: Arc::new(OnceCell::new()), - } - } -} #[derive(Clone)] pub struct Environment { experimental_exec_server_url: Option, - remote_exec_server_client: Option, - local_exec_process: LocalExecProcess, -} - -impl Default for Environment { - fn default() -> Self { - Self { - experimental_exec_server_url: None, - remote_exec_server_client: None, - local_exec_process: LocalExecProcess::default(), - } - } + exec_server_client: ExecServerClient, } impl std::fmt::Debug for Environment { @@ -57,7 +22,7 @@ impl std::fmt::Debug for Environment { ) .field( "has_remote_exec_server_client", - &self.remote_exec_server_client.is_some(), + &self.experimental_exec_server_url.is_some(), ) .finish() } @@ -67,23 +32,20 @@ impl Environment { pub async fn create( experimental_exec_server_url: Option, ) -> Result { - let remote_exec_server_client = - if let Some(websocket_url) = experimental_exec_server_url.as_deref() { - Some( - ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new( - websocket_url.to_string(), - "codex-core".to_string(), - )) - .await?, - ) - } else { - None - }; + let exec_server_client = if let Some(websocket_url) = experimental_exec_server_url.as_deref() + { + ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new( + websocket_url.to_string(), + "codex-core".to_string(), + )) + .await? + } else { + ExecServerClient::connect_in_process(ExecServerClientConnectOptions::default()).await? + }; Ok(Self { experimental_exec_server_url, - remote_exec_server_client, - local_exec_process: LocalExecProcess::default(), + exec_server_client, }) } @@ -92,18 +54,24 @@ impl Environment { } pub fn remote_exec_server_client(&self) -> Option<&ExecServerClient> { - self.remote_exec_server_client.as_ref() + if self.experimental_exec_server_url.is_some() { + Some(&self.exec_server_client) + } else { + None + } } pub fn process(&self) -> &(dyn ExecProcess + '_) { - self.remote_exec_server_client - .as_ref() - .map_or(&self.local_exec_process as &dyn ExecProcess, |client| client) + &self.exec_server_client } pub fn get_filesystem(&self) -> impl ExecutorFileSystem + use<> { fs::LocalFileSystem } + + pub fn local_filesystem() -> impl ExecutorFileSystem + use<> { + fs::LocalFileSystem + } } impl ExecutorEnvironment for Environment { @@ -112,48 +80,6 @@ impl ExecutorEnvironment for Environment { } } -#[async_trait] -impl ExecProcess for LocalExecProcess { - async fn start(&self, params: ExecParams) -> Result { - self.client().await?.start(params).await - } - - async fn read(&self, params: ReadParams) -> Result { - self.client().await?.read(params).await - } - - async fn write( - &self, - process_id: &str, - chunk: Vec, - ) -> Result { - self.client().await?.write(process_id, chunk).await - } - - async fn terminate(&self, process_id: &str) -> Result { - self.client().await?.terminate(process_id).await - } - - fn subscribe_events(&self) -> broadcast::Receiver { - if let Some(client) = self.client.get() { - client.event_receiver() - } else { - let (_tx, rx) = broadcast::channel(1); - rx - } - } -} - -impl LocalExecProcess { - async fn client(&self) -> Result<&ExecServerClient, ExecServerError> { - self.client - .get_or_try_init(|| async { - ExecServerClient::connect_in_process(ExecServerClientConnectOptions::default()).await - }) - .await; - } -} - #[cfg(test)] mod tests { use super::Environment; From 048438a44a9e3c1b90ff6594c6dfc1e47cd81366 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 15:03:44 -0700 Subject: [PATCH 07/10] Rename environment process accessor to get_executor Drop the stale remote_exec_server_client() accessor now that Environment exposes the process capability directly, and rename the capability getter to get_executor() on the environment trait and concrete type. Co-authored-by: Codex --- codex-rs/exec-server/src/client_api.rs | 2 +- codex-rs/exec-server/src/environment.rs | 15 +++------------ 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/codex-rs/exec-server/src/client_api.rs b/codex-rs/exec-server/src/client_api.rs index 852ade566a38..790f0bb66c8e 100644 --- a/codex-rs/exec-server/src/client_api.rs +++ b/codex-rs/exec-server/src/client_api.rs @@ -56,5 +56,5 @@ pub trait ExecProcess: Send + Sync { /// Capability bundle exposed by an execution environment. pub trait ExecutorEnvironment: Send + Sync { - fn process(&self) -> &(dyn ExecProcess + '_); + fn get_executor(&self) -> &(dyn ExecProcess + '_); } diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index 68217002131c..f55cb9909752 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -53,15 +53,7 @@ impl Environment { self.experimental_exec_server_url.as_deref() } - pub fn remote_exec_server_client(&self) -> Option<&ExecServerClient> { - if self.experimental_exec_server_url.is_some() { - Some(&self.exec_server_client) - } else { - None - } - } - - pub fn process(&self) -> &(dyn ExecProcess + '_) { + pub fn get_executor(&self) -> &(dyn ExecProcess + '_) { &self.exec_server_client } @@ -75,8 +67,8 @@ impl Environment { } impl ExecutorEnvironment for Environment { - fn process(&self) -> &(dyn ExecProcess + '_) { - self.process() + fn get_executor(&self) -> &(dyn ExecProcess + '_) { + self.get_executor() } } @@ -90,6 +82,5 @@ mod tests { let environment = Environment::create(None).await.expect("create environment"); assert_eq!(environment.experimental_exec_server_url(), None); - assert!(environment.remote_exec_server_client().is_none()); } } From 76978d5a836c422d465aa7203d6086af4ab40a47 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 15:06:06 -0700 Subject: [PATCH 08/10] Use LocalFileSystem directly in FsApi default Remove the temporary Environment::local_filesystem() helper and export LocalFileSystem so fs-only defaults can construct the local filesystem directly without going through Environment. Co-authored-by: Codex --- codex-rs/app-server/src/fs_api.rs | 4 ++-- codex-rs/exec-server/src/environment.rs | 4 ---- codex-rs/exec-server/src/fs.rs | 2 +- codex-rs/exec-server/src/lib.rs | 1 + 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/codex-rs/app-server/src/fs_api.rs b/codex-rs/app-server/src/fs_api.rs index 7ba4b28b9ee3..3ce485aaa0b8 100644 --- a/codex-rs/app-server/src/fs_api.rs +++ b/codex-rs/app-server/src/fs_api.rs @@ -20,8 +20,8 @@ use codex_app_server_protocol::FsWriteFileResponse; use codex_app_server_protocol::JSONRPCErrorError; use codex_exec_server::CopyOptions; use codex_exec_server::CreateDirectoryOptions; -use codex_exec_server::Environment; use codex_exec_server::ExecutorFileSystem; +use codex_exec_server::LocalFileSystem; use codex_exec_server::RemoveOptions; use std::io; use std::sync::Arc; @@ -34,7 +34,7 @@ pub(crate) struct FsApi { impl Default for FsApi { fn default() -> Self { Self { - file_system: Arc::new(Environment::local_filesystem()), + file_system: Arc::new(LocalFileSystem), } } } diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index f55cb9909752..ef5afd7a7dea 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -60,10 +60,6 @@ impl Environment { pub fn get_filesystem(&self) -> impl ExecutorFileSystem + use<> { fs::LocalFileSystem } - - pub fn local_filesystem() -> impl ExecutorFileSystem + use<> { - fs::LocalFileSystem - } } impl ExecutorEnvironment for Environment { diff --git a/codex-rs/exec-server/src/fs.rs b/codex-rs/exec-server/src/fs.rs index 82e0b8e6e6bc..c5999c1489f8 100644 --- a/codex-rs/exec-server/src/fs.rs +++ b/codex-rs/exec-server/src/fs.rs @@ -72,7 +72,7 @@ pub trait ExecutorFileSystem: Send + Sync { } #[derive(Clone, Default)] -pub(crate) struct LocalFileSystem; +pub struct LocalFileSystem; #[async_trait] impl ExecutorFileSystem for LocalFileSystem { diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 19f0642ad061..59ace1a628da 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -35,6 +35,7 @@ pub use fs::CreateDirectoryOptions; pub use fs::ExecutorFileSystem; pub use fs::FileMetadata; pub use fs::FileSystemResult; +pub use fs::LocalFileSystem; pub use fs::ReadDirectoryEntry; pub use fs::RemoveOptions; pub use protocol::ExecExitedNotification; From 3001e1e7b456f602168443896669913e056b13ba Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 15:21:27 -0700 Subject: [PATCH 09/10] Fix exec-server local filesystem compile path Apply the pending formatting reorderings and switch the server-side filesystem default to LocalFileSystem so the current PR compiles again. Co-authored-by: Codex --- codex-rs/exec-server/src/client.rs | 2 +- codex-rs/exec-server/src/environment.rs | 5 +++-- codex-rs/exec-server/src/lib.rs | 2 +- codex-rs/exec-server/src/server/filesystem.rs | 4 ++-- 4 files changed, 7 insertions(+), 6 deletions(-) diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 6a5bbbf7a246..245ba8d914f1 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -25,9 +25,9 @@ use tokio_tungstenite::connect_async; use tracing::debug; use tracing::warn; +use crate::client_api::ExecProcess; use crate::client_api::ExecServerClientConnectOptions; use crate::client_api::ExecServerEvent; -use crate::client_api::ExecProcess; use crate::client_api::RemoteExecServerConnectArgs; use crate::connection::JsonRpcConnection; use crate::protocol::EXEC_EXITED_METHOD; diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index ef5afd7a7dea..5b95c0459e7d 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -1,7 +1,7 @@ +use crate::ExecProcess; use crate::ExecServerClient; use crate::ExecServerClientConnectOptions; use crate::ExecServerError; -use crate::ExecProcess; use crate::ExecutorEnvironment; use crate::RemoteExecServerConnectArgs; use crate::fs; @@ -32,7 +32,8 @@ impl Environment { pub async fn create( experimental_exec_server_url: Option, ) -> Result { - let exec_server_client = if let Some(websocket_url) = experimental_exec_server_url.as_deref() + let exec_server_client = if let Some(websocket_url) = + experimental_exec_server_url.as_deref() { ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new( websocket_url.to_string(), diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 59ace1a628da..31091fb26a6a 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -9,9 +9,9 @@ mod server; pub use client::ExecServerClient; pub use client::ExecServerError; +pub use client_api::ExecProcess; pub use client_api::ExecServerClientConnectOptions; pub use client_api::ExecServerEvent; -pub use client_api::ExecProcess; pub use client_api::ExecutorEnvironment; pub use client_api::RemoteExecServerConnectArgs; pub use codex_app_server_protocol::FsCopyParams; diff --git a/codex-rs/exec-server/src/server/filesystem.rs b/codex-rs/exec-server/src/server/filesystem.rs index bc3d22a4da3b..cfd28be7135b 100644 --- a/codex-rs/exec-server/src/server/filesystem.rs +++ b/codex-rs/exec-server/src/server/filesystem.rs @@ -22,8 +22,8 @@ use codex_app_server_protocol::JSONRPCErrorError; use crate::CopyOptions; use crate::CreateDirectoryOptions; -use crate::Environment; use crate::ExecutorFileSystem; +use crate::LocalFileSystem; use crate::RemoveOptions; use crate::rpc::internal_error; use crate::rpc::invalid_request; @@ -36,7 +36,7 @@ pub(crate) struct ExecServerFileSystem { impl Default for ExecServerFileSystem { fn default() -> Self { Self { - file_system: Arc::new(Environment.get_filesystem()), + file_system: Arc::new(LocalFileSystem), } } } From 90df94ebba21bffe1aeef700bc674e8375d7804e Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 15:26:52 -0700 Subject: [PATCH 10/10] Draft alternate exec process layout like PR 15232 Reshape the exec process seam to mirror the filesystem split style with separate process, local_process, remote_process, and server/process_handler modules, and make Environment own an Arc. Co-authored-by: Codex --- codex-rs/exec-server/src/client.rs | 31 +- codex-rs/exec-server/src/client_api.rs | 43 -- codex-rs/exec-server/src/environment.rs | 40 +- codex-rs/exec-server/src/lib.rs | 11 +- codex-rs/exec-server/src/local_process.rs | 146 +++++++ codex-rs/exec-server/src/process.rs | 35 ++ codex-rs/exec-server/src/remote_process.rs | 51 +++ codex-rs/exec-server/src/server.rs | 2 + codex-rs/exec-server/src/server/handler.rs | 385 +---------------- .../exec-server/src/server/process_handler.rs | 400 ++++++++++++++++++ 10 files changed, 684 insertions(+), 460 deletions(-) create mode 100644 codex-rs/exec-server/src/local_process.rs create mode 100644 codex-rs/exec-server/src/process.rs create mode 100644 codex-rs/exec-server/src/remote_process.rs create mode 100644 codex-rs/exec-server/src/server/process_handler.rs diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 245ba8d914f1..7b6a2810ebda 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -1,7 +1,6 @@ use std::sync::Arc; use std::time::Duration; -use async_trait::async_trait; use codex_app_server_protocol::FsCopyParams; use codex_app_server_protocol::FsCopyResponse; use codex_app_server_protocol::FsCreateDirectoryParams; @@ -25,11 +24,10 @@ use tokio_tungstenite::connect_async; use tracing::debug; use tracing::warn; -use crate::client_api::ExecProcess; use crate::client_api::ExecServerClientConnectOptions; -use crate::client_api::ExecServerEvent; use crate::client_api::RemoteExecServerConnectArgs; use crate::connection::JsonRpcConnection; +use crate::process::ExecServerEvent; use crate::protocol::EXEC_EXITED_METHOD; use crate::protocol::EXEC_METHOD; use crate::protocol::EXEC_OUTPUT_DELTA_METHOD; @@ -506,33 +504,6 @@ impl ExecServerClient { } } -#[async_trait] -impl ExecProcess for ExecServerClient { - async fn start(&self, params: ExecParams) -> Result { - self.exec(params).await - } - - async fn read(&self, params: ReadParams) -> Result { - self.read(params).await - } - - async fn write( - &self, - process_id: &str, - chunk: Vec, - ) -> Result { - self.write(process_id, chunk).await - } - - async fn terminate(&self, process_id: &str) -> Result { - self.terminate(process_id).await - } - - fn subscribe_events(&self) -> broadcast::Receiver { - self.event_receiver() - } -} - impl From for ExecServerError { fn from(value: RpcCallError) -> Self { match value { diff --git a/codex-rs/exec-server/src/client_api.rs b/codex-rs/exec-server/src/client_api.rs index 790f0bb66c8e..6e89763416f3 100644 --- a/codex-rs/exec-server/src/client_api.rs +++ b/codex-rs/exec-server/src/client_api.rs @@ -1,18 +1,5 @@ use std::time::Duration; -use async_trait::async_trait; -use tokio::sync::broadcast; - -use crate::client::ExecServerError; -use crate::protocol::ExecExitedNotification; -use crate::protocol::ExecOutputDeltaNotification; -use crate::protocol::ExecParams; -use crate::protocol::ExecResponse; -use crate::protocol::ReadParams; -use crate::protocol::ReadResponse; -use crate::protocol::TerminateResponse; -use crate::protocol::WriteResponse; - /// Connection options for any exec-server client transport. #[derive(Debug, Clone, PartialEq, Eq)] pub struct ExecServerClientConnectOptions { @@ -28,33 +15,3 @@ pub struct RemoteExecServerConnectArgs { pub connect_timeout: Duration, pub initialize_timeout: Duration, } - -/// Connection-level server events. -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum ExecServerEvent { - OutputDelta(ExecOutputDeltaNotification), - Exited(ExecExitedNotification), -} - -/// Process lifecycle capability for an execution environment. -#[async_trait] -pub trait ExecProcess: Send + Sync { - async fn start(&self, params: ExecParams) -> Result; - - async fn read(&self, params: ReadParams) -> Result; - - async fn write( - &self, - process_id: &str, - chunk: Vec, - ) -> Result; - - async fn terminate(&self, process_id: &str) -> Result; - - fn subscribe_events(&self) -> broadcast::Receiver; -} - -/// Capability bundle exposed by an execution environment. -pub trait ExecutorEnvironment: Send + Sync { - fn get_executor(&self) -> &(dyn ExecProcess + '_); -} diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index 5b95c0459e7d..641d4ed2622a 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -1,16 +1,18 @@ -use crate::ExecProcess; use crate::ExecServerClient; use crate::ExecServerClientConnectOptions; use crate::ExecServerError; -use crate::ExecutorEnvironment; use crate::RemoteExecServerConnectArgs; use crate::fs; use crate::fs::ExecutorFileSystem; +use crate::local_process::LocalExecProcess; +use crate::process::ExecProcess; +use crate::remote_process::RemoteExecProcess; +use std::sync::Arc; #[derive(Clone)] pub struct Environment { experimental_exec_server_url: Option, - exec_server_client: ExecServerClient, + executor: Arc, } impl std::fmt::Debug for Environment { @@ -32,21 +34,21 @@ impl Environment { pub async fn create( experimental_exec_server_url: Option, ) -> Result { - let exec_server_client = if let Some(websocket_url) = - experimental_exec_server_url.as_deref() - { - ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new( - websocket_url.to_string(), - "codex-core".to_string(), - )) - .await? - } else { - ExecServerClient::connect_in_process(ExecServerClientConnectOptions::default()).await? - }; + let executor: Arc = + if let Some(websocket_url) = experimental_exec_server_url.as_deref() { + let client = ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new( + websocket_url.to_string(), + "codex-core".to_string(), + )) + .await?; + Arc::new(RemoteExecProcess::new(client)) + } else { + Arc::new(LocalExecProcess::new()) + }; Ok(Self { experimental_exec_server_url, - exec_server_client, + executor, }) } @@ -54,8 +56,8 @@ impl Environment { self.experimental_exec_server_url.as_deref() } - pub fn get_executor(&self) -> &(dyn ExecProcess + '_) { - &self.exec_server_client + pub fn get_executor(&self) -> Arc { + Arc::clone(&self.executor) } pub fn get_filesystem(&self) -> impl ExecutorFileSystem + use<> { @@ -63,8 +65,8 @@ impl Environment { } } -impl ExecutorEnvironment for Environment { - fn get_executor(&self) -> &(dyn ExecProcess + '_) { +impl crate::ExecutorEnvironment for Environment { + fn get_executor(&self) -> Arc { self.get_executor() } } diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 31091fb26a6a..fa20ffd18e28 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -3,16 +3,16 @@ mod client_api; mod connection; mod environment; mod fs; +mod local_process; +mod process; mod protocol; +mod remote_process; mod rpc; mod server; pub use client::ExecServerClient; pub use client::ExecServerError; -pub use client_api::ExecProcess; pub use client_api::ExecServerClientConnectOptions; -pub use client_api::ExecServerEvent; -pub use client_api::ExecutorEnvironment; pub use client_api::RemoteExecServerConnectArgs; pub use codex_app_server_protocol::FsCopyParams; pub use codex_app_server_protocol::FsCopyResponse; @@ -38,6 +38,8 @@ pub use fs::FileSystemResult; pub use fs::LocalFileSystem; pub use fs::ReadDirectoryEntry; pub use fs::RemoveOptions; +pub use process::ExecProcess; +pub use process::ExecServerEvent; pub use protocol::ExecExitedNotification; pub use protocol::ExecOutputDeltaNotification; pub use protocol::ExecOutputStream; @@ -53,5 +55,8 @@ pub use protocol::WriteParams; pub use protocol::WriteResponse; pub use server::DEFAULT_LISTEN_URL; pub use server::ExecServerListenUrlParseError; +pub trait ExecutorEnvironment: Send + Sync { + fn get_executor(&self) -> std::sync::Arc; +} pub use server::run_main; pub use server::run_main_with_listen_url; diff --git a/codex-rs/exec-server/src/local_process.rs b/codex-rs/exec-server/src/local_process.rs new file mode 100644 index 000000000000..d78725af5e98 --- /dev/null +++ b/codex-rs/exec-server/src/local_process.rs @@ -0,0 +1,146 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use codex_app_server_protocol::JSONRPCErrorError; +use serde_json::Value; +use tokio::sync::broadcast; +use tokio::sync::mpsc; + +use crate::ExecProcess; +use crate::ExecServerError; +use crate::ExecServerEvent; +use crate::process::ExecServerEvent::Exited; +use crate::process::ExecServerEvent::OutputDelta; +use crate::protocol::EXEC_EXITED_METHOD; +use crate::protocol::EXEC_OUTPUT_DELTA_METHOD; +use crate::protocol::ExecExitedNotification; +use crate::protocol::ExecOutputDeltaNotification; +use crate::protocol::ExecParams; +use crate::protocol::ExecResponse; +use crate::protocol::ReadParams; +use crate::protocol::ReadResponse; +use crate::protocol::TerminateParams; +use crate::protocol::TerminateResponse; +use crate::protocol::WriteParams; +use crate::protocol::WriteResponse; +use crate::rpc::RpcNotificationSender; +use crate::rpc::RpcServerOutboundMessage; +use crate::server::ProcessHandler; + +#[derive(Clone)] +pub(crate) struct LocalExecProcess { + inner: Arc, +} + +struct Inner { + process_handler: ProcessHandler, + events_tx: broadcast::Sender, + reader_task: tokio::task::JoinHandle<()>, +} + +impl Drop for Inner { + fn drop(&mut self) { + if let Ok(handle) = tokio::runtime::Handle::try_current() { + let process_handler = self.process_handler.clone(); + handle.spawn(async move { + process_handler.shutdown().await; + }); + } + self.reader_task.abort(); + } +} + +impl LocalExecProcess { + pub(crate) fn new() -> Self { + let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(256); + let process_handler = ProcessHandler::new(RpcNotificationSender::new(outgoing_tx)); + let events_tx = broadcast::channel(256).0; + let events_tx_for_task = events_tx.clone(); + let reader_task = tokio::spawn(async move { + while let Some(message) = outgoing_rx.recv().await { + if let RpcServerOutboundMessage::Notification(notification) = message { + match notification.method.as_str() { + EXEC_OUTPUT_DELTA_METHOD => { + if let Ok(params) = serde_json::from_value::( + notification.params.unwrap_or(Value::Null), + ) { + let _ = events_tx_for_task.send(OutputDelta(params)); + } + } + EXEC_EXITED_METHOD => { + if let Ok(params) = serde_json::from_value::( + notification.params.unwrap_or(Value::Null), + ) { + let _ = events_tx_for_task.send(Exited(params)); + } + } + _ => {} + } + } + } + }); + + Self { + inner: Arc::new(Inner { + process_handler, + events_tx, + reader_task, + }), + } + } +} + +#[async_trait] +impl ExecProcess for LocalExecProcess { + async fn start(&self, params: ExecParams) -> Result { + self.inner + .process_handler + .exec(params) + .await + .map_err(map_local_error) + } + + async fn read(&self, params: ReadParams) -> Result { + self.inner + .process_handler + .exec_read(params) + .await + .map_err(map_local_error) + } + + async fn write( + &self, + process_id: &str, + chunk: Vec, + ) -> Result { + self.inner + .process_handler + .exec_write(WriteParams { + process_id: process_id.to_string(), + chunk: chunk.into(), + }) + .await + .map_err(map_local_error) + } + + async fn terminate(&self, process_id: &str) -> Result { + self.inner + .process_handler + .terminate(TerminateParams { + process_id: process_id.to_string(), + }) + .await + .map_err(map_local_error) + } + + fn subscribe_events(&self) -> broadcast::Receiver { + self.inner.events_tx.subscribe() + } +} + +fn map_local_error(error: JSONRPCErrorError) -> ExecServerError { + ExecServerError::Server { + code: error.code, + message: error.message, + } +} diff --git a/codex-rs/exec-server/src/process.rs b/codex-rs/exec-server/src/process.rs new file mode 100644 index 000000000000..b2d743c329c7 --- /dev/null +++ b/codex-rs/exec-server/src/process.rs @@ -0,0 +1,35 @@ +use async_trait::async_trait; +use tokio::sync::broadcast; + +use crate::ExecServerError; +use crate::protocol::ExecExitedNotification; +use crate::protocol::ExecOutputDeltaNotification; +use crate::protocol::ExecParams; +use crate::protocol::ExecResponse; +use crate::protocol::ReadParams; +use crate::protocol::ReadResponse; +use crate::protocol::TerminateResponse; +use crate::protocol::WriteResponse; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ExecServerEvent { + OutputDelta(ExecOutputDeltaNotification), + Exited(ExecExitedNotification), +} + +#[async_trait] +pub trait ExecProcess: Send + Sync { + async fn start(&self, params: ExecParams) -> Result; + + async fn read(&self, params: ReadParams) -> Result; + + async fn write( + &self, + process_id: &str, + chunk: Vec, + ) -> Result; + + async fn terminate(&self, process_id: &str) -> Result; + + fn subscribe_events(&self) -> broadcast::Receiver; +} diff --git a/codex-rs/exec-server/src/remote_process.rs b/codex-rs/exec-server/src/remote_process.rs new file mode 100644 index 000000000000..2e6829923452 --- /dev/null +++ b/codex-rs/exec-server/src/remote_process.rs @@ -0,0 +1,51 @@ +use async_trait::async_trait; +use tokio::sync::broadcast; + +use crate::ExecProcess; +use crate::ExecServerClient; +use crate::ExecServerError; +use crate::ExecServerEvent; +use crate::protocol::ExecParams; +use crate::protocol::ExecResponse; +use crate::protocol::ReadParams; +use crate::protocol::ReadResponse; +use crate::protocol::TerminateResponse; +use crate::protocol::WriteResponse; + +#[derive(Clone)] +pub(crate) struct RemoteExecProcess { + client: ExecServerClient, +} + +impl RemoteExecProcess { + pub(crate) fn new(client: ExecServerClient) -> Self { + Self { client } + } +} + +#[async_trait] +impl ExecProcess for RemoteExecProcess { + async fn start(&self, params: ExecParams) -> Result { + self.client.exec(params).await + } + + async fn read(&self, params: ReadParams) -> Result { + self.client.read(params).await + } + + async fn write( + &self, + process_id: &str, + chunk: Vec, + ) -> Result { + self.client.write(process_id, chunk).await + } + + async fn terminate(&self, process_id: &str) -> Result { + self.client.terminate(process_id).await + } + + fn subscribe_events(&self) -> broadcast::Receiver { + self.client.event_receiver() + } +} diff --git a/codex-rs/exec-server/src/server.rs b/codex-rs/exec-server/src/server.rs index c403b029d702..368407ec8c75 100644 --- a/codex-rs/exec-server/src/server.rs +++ b/codex-rs/exec-server/src/server.rs @@ -1,10 +1,12 @@ mod filesystem; mod handler; +mod process_handler; mod processor; mod registry; mod transport; pub(crate) use handler::ExecServerHandler; +pub(crate) use process_handler::ProcessHandler; pub use transport::DEFAULT_LISTEN_URL; pub use transport::ExecServerListenUrlParseError; diff --git a/codex-rs/exec-server/src/server/handler.rs b/codex-rs/exec-server/src/server/handler.rs index c21aeecb5c2e..ace15994829e 100644 --- a/codex-rs/exec-server/src/server/handler.rs +++ b/codex-rs/exec-server/src/server/handler.rs @@ -1,10 +1,19 @@ -use std::collections::HashMap; -use std::collections::VecDeque; -use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; -use std::time::Duration; +use crate::protocol::ExecParams; +use crate::protocol::ExecResponse; +use crate::protocol::InitializeResponse; +use crate::protocol::ReadParams; +use crate::protocol::ReadResponse; +use crate::protocol::TerminateParams; +use crate::protocol::TerminateResponse; +use crate::protocol::WriteParams; +use crate::protocol::WriteResponse; +use crate::rpc::RpcNotificationSender; +use crate::rpc::invalid_request; +use crate::server::filesystem::ExecServerFileSystem; +use crate::server::process_handler::ProcessHandler; use codex_app_server_protocol::FsCopyParams; use codex_app_server_protocol::FsCopyResponse; use codex_app_server_protocol::FsCreateDirectoryParams; @@ -20,63 +29,10 @@ use codex_app_server_protocol::FsRemoveResponse; use codex_app_server_protocol::FsWriteFileParams; use codex_app_server_protocol::FsWriteFileResponse; use codex_app_server_protocol::JSONRPCErrorError; -use codex_utils_pty::ExecCommandSession; -use codex_utils_pty::TerminalSize; -use tokio::sync::Mutex; -use tokio::sync::Notify; -use tracing::warn; - -use crate::protocol::ExecExitedNotification; -use crate::protocol::ExecOutputDeltaNotification; -use crate::protocol::ExecOutputStream; -use crate::protocol::ExecParams; -use crate::protocol::ExecResponse; -use crate::protocol::InitializeResponse; -use crate::protocol::ProcessOutputChunk; -use crate::protocol::ReadParams; -use crate::protocol::ReadResponse; -use crate::protocol::TerminateParams; -use crate::protocol::TerminateResponse; -use crate::protocol::WriteParams; -use crate::protocol::WriteResponse; -use crate::rpc::RpcNotificationSender; -use crate::rpc::internal_error; -use crate::rpc::invalid_params; -use crate::rpc::invalid_request; -use crate::server::filesystem::ExecServerFileSystem; - -const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024; -#[cfg(test)] -const EXITED_PROCESS_RETENTION: Duration = Duration::from_millis(25); -#[cfg(not(test))] -const EXITED_PROCESS_RETENTION: Duration = Duration::from_secs(30); - -#[derive(Clone)] -struct RetainedOutputChunk { - seq: u64, - stream: ExecOutputStream, - chunk: Vec, -} - -struct RunningProcess { - session: ExecCommandSession, - tty: bool, - output: VecDeque, - retained_bytes: usize, - next_seq: u64, - exit_code: Option, - output_notify: Arc, -} - -enum ProcessEntry { - Starting, - Running(Box), -} pub(crate) struct ExecServerHandler { - notifications: RpcNotificationSender, file_system: ExecServerFileSystem, - processes: Arc>>, + process_handler: ProcessHandler, initialize_requested: AtomicBool, initialized: AtomicBool, } @@ -84,28 +40,15 @@ pub(crate) struct ExecServerHandler { impl ExecServerHandler { pub(crate) fn new(notifications: RpcNotificationSender) -> Self { Self { - notifications, file_system: ExecServerFileSystem::default(), - processes: Arc::new(Mutex::new(HashMap::new())), + process_handler: ProcessHandler::new(notifications), initialize_requested: AtomicBool::new(false), initialized: AtomicBool::new(false), } } pub(crate) async fn shutdown(&self) { - let remaining = { - let mut processes = self.processes.lock().await; - processes - .drain() - .filter_map(|(_, process)| match process { - ProcessEntry::Starting => None, - ProcessEntry::Running(process) => Some(process), - }) - .collect::>() - }; - for process in remaining { - process.session.terminate(); - } + self.process_handler.shutdown().await; } pub(crate) fn initialize(&self) -> Result { @@ -141,104 +84,7 @@ impl ExecServerHandler { pub(crate) async fn exec(&self, params: ExecParams) -> Result { self.require_initialized_for("exec")?; - let process_id = params.process_id.clone(); - - let (program, args) = params - .argv - .split_first() - .ok_or_else(|| invalid_params("argv must not be empty".to_string()))?; - - { - let mut process_map = self.processes.lock().await; - if process_map.contains_key(&process_id) { - return Err(invalid_request(format!( - "process {process_id} already exists" - ))); - } - process_map.insert(process_id.clone(), ProcessEntry::Starting); - } - - let spawned_result = if params.tty { - codex_utils_pty::spawn_pty_process( - program, - args, - params.cwd.as_path(), - ¶ms.env, - ¶ms.arg0, - TerminalSize::default(), - ) - .await - } else { - codex_utils_pty::spawn_pipe_process_no_stdin( - program, - args, - params.cwd.as_path(), - ¶ms.env, - ¶ms.arg0, - ) - .await - }; - let spawned = match spawned_result { - Ok(spawned) => spawned, - Err(err) => { - let mut process_map = self.processes.lock().await; - if matches!(process_map.get(&process_id), Some(ProcessEntry::Starting)) { - process_map.remove(&process_id); - } - return Err(internal_error(err.to_string())); - } - }; - - let output_notify = Arc::new(Notify::new()); - { - let mut process_map = self.processes.lock().await; - process_map.insert( - process_id.clone(), - ProcessEntry::Running(Box::new(RunningProcess { - session: spawned.session, - tty: params.tty, - output: VecDeque::new(), - retained_bytes: 0, - next_seq: 1, - exit_code: None, - output_notify: Arc::clone(&output_notify), - })), - ); - } - - tokio::spawn(stream_output( - process_id.clone(), - if params.tty { - ExecOutputStream::Pty - } else { - ExecOutputStream::Stdout - }, - spawned.stdout_rx, - self.notifications.clone(), - Arc::clone(&self.processes), - Arc::clone(&output_notify), - )); - tokio::spawn(stream_output( - process_id.clone(), - if params.tty { - ExecOutputStream::Pty - } else { - ExecOutputStream::Stderr - }, - spawned.stderr_rx, - self.notifications.clone(), - Arc::clone(&self.processes), - Arc::clone(&output_notify), - )); - tokio::spawn(watch_exit( - process_id.clone(), - spawned.exit_rx, - self.notifications.clone(), - Arc::clone(&self.processes), - output_notify, - )); - - Ok(ExecResponse { process_id }) + self.process_handler.exec(params).await } pub(crate) async fn exec_read( @@ -246,68 +92,7 @@ impl ExecServerHandler { params: ReadParams, ) -> Result { self.require_initialized_for("exec")?; - let after_seq = params.after_seq.unwrap_or(0); - let max_bytes = params.max_bytes.unwrap_or(usize::MAX); - let wait = Duration::from_millis(params.wait_ms.unwrap_or(0)); - let deadline = tokio::time::Instant::now() + wait; - - loop { - let (response, output_notify) = { - let process_map = self.processes.lock().await; - let process = process_map.get(¶ms.process_id).ok_or_else(|| { - invalid_request(format!("unknown process id {}", params.process_id)) - })?; - let ProcessEntry::Running(process) = process else { - return Err(invalid_request(format!( - "process id {} is starting", - params.process_id - ))); - }; - - let mut chunks = Vec::new(); - let mut total_bytes = 0; - let mut next_seq = process.next_seq; - for retained in process.output.iter().filter(|chunk| chunk.seq > after_seq) { - let chunk_len = retained.chunk.len(); - if !chunks.is_empty() && total_bytes + chunk_len > max_bytes { - break; - } - total_bytes += chunk_len; - chunks.push(ProcessOutputChunk { - seq: retained.seq, - stream: retained.stream, - chunk: retained.chunk.clone().into(), - }); - next_seq = retained.seq + 1; - if total_bytes >= max_bytes { - break; - } - } - - ( - ReadResponse { - chunks, - next_seq, - exited: process.exit_code.is_some(), - exit_code: process.exit_code, - }, - Arc::clone(&process.output_notify), - ) - }; - - if !response.chunks.is_empty() - || response.exited - || tokio::time::Instant::now() >= deadline - { - return Ok(response); - } - - let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); - if remaining.is_zero() { - return Ok(response); - } - let _ = tokio::time::timeout(remaining, output_notify.notified()).await; - } + self.process_handler.exec_read(params).await } pub(crate) async fn exec_write( @@ -315,32 +100,7 @@ impl ExecServerHandler { params: WriteParams, ) -> Result { self.require_initialized_for("exec")?; - let writer_tx = { - let process_map = self.processes.lock().await; - let process = process_map.get(¶ms.process_id).ok_or_else(|| { - invalid_request(format!("unknown process id {}", params.process_id)) - })?; - let ProcessEntry::Running(process) = process else { - return Err(invalid_request(format!( - "process id {} is starting", - params.process_id - ))); - }; - if !process.tty { - return Err(invalid_request(format!( - "stdin is closed for process {}", - params.process_id - ))); - } - process.session.writer_sender() - }; - - writer_tx - .send(params.chunk.into_inner()) - .await - .map_err(|_| internal_error("failed to write to process stdin".to_string()))?; - - Ok(WriteResponse { accepted: true }) + self.process_handler.exec_write(params).await } pub(crate) async fn terminate( @@ -348,21 +108,7 @@ impl ExecServerHandler { params: TerminateParams, ) -> Result { self.require_initialized_for("exec")?; - let running = { - let process_map = self.processes.lock().await; - match process_map.get(¶ms.process_id) { - Some(ProcessEntry::Running(process)) => { - if process.exit_code.is_some() { - return Ok(TerminateResponse { running: false }); - } - process.session.terminate(); - true - } - Some(ProcessEntry::Starting) | None => false, - } - }; - - Ok(TerminateResponse { running }) + self.process_handler.terminate(params).await } pub(crate) async fn fs_read_file( @@ -422,96 +168,5 @@ impl ExecServerHandler { } } -async fn stream_output( - process_id: String, - stream: ExecOutputStream, - mut receiver: tokio::sync::mpsc::Receiver>, - notifications: RpcNotificationSender, - processes: Arc>>, - output_notify: Arc, -) { - while let Some(chunk) = receiver.recv().await { - let notification = { - let mut processes = processes.lock().await; - let Some(entry) = processes.get_mut(&process_id) else { - break; - }; - let ProcessEntry::Running(process) = entry else { - break; - }; - let seq = process.next_seq; - process.next_seq += 1; - process.retained_bytes += chunk.len(); - process.output.push_back(RetainedOutputChunk { - seq, - stream, - chunk: chunk.clone(), - }); - while process.retained_bytes > RETAINED_OUTPUT_BYTES_PER_PROCESS { - let Some(evicted) = process.output.pop_front() else { - break; - }; - process.retained_bytes = process.retained_bytes.saturating_sub(evicted.chunk.len()); - warn!( - "retained output cap exceeded for process {process_id}; dropping oldest output" - ); - } - ExecOutputDeltaNotification { - process_id: process_id.clone(), - stream, - chunk: chunk.into(), - } - }; - output_notify.notify_waiters(); - - if notifications - .notify(crate::protocol::EXEC_OUTPUT_DELTA_METHOD, ¬ification) - .await - .is_err() - { - break; - } - } -} - -async fn watch_exit( - process_id: String, - exit_rx: tokio::sync::oneshot::Receiver, - notifications: RpcNotificationSender, - processes: Arc>>, - output_notify: Arc, -) { - let exit_code = exit_rx.await.unwrap_or(-1); - { - let mut processes = processes.lock().await; - if let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) { - process.exit_code = Some(exit_code); - } - } - output_notify.notify_waiters(); - if notifications - .notify( - crate::protocol::EXEC_EXITED_METHOD, - &ExecExitedNotification { - process_id: process_id.clone(), - exit_code, - }, - ) - .await - .is_err() - { - return; - } - - tokio::time::sleep(EXITED_PROCESS_RETENTION).await; - let mut processes = processes.lock().await; - if matches!( - processes.get(&process_id), - Some(ProcessEntry::Running(process)) if process.exit_code == Some(exit_code) - ) { - processes.remove(&process_id); - } -} - #[cfg(test)] mod tests; diff --git a/codex-rs/exec-server/src/server/process_handler.rs b/codex-rs/exec-server/src/server/process_handler.rs new file mode 100644 index 000000000000..eb43093600d5 --- /dev/null +++ b/codex-rs/exec-server/src/server/process_handler.rs @@ -0,0 +1,400 @@ +use std::collections::HashMap; +use std::collections::VecDeque; +use std::sync::Arc; +use std::time::Duration; + +use codex_app_server_protocol::JSONRPCErrorError; +use codex_utils_pty::ExecCommandSession; +use codex_utils_pty::TerminalSize; +use tokio::sync::Mutex; +use tokio::sync::Notify; +use tracing::warn; + +use crate::protocol::ExecExitedNotification; +use crate::protocol::ExecOutputDeltaNotification; +use crate::protocol::ExecOutputStream; +use crate::protocol::ExecParams; +use crate::protocol::ExecResponse; +use crate::protocol::ProcessOutputChunk; +use crate::protocol::ReadParams; +use crate::protocol::ReadResponse; +use crate::protocol::TerminateParams; +use crate::protocol::TerminateResponse; +use crate::protocol::WriteParams; +use crate::protocol::WriteResponse; +use crate::rpc::RpcNotificationSender; +use crate::rpc::internal_error; +use crate::rpc::invalid_params; +use crate::rpc::invalid_request; + +const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024; +#[cfg(test)] +const EXITED_PROCESS_RETENTION: Duration = Duration::from_millis(25); +#[cfg(not(test))] +const EXITED_PROCESS_RETENTION: Duration = Duration::from_secs(30); + +#[derive(Clone)] +struct RetainedOutputChunk { + seq: u64, + stream: ExecOutputStream, + chunk: Vec, +} + +struct RunningProcess { + session: ExecCommandSession, + tty: bool, + output: VecDeque, + retained_bytes: usize, + next_seq: u64, + exit_code: Option, + output_notify: Arc, +} + +enum ProcessEntry { + Starting, + Running(Box), +} + +#[derive(Clone)] +pub(crate) struct ProcessHandler { + notifications: RpcNotificationSender, + processes: Arc>>, +} + +impl ProcessHandler { + pub(crate) fn new(notifications: RpcNotificationSender) -> Self { + Self { + notifications, + processes: Arc::new(Mutex::new(HashMap::new())), + } + } + + pub(crate) async fn shutdown(&self) { + let remaining = { + let mut processes = self.processes.lock().await; + processes + .drain() + .filter_map(|(_, process)| match process { + ProcessEntry::Starting => None, + ProcessEntry::Running(process) => Some(process), + }) + .collect::>() + }; + for process in remaining { + process.session.terminate(); + } + } + + pub(crate) async fn exec(&self, params: ExecParams) -> Result { + let process_id = params.process_id.clone(); + + let (program, args) = params + .argv + .split_first() + .ok_or_else(|| invalid_params("argv must not be empty".to_string()))?; + + { + let mut process_map = self.processes.lock().await; + if process_map.contains_key(&process_id) { + return Err(invalid_request(format!( + "process {process_id} already exists" + ))); + } + process_map.insert(process_id.clone(), ProcessEntry::Starting); + } + + let spawned_result = if params.tty { + codex_utils_pty::spawn_pty_process( + program, + args, + params.cwd.as_path(), + ¶ms.env, + ¶ms.arg0, + TerminalSize::default(), + ) + .await + } else { + codex_utils_pty::spawn_pipe_process_no_stdin( + program, + args, + params.cwd.as_path(), + ¶ms.env, + ¶ms.arg0, + ) + .await + }; + let spawned = match spawned_result { + Ok(spawned) => spawned, + Err(err) => { + let mut process_map = self.processes.lock().await; + if matches!(process_map.get(&process_id), Some(ProcessEntry::Starting)) { + process_map.remove(&process_id); + } + return Err(internal_error(err.to_string())); + } + }; + + let output_notify = Arc::new(Notify::new()); + { + let mut process_map = self.processes.lock().await; + process_map.insert( + process_id.clone(), + ProcessEntry::Running(Box::new(RunningProcess { + session: spawned.session, + tty: params.tty, + output: VecDeque::new(), + retained_bytes: 0, + next_seq: 1, + exit_code: None, + output_notify: Arc::clone(&output_notify), + })), + ); + } + + tokio::spawn(stream_output( + process_id.clone(), + if params.tty { + ExecOutputStream::Pty + } else { + ExecOutputStream::Stdout + }, + spawned.stdout_rx, + self.notifications.clone(), + Arc::clone(&self.processes), + Arc::clone(&output_notify), + )); + tokio::spawn(stream_output( + process_id.clone(), + if params.tty { + ExecOutputStream::Pty + } else { + ExecOutputStream::Stderr + }, + spawned.stderr_rx, + self.notifications.clone(), + Arc::clone(&self.processes), + Arc::clone(&output_notify), + )); + tokio::spawn(watch_exit( + process_id.clone(), + spawned.exit_rx, + self.notifications.clone(), + Arc::clone(&self.processes), + output_notify, + )); + + Ok(ExecResponse { process_id }) + } + + pub(crate) async fn exec_read( + &self, + params: ReadParams, + ) -> Result { + let after_seq = params.after_seq.unwrap_or(0); + let max_bytes = params.max_bytes.unwrap_or(usize::MAX); + let wait = Duration::from_millis(params.wait_ms.unwrap_or(0)); + let deadline = tokio::time::Instant::now() + wait; + + loop { + let (response, output_notify) = { + let process_map = self.processes.lock().await; + let process = process_map.get(¶ms.process_id).ok_or_else(|| { + invalid_request(format!("unknown process id {}", params.process_id)) + })?; + let ProcessEntry::Running(process) = process else { + return Err(invalid_request(format!( + "process id {} is starting", + params.process_id + ))); + }; + + let mut chunks = Vec::new(); + let mut total_bytes = 0; + let mut next_seq = process.next_seq; + for retained in process.output.iter().filter(|chunk| chunk.seq > after_seq) { + let chunk_len = retained.chunk.len(); + if !chunks.is_empty() && total_bytes + chunk_len > max_bytes { + break; + } + total_bytes += chunk_len; + chunks.push(ProcessOutputChunk { + seq: retained.seq, + stream: retained.stream, + chunk: retained.chunk.clone().into(), + }); + next_seq = retained.seq + 1; + if total_bytes >= max_bytes { + break; + } + } + + ( + ReadResponse { + chunks, + next_seq, + exited: process.exit_code.is_some(), + exit_code: process.exit_code, + }, + Arc::clone(&process.output_notify), + ) + }; + + if !response.chunks.is_empty() + || response.exited + || tokio::time::Instant::now() >= deadline + { + return Ok(response); + } + + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + if remaining.is_zero() { + return Ok(response); + } + let _ = tokio::time::timeout(remaining, output_notify.notified()).await; + } + } + + pub(crate) async fn exec_write( + &self, + params: WriteParams, + ) -> Result { + let writer_tx = { + let process_map = self.processes.lock().await; + let process = process_map.get(¶ms.process_id).ok_or_else(|| { + invalid_request(format!("unknown process id {}", params.process_id)) + })?; + let ProcessEntry::Running(process) = process else { + return Err(invalid_request(format!( + "process id {} is starting", + params.process_id + ))); + }; + if !process.tty { + return Err(invalid_request(format!( + "stdin is closed for process {}", + params.process_id + ))); + } + process.session.writer_sender() + }; + + writer_tx + .send(params.chunk.into_inner()) + .await + .map_err(|_| internal_error("failed to write to process stdin".to_string()))?; + + Ok(WriteResponse { accepted: true }) + } + + pub(crate) async fn terminate( + &self, + params: TerminateParams, + ) -> Result { + let running = { + let process_map = self.processes.lock().await; + match process_map.get(¶ms.process_id) { + Some(ProcessEntry::Running(process)) => { + if process.exit_code.is_some() { + return Ok(TerminateResponse { running: false }); + } + process.session.terminate(); + true + } + Some(ProcessEntry::Starting) | None => false, + } + }; + + Ok(TerminateResponse { running }) + } +} + +async fn stream_output( + process_id: String, + stream: ExecOutputStream, + mut receiver: tokio::sync::mpsc::Receiver>, + notifications: RpcNotificationSender, + processes: Arc>>, + output_notify: Arc, +) { + while let Some(chunk) = receiver.recv().await { + let notification = { + let mut processes = processes.lock().await; + let Some(entry) = processes.get_mut(&process_id) else { + break; + }; + let ProcessEntry::Running(process) = entry else { + break; + }; + let seq = process.next_seq; + process.next_seq += 1; + process.retained_bytes += chunk.len(); + process.output.push_back(RetainedOutputChunk { + seq, + stream, + chunk: chunk.clone(), + }); + while process.retained_bytes > RETAINED_OUTPUT_BYTES_PER_PROCESS { + let Some(evicted) = process.output.pop_front() else { + break; + }; + process.retained_bytes = process.retained_bytes.saturating_sub(evicted.chunk.len()); + warn!( + "retained output cap exceeded for process {process_id}; dropping oldest output" + ); + } + ExecOutputDeltaNotification { + process_id: process_id.clone(), + stream, + chunk: chunk.into(), + } + }; + output_notify.notify_waiters(); + + if notifications + .notify(crate::protocol::EXEC_OUTPUT_DELTA_METHOD, ¬ification) + .await + .is_err() + { + break; + } + } +} + +async fn watch_exit( + process_id: String, + exit_rx: tokio::sync::oneshot::Receiver, + notifications: RpcNotificationSender, + processes: Arc>>, + output_notify: Arc, +) { + let exit_code = exit_rx.await.unwrap_or(-1); + { + let mut processes = processes.lock().await; + if let Some(ProcessEntry::Running(process)) = processes.get_mut(&process_id) { + process.exit_code = Some(exit_code); + } + } + output_notify.notify_waiters(); + if notifications + .notify( + crate::protocol::EXEC_EXITED_METHOD, + &ExecExitedNotification { + process_id: process_id.clone(), + exit_code, + }, + ) + .await + .is_err() + { + return; + } + + tokio::time::sleep(EXITED_PROCESS_RETENTION).await; + let mut processes = processes.lock().await; + if matches!( + processes.get(&process_id), + Some(ProcessEntry::Running(process)) if process.exit_code == Some(exit_code) + ) { + processes.remove(&process_id); + } +}