Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 197 additions & 69 deletions codex-rs/app-server/src/codex_message_processor/plugins.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use codex_app_server_protocol::PluginInstallPolicy;

impl CodexMessageProcessor {
pub(super) async fn plugin_list(
Expand Down Expand Up @@ -358,17 +359,7 @@ impl CodexMessageProcessor {
let marketplace_path = match (marketplace_path, remote_marketplace_name) {
(Some(marketplace_path), None) => marketplace_path,
(None, Some(remote_marketplace_name)) => {
self.outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"remote plugin install is not supported yet for marketplace {remote_marketplace_name}"
),
data: None,
},
)
self.remote_plugin_install(request_id, remote_marketplace_name, plugin_name)
.await;
return;
}
Expand Down Expand Up @@ -426,66 +417,14 @@ impl CodexMessageProcessor {

let plugin_apps = load_plugin_apps(result.installed_path.as_path()).await;
let auth = self.auth_manager.auth().await;
let apps_needing_auth = if plugin_apps.is_empty()
|| !config.features.apps_enabled_for_auth(
let apps_needing_auth = self
.plugin_apps_needing_auth_for_install(
&config,
auth.as_ref().is_some_and(CodexAuth::is_chatgpt_auth),
) {
Vec::new()
} else {
let environment_manager = self.thread_manager.environment_manager();
let (all_connectors_result, accessible_connectors_result) = tokio::join!(
connectors::list_all_connectors_with_options(&config, /*force_refetch*/ true),
connectors::list_accessible_connectors_from_mcp_tools_with_environment_manager(
&config, /*force_refetch*/ true, &environment_manager
),
);

let all_connectors = match all_connectors_result {
Ok(connectors) => connectors,
Err(err) => {
warn!(
plugin = result.plugin_id.as_key(),
"failed to load app metadata after plugin install: {err:#}"
);
connectors::list_cached_all_connectors(&config)
.await
.unwrap_or_default()
}
};
let all_connectors =
connectors::connectors_for_plugin_apps(all_connectors, &plugin_apps);
let (accessible_connectors, codex_apps_ready) =
match accessible_connectors_result {
Ok(status) => (status.connectors, status.codex_apps_ready),
Err(err) => {
warn!(
plugin = result.plugin_id.as_key(),
"failed to load accessible apps after plugin install: {err:#}"
);
(
connectors::list_cached_accessible_connectors_from_mcp_tools(
&config,
)
.await
.unwrap_or_default(),
false,
)
}
};
if !codex_apps_ready {
warn!(
plugin = result.plugin_id.as_key(),
"codex_apps MCP not ready after plugin install; skipping appsNeedingAuth check"
);
}

plugin_app_helpers::plugin_apps_needing_auth(
&all_connectors,
&accessible_connectors,
&result.plugin_id.as_key(),
&plugin_apps,
codex_apps_ready,
)
};
.await;

self.outgoing
.send_response(
Expand Down Expand Up @@ -542,6 +481,193 @@ impl CodexMessageProcessor {
}
}

async fn remote_plugin_install(
&self,
request_id: ConnectionRequestId,
remote_marketplace_name: String,
plugin_name: String,
) {
let config = match self.load_latest_config(/*fallback_cwd*/ None).await {
Ok(config) => config,
Err(err) => {
self.outgoing.send_error(request_id, err).await;
return;
}
};
if !config.features.enabled(Feature::Plugins)
|| !config.features.enabled(Feature::RemotePlugin)
{
self.outgoing
.send_error(
request_id,
JSONRPCErrorError {
code: INVALID_REQUEST_ERROR_CODE,
message: format!(
"remote plugin install is not enabled for marketplace {remote_marketplace_name}"
),
data: None,
},
)
.await;
return;
}
if plugin_name.is_empty()
|| !plugin_name
.chars()
.all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '~')
{
self.send_invalid_request_error(
request_id,
"invalid remote plugin id: only ASCII letters, digits, `_`, `-`, and `~` are allowed"
.to_string(),
)
.await;
return;
}

let auth = self.auth_manager.auth().await;
let remote_plugin_service_config = RemotePluginServiceConfig {
chatgpt_base_url: config.chatgpt_base_url.clone(),
};
let remote_detail = match codex_core_plugins::remote::fetch_remote_plugin_detail(
&remote_plugin_service_config,
auth.as_ref(),
&remote_marketplace_name,
&plugin_name,
)
.await
{
Ok(remote_detail) => remote_detail,
Err(err) => {
self.outgoing
.send_error(
request_id,
remote_plugin_catalog_error_to_jsonrpc(
err,
"read remote plugin details before install",
),
)
.await;
return;
}
};
if remote_detail.summary.install_policy == PluginInstallPolicy::NotAvailable {
self.send_invalid_request_error(
request_id,
format!("remote plugin {plugin_name} is not available for install"),
)
.await;
return;
}

if let Err(err) = codex_core_plugins::remote::install_remote_plugin(
&remote_plugin_service_config,
auth.as_ref(),
&remote_marketplace_name,
&plugin_name,
)
.await
{
self.outgoing
.send_error(
request_id,
remote_plugin_catalog_error_to_jsonrpc(err, "install remote plugin"),
)
.await;
return;
}

self.clear_plugin_related_caches();

let plugin_apps = remote_detail
.app_ids
.into_iter()
.map(codex_core::plugins::AppConnectorId)
.collect::<Vec<_>>();
let apps_needing_auth = self
.plugin_apps_needing_auth_for_install(
&config,
auth.as_ref().is_some_and(CodexAuth::is_chatgpt_auth),
&plugin_name,
&plugin_apps,
)
.await;

self.outgoing
.send_response(
request_id,
PluginInstallResponse {
auth_policy: remote_detail.summary.auth_policy,
apps_needing_auth,
},
)
.await;
}

async fn plugin_apps_needing_auth_for_install(
&self,
config: &Config,
is_chatgpt_auth: bool,
plugin_id: &str,
plugin_apps: &[codex_core::plugins::AppConnectorId],
) -> Vec<AppSummary> {
if plugin_apps.is_empty() || !config.features.apps_enabled_for_auth(is_chatgpt_auth) {
return Vec::new();
}

let environment_manager = self.thread_manager.environment_manager();
let (all_connectors_result, accessible_connectors_result) = tokio::join!(
connectors::list_all_connectors_with_options(config, /*force_refetch*/ true),
connectors::list_accessible_connectors_from_mcp_tools_with_environment_manager(
config,
/*force_refetch*/ true,
&environment_manager
),
);

let all_connectors = match all_connectors_result {
Ok(connectors) => connectors,
Err(err) => {
warn!(
plugin = plugin_id,
"failed to load app metadata after plugin install: {err:#}"
);
connectors::list_cached_all_connectors(config)
.await
.unwrap_or_default()
}
};
let all_connectors = connectors::connectors_for_plugin_apps(all_connectors, plugin_apps);
let (accessible_connectors, codex_apps_ready) = match accessible_connectors_result {
Ok(status) => (status.connectors, status.codex_apps_ready),
Err(err) => {
warn!(
plugin = plugin_id,
"failed to load accessible apps after plugin install: {err:#}"
);
(
connectors::list_cached_accessible_connectors_from_mcp_tools(config)
.await
.unwrap_or_default(),
false,
)
}
};
if !codex_apps_ready {
warn!(
plugin = plugin_id,
"codex_apps MCP not ready after plugin install; skipping appsNeedingAuth check"
);
}

plugin_app_helpers::plugin_apps_needing_auth(
&all_connectors,
&accessible_connectors,
plugin_apps,
codex_apps_ready,
)
}

pub(super) async fn plugin_uninstall(
&self,
request_id: ConnectionRequestId,
Expand Down Expand Up @@ -686,7 +812,9 @@ fn remote_plugin_catalog_error_to_jsonrpc(
RemotePluginCatalogError::AuthToken(_)
| RemotePluginCatalogError::Request { .. }
| RemotePluginCatalogError::UnexpectedStatus { .. }
| RemotePluginCatalogError::Decode { .. } => JSONRPCErrorError {
| RemotePluginCatalogError::Decode { .. }
| RemotePluginCatalogError::UnexpectedPluginId { .. }
| RemotePluginCatalogError::UnexpectedEnabledState { .. } => JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("{context}: {err}"),
data: None,
Expand Down
Loading
Loading