From c806579596f60ae32b0c9b19b78a9492e02b7caa Mon Sep 17 00:00:00 2001 From: starr-openai Date: Wed, 18 Mar 2026 14:53:45 -0700 Subject: [PATCH 01/10] Add exec-server filesystem RPC implementation Co-authored-by: Codex --- codex-rs/Cargo.lock | 1 + codex-rs/exec-server/Cargo.toml | 1 + codex-rs/exec-server/src/client.rs | 144 +++++++++++++++ .../exec-server/src/client/local_backend.rs | 105 +++++++++++ codex-rs/exec-server/src/lib.rs | 15 ++ codex-rs/exec-server/src/protocol.rs | 7 + codex-rs/exec-server/src/server.rs | 1 + codex-rs/exec-server/src/server/filesystem.rs | 170 ++++++++++++++++++ codex-rs/exec-server/src/server/handler.rs | 95 ++++++++-- codex-rs/exec-server/src/server/registry.rs | 56 ++++++ codex-rs/exec-server/tests/stdio_smoke.rs | 105 +++++++++++ 11 files changed, 689 insertions(+), 11 deletions(-) create mode 100644 codex-rs/exec-server/src/server/filesystem.rs diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 11b3fa6e812..1103b58b92b 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2011,6 +2011,7 @@ dependencies = [ "base64 0.22.1", "clap", "codex-app-server-protocol", + "codex-environment", "codex-utils-cargo-bin", "codex-utils-pty", "futures", diff --git a/codex-rs/exec-server/Cargo.toml b/codex-rs/exec-server/Cargo.toml index 1b47760975a..744b2011f35 100644 --- a/codex-rs/exec-server/Cargo.toml +++ b/codex-rs/exec-server/Cargo.toml @@ -15,6 +15,7 @@ workspace = true base64 = { workspace = true } clap = { workspace = true, features = ["derive"] } codex-app-server-protocol = { workspace = true } +codex-environment = { workspace = true } codex-utils-pty = { workspace = true } futures = { workspace = true } serde = { workspace = true, features = ["derive"] } diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 652c9d01d95..2f314f3db3f 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -1,6 +1,20 @@ use std::sync::Arc; use std::time::Duration; +use codex_app_server_protocol::FsCopyParams; +use codex_app_server_protocol::FsCopyResponse; +use codex_app_server_protocol::FsCreateDirectoryParams; +use codex_app_server_protocol::FsCreateDirectoryResponse; +use codex_app_server_protocol::FsGetMetadataParams; +use codex_app_server_protocol::FsGetMetadataResponse; +use codex_app_server_protocol::FsReadDirectoryParams; +use codex_app_server_protocol::FsReadDirectoryResponse; +use codex_app_server_protocol::FsReadFileParams; +use codex_app_server_protocol::FsReadFileResponse; +use codex_app_server_protocol::FsRemoveParams; +use codex_app_server_protocol::FsRemoveResponse; +use codex_app_server_protocol::FsWriteFileParams; +use codex_app_server_protocol::FsWriteFileResponse; use codex_app_server_protocol::JSONRPCNotification; use serde_json::Value; use tokio::io::AsyncRead; @@ -26,6 +40,13 @@ use crate::protocol::ExecExitedNotification; use crate::protocol::ExecOutputDeltaNotification; use crate::protocol::ExecParams; use crate::protocol::ExecResponse; +use crate::protocol::FS_COPY_METHOD; +use crate::protocol::FS_CREATE_DIRECTORY_METHOD; +use crate::protocol::FS_GET_METADATA_METHOD; +use crate::protocol::FS_READ_DIRECTORY_METHOD; +use crate::protocol::FS_READ_FILE_METHOD; +use crate::protocol::FS_REMOVE_METHOD; +use crate::protocol::FS_WRITE_FILE_METHOD; use crate::protocol::INITIALIZE_METHOD; use crate::protocol::INITIALIZED_METHOD; use crate::protocol::InitializeParams; @@ -326,6 +347,129 @@ impl ExecServerClient { .map_err(Into::into) } + pub async fn fs_read_file( + &self, + params: FsReadFileParams, + ) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_read_file(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/readFile".to_string(), + )); + }; + remote + .call(FS_READ_FILE_METHOD, ¶ms) + .await + .map_err(Into::into) + } + + pub async fn fs_write_file( + &self, + params: FsWriteFileParams, + ) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_write_file(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/writeFile".to_string(), + )); + }; + remote + .call(FS_WRITE_FILE_METHOD, ¶ms) + .await + .map_err(Into::into) + } + + pub async fn fs_create_directory( + &self, + params: FsCreateDirectoryParams, + ) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_create_directory(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/createDirectory".to_string(), + )); + }; + remote + .call(FS_CREATE_DIRECTORY_METHOD, ¶ms) + .await + .map_err(Into::into) + } + + pub async fn fs_get_metadata( + &self, + params: FsGetMetadataParams, + ) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_get_metadata(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/getMetadata".to_string(), + )); + }; + remote + .call(FS_GET_METADATA_METHOD, ¶ms) + .await + .map_err(Into::into) + } + + pub async fn fs_read_directory( + &self, + params: FsReadDirectoryParams, + ) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_read_directory(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/readDirectory".to_string(), + )); + }; + remote + .call(FS_READ_DIRECTORY_METHOD, ¶ms) + .await + .map_err(Into::into) + } + + pub async fn fs_remove( + &self, + params: FsRemoveParams, + ) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_remove(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/remove".to_string(), + )); + }; + remote + .call(FS_REMOVE_METHOD, ¶ms) + .await + .map_err(Into::into) + } + + pub async fn fs_copy(&self, params: FsCopyParams) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_copy(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/copy".to_string(), + )); + }; + remote + .call(FS_COPY_METHOD, ¶ms) + .await + .map_err(Into::into) + } + async fn connect( connection: JsonRpcConnection, options: ExecServerClientConnectOptions, diff --git a/codex-rs/exec-server/src/client/local_backend.rs b/codex-rs/exec-server/src/client/local_backend.rs index 16b16d3b103..e23a5361d3a 100644 --- a/codex-rs/exec-server/src/client/local_backend.rs +++ b/codex-rs/exec-server/src/client/local_backend.rs @@ -10,6 +10,20 @@ use crate::protocol::TerminateResponse; use crate::protocol::WriteParams; use crate::protocol::WriteResponse; use crate::server::ExecServerHandler; +use codex_app_server_protocol::FsCopyParams; +use codex_app_server_protocol::FsCopyResponse; +use codex_app_server_protocol::FsCreateDirectoryParams; +use codex_app_server_protocol::FsCreateDirectoryResponse; +use codex_app_server_protocol::FsGetMetadataParams; +use codex_app_server_protocol::FsGetMetadataResponse; +use codex_app_server_protocol::FsReadDirectoryParams; +use codex_app_server_protocol::FsReadDirectoryResponse; +use codex_app_server_protocol::FsReadFileParams; +use codex_app_server_protocol::FsReadFileResponse; +use codex_app_server_protocol::FsRemoveParams; +use codex_app_server_protocol::FsRemoveResponse; +use codex_app_server_protocol::FsWriteFileParams; +use codex_app_server_protocol::FsWriteFileResponse; use super::ExecServerError; @@ -92,4 +106,95 @@ impl LocalBackend { message: error.message, }) } + + pub(super) async fn fs_read_file( + &self, + params: FsReadFileParams, + ) -> Result { + self.handler + .fs_read_file(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } + + pub(super) async fn fs_write_file( + &self, + params: FsWriteFileParams, + ) -> Result { + self.handler + .fs_write_file(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } + + pub(super) async fn fs_create_directory( + &self, + params: FsCreateDirectoryParams, + ) -> Result { + self.handler + .fs_create_directory(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } + + pub(super) async fn fs_get_metadata( + &self, + params: FsGetMetadataParams, + ) -> Result { + self.handler + .fs_get_metadata(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } + + pub(super) async fn fs_read_directory( + &self, + params: FsReadDirectoryParams, + ) -> Result { + self.handler + .fs_read_directory(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } + + pub(super) async fn fs_remove( + &self, + params: FsRemoveParams, + ) -> Result { + self.handler + .fs_remove(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } + + pub(super) async fn fs_copy( + &self, + params: FsCopyParams, + ) -> Result { + self.handler + .fs_copy(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } } diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 13e910351ea..12bf0e17f9d 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -11,6 +11,21 @@ pub use client::ExecServerError; pub use client_api::ExecServerClientConnectOptions; pub use client_api::ExecServerEvent; pub use client_api::RemoteExecServerConnectArgs; +pub use codex_app_server_protocol::FsCopyParams; +pub use codex_app_server_protocol::FsCopyResponse; +pub use codex_app_server_protocol::FsCreateDirectoryParams; +pub use codex_app_server_protocol::FsCreateDirectoryResponse; +pub use codex_app_server_protocol::FsGetMetadataParams; +pub use codex_app_server_protocol::FsGetMetadataResponse; +pub use codex_app_server_protocol::FsReadDirectoryEntry; +pub use codex_app_server_protocol::FsReadDirectoryParams; +pub use codex_app_server_protocol::FsReadDirectoryResponse; +pub use codex_app_server_protocol::FsReadFileParams; +pub use codex_app_server_protocol::FsReadFileResponse; +pub use codex_app_server_protocol::FsRemoveParams; +pub use codex_app_server_protocol::FsRemoveResponse; +pub use codex_app_server_protocol::FsWriteFileParams; +pub use codex_app_server_protocol::FsWriteFileResponse; pub use local::ExecServerLaunchCommand; pub use local::SpawnedExecServer; pub use local::spawn_local_exec_server; diff --git a/codex-rs/exec-server/src/protocol.rs b/codex-rs/exec-server/src/protocol.rs index 7ed8e20ae46..ca7d89d3551 100644 --- a/codex-rs/exec-server/src/protocol.rs +++ b/codex-rs/exec-server/src/protocol.rs @@ -13,6 +13,13 @@ pub const EXEC_WRITE_METHOD: &str = "process/write"; pub const EXEC_TERMINATE_METHOD: &str = "process/terminate"; pub const EXEC_OUTPUT_DELTA_METHOD: &str = "process/output"; pub const EXEC_EXITED_METHOD: &str = "process/exited"; +pub const FS_READ_FILE_METHOD: &str = "fs/readFile"; +pub const FS_WRITE_FILE_METHOD: &str = "fs/writeFile"; +pub const FS_CREATE_DIRECTORY_METHOD: &str = "fs/createDirectory"; +pub const FS_GET_METADATA_METHOD: &str = "fs/getMetadata"; +pub const FS_READ_DIRECTORY_METHOD: &str = "fs/readDirectory"; +pub const FS_REMOVE_METHOD: &str = "fs/remove"; +pub const FS_COPY_METHOD: &str = "fs/copy"; pub const PROTOCOL_VERSION: &str = "exec-server.v0"; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] diff --git a/codex-rs/exec-server/src/server.rs b/codex-rs/exec-server/src/server.rs index ba074e617fd..bdd00130ee0 100644 --- a/codex-rs/exec-server/src/server.rs +++ b/codex-rs/exec-server/src/server.rs @@ -1,3 +1,4 @@ +mod filesystem; mod handler; mod processor; mod registry; diff --git a/codex-rs/exec-server/src/server/filesystem.rs b/codex-rs/exec-server/src/server/filesystem.rs new file mode 100644 index 00000000000..09467e9e020 --- /dev/null +++ b/codex-rs/exec-server/src/server/filesystem.rs @@ -0,0 +1,170 @@ +use std::io; +use std::sync::Arc; + +use base64::Engine as _; +use base64::engine::general_purpose::STANDARD; +use codex_app_server_protocol::FsCopyParams; +use codex_app_server_protocol::FsCopyResponse; +use codex_app_server_protocol::FsCreateDirectoryParams; +use codex_app_server_protocol::FsCreateDirectoryResponse; +use codex_app_server_protocol::FsGetMetadataParams; +use codex_app_server_protocol::FsGetMetadataResponse; +use codex_app_server_protocol::FsReadDirectoryEntry; +use codex_app_server_protocol::FsReadDirectoryParams; +use codex_app_server_protocol::FsReadDirectoryResponse; +use codex_app_server_protocol::FsReadFileParams; +use codex_app_server_protocol::FsReadFileResponse; +use codex_app_server_protocol::FsRemoveParams; +use codex_app_server_protocol::FsRemoveResponse; +use codex_app_server_protocol::FsWriteFileParams; +use codex_app_server_protocol::FsWriteFileResponse; +use codex_app_server_protocol::JSONRPCErrorError; +use codex_environment::CopyOptions; +use codex_environment::CreateDirectoryOptions; +use codex_environment::Environment; +use codex_environment::ExecutorFileSystem; +use codex_environment::RemoveOptions; + +use crate::rpc::internal_error; +use crate::rpc::invalid_request; + +#[derive(Clone)] +pub(crate) struct ExecServerFileSystem { + file_system: Arc, +} + +impl Default for ExecServerFileSystem { + fn default() -> Self { + Self { + file_system: Arc::new(Environment.get_filesystem()), + } + } +} + +impl ExecServerFileSystem { + pub(crate) async fn read_file( + &self, + params: FsReadFileParams, + ) -> Result { + let bytes = self + .file_system + .read_file(¶ms.path) + .await + .map_err(map_fs_error)?; + Ok(FsReadFileResponse { + data_base64: STANDARD.encode(bytes), + }) + } + + pub(crate) async fn write_file( + &self, + params: FsWriteFileParams, + ) -> Result { + let bytes = STANDARD.decode(params.data_base64).map_err(|err| { + invalid_request(format!( + "fs/writeFile requires valid base64 dataBase64: {err}" + )) + })?; + self.file_system + .write_file(¶ms.path, bytes) + .await + .map_err(map_fs_error)?; + Ok(FsWriteFileResponse {}) + } + + pub(crate) async fn create_directory( + &self, + params: FsCreateDirectoryParams, + ) -> Result { + self.file_system + .create_directory( + ¶ms.path, + CreateDirectoryOptions { + recursive: params.recursive.unwrap_or(true), + }, + ) + .await + .map_err(map_fs_error)?; + Ok(FsCreateDirectoryResponse {}) + } + + pub(crate) async fn get_metadata( + &self, + params: FsGetMetadataParams, + ) -> Result { + let metadata = self + .file_system + .get_metadata(¶ms.path) + .await + .map_err(map_fs_error)?; + Ok(FsGetMetadataResponse { + is_directory: metadata.is_directory, + is_file: metadata.is_file, + created_at_ms: metadata.created_at_ms, + modified_at_ms: metadata.modified_at_ms, + }) + } + + pub(crate) async fn read_directory( + &self, + params: FsReadDirectoryParams, + ) -> Result { + let entries = self + .file_system + .read_directory(¶ms.path) + .await + .map_err(map_fs_error)?; + Ok(FsReadDirectoryResponse { + entries: entries + .into_iter() + .map(|entry| FsReadDirectoryEntry { + file_name: entry.file_name, + is_directory: entry.is_directory, + is_file: entry.is_file, + }) + .collect(), + }) + } + + pub(crate) async fn remove( + &self, + params: FsRemoveParams, + ) -> Result { + self.file_system + .remove( + ¶ms.path, + RemoveOptions { + recursive: params.recursive.unwrap_or(true), + force: params.force.unwrap_or(true), + }, + ) + .await + .map_err(map_fs_error)?; + Ok(FsRemoveResponse {}) + } + + pub(crate) async fn copy( + &self, + params: FsCopyParams, + ) -> Result { + self.file_system + .copy( + ¶ms.source_path, + ¶ms.destination_path, + CopyOptions { + recursive: params.recursive, + }, + ) + .await + .map_err(map_fs_error)?; + Ok(FsCopyResponse {}) + } +} + +fn map_fs_error(err: io::Error) -> JSONRPCErrorError { + if err.kind() == io::ErrorKind::InvalidInput { + invalid_request(err.to_string()) + } else { + internal_error(err.to_string()) + } +} diff --git a/codex-rs/exec-server/src/server/handler.rs b/codex-rs/exec-server/src/server/handler.rs index 40a13cf1f99..a5a75a8e32e 100644 --- a/codex-rs/exec-server/src/server/handler.rs +++ b/codex-rs/exec-server/src/server/handler.rs @@ -5,6 +5,20 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; +use codex_app_server_protocol::FsCopyParams; +use codex_app_server_protocol::FsCopyResponse; +use codex_app_server_protocol::FsCreateDirectoryParams; +use codex_app_server_protocol::FsCreateDirectoryResponse; +use codex_app_server_protocol::FsGetMetadataParams; +use codex_app_server_protocol::FsGetMetadataResponse; +use codex_app_server_protocol::FsReadDirectoryParams; +use codex_app_server_protocol::FsReadDirectoryResponse; +use codex_app_server_protocol::FsReadFileParams; +use codex_app_server_protocol::FsReadFileResponse; +use codex_app_server_protocol::FsRemoveParams; +use codex_app_server_protocol::FsRemoveResponse; +use codex_app_server_protocol::FsWriteFileParams; +use codex_app_server_protocol::FsWriteFileResponse; use codex_app_server_protocol::JSONRPCErrorError; use codex_utils_pty::ExecCommandSession; use codex_utils_pty::TerminalSize; @@ -30,6 +44,7 @@ use crate::rpc::RpcNotificationSender; use crate::rpc::internal_error; use crate::rpc::invalid_params; use crate::rpc::invalid_request; +use crate::server::filesystem::ExecServerFileSystem; const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024; #[cfg(test)] @@ -61,6 +76,7 @@ enum ProcessEntry { pub(crate) struct ExecServerHandler { notifications: RpcNotificationSender, + file_system: ExecServerFileSystem, processes: Arc>>, initialize_requested: AtomicBool, initialized: AtomicBool, @@ -70,6 +86,7 @@ impl ExecServerHandler { pub(crate) fn new(notifications: RpcNotificationSender) -> Self { Self { notifications, + file_system: ExecServerFileSystem::default(), processes: Arc::new(Mutex::new(HashMap::new())), initialize_requested: AtomicBool::new(false), initialized: AtomicBool::new(false), @@ -111,22 +128,22 @@ impl ExecServerHandler { Ok(()) } - fn require_initialized(&self) -> Result<(), JSONRPCErrorError> { + fn require_initialized_for(&self, method_family: &str) -> Result<(), JSONRPCErrorError> { if !self.initialize_requested.load(Ordering::SeqCst) { - return Err(invalid_request( - "client must call initialize before using exec methods".to_string(), - )); + return Err(invalid_request(format!( + "client must call initialize before using {method_family} methods" + ))); } if !self.initialized.load(Ordering::SeqCst) { - return Err(invalid_request( - "client must send initialized before using exec methods".to_string(), - )); + return Err(invalid_request(format!( + "client must send initialized before using {method_family} methods" + ))); } Ok(()) } pub(crate) async fn exec(&self, params: ExecParams) -> Result { - self.require_initialized()?; + self.require_initialized_for("exec")?; let process_id = params.process_id.clone(); let (program, args) = params @@ -231,7 +248,7 @@ impl ExecServerHandler { &self, params: ReadParams, ) -> Result { - self.require_initialized()?; + self.require_initialized_for("exec")?; let after_seq = params.after_seq.unwrap_or(0); let max_bytes = params.max_bytes.unwrap_or(usize::MAX); let wait = Duration::from_millis(params.wait_ms.unwrap_or(0)); @@ -300,7 +317,7 @@ impl ExecServerHandler { &self, params: WriteParams, ) -> Result { - self.require_initialized()?; + self.require_initialized_for("exec")?; let writer_tx = { let process_map = self.processes.lock().await; let process = process_map.get(¶ms.process_id).ok_or_else(|| { @@ -333,7 +350,7 @@ impl ExecServerHandler { &self, params: TerminateParams, ) -> Result { - self.require_initialized()?; + self.require_initialized_for("exec")?; let running = { let process_map = self.processes.lock().await; match process_map.get(¶ms.process_id) { @@ -347,6 +364,62 @@ impl ExecServerHandler { Ok(TerminateResponse { running }) } + + pub(crate) async fn fs_read_file( + &self, + params: FsReadFileParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.read_file(params).await + } + + pub(crate) async fn fs_write_file( + &self, + params: FsWriteFileParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.write_file(params).await + } + + pub(crate) async fn fs_create_directory( + &self, + params: FsCreateDirectoryParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.create_directory(params).await + } + + pub(crate) async fn fs_get_metadata( + &self, + params: FsGetMetadataParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.get_metadata(params).await + } + + pub(crate) async fn fs_read_directory( + &self, + params: FsReadDirectoryParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.read_directory(params).await + } + + pub(crate) async fn fs_remove( + &self, + params: FsRemoveParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.remove(params).await + } + + pub(crate) async fn fs_copy( + &self, + params: FsCopyParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.copy(params).await + } } async fn stream_output( diff --git a/codex-rs/exec-server/src/server/registry.rs b/codex-rs/exec-server/src/server/registry.rs index 50efdddd71e..6ddfa0a4300 100644 --- a/codex-rs/exec-server/src/server/registry.rs +++ b/codex-rs/exec-server/src/server/registry.rs @@ -5,6 +5,13 @@ use crate::protocol::EXEC_READ_METHOD; use crate::protocol::EXEC_TERMINATE_METHOD; use crate::protocol::EXEC_WRITE_METHOD; use crate::protocol::ExecParams; +use crate::protocol::FS_COPY_METHOD; +use crate::protocol::FS_CREATE_DIRECTORY_METHOD; +use crate::protocol::FS_GET_METADATA_METHOD; +use crate::protocol::FS_READ_DIRECTORY_METHOD; +use crate::protocol::FS_READ_FILE_METHOD; +use crate::protocol::FS_REMOVE_METHOD; +use crate::protocol::FS_WRITE_FILE_METHOD; use crate::protocol::INITIALIZE_METHOD; use crate::protocol::INITIALIZED_METHOD; use crate::protocol::InitializeParams; @@ -13,6 +20,13 @@ use crate::protocol::TerminateParams; use crate::protocol::WriteParams; use crate::rpc::RpcRouter; use crate::server::ExecServerHandler; +use codex_app_server_protocol::FsCopyParams; +use codex_app_server_protocol::FsCreateDirectoryParams; +use codex_app_server_protocol::FsGetMetadataParams; +use codex_app_server_protocol::FsReadDirectoryParams; +use codex_app_server_protocol::FsReadFileParams; +use codex_app_server_protocol::FsRemoveParams; +use codex_app_server_protocol::FsWriteFileParams; pub(crate) fn build_router() -> RpcRouter { let mut router = RpcRouter::new(); @@ -48,5 +62,47 @@ pub(crate) fn build_router() -> RpcRouter { handler.terminate(params).await }, ); + router.request( + FS_READ_FILE_METHOD, + |handler: Arc, params: FsReadFileParams| async move { + handler.fs_read_file(params).await + }, + ); + router.request( + FS_WRITE_FILE_METHOD, + |handler: Arc, params: FsWriteFileParams| async move { + handler.fs_write_file(params).await + }, + ); + router.request( + FS_CREATE_DIRECTORY_METHOD, + |handler: Arc, params: FsCreateDirectoryParams| async move { + handler.fs_create_directory(params).await + }, + ); + router.request( + FS_GET_METADATA_METHOD, + |handler: Arc, params: FsGetMetadataParams| async move { + handler.fs_get_metadata(params).await + }, + ); + router.request( + FS_READ_DIRECTORY_METHOD, + |handler: Arc, params: FsReadDirectoryParams| async move { + handler.fs_read_directory(params).await + }, + ); + router.request( + FS_REMOVE_METHOD, + |handler: Arc, params: FsRemoveParams| async move { + handler.fs_remove(params).await + }, + ); + router.request( + FS_COPY_METHOD, + |handler: Arc, params: FsCopyParams| async move { + handler.fs_copy(params).await + }, + ); router } diff --git a/codex-rs/exec-server/tests/stdio_smoke.rs b/codex-rs/exec-server/tests/stdio_smoke.rs index c08d7f3c9b2..77374dc4679 100644 --- a/codex-rs/exec-server/tests/stdio_smoke.rs +++ b/codex-rs/exec-server/tests/stdio_smoke.rs @@ -4,6 +4,8 @@ use std::process::Stdio; use std::time::Duration; use anyhow::Context; +use base64::Engine as _; +use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCRequest; @@ -15,6 +17,13 @@ use codex_exec_server::ExecServerClient; use codex_exec_server::ExecServerClientConnectOptions; use codex_exec_server::ExecServerEvent; use codex_exec_server::ExecServerLaunchCommand; +use codex_exec_server::FsCopyParams; +use codex_exec_server::FsCreateDirectoryParams; +use codex_exec_server::FsGetMetadataParams; +use codex_exec_server::FsReadDirectoryParams; +use codex_exec_server::FsReadFileParams; +use codex_exec_server::FsRemoveParams; +use codex_exec_server::FsWriteFileParams; use codex_exec_server::InitializeParams; use codex_exec_server::InitializeResponse; use codex_exec_server::RemoteExecServerConnectArgs; @@ -200,6 +209,102 @@ async fn exec_server_client_connects_over_websocket() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_server_client_filesystem_round_trip_over_stdio() -> anyhow::Result<()> { + let server = spawn_local_exec_server( + ExecServerLaunchCommand { + program: cargo_bin("codex-exec-server")?, + args: Vec::new(), + }, + ExecServerClientConnectOptions { + client_name: "exec-server-test".to_string(), + initialize_timeout: Duration::from_secs(5), + }, + ) + .await?; + let client = server.client(); + + let root = std::env::temp_dir().join(format!( + "codex-exec-server-fs-{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_nanos() + )); + let directory = root.join("dir"); + let file_path = directory.join("hello.txt"); + let copy_path = directory.join("copy.txt"); + + client + .fs_create_directory(FsCreateDirectoryParams { + path: directory.clone().try_into()?, + recursive: Some(true), + }) + .await?; + + client + .fs_write_file(FsWriteFileParams { + path: file_path.clone().try_into()?, + data_base64: BASE64_STANDARD.encode(b"hello"), + }) + .await?; + + let metadata = client + .fs_get_metadata(FsGetMetadataParams { + path: file_path.clone().try_into()?, + }) + .await?; + assert!(metadata.is_file); + assert!(!metadata.is_directory); + + let read_file = client + .fs_read_file(FsReadFileParams { + path: file_path.clone().try_into()?, + }) + .await?; + assert_eq!(read_file.data_base64, BASE64_STANDARD.encode(b"hello")); + + let read_directory = client + .fs_read_directory(FsReadDirectoryParams { + path: directory.clone().try_into()?, + }) + .await?; + assert!( + read_directory + .entries + .iter() + .any(|entry| entry.file_name == "hello.txt" && entry.is_file) + ); + + client + .fs_copy(FsCopyParams { + source_path: file_path.clone().try_into()?, + destination_path: copy_path.clone().try_into()?, + recursive: false, + }) + .await?; + let copied = client + .fs_read_file(FsReadFileParams { + path: copy_path.clone().try_into()?, + }) + .await?; + assert_eq!(copied.data_base64, BASE64_STANDARD.encode(b"hello")); + + client + .fs_remove(FsRemoveParams { + path: root.clone().try_into()?, + recursive: Some(true), + force: Some(true), + }) + .await?; + + assert!( + !root.exists(), + "filesystem cleanup should remove the test tree" + ); + Ok(()) +} + async fn read_websocket_url(lines: &mut tokio::io::Lines>) -> anyhow::Result where R: tokio::io::AsyncRead + Unpin, From 956a459b3253f9d2e5a564869ce549c34e1b7b44 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 01:14:53 +0000 Subject: [PATCH 02/10] Fix exec-server in-order request handling and stdio transport Co-authored-by: Codex --- codex-rs/exec-server/src/connection.rs | 22 +- codex-rs/exec-server/src/local.rs | 1 + codex-rs/exec-server/src/rpc.rs | 212 ++++++++++++++++++ codex-rs/exec-server/src/server.rs | 15 +- codex-rs/exec-server/src/server/processor.rs | 24 +- codex-rs/exec-server/src/server/registry.rs | 4 +- codex-rs/exec-server/src/server/transport.rs | 147 ++++-------- .../exec-server/src/server/transport_tests.rs | 50 +++++ codex-rs/exec-server/tests/initialize.rs | 39 ++++ codex-rs/exec-server/tests/process.rs | 74 ++++++ codex-rs/exec-server/tests/stdio_smoke.rs | 1 + codex-rs/exec-server/tests/websocket.rs | 65 ++++++ 12 files changed, 533 insertions(+), 121 deletions(-) create mode 100644 codex-rs/exec-server/src/server/transport_tests.rs create mode 100644 codex-rs/exec-server/tests/initialize.rs create mode 100644 codex-rs/exec-server/tests/process.rs create mode 100644 codex-rs/exec-server/tests/websocket.rs diff --git a/codex-rs/exec-server/src/connection.rs b/codex-rs/exec-server/src/connection.rs index 6c9f66d9a84..18aa2040b3a 100644 --- a/codex-rs/exec-server/src/connection.rs +++ b/codex-rs/exec-server/src/connection.rs @@ -11,11 +11,13 @@ use tokio::sync::mpsc; use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::tungstenite::Message; + pub(crate) const CHANNEL_CAPACITY: usize = 128; #[derive(Debug)] pub(crate) enum JsonRpcConnectionEvent { Message(JSONRPCMessage), + MalformedMessage { reason: String }, Disconnected { reason: Option }, } @@ -55,14 +57,13 @@ impl JsonRpcConnection { } } Err(err) => { - send_disconnected( + send_malformed_message( &incoming_tx_for_reader, Some(format!( "failed to parse JSON-RPC message from {reader_label}: {err}" )), ) .await; - break; } } } @@ -132,14 +133,13 @@ impl JsonRpcConnection { } } Err(err) => { - send_disconnected( + send_malformed_message( &incoming_tx_for_reader, Some(format!( "failed to parse websocket JSON-RPC message from {reader_label}: {err}" )), ) .await; - break; } } } @@ -155,14 +155,13 @@ impl JsonRpcConnection { } } Err(err) => { - send_disconnected( + send_malformed_message( &incoming_tx_for_reader, Some(format!( "failed to parse websocket JSON-RPC message from {reader_label}: {err}" )), ) .await; - break; } } } @@ -247,6 +246,17 @@ async fn send_disconnected( .await; } +async fn send_malformed_message( + incoming_tx: &mpsc::Sender, + reason: Option, +) { + let _ = incoming_tx + .send(JsonRpcConnectionEvent::MalformedMessage { + reason: reason.unwrap_or_else(|| "malformed JSON-RPC message".to_string()), + }) + .await; +} + async fn write_jsonrpc_line_message( writer: &mut BufWriter, message: &JSONRPCMessage, diff --git a/codex-rs/exec-server/src/local.rs b/codex-rs/exec-server/src/local.rs index 20b59a983cb..e51c9439482 100644 --- a/codex-rs/exec-server/src/local.rs +++ b/codex-rs/exec-server/src/local.rs @@ -42,6 +42,7 @@ pub async fn spawn_local_exec_server( ) -> Result { let mut child = Command::new(&command.program); child.args(&command.args); + child.args(["--listen", "stdio://"]); child.stdin(Stdio::piped()); child.stdout(Stdio::piped()); child.stderr(Stdio::inherit()); diff --git a/codex-rs/exec-server/src/rpc.rs b/codex-rs/exec-server/src/rpc.rs index 94a891e22cb..a57e164919e 100644 --- a/codex-rs/exec-server/src/rpc.rs +++ b/codex-rs/exec-server/src/rpc.rs @@ -4,6 +4,7 @@ use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; +use std::pin::Pin; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCErrorError; @@ -441,6 +442,217 @@ async fn drain_pending(pending: &Mutex>) { } } +#[derive(Debug)] +pub(crate) enum RpcServerOutboundMessage { + Response { + request_id: RequestId, + result: Value, + }, + Error { + request_id: RequestId, + error: JSONRPCErrorError, + }, + Notification(JSONRPCNotification), +} + +impl RpcServerOutboundMessage { + fn response(request_id: RequestId, result: Result) -> Self { + match result { + Ok(result) => Self::Response { + request_id, + result, + }, + Err(error) => Self::Error { + request_id, + error, + }, + } + } +} + +pub(crate) fn invalid_request(message: String) -> JSONRPCErrorError { + JSONRPCErrorError { + code: -32600, + data: None, + message, + } +} + +pub(crate) fn invalid_params(message: String) -> JSONRPCErrorError { + JSONRPCErrorError { + code: -32602, + data: None, + message, + } +} + +pub(crate) fn method_not_found(message: String) -> JSONRPCErrorError { + JSONRPCErrorError { + code: -32601, + data: None, + message, + } +} + +pub(crate) fn internal_error(message: String) -> JSONRPCErrorError { + JSONRPCErrorError { + code: -32603, + data: None, + message, + } +} + +pub(crate) fn encode_server_message( + message: RpcServerOutboundMessage, +) -> Result { + Ok(match message { + RpcServerOutboundMessage::Response { request_id, result } => { + JSONRPCMessage::Response(JSONRPCResponse { id: request_id, result }) + } + RpcServerOutboundMessage::Error { request_id, error } => { + JSONRPCMessage::Error(JSONRPCError { id: request_id, error }) + } + RpcServerOutboundMessage::Notification(notification) => { + JSONRPCMessage::Notification(notification) + } + }) +} + +#[derive(Clone)] +pub(crate) struct RpcNotificationSender { + tx: mpsc::Sender, +} + +impl RpcNotificationSender { + pub(crate) fn new(tx: mpsc::Sender) -> Self { + Self { tx } + } + + pub(crate) async fn notify( + &self, + method: &str, + params: &P, + ) -> Result<(), serde_json::Error> { + let params = serde_json::to_value(params)?; + self.tx + .send(RpcServerOutboundMessage::Notification(JSONRPCNotification { + method: method.to_string(), + params: Some(params), + })) + .await + .map_err(|_| { + serde_json::Error::io(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "JSON-RPC transport closed", + )) + }) + } +} + +type RpcRequestRoute = dyn Fn( + Arc, + codex_app_server_protocol::JSONRPCRequest, + ) -> Pin + Send>> + + Send + + Sync; + +type RpcNotificationRoute = dyn Fn( + Arc, + codex_app_server_protocol::JSONRPCNotification, + ) -> Pin> + Send>> + + Send + + Sync; + +pub(crate) struct RpcRouter { + request_routes: HashMap>>, + notification_routes: HashMap>>, +} + +impl RpcRouter { + pub(crate) fn new() -> Self { + Self { + request_routes: HashMap::new(), + notification_routes: HashMap::new(), + } + } + + pub(crate) fn request(&mut self, method: &str, handler: F) + where + P: DeserializeOwned + Send + 'static, + R: Serialize + Send + 'static, + F: Fn(Arc, P) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, + { + let method = method.to_string(); + let handler = std::sync::Arc::new(handler); + self.request_routes.insert( + method, + Box::new( + move |server_handler: Arc, request: codex_app_server_protocol::JSONRPCRequest| { + let handler = std::sync::Arc::clone(&handler); + let params = serde_json::from_value::

