Skip to content
Closed
4 changes: 2 additions & 2 deletions codex-rs/app-server/src/fs_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use codex_app_server_protocol::FsWriteFileResponse;
use codex_app_server_protocol::JSONRPCErrorError;
use codex_exec_server::CopyOptions;
use codex_exec_server::CreateDirectoryOptions;
use codex_exec_server::Environment;
use codex_exec_server::ExecutorFileSystem;
use codex_exec_server::LocalFileSystem;
use codex_exec_server::RemoveOptions;
use std::io;
use std::sync::Arc;
Expand All @@ -34,7 +34,7 @@ pub(crate) struct FsApi {
impl Default for FsApi {
fn default() -> Self {
Self {
file_system: Arc::new(Environment::default().get_filesystem()),
file_system: Arc::new(LocalFileSystem),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/exec-server/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ use tracing::debug;
use tracing::warn;

use crate::client_api::ExecServerClientConnectOptions;
use crate::client_api::ExecServerEvent;
use crate::client_api::RemoteExecServerConnectArgs;
use crate::connection::JsonRpcConnection;
use crate::process::ExecServerEvent;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
Expand Down
10 changes: 0 additions & 10 deletions codex-rs/exec-server/src/client_api.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
use std::time::Duration;

use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;

/// Connection options for any exec-server client transport.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExecServerClientConnectOptions {
Expand All @@ -18,10 +15,3 @@ pub struct RemoteExecServerConnectArgs {
pub connect_timeout: Duration,
pub initialize_timeout: Duration,
}

/// Connection-level server events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecServerEvent {
OutputDelta(ExecOutputDeltaNotification),
Exited(ExecExitedNotification),
}
41 changes: 25 additions & 16 deletions codex-rs/exec-server/src/environment.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use crate::ExecServerClient;
use crate::ExecServerClientConnectOptions;
use crate::ExecServerError;
use crate::RemoteExecServerConnectArgs;
use crate::fs;
use crate::fs::ExecutorFileSystem;
use crate::local_process::LocalExecProcess;
use crate::process::ExecProcess;
use crate::remote_process::RemoteExecProcess;
use std::sync::Arc;

#[derive(Clone, Default)]
#[derive(Clone)]
pub struct Environment {
experimental_exec_server_url: Option<String>,
remote_exec_server_client: Option<ExecServerClient>,
executor: Arc<dyn ExecProcess>,
}

impl std::fmt::Debug for Environment {
Expand All @@ -19,7 +24,7 @@ impl std::fmt::Debug for Environment {
)
.field(
"has_remote_exec_server_client",
&self.remote_exec_server_client.is_some(),
&self.experimental_exec_server_url.is_some(),
)
.finish()
}
Expand All @@ -29,38 +34,43 @@ impl Environment {
pub async fn create(
experimental_exec_server_url: Option<String>,
) -> Result<Self, ExecServerError> {
let remote_exec_server_client =
let executor: Arc<dyn ExecProcess> =
if let Some(websocket_url) = experimental_exec_server_url.as_deref() {
Some(
ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new(
websocket_url.to_string(),
"codex-core".to_string(),
))
.await?,
)
let client = ExecServerClient::connect_websocket(RemoteExecServerConnectArgs::new(
websocket_url.to_string(),
"codex-core".to_string(),
))
.await?;
Arc::new(RemoteExecProcess::new(client))
} else {
None
Arc::new(LocalExecProcess::new())
};

Ok(Self {
experimental_exec_server_url,
remote_exec_server_client,
executor,
})
}

pub fn experimental_exec_server_url(&self) -> Option<&str> {
self.experimental_exec_server_url.as_deref()
}

pub fn remote_exec_server_client(&self) -> Option<&ExecServerClient> {
self.remote_exec_server_client.as_ref()
pub fn get_executor(&self) -> Arc<dyn ExecProcess> {
Arc::clone(&self.executor)
}

pub fn get_filesystem(&self) -> impl ExecutorFileSystem + use<> {
fs::LocalFileSystem
}
}

impl crate::ExecutorEnvironment for Environment {
fn get_executor(&self) -> Arc<dyn ExecProcess> {
self.get_executor()
}
}

#[cfg(test)]
mod tests {
use super::Environment;
Expand All @@ -71,6 +81,5 @@ mod tests {
let environment = Environment::create(None).await.expect("create environment");

assert_eq!(environment.experimental_exec_server_url(), None);
assert!(environment.remote_exec_server_client().is_none());
}
}
2 changes: 1 addition & 1 deletion codex-rs/exec-server/src/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub trait ExecutorFileSystem: Send + Sync {
}

#[derive(Clone, Default)]
pub(crate) struct LocalFileSystem;
pub struct LocalFileSystem;

#[async_trait]
impl ExecutorFileSystem for LocalFileSystem {
Expand Down
10 changes: 9 additions & 1 deletion codex-rs/exec-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ mod client_api;
mod connection;
mod environment;
mod fs;
mod local_process;
mod process;
mod protocol;
mod remote_process;
mod rpc;
mod server;

pub use client::ExecServerClient;
pub use client::ExecServerError;
pub use client_api::ExecServerClientConnectOptions;
pub use client_api::ExecServerEvent;
pub use client_api::RemoteExecServerConnectArgs;
pub use codex_app_server_protocol::FsCopyParams;
pub use codex_app_server_protocol::FsCopyResponse;
Expand All @@ -33,8 +35,11 @@ pub use fs::CreateDirectoryOptions;
pub use fs::ExecutorFileSystem;
pub use fs::FileMetadata;
pub use fs::FileSystemResult;
pub use fs::LocalFileSystem;
pub use fs::ReadDirectoryEntry;
pub use fs::RemoveOptions;
pub use process::ExecProcess;
pub use process::ExecServerEvent;
pub use protocol::ExecExitedNotification;
pub use protocol::ExecOutputDeltaNotification;
pub use protocol::ExecOutputStream;
Expand All @@ -50,5 +55,8 @@ pub use protocol::WriteParams;
pub use protocol::WriteResponse;
pub use server::DEFAULT_LISTEN_URL;
pub use server::ExecServerListenUrlParseError;
pub trait ExecutorEnvironment: Send + Sync {
fn get_executor(&self) -> std::sync::Arc<dyn ExecProcess>;
}
pub use server::run_main;
pub use server::run_main_with_listen_url;
146 changes: 146 additions & 0 deletions codex-rs/exec-server/src/local_process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
use std::sync::Arc;

use async_trait::async_trait;
use codex_app_server_protocol::JSONRPCErrorError;
use serde_json::Value;
use tokio::sync::broadcast;
use tokio::sync::mpsc;

use crate::ExecProcess;
use crate::ExecServerError;
use crate::ExecServerEvent;
use crate::process::ExecServerEvent::Exited;
use crate::process::ExecServerEvent::OutputDelta;
use crate::protocol::EXEC_EXITED_METHOD;
use crate::protocol::EXEC_OUTPUT_DELTA_METHOD;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateParams;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteParams;
use crate::protocol::WriteResponse;
use crate::rpc::RpcNotificationSender;
use crate::rpc::RpcServerOutboundMessage;
use crate::server::ProcessHandler;

#[derive(Clone)]
pub(crate) struct LocalExecProcess {
inner: Arc<Inner>,
}

struct Inner {
process_handler: ProcessHandler,
events_tx: broadcast::Sender<ExecServerEvent>,
reader_task: tokio::task::JoinHandle<()>,
}

impl Drop for Inner {
fn drop(&mut self) {
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let process_handler = self.process_handler.clone();
handle.spawn(async move {
process_handler.shutdown().await;
});
}
self.reader_task.abort();
}
}

impl LocalExecProcess {
pub(crate) fn new() -> Self {
let (outgoing_tx, mut outgoing_rx) = mpsc::channel::<RpcServerOutboundMessage>(256);
let process_handler = ProcessHandler::new(RpcNotificationSender::new(outgoing_tx));
let events_tx = broadcast::channel(256).0;
let events_tx_for_task = events_tx.clone();
let reader_task = tokio::spawn(async move {
while let Some(message) = outgoing_rx.recv().await {
if let RpcServerOutboundMessage::Notification(notification) = message {
match notification.method.as_str() {
EXEC_OUTPUT_DELTA_METHOD => {
if let Ok(params) = serde_json::from_value::<ExecOutputDeltaNotification>(
notification.params.unwrap_or(Value::Null),
) {
let _ = events_tx_for_task.send(OutputDelta(params));
}
}
EXEC_EXITED_METHOD => {
if let Ok(params) = serde_json::from_value::<ExecExitedNotification>(
notification.params.unwrap_or(Value::Null),
) {
let _ = events_tx_for_task.send(Exited(params));
}
}
_ => {}
}
}
}
});

Self {
inner: Arc::new(Inner {
process_handler,
events_tx,
reader_task,
}),
}
}
}

#[async_trait]
impl ExecProcess for LocalExecProcess {
async fn start(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError> {
self.inner
.process_handler
.exec(params)
.await
.map_err(map_local_error)
}

async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError> {
self.inner
.process_handler
.exec_read(params)
.await
.map_err(map_local_error)
}

async fn write(
&self,
process_id: &str,
chunk: Vec<u8>,
) -> Result<WriteResponse, ExecServerError> {
self.inner
.process_handler
.exec_write(WriteParams {
process_id: process_id.to_string(),
chunk: chunk.into(),
})
.await
.map_err(map_local_error)
}

async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError> {
self.inner
.process_handler
.terminate(TerminateParams {
process_id: process_id.to_string(),
})
.await
.map_err(map_local_error)
}

fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent> {
self.inner.events_tx.subscribe()
}
}

fn map_local_error(error: JSONRPCErrorError) -> ExecServerError {
ExecServerError::Server {
code: error.code,
message: error.message,
}
}
35 changes: 35 additions & 0 deletions codex-rs/exec-server/src/process.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use async_trait::async_trait;
use tokio::sync::broadcast;

use crate::ExecServerError;
use crate::protocol::ExecExitedNotification;
use crate::protocol::ExecOutputDeltaNotification;
use crate::protocol::ExecParams;
use crate::protocol::ExecResponse;
use crate::protocol::ReadParams;
use crate::protocol::ReadResponse;
use crate::protocol::TerminateResponse;
use crate::protocol::WriteResponse;

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecServerEvent {
OutputDelta(ExecOutputDeltaNotification),
Exited(ExecExitedNotification),
}

#[async_trait]
pub trait ExecProcess: Send + Sync {
async fn start(&self, params: ExecParams) -> Result<ExecResponse, ExecServerError>;

async fn read(&self, params: ReadParams) -> Result<ReadResponse, ExecServerError>;

async fn write(
&self,
process_id: &str,
chunk: Vec<u8>,
) -> Result<WriteResponse, ExecServerError>;

async fn terminate(&self, process_id: &str) -> Result<TerminateResponse, ExecServerError>;

fn subscribe_events(&self) -> broadcast::Receiver<ExecServerEvent>;
}
Loading
Loading