diff --git a/codex-rs/app-server/src/codex_message_processor/plugins.rs b/codex-rs/app-server/src/codex_message_processor/plugins.rs index 072276eb216d..405dd4523b04 100644 --- a/codex-rs/app-server/src/codex_message_processor/plugins.rs +++ b/codex-rs/app-server/src/codex_message_processor/plugins.rs @@ -1,4 +1,5 @@ use super::*; +use codex_app_server_protocol::PluginInstallPolicy; impl CodexMessageProcessor { pub(super) async fn plugin_list( @@ -358,17 +359,7 @@ impl CodexMessageProcessor { let marketplace_path = match (marketplace_path, remote_marketplace_name) { (Some(marketplace_path), None) => marketplace_path, (None, Some(remote_marketplace_name)) => { - self.outgoing - .send_error( - request_id, - JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!( - "remote plugin install is not supported yet for marketplace {remote_marketplace_name}" - ), - data: None, - }, - ) + self.remote_plugin_install(request_id, remote_marketplace_name, plugin_name) .await; return; } @@ -426,66 +417,14 @@ impl CodexMessageProcessor { let plugin_apps = load_plugin_apps(result.installed_path.as_path()).await; let auth = self.auth_manager.auth().await; - let apps_needing_auth = if plugin_apps.is_empty() - || !config.features.apps_enabled_for_auth( + let apps_needing_auth = self + .plugin_apps_needing_auth_for_install( + &config, auth.as_ref().is_some_and(CodexAuth::is_chatgpt_auth), - ) { - Vec::new() - } else { - let environment_manager = self.thread_manager.environment_manager(); - let (all_connectors_result, accessible_connectors_result) = tokio::join!( - connectors::list_all_connectors_with_options(&config, /*force_refetch*/ true), - connectors::list_accessible_connectors_from_mcp_tools_with_environment_manager( - &config, /*force_refetch*/ true, &environment_manager - ), - ); - - let all_connectors = match all_connectors_result { - Ok(connectors) => connectors, - Err(err) => { - warn!( - plugin = result.plugin_id.as_key(), - "failed to load app metadata after plugin install: {err:#}" - ); - connectors::list_cached_all_connectors(&config) - .await - .unwrap_or_default() - } - }; - let all_connectors = - connectors::connectors_for_plugin_apps(all_connectors, &plugin_apps); - let (accessible_connectors, codex_apps_ready) = - match accessible_connectors_result { - Ok(status) => (status.connectors, status.codex_apps_ready), - Err(err) => { - warn!( - plugin = result.plugin_id.as_key(), - "failed to load accessible apps after plugin install: {err:#}" - ); - ( - connectors::list_cached_accessible_connectors_from_mcp_tools( - &config, - ) - .await - .unwrap_or_default(), - false, - ) - } - }; - if !codex_apps_ready { - warn!( - plugin = result.plugin_id.as_key(), - "codex_apps MCP not ready after plugin install; skipping appsNeedingAuth check" - ); - } - - plugin_app_helpers::plugin_apps_needing_auth( - &all_connectors, - &accessible_connectors, + &result.plugin_id.as_key(), &plugin_apps, - codex_apps_ready, ) - }; + .await; self.outgoing .send_response( @@ -542,6 +481,193 @@ impl CodexMessageProcessor { } } + async fn remote_plugin_install( + &self, + request_id: ConnectionRequestId, + remote_marketplace_name: String, + plugin_name: String, + ) { + let config = match self.load_latest_config(/*fallback_cwd*/ None).await { + Ok(config) => config, + Err(err) => { + self.outgoing.send_error(request_id, err).await; + return; + } + }; + if !config.features.enabled(Feature::Plugins) + || !config.features.enabled(Feature::RemotePlugin) + { + self.outgoing + .send_error( + request_id, + JSONRPCErrorError { + code: INVALID_REQUEST_ERROR_CODE, + message: format!( + "remote plugin install is not enabled for marketplace {remote_marketplace_name}" + ), + data: None, + }, + ) + .await; + return; + } + if plugin_name.is_empty() + || !plugin_name + .chars() + .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '~') + { + self.send_invalid_request_error( + request_id, + "invalid remote plugin id: only ASCII letters, digits, `_`, `-`, and `~` are allowed" + .to_string(), + ) + .await; + return; + } + + let auth = self.auth_manager.auth().await; + let remote_plugin_service_config = RemotePluginServiceConfig { + chatgpt_base_url: config.chatgpt_base_url.clone(), + }; + let remote_detail = match codex_core_plugins::remote::fetch_remote_plugin_detail( + &remote_plugin_service_config, + auth.as_ref(), + &remote_marketplace_name, + &plugin_name, + ) + .await + { + Ok(remote_detail) => remote_detail, + Err(err) => { + self.outgoing + .send_error( + request_id, + remote_plugin_catalog_error_to_jsonrpc( + err, + "read remote plugin details before install", + ), + ) + .await; + return; + } + }; + if remote_detail.summary.install_policy == PluginInstallPolicy::NotAvailable { + self.send_invalid_request_error( + request_id, + format!("remote plugin {plugin_name} is not available for install"), + ) + .await; + return; + } + + if let Err(err) = codex_core_plugins::remote::install_remote_plugin( + &remote_plugin_service_config, + auth.as_ref(), + &remote_marketplace_name, + &plugin_name, + ) + .await + { + self.outgoing + .send_error( + request_id, + remote_plugin_catalog_error_to_jsonrpc(err, "install remote plugin"), + ) + .await; + return; + } + + self.clear_plugin_related_caches(); + + let plugin_apps = remote_detail + .app_ids + .into_iter() + .map(codex_core::plugins::AppConnectorId) + .collect::>(); + let apps_needing_auth = self + .plugin_apps_needing_auth_for_install( + &config, + auth.as_ref().is_some_and(CodexAuth::is_chatgpt_auth), + &plugin_name, + &plugin_apps, + ) + .await; + + self.outgoing + .send_response( + request_id, + PluginInstallResponse { + auth_policy: remote_detail.summary.auth_policy, + apps_needing_auth, + }, + ) + .await; + } + + async fn plugin_apps_needing_auth_for_install( + &self, + config: &Config, + is_chatgpt_auth: bool, + plugin_id: &str, + plugin_apps: &[codex_core::plugins::AppConnectorId], + ) -> Vec { + if plugin_apps.is_empty() || !config.features.apps_enabled_for_auth(is_chatgpt_auth) { + return Vec::new(); + } + + let environment_manager = self.thread_manager.environment_manager(); + let (all_connectors_result, accessible_connectors_result) = tokio::join!( + connectors::list_all_connectors_with_options(config, /*force_refetch*/ true), + connectors::list_accessible_connectors_from_mcp_tools_with_environment_manager( + config, + /*force_refetch*/ true, + &environment_manager + ), + ); + + let all_connectors = match all_connectors_result { + Ok(connectors) => connectors, + Err(err) => { + warn!( + plugin = plugin_id, + "failed to load app metadata after plugin install: {err:#}" + ); + connectors::list_cached_all_connectors(config) + .await + .unwrap_or_default() + } + }; + let all_connectors = connectors::connectors_for_plugin_apps(all_connectors, plugin_apps); + let (accessible_connectors, codex_apps_ready) = match accessible_connectors_result { + Ok(status) => (status.connectors, status.codex_apps_ready), + Err(err) => { + warn!( + plugin = plugin_id, + "failed to load accessible apps after plugin install: {err:#}" + ); + ( + connectors::list_cached_accessible_connectors_from_mcp_tools(config) + .await + .unwrap_or_default(), + false, + ) + } + }; + if !codex_apps_ready { + warn!( + plugin = plugin_id, + "codex_apps MCP not ready after plugin install; skipping appsNeedingAuth check" + ); + } + + plugin_app_helpers::plugin_apps_needing_auth( + &all_connectors, + &accessible_connectors, + plugin_apps, + codex_apps_ready, + ) + } + pub(super) async fn plugin_uninstall( &self, request_id: ConnectionRequestId, @@ -686,7 +812,9 @@ fn remote_plugin_catalog_error_to_jsonrpc( RemotePluginCatalogError::AuthToken(_) | RemotePluginCatalogError::Request { .. } | RemotePluginCatalogError::UnexpectedStatus { .. } - | RemotePluginCatalogError::Decode { .. } => JSONRPCErrorError { + | RemotePluginCatalogError::Decode { .. } + | RemotePluginCatalogError::UnexpectedPluginId { .. } + | RemotePluginCatalogError::UnexpectedEnabledState { .. } => JSONRPCErrorError { code: INTERNAL_ERROR_CODE, message: format!("{context}: {err}"), data: None, diff --git a/codex-rs/app-server/tests/suite/v2/plugin_install.rs b/codex-rs/app-server/tests/suite/v2/plugin_install.rs index 3555dd745b08..c2ab2d1590cb 100644 --- a/codex-rs/app-server/tests/suite/v2/plugin_install.rs +++ b/codex-rs/app-server/tests/suite/v2/plugin_install.rs @@ -4,6 +4,7 @@ use std::sync::Mutex as StdMutex; use std::time::Duration; use anyhow::Result; +use anyhow::bail; use app_test_support::ChatGptAuthFixture; use app_test_support::DEFAULT_CLIENT_NAME; use app_test_support::McpProcess; @@ -44,6 +45,13 @@ use tempfile::TempDir; use tokio::net::TcpListener; use tokio::task::JoinHandle; use tokio::time::timeout; +use wiremock::Mock; +use wiremock::MockServer; +use wiremock::ResponseTemplate; +use wiremock::matchers::header; +use wiremock::matchers::method; +use wiremock::matchers::path; +use wiremock::matchers::query_param; // Plugin install tests wait on connector discovery after the install response path // starts, which is noticeably slower on Windows CI. @@ -137,8 +145,7 @@ async fn plugin_install_rejects_multiple_install_sources() -> Result<()> { } #[tokio::test] -async fn plugin_install_rejects_remote_marketplace_until_remote_install_is_supported() -> Result<()> -{ +async fn plugin_install_rejects_remote_marketplace_when_remote_plugin_is_disabled() -> Result<()> { let codex_home = TempDir::new()?; let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; @@ -146,8 +153,8 @@ async fn plugin_install_rejects_remote_marketplace_until_remote_install_is_suppo let request_id = mcp .send_plugin_install_request(PluginInstallParams { marketplace_path: None, - remote_marketplace_name: Some("openai-curated".to_string()), - plugin_name: "sample-plugin".to_string(), + remote_marketplace_name: Some("chatgpt-global".to_string()), + plugin_name: "plugins~Plugin_sample".to_string(), }) .await?; @@ -161,9 +168,143 @@ async fn plugin_install_rejects_remote_marketplace_until_remote_install_is_suppo assert!( err.error .message - .contains("remote plugin install is not supported yet") + .contains("remote plugin install is not enabled") + ); + assert!(err.error.message.contains("chatgpt-global")); + Ok(()) +} + +#[tokio::test] +async fn plugin_install_writes_remote_plugin_to_cloud_when_remote_plugin_enabled() -> Result<()> { + let codex_home = TempDir::new()?; + let server = MockServer::start().await; + write_remote_plugin_catalog_config( + codex_home.path(), + &format!("{}/backend-api/", server.uri()), + )?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let detail_body = r#"{ + "id": "plugins~Plugin_linear", + "name": "linear", + "scope": "GLOBAL", + "installation_policy": "AVAILABLE", + "authentication_policy": "ON_USE", + "release": { + "display_name": "Linear", + "description": "Track work in Linear", + "app_ids": [], + "interface": { + "short_description": "Plan and track work" + }, + "skills": [] + } +}"#; + let empty_installed_body = r#"{ + "plugins": [], + "pagination": { + "limit": 50, + "next_page_token": null + } +}"#; + + Mock::given(method("GET")) + .and(path("/backend-api/ps/plugins/plugins~Plugin_linear")) + .and(header("authorization", "Bearer chatgpt-token")) + .and(header("chatgpt-account-id", "account-123")) + .respond_with(ResponseTemplate::new(200).set_body_string(detail_body)) + .mount(&server) + .await; + Mock::given(method("GET")) + .and(path("/backend-api/ps/plugins/installed")) + .and(query_param("scope", "GLOBAL")) + .and(header("authorization", "Bearer chatgpt-token")) + .and(header("chatgpt-account-id", "account-123")) + .respond_with(ResponseTemplate::new(200).set_body_string(empty_installed_body)) + .mount(&server) + .await; + Mock::given(method("POST")) + .and(path( + "/backend-api/ps/plugins/plugins~Plugin_linear/install", + )) + .and(header("authorization", "Bearer chatgpt-token")) + .and(header("chatgpt-account-id", "account-123")) + .respond_with( + ResponseTemplate::new(200) + .set_body_string(r#"{"id":"plugins~Plugin_linear","enabled":true}"#), + ) + .mount(&server) + .await; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_plugin_install_request(PluginInstallParams { + marketplace_path: None, + remote_marketplace_name: Some("chatgpt-global".to_string()), + plugin_name: "plugins~Plugin_linear".to_string(), + }) + .await?; + let response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let response: PluginInstallResponse = to_response(response)?; + + assert_eq!( + response, + PluginInstallResponse { + auth_policy: PluginAuthPolicy::OnUse, + apps_needing_auth: Vec::new(), + } + ); + wait_for_remote_plugin_request_count( + &server, + "POST", + "/ps/plugins/plugins~Plugin_linear/install", + /*expected_count*/ 1, + ) + .await?; + Ok(()) +} + +#[tokio::test] +async fn plugin_install_rejects_invalid_remote_plugin_name() -> Result<()> { + let codex_home = TempDir::new()?; + write_remote_plugin_catalog_config(codex_home.path(), "https://example.invalid/backend-api/")?; + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_plugin_install_request(PluginInstallParams { + marketplace_path: None, + remote_marketplace_name: Some("chatgpt-global".to_string()), + plugin_name: "linear/../../oops".to_string(), + }) + .await?; + + let err = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_error_message(RequestId::Integer(request_id)), + ) + .await??; + + assert_eq!(err.error.code, -32600); + assert!(err.error.message.contains("invalid remote plugin id")); + assert!( + err.error + .message + .contains("only ASCII letters, digits, `_`, `-`, and `~` are allowed") ); - assert!(err.error.message.contains("openai-curated")); Ok(()) } @@ -773,6 +914,56 @@ fn write_analytics_config(codex_home: &std::path::Path, base_url: &str) -> std:: ) } +fn write_remote_plugin_catalog_config( + codex_home: &std::path::Path, + base_url: &str, +) -> std::io::Result<()> { + std::fs::write( + codex_home.join("config.toml"), + format!( + r#" +chatgpt_base_url = "{base_url}" + +[features] +plugins = true +remote_plugin = true +"# + ), + ) +} + +async fn wait_for_remote_plugin_request_count( + server: &MockServer, + method_name: &str, + path_suffix: &str, + expected_count: usize, +) -> Result<()> { + timeout(DEFAULT_TIMEOUT, async { + loop { + let Some(requests) = server.received_requests().await else { + bail!("wiremock did not record requests"); + }; + let request_count = requests + .iter() + .filter(|request| { + request.method == method_name && request.url.path().ends_with(path_suffix) + }) + .count(); + if request_count == expected_count { + return Ok::<(), anyhow::Error>(()); + } + if request_count > expected_count { + bail!( + "expected exactly {expected_count} {method_name} {path_suffix} requests, got {request_count}" + ); + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await??; + Ok(()) +} + fn write_plugin_marketplace( repo_root: &std::path::Path, marketplace_name: &str, diff --git a/codex-rs/core-plugins/src/remote.rs b/codex-rs/core-plugins/src/remote.rs index 2b16f435b27e..e453c52f97e4 100644 --- a/codex-rs/core-plugins/src/remote.rs +++ b/codex-rs/core-plugins/src/remote.rs @@ -107,6 +107,20 @@ pub enum RemotePluginCatalogError { expected_marketplace_name: String, actual_marketplace_name: String, }, + + #[error( + "remote plugin install returned unexpected plugin id: expected `{expected}`, got `{actual}`" + )] + UnexpectedPluginId { expected: String, actual: String }, + + #[error( + "remote plugin install returned unexpected enabled state for `{plugin_id}`: expected {expected_enabled}, got {actual_enabled}" + )] + UnexpectedEnabledState { + plugin_id: String, + expected_enabled: bool, + actual_enabled: bool, + }, } #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize)] @@ -258,6 +272,12 @@ struct RemotePluginInstalledResponse { pagination: RemotePluginPagination, } +#[derive(Debug, Clone, PartialEq, Eq, Deserialize)] +struct RemotePluginInstallResponse { + id: String, + enabled: bool, +} + pub async fn fetch_remote_marketplaces( config: &RemotePluginServiceConfig, auth: Option<&CodexAuth>, @@ -418,6 +438,41 @@ pub async fn fetch_remote_plugin_detail( }) } +pub async fn install_remote_plugin( + config: &RemotePluginServiceConfig, + auth: Option<&CodexAuth>, + marketplace_name: &str, + plugin_id: &str, +) -> Result<(), RemotePluginCatalogError> { + let auth = ensure_chatgpt_auth(auth)?; + if RemotePluginScope::from_marketplace_name(marketplace_name).is_none() { + return Err(RemotePluginCatalogError::UnknownMarketplace { + marketplace_name: marketplace_name.to_string(), + }); + } + + let base_url = config.chatgpt_base_url.trim_end_matches('/'); + let url = format!("{base_url}/ps/plugins/{plugin_id}/install"); + let client = build_reqwest_client(); + let request = authenticated_request(client.post(&url), auth)?; + let response: RemotePluginInstallResponse = send_and_decode(request, &url).await?; + if response.id != plugin_id { + return Err(RemotePluginCatalogError::UnexpectedPluginId { + expected: plugin_id.to_string(), + actual: response.id, + }); + } + if !response.enabled { + return Err(RemotePluginCatalogError::UnexpectedEnabledState { + plugin_id: plugin_id.to_string(), + expected_enabled: true, + actual_enabled: response.enabled, + }); + } + + Ok(()) +} + fn build_remote_plugin_summary( plugin: &RemotePluginDirectoryItem, installed_plugin: Option<&RemotePluginInstalledItem>,