(request.params.unwrap_or(Value::Null)) + .map_err(|error| invalid_params(error.to_string())); + let request_id = request.id; + Box::pin(async move { + let result = match params { + Ok(params) => handler(server_handler.clone(), params) + .await + .and_then(|value| { + serde_json::to_value(value) + .map_err(|error| invalid_params(error.to_string())) + }), + Err(error) => Err(error), + }; + RpcServerOutboundMessage::response(request_id, result) + }) + }, + ), + ); + } + + pub(crate) fn notification(&mut self, method: &str, handler: F) + where + P: DeserializeOwned + Send + 'static, + F: Fn(Arc, P) -> Fut + Send + Sync + 'static, + Fut: std::future::Future> + Send + 'static, + { + let method = method.to_string(); + let handler = std::sync::Arc::new(handler); + self.notification_routes.insert( + method, + Box::new( + move | + server_handler: Arc, + notification: codex_app_server_protocol::JSONRPCNotification| { + let handler = std::sync::Arc::clone(&handler); + let params = serde_json::from_value::

(notification.params.unwrap_or(Value::Null)) + .map_err(|err| err.to_string()); + Box::pin(async move { + match params { + Ok(params) => handler(server_handler.clone(), params).await, + Err(error) => Err(error), + } + }) + }, + ), + ); + } + + pub(crate) fn request_route( + &self, + method: &str, + ) -> Option<&Box>> { + self.request_routes.get(method) + } + + pub(crate) fn notification_route( + &self, + method: &str, + ) -> Option<&Box>> { + self.notification_routes.get(method) + } +} + #[cfg(test)] mod tests { use std::time::Duration; diff --git a/codex-rs/exec-server/src/server.rs b/codex-rs/exec-server/src/server.rs index bdd00130ee0..10e6b19c66d 100644 --- a/codex-rs/exec-server/src/server.rs +++ b/codex-rs/exec-server/src/server.rs @@ -1,19 +1,20 @@ mod filesystem; mod handler; -mod processor; +mod jsonrpc; mod registry; +mod processor; mod transport; pub(crate) use handler::ExecServerHandler; -pub use transport::ExecServerTransport; -pub use transport::ExecServerTransportParseError; +pub use transport::DEFAULT_LISTEN_URL; +pub use transport::ExecServerListenUrlParseError; pub async fn run_main() -> Result<(), Box> { - run_main_with_transport(ExecServerTransport::Stdio).await + run_main_with_listen_url(DEFAULT_LISTEN_URL).await } -pub async fn run_main_with_transport( - transport: ExecServerTransport, +pub async fn run_main_with_listen_url( + listen_url: &str, ) -> Result<(), Box> { - transport::run_transport(transport).await + transport::run_transport(listen_url).await } diff --git a/codex-rs/exec-server/src/server/processor.rs b/codex-rs/exec-server/src/server/processor.rs index 888bbf7f302..2fc8995e6ac 100644 --- a/codex-rs/exec-server/src/server/processor.rs +++ b/codex-rs/exec-server/src/server/processor.rs @@ -10,6 +10,7 @@ use crate::connection::JsonRpcConnectionEvent; use crate::rpc::RpcNotificationSender; use crate::rpc::RpcServerOutboundMessage; use crate::rpc::encode_server_message; +use crate::rpc::invalid_request; use crate::rpc::method_not_found; use crate::server::ExecServerHandler; use crate::server::registry::build_router; @@ -39,15 +40,26 @@ pub(crate) async fn run_connection(connection: JsonRpcConnection) { while let Some(event) = incoming_rx.recv().await { match event { + JsonRpcConnectionEvent::MalformedMessage { reason } => { + warn!("ignoring malformed exec-server message: {reason}"); + if outgoing_tx + .send(RpcServerOutboundMessage::Error { + request_id: codex_app_server_protocol::RequestId::Integer(-1), + error: invalid_request(reason), + }) + .await + .is_err() + { + break; + } + } JsonRpcConnectionEvent::Message(message) => match message { codex_app_server_protocol::JSONRPCMessage::Request(request) => { if let Some(route) = router.request_route(request.method.as_str()) { - let route = route(handler.clone(), request); - let outgoing_tx = outgoing_tx.clone(); - tokio::spawn(async move { - let message = route.await; - let _ = outgoing_tx.send(message).await; - }); + let message = route(handler.clone(), request).await; + if outgoing_tx.send(message).await.is_err() { + break; + } } else if outgoing_tx .send(RpcServerOutboundMessage::Error { request_id: request.id, diff --git a/codex-rs/exec-server/src/server/registry.rs b/codex-rs/exec-server/src/server/registry.rs index 6ddfa0a4300..482e5ab6107 100644 --- a/codex-rs/exec-server/src/server/registry.rs +++ b/codex-rs/exec-server/src/server/registry.rs @@ -38,7 +38,9 @@ pub(crate) fn build_router() -> RpcRouter { ); router.notification( INITIALIZED_METHOD, - |handler: Arc, (): ()| async move { handler.initialized() }, + |handler: Arc, _params: serde_json::Value| async move { + handler.initialized() + }, ); router.request( EXEC_METHOD, diff --git a/codex-rs/exec-server/src/server/transport.rs b/codex-rs/exec-server/src/server/transport.rs index b653c0b79b7..2c83b8b87c2 100644 --- a/codex-rs/exec-server/src/server/transport.rs +++ b/codex-rs/exec-server/src/server/transport.rs @@ -1,33 +1,29 @@ use std::net::SocketAddr; -use std::str::FromStr; use tokio::net::TcpListener; +use tokio::io; use tokio_tungstenite::accept_async; use tracing::warn; use crate::connection::JsonRpcConnection; use crate::server::processor::run_connection; -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum ExecServerTransport { - Stdio, - WebSocket { bind_address: SocketAddr }, -} +pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0"; #[derive(Debug, Clone, Eq, PartialEq)] -pub enum ExecServerTransportParseError { +pub enum ExecServerListenUrlParseError { UnsupportedListenUrl(String), InvalidWebSocketListenUrl(String), } -impl std::fmt::Display for ExecServerTransportParseError { +impl std::fmt::Display for ExecServerListenUrlParseError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - ExecServerTransportParseError::UnsupportedListenUrl(listen_url) => write!( + ExecServerListenUrlParseError::UnsupportedListenUrl(listen_url) => write!( f, - "unsupported --listen URL `{listen_url}`; expected `stdio://` or `ws://IP:PORT`" + "unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT`" ), - ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url) => write!( + ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url) => write!( f, "invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT`" ), @@ -35,62 +31,59 @@ impl std::fmt::Display for ExecServerTransportParseError { } } -impl std::error::Error for ExecServerTransportParseError {} - -impl ExecServerTransport { - pub const DEFAULT_LISTEN_URL: &str = "stdio://"; - - pub fn from_listen_url(listen_url: &str) -> Result { - if listen_url == Self::DEFAULT_LISTEN_URL { - return Ok(Self::Stdio); - } - - if let Some(socket_addr) = listen_url.strip_prefix("ws://") { - let bind_address = socket_addr.parse::().map_err(|_| { - ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url.to_string()) - })?; - return Ok(Self::WebSocket { bind_address }); - } +impl std::error::Error for ExecServerListenUrlParseError {} - Err(ExecServerTransportParseError::UnsupportedListenUrl( - listen_url.to_string(), - )) - } +#[derive(Debug, Clone, Eq, PartialEq)] +enum ListenAddress { + Websocket(SocketAddr), + Stdio, } -impl FromStr for ExecServerTransport { - type Err = ExecServerTransportParseError; - - fn from_str(s: &str) -> Result { - Self::from_listen_url(s) +fn parse_listen_url(listen_url: &str) -> Result { + if let Some(socket_addr) = listen_url.strip_prefix("ws://") { + return socket_addr + .parse::() + .map(ListenAddress::Websocket) + .map_err(|_| { + ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url.to_string()) + }); + } + if listen_url == "stdio://" { + return Ok(ListenAddress::Stdio); } + + Err(ExecServerListenUrlParseError::UnsupportedListenUrl( + listen_url.to_string(), + )) } pub(crate) async fn run_transport( - transport: ExecServerTransport, + listen_url: &str, ) -> Result<(), Box> { - match transport { - ExecServerTransport::Stdio => { - run_connection(JsonRpcConnection::from_stdio( - tokio::io::stdin(), - tokio::io::stdout(), - "exec-server stdio".to_string(), - )) - .await; - Ok(()) - } - ExecServerTransport::WebSocket { bind_address } => { - run_websocket_listener(bind_address).await - } + match parse_listen_url(listen_url)? { + ListenAddress::Websocket(bind_address) => run_websocket_listener(bind_address).await, + ListenAddress::Stdio => run_stdio_listener().await, } } +async fn run_stdio_listener() -> Result<(), Box> { + run_connection(JsonRpcConnection::from_stdio( + io::stdin(), + io::stdout(), + "exec-server stdio".to_string(), + )) + .await; + Ok(()) +} + async fn run_websocket_listener( bind_address: SocketAddr, ) -> Result<(), Box> { let listener = TcpListener::bind(bind_address).await?; let local_addr = listener.local_addr()?; - print_websocket_startup_banner(local_addr); + let listen_message = format!("codex-exec-server listening on ws://{local_addr}"); + tracing::info!("{}", listen_message); + eprintln!("{listen_message}"); loop { let (stream, peer_addr) = listener.accept().await?; @@ -113,54 +106,6 @@ async fn run_websocket_listener( } } -#[allow(clippy::print_stderr)] -fn print_websocket_startup_banner(addr: SocketAddr) { - eprintln!("codex-exec-server listening on ws://{addr}"); -} - #[cfg(test)] -mod tests { - use pretty_assertions::assert_eq; - - use super::ExecServerTransport; - - #[test] - fn exec_server_transport_parses_stdio_listen_url() { - let transport = - ExecServerTransport::from_listen_url(ExecServerTransport::DEFAULT_LISTEN_URL) - .expect("stdio listen URL should parse"); - assert_eq!(transport, ExecServerTransport::Stdio); - } - - #[test] - fn exec_server_transport_parses_websocket_listen_url() { - let transport = ExecServerTransport::from_listen_url("ws://127.0.0.1:1234") - .expect("websocket listen URL should parse"); - assert_eq!( - transport, - ExecServerTransport::WebSocket { - bind_address: "127.0.0.1:1234".parse().expect("valid socket address"), - } - ); - } - - #[test] - fn exec_server_transport_rejects_invalid_websocket_listen_url() { - let err = ExecServerTransport::from_listen_url("ws://localhost:1234") - .expect_err("hostname bind address should be rejected"); - assert_eq!( - err.to_string(), - "invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT`" - ); - } - - #[test] - fn exec_server_transport_rejects_unsupported_listen_url() { - let err = ExecServerTransport::from_listen_url("http://127.0.0.1:1234") - .expect_err("unsupported scheme should fail"); - assert_eq!( - err.to_string(), - "unsupported --listen URL `http://127.0.0.1:1234`; expected `stdio://` or `ws://IP:PORT`" - ); - } -} +#[path = "transport_tests.rs"] +mod transport_tests; diff --git a/codex-rs/exec-server/src/server/transport_tests.rs b/codex-rs/exec-server/src/server/transport_tests.rs new file mode 100644 index 00000000000..6aed843cac0 --- /dev/null +++ b/codex-rs/exec-server/src/server/transport_tests.rs @@ -0,0 +1,50 @@ +use pretty_assertions::assert_eq; + +use super::DEFAULT_LISTEN_URL; +use super::parse_listen_url; + +#[test] +fn parse_listen_url_accepts_default_websocket_url() { + let bind_address = parse_listen_url(DEFAULT_LISTEN_URL) + .expect("default listen URL should parse"); + assert_eq!( + bind_address, + super::ListenAddress::Websocket("127.0.0.1:0".parse().expect("valid socket address")) + ); +} + +#[test] +fn parse_listen_url_accepts_stdio_url() { + let listen_address = parse_listen_url("stdio://").expect("stdio listen URL should parse"); + assert_eq!(listen_address, super::ListenAddress::Stdio); +} + +#[test] +fn parse_listen_url_accepts_websocket_url() { + let bind_address = parse_listen_url("ws://127.0.0.1:1234") + .expect("websocket listen URL should parse"); + assert_eq!( + bind_address, + super::ListenAddress::Websocket("127.0.0.1:1234".parse().expect("valid socket address")) + ); +} + +#[test] +fn parse_listen_url_rejects_invalid_websocket_url() { + let err = parse_listen_url("ws://localhost:1234") + .expect_err("hostname bind address should be rejected"); + assert_eq!( + err.to_string(), + "invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT`" + ); +} + +#[test] +fn parse_listen_url_rejects_unsupported_url() { + let err = + parse_listen_url("http://127.0.0.1:1234").expect_err("unsupported scheme should fail"); + assert_eq!( + err.to_string(), + "unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT`" + ); +} diff --git a/codex-rs/exec-server/tests/initialize.rs b/codex-rs/exec-server/tests/initialize.rs new file mode 100644 index 00000000000..04a36348c82 --- /dev/null +++ b/codex-rs/exec-server/tests/initialize.rs @@ -0,0 +1,39 @@ +#![cfg(unix)] + +mod common; + +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCResponse; +use codex_exec_server::InitializeParams; +use codex_exec_server::InitializeResponse; +use common::exec_server::exec_server; +use pretty_assertions::assert_eq; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_server_accepts_initialize() -> anyhow::Result<()> { + let mut server = exec_server().await?; + let initialize_id = server + .send_request( + "initialize", + serde_json::to_value(InitializeParams { + client_name: "exec-server-test".to_string(), + })?, + ) + .await?; + + let response = server.next_event().await?; + let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else { + panic!("expected initialize response"); + }; + assert_eq!(id, initialize_id); + let initialize_response: InitializeResponse = serde_json::from_value(result)?; + assert_eq!( + initialize_response, + InitializeResponse { + protocol_version: "exec-server.v0".to_string() + } + ); + + server.shutdown().await?; + Ok(()) +} diff --git a/codex-rs/exec-server/tests/process.rs b/codex-rs/exec-server/tests/process.rs new file mode 100644 index 00000000000..95583c61aa3 --- /dev/null +++ b/codex-rs/exec-server/tests/process.rs @@ -0,0 +1,74 @@ +#![cfg(unix)] + +mod common; + +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCResponse; +use codex_exec_server::InitializeParams; +use codex_exec_server::ExecResponse; +use common::exec_server::exec_server; +use pretty_assertions::assert_eq; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_server_stubs_process_start_over_websocket() -> anyhow::Result<()> { + let mut server = exec_server().await?; + let initialize_id = server + .send_request( + "initialize", + serde_json::to_value(InitializeParams { + client_name: "exec-server-test".to_string(), + })?, + ) + .await?; + let _ = server + .wait_for_event(|event| { + matches!( + event, + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id + ) + }) + .await?; + + server + .send_notification( + "initialized", + serde_json::json!({}), + ) + .await?; + + let process_start_id = server + .send_request( + "process/start", + serde_json::json!({ + "processId": "proc-1", + "argv": ["true"], + "cwd": std::env::current_dir()?, + "env": {}, + "tty": false, + "arg0": null + }), + ) + .await?; + let response = server + .wait_for_event(|event| { + matches!( + event, + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &process_start_id + ) + }) + .await?; + let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else { + panic!("expected process/start response"); + }; + assert_eq!(id, process_start_id); + let process_start_response: ExecResponse = serde_json::from_value(result)?; + assert_eq!( + process_start_response, + ExecResponse { + process_id: "proc-1".to_string() + } + ); + + server.shutdown().await?; + Ok(()) +} diff --git a/codex-rs/exec-server/tests/stdio_smoke.rs b/codex-rs/exec-server/tests/stdio_smoke.rs index 77374dc4679..bb033a9efbf 100644 --- a/codex-rs/exec-server/tests/stdio_smoke.rs +++ b/codex-rs/exec-server/tests/stdio_smoke.rs @@ -41,6 +41,7 @@ use tokio::time::timeout; async fn exec_server_accepts_initialize_over_stdio() -> anyhow::Result<()> { let binary = cargo_bin("codex-exec-server")?; let mut child = Command::new(binary); + child.args(["--listen", "stdio://"]); child.stdin(Stdio::piped()); child.stdout(Stdio::piped()); child.stderr(Stdio::inherit()); diff --git a/codex-rs/exec-server/tests/websocket.rs b/codex-rs/exec-server/tests/websocket.rs new file mode 100644 index 00000000000..d653da6e2b4 --- /dev/null +++ b/codex-rs/exec-server/tests/websocket.rs @@ -0,0 +1,65 @@ +#![cfg(unix)] + +mod common; + +use codex_app_server_protocol::JSONRPCError; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCResponse; +use codex_exec_server::InitializeParams; +use codex_exec_server::InitializeResponse; +use common::exec_server::exec_server; +use pretty_assertions::assert_eq; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_server_reports_malformed_websocket_json_and_keeps_running() -> anyhow::Result<()> { + let mut server = exec_server().await?; + server.send_raw_text("not-json").await?; + + let response = server + .wait_for_event(|event| matches!(event, JSONRPCMessage::Error(_))) + .await?; + let JSONRPCMessage::Error(JSONRPCError { id, error }) = response else { + panic!("expected malformed-message error response"); + }; + assert_eq!(id, codex_app_server_protocol::RequestId::Integer(-1)); + assert_eq!(error.code, -32600); + assert!( + error + .message + .starts_with("failed to parse websocket JSON-RPC message from exec-server websocket"), + "unexpected malformed-message error: {}", + error.message + ); + + let initialize_id = server + .send_request( + "initialize", + serde_json::to_value(InitializeParams { + client_name: "exec-server-test".to_string(), + })?, + ) + .await?; + + let response = server + .wait_for_event(|event| { + matches!( + event, + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id + ) + }) + .await?; + let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else { + panic!("expected initialize response after malformed input"); + }; + assert_eq!(id, initialize_id); + let initialize_response: InitializeResponse = serde_json::from_value(result)?; + assert_eq!( + initialize_response, + InitializeResponse { + protocol_version: "exec-server.v0".to_string() + } + ); + + server.shutdown().await?; + Ok(()) +} From ad10fb69cba54f4f471a208904148668d92ff3c7 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 01:18:16 +0000 Subject: [PATCH 03/10] Fix exec-server compile breakage and websocket test helpers --- .../exec-server/src/bin/codex-exec-server.rs | 8 +- codex-rs/exec-server/src/lib.rs | 3 +- codex-rs/exec-server/src/rpc.rs | 216 +----------------- codex-rs/exec-server/src/server.rs | 11 +- codex-rs/exec-server/tests/common.rs | 182 +++++++++++++++ codex-rs/exec-server/tests/process.rs | 4 +- codex-rs/exec-server/tests/websocket.rs | 2 +- 7 files changed, 203 insertions(+), 223 deletions(-) create mode 100644 codex-rs/exec-server/tests/common.rs diff --git a/codex-rs/exec-server/src/bin/codex-exec-server.rs b/codex-rs/exec-server/src/bin/codex-exec-server.rs index 16df84d9b6f..2638ecd1b76 100644 --- a/codex-rs/exec-server/src/bin/codex-exec-server.rs +++ b/codex-rs/exec-server/src/bin/codex-exec-server.rs @@ -1,5 +1,5 @@ use clap::Parser; -use codex_exec_server::ExecServerTransport; +use codex_exec_server::DEFAULT_LISTEN_URL; #[derive(Debug, Parser)] struct ExecServerArgs { @@ -8,15 +8,15 @@ struct ExecServerArgs { #[arg( long = "listen", value_name = "URL", - default_value = ExecServerTransport::DEFAULT_LISTEN_URL + default_value = DEFAULT_LISTEN_URL )] - listen: ExecServerTransport, + listen: String, } #[tokio::main] async fn main() { let args = ExecServerArgs::parse(); - if let Err(err) = codex_exec_server::run_main_with_transport(args.listen).await { + if let Err(err) = codex_exec_server::run_main_with_transport(&args.listen).await { eprintln!("{err}"); std::process::exit(1); } diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 12bf0e17f9d..2ffcd30196b 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -42,7 +42,8 @@ pub use protocol::TerminateParams; pub use protocol::TerminateResponse; pub use protocol::WriteParams; pub use protocol::WriteResponse; -pub use server::ExecServerTransport; +pub use server::DEFAULT_LISTEN_URL; pub use server::ExecServerTransportParseError; pub use server::run_main; +pub use server::run_main_with_listen_url; pub use server::run_main_with_transport; diff --git a/codex-rs/exec-server/src/rpc.rs b/codex-rs/exec-server/src/rpc.rs index a57e164919e..8d79883c570 100644 --- a/codex-rs/exec-server/src/rpc.rs +++ b/codex-rs/exec-server/src/rpc.rs @@ -4,7 +4,6 @@ use std::pin::Pin; use std::sync::Arc; use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; -use std::pin::Pin; use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCErrorError; @@ -197,6 +196,10 @@ impl RpcClient { break; } } + JsonRpcConnectionEvent::MalformedMessage { reason } => { + warn!("JSON-RPC client closing after malformed message: {reason}"); + break; + } JsonRpcConnectionEvent::Disconnected { reason } => { let _ = event_tx.send(RpcClientEvent::Disconnected { reason }).await; drain_pending(&pending_for_reader).await; @@ -442,217 +445,6 @@ async fn drain_pending(pending: &Mutex>) { } } -#[derive(Debug)] -pub(crate) enum RpcServerOutboundMessage { - Response { - request_id: RequestId, - result: Value, - }, - Error { - request_id: RequestId, - error: JSONRPCErrorError, - }, - Notification(JSONRPCNotification), -} - -impl RpcServerOutboundMessage { - fn response(request_id: RequestId, result: Result) -> Self { - match result { - Ok(result) => Self::Response { - request_id, - result, - }, - Err(error) => Self::Error { - request_id, - error, - }, - } - } -} - -pub(crate) fn invalid_request(message: String) -> JSONRPCErrorError { - JSONRPCErrorError { - code: -32600, - data: None, - message, - } -} - -pub(crate) fn invalid_params(message: String) -> JSONRPCErrorError { - JSONRPCErrorError { - code: -32602, - data: None, - message, - } -} - -pub(crate) fn method_not_found(message: String) -> JSONRPCErrorError { - JSONRPCErrorError { - code: -32601, - data: None, - message, - } -} - -pub(crate) fn internal_error(message: String) -> JSONRPCErrorError { - JSONRPCErrorError { - code: -32603, - data: None, - message, - } -} - -pub(crate) fn encode_server_message( - message: RpcServerOutboundMessage, -) -> Result { - Ok(match message { - RpcServerOutboundMessage::Response { request_id, result } => { - JSONRPCMessage::Response(JSONRPCResponse { id: request_id, result }) - } - RpcServerOutboundMessage::Error { request_id, error } => { - JSONRPCMessage::Error(JSONRPCError { id: request_id, error }) - } - RpcServerOutboundMessage::Notification(notification) => { - JSONRPCMessage::Notification(notification) - } - }) -} - -#[derive(Clone)] -pub(crate) struct RpcNotificationSender { - tx: mpsc::Sender, -} - -impl RpcNotificationSender { - pub(crate) fn new(tx: mpsc::Sender) -> Self { - Self { tx } - } - - pub(crate) async fn notify( - &self, - method: &str, - params: &P, - ) -> Result<(), serde_json::Error> { - let params = serde_json::to_value(params)?; - self.tx - .send(RpcServerOutboundMessage::Notification(JSONRPCNotification { - method: method.to_string(), - params: Some(params), - })) - .await - .map_err(|_| { - serde_json::Error::io(std::io::Error::new( - std::io::ErrorKind::BrokenPipe, - "JSON-RPC transport closed", - )) - }) - } -} - -type RpcRequestRoute = dyn Fn( - Arc, - codex_app_server_protocol::JSONRPCRequest, - ) -> Pin + Send>> - + Send - + Sync; - -type RpcNotificationRoute = dyn Fn( - Arc, - codex_app_server_protocol::JSONRPCNotification, - ) -> Pin> + Send>> - + Send - + Sync; - -pub(crate) struct RpcRouter { - request_routes: HashMap>>, - notification_routes: HashMap>>, -} - -impl RpcRouter { - pub(crate) fn new() -> Self { - Self { - request_routes: HashMap::new(), - notification_routes: HashMap::new(), - } - } - - pub(crate) fn request(&mut self, method: &str, handler: F) - where - P: DeserializeOwned + Send + 'static, - R: Serialize + Send + 'static, - F: Fn(Arc, P) -> Fut + Send + Sync + 'static, - Fut: std::future::Future> + Send + 'static, - { - let method = method.to_string(); - let handler = std::sync::Arc::new(handler); - self.request_routes.insert( - method, - Box::new( - move |server_handler: Arc, request: codex_app_server_protocol::JSONRPCRequest| { - let handler = std::sync::Arc::clone(&handler); - let params = serde_json::from_value::

