diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0588d01a78c..504a565fadf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -36,14 +36,42 @@ jobs: GH_TOKEN: ${{ github.token }} run: | set -euo pipefail - # Use a rust-release version that includes all native binaries. - CODEX_VERSION=0.74.0 + # Use the newest successful rust-release workflow that still has native artifacts. OUTPUT_DIR="${RUNNER_TEMP}" - python3 ./scripts/stage_npm_packages.py \ - --release-version "$CODEX_VERSION" \ - --package codex \ - --output-dir "$OUTPUT_DIR" - PACK_OUTPUT="${OUTPUT_DIR}/codex-npm-${CODEX_VERSION}.tgz" + set +e + WORKFLOW_URLS=$(gh run list \ + --workflow .github/workflows/rust-release.yml \ + --json status,conclusion,headBranch,url \ + --jq '.[] | select(.status=="completed" and .conclusion=="success" and (.headBranch | startswith("rust-v")) and (.url | contains("/actions/runs/"))) | .url') + set -e + + if [ -z "$WORKFLOW_URLS" ]; then + echo "Unable to resolve a completed successful rust-release workflow." + exit 1 + fi + + for WORKFLOW_URL in $WORKFLOW_URLS; do + WORKFLOW_ID="${WORKFLOW_URL##*/}" + CODEX_VERSION="$(gh run view "$WORKFLOW_ID" --json headBranch -q '.headBranch | sub("^rust-v"; "")')" + + echo "Attempting npm staging from ${WORKFLOW_URL} (version ${CODEX_VERSION})." + if python3 ./scripts/stage_npm_packages.py \ + --release-version "$CODEX_VERSION" \ + --workflow-url "$WORKFLOW_URL" \ + --package codex \ + --output-dir "$OUTPUT_DIR"; then + PACK_OUTPUT="${OUTPUT_DIR}/codex-npm-${CODEX_VERSION}.tgz" + break + fi + echo "Npm staging failed for ${WORKFLOW_URL}; trying next rust-release run." + done + + if [ -z "${PACK_OUTPUT:-}" ]; then + echo "::error::No eligible rust-release run could produce a stageable npm package." + exit 1 + fi + + echo "Staged package at ${PACK_OUTPUT}" echo "pack_output=$PACK_OUTPUT" >> "$GITHUB_OUTPUT" - name: Upload staged npm package artifact diff --git a/.github/workflows/rust-ci.yml b/.github/workflows/rust-ci.yml index 0be65403d37..f8a4c38c456 100644 --- a/.github/workflows/rust-ci.yml +++ b/.github/workflows/rust-ci.yml @@ -86,8 +86,7 @@ jobs: - uses: dtolnay/rust-toolchain@1.93.0 - uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2 with: - tool: cargo-shear - version: 1.5.1 + tool: cargo-shear@1.5.1 - name: cargo shear run: cargo shear @@ -141,8 +140,10 @@ jobs: run: working-directory: codex-rs env: - # Speed up repeated builds across CI runs by caching compiled objects (non-Windows). - USE_SCCACHE: ${{ startsWith(matrix.runner, 'windows') && 'false' || 'true' }} + # Speed up repeated builds across CI runs by caching compiled objects, except on + # arm64 macOS runners cross-targeting x86_64 where ring/cc-rs can produce + # mixed-architecture archives under sccache. + USE_SCCACHE: ${{ (startsWith(matrix.runner, 'windows') || (matrix.runner == 'macos-15-xlarge' && matrix.target == 'x86_64-apple-darwin')) && 'false' || 'true' }} CARGO_INCREMENTAL: "0" SCCACHE_CACHE_SIZE: 10G # In rust-ci, representative release-profile checks use thin LTO for faster feedback. @@ -291,8 +292,7 @@ jobs: if: ${{ env.USE_SCCACHE == 'true' }} uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2 with: - tool: sccache - version: 0.7.5 + tool: sccache@0.7.5 - name: Configure sccache backend if: ${{ env.USE_SCCACHE == 'true' }} @@ -421,8 +421,7 @@ jobs: if: ${{ matrix.profile == 'release' }} uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2 with: - tool: cargo-chef - version: 0.1.71 + tool: cargo-chef@0.1.71 - name: Pre-warm dependency cache (cargo-chef) if: ${{ matrix.profile == 'release' }} @@ -506,8 +505,10 @@ jobs: run: working-directory: codex-rs env: - # Speed up repeated builds across CI runs by caching compiled objects (non-Windows). - USE_SCCACHE: ${{ startsWith(matrix.runner, 'windows') && 'false' || 'true' }} + # Speed up repeated builds across CI runs by caching compiled objects, except on + # arm64 macOS runners cross-targeting x86_64 where ring/cc-rs can produce + # mixed-architecture archives under sccache. + USE_SCCACHE: ${{ (startsWith(matrix.runner, 'windows') || (matrix.runner == 'macos-15-xlarge' && matrix.target == 'x86_64-apple-darwin')) && 'false' || 'true' }} CARGO_INCREMENTAL: "0" SCCACHE_CACHE_SIZE: 10G @@ -593,8 +594,7 @@ jobs: if: ${{ env.USE_SCCACHE == 'true' }} uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2 with: - tool: sccache - version: 0.7.5 + tool: sccache@0.7.5 - name: Configure sccache backend if: ${{ env.USE_SCCACHE == 'true' }} @@ -628,8 +628,7 @@ jobs: - uses: taiki-e/install-action@44c6d64aa62cd779e873306675c7a58e86d6d532 # v2 with: - tool: nextest - version: 0.9.103 + tool: nextest@0.9.103 - name: Enable unprivileged user namespaces (Linux) if: runner.os == 'Linux' diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 11b3fa6e812..1103b58b92b 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -2011,6 +2011,7 @@ dependencies = [ "base64 0.22.1", "clap", "codex-app-server-protocol", + "codex-environment", "codex-utils-cargo-bin", "codex-utils-pty", "futures", diff --git a/codex-rs/exec-server/Cargo.toml b/codex-rs/exec-server/Cargo.toml index 1b47760975a..744b2011f35 100644 --- a/codex-rs/exec-server/Cargo.toml +++ b/codex-rs/exec-server/Cargo.toml @@ -15,6 +15,7 @@ workspace = true base64 = { workspace = true } clap = { workspace = true, features = ["derive"] } codex-app-server-protocol = { workspace = true } +codex-environment = { workspace = true } codex-utils-pty = { workspace = true } futures = { workspace = true } serde = { workspace = true, features = ["derive"] } diff --git a/codex-rs/exec-server/src/bin/codex-exec-server.rs b/codex-rs/exec-server/src/bin/codex-exec-server.rs index 16df84d9b6f..2638ecd1b76 100644 --- a/codex-rs/exec-server/src/bin/codex-exec-server.rs +++ b/codex-rs/exec-server/src/bin/codex-exec-server.rs @@ -1,5 +1,5 @@ use clap::Parser; -use codex_exec_server::ExecServerTransport; +use codex_exec_server::DEFAULT_LISTEN_URL; #[derive(Debug, Parser)] struct ExecServerArgs { @@ -8,15 +8,15 @@ struct ExecServerArgs { #[arg( long = "listen", value_name = "URL", - default_value = ExecServerTransport::DEFAULT_LISTEN_URL + default_value = DEFAULT_LISTEN_URL )] - listen: ExecServerTransport, + listen: String, } #[tokio::main] async fn main() { let args = ExecServerArgs::parse(); - if let Err(err) = codex_exec_server::run_main_with_transport(args.listen).await { + if let Err(err) = codex_exec_server::run_main_with_transport(&args.listen).await { eprintln!("{err}"); std::process::exit(1); } diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 652c9d01d95..2f314f3db3f 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -1,6 +1,20 @@ use std::sync::Arc; use std::time::Duration; +use codex_app_server_protocol::FsCopyParams; +use codex_app_server_protocol::FsCopyResponse; +use codex_app_server_protocol::FsCreateDirectoryParams; +use codex_app_server_protocol::FsCreateDirectoryResponse; +use codex_app_server_protocol::FsGetMetadataParams; +use codex_app_server_protocol::FsGetMetadataResponse; +use codex_app_server_protocol::FsReadDirectoryParams; +use codex_app_server_protocol::FsReadDirectoryResponse; +use codex_app_server_protocol::FsReadFileParams; +use codex_app_server_protocol::FsReadFileResponse; +use codex_app_server_protocol::FsRemoveParams; +use codex_app_server_protocol::FsRemoveResponse; +use codex_app_server_protocol::FsWriteFileParams; +use codex_app_server_protocol::FsWriteFileResponse; use codex_app_server_protocol::JSONRPCNotification; use serde_json::Value; use tokio::io::AsyncRead; @@ -26,6 +40,13 @@ use crate::protocol::ExecExitedNotification; use crate::protocol::ExecOutputDeltaNotification; use crate::protocol::ExecParams; use crate::protocol::ExecResponse; +use crate::protocol::FS_COPY_METHOD; +use crate::protocol::FS_CREATE_DIRECTORY_METHOD; +use crate::protocol::FS_GET_METADATA_METHOD; +use crate::protocol::FS_READ_DIRECTORY_METHOD; +use crate::protocol::FS_READ_FILE_METHOD; +use crate::protocol::FS_REMOVE_METHOD; +use crate::protocol::FS_WRITE_FILE_METHOD; use crate::protocol::INITIALIZE_METHOD; use crate::protocol::INITIALIZED_METHOD; use crate::protocol::InitializeParams; @@ -326,6 +347,129 @@ impl ExecServerClient { .map_err(Into::into) } + pub async fn fs_read_file( + &self, + params: FsReadFileParams, + ) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_read_file(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/readFile".to_string(), + )); + }; + remote + .call(FS_READ_FILE_METHOD, ¶ms) + .await + .map_err(Into::into) + } + + pub async fn fs_write_file( + &self, + params: FsWriteFileParams, + ) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_write_file(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/writeFile".to_string(), + )); + }; + remote + .call(FS_WRITE_FILE_METHOD, ¶ms) + .await + .map_err(Into::into) + } + + pub async fn fs_create_directory( + &self, + params: FsCreateDirectoryParams, + ) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_create_directory(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/createDirectory".to_string(), + )); + }; + remote + .call(FS_CREATE_DIRECTORY_METHOD, ¶ms) + .await + .map_err(Into::into) + } + + pub async fn fs_get_metadata( + &self, + params: FsGetMetadataParams, + ) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_get_metadata(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/getMetadata".to_string(), + )); + }; + remote + .call(FS_GET_METADATA_METHOD, ¶ms) + .await + .map_err(Into::into) + } + + pub async fn fs_read_directory( + &self, + params: FsReadDirectoryParams, + ) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_read_directory(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/readDirectory".to_string(), + )); + }; + remote + .call(FS_READ_DIRECTORY_METHOD, ¶ms) + .await + .map_err(Into::into) + } + + pub async fn fs_remove( + &self, + params: FsRemoveParams, + ) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_remove(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/remove".to_string(), + )); + }; + remote + .call(FS_REMOVE_METHOD, ¶ms) + .await + .map_err(Into::into) + } + + pub async fn fs_copy(&self, params: FsCopyParams) -> Result { + if let Some(backend) = self.inner.backend.as_local() { + return backend.fs_copy(params).await; + } + let Some(remote) = self.inner.backend.as_remote() else { + return Err(ExecServerError::Protocol( + "remote backend missing during fs/copy".to_string(), + )); + }; + remote + .call(FS_COPY_METHOD, ¶ms) + .await + .map_err(Into::into) + } + async fn connect( connection: JsonRpcConnection, options: ExecServerClientConnectOptions, diff --git a/codex-rs/exec-server/src/client/local_backend.rs b/codex-rs/exec-server/src/client/local_backend.rs index 16b16d3b103..e23a5361d3a 100644 --- a/codex-rs/exec-server/src/client/local_backend.rs +++ b/codex-rs/exec-server/src/client/local_backend.rs @@ -10,6 +10,20 @@ use crate::protocol::TerminateResponse; use crate::protocol::WriteParams; use crate::protocol::WriteResponse; use crate::server::ExecServerHandler; +use codex_app_server_protocol::FsCopyParams; +use codex_app_server_protocol::FsCopyResponse; +use codex_app_server_protocol::FsCreateDirectoryParams; +use codex_app_server_protocol::FsCreateDirectoryResponse; +use codex_app_server_protocol::FsGetMetadataParams; +use codex_app_server_protocol::FsGetMetadataResponse; +use codex_app_server_protocol::FsReadDirectoryParams; +use codex_app_server_protocol::FsReadDirectoryResponse; +use codex_app_server_protocol::FsReadFileParams; +use codex_app_server_protocol::FsReadFileResponse; +use codex_app_server_protocol::FsRemoveParams; +use codex_app_server_protocol::FsRemoveResponse; +use codex_app_server_protocol::FsWriteFileParams; +use codex_app_server_protocol::FsWriteFileResponse; use super::ExecServerError; @@ -92,4 +106,95 @@ impl LocalBackend { message: error.message, }) } + + pub(super) async fn fs_read_file( + &self, + params: FsReadFileParams, + ) -> Result { + self.handler + .fs_read_file(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } + + pub(super) async fn fs_write_file( + &self, + params: FsWriteFileParams, + ) -> Result { + self.handler + .fs_write_file(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } + + pub(super) async fn fs_create_directory( + &self, + params: FsCreateDirectoryParams, + ) -> Result { + self.handler + .fs_create_directory(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } + + pub(super) async fn fs_get_metadata( + &self, + params: FsGetMetadataParams, + ) -> Result { + self.handler + .fs_get_metadata(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } + + pub(super) async fn fs_read_directory( + &self, + params: FsReadDirectoryParams, + ) -> Result { + self.handler + .fs_read_directory(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } + + pub(super) async fn fs_remove( + &self, + params: FsRemoveParams, + ) -> Result { + self.handler + .fs_remove(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } + + pub(super) async fn fs_copy( + &self, + params: FsCopyParams, + ) -> Result { + self.handler + .fs_copy(params) + .await + .map_err(|error| ExecServerError::Server { + code: error.code, + message: error.message, + }) + } } diff --git a/codex-rs/exec-server/src/connection.rs b/codex-rs/exec-server/src/connection.rs index 6c9f66d9a84..af03fc06865 100644 --- a/codex-rs/exec-server/src/connection.rs +++ b/codex-rs/exec-server/src/connection.rs @@ -16,6 +16,7 @@ pub(crate) const CHANNEL_CAPACITY: usize = 128; #[derive(Debug)] pub(crate) enum JsonRpcConnectionEvent { Message(JSONRPCMessage), + MalformedMessage { reason: String }, Disconnected { reason: Option }, } @@ -55,14 +56,13 @@ impl JsonRpcConnection { } } Err(err) => { - send_disconnected( + send_malformed_message( &incoming_tx_for_reader, Some(format!( "failed to parse JSON-RPC message from {reader_label}: {err}" )), ) .await; - break; } } } @@ -132,14 +132,13 @@ impl JsonRpcConnection { } } Err(err) => { - send_disconnected( + send_malformed_message( &incoming_tx_for_reader, Some(format!( "failed to parse websocket JSON-RPC message from {reader_label}: {err}" )), ) .await; - break; } } } @@ -155,14 +154,13 @@ impl JsonRpcConnection { } } Err(err) => { - send_disconnected( + send_malformed_message( &incoming_tx_for_reader, Some(format!( "failed to parse websocket JSON-RPC message from {reader_label}: {err}" )), ) .await; - break; } } } @@ -247,6 +245,17 @@ async fn send_disconnected( .await; } +async fn send_malformed_message( + incoming_tx: &mpsc::Sender, + reason: Option, +) { + let _ = incoming_tx + .send(JsonRpcConnectionEvent::MalformedMessage { + reason: reason.unwrap_or_else(|| "malformed JSON-RPC message".to_string()), + }) + .await; +} + async fn write_jsonrpc_line_message( writer: &mut BufWriter, message: &JSONRPCMessage, diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index 13e910351ea..2ffcd30196b 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -11,6 +11,21 @@ pub use client::ExecServerError; pub use client_api::ExecServerClientConnectOptions; pub use client_api::ExecServerEvent; pub use client_api::RemoteExecServerConnectArgs; +pub use codex_app_server_protocol::FsCopyParams; +pub use codex_app_server_protocol::FsCopyResponse; +pub use codex_app_server_protocol::FsCreateDirectoryParams; +pub use codex_app_server_protocol::FsCreateDirectoryResponse; +pub use codex_app_server_protocol::FsGetMetadataParams; +pub use codex_app_server_protocol::FsGetMetadataResponse; +pub use codex_app_server_protocol::FsReadDirectoryEntry; +pub use codex_app_server_protocol::FsReadDirectoryParams; +pub use codex_app_server_protocol::FsReadDirectoryResponse; +pub use codex_app_server_protocol::FsReadFileParams; +pub use codex_app_server_protocol::FsReadFileResponse; +pub use codex_app_server_protocol::FsRemoveParams; +pub use codex_app_server_protocol::FsRemoveResponse; +pub use codex_app_server_protocol::FsWriteFileParams; +pub use codex_app_server_protocol::FsWriteFileResponse; pub use local::ExecServerLaunchCommand; pub use local::SpawnedExecServer; pub use local::spawn_local_exec_server; @@ -27,7 +42,8 @@ pub use protocol::TerminateParams; pub use protocol::TerminateResponse; pub use protocol::WriteParams; pub use protocol::WriteResponse; -pub use server::ExecServerTransport; +pub use server::DEFAULT_LISTEN_URL; pub use server::ExecServerTransportParseError; pub use server::run_main; +pub use server::run_main_with_listen_url; pub use server::run_main_with_transport; diff --git a/codex-rs/exec-server/src/local.rs b/codex-rs/exec-server/src/local.rs index 20b59a983cb..e51c9439482 100644 --- a/codex-rs/exec-server/src/local.rs +++ b/codex-rs/exec-server/src/local.rs @@ -42,6 +42,7 @@ pub async fn spawn_local_exec_server( ) -> Result { let mut child = Command::new(&command.program); child.args(&command.args); + child.args(["--listen", "stdio://"]); child.stdin(Stdio::piped()); child.stdout(Stdio::piped()); child.stderr(Stdio::inherit()); diff --git a/codex-rs/exec-server/src/protocol.rs b/codex-rs/exec-server/src/protocol.rs index 7ed8e20ae46..ca7d89d3551 100644 --- a/codex-rs/exec-server/src/protocol.rs +++ b/codex-rs/exec-server/src/protocol.rs @@ -13,6 +13,13 @@ pub const EXEC_WRITE_METHOD: &str = "process/write"; pub const EXEC_TERMINATE_METHOD: &str = "process/terminate"; pub const EXEC_OUTPUT_DELTA_METHOD: &str = "process/output"; pub const EXEC_EXITED_METHOD: &str = "process/exited"; +pub const FS_READ_FILE_METHOD: &str = "fs/readFile"; +pub const FS_WRITE_FILE_METHOD: &str = "fs/writeFile"; +pub const FS_CREATE_DIRECTORY_METHOD: &str = "fs/createDirectory"; +pub const FS_GET_METADATA_METHOD: &str = "fs/getMetadata"; +pub const FS_READ_DIRECTORY_METHOD: &str = "fs/readDirectory"; +pub const FS_REMOVE_METHOD: &str = "fs/remove"; +pub const FS_COPY_METHOD: &str = "fs/copy"; pub const PROTOCOL_VERSION: &str = "exec-server.v0"; #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] diff --git a/codex-rs/exec-server/src/rpc.rs b/codex-rs/exec-server/src/rpc.rs index 94a891e22cb..8d79883c570 100644 --- a/codex-rs/exec-server/src/rpc.rs +++ b/codex-rs/exec-server/src/rpc.rs @@ -196,6 +196,10 @@ impl RpcClient { break; } } + JsonRpcConnectionEvent::MalformedMessage { reason } => { + warn!("JSON-RPC client closing after malformed message: {reason}"); + break; + } JsonRpcConnectionEvent::Disconnected { reason } => { let _ = event_tx.send(RpcClientEvent::Disconnected { reason }).await; drain_pending(&pending_for_reader).await; diff --git a/codex-rs/exec-server/src/server.rs b/codex-rs/exec-server/src/server.rs index ba074e617fd..98c55df4a6b 100644 --- a/codex-rs/exec-server/src/server.rs +++ b/codex-rs/exec-server/src/server.rs @@ -1,18 +1,25 @@ +mod filesystem; mod handler; mod processor; mod registry; mod transport; pub(crate) use handler::ExecServerHandler; -pub use transport::ExecServerTransport; -pub use transport::ExecServerTransportParseError; +pub use transport::DEFAULT_LISTEN_URL; +pub use transport::ExecServerListenUrlParseError as ExecServerTransportParseError; pub async fn run_main() -> Result<(), Box> { - run_main_with_transport(ExecServerTransport::Stdio).await + run_main_with_transport(DEFAULT_LISTEN_URL).await +} + +pub async fn run_main_with_listen_url( + listen_url: &str, +) -> Result<(), Box> { + transport::run_transport(listen_url).await } pub async fn run_main_with_transport( - transport: ExecServerTransport, + listen_url: &str, ) -> Result<(), Box> { - transport::run_transport(transport).await + transport::run_transport(listen_url).await } diff --git a/codex-rs/exec-server/src/server/filesystem.rs b/codex-rs/exec-server/src/server/filesystem.rs new file mode 100644 index 00000000000..09467e9e020 --- /dev/null +++ b/codex-rs/exec-server/src/server/filesystem.rs @@ -0,0 +1,170 @@ +use std::io; +use std::sync::Arc; + +use base64::Engine as _; +use base64::engine::general_purpose::STANDARD; +use codex_app_server_protocol::FsCopyParams; +use codex_app_server_protocol::FsCopyResponse; +use codex_app_server_protocol::FsCreateDirectoryParams; +use codex_app_server_protocol::FsCreateDirectoryResponse; +use codex_app_server_protocol::FsGetMetadataParams; +use codex_app_server_protocol::FsGetMetadataResponse; +use codex_app_server_protocol::FsReadDirectoryEntry; +use codex_app_server_protocol::FsReadDirectoryParams; +use codex_app_server_protocol::FsReadDirectoryResponse; +use codex_app_server_protocol::FsReadFileParams; +use codex_app_server_protocol::FsReadFileResponse; +use codex_app_server_protocol::FsRemoveParams; +use codex_app_server_protocol::FsRemoveResponse; +use codex_app_server_protocol::FsWriteFileParams; +use codex_app_server_protocol::FsWriteFileResponse; +use codex_app_server_protocol::JSONRPCErrorError; +use codex_environment::CopyOptions; +use codex_environment::CreateDirectoryOptions; +use codex_environment::Environment; +use codex_environment::ExecutorFileSystem; +use codex_environment::RemoveOptions; + +use crate::rpc::internal_error; +use crate::rpc::invalid_request; + +#[derive(Clone)] +pub(crate) struct ExecServerFileSystem { + file_system: Arc, +} + +impl Default for ExecServerFileSystem { + fn default() -> Self { + Self { + file_system: Arc::new(Environment.get_filesystem()), + } + } +} + +impl ExecServerFileSystem { + pub(crate) async fn read_file( + &self, + params: FsReadFileParams, + ) -> Result { + let bytes = self + .file_system + .read_file(¶ms.path) + .await + .map_err(map_fs_error)?; + Ok(FsReadFileResponse { + data_base64: STANDARD.encode(bytes), + }) + } + + pub(crate) async fn write_file( + &self, + params: FsWriteFileParams, + ) -> Result { + let bytes = STANDARD.decode(params.data_base64).map_err(|err| { + invalid_request(format!( + "fs/writeFile requires valid base64 dataBase64: {err}" + )) + })?; + self.file_system + .write_file(¶ms.path, bytes) + .await + .map_err(map_fs_error)?; + Ok(FsWriteFileResponse {}) + } + + pub(crate) async fn create_directory( + &self, + params: FsCreateDirectoryParams, + ) -> Result { + self.file_system + .create_directory( + ¶ms.path, + CreateDirectoryOptions { + recursive: params.recursive.unwrap_or(true), + }, + ) + .await + .map_err(map_fs_error)?; + Ok(FsCreateDirectoryResponse {}) + } + + pub(crate) async fn get_metadata( + &self, + params: FsGetMetadataParams, + ) -> Result { + let metadata = self + .file_system + .get_metadata(¶ms.path) + .await + .map_err(map_fs_error)?; + Ok(FsGetMetadataResponse { + is_directory: metadata.is_directory, + is_file: metadata.is_file, + created_at_ms: metadata.created_at_ms, + modified_at_ms: metadata.modified_at_ms, + }) + } + + pub(crate) async fn read_directory( + &self, + params: FsReadDirectoryParams, + ) -> Result { + let entries = self + .file_system + .read_directory(¶ms.path) + .await + .map_err(map_fs_error)?; + Ok(FsReadDirectoryResponse { + entries: entries + .into_iter() + .map(|entry| FsReadDirectoryEntry { + file_name: entry.file_name, + is_directory: entry.is_directory, + is_file: entry.is_file, + }) + .collect(), + }) + } + + pub(crate) async fn remove( + &self, + params: FsRemoveParams, + ) -> Result { + self.file_system + .remove( + ¶ms.path, + RemoveOptions { + recursive: params.recursive.unwrap_or(true), + force: params.force.unwrap_or(true), + }, + ) + .await + .map_err(map_fs_error)?; + Ok(FsRemoveResponse {}) + } + + pub(crate) async fn copy( + &self, + params: FsCopyParams, + ) -> Result { + self.file_system + .copy( + ¶ms.source_path, + ¶ms.destination_path, + CopyOptions { + recursive: params.recursive, + }, + ) + .await + .map_err(map_fs_error)?; + Ok(FsCopyResponse {}) + } +} + +fn map_fs_error(err: io::Error) -> JSONRPCErrorError { + if err.kind() == io::ErrorKind::InvalidInput { + invalid_request(err.to_string()) + } else { + internal_error(err.to_string()) + } +} diff --git a/codex-rs/exec-server/src/server/handler.rs b/codex-rs/exec-server/src/server/handler.rs index 40a13cf1f99..a5a75a8e32e 100644 --- a/codex-rs/exec-server/src/server/handler.rs +++ b/codex-rs/exec-server/src/server/handler.rs @@ -5,6 +5,20 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; +use codex_app_server_protocol::FsCopyParams; +use codex_app_server_protocol::FsCopyResponse; +use codex_app_server_protocol::FsCreateDirectoryParams; +use codex_app_server_protocol::FsCreateDirectoryResponse; +use codex_app_server_protocol::FsGetMetadataParams; +use codex_app_server_protocol::FsGetMetadataResponse; +use codex_app_server_protocol::FsReadDirectoryParams; +use codex_app_server_protocol::FsReadDirectoryResponse; +use codex_app_server_protocol::FsReadFileParams; +use codex_app_server_protocol::FsReadFileResponse; +use codex_app_server_protocol::FsRemoveParams; +use codex_app_server_protocol::FsRemoveResponse; +use codex_app_server_protocol::FsWriteFileParams; +use codex_app_server_protocol::FsWriteFileResponse; use codex_app_server_protocol::JSONRPCErrorError; use codex_utils_pty::ExecCommandSession; use codex_utils_pty::TerminalSize; @@ -30,6 +44,7 @@ use crate::rpc::RpcNotificationSender; use crate::rpc::internal_error; use crate::rpc::invalid_params; use crate::rpc::invalid_request; +use crate::server::filesystem::ExecServerFileSystem; const RETAINED_OUTPUT_BYTES_PER_PROCESS: usize = 1024 * 1024; #[cfg(test)] @@ -61,6 +76,7 @@ enum ProcessEntry { pub(crate) struct ExecServerHandler { notifications: RpcNotificationSender, + file_system: ExecServerFileSystem, processes: Arc>>, initialize_requested: AtomicBool, initialized: AtomicBool, @@ -70,6 +86,7 @@ impl ExecServerHandler { pub(crate) fn new(notifications: RpcNotificationSender) -> Self { Self { notifications, + file_system: ExecServerFileSystem::default(), processes: Arc::new(Mutex::new(HashMap::new())), initialize_requested: AtomicBool::new(false), initialized: AtomicBool::new(false), @@ -111,22 +128,22 @@ impl ExecServerHandler { Ok(()) } - fn require_initialized(&self) -> Result<(), JSONRPCErrorError> { + fn require_initialized_for(&self, method_family: &str) -> Result<(), JSONRPCErrorError> { if !self.initialize_requested.load(Ordering::SeqCst) { - return Err(invalid_request( - "client must call initialize before using exec methods".to_string(), - )); + return Err(invalid_request(format!( + "client must call initialize before using {method_family} methods" + ))); } if !self.initialized.load(Ordering::SeqCst) { - return Err(invalid_request( - "client must send initialized before using exec methods".to_string(), - )); + return Err(invalid_request(format!( + "client must send initialized before using {method_family} methods" + ))); } Ok(()) } pub(crate) async fn exec(&self, params: ExecParams) -> Result { - self.require_initialized()?; + self.require_initialized_for("exec")?; let process_id = params.process_id.clone(); let (program, args) = params @@ -231,7 +248,7 @@ impl ExecServerHandler { &self, params: ReadParams, ) -> Result { - self.require_initialized()?; + self.require_initialized_for("exec")?; let after_seq = params.after_seq.unwrap_or(0); let max_bytes = params.max_bytes.unwrap_or(usize::MAX); let wait = Duration::from_millis(params.wait_ms.unwrap_or(0)); @@ -300,7 +317,7 @@ impl ExecServerHandler { &self, params: WriteParams, ) -> Result { - self.require_initialized()?; + self.require_initialized_for("exec")?; let writer_tx = { let process_map = self.processes.lock().await; let process = process_map.get(¶ms.process_id).ok_or_else(|| { @@ -333,7 +350,7 @@ impl ExecServerHandler { &self, params: TerminateParams, ) -> Result { - self.require_initialized()?; + self.require_initialized_for("exec")?; let running = { let process_map = self.processes.lock().await; match process_map.get(¶ms.process_id) { @@ -347,6 +364,62 @@ impl ExecServerHandler { Ok(TerminateResponse { running }) } + + pub(crate) async fn fs_read_file( + &self, + params: FsReadFileParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.read_file(params).await + } + + pub(crate) async fn fs_write_file( + &self, + params: FsWriteFileParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.write_file(params).await + } + + pub(crate) async fn fs_create_directory( + &self, + params: FsCreateDirectoryParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.create_directory(params).await + } + + pub(crate) async fn fs_get_metadata( + &self, + params: FsGetMetadataParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.get_metadata(params).await + } + + pub(crate) async fn fs_read_directory( + &self, + params: FsReadDirectoryParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.read_directory(params).await + } + + pub(crate) async fn fs_remove( + &self, + params: FsRemoveParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.remove(params).await + } + + pub(crate) async fn fs_copy( + &self, + params: FsCopyParams, + ) -> Result { + self.require_initialized_for("filesystem")?; + self.file_system.copy(params).await + } } async fn stream_output( diff --git a/codex-rs/exec-server/src/server/processor.rs b/codex-rs/exec-server/src/server/processor.rs index 888bbf7f302..518a1a78e0d 100644 --- a/codex-rs/exec-server/src/server/processor.rs +++ b/codex-rs/exec-server/src/server/processor.rs @@ -10,13 +10,14 @@ use crate::connection::JsonRpcConnectionEvent; use crate::rpc::RpcNotificationSender; use crate::rpc::RpcServerOutboundMessage; use crate::rpc::encode_server_message; +use crate::rpc::invalid_request; use crate::rpc::method_not_found; use crate::server::ExecServerHandler; use crate::server::registry::build_router; pub(crate) async fn run_connection(connection: JsonRpcConnection) { let router = Arc::new(build_router()); - let (json_outgoing_tx, mut incoming_rx, _connection_tasks) = connection.into_parts(); + let (json_outgoing_tx, mut incoming_rx, connection_tasks) = connection.into_parts(); let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(CHANNEL_CAPACITY); let notifications = RpcNotificationSender::new(outgoing_tx.clone()); @@ -37,17 +38,29 @@ pub(crate) async fn run_connection(connection: JsonRpcConnection) { } }); + // Process inbound events sequentially to preserve initialize/initialized ordering. while let Some(event) = incoming_rx.recv().await { match event { + JsonRpcConnectionEvent::MalformedMessage { reason } => { + warn!("ignoring malformed exec-server message: {reason}"); + if outgoing_tx + .send(RpcServerOutboundMessage::Error { + request_id: codex_app_server_protocol::RequestId::Integer(-1), + error: invalid_request(reason), + }) + .await + .is_err() + { + break; + } + } JsonRpcConnectionEvent::Message(message) => match message { codex_app_server_protocol::JSONRPCMessage::Request(request) => { if let Some(route) = router.request_route(request.method.as_str()) { - let route = route(handler.clone(), request); - let outgoing_tx = outgoing_tx.clone(); - tokio::spawn(async move { - let message = route.await; - let _ = outgoing_tx.send(message).await; - }); + let message = route(handler.clone(), request).await; + if outgoing_tx.send(message).await.is_err() { + break; + } } else if outgoing_tx .send(RpcServerOutboundMessage::Error { request_id: request.id, @@ -102,5 +115,9 @@ pub(crate) async fn run_connection(connection: JsonRpcConnection) { handler.shutdown().await; drop(outgoing_tx); + for task in connection_tasks { + task.abort(); + let _ = task.await; + } let _ = outbound_task.await; } diff --git a/codex-rs/exec-server/src/server/registry.rs b/codex-rs/exec-server/src/server/registry.rs index 50efdddd71e..482e5ab6107 100644 --- a/codex-rs/exec-server/src/server/registry.rs +++ b/codex-rs/exec-server/src/server/registry.rs @@ -5,6 +5,13 @@ use crate::protocol::EXEC_READ_METHOD; use crate::protocol::EXEC_TERMINATE_METHOD; use crate::protocol::EXEC_WRITE_METHOD; use crate::protocol::ExecParams; +use crate::protocol::FS_COPY_METHOD; +use crate::protocol::FS_CREATE_DIRECTORY_METHOD; +use crate::protocol::FS_GET_METADATA_METHOD; +use crate::protocol::FS_READ_DIRECTORY_METHOD; +use crate::protocol::FS_READ_FILE_METHOD; +use crate::protocol::FS_REMOVE_METHOD; +use crate::protocol::FS_WRITE_FILE_METHOD; use crate::protocol::INITIALIZE_METHOD; use crate::protocol::INITIALIZED_METHOD; use crate::protocol::InitializeParams; @@ -13,6 +20,13 @@ use crate::protocol::TerminateParams; use crate::protocol::WriteParams; use crate::rpc::RpcRouter; use crate::server::ExecServerHandler; +use codex_app_server_protocol::FsCopyParams; +use codex_app_server_protocol::FsCreateDirectoryParams; +use codex_app_server_protocol::FsGetMetadataParams; +use codex_app_server_protocol::FsReadDirectoryParams; +use codex_app_server_protocol::FsReadFileParams; +use codex_app_server_protocol::FsRemoveParams; +use codex_app_server_protocol::FsWriteFileParams; pub(crate) fn build_router() -> RpcRouter { let mut router = RpcRouter::new(); @@ -24,7 +38,9 @@ pub(crate) fn build_router() -> RpcRouter { ); router.notification( INITIALIZED_METHOD, - |handler: Arc, (): ()| async move { handler.initialized() }, + |handler: Arc, _params: serde_json::Value| async move { + handler.initialized() + }, ); router.request( EXEC_METHOD, @@ -48,5 +64,47 @@ pub(crate) fn build_router() -> RpcRouter { handler.terminate(params).await }, ); + router.request( + FS_READ_FILE_METHOD, + |handler: Arc, params: FsReadFileParams| async move { + handler.fs_read_file(params).await + }, + ); + router.request( + FS_WRITE_FILE_METHOD, + |handler: Arc, params: FsWriteFileParams| async move { + handler.fs_write_file(params).await + }, + ); + router.request( + FS_CREATE_DIRECTORY_METHOD, + |handler: Arc, params: FsCreateDirectoryParams| async move { + handler.fs_create_directory(params).await + }, + ); + router.request( + FS_GET_METADATA_METHOD, + |handler: Arc, params: FsGetMetadataParams| async move { + handler.fs_get_metadata(params).await + }, + ); + router.request( + FS_READ_DIRECTORY_METHOD, + |handler: Arc, params: FsReadDirectoryParams| async move { + handler.fs_read_directory(params).await + }, + ); + router.request( + FS_REMOVE_METHOD, + |handler: Arc, params: FsRemoveParams| async move { + handler.fs_remove(params).await + }, + ); + router.request( + FS_COPY_METHOD, + |handler: Arc, params: FsCopyParams| async move { + handler.fs_copy(params).await + }, + ); router } diff --git a/codex-rs/exec-server/src/server/transport.rs b/codex-rs/exec-server/src/server/transport.rs index b653c0b79b7..6e8e7e7bbd8 100644 --- a/codex-rs/exec-server/src/server/transport.rs +++ b/codex-rs/exec-server/src/server/transport.rs @@ -1,6 +1,6 @@ use std::net::SocketAddr; -use std::str::FromStr; +use tokio::io; use tokio::net::TcpListener; use tokio_tungstenite::accept_async; use tracing::warn; @@ -8,26 +8,22 @@ use tracing::warn; use crate::connection::JsonRpcConnection; use crate::server::processor::run_connection; -#[derive(Clone, Copy, Debug, Eq, PartialEq)] -pub enum ExecServerTransport { - Stdio, - WebSocket { bind_address: SocketAddr }, -} +pub const DEFAULT_LISTEN_URL: &str = "ws://127.0.0.1:0"; #[derive(Debug, Clone, Eq, PartialEq)] -pub enum ExecServerTransportParseError { +pub enum ExecServerListenUrlParseError { UnsupportedListenUrl(String), InvalidWebSocketListenUrl(String), } -impl std::fmt::Display for ExecServerTransportParseError { +impl std::fmt::Display for ExecServerListenUrlParseError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { - ExecServerTransportParseError::UnsupportedListenUrl(listen_url) => write!( + ExecServerListenUrlParseError::UnsupportedListenUrl(listen_url) => write!( f, - "unsupported --listen URL `{listen_url}`; expected `stdio://` or `ws://IP:PORT`" + "unsupported --listen URL `{listen_url}`; expected `ws://IP:PORT`" ), - ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url) => write!( + ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url) => write!( f, "invalid websocket --listen URL `{listen_url}`; expected `ws://IP:PORT`" ), @@ -35,62 +31,59 @@ impl std::fmt::Display for ExecServerTransportParseError { } } -impl std::error::Error for ExecServerTransportParseError {} - -impl ExecServerTransport { - pub const DEFAULT_LISTEN_URL: &str = "stdio://"; - - pub fn from_listen_url(listen_url: &str) -> Result { - if listen_url == Self::DEFAULT_LISTEN_URL { - return Ok(Self::Stdio); - } - - if let Some(socket_addr) = listen_url.strip_prefix("ws://") { - let bind_address = socket_addr.parse::().map_err(|_| { - ExecServerTransportParseError::InvalidWebSocketListenUrl(listen_url.to_string()) - })?; - return Ok(Self::WebSocket { bind_address }); - } +impl std::error::Error for ExecServerListenUrlParseError {} - Err(ExecServerTransportParseError::UnsupportedListenUrl( - listen_url.to_string(), - )) - } +#[derive(Debug, Clone, Eq, PartialEq)] +enum ListenAddress { + Websocket(SocketAddr), + Stdio, } -impl FromStr for ExecServerTransport { - type Err = ExecServerTransportParseError; - - fn from_str(s: &str) -> Result { - Self::from_listen_url(s) +fn parse_listen_url(listen_url: &str) -> Result { + if let Some(socket_addr) = listen_url.strip_prefix("ws://") { + return socket_addr + .parse::() + .map(ListenAddress::Websocket) + .map_err(|_| { + ExecServerListenUrlParseError::InvalidWebSocketListenUrl(listen_url.to_string()) + }); + } + if listen_url == "stdio://" { + return Ok(ListenAddress::Stdio); } + + Err(ExecServerListenUrlParseError::UnsupportedListenUrl( + listen_url.to_string(), + )) } pub(crate) async fn run_transport( - transport: ExecServerTransport, + listen_url: &str, ) -> Result<(), Box> { - match transport { - ExecServerTransport::Stdio => { - run_connection(JsonRpcConnection::from_stdio( - tokio::io::stdin(), - tokio::io::stdout(), - "exec-server stdio".to_string(), - )) - .await; - Ok(()) - } - ExecServerTransport::WebSocket { bind_address } => { - run_websocket_listener(bind_address).await - } + match parse_listen_url(listen_url)? { + ListenAddress::Websocket(bind_address) => run_websocket_listener(bind_address).await, + ListenAddress::Stdio => run_stdio_listener().await, } } +async fn run_stdio_listener() -> Result<(), Box> { + run_connection(JsonRpcConnection::from_stdio( + io::stdin(), + io::stdout(), + "exec-server stdio".to_string(), + )) + .await; + Ok(()) +} + async fn run_websocket_listener( bind_address: SocketAddr, ) -> Result<(), Box> { let listener = TcpListener::bind(bind_address).await?; let local_addr = listener.local_addr()?; - print_websocket_startup_banner(local_addr); + let listen_message = format!("codex-exec-server listening on ws://{local_addr}"); + tracing::info!("{}", listen_message); + eprintln!("{listen_message}"); loop { let (stream, peer_addr) = listener.accept().await?; @@ -113,54 +106,6 @@ async fn run_websocket_listener( } } -#[allow(clippy::print_stderr)] -fn print_websocket_startup_banner(addr: SocketAddr) { - eprintln!("codex-exec-server listening on ws://{addr}"); -} - #[cfg(test)] -mod tests { - use pretty_assertions::assert_eq; - - use super::ExecServerTransport; - - #[test] - fn exec_server_transport_parses_stdio_listen_url() { - let transport = - ExecServerTransport::from_listen_url(ExecServerTransport::DEFAULT_LISTEN_URL) - .expect("stdio listen URL should parse"); - assert_eq!(transport, ExecServerTransport::Stdio); - } - - #[test] - fn exec_server_transport_parses_websocket_listen_url() { - let transport = ExecServerTransport::from_listen_url("ws://127.0.0.1:1234") - .expect("websocket listen URL should parse"); - assert_eq!( - transport, - ExecServerTransport::WebSocket { - bind_address: "127.0.0.1:1234".parse().expect("valid socket address"), - } - ); - } - - #[test] - fn exec_server_transport_rejects_invalid_websocket_listen_url() { - let err = ExecServerTransport::from_listen_url("ws://localhost:1234") - .expect_err("hostname bind address should be rejected"); - assert_eq!( - err.to_string(), - "invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT`" - ); - } - - #[test] - fn exec_server_transport_rejects_unsupported_listen_url() { - let err = ExecServerTransport::from_listen_url("http://127.0.0.1:1234") - .expect_err("unsupported scheme should fail"); - assert_eq!( - err.to_string(), - "unsupported --listen URL `http://127.0.0.1:1234`; expected `stdio://` or `ws://IP:PORT`" - ); - } -} +#[path = "transport_tests.rs"] +mod transport_tests; diff --git a/codex-rs/exec-server/src/server/transport_tests.rs b/codex-rs/exec-server/src/server/transport_tests.rs new file mode 100644 index 00000000000..7e760167539 --- /dev/null +++ b/codex-rs/exec-server/src/server/transport_tests.rs @@ -0,0 +1,50 @@ +use pretty_assertions::assert_eq; + +use super::DEFAULT_LISTEN_URL; +use super::parse_listen_url; + +#[test] +fn parse_listen_url_accepts_default_websocket_url() { + let bind_address = + parse_listen_url(DEFAULT_LISTEN_URL).expect("default listen URL should parse"); + assert_eq!( + bind_address, + super::ListenAddress::Websocket("127.0.0.1:0".parse().expect("valid socket address")) + ); +} + +#[test] +fn parse_listen_url_accepts_stdio_url() { + let listen_address = parse_listen_url("stdio://").expect("stdio listen URL should parse"); + assert_eq!(listen_address, super::ListenAddress::Stdio); +} + +#[test] +fn parse_listen_url_accepts_websocket_url() { + let bind_address = + parse_listen_url("ws://127.0.0.1:1234").expect("websocket listen URL should parse"); + assert_eq!( + bind_address, + super::ListenAddress::Websocket("127.0.0.1:1234".parse().expect("valid socket address")) + ); +} + +#[test] +fn parse_listen_url_rejects_invalid_websocket_url() { + let err = parse_listen_url("ws://localhost:1234") + .expect_err("hostname bind address should be rejected"); + assert_eq!( + err.to_string(), + "invalid websocket --listen URL `ws://localhost:1234`; expected `ws://IP:PORT`" + ); +} + +#[test] +fn parse_listen_url_rejects_unsupported_url() { + let err = + parse_listen_url("http://127.0.0.1:1234").expect_err("unsupported scheme should fail"); + assert_eq!( + err.to_string(), + "unsupported --listen URL `http://127.0.0.1:1234`; expected `ws://IP:PORT`" + ); +} diff --git a/codex-rs/exec-server/tests/common.rs b/codex-rs/exec-server/tests/common.rs new file mode 100644 index 00000000000..75090eb1f1a --- /dev/null +++ b/codex-rs/exec-server/tests/common.rs @@ -0,0 +1,190 @@ +#![allow(dead_code)] + +use std::sync::atomic::AtomicI64; +use std::sync::atomic::Ordering; + +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCNotification; +use codex_app_server_protocol::JSONRPCRequest; +use codex_app_server_protocol::RequestId; +use codex_utils_cargo_bin::cargo_bin; +use futures::SinkExt; +use futures::StreamExt; +use std::process::Stdio; +use tokio::io::AsyncBufReadExt; +use tokio::io::BufReader; +use tokio::process::Command; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; +use tokio::time::timeout; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; + +enum OutgoingMessage { + Json(JSONRPCMessage), + RawText(String), +} + +pub struct ExecServer { + child: tokio::process::Child, + next_request_id: AtomicI64, + incoming_rx: mpsc::Receiver, + outgoing_tx: mpsc::Sender, + reader_task: JoinHandle<()>, + writer_task: JoinHandle<()>, +} + +impl ExecServer { + pub async fn send_request( + &mut self, + method: &str, + params: serde_json::Value, + ) -> anyhow::Result { + let request_id = RequestId::Integer(self.next_request_id.fetch_add(1, Ordering::SeqCst)); + let request = JSONRPCRequest { + id: request_id.clone(), + method: method.to_string(), + params: Some(params), + trace: None, + }; + self.outgoing_tx + .send(OutgoingMessage::Json(JSONRPCMessage::Request(request))) + .await?; + Ok(request_id) + } + + pub async fn send_notification( + &mut self, + method: &str, + params: serde_json::Value, + ) -> anyhow::Result<()> { + let notification = JSONRPCNotification { + method: method.to_string(), + params: Some(params), + }; + self.outgoing_tx + .send(OutgoingMessage::Json(JSONRPCMessage::Notification( + notification, + ))) + .await?; + Ok(()) + } + + pub async fn send_raw_text(&mut self, text: &str) -> anyhow::Result<()> { + self.outgoing_tx + .send(OutgoingMessage::RawText(text.to_string())) + .await?; + Ok(()) + } + + pub async fn next_event(&mut self) -> anyhow::Result { + self.incoming_rx + .recv() + .await + .ok_or_else(|| anyhow::anyhow!("exec-server closed before next event")) + } + + pub async fn wait_for_event( + &mut self, + predicate: impl Fn(&JSONRPCMessage) -> bool, + ) -> anyhow::Result { + loop { + let event = self.next_event().await?; + if predicate(&event) { + return Ok(event); + } + } + } + + pub async fn shutdown(&mut self) -> anyhow::Result<()> { + self.reader_task.abort(); + self.writer_task.abort(); + self.child.start_kill()?; + Ok(()) + } +} + +pub mod exec_server { + use super::*; + + pub async fn exec_server() -> anyhow::Result { + let binary = cargo_bin("codex-exec-server")?; + let mut child = Command::new(binary); + child.args(["--listen", "ws://127.0.0.1:0"]); + child.stdin(Stdio::null()); + child.stdout(Stdio::null()); + child.stderr(Stdio::piped()); + let mut child = child.spawn()?; + + let stderr = child + .stderr + .take() + .ok_or_else(|| anyhow::anyhow!("stderr should be piped"))?; + let mut stderr_lines = BufReader::new(stderr).lines(); + let websocket_url = read_websocket_url(&mut stderr_lines).await?; + + let (websocket, _) = connect_async(websocket_url).await?; + let (mut outgoing_ws, mut incoming_ws) = websocket.split(); + let (outgoing_tx, mut outgoing_rx) = mpsc::channel::(128); + let (incoming_tx, incoming_rx) = mpsc::channel::(128); + + let reader_task = tokio::spawn(async move { + while let Some(message) = incoming_ws.next().await { + let Ok(message) = message else { + break; + }; + let outgoing = match message { + Message::Text(text) => serde_json::from_str::(&text), + Message::Binary(bytes) => serde_json::from_slice::(&bytes), + _ => continue, + }; + if let Ok(message) = outgoing + && let Err(_err) = incoming_tx.send(message).await + { + break; + } + } + }); + + let writer_task = tokio::spawn(async move { + while let Some(message) = outgoing_rx.recv().await { + let outgoing = match message { + OutgoingMessage::Json(message) => match serde_json::to_string(&message) { + Ok(json) => Message::Text(json.into()), + Err(_) => continue, + }, + OutgoingMessage::RawText(message) => Message::Text(message.into()), + }; + if outgoing_ws.send(outgoing).await.is_err() { + break; + } + } + }); + + Ok(ExecServer { + child, + next_request_id: AtomicI64::new(1), + incoming_rx, + outgoing_tx, + reader_task, + writer_task, + }) + } + + async fn read_websocket_url( + lines: &mut tokio::io::Lines>, + ) -> anyhow::Result + where + R: tokio::io::AsyncRead + Unpin, + { + let line = timeout(std::time::Duration::from_secs(5), lines.next_line()) + .await?? + .ok_or_else(|| anyhow::anyhow!("missing websocket startup banner"))?; + + let websocket_url = line + .split_whitespace() + .find(|part| part.starts_with("ws://")) + .ok_or_else(|| anyhow::anyhow!("missing websocket URL in startup banner: {line}"))?; + Ok(websocket_url.to_string()) + } +} diff --git a/codex-rs/exec-server/tests/initialize.rs b/codex-rs/exec-server/tests/initialize.rs new file mode 100644 index 00000000000..04a36348c82 --- /dev/null +++ b/codex-rs/exec-server/tests/initialize.rs @@ -0,0 +1,39 @@ +#![cfg(unix)] + +mod common; + +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCResponse; +use codex_exec_server::InitializeParams; +use codex_exec_server::InitializeResponse; +use common::exec_server::exec_server; +use pretty_assertions::assert_eq; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_server_accepts_initialize() -> anyhow::Result<()> { + let mut server = exec_server().await?; + let initialize_id = server + .send_request( + "initialize", + serde_json::to_value(InitializeParams { + client_name: "exec-server-test".to_string(), + })?, + ) + .await?; + + let response = server.next_event().await?; + let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else { + panic!("expected initialize response"); + }; + assert_eq!(id, initialize_id); + let initialize_response: InitializeResponse = serde_json::from_value(result)?; + assert_eq!( + initialize_response, + InitializeResponse { + protocol_version: "exec-server.v0".to_string() + } + ); + + server.shutdown().await?; + Ok(()) +} diff --git a/codex-rs/exec-server/tests/process.rs b/codex-rs/exec-server/tests/process.rs new file mode 100644 index 00000000000..23c50691bde --- /dev/null +++ b/codex-rs/exec-server/tests/process.rs @@ -0,0 +1,71 @@ +#![cfg(unix)] + +mod common; + +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCResponse; +use codex_exec_server::ExecResponse; +use codex_exec_server::InitializeParams; +use common::exec_server::exec_server; +use pretty_assertions::assert_eq; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_server_stubs_process_start_over_websocket() -> anyhow::Result<()> { + let mut server = exec_server().await?; + let initialize_id = server + .send_request( + "initialize", + serde_json::to_value(InitializeParams { + client_name: "exec-server-test".to_string(), + })?, + ) + .await?; + let _ = server + .wait_for_event(|event| { + matches!( + event, + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if *id == initialize_id + ) + }) + .await?; + + server + .send_notification("initialized", serde_json::json!({})) + .await?; + + let process_start_id = server + .send_request( + "process/start", + serde_json::json!({ + "processId": "proc-1", + "argv": ["true"], + "cwd": std::env::current_dir()?, + "env": {}, + "tty": false, + "arg0": null + }), + ) + .await?; + let response = server + .wait_for_event(|event| { + matches!( + event, + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if *id == process_start_id + ) + }) + .await?; + let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else { + panic!("expected process/start response"); + }; + assert_eq!(id, process_start_id); + let process_start_response: ExecResponse = serde_json::from_value(result)?; + assert_eq!( + process_start_response, + ExecResponse { + process_id: "proc-1".to_string() + } + ); + + server.shutdown().await?; + Ok(()) +} diff --git a/codex-rs/exec-server/tests/stdio_smoke.rs b/codex-rs/exec-server/tests/stdio_smoke.rs index c08d7f3c9b2..bb033a9efbf 100644 --- a/codex-rs/exec-server/tests/stdio_smoke.rs +++ b/codex-rs/exec-server/tests/stdio_smoke.rs @@ -4,6 +4,8 @@ use std::process::Stdio; use std::time::Duration; use anyhow::Context; +use base64::Engine as _; +use base64::engine::general_purpose::STANDARD as BASE64_STANDARD; use codex_app_server_protocol::JSONRPCMessage; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCRequest; @@ -15,6 +17,13 @@ use codex_exec_server::ExecServerClient; use codex_exec_server::ExecServerClientConnectOptions; use codex_exec_server::ExecServerEvent; use codex_exec_server::ExecServerLaunchCommand; +use codex_exec_server::FsCopyParams; +use codex_exec_server::FsCreateDirectoryParams; +use codex_exec_server::FsGetMetadataParams; +use codex_exec_server::FsReadDirectoryParams; +use codex_exec_server::FsReadFileParams; +use codex_exec_server::FsRemoveParams; +use codex_exec_server::FsWriteFileParams; use codex_exec_server::InitializeParams; use codex_exec_server::InitializeResponse; use codex_exec_server::RemoteExecServerConnectArgs; @@ -32,6 +41,7 @@ use tokio::time::timeout; async fn exec_server_accepts_initialize_over_stdio() -> anyhow::Result<()> { let binary = cargo_bin("codex-exec-server")?; let mut child = Command::new(binary); + child.args(["--listen", "stdio://"]); child.stdin(Stdio::piped()); child.stdout(Stdio::piped()); child.stderr(Stdio::inherit()); @@ -200,6 +210,102 @@ async fn exec_server_client_connects_over_websocket() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_server_client_filesystem_round_trip_over_stdio() -> anyhow::Result<()> { + let server = spawn_local_exec_server( + ExecServerLaunchCommand { + program: cargo_bin("codex-exec-server")?, + args: Vec::new(), + }, + ExecServerClientConnectOptions { + client_name: "exec-server-test".to_string(), + initialize_timeout: Duration::from_secs(5), + }, + ) + .await?; + let client = server.client(); + + let root = std::env::temp_dir().join(format!( + "codex-exec-server-fs-{}-{}", + std::process::id(), + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH)? + .as_nanos() + )); + let directory = root.join("dir"); + let file_path = directory.join("hello.txt"); + let copy_path = directory.join("copy.txt"); + + client + .fs_create_directory(FsCreateDirectoryParams { + path: directory.clone().try_into()?, + recursive: Some(true), + }) + .await?; + + client + .fs_write_file(FsWriteFileParams { + path: file_path.clone().try_into()?, + data_base64: BASE64_STANDARD.encode(b"hello"), + }) + .await?; + + let metadata = client + .fs_get_metadata(FsGetMetadataParams { + path: file_path.clone().try_into()?, + }) + .await?; + assert!(metadata.is_file); + assert!(!metadata.is_directory); + + let read_file = client + .fs_read_file(FsReadFileParams { + path: file_path.clone().try_into()?, + }) + .await?; + assert_eq!(read_file.data_base64, BASE64_STANDARD.encode(b"hello")); + + let read_directory = client + .fs_read_directory(FsReadDirectoryParams { + path: directory.clone().try_into()?, + }) + .await?; + assert!( + read_directory + .entries + .iter() + .any(|entry| entry.file_name == "hello.txt" && entry.is_file) + ); + + client + .fs_copy(FsCopyParams { + source_path: file_path.clone().try_into()?, + destination_path: copy_path.clone().try_into()?, + recursive: false, + }) + .await?; + let copied = client + .fs_read_file(FsReadFileParams { + path: copy_path.clone().try_into()?, + }) + .await?; + assert_eq!(copied.data_base64, BASE64_STANDARD.encode(b"hello")); + + client + .fs_remove(FsRemoveParams { + path: root.clone().try_into()?, + recursive: Some(true), + force: Some(true), + }) + .await?; + + assert!( + !root.exists(), + "filesystem cleanup should remove the test tree" + ); + Ok(()) +} + async fn read_websocket_url(lines: &mut tokio::io::Lines>) -> anyhow::Result where R: tokio::io::AsyncRead + Unpin, diff --git a/codex-rs/exec-server/tests/websocket.rs b/codex-rs/exec-server/tests/websocket.rs new file mode 100644 index 00000000000..c0c0543a824 --- /dev/null +++ b/codex-rs/exec-server/tests/websocket.rs @@ -0,0 +1,65 @@ +#![cfg(unix)] + +mod common; + +use codex_app_server_protocol::JSONRPCError; +use codex_app_server_protocol::JSONRPCMessage; +use codex_app_server_protocol::JSONRPCResponse; +use codex_exec_server::InitializeParams; +use codex_exec_server::InitializeResponse; +use common::exec_server::exec_server; +use pretty_assertions::assert_eq; + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn exec_server_reports_malformed_websocket_json_and_keeps_running() -> anyhow::Result<()> { + let mut server = exec_server().await?; + server.send_raw_text("not-json").await?; + + let response = server + .wait_for_event(|event| matches!(event, JSONRPCMessage::Error(_))) + .await?; + let JSONRPCMessage::Error(JSONRPCError { id, error }) = response else { + panic!("expected malformed-message error response"); + }; + assert_eq!(id, codex_app_server_protocol::RequestId::Integer(-1)); + assert_eq!(error.code, -32600); + assert!( + error + .message + .starts_with("failed to parse websocket JSON-RPC message from exec-server websocket"), + "unexpected malformed-message error: {}", + error.message + ); + + let initialize_id = server + .send_request( + "initialize", + serde_json::to_value(InitializeParams { + client_name: "exec-server-test".to_string(), + })?, + ) + .await?; + + let response = server + .wait_for_event(|event| { + matches!( + event, + JSONRPCMessage::Response(JSONRPCResponse { id, .. }) if *id == initialize_id + ) + }) + .await?; + let JSONRPCMessage::Response(JSONRPCResponse { id, result }) = response else { + panic!("expected initialize response after malformed input"); + }; + assert_eq!(id, initialize_id); + let initialize_response: InitializeResponse = serde_json::from_value(result)?; + assert_eq!( + initialize_response, + InitializeResponse { + protocol_version: "exec-server.v0".to_string() + } + ); + + server.shutdown().await?; + Ok(()) +}