Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codex-rs/app-server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ Example with notification opt-out:
- `skills/changed` — notification emitted when watched local skill files change.
- `app/list` — list available apps.
- `skills/config/write` — write user-level skill config by path.
- `plugin/install` — install a plugin from a discovered marketplace entry, rejecting marketplace entries marked unavailable for install, and return the effective plugin auth policy plus any apps that still need auth (**under development; do not call from production clients yet**).
- `plugin/install` — install a plugin from a discovered marketplace entry, rejecting marketplace entries marked unavailable for install, install MCPs if any, and return the effective plugin auth policy plus any apps that still need auth (**under development; do not call from production clients yet**).
- `plugin/uninstall` — uninstall a plugin by id by removing its cached files and clearing its user-level config entry (**under development; do not call from production clients yet**).
- `mcpServer/oauth/login` — start an OAuth login for a configured MCP server; returns an `authorization_url` and later emits `mcpServer/oauthLogin/completed` once the browser flow finishes.
- `tool/requestUserInput` — prompt the user with 1–3 short questions for a tool call and return their answers (experimental).
Expand Down
52 changes: 37 additions & 15 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ use codex_core::plugins::PluginInstallRequest;
use codex_core::plugins::PluginReadRequest;
use codex_core::plugins::PluginUninstallError as CorePluginUninstallError;
use codex_core::plugins::load_plugin_apps;
use codex_core::plugins::load_plugin_mcp_servers;
use codex_core::read_head_for_summary;
use codex_core::read_session_meta_line;
use codex_core::rollout_date_parts;
Expand Down Expand Up @@ -311,6 +312,7 @@ use codex_app_server_protocol::ServerRequest;

mod apps_list_helpers;
mod plugin_app_helpers;
mod plugin_mcp_oauth;

use crate::filters::compute_source_filters;
use crate::filters::source_kind_matches;
Expand Down Expand Up @@ -4587,36 +4589,42 @@ impl CodexMessageProcessor {
}
};

let configured_servers = self
.thread_manager
.mcp_manager()
.configured_servers(&config);
if let Err(error) = self.queue_mcp_server_refresh_for_config(&config).await {
self.outgoing.send_error(request_id, error).await;
return;
}

let response = McpServerRefreshResponse {};
self.outgoing.send_response(request_id, response).await;
}

async fn queue_mcp_server_refresh_for_config(
&self,
config: &Config,
) -> Result<(), JSONRPCErrorError> {
let configured_servers = self.thread_manager.mcp_manager().configured_servers(config);
let mcp_servers = match serde_json::to_value(configured_servers) {
Ok(value) => value,
Err(err) => {
let error = JSONRPCErrorError {
return Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to serialize MCP servers: {err}"),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
});
}
};

let mcp_oauth_credentials_store_mode =
match serde_json::to_value(config.mcp_oauth_credentials_store_mode) {
Ok(value) => value,
Err(err) => {
let error = JSONRPCErrorError {
return Err(JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!(
"failed to serialize MCP OAuth credentials store mode: {err}"
),
data: None,
};
self.outgoing.send_error(request_id, error).await;
return;
});
}
};

Expand All @@ -4629,8 +4637,7 @@ impl CodexMessageProcessor {
// active turn to avoid work for threads that never resume.
let thread_manager = Arc::clone(&self.thread_manager);
thread_manager.refresh_mcp_servers(refresh_config).await;
let response = McpServerRefreshResponse {};
self.outgoing.send_response(request_id, response).await;
Ok(())
}

async fn mcp_server_oauth_login(
Expand Down Expand Up @@ -5742,6 +5749,22 @@ impl CodexMessageProcessor {
self.config.as_ref().clone()
}
};

self.clear_plugin_related_caches();

let plugin_mcp_servers = load_plugin_mcp_servers(result.installed_path.as_path());

if !plugin_mcp_servers.is_empty() {
if let Err(err) = self.queue_mcp_server_refresh_for_config(&config).await {
warn!(
plugin = result.plugin_id.as_key(),
"failed to queue MCP refresh after plugin install: {err:?}"
);
}
self.start_plugin_mcp_oauth_logins(&config, plugin_mcp_servers)
.await;
}

let plugin_apps = load_plugin_apps(result.installed_path.as_path());
let apps_needing_auth = if plugin_apps.is_empty()
|| !config.features.apps_enabled(Some(&self.auth_manager)).await
Expand Down Expand Up @@ -5802,7 +5825,6 @@ impl CodexMessageProcessor {
)
};