(request.params.unwrap_or(Value::Null)) - .map_err(|error| invalid_params(error.to_string())); - let request_id = request.id; - Box::pin(async move { - let result = match params { - Ok(params) => handler(server_handler.clone(), params) - .await - .and_then(|value| { - serde_json::to_value(value) - .map_err(|error| invalid_params(error.to_string())) - }), - Err(error) => Err(error), - }; - RpcServerOutboundMessage::response(request_id, result) - }) - }, - ), - ); - } - - pub(crate) fn notification(&mut self, method: &str, handler: F) - where - P: DeserializeOwned + Send + 'static, - F: Fn(Arc, P) -> Fut + Send + Sync + 'static, - Fut: std::future::Future> + Send + 'static, - { - let method = method.to_string(); - let handler = std::sync::Arc::new(handler); - self.notification_routes.insert( - method, - Box::new( - move | - server_handler: Arc, - notification: codex_app_server_protocol::JSONRPCNotification| { - let handler = std::sync::Arc::clone(&handler); - let params = serde_json::from_value::

(notification.params.unwrap_or(Value::Null)) - .map_err(|err| err.to_string()); - Box::pin(async move { - match params { - Ok(params) => handler(server_handler.clone(), params).await, - Err(error) => Err(error), - } - }) - }, - ), - ); - } - - pub(crate) fn request_route( - &self, - method: &str, - ) -> Option<&Box>> { - self.request_routes.get(method) - } - - pub(crate) fn notification_route( - &self, - method: &str, - ) -> Option<&Box>> { - self.notification_routes.get(method) - } -} - #[cfg(test)] mod tests { use std::time::Duration; diff --git a/codex-rs/exec-server/src/server.rs b/codex-rs/exec-server/src/server.rs index 10e6b19c66d..24c82972415 100644 --- a/codex-rs/exec-server/src/server.rs +++ b/codex-rs/exec-server/src/server.rs @@ -1,16 +1,15 @@ mod filesystem; mod handler; -mod jsonrpc; mod registry; mod processor; mod transport; pub(crate) use handler::ExecServerHandler; pub use transport::DEFAULT_LISTEN_URL; -pub use transport::ExecServerListenUrlParseError; +pub use transport::ExecServerListenUrlParseError as ExecServerTransportParseError; pub async fn run_main() -> Result<(), Box> { - run_main_with_listen_url(DEFAULT_LISTEN_URL).await + run_main_with_transport(DEFAULT_LISTEN_URL).await } pub async fn run_main_with_listen_url( @@ -18,3 +17,9 @@ pub async fn run_main_with_listen_url( ) -> Result<(), Box> { transport::run_transport(listen_url).await } + +pub async fn run_main_with_transport( + listen_url: &str, +) -> Result<(), Box> { + transport::run_transport(listen_url).await +} diff --git a/codex-rs/exec-server/tests/common.rs b/codex-rs/exec-server/tests/common.rs new file mode 100644 index 00000000000..aa076a6bc87 --- /dev/null +++ b/codex-rs/exec-server/tests/common.rs @@ -0,0 +1,182 @@ +use std::sync::atomic::AtomicI64; +use std::sync::atomic::Ordering; + +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCRequest; +use codex_app_server_protocol::RequestId; +use codex_utils_cargo_bin::cargo_bin; +use futures::{SinkExt, StreamExt}; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::process::Command; +use std::process::Stdio; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; +use tokio::time::timeout; +use tokio_tungstenite::tungstenite::Message; +use tokio_tungstenite::connect_async; + +enum OutgoingMessage { + Json(JSONRPCMessage), + RawText(String), +} + +pub struct ExecServer { + child: tokio::process::Child, + next_request_id: AtomicI64, + incoming_rx: mpsc::Receiver, + outgoing_tx: mpsc::Sender, + reader_task: JoinHandle<()>, + writer_task: JoinHandle<()>, +} + +impl ExecServer { + pub async fn send_request( + &mut self, + method: &str, + params: serde_json::Value, + ) -> anyhow::Result { + let request_id = RequestId::Integer(self.next_request_id.fetch_add(1, Ordering::SeqCst)); + let request = JSONRPCRequest { + id: request_id.clone(), + method: method.to_string(), + params: Some(params), + trace: None, + }; + self.outgoing_tx + .send(OutgoingMessage::Json(JSONRPCMessage::Request(request))) + .await?; + Ok(request_id) + } + + pub async fn send_notification( + &mut self, + method: &str, + params: serde_json::Value, + ) -> anyhow::Result<()> { + let notification = JSONRPCNotification { + method: method.to_string(), + params: Some(params), + }; + self.outgoing_tx + .send(OutgoingMessage::Json(JSONRPCMessage::Notification( + notification, + ))) + .await?; + Ok(()) + } + + pub async fn send_raw_text(&mut self, text: &str) -> anyhow::Result<()> { + self.outgoing_tx + .send(OutgoingMessage::RawText(text.to_string())) + .await?; + Ok(()) + } + + pub async fn next_event(&mut self) -> anyhow::Result { + self.incoming_rx + .recv() + .await + .ok_or_else(|| anyhow::anyhow!("exec-server closed before next event")) + } + + pub async fn wait_for_event( + &mut self, + predicate: impl Fn(&JSONRPCMessage) -> bool, + ) -> anyhow::Result { + loop { + let event = self.next_event().await?; + if predicate(&event) { + return Ok(event); + } + } + } + + pub async fn shutdown(&mut self) -> anyhow::Result<()> { + self.reader_task.abort(); + self.writer_task.abort(); + self.child.start_kill()?; + Ok(()) + } +} + +pub mod exec_server { + use super::*; + + pub async fn exec_server() -> anyhow::Result { + let binary = cargo_bin("codex-exec-server")?; + let mut child = Command::new(binary); + child.args(["--listen", "ws://127.0.0.1:0"]); + child.stdin(Stdio::null()); + child.stdout(Stdio::null()); + child.stderr(Stdio::piped()); + let mut child = child.spawn()?; + + let stderr = child.stderr.take().expect("stderr should be piped"); + let mut stderr_lines = BufReader::new(stderr).lines(); + let websocket_url = read_websocket_url(&mut stderr_lines).await?; + + let (websocket, _) = connect_async(websocket_url).await?; + let (mut outgoing_ws, mut incoming_ws) = websocket.split(); + let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(128); + let (incoming_tx, incoming_rx) = mpsc::channel::(128); + + let reader_task = tokio::spawn(async move { + while let Some(message) = incoming_ws.next().await { + let Ok(message) = message else { + break; + }; + let outgoing = match message { + Message::Text(text) => serde_json::from_str::(&text), + Message::Binary(bytes) => serde_json::from_slice::(&bytes), + _ => continue, + }; + if let Ok(message) = outgoing && let Err(_err) = incoming_tx.send(message).await { + break; + } + } + }); + + let writer_task = tokio::spawn(async move { + while let Some(message) = outgoing_rx.recv().await { + let outgoing = match message { + OutgoingMessage::Json(message) => { + match serde_json::to_string(&message) { + Ok(json) => Message::Text(json.into()), + Err(_) => continue, + } + } + OutgoingMessage::RawText(message) => Message::Text(message.into()), + }; + if outgoing_ws.send(outgoing).await.is_err() { + break; + } + } + }); + + Ok(ExecServer { + child, + next_request_id: AtomicI64::new(1), + incoming_rx, + outgoing_tx, + reader_task, + writer_task, + }) + } + + async fn read_websocket_url(lines: &mut tokio::io::Lines>) -> anyhow::Result + where + R: tokio::io::AsyncRead + Unpin, + { + let line = timeout(std::time::Duration::from_secs(5), lines.next_line()) + .await?? + .ok_or_else(|| anyhow::anyhow!("missing websocket startup banner"))?; + + let websocket_url = line + .split_whitespace() + .find(|part| part.starts_with("ws://")) + .ok_or_else(|| anyhow::anyhow!("missing websocket URL in startup banner: {line}"))?; + Ok(websocket_url.to_string()) + } +} diff --git a/codex-rs/exec-server/tests/process.rs b/codex-rs/exec-server/tests/process.rs index 95583c61aa3..aac2181bb52 100644 --- a/codex-rs/exec-server/tests/process.rs +++ b/codex-rs/exec-server/tests/process.rs @@ -24,7 +24,7 @@ async fn exec_server_stubs_process_start_over_websocket() -> anyhow::Result<()> .wait_for_event(|event| { matches!( event, - JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if *id == initialize_id ) }) .await?; @@ -53,7 +53,7 @@ async fn exec_server_stubs_process_start_over_websocket() -> anyhow::Result<()> .wait_for_event(|event| { matches!( event, - JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &process_start_id + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if *id == process_start_id ) }) .await?; diff --git a/codex-rs/exec-server/tests/websocket.rs b/codex-rs/exec-server/tests/websocket.rs index d653da6e2b4..c0c0543a824 100644 --- a/codex-rs/exec-server/tests/websocket.rs +++ b/codex-rs/exec-server/tests/websocket.rs @@ -44,7 +44,7 @@ async fn exec_server_reports_malformed_websocket_json_and_keeps_running() -> any .wait_for_event(|event| { matches!( event, - JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if id == &initialize_id + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if *id == initialize_id ) }) .await?; From b327fe8283a45a9c4813a89cd2af8c8412cfef87 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 01:21:58 +0000 Subject: [PATCH 04/10] Fix taiki-e install-action input usage in CI --- .github/workflows/rust-ci.yml | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml index 0be65403d37..5dc7cd3295e 100644 --- a/.github/workflows/rust-ci.yml +++ b/.github/workflows/rust-ci.yml @@ -86,8 +86,7 @@ jobs: - uses: dtolnay/rust-toolchain@1.93.0 - uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2 with: - tool: cargo-shear - version: 1.5.1 + tool: cargo-shear@1.5.1 - name: cargo shear run: cargo shear @@ -291,8 +290,7 @@ jobs: if: ${{ env.USE_SCCACHE == 'true' }} uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2 with: - tool: sccache - version: 0.7.5 + tool: sccache@0.7.5 - name: Configure sccache backend if: ${{ env.USE_SCCACHE == 'true' }} @@ -421,8 +419,7 @@ jobs: if: ${{ matrix.profile == 'release' }} uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2 with: - tool: cargo-chef - version: 0.1.71 + tool: cargo-chef@0.1.71 - name: Pre-warm dependency cache (cargo-chef) if: ${{ matrix.profile == 'release' }} @@ -593,8 +590,7 @@ jobs: if: ${{ env.USE_SCCACHE == 'true' }} uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2 with: - tool: sccache - version: 0.7.5 + tool: sccache@0.7.5 - name: Configure sccache backend if: ${{ env.USE_SCCACHE == 'true' }} @@ -628,8 +624,7 @@ jobs: - uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2 with: - tool: nextest - version: 0.9.103 + tool: nextest@0.9.103 - name: Enable unprivileged user namespaces (Linux) if: runner.os == 'Linux' From b97b7484f1ad965591b8fb546702453ae0fbc15c Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 01:25:04 +0000 Subject: [PATCH 05/10] Fix ci npm staging workflow URL selection Co-authored-by: Codex --- .github/workflows/ci.yml | 41 +++++++++++++++++++++++++++++++++------- 1 file changed, 34 insertions(+), 7 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0588d01a78c..d6124c18daf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,14 +36,41 @@ jobs: GH_TOKEN: ${{ github.token }} run: | set -euo pipefail - # Use a rust-release version that includes all native binaries. - CODEX_VERSION=0.74.0 + # Use the newest successful rust-release workflow that still has native artifacts. OUTPUT_DIR="${RUNNER_TEMP}" - python3 ./scripts/stage_npm_packages.py \ - --release-version "$CODEX_VERSION" \ - --package codex \ - --output-dir "$OUTPUT_DIR" - PACK_OUTPUT="${OUTPUT_DIR}/codex-npm-${CODEX_VERSION}.tgz" + set +e + WORKFLOW_URLS=$(gh run list \ + --workflow .github/workflows/rust-release.yml \ + --json status,conclusion,headBranch,url \ + --jq 'map(select(.status=="completed" and .conclusion=="success" and (.headBranch | startswith("rust-v")) and (.url | contains("/actions/runs/")) ) | .[].url') + set -e + + if [ -z "$WORKFLOW_URLS" ]; then + echo "Unable to resolve a completed successful rust-release workflow." + exit 1 + fi + + for WORKFLOW_URL in $WORKFLOW_URLS; do + CODEX_VERSION="$(gh run view "$WORKFLOW_URL" --json headBranch -q '.headBranch | sub("^rust-v"; "")')" + + echo "Attempting npm staging from ${WORKFLOW_URL} (version ${CODEX_VERSION})." + if python3 ./scripts/stage_npm_packages.py \ + --release-version "$CODEX_VERSION" \ + --workflow-url "$WORKFLOW_URL" \ + --package codex \ + --output-dir "$OUTPUT_DIR"; then + PACK_OUTPUT="${OUTPUT_DIR}/codex-npm-${CODEX_VERSION}.tgz" + break + fi + echo "Npm staging failed for ${WORKFLOW_URL}; trying next rust-release run." + done + + if [ -z "${PACK_OUTPUT:-}" ]; then + echo "::error::No eligible rust-release run could produce a stageable npm package." + exit 1 + fi + + echo "Staged package at ${PACK_OUTPUT}" echo "pack_output=$PACK_OUTPUT" >> "$GITHUB_OUTPUT" - name: Upload staged npm package artifact From 2b7df7c8b47268cc75f3a14b5c995d2cc96383d8 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 01:29:37 +0000 Subject: [PATCH 06/10] Fix ci jq map select syntax for rust-release lookup Co-authored-by: Codex --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d6124c18daf..86c85ff821e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,7 +42,7 @@ jobs: WORKFLOW_URLS=$(gh run list \ --workflow .github/workflows/rust-release.yml \ --json status,conclusion,headBranch,url \ - --jq 'map(select(.status=="completed" and .conclusion=="success" and (.headBranch | startswith("rust-v")) and (.url | contains("/actions/runs/")) ) | .[].url') + --jq 'map(select(.status=="completed" and .conclusion=="success" and (.headBranch | startswith("rust-v")) and (.url | contains("/actions/runs/"))) | .[].url') set -e if [ -z "$WORKFLOW_URLS" ]; then From 468dc1f09a41a4ff97e3e204aac447c0090d57a9 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 01:31:50 +0000 Subject: [PATCH 07/10] Fix jq filter for rust release workflow lookup Co-authored-by: Codex --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 86c85ff821e..fa663141f07 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -42,7 +42,7 @@ jobs: WORKFLOW_URLS=$(gh run list \ --workflow .github/workflows/rust-release.yml \ --json status,conclusion,headBranch,url \ - --jq 'map(select(.status=="completed" and .conclusion=="success" and (.headBranch | startswith("rust-v")) and (.url | contains("/actions/runs/"))) | .[].url') + --jq '.[] | select(.status=="completed" and .conclusion=="success" and (.headBranch | startswith("rust-v")) and (.url | contains("/actions/runs/"))) | .url') set -e if [ -z "$WORKFLOW_URLS" ]; then From 963d109cff741f47959b0107f63552f2b70dc480 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 01:33:24 +0000 Subject: [PATCH 08/10] Fix rust-release run ID parsing in ci npm staging Co-authored-by: Codex --- .github/workflows/ci.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fa663141f07..504a565fadf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -51,7 +51,8 @@ jobs: fi for WORKFLOW_URL in $WORKFLOW_URLS; do - CODEX_VERSION="$(gh run view "$WORKFLOW_URL" --json headBranch -q '.headBranch | sub("^rust-v"; "")')" + WORKFLOW_ID="${WORKFLOW_URL##*/}" + CODEX_VERSION="$(gh run view "$WORKFLOW_ID" --json headBranch -q '.headBranch | sub("^rust-v"; "")')" echo "Attempting npm staging from ${WORKFLOW_URL} (version ${CODEX_VERSION})." if python3 ./scripts/stage_npm_packages.py \ From 62450991e1911895584be2b12ff57081b807f8a2 Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 01:54:00 +0000 Subject: [PATCH 09/10] fix exec-server processor ordering and task cleanup Co-authored-by: Codex --- codex-rs/exec-server/src/connection.rs | 1 - codex-rs/exec-server/src/server.rs | 2 +- codex-rs/exec-server/src/server/processor.rs | 7 +++- codex-rs/exec-server/src/server/transport.rs | 2 +- .../exec-server/src/server/transport_tests.rs | 8 ++--- codex-rs/exec-server/tests/common.rs | 32 ++++++++++++------- codex-rs/exec-server/tests/process.rs | 7 ++-- 7 files changed, 34 insertions(+), 25 deletions(-) diff --git a/codex-rs/exec-server/src/connection.rs b/codex-rs/exec-server/src/connection.rs index 18aa2040b3a..af03fc06865 100644 --- a/codex-rs/exec-server/src/connection.rs +++ b/codex-rs/exec-server/src/connection.rs @@ -11,7 +11,6 @@ use tokio::sync::mpsc; use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::tungstenite::Message; - pub(crate) const CHANNEL_CAPACITY: usize = 128; #[derive(Debug)] diff --git a/codex-rs/exec-server/src/server.rs b/codex-rs/exec-server/src/server.rs index 24c82972415..98c55df4a6b 100644 --- a/codex-rs/exec-server/src/server.rs +++ b/codex-rs/exec-server/src/server.rs @@ -1,7 +1,7 @@ mod filesystem; mod handler; -mod registry; mod processor; +mod registry; mod transport; pub(crate) use handler::ExecServerHandler; diff --git a/codex-rs/exec-server/src/server/processor.rs b/codex-rs/exec-server/src/server/processor.rs index 2fc8995e6ac..518a1a78e0d 100644 --- a/codex-rs/exec-server/src/server/processor.rs +++ b/codex-rs/exec-server/src/server/processor.rs @@ -17,7 +17,7 @@ use crate::server::registry::build_router; pub(crate) async fn run_connection(connection: JsonRpcConnection) { let router = Arc::new(build_router()); - let (json_outgoing_tx, mut incoming_rx, _connection_tasks) = connection.into_parts(); + let (json_outgoing_tx, mut incoming_rx, connection_tasks) = connection.into_parts(); let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); let notifications = RpcNotificationSender::new(outgoing_tx.clone()); @@ -38,6 +38,7 @@ pub(crate) async fn run_connection(connection: JsonRpcConnection) { } }); + // Process inbound events sequentially to preserve initialize/initialized ordering. while let Some(event) = incoming_rx.recv().await { match event { JsonRpcConnectionEvent::MalformedMessage { reason } => { @@ -114,5 +115,9 @@ pub(crate) async fn run_connection(connection: JsonRpcConnection) { handler.shutdown().await; drop(outgoing_tx); + for task in connection_tasks { + task.abort(); + let _ = task.await; + } let _ = outbound_task.await; } diff --git a/codex-rs/exec-server/src/server/transport.rs b/codex-rs/exec-server/src/server/transport.rs index 2c83b8b87c2..6e8e7e7bbd8 100644 --- a/codex-rs/exec-server/src/server/transport.rs +++ b/codex-rs/exec-server/src/server/transport.rs @@ -1,7 +1,7 @@ use std::net::SocketAddr; -use tokio::net::TcpListener; use tokio::io; +use tokio::net::TcpListener; use tokio_tungstenite::accept_async; use tracing::warn; diff --git a/codex-rs/exec-server/src/server/transport_tests.rs b/codex-rs/exec-server/src/server/transport_tests.rs index 6aed843cac0..7e760167539 100644 --- a/codex-rs/exec-server/src/server/transport_tests.rs +++ b/codex-rs/exec-server/src/server/transport_tests.rs @@ -5,8 +5,8 @@ use super::parse_listen_url; #[test] fn parse_listen_url_accepts_default_websocket_url() { - let bind_address = parse_listen_url(DEFAULT_LISTEN_URL) - .expect("default listen URL should parse"); + let bind_address = + parse_listen_url(DEFAULT_LISTEN_URL).expect("default listen URL should parse"); assert_eq!( bind_address, super::ListenAddress::Websocket("127.0.0.1:0".parse().expect("valid socket address")) @@ -21,8 +21,8 @@ fn parse_listen_url_accepts_stdio_url() { #[test] fn parse_listen_url_accepts_websocket_url() { - let bind_address = parse_listen_url("ws://127.0.0.1:1234") - .expect("websocket listen URL should parse"); + let bind_address = + parse_listen_url("ws://127.0.0.1:1234").expect("websocket listen URL should parse"); assert_eq!( bind_address, super::ListenAddress::Websocket("127.0.0.1:1234".parse().expect("valid socket address")) diff --git a/codex-rs/exec-server/tests/common.rs b/codex-rs/exec-server/tests/common.rs index aa076a6bc87..75090eb1f1a 100644 --- a/codex-rs/exec-server/tests/common.rs +++ b/codex-rs/exec-server/tests/common.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use std::sync::atomic::AtomicI64; use std::sync::atomic::Ordering; @@ -6,16 +8,17 @@ use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCRequest; use codex_app_server_protocol::RequestId; use codex_utils_cargo_bin::cargo_bin; -use futures::{SinkExt, StreamExt}; +use futures::SinkExt; +use futures::StreamExt; +use std::process::Stdio; use tokio::io::AsyncBufReadExt; use tokio::io::BufReader; use tokio::process::Command; -use std::process::Stdio; use tokio::sync::mpsc; use tokio::task::JoinHandle; use tokio::time::timeout; -use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; enum OutgoingMessage { Json(JSONRPCMessage), @@ -113,7 +116,10 @@ pub mod exec_server { child.stderr(Stdio::piped()); let mut child = child.spawn()?; - let stderr = child.stderr.take().expect("stderr should be piped"); + let stderr = child + .stderr + .take() + .ok_or_else(|| anyhow::anyhow!("stderr should be piped"))?; let mut stderr_lines = BufReader::new(stderr).lines(); let websocket_url = read_websocket_url(&mut stderr_lines).await?; @@ -132,7 +138,9 @@ pub mod exec_server { Message::Binary(bytes) => serde_json::from_slice::(&bytes), _ => continue, }; - if let Ok(message) = outgoing && let Err(_err) = incoming_tx.send(message).await { + if let Ok(message) = outgoing + && let Err(_err) = incoming_tx.send(message).await + { break; } } @@ -141,12 +149,10 @@ pub mod exec_server { let writer_task = tokio::spawn(async move { while let Some(message) = outgoing_rx.recv().await { let outgoing = match message { - OutgoingMessage::Json(message) => { - match serde_json::to_string(&message) { - Ok(json) => Message::Text(json.into()), - Err(_) => continue, - } - } + OutgoingMessage::Json(message) => match serde_json::to_string(&message) { + Ok(json) => Message::Text(json.into()), + Err(_) => continue, + }, OutgoingMessage::RawText(message) => Message::Text(message.into()), }; if outgoing_ws.send(outgoing).await.is_err() { @@ -165,7 +171,9 @@ pub mod exec_server { }) } - async fn read_websocket_url(lines: &mut tokio::io::Lines>) -> anyhow::Result + async fn read_websocket_url( + lines: &mut tokio::io::Lines>, + ) -> anyhow::Result where R: tokio::io::AsyncRead + Unpin, { diff --git a/codex-rs/exec-server/tests/process.rs b/codex-rs/exec-server/tests/process.rs index aac2181bb52..23c50691bde 100644 --- a/codex-rs/exec-server/tests/process.rs +++ b/codex-rs/exec-server/tests/process.rs @@ -4,8 +4,8 @@ mod common; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCResponse; -use codex_exec_server::InitializeParams; use codex_exec_server::ExecResponse; +use codex_exec_server::InitializeParams; use common::exec_server::exec_server; use pretty_assertions::assert_eq; @@ -30,10 +30,7 @@ async fn exec_server_stubs_process_start_over_websocket() -> anyhow::Result<()> .await?; server - .send_notification( - "initialized", - serde_json::json!({}), - ) + .send_notification("initialized", serde_json::json!({})) .await?; let process_start_id = server From 3b3e78026e2cec34123bc3974f10904909db2f1b Mon Sep 17 00:00:00 2001 From: starr-openai Date: Thu, 19 Mar 2026 10:05:18 -0700 Subject: [PATCH 10/10] codex: disable sccache for macOS x86 CI (#15091) Co-authored-by: Codex --- .github/workflows/rust-ci.yml | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml index 5dc7cd3295e..f8a4c38c456 100644 --- a/.github/workflows/rust-ci.yml +++ b/.github/workflows/rust-ci.yml @@ -140,8 +140,10 @@ jobs: run: working-directory: codex-rs env: - # Speed up repeated builds across CI runs by caching compiled objects (non-Windows). - USE_SCCACHE: ${{ startsWith(matrix.runner, 'windows') && 'false' || 'true' }} + # Speed up repeated builds across CI runs by caching compiled objects, except on + # arm64 macOS runners cross-targeting x86_64 where ring/cc-rs can produce + # mixed-architecture archives under sccache. + USE_SCCACHE: ${{ (startsWith(matrix.runner, 'windows') || (matrix.runner == 'macos-15-xlarge' && matrix.target == 'x86_64-apple-darwin')) && 'false' || 'true' }} CARGO_INCREMENTAL: "0" SCCACHE_CACHE_SIZE: 10G # In rust-ci, representative release-profile checks use thin LTO for faster feedback. @@ -503,8 +505,10 @@ jobs: run: working-directory: codex-rs env: - # Speed up repeated builds across CI runs by caching compiled objects (non-Windows). - USE_SCCACHE: ${{ startsWith(matrix.runner, 'windows') && 'false' || 'true' }} + # Speed up repeated builds across CI runs by caching compiled objects, except on + # arm64 macOS runners cross-targeting x86_64 where ring/cc-rs can produce + # mixed-architecture archives under sccache. + USE_SCCACHE: ${{ (startsWith(matrix.runner, 'windows') || (matrix.runner == 'macos-15-xlarge' && matrix.target == 'x86_64-apple-darwin')) && 'false' || 'true' }} CARGO_INCREMENTAL: "0" SCCACHE_CACHE_SIZE: 10G