From eb849d943d94bdd50e18c4a486ef86b3498a1095 Mon Sep 17 00:00:00 2001 From: Josiah Grace Date: Mon, 13 Apr 2026 12:52:18 -0700 Subject: [PATCH] Add opt-in parallel MCP tool calls --- codex-rs/cli/src/mcp_cmd.rs | 1 + codex-rs/codex-mcp/src/mcp/mod.rs | 1 + codex-rs/codex-mcp/src/mcp/mod_tests.rs | 2 + .../codex-mcp/src/mcp/skill_dependencies.rs | 2 + .../src/mcp/skill_dependencies_tests.rs | 2 + .../src/mcp_connection_manager_tests.rs | 2 + codex-rs/config/src/mcp_edit.rs | 3 + codex-rs/config/src/mcp_edit_tests.rs | 2 + codex-rs/config/src/mcp_types.rs | 8 + codex-rs/config/src/mcp_types_tests.rs | 33 +++ codex-rs/core/config.schema.json | 4 + codex-rs/core/src/codex.rs | 15 +- codex-rs/core/src/codex_tests.rs | 2 + codex-rs/core/src/config/config_tests.rs | 16 ++ codex-rs/core/src/config/edit.rs | 3 + codex-rs/core/src/config/edit_tests.rs | 8 + codex-rs/core/src/mcp_skill_dependencies.rs | 2 + codex-rs/core/src/plugins/manager_tests.rs | 4 + codex-rs/core/src/tools/code_mode/mod.rs | 18 +- codex-rs/core/src/tools/js_repl/mod.rs | 4 +- codex-rs/core/src/tools/parallel.rs | 2 +- codex-rs/core/src/tools/router.rs | 17 +- codex-rs/core/src/tools/router_tests.rs | 64 ++++- codex-rs/core/src/tools/spec_tests.rs | 1 + codex-rs/core/tests/suite/code_mode.rs | 1 + codex-rs/core/tests/suite/rmcp_client.rs | 269 ++++++++++++++++++ codex-rs/core/tests/suite/search_tool.rs | 1 + codex-rs/core/tests/suite/sqlite_state.rs | 1 + codex-rs/core/tests/suite/truncation.rs | 3 + .../rmcp-client/src/bin/test_stdio_server.rs | 186 ++++++++++++ docs/config.md | 14 + 31 files changed, 681 insertions(+), 10 deletions(-) diff --git a/codex-rs/cli/src/mcp_cmd.rs b/codex-rs/cli/src/mcp_cmd.rs index 4aaa322ed651..b8e3fc670d63 100644 --- a/codex-rs/cli/src/mcp_cmd.rs +++ b/codex-rs/cli/src/mcp_cmd.rs @@ -299,6 +299,7 @@ async fn run_add(config_overrides: &CliConfigOverrides, add_args: AddArgs) -> Re transport: transport.clone(), enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, diff --git a/codex-rs/codex-mcp/src/mcp/mod.rs b/codex-rs/codex-mcp/src/mcp/mod.rs index ba0442957878..d53532b95127 100644 --- a/codex-rs/codex-mcp/src/mcp/mod.rs +++ b/codex-rs/codex-mcp/src/mcp/mod.rs @@ -272,6 +272,7 @@ fn codex_apps_mcp_server_config(config: &McpConfig, auth: Option<&CodexAuth>) -> }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(30)), tool_timeout_sec: None, diff --git a/codex-rs/codex-mcp/src/mcp/mod_tests.rs b/codex-rs/codex-mcp/src/mcp/mod_tests.rs index 8dc29bbf98f1..eaffbb8a5d38 100644 --- a/codex-rs/codex-mcp/src/mcp/mod_tests.rs +++ b/codex-rs/codex-mcp/src/mcp/mod_tests.rs @@ -195,6 +195,7 @@ async fn effective_mcp_servers_preserve_user_servers_and_add_codex_apps() { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -216,6 +217,7 @@ async fn effective_mcp_servers_preserve_user_servers_and_add_codex_apps() { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, diff --git a/codex-rs/codex-mcp/src/mcp/skill_dependencies.rs b/codex-rs/codex-mcp/src/mcp/skill_dependencies.rs index aa26fd20fe70..aa0c4d4e7a20 100644 --- a/codex-rs/codex-mcp/src/mcp/skill_dependencies.rs +++ b/codex-rs/codex-mcp/src/mcp/skill_dependencies.rs @@ -121,6 +121,7 @@ fn mcp_dependency_to_server_config( }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -147,6 +148,7 @@ fn mcp_dependency_to_server_config( }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, diff --git a/codex-rs/codex-mcp/src/mcp/skill_dependencies_tests.rs b/codex-rs/codex-mcp/src/mcp/skill_dependencies_tests.rs index 7a211bacbc6e..0fe2856f0165 100644 --- a/codex-rs/codex-mcp/src/mcp/skill_dependencies_tests.rs +++ b/codex-rs/codex-mcp/src/mcp/skill_dependencies_tests.rs @@ -41,6 +41,7 @@ fn collect_missing_respects_canonical_installed_key() { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -91,6 +92,7 @@ fn collect_missing_dedupes_by_canonical_key_but_preserves_original_name() { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, diff --git a/codex-rs/codex-mcp/src/mcp_connection_manager_tests.rs b/codex-rs/codex-mcp/src/mcp_connection_manager_tests.rs index 2c0d6fda6863..bfa50c4182bd 100644 --- a/codex-rs/codex-mcp/src/mcp_connection_manager_tests.rs +++ b/codex-rs/codex-mcp/src/mcp_connection_manager_tests.rs @@ -757,6 +757,7 @@ fn mcp_init_error_display_prompts_for_github_pat() { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -806,6 +807,7 @@ fn mcp_init_error_display_reports_generic_errors() { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, diff --git a/codex-rs/config/src/mcp_edit.rs b/codex-rs/config/src/mcp_edit.rs index f528751f26c9..965f86936281 100644 --- a/codex-rs/config/src/mcp_edit.rs +++ b/codex-rs/config/src/mcp_edit.rs @@ -177,6 +177,9 @@ fn serialize_mcp_server(config: &McpServerConfig) -> TomlItem { if config.required { entry["required"] = value(true); } + if config.supports_parallel_tool_calls { + entry["supports_parallel_tool_calls"] = value(true); + } if let Some(timeout) = config.startup_timeout_sec { entry["startup_timeout_sec"] = value(timeout.as_secs_f64()); } diff --git a/codex-rs/config/src/mcp_edit_tests.rs b/codex-rs/config/src/mcp_edit_tests.rs index 3a8eddee018e..38ab0852fe3d 100644 --- a/codex-rs/config/src/mcp_edit_tests.rs +++ b/codex-rs/config/src/mcp_edit_tests.rs @@ -24,6 +24,7 @@ async fn replace_mcp_servers_serializes_per_tool_approval_overrides() -> anyhow: }, enabled: true, required: false, + supports_parallel_tool_calls: true, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -59,6 +60,7 @@ async fn replace_mcp_servers_serializes_per_tool_approval_overrides() -> anyhow: serialized, r#"[mcp_servers.docs] command = "docs-server" +supports_parallel_tool_calls = true [mcp_servers.docs.tools] diff --git a/codex-rs/config/src/mcp_types.rs b/codex-rs/config/src/mcp_types.rs index 52cf71b49b17..c00988c6ffcc 100644 --- a/codex-rs/config/src/mcp_types.rs +++ b/codex-rs/config/src/mcp_types.rs @@ -69,6 +69,10 @@ pub struct McpServerConfig { #[serde(default, skip_serializing_if = "std::ops::Not::not")] pub required: bool, + /// When `true`, every tool from this server is advertised as safe for parallel tool calls. + #[serde(default, skip_serializing_if = "std::ops::Not::not")] + pub supports_parallel_tool_calls: bool, + /// Reason this server was disabled after applying requirements. #[serde(skip)] pub disabled_reason: Option, @@ -146,6 +150,8 @@ pub struct RawMcpServerConfig { #[serde(default)] pub required: Option, #[serde(default)] + pub supports_parallel_tool_calls: Option, + #[serde(default)] pub enabled_tools: Option>, #[serde(default)] pub disabled_tools: Option>, @@ -180,6 +186,7 @@ impl TryFrom for McpServerConfig { tool_timeout_sec, enabled, required, + supports_parallel_tool_calls, enabled_tools, disabled_tools, scopes, @@ -243,6 +250,7 @@ impl TryFrom for McpServerConfig { tool_timeout_sec, enabled: enabled.unwrap_or_else(default_enabled), required: required.unwrap_or_default(), + supports_parallel_tool_calls: supports_parallel_tool_calls.unwrap_or_default(), disabled_reason: None, enabled_tools, disabled_tools, diff --git a/codex-rs/config/src/mcp_types_tests.rs b/codex-rs/config/src/mcp_types_tests.rs index 694314e6e2d4..6b9fb16e908b 100644 --- a/codex-rs/config/src/mcp_types_tests.rs +++ b/codex-rs/config/src/mcp_types_tests.rs @@ -245,6 +245,38 @@ fn deserialize_server_config_with_tool_filters() { assert_eq!(cfg.disabled_tools, Some(vec!["blocked".to_string()])); } +#[test] +fn deserialize_server_config_with_parallel_tool_calls() { + let cfg: McpServerConfig = toml::from_str( + r#" + command = "echo" + supports_parallel_tool_calls = true + "#, + ) + .expect("should deserialize supports_parallel_tool_calls"); + + assert!(cfg.supports_parallel_tool_calls); +} + +#[test] +fn serialize_round_trips_server_config_with_parallel_tool_calls() { + let cfg: McpServerConfig = toml::from_str( + r#" + command = "echo" + supports_parallel_tool_calls = true + tool_timeout_sec = 2.0 + "#, + ) + .expect("should deserialize supports_parallel_tool_calls"); + + let serialized = toml::to_string(&cfg).expect("should serialize MCP config"); + assert!(serialized.contains("supports_parallel_tool_calls = true")); + + let round_tripped: McpServerConfig = + toml::from_str(&serialized).expect("should deserialize serialized MCP config"); + assert_eq!(round_tripped, cfg); +} + #[test] fn deserialize_ignores_unknown_server_fields() { let cfg: McpServerConfig = toml::from_str( @@ -267,6 +299,7 @@ fn deserialize_ignores_unknown_server_fields() { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, diff --git a/codex-rs/core/config.schema.json b/codex-rs/core/config.schema.json index bd74c46595e6..f47fee612aa9 100644 --- a/codex-rs/core/config.schema.json +++ b/codex-rs/core/config.schema.json @@ -1522,6 +1522,10 @@ "format": "double", "type": "number" }, + "supports_parallel_tool_calls": { + "default": null, + "type": "boolean" + }, "tool_timeout_sec": { "default": null, "format": "double", diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 0516f0cda443..10d7e1ac848d 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -7127,11 +7127,24 @@ pub(crate) async fn built_tools( ); let direct_mcp_tools = has_mcp_servers.then_some(mcp_tool_exposure.direct_tools); + let parallel_mcp_server_names = turn_context + .config + .mcp_servers + .get() + .iter() + .filter_map(|(server_name, server_config)| { + server_config + .supports_parallel_tool_calls + .then_some(server_name.clone()) + }) + .collect::>(); + Ok(Arc::new(ToolRouter::from_config( &turn_context.tools_config, ToolRouterParams { - deferred_mcp_tools: mcp_tool_exposure.deferred_tools, mcp_tools: direct_mcp_tools, + deferred_mcp_tools: mcp_tool_exposure.deferred_tools, + parallel_mcp_server_names, discoverable_tools, dynamic_tools: turn_context.dynamic_tools.as_slice(), }, diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index 5648fffd7c72..34d1b4a62586 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -312,6 +312,7 @@ fn test_tool_runtime(session: Arc, turn_context: Arc) -> T crate::tools::router::ToolRouterParams { mcp_tools: None, deferred_mcp_tools: None, + parallel_mcp_server_names: HashSet::new(), discoverable_tools: None, dynamic_tools: turn_context.dynamic_tools.as_slice(), }, @@ -5353,6 +5354,7 @@ async fn fatal_tool_error_stops_turn_and_reports_error() { crate::tools::router::ToolRouterParams { deferred_mcp_tools, mcp_tools: Some(tools), + parallel_mcp_server_names: HashSet::new(), discoverable_tools: None, dynamic_tools: turn_context.dynamic_tools.as_slice(), }, diff --git a/codex-rs/core/src/config/config_tests.rs b/codex-rs/core/src/config/config_tests.rs index 079f0653cd45..b10c62ed14f2 100644 --- a/codex-rs/core/src/config/config_tests.rs +++ b/codex-rs/core/src/config/config_tests.rs @@ -83,6 +83,7 @@ fn stdio_mcp(command: &str) -> McpServerConfig { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -104,6 +105,7 @@ fn http_mcp(url: &str) -> McpServerConfig { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -2011,6 +2013,7 @@ async fn replace_mcp_servers_round_trips_entries() -> anyhow::Result<()> { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(3)), tool_timeout_sec: Some(Duration::from_secs(5)), @@ -2257,6 +2260,7 @@ async fn replace_mcp_servers_serializes_env_sorted() -> anyhow::Result<()> { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -2330,6 +2334,7 @@ async fn replace_mcp_servers_serializes_env_vars() -> anyhow::Result<()> { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -2383,6 +2388,7 @@ async fn replace_mcp_servers_serializes_cwd() -> anyhow::Result<()> { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -2434,6 +2440,7 @@ async fn replace_mcp_servers_streamable_http_serializes_bearer_token() -> anyhow }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(2)), tool_timeout_sec: None, @@ -2501,6 +2508,7 @@ async fn replace_mcp_servers_streamable_http_serializes_custom_headers() -> anyh }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(2)), tool_timeout_sec: None, @@ -2580,6 +2588,7 @@ async fn replace_mcp_servers_streamable_http_removes_optional_sections() -> anyh }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(2)), tool_timeout_sec: None, @@ -2612,6 +2621,7 @@ async fn replace_mcp_servers_streamable_http_removes_optional_sections() -> anyh }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -2679,6 +2689,7 @@ async fn replace_mcp_servers_streamable_http_isolates_headers_between_servers() }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(2)), tool_timeout_sec: None, @@ -2701,6 +2712,7 @@ async fn replace_mcp_servers_streamable_http_isolates_headers_between_servers() }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -2786,6 +2798,7 @@ async fn replace_mcp_servers_serializes_disabled_flag() -> anyhow::Result<()> { }, enabled: false, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -2833,6 +2846,7 @@ async fn replace_mcp_servers_serializes_required_flag() -> anyhow::Result<()> { }, enabled: true, required: true, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -2880,6 +2894,7 @@ async fn replace_mcp_servers_serializes_tool_filters() -> anyhow::Result<()> { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -2931,6 +2946,7 @@ async fn replace_mcp_servers_streamable_http_serializes_oauth_resource() -> anyh }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, diff --git a/codex-rs/core/src/config/edit.rs b/codex-rs/core/src/config/edit.rs index 0f1189b22dbc..443d00c93d27 100644 --- a/codex-rs/core/src/config/edit.rs +++ b/codex-rs/core/src/config/edit.rs @@ -225,6 +225,9 @@ mod document_helpers { if config.required { entry["required"] = value(true); } + if config.supports_parallel_tool_calls { + entry["supports_parallel_tool_calls"] = value(true); + } if let Some(timeout) = config.startup_timeout_sec { entry["startup_timeout_sec"] = value(timeout.as_secs_f64()); } diff --git a/codex-rs/core/src/config/edit_tests.rs b/codex-rs/core/src/config/edit_tests.rs index af1251b34f41..c9288be30b56 100644 --- a/codex-rs/core/src/config/edit_tests.rs +++ b/codex-rs/core/src/config/edit_tests.rs @@ -577,6 +577,7 @@ fn blocking_replace_mcp_servers_round_trips() { }, enabled: true, required: false, + supports_parallel_tool_calls: true, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -603,6 +604,7 @@ fn blocking_replace_mcp_servers_round_trips() { }, enabled: false, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(std::time::Duration::from_secs(5)), tool_timeout_sec: None, @@ -638,6 +640,7 @@ Z-Header = \"z\" command = \"cmd\" args = [\"--flag\"] env_vars = [\"FOO\"] +supports_parallel_tool_calls = true enabled_tools = [\"one\", \"two\"] [mcp_servers.stdio.env] @@ -665,6 +668,7 @@ fn blocking_replace_mcp_servers_serializes_tool_approval_overrides() { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -725,6 +729,7 @@ foo = { command = "cmd" } }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -776,6 +781,7 @@ foo = { command = "cmd" } # keep me }, enabled: false, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -826,6 +832,7 @@ foo = { command = "cmd", args = ["--flag"] } # keep me }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -877,6 +884,7 @@ foo = { command = "cmd" } }, enabled: false, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, diff --git a/codex-rs/core/src/mcp_skill_dependencies.rs b/codex-rs/core/src/mcp_skill_dependencies.rs index c4e302307f22..536c4eb4b808 100644 --- a/codex-rs/core/src/mcp_skill_dependencies.rs +++ b/codex-rs/core/src/mcp_skill_dependencies.rs @@ -364,6 +364,7 @@ fn mcp_dependency_to_server_config( }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -390,6 +391,7 @@ fn mcp_dependency_to_server_config( }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, diff --git a/codex-rs/core/src/plugins/manager_tests.rs b/codex-rs/core/src/plugins/manager_tests.rs index b7e0117eed13..3b06a08a9fa5 100644 --- a/codex-rs/core/src/plugins/manager_tests.rs +++ b/codex-rs/core/src/plugins/manager_tests.rs @@ -179,6 +179,7 @@ fn load_plugins_loads_default_skills_and_mcp_servers() { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -510,6 +511,7 @@ fn load_plugins_uses_manifest_configured_component_paths() { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -617,6 +619,7 @@ fn load_plugins_ignores_manifest_component_paths_without_dot_slash() { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, @@ -772,6 +775,7 @@ fn capability_index_filters_inactive_and_zero_capability_plugins() { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: None, tool_timeout_sec: None, diff --git a/codex-rs/core/src/tools/code_mode/mod.rs b/codex-rs/core/src/tools/code_mode/mod.rs index 99c8ec0e0d87..de3a1fa33046 100644 --- a/codex-rs/core/src/tools/code_mode/mod.rs +++ b/codex-rs/core/src/tools/code_mode/mod.rs @@ -2,6 +2,7 @@ mod execute_handler; mod response_adapter; mod wait_handler; +use std::collections::HashSet; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -251,7 +252,7 @@ pub(super) async fn build_enabled_tools( async fn build_nested_router(exec: &ExecContext) -> ToolRouter { let nested_tools_config = exec.turn.tools_config.for_code_mode_nested_tools(); - let mcp_tools = exec + let listed_mcp_tools = exec .session .services .mcp_connection_manager @@ -259,12 +260,25 @@ async fn build_nested_router(exec: &ExecContext) -> ToolRouter { .await .list_all_tools() .await; + let parallel_mcp_server_names = exec + .turn + .config + .mcp_servers + .get() + .iter() + .filter_map(|(server_name, server_config)| { + server_config + .supports_parallel_tool_calls + .then_some(server_name.clone()) + }) + .collect::>(); ToolRouter::from_config( &nested_tools_config, ToolRouterParams { deferred_mcp_tools: None, - mcp_tools: Some(mcp_tools), + mcp_tools: Some(listed_mcp_tools), + parallel_mcp_server_names, discoverable_tools: None, dynamic_tools: exec.turn.dynamic_tools.as_slice(), }, diff --git a/codex-rs/core/src/tools/js_repl/mod.rs b/codex-rs/core/src/tools/js_repl/mod.rs index e307f9718cd3..9e2e88a5aad8 100644 --- a/codex-rs/core/src/tools/js_repl/mod.rs +++ b/codex-rs/core/src/tools/js_repl/mod.rs @@ -1561,12 +1561,14 @@ impl JsReplManager { .await .list_all_tools() .await; - let router = ToolRouter::from_config( &exec.turn.tools_config, crate::tools::router::ToolRouterParams { deferred_mcp_tools: None, mcp_tools: Some(mcp_tools), + // JS REPL dispatches nested tool calls directly, not through + // `ToolCallRuntime`'s parallel scheduling lock. + parallel_mcp_server_names: std::collections::HashSet::new(), discoverable_tools: None, dynamic_tools: exec.turn.dynamic_tools.as_slice(), }, diff --git a/codex-rs/core/src/tools/parallel.rs b/codex-rs/core/src/tools/parallel.rs index a0818dfc8307..a1965db5cce4 100644 --- a/codex-rs/core/src/tools/parallel.rs +++ b/codex-rs/core/src/tools/parallel.rs @@ -78,7 +78,7 @@ impl ToolCallRuntime { source: ToolCallSource, cancellation_token: CancellationToken, ) -> impl std::future::Future> { - let supports_parallel = self.router.tool_supports_parallel(&call.tool_name); + let supports_parallel = self.router.tool_supports_parallel(&call); let router = Arc::clone(&self.router); let session = Arc::clone(&self.session); let turn = Arc::clone(&self.turn_context); diff --git a/codex-rs/core/src/tools/router.rs b/codex-rs/core/src/tools/router.rs index af9d27858557..645650525cc0 100644 --- a/codex-rs/core/src/tools/router.rs +++ b/codex-rs/core/src/tools/router.rs @@ -20,6 +20,7 @@ use codex_tools::ToolName; use codex_tools::ToolSpec; use codex_tools::ToolsConfig; use std::collections::HashMap; +use std::collections::HashSet; use std::sync::Arc; use tracing::instrument; @@ -36,11 +37,13 @@ pub struct ToolRouter { registry: ToolRegistry, specs: Vec, model_visible_specs: Vec, + parallel_mcp_server_names: HashSet, } pub(crate) struct ToolRouterParams<'a> { pub(crate) mcp_tools: Option>, pub(crate) deferred_mcp_tools: Option>, + pub(crate) parallel_mcp_server_names: HashSet, pub(crate) discoverable_tools: Option>, pub(crate) dynamic_tools: &'a [DynamicToolSpec], } @@ -50,6 +53,7 @@ impl ToolRouter { let ToolRouterParams { mcp_tools, deferred_mcp_tools, + parallel_mcp_server_names, discoverable_tools, dynamic_tools, } = params; @@ -83,6 +87,7 @@ impl ToolRouter { registry, specs, model_visible_specs, + parallel_mcp_server_names, } } @@ -104,7 +109,7 @@ impl ToolRouter { .map(|config| config.spec.clone()) } - pub fn tool_supports_parallel(&self, tool_name: &ToolName) -> bool { + fn configured_tool_supports_parallel(&self, tool_name: &ToolName) -> bool { tool_name.namespace.is_none() && self .specs @@ -113,6 +118,16 @@ impl ToolRouter { .any(|config| config.name() == tool_name.name.as_str()) } + pub fn tool_supports_parallel(&self, call: &ToolCall) -> bool { + match &call.payload { + // MCP parallel support is configured per server, including for deferred + // tools that may not have a matching spec entry. Use the parsed payload + // server so similarly named servers/tools cannot collide. + ToolPayload::Mcp { server, .. } => self.parallel_mcp_server_names.contains(server), + _ => self.configured_tool_supports_parallel(&call.tool_name), + } + } + #[instrument(level = "trace", skip_all, err)] pub async fn build_tool_call( session: &Session, diff --git a/codex-rs/core/src/tools/router_tests.rs b/codex-rs/core/src/tools/router_tests.rs index 0478ebca6436..f4c0fb7c5ae6 100644 --- a/codex-rs/core/src/tools/router_tests.rs +++ b/codex-rs/core/src/tools/router_tests.rs @@ -1,3 +1,4 @@ +use std::collections::HashSet; use std::sync::Arc; use crate::codex::make_session_and_context; @@ -32,6 +33,7 @@ async fn js_repl_tools_only_blocks_direct_tool_calls() -> anyhow::Result<()> { ToolRouterParams { deferred_mcp_tools, mcp_tools: Some(mcp_tools), + parallel_mcp_server_names: HashSet::new(), discoverable_tools: None, dynamic_tools: turn.dynamic_tools.as_slice(), }, @@ -84,6 +86,7 @@ async fn js_repl_tools_only_allows_js_repl_source_calls() -> anyhow::Result<()> ToolRouterParams { deferred_mcp_tools, mcp_tools: Some(mcp_tools), + parallel_mcp_server_names: HashSet::new(), discoverable_tools: None, dynamic_tools: turn.dynamic_tools.as_slice(), }, @@ -129,6 +132,7 @@ async fn js_repl_tools_only_blocks_namespaced_js_repl_tool() -> anyhow::Result<( ToolRouterParams { deferred_mcp_tools: None, mcp_tools: None, + parallel_mcp_server_names: HashSet::new(), discoverable_tools: None, dynamic_tools: turn.dynamic_tools.as_slice(), }, @@ -178,6 +182,7 @@ async fn parallel_support_does_not_match_namespaced_local_tool_names() -> anyhow ToolRouterParams { deferred_mcp_tools: None, mcp_tools: Some(mcp_tools), + parallel_mcp_server_names: HashSet::new(), discoverable_tools: None, dynamic_tools: turn.dynamic_tools.as_slice(), }, @@ -185,12 +190,24 @@ async fn parallel_support_does_not_match_namespaced_local_tool_names() -> anyhow let parallel_tool_name = ["shell", "local_shell", "exec_command", "shell_command"] .into_iter() - .find(|name| router.tool_supports_parallel(&ToolName::plain(*name))) + .find(|name| { + router.tool_supports_parallel(&ToolCall { + tool_name: ToolName::plain(*name), + call_id: "call-parallel-tool".to_string(), + payload: ToolPayload::Function { + arguments: "{}".to_string(), + }, + }) + }) .expect("test session should expose a parallel shell-like tool"); - assert!( - !router.tool_supports_parallel(&ToolName::namespaced("mcp__server__", parallel_tool_name)) - ); + assert!(!router.tool_supports_parallel(&ToolCall { + tool_name: ToolName::namespaced("mcp__server__", parallel_tool_name), + call_id: "call-namespaced-tool".to_string(), + payload: ToolPayload::Function { + arguments: "{}".to_string(), + }, + })); Ok(()) } @@ -228,3 +245,42 @@ async fn build_tool_call_uses_namespace_for_registry_name() -> anyhow::Result<() Ok(()) } + +#[tokio::test] +async fn mcp_parallel_support_uses_exact_payload_server() -> anyhow::Result<()> { + let (_, turn) = make_session_and_context().await; + let router = ToolRouter::from_config( + &turn.tools_config, + ToolRouterParams { + deferred_mcp_tools: None, + mcp_tools: None, + parallel_mcp_server_names: HashSet::from(["echo".to_string()]), + discoverable_tools: None, + dynamic_tools: turn.dynamic_tools.as_slice(), + }, + ); + + let deferred_call = ToolCall { + tool_name: ToolName::namespaced("mcp__echo__", "query_with_delay"), + call_id: "call-deferred".to_string(), + payload: ToolPayload::Mcp { + server: "echo".to_string(), + tool: "query_with_delay".to_string(), + raw_arguments: "{}".to_string(), + }, + }; + assert!(router.tool_supports_parallel(&deferred_call)); + + let different_server_call = ToolCall { + tool_name: ToolName::namespaced("mcp__hello_echo__", "query_with_delay"), + call_id: "call-other-server".to_string(), + payload: ToolPayload::Mcp { + server: "hello_echo".to_string(), + tool: "query_with_delay".to_string(), + raw_arguments: "{}".to_string(), + }, + }; + assert!(!router.tool_supports_parallel(&different_server_call)); + + Ok(()) +} diff --git a/codex-rs/core/src/tools/spec_tests.rs b/codex-rs/core/src/tools/spec_tests.rs index b0195e4563d5..b0fe3f843611 100644 --- a/codex-rs/core/src/tools/spec_tests.rs +++ b/codex-rs/core/src/tools/spec_tests.rs @@ -314,6 +314,7 @@ fn assert_model_tools( ToolRouterParams { mcp_tools: None, deferred_mcp_tools: None, + parallel_mcp_server_names: std::collections::HashSet::new(), discoverable_tools: None, dynamic_tools: &[], }, diff --git a/codex-rs/core/tests/suite/code_mode.rs b/codex-rs/core/tests/suite/code_mode.rs index 54da2f4fe4ed..0c65158d3cf2 100644 --- a/codex-rs/core/tests/suite/code_mode.rs +++ b/codex-rs/core/tests/suite/code_mode.rs @@ -200,6 +200,7 @@ async fn run_code_mode_turn_with_rmcp( }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(10)), tool_timeout_sec: None, diff --git a/codex-rs/core/tests/suite/rmcp_client.rs b/codex-rs/core/tests/suite/rmcp_client.rs index 40d144223827..b0ca1d75612c 100644 --- a/codex-rs/core/tests/suite/rmcp_client.rs +++ b/codex-rs/core/tests/suite/rmcp_client.rs @@ -75,6 +75,12 @@ fn assert_wall_time_header(output: &str) { assert_eq!(marker, "Output:"); } +#[derive(Debug, PartialEq, Eq)] +enum McpCallEvent { + Begin(String), + End(String), +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[serial(mcp_test_value)] async fn stdio_server_round_trip() -> anyhow::Result<()> { @@ -125,6 +131,7 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(10)), tool_timeout_sec: None, @@ -230,6 +237,263 @@ async fn stdio_server_round_trip() -> anyhow::Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn stdio_mcp_parallel_tool_calls_default_false_runs_serially() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + + let first_call_id = "sync-serial-1"; + let second_call_id = "sync-serial-2"; + let server_name = "rmcp"; + let tool_name = format!("mcp__{server_name}__sync"); + let args = json!({ "sleep_after_ms": 100 }).to_string(); + + mount_sse_once( + &server, + responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_function_call(first_call_id, &tool_name, &args), + responses::ev_function_call(second_call_id, &tool_name, &args), + responses::ev_completed("resp-1"), + ]), + ) + .await; + let final_mock = mount_sse_once( + &server, + responses::sse(vec![ + responses::ev_assistant_message("msg-1", "rmcp sync tools completed successfully."), + responses::ev_completed("resp-2"), + ]), + ) + .await; + + let rmcp_test_server_bin = stdio_server_bin()?; + + let fixture = test_codex() + .with_config(move |config| { + let mut servers = config.mcp_servers.get().clone(); + servers.insert( + server_name.to_string(), + McpServerConfig { + transport: McpServerTransportConfig::Stdio { + command: rmcp_test_server_bin, + args: Vec::new(), + env: None, + env_vars: Vec::new(), + cwd: None, + }, + enabled: true, + required: false, + supports_parallel_tool_calls: false, + disabled_reason: None, + startup_timeout_sec: Some(Duration::from_secs(10)), + tool_timeout_sec: Some(Duration::from_secs(2)), + enabled_tools: None, + disabled_tools: None, + scopes: None, + oauth_resource: None, + tools: HashMap::new(), + }, + ); + config + .mcp_servers + .set(servers) + .expect("test mcp servers should accept any configuration"); + }) + .build(&server) + .await?; + let session_model = fixture.session_configured.model.clone(); + + fixture + .codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "call the rmcp sync tool twice".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: fixture.cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + approvals_reviewer: None, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + model: session_model, + effort: None, + summary: None, + service_tier: None, + collaboration_mode: None, + personality: None, + }) + .await?; + + let mut call_events = Vec::new(); + while call_events.len() < 4 { + let event = wait_for_event(&fixture.codex, |ev| { + matches!( + ev, + EventMsg::McpToolCallBegin(_) | EventMsg::McpToolCallEnd(_) + ) + }) + .await; + match event { + EventMsg::McpToolCallBegin(begin) => { + call_events.push(McpCallEvent::Begin(begin.call_id)); + } + EventMsg::McpToolCallEnd(end) => { + call_events.push(McpCallEvent::End(end.call_id)); + } + _ => unreachable!("event guard guarantees MCP call events"), + } + } + + let event_index = |needle: McpCallEvent| { + call_events + .iter() + .position(|event| event == &needle) + .expect("expected MCP call event") + }; + let first_begin = event_index(McpCallEvent::Begin(first_call_id.to_string())); + let first_end = event_index(McpCallEvent::End(first_call_id.to_string())); + let second_begin = event_index(McpCallEvent::Begin(second_call_id.to_string())); + let second_end = event_index(McpCallEvent::End(second_call_id.to_string())); + assert!( + first_end < second_begin || second_end < first_begin, + "default MCP tool calls should run serially; saw events: {call_events:?}" + ); + + wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let request = final_mock.single_request(); + for call_id in [first_call_id, second_call_id] { + let output_text = request + .function_call_output_text(call_id) + .expect("function_call_output present for rmcp sync call"); + let wrapped_payload = split_wall_time_wrapped_output(&output_text); + let output_json: Value = serde_json::from_str(wrapped_payload) + .expect("wrapped MCP output should preserve structured JSON"); + assert_eq!(output_json, json!({ "result": "ok" })); + } + + server.verify().await; + + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn stdio_mcp_parallel_tool_calls_opt_in_runs_concurrently() -> anyhow::Result<()> { + skip_if_no_network!(Ok(())); + + let server = responses::start_mock_server().await; + + let first_call_id = "sync-1"; + let second_call_id = "sync-2"; + let server_name = "rmcp"; + let tool_name = format!("mcp__{server_name}__sync"); + let args = json!({ + "sleep_after_ms": 100, + "barrier": { + "id": "stdio-mcp-parallel-tool-calls", + "participants": 2, + "timeout_ms": 1_000 + } + }) + .to_string(); + + mount_sse_once( + &server, + responses::sse(vec![ + responses::ev_response_created("resp-1"), + responses::ev_function_call(first_call_id, &tool_name, &args), + responses::ev_function_call(second_call_id, &tool_name, &args), + responses::ev_completed("resp-1"), + ]), + ) + .await; + let final_mock = mount_sse_once( + &server, + responses::sse(vec![ + responses::ev_assistant_message("msg-1", "rmcp sync tools completed successfully."), + responses::ev_completed("resp-2"), + ]), + ) + .await; + + let rmcp_test_server_bin = stdio_server_bin()?; + + let fixture = test_codex() + .with_config(move |config| { + let mut servers = config.mcp_servers.get().clone(); + servers.insert( + server_name.to_string(), + McpServerConfig { + transport: McpServerTransportConfig::Stdio { + command: rmcp_test_server_bin, + args: Vec::new(), + env: None, + env_vars: Vec::new(), + cwd: None, + }, + enabled: true, + required: false, + supports_parallel_tool_calls: true, + disabled_reason: None, + startup_timeout_sec: Some(Duration::from_secs(10)), + tool_timeout_sec: Some(Duration::from_secs(2)), + enabled_tools: None, + disabled_tools: None, + scopes: None, + oauth_resource: None, + tools: HashMap::new(), + }, + ); + config + .mcp_servers + .set(servers) + .expect("test mcp servers should accept any configuration"); + }) + .build(&server) + .await?; + let session_model = fixture.session_configured.model.clone(); + + fixture + .codex + .submit(Op::UserTurn { + items: vec![UserInput::Text { + text: "call the rmcp sync tool twice".into(), + text_elements: Vec::new(), + }], + final_output_json_schema: None, + cwd: fixture.cwd.path().to_path_buf(), + approval_policy: AskForApproval::Never, + approvals_reviewer: None, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + model: session_model, + effort: None, + summary: None, + service_tier: None, + collaboration_mode: None, + personality: None, + }) + .await?; + + wait_for_event(&fixture.codex, |ev| matches!(ev, EventMsg::TurnComplete(_))).await; + + let request = final_mock.single_request(); + for call_id in [first_call_id, second_call_id] { + let output_text = request + .function_call_output_text(call_id) + .expect("function_call_output present for rmcp sync call"); + let wrapped_payload = split_wall_time_wrapped_output(&output_text); + let output_json: Value = serde_json::from_str(wrapped_payload) + .expect("wrapped MCP output should preserve structured JSON"); + assert_eq!(output_json, json!({ "result": "ok" })); + } + + server.verify().await; + + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] #[serial(mcp_test_value)] async fn stdio_image_responses_round_trip() -> anyhow::Result<()> { @@ -282,6 +546,7 @@ async fn stdio_image_responses_round_trip() -> anyhow::Result<()> { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(10)), tool_timeout_sec: None, @@ -514,6 +779,7 @@ async fn stdio_image_responses_are_sanitized_for_text_only_model() -> anyhow::Re }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(10)), tool_timeout_sec: None, @@ -637,6 +903,7 @@ async fn stdio_server_propagates_whitelisted_env_vars() -> anyhow::Result<()> { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(10)), tool_timeout_sec: None, @@ -800,6 +1067,7 @@ async fn streamable_http_tool_call_round_trip() -> anyhow::Result<()> { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(10)), tool_timeout_sec: None, @@ -1023,6 +1291,7 @@ async fn streamable_http_with_oauth_round_trip_impl() -> anyhow::Result<()> { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(10)), tool_timeout_sec: None, diff --git a/codex-rs/core/tests/suite/search_tool.rs b/codex-rs/core/tests/suite/search_tool.rs index 62b3c60cc505..c52431665013 100644 --- a/codex-rs/core/tests/suite/search_tool.rs +++ b/codex-rs/core/tests/suite/search_tool.rs @@ -649,6 +649,7 @@ async fn tool_search_indexes_only_enabled_non_app_mcp_tools() -> Result<()> { disabled_tools: Some(vec!["image".to_string()]), scopes: None, oauth_resource: None, + supports_parallel_tool_calls: false, tools: HashMap::new(), }, ); diff --git a/codex-rs/core/tests/suite/sqlite_state.rs b/codex-rs/core/tests/suite/sqlite_state.rs index 763ef0db16b8..1c9d6a87e1ca 100644 --- a/codex-rs/core/tests/suite/sqlite_state.rs +++ b/codex-rs/core/tests/suite/sqlite_state.rs @@ -368,6 +368,7 @@ async fn mcp_call_marks_thread_memory_mode_polluted_when_configured() -> Result< }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(10)), tool_timeout_sec: None, diff --git a/codex-rs/core/tests/suite/truncation.rs b/codex-rs/core/tests/suite/truncation.rs index b6c4736a13c4..288e8f4bdb1a 100644 --- a/codex-rs/core/tests/suite/truncation.rs +++ b/codex-rs/core/tests/suite/truncation.rs @@ -372,6 +372,7 @@ async fn mcp_tool_call_output_exceeds_limit_truncated_for_model() -> Result<()> }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(std::time::Duration::from_secs(10)), tool_timeout_sec: None, @@ -468,6 +469,7 @@ async fn mcp_image_output_preserves_image_and_no_text_summary() -> Result<()> { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(Duration::from_secs(10)), tool_timeout_sec: None, @@ -742,6 +744,7 @@ async fn mcp_tool_call_output_not_truncated_with_custom_limit() -> Result<()> { }, enabled: true, required: false, + supports_parallel_tool_calls: false, disabled_reason: None, startup_timeout_sec: Some(std::time::Duration::from_secs(10)), tool_timeout_sec: None, diff --git a/codex-rs/rmcp-client/src/bin/test_stdio_server.rs b/codex-rs/rmcp-client/src/bin/test_stdio_server.rs index efee52041c89..7295b6315993 100644 --- a/codex-rs/rmcp-client/src/bin/test_stdio_server.rs +++ b/codex-rs/rmcp-client/src/bin/test_stdio_server.rs @@ -1,6 +1,9 @@ use std::borrow::Cow; use std::collections::HashMap; +use std::collections::hash_map::Entry; use std::sync::Arc; +use std::sync::OnceLock; +use std::time::Duration; use rmcp::ErrorData as McpError; use rmcp::ServiceExt; @@ -25,7 +28,9 @@ use rmcp::model::Tool; use rmcp::model::ToolAnnotations; use serde::Deserialize; use serde_json::json; +use tokio::sync::Barrier; use tokio::task; +use tokio::time::sleep; #[derive(Clone)] struct TestToolServer { @@ -47,6 +52,7 @@ impl TestToolServer { let tools = vec![ Self::echo_tool(), Self::echo_dash_tool(), + Self::sync_tool(), Self::image_tool(), Self::image_scenario_tool(), ]; @@ -112,6 +118,50 @@ impl TestToolServer { tool } + fn sync_tool() -> Tool { + #[expect(clippy::expect_used)] + let schema: JsonObject = serde_json::from_value(json!({ + "type": "object", + "properties": { + "sleep_before_ms": { "type": "number" }, + "sleep_after_ms": { "type": "number" }, + "barrier": { + "type": "object", + "properties": { + "id": { "type": "string" }, + "participants": { "type": "number" }, + "timeout_ms": { "type": "number" } + }, + "required": ["id", "participants"], + "additionalProperties": false + } + }, + "additionalProperties": false + })) + .expect("sync tool schema should deserialize"); + + let mut tool = Tool::new( + Cow::Borrowed("sync"), + Cow::Borrowed( + "Synchronize concurrent test calls and optionally delay before or after the barrier.", + ), + Arc::new(schema), + ); + #[expect(clippy::expect_used)] + let output_schema: JsonObject = serde_json::from_value(json!({ + "type": "object", + "properties": { + "result": { "type": "string" } + }, + "required": ["result"], + "additionalProperties": false + })) + .expect("sync tool output schema should deserialize"); + tool.output_schema = Some(Arc::new(output_schema)); + tool.annotations = Some(ToolAnnotations::new().read_only(true)); + tool + } + fn image_tool() -> Tool { #[expect(clippy::expect_used)] let schema: JsonObject = serde_json::from_value(serde_json::json!({ @@ -227,6 +277,42 @@ struct EchoArgs { env_var: Option, } +const DEFAULT_SYNC_TIMEOUT_MS: u64 = 1_000; + +static SYNC_BARRIERS: OnceLock>> = + OnceLock::new(); + +struct SyncBarrierState { + barrier: Arc, + participants: usize, +} + +#[derive(Debug, Deserialize)] +struct SyncBarrierArgs { + id: String, + participants: usize, + #[serde(default = "default_sync_timeout_ms")] + timeout_ms: u64, +} + +#[derive(Debug, Deserialize)] +struct SyncArgs { + #[serde(default)] + sleep_before_ms: Option, + #[serde(default)] + sleep_after_ms: Option, + #[serde(default)] + barrier: Option, +} + +fn default_sync_timeout_ms() -> u64 { + DEFAULT_SYNC_TIMEOUT_MS +} + +fn sync_barrier_map() -> &'static tokio::sync::Mutex> { + SYNC_BARRIERS.get_or_init(|| tokio::sync::Mutex::new(HashMap::new())) +} + #[derive(Deserialize, Debug)] #[serde(rename_all = "snake_case")] /// Scenarios for `image_scenario`, intended to exercise Codex TUI handling of MCP image outputs. @@ -387,6 +473,10 @@ impl ServerHandler for TestToolServer { let args = Self::parse_call_args::(&request, "image_scenario")?; Self::image_scenario_result(args) } + "sync" => { + let args = Self::parse_call_args::(&request, "sync")?; + Self::sync_result(args).await + } other => Err(McpError::invalid_params( format!("unknown tool: {other}"), None, @@ -469,6 +559,102 @@ impl TestToolServer { Ok(CallToolResult::success(content)) } + + async fn sync_result(args: SyncArgs) -> Result { + if let Some(delay) = args.sleep_before_ms + && delay > 0 + { + sleep(Duration::from_millis(delay)).await; + } + + if let Some(barrier) = args.barrier { + wait_on_sync_barrier(barrier).await?; + } + + if let Some(delay) = args.sleep_after_ms + && delay > 0 + { + sleep(Duration::from_millis(delay)).await; + } + + Ok(CallToolResult { + content: Vec::new(), + structured_content: Some(json!({ "result": "ok" })), + is_error: Some(false), + meta: None, + }) + } +} + +async fn wait_on_sync_barrier(args: SyncBarrierArgs) -> Result<(), McpError> { + if args.participants == 0 { + return Err(McpError::invalid_params( + "barrier participants must be greater than zero", + None, + )); + } + + if args.timeout_ms == 0 { + return Err(McpError::invalid_params( + "barrier timeout must be greater than zero", + None, + )); + } + + let barrier_id = args.id.clone(); + let barrier = { + let mut map = sync_barrier_map().lock().await; + match map.entry(barrier_id.clone()) { + Entry::Occupied(entry) => { + let state = entry.get(); + if state.participants != args.participants { + let existing = state.participants; + return Err(McpError::invalid_params( + format!( + "barrier {barrier_id} already registered with {existing} participants" + ), + None, + )); + } + state.barrier.clone() + } + Entry::Vacant(entry) => { + let barrier = Arc::new(Barrier::new(args.participants)); + entry.insert(SyncBarrierState { + barrier: barrier.clone(), + participants: args.participants, + }); + barrier + } + } + }; + + let wait_result = + match tokio::time::timeout(Duration::from_millis(args.timeout_ms), barrier.wait()).await { + Ok(wait_result) => wait_result, + Err(_) => { + remove_sync_barrier_if_current(&barrier_id, &barrier).await; + return Err(McpError::invalid_params( + "sync barrier wait timed out", + None, + )); + } + }; + + if wait_result.is_leader() { + remove_sync_barrier_if_current(&barrier_id, &barrier).await; + } + + Ok(()) +} + +async fn remove_sync_barrier_if_current(barrier_id: &str, barrier: &Arc) { + let mut map = sync_barrier_map().lock().await; + if let Some(state) = map.get(barrier_id) + && Arc::ptr_eq(&state.barrier, barrier) + { + map.remove(barrier_id); + } } fn parse_data_url(url: &str) -> Option<(String, String)> { diff --git a/docs/config.md b/docs/config.md index 71f3548debb2..c314ce22837e 100644 --- a/docs/config.md +++ b/docs/config.md @@ -12,6 +12,20 @@ Codex can connect to MCP servers configured in `~/.codex/config.toml`. See the c - https://developers.openai.com/codex/config-reference +MCP tools default to serialized calls. To mark every tool exposed by one server +as eligible for parallel tool calls, set `supports_parallel_tool_calls` on that +server: + +```toml +[mcp_servers.docs] +command = "docs-server" +supports_parallel_tool_calls = true +``` + +Only enable parallel calls for MCP servers whose tools are safe to run at the +same time. If tools read and write shared state, files, databases, or external +resources, review those read/write race conditions before enabling this setting. + ## MCP tool approvals Codex stores per-tool approval overrides for custom MCP servers under