self.clear_plugin_related_caches();
self.outgoing
.send_response(
request_id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
use std::collections::HashMap;
use std::sync::Arc;

use codex_app_server_protocol::McpServerOauthLoginCompletedNotification;
use codex_app_server_protocol::ServerNotification;
use codex_core::config::Config;
use codex_core::config::types::McpServerConfig;
use codex_core::mcp::auth::McpOAuthLoginSupport;
use codex_core::mcp::auth::oauth_login_support;
use codex_core::mcp::auth::resolve_oauth_scopes;
use codex_core::mcp::auth::should_retry_without_scopes;
use codex_rmcp_client::perform_oauth_login;
use tracing::warn;

use super::CodexMessageProcessor;

impl CodexMessageProcessor {
pub(super) async fn start_plugin_mcp_oauth_logins(
&self,
config: &Config,
plugin_mcp_servers: HashMap<String, McpServerConfig>,
) {
for (name, server) in plugin_mcp_servers {
let oauth_config = match oauth_login_support(&server.transport).await {
McpOAuthLoginSupport::Supported(config) => config,
McpOAuthLoginSupport::Unsupported => continue,
McpOAuthLoginSupport::Unknown(err) => {
warn!(
"MCP server may or may not require login for plugin install {name}: {err}"
);
continue;
}
};

let resolved_scopes = resolve_oauth_scopes(
/*explicit_scopes*/ None,
server.scopes.clone(),
oauth_config.discovered_scopes.clone(),
);

let store_mode = config.mcp_oauth_credentials_store_mode;
let callback_port = config.mcp_oauth_callback_port;
let callback_url = config.mcp_oauth_callback_url.clone();
let outgoing = Arc::clone(&self.outgoing);
let notification_name = name.clone();

tokio::spawn(async move {
let first_attempt = perform_oauth_login(
&name,
&oauth_config.url,
store_mode,
oauth_config.http_headers.clone(),
oauth_config.env_http_headers.clone(),
&resolved_scopes.scopes,
server.oauth_resource.as_deref(),
callback_port,
callback_url.as_deref(),
)
.await;

let final_result = match first_attempt {
Err(err) if should_retry_without_scopes(&resolved_scopes, &err) => {
perform_oauth_login(
&name,
&oauth_config.url,
store_mode,
oauth_config.http_headers,
oauth_config.env_http_headers,
&[],
server.oauth_resource.as_deref(),
callback_port,
callback_url.as_deref(),
)
.await
}
result => result,
};

let (success, error) = match final_result {
Ok(()) => (true, None),
Err(err) => (false, Some(err.to_string())),
};

let notification = ServerNotification::McpServerOauthLoginCompleted(
McpServerOauthLoginCompletedNotification {
name: notification_name,
success,
error,
},
);
outgoing.send_server_notification(notification).await;
});
}
}
}
73 changes: 73 additions & 0 deletions codex-rs/app-server/tests/suite/v2/plugin_install.rs
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,79 @@ async fn plugin_install_filters_disallowed_apps_needing_auth() -> Result<()> {
Ok(())
}

#[tokio::test]
async fn plugin_install_makes_bundled_mcp_servers_available_to_followup_requests() -> Result<()> {
let codex_home = TempDir::new()?;
std::fs::write(
codex_home.path().join("config.toml"),
"[features]\nplugins = true\n",
)?;
let repo_root = TempDir::new()?;
write_plugin_marketplace(
repo_root.path(),
"debug",
"sample-plugin",
"./sample-plugin",
None,
None,
)?;
write_plugin_source(repo_root.path(), "sample-plugin", &[])?;
std::fs::write(
repo_root.path().join("sample-plugin/.mcp.json"),
r#"{
"mcpServers": {
"sample-mcp": {
"command": "echo"
}
}
}"#,
)?;
let marketplace_path =
AbsolutePathBuf::try_from(repo_root.path().join(".agents/plugins/marketplace.json"))?;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;

let request_id = mcp
.send_plugin_install_request(PluginInstallParams {
marketplace_path,
plugin_name: "sample-plugin".to_string(),
force_remote_sync: false,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginInstallResponse = to_response(response)?;
assert_eq!(response.apps_needing_auth, Vec::<AppSummary>::new());
let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?;
assert!(!config.contains("[mcp_servers.sample-mcp]"));
assert!(!config.contains("command = \"echo\""));

let request_id = mcp
.send_raw_request(
"mcpServer/oauth/login",
Some(json!({
"name": "sample-mcp",
})),
)
.await?;
let err = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_error_message(RequestId::Integer(request_id)),
)
.await??;

assert_eq!(err.error.code, -32600);
assert_eq!(
err.error.message,
"OAuth login is only supported for streamable HTTP servers."
);
Ok(())
}

#[derive(Clone)]
struct AppsServerState {
response: Arc<StdMutex<serde_json::Value>>,
Expand Down
16 changes: 16 additions & 0 deletions codex-rs/core/src/plugins/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1660,6 +1660,22 @@ pub fn plugin_telemetry_metadata_from_root(
}
}

pub fn load_plugin_mcp_servers(plugin_root: &Path) -> HashMap<String, McpServerConfig> {
let Some(manifest) = load_plugin_manifest(plugin_root) else {
return HashMap::new();
};

let mut mcp_servers = HashMap::new();
for mcp_config_path in plugin_mcp_config_paths(plugin_root, &manifest.paths) {
let plugin_mcp = load_mcp_servers_from_file(plugin_root, &mcp_config_path);
for (name, config) in plugin_mcp.mcp_servers {
mcp_servers.entry(name).or_insert(config);
}
}

mcp_servers
}

pub fn installed_plugin_telemetry_metadata(
codex_home: &Path,
plugin_id: &PluginId,
Expand Down
1 change: 1 addition & 0 deletions codex-rs/core/src/plugins/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub use manager::PluginsManager;
pub use manager::RemotePluginSyncResult;
pub use manager::installed_plugin_telemetry_metadata;
pub use manager::load_plugin_apps;
pub use manager::load_plugin_mcp_servers;
pub(crate) use manager::plugin_namespace_for_skill_path;
pub use manager::plugin_telemetry_metadata_from_root;
pub use manifest::PluginManifestInterface;
Expand Down
Loading