Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,16 +425,16 @@ impl CodexMessageProcessor {
self.thread_manager.skills_manager().clear_cache();
}

pub(crate) async fn maybe_start_curated_repo_sync_for_latest_config(&self) {
pub(crate) async fn maybe_start_plugin_startup_tasks_for_latest_config(&self) {
match self.load_latest_config(/*fallback_cwd*/ None).await {
Ok(config) => self
.thread_manager
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(
.maybe_start_plugin_startup_tasks_for_config(
&config,
self.thread_manager.auth_manager(),
),
Err(err) => warn!("failed to load latest config for curated plugin sync: {err:?}"),
Err(err) => warn!("failed to load latest config for plugin startup tasks: {err:?}"),
}
}

Expand Down Expand Up @@ -5489,7 +5489,7 @@ impl CodexMessageProcessor {

if force_remote_sync {
match plugins_manager
.sync_plugins_from_remote(&config, auth.as_ref())
.sync_plugins_from_remote(&config, auth.as_ref(), /*additive_only*/ false)
.await
{
Ok(sync_result) => {
Expand Down
6 changes: 3 additions & 3 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl MessageProcessor {
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
thread_manager
.plugins_manager()
.maybe_start_curated_repo_sync_for_config(&config, auth_manager.clone());
.maybe_start_plugin_startup_tasks_for_config(&config, auth_manager.clone());
let config_api = ConfigApi::new(
config.codex_home.clone(),
cli_overrides,
Expand Down Expand Up @@ -790,7 +790,7 @@ impl MessageProcessor {
Ok(response) => {
self.codex_message_processor.clear_plugin_related_caches();
self.codex_message_processor
.maybe_start_curated_repo_sync_for_latest_config()
.maybe_start_plugin_startup_tasks_for_latest_config()
.await;
self.outgoing.send_response(request_id, response).await;
}
Expand All @@ -807,7 +807,7 @@ impl MessageProcessor {
Ok(response) => {
self.codex_message_processor.clear_plugin_related_caches();
self.codex_message_processor
.maybe_start_curated_repo_sync_for_latest_config()
.maybe_start_plugin_startup_tasks_for_latest_config()
.await;
self.outgoing.send_response(request_id, response).await;
}
Expand Down
117 changes: 112 additions & 5 deletions codex-rs/app-server/tests/suite/v2/plugin_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use wiremock::matchers::path;

const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
const TEST_CURATED_PLUGIN_SHA: &str = "0123456789abcdef0123456789abcdef01234567";
const STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1";

fn write_plugins_enabled_config(codex_home: &std::path::Path) -> std::io::Result<()> {
std::fs::write(
Expand Down Expand Up @@ -755,6 +756,91 @@ async fn plugin_list_force_remote_sync_reconciles_curated_plugin_state() -> Resu
Ok(())
}

#[tokio::test]
async fn app_server_startup_remote_plugin_sync_runs_once() -> Result<()> {
let codex_home = TempDir::new()?;
let server = MockServer::start().await;
write_plugin_sync_config(codex_home.path(), &format!("{}/backend-api/", server.uri()))?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("chatgpt-token")
.account_id("account-123")
.chatgpt_user_id("user-123")
.chatgpt_account_id("account-123"),
AuthCredentialsStoreMode::File,
)?;
write_openai_curated_marketplace(codex_home.path(), &["linear"])?;

Mock::given(method("GET"))
.and(path("/backend-api/plugins/list"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_string(
r#"[
{"id":"1","name":"linear","marketplace_name":"openai-curated","version":"1.0.0","enabled":true}
]"#,
))
.mount(&server)
.await;
Mock::given(method("GET"))
.and(path("/backend-api/plugins/featured"))
.and(header("authorization", "Bearer chatgpt-token"))
.and(header("chatgpt-account-id", "account-123"))
.respond_with(ResponseTemplate::new(200).set_body_string(r#"["linear@openai-curated"]"#))
.mount(&server)
.await;

let marker_path = codex_home
.path()
.join(STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE);

{
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;

wait_for_path_exists(&marker_path).await?;
wait_for_remote_plugin_request_count(&server, "/plugins/list", 1).await?;
let request_id = mcp
.send_plugin_list_request(PluginListParams {
cwds: None,
force_remote_sync: false,
})
.await?;
let response: JSONRPCResponse = timeout(
DEFAULT_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let response: PluginListResponse = to_response(response)?;
let curated_marketplace = response
.marketplaces
.into_iter()
.find(|marketplace| marketplace.name == "openai-curated")
.expect("expected openai-curated marketplace entry");
assert_eq!(
curated_marketplace
.plugins
.into_iter()
.map(|plugin| (plugin.id, plugin.installed, plugin.enabled))
.collect::<Vec<_>>(),
vec![("linear@openai-curated".to_string(), true, true)]
);
wait_for_remote_plugin_request_count(&server, "/plugins/list", 1).await?;
}

let config = std::fs::read_to_string(codex_home.path().join("config.toml"))?;
assert!(config.contains(r#"[plugins."linear@openai-curated"]"#));

{
let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??;
}

tokio::time::sleep(Duration::from_millis(250)).await;
wait_for_remote_plugin_request_count(&server, "/plugins/list", 1).await?;
Ok(())
}

#[tokio::test]
async fn plugin_list_fetches_featured_plugin_ids_without_chatgpt_auth() -> Result<()> {
let codex_home = TempDir::new()?;
Expand Down Expand Up @@ -836,24 +922,32 @@ async fn plugin_list_uses_warmed_featured_plugin_ids_cache_on_first_request() ->
async fn wait_for_featured_plugin_request_count(
server: &MockServer,
expected_count: usize,
) -> Result<()> {
wait_for_remote_plugin_request_count(server, "/plugins/featured", expected_count).await
}

async fn wait_for_remote_plugin_request_count(
server: &MockServer,
path_suffix: &str,
expected_count: usize,
) -> Result<()> {
timeout(DEFAULT_TIMEOUT, async {
loop {
let Some(requests) = server.received_requests().await else {
bail!("wiremock did not record requests");
};
let featured_request_count = requests
let request_count = requests
.iter()
.filter(|request| {
request.method == "GET" && request.url.path().ends_with("/plugins/featured")
request.method == "GET" && request.url.path().ends_with(path_suffix)
})
.count();
if featured_request_count == expected_count {
if request_count == expected_count {
return Ok::<(), anyhow::Error>(());
}
if featured_request_count > expected_count {
if request_count > expected_count {
bail!(
"expected exactly {expected_count} /plugins/featured requests, got {featured_request_count}"
"expected exactly {expected_count} {path_suffix} requests, got {request_count}"
);
}
tokio::time::sleep(Duration::from_millis(10)).await;
Expand All @@ -863,6 +957,19 @@ async fn wait_for_featured_plugin_request_count(
Ok(())
}

async fn wait_for_path_exists(path: &std::path::Path) -> Result<()> {
timeout(DEFAULT_TIMEOUT, async {
loop {
if path.exists() {
return Ok::<(), anyhow::Error>(());
}
tokio::time::sleep(Duration::from_millis(10)).await;
}
})
.await??;
Ok(())
}

fn write_installed_plugin(
codex_home: &TempDir,
marketplace_name: &str,
Expand Down
16 changes: 12 additions & 4 deletions codex-rs/core/src/plugins/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use super::remote::enable_remote_plugin;
use super::remote::fetch_remote_featured_plugin_ids;
use super::remote::fetch_remote_plugin_status;
use super::remote::uninstall_remote_plugin;
use super::startup_sync::start_startup_remote_plugin_sync_once;
use super::store::DEFAULT_PLUGIN_VERSION;
use super::store::PluginId;
use super::store::PluginIdError;
Expand Down Expand Up @@ -58,7 +59,6 @@ use std::sync::Arc;
use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Duration;
use std::time::Instant;
use toml_edit::value;
use tracing::info;
Expand All @@ -70,7 +70,8 @@ const DEFAULT_APP_CONFIG_FILE: &str = ".app.json";
pub const OPENAI_CURATED_MARKETPLACE_NAME: &str = "openai-curated";
static CURATED_REPO_SYNC_STARTED: AtomicBool = AtomicBool::new(false);
const MAX_CAPABILITY_SUMMARY_DESCRIPTION_LEN: usize = 1024;
const FEATURED_PLUGIN_IDS_CACHE_TTL: Duration = Duration::from_secs(60 * 60 * 3);
const FEATURED_PLUGIN_IDS_CACHE_TTL: std::time::Duration =
std::time::Duration::from_secs(60 * 60 * 3);

#[derive(Clone, PartialEq, Eq)]
struct FeaturedPluginIdsCacheKey {
Expand Down Expand Up @@ -774,6 +775,7 @@ impl PluginsManager {
&self,
config: &Config,
auth: Option<&CodexAuth>,
additive_only: bool,
) -> Result<RemotePluginSyncResult, PluginRemoteSyncError> {
if !config.features.enabled(Feature::Plugins) {
return Ok(RemotePluginSyncResult::default());
Expand Down Expand Up @@ -913,7 +915,7 @@ impl PluginsManager {
value: value(true),
});
}
} else {
} else if !additive_only {
if is_installed {
uninstalls.push(plugin_id);
}
Expand Down Expand Up @@ -1110,7 +1112,7 @@ impl PluginsManager {
})
}

pub fn maybe_start_curated_repo_sync_for_config(
pub fn maybe_start_plugin_startup_tasks_for_config(
self: &Arc<Self>,
config: &Config,
auth_manager: Arc<AuthManager>,
Expand Down Expand Up @@ -1138,6 +1140,12 @@ impl PluginsManager {
.collect::<Vec<_>>();
configured_curated_plugin_ids.sort_unstable_by_key(super::store::PluginId::as_key);
self.start_curated_repo_sync(configured_curated_plugin_ids);
start_startup_remote_plugin_sync_once(
Arc::clone(self),
self.codex_home.clone(),
config.clone(),
auth_manager.clone(),
);

let config = config.clone();
let manager = Arc::clone(self);
Expand Down
Loading
Loading