diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index c4c2799ca8d..e7b41c04cb5 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -409,6 +409,13 @@ enum EnsureConversationListenerResult { ConnectionClosed, } +#[derive(Clone, Copy, Debug, Eq, PartialEq)] +enum RefreshTokenRequestOutcome { + NotAttemptedOrSucceeded, + FailedTransiently, + FailedPermanently, +} + pub(crate) struct CodexMessageProcessorArgs { pub(crate) auth_manager: Arc, pub(crate) thread_manager: Arc, @@ -1338,13 +1345,19 @@ impl CodexMessageProcessor { } } - async fn refresh_token_if_requested(&self, do_refresh: bool) { + async fn refresh_token_if_requested(&self, do_refresh: bool) -> RefreshTokenRequestOutcome { if self.auth_manager.is_external_auth_active() { - return; + return RefreshTokenRequestOutcome::NotAttemptedOrSucceeded; } if do_refresh && let Err(err) = self.auth_manager.refresh_token().await { - tracing::warn!("failed to refresh token while getting account: {err}"); + let failed_reason = err.failed_reason(); + if failed_reason.is_none() { + tracing::warn!("failed to refresh token while getting account: {err}"); + return RefreshTokenRequestOutcome::FailedTransiently; + } + return RefreshTokenRequestOutcome::FailedPermanently; } + RefreshTokenRequestOutcome::NotAttemptedOrSucceeded } async fn get_auth_status(&self, request_id: ConnectionRequestId, params: GetAuthStatusParams) { @@ -1367,18 +1380,25 @@ impl CodexMessageProcessor { } else { match self.auth_manager.auth().await { Some(auth) => { + let permanent_refresh_failure = + self.auth_manager.refresh_failure_for_auth(&auth).is_some(); let auth_mode = auth.api_auth_mode(); - let (reported_auth_method, token_opt) = match auth.get_token() { - Ok(token) if !token.is_empty() => { - let tok = if include_token { Some(token) } else { None }; - (Some(auth_mode), tok) - } - Ok(_) => (None, None), - Err(err) => { - tracing::warn!("failed to get token for auth status: {err}"); - (None, None) - } - }; + let (reported_auth_method, token_opt) = + if include_token && permanent_refresh_failure { + (Some(auth_mode), None) + } else { + match auth.get_token() { + Ok(token) if !token.is_empty() => { + let tok = if include_token { Some(token) } else { None }; + (Some(auth_mode), tok) + } + Ok(_) => (None, None), + Err(err) => { + tracing::warn!("failed to get token for auth status: {err}"); + (None, None) + } + } + }; GetAuthStatusResponse { auth_method: reported_auth_method, auth_token: token_opt, diff --git a/codex-rs/app-server/tests/suite/auth.rs b/codex-rs/app-server/tests/suite/auth.rs index 68d0bcd95d5..ff50cd744ae 100644 --- a/codex-rs/app-server/tests/suite/auth.rs +++ b/codex-rs/app-server/tests/suite/auth.rs @@ -1,6 +1,10 @@ use anyhow::Result; +use app_test_support::ChatGptAuthFixture; use app_test_support::McpProcess; use app_test_support::to_response; +use app_test_support::write_chatgpt_auth; +use chrono::Duration; +use chrono::Utc; use codex_app_server_protocol::AuthMode; use codex_app_server_protocol::GetAuthStatusParams; use codex_app_server_protocol::GetAuthStatusResponse; @@ -8,10 +12,17 @@ use codex_app_server_protocol::JSONRPCError; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::LoginAccountResponse; use codex_app_server_protocol::RequestId; +use codex_core::auth::AuthCredentialsStoreMode; +use codex_core::auth::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR; use pretty_assertions::assert_eq; use std::path::Path; use tempfile::TempDir; use tokio::time::timeout; +use wiremock::Mock; +use wiremock::MockServer; +use wiremock::ResponseTemplate; +use wiremock::matchers::method; +use wiremock::matchers::path; const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10); @@ -207,6 +218,288 @@ async fn get_auth_status_with_api_key_no_include_token() -> Result<()> { Ok(()) } +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_auth_status_with_api_key_refresh_requested() -> Result<()> { + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path())?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + login_with_api_key_via_request(&mut mcp, "sk-test-key").await?; + + let request_id = mcp + .send_get_auth_status_request(GetAuthStatusParams { + include_token: Some(true), + refresh_token: Some(true), + }) + .await?; + + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let status: GetAuthStatusResponse = to_response(resp)?; + assert_eq!( + status, + GetAuthStatusResponse { + auth_method: Some(AuthMode::ApiKey), + auth_token: Some("sk-test-key".to_string()), + requires_openai_auth: Some(true), + } + ); + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_auth_status_omits_token_after_permanent_refresh_failure() -> Result<()> { + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path())?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("stale-access-token") + .refresh_token("stale-refresh-token") + .account_id("acct_123") + .email("user@example.com") + .plan_type("pro"), + AuthCredentialsStoreMode::File, + )?; + + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/oauth/token")) + .respond_with(ResponseTemplate::new(401).set_body_json(serde_json::json!({ + "error": { + "code": "refresh_token_reused" + } + }))) + .expect(1) + .mount(&server) + .await; + + let refresh_url = format!("{}/oauth/token", server.uri()); + let mut mcp = McpProcess::new_with_env( + codex_home.path(), + &[ + ("OPENAI_API_KEY", None), + ( + REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR, + Some(refresh_url.as_str()), + ), + ], + ) + .await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_get_auth_status_request(GetAuthStatusParams { + include_token: Some(true), + refresh_token: Some(true), + }) + .await?; + + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let status: GetAuthStatusResponse = to_response(resp)?; + assert_eq!( + status, + GetAuthStatusResponse { + auth_method: Some(AuthMode::Chatgpt), + auth_token: None, + requires_openai_auth: Some(true), + } + ); + + let second_request_id = mcp + .send_get_auth_status_request(GetAuthStatusParams { + include_token: Some(true), + refresh_token: Some(true), + }) + .await?; + + let second_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(second_request_id)), + ) + .await??; + let second_status: GetAuthStatusResponse = to_response(second_resp)?; + assert_eq!(second_status, status); + + server.verify().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_auth_status_omits_token_after_proactive_refresh_failure() -> Result<()> { + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path())?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("stale-access-token") + .refresh_token("stale-refresh-token") + .account_id("acct_123") + .email("user@example.com") + .plan_type("pro") + .last_refresh(Some(Utc::now() - Duration::days(9))), + AuthCredentialsStoreMode::File, + )?; + + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/oauth/token")) + .respond_with(ResponseTemplate::new(401).set_body_json(serde_json::json!({ + "error": { + "code": "refresh_token_reused" + } + }))) + .expect(2) + .mount(&server) + .await; + + let refresh_url = format!("{}/oauth/token", server.uri()); + let mut mcp = McpProcess::new_with_env( + codex_home.path(), + &[ + ("OPENAI_API_KEY", None), + ( + REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR, + Some(refresh_url.as_str()), + ), + ], + ) + .await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let request_id = mcp + .send_get_auth_status_request(GetAuthStatusParams { + include_token: Some(true), + refresh_token: Some(false), + }) + .await?; + + let resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(request_id)), + ) + .await??; + let status: GetAuthStatusResponse = to_response(resp)?; + assert_eq!( + status, + GetAuthStatusResponse { + auth_method: Some(AuthMode::Chatgpt), + auth_token: None, + requires_openai_auth: Some(true), + } + ); + + server.verify().await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn get_auth_status_returns_token_after_proactive_refresh_recovery() -> Result<()> { + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path())?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("stale-access-token") + .refresh_token("stale-refresh-token") + .account_id("acct_123") + .email("user@example.com") + .plan_type("pro") + .last_refresh(Some(Utc::now() - Duration::days(9))), + AuthCredentialsStoreMode::File, + )?; + + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/oauth/token")) + .respond_with(ResponseTemplate::new(401).set_body_json(serde_json::json!({ + "error": { + "code": "refresh_token_reused" + } + }))) + .expect(2) + .mount(&server) + .await; + + let refresh_url = format!("{}/oauth/token", server.uri()); + let mut mcp = McpProcess::new_with_env( + codex_home.path(), + &[ + ("OPENAI_API_KEY", None), + ( + REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR, + Some(refresh_url.as_str()), + ), + ], + ) + .await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let failed_request_id = mcp + .send_get_auth_status_request(GetAuthStatusParams { + include_token: Some(true), + refresh_token: Some(true), + }) + .await?; + + let failed_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(failed_request_id)), + ) + .await??; + let failed_status: GetAuthStatusResponse = to_response(failed_resp)?; + assert_eq!( + failed_status, + GetAuthStatusResponse { + auth_method: Some(AuthMode::Chatgpt), + auth_token: None, + requires_openai_auth: Some(true), + } + ); + + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("recovered-access-token") + .refresh_token("recovered-refresh-token") + .account_id("acct_123") + .email("user@example.com") + .plan_type("pro") + .last_refresh(Some(Utc::now())), + AuthCredentialsStoreMode::File, + )?; + + let recovered_request_id = mcp + .send_get_auth_status_request(GetAuthStatusParams { + include_token: Some(true), + refresh_token: Some(false), + }) + .await?; + + let recovered_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(recovered_request_id)), + ) + .await??; + let recovered_status: GetAuthStatusResponse = to_response(recovered_resp)?; + assert_eq!( + recovered_status, + GetAuthStatusResponse { + auth_method: Some(AuthMode::Chatgpt), + auth_token: Some("recovered-access-token".to_string()), + requires_openai_auth: Some(true), + } + ); + + server.verify().await; + Ok(()) +} + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] async fn login_api_key_rejected_when_forced_chatgpt() -> Result<()> { let codex_home = TempDir::new()?; diff --git a/codex-rs/core/tests/suite/auth_refresh.rs b/codex-rs/core/tests/suite/auth_refresh.rs index 278124c63a4..0acc24cabe8 100644 --- a/codex-rs/core/tests/suite/auth_refresh.rs +++ b/codex-rs/core/tests/suite/auth_refresh.rs @@ -543,6 +543,153 @@ async fn refresh_token_returns_permanent_error_for_expired_refresh_token() -> Re Ok(()) } +#[serial_test::serial(auth_refresh)] +#[tokio::test] +async fn refresh_token_does_not_retry_after_permanent_failure() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/oauth/token")) + .respond_with(ResponseTemplate::new(401).set_body_json(json!({ + "error": { + "code": "refresh_token_reused" + } + }))) + .expect(1) + .mount(&server) + .await; + + let ctx = RefreshTokenTestContext::new(&server)?; + let initial_last_refresh = Utc::now() - Duration::days(1); + let initial_tokens = build_tokens(INITIAL_ACCESS_TOKEN, INITIAL_REFRESH_TOKEN); + let initial_auth = AuthDotJson { + auth_mode: Some(AuthMode::Chatgpt), + openai_api_key: None, + tokens: Some(initial_tokens.clone()), + last_refresh: Some(initial_last_refresh), + }; + ctx.write_auth(&initial_auth)?; + + let first_err = ctx + .auth_manager + .refresh_token() + .await + .err() + .context("first refresh should fail")?; + assert_eq!( + first_err.failed_reason(), + Some(RefreshTokenFailedReason::Exhausted) + ); + + let second_err = ctx + .auth_manager + .refresh_token() + .await + .err() + .context("second refresh should fail without retrying")?; + assert_eq!( + second_err.failed_reason(), + Some(RefreshTokenFailedReason::Exhausted) + ); + + let stored = ctx.load_auth()?; + assert_eq!(stored, initial_auth); + let cached_auth = ctx + .auth_manager + .auth() + .await + .context("auth should remain cached")?; + let cached = cached_auth + .get_token_data() + .context("token data should remain cached")?; + assert_eq!(cached, initial_tokens); + + server.verify().await; + Ok(()) +} + +#[serial_test::serial(auth_refresh)] +#[tokio::test] +async fn refresh_token_reloads_changed_auth_after_permanent_failure() -> Result<()> { + skip_if_no_network!(Ok(())); + + let server = MockServer::start().await; + Mock::given(method("POST")) + .and(path("/oauth/token")) + .respond_with(ResponseTemplate::new(401).set_body_json(json!({ + "error": { + "code": "refresh_token_reused" + } + }))) + .expect(1) + .mount(&server) + .await; + + let ctx = RefreshTokenTestContext::new(&server)?; + let initial_last_refresh = Utc::now() - Duration::days(1); + let initial_tokens = build_tokens(INITIAL_ACCESS_TOKEN, INITIAL_REFRESH_TOKEN); + let initial_auth = AuthDotJson { + auth_mode: Some(AuthMode::Chatgpt), + openai_api_key: None, + tokens: Some(initial_tokens.clone()), + last_refresh: Some(initial_last_refresh), + }; + ctx.write_auth(&initial_auth)?; + + let first_err = ctx + .auth_manager + .refresh_token() + .await + .err() + .context("first refresh should fail")?; + assert_eq!( + first_err.failed_reason(), + Some(RefreshTokenFailedReason::Exhausted) + ); + + let fresh_refresh = Utc::now() - Duration::hours(1); + let disk_tokens = build_tokens("disk-access-token", "disk-refresh-token"); + let disk_auth = AuthDotJson { + auth_mode: Some(AuthMode::Chatgpt), + openai_api_key: None, + tokens: Some(disk_tokens.clone()), + last_refresh: Some(fresh_refresh), + }; + save_auth( + ctx.codex_home.path(), + &disk_auth, + AuthCredentialsStoreMode::File, + )?; + + ctx.auth_manager + .refresh_token() + .await + .context("refresh should reload changed auth without retrying")?; + + let stored = ctx.load_auth()?; + assert_eq!(stored, disk_auth); + + let cached_auth = ctx + .auth_manager + .auth_cached() + .context("auth should be cached")?; + let cached = cached_auth + .get_token_data() + .context("token data should reload from disk")?; + assert_eq!(cached, disk_tokens); + + let requests = server.received_requests().await.unwrap_or_default(); + assert_eq!( + requests.len(), + 1, + "expected only the initial refresh request" + ); + + server.verify().await; + Ok(()) +} + #[serial_test::serial(auth_refresh)] #[tokio::test] async fn refresh_token_returns_transient_error_on_server_failure() -> Result<()> { diff --git a/codex-rs/login/src/auth/auth_tests.rs b/codex-rs/login/src/auth/auth_tests.rs index f9fb58a9d5f..60511caa4da 100644 --- a/codex-rs/login/src/auth/auth_tests.rs +++ b/codex-rs/login/src/auth/auth_tests.rs @@ -197,6 +197,49 @@ fn unauthorized_recovery_reports_mode_and_step_names() { assert_eq!(external.step_name(), "external_refresh"); } +#[test] +fn refresh_failure_is_scoped_to_the_matching_auth_snapshot() { + let codex_home = tempdir().unwrap(); + write_auth_file( + AuthFileParams { + openai_api_key: None, + chatgpt_plan_type: Some("pro".to_string()), + chatgpt_account_id: Some("org_mine".to_string()), + }, + codex_home.path(), + ) + .expect("failed to write auth file"); + + let auth = super::load_auth(codex_home.path(), false, AuthCredentialsStoreMode::File) + .expect("load auth") + .expect("auth available"); + let mut updated_auth_dot_json = auth + .get_current_auth_json() + .expect("AuthDotJson should exist"); + let updated_tokens = updated_auth_dot_json + .tokens + .as_mut() + .expect("tokens should exist"); + updated_tokens.access_token = "new-access-token".to_string(); + updated_tokens.refresh_token = "new-refresh-token".to_string(); + let updated_auth = CodexAuth::from_auth_dot_json( + codex_home.path(), + updated_auth_dot_json, + AuthCredentialsStoreMode::File, + ) + .expect("updated auth should parse"); + + let manager = AuthManager::from_auth_for_testing(auth.clone()); + let error = RefreshTokenFailedError::new( + RefreshTokenFailedReason::Exhausted, + "refresh token already used", + ); + manager.record_permanent_refresh_failure_if_unchanged(&auth, &error); + + assert_eq!(manager.refresh_failure_for_auth(&auth), Some(error)); + assert_eq!(manager.refresh_failure_for_auth(&updated_auth), None); +} + struct AuthFileParams { openai_api_key: Option, chatgpt_plan_type: Option, diff --git a/codex-rs/login/src/auth/manager.rs b/codex-rs/login/src/auth/manager.rs index 31860cd5857..8508f99fa32 100644 --- a/codex-rs/login/src/auth/manager.rs +++ b/codex-rs/login/src/auth/manager.rs @@ -796,6 +796,15 @@ struct CachedAuth { auth: Option, /// Callback used to refresh external auth by asking the parent app for new tokens. external_refresher: Option>, + /// Permanent refresh failure cached for the current auth snapshot so + /// later refresh attempts for the same credentials fail fast without network. + permanent_refresh_failure: Option, +} + +#[derive(Clone)] +struct AuthScopedRefreshFailure { + auth: CodexAuth, + error: RefreshTokenFailedError, } impl Debug for CachedAuth { @@ -809,6 +818,13 @@ impl Debug for CachedAuth { "external_refresher", &self.external_refresher.as_ref().map(|_| "present"), ) + .field( + "permanent_refresh_failure", + &self + .permanent_refresh_failure + .as_ref() + .map(|failure| failure.error.reason), + ) .finish() } } @@ -1046,6 +1062,7 @@ impl AuthManager { inner: RwLock::new(CachedAuth { auth: managed_auth, external_refresher: None, + permanent_refresh_failure: None, }), enable_codex_api_key_env, auth_credentials_store_mode, @@ -1058,6 +1075,7 @@ impl AuthManager { let cached = CachedAuth { auth: Some(auth), external_refresher: None, + permanent_refresh_failure: None, }; Arc::new(Self { @@ -1074,6 +1092,7 @@ impl AuthManager { let cached = CachedAuth { auth: Some(auth), external_refresher: None, + permanent_refresh_failure: None, }; Arc::new(Self { codex_home, @@ -1089,6 +1108,16 @@ impl AuthManager { self.inner.read().ok().and_then(|c| c.auth.clone()) } + pub fn refresh_failure_for_auth(&self, auth: &CodexAuth) -> Option { + self.inner.read().ok().and_then(|cached| { + cached + .permanent_refresh_failure + .as_ref() + .filter(|failure| Self::auths_equal_for_refresh(Some(auth), Some(&failure.auth))) + .map(|failure| failure.error.clone()) + }) + } + /// Current cached auth (clone). May be `None` if not logged in or load failed. /// For stale managed ChatGPT auth, first performs a guarded reload and then /// refreshes only if the on-disk auth is unchanged. @@ -1166,6 +1195,25 @@ impl AuthManager { } } + /// Records a permanent refresh failure only if the failed refresh was + /// attempted against the auth snapshot that is still cached. + fn record_permanent_refresh_failure_if_unchanged( + &self, + attempted_auth: &CodexAuth, + error: &RefreshTokenFailedError, + ) { + if let Ok(mut guard) = self.inner.write() { + let current_auth_matches = + Self::auths_equal_for_refresh(Some(attempted_auth), guard.auth.as_ref()); + if current_auth_matches { + guard.permanent_refresh_failure = Some(AuthScopedRefreshFailure { + auth: attempted_auth.clone(), + error: error.clone(), + }); + } + } + } + fn load_auth_from_storage(&self) -> Option { load_auth( &self.codex_home, @@ -1180,6 +1228,11 @@ impl AuthManager { if let Ok(mut guard) = self.inner.write() { let previous = guard.auth.as_ref(); let changed = !AuthManager::auths_equal(previous, new_auth.as_ref()); + let auth_changed_for_refresh = + !Self::auths_equal_for_refresh(previous, new_auth.as_ref()); + if auth_changed_for_refresh { + guard.permanent_refresh_failure = None; + } tracing::info!("Reloaded auth, changed: {changed}"); guard.auth = new_auth; changed @@ -1255,6 +1308,12 @@ impl AuthManager { /// token is the same as the cached, then ask the token authority to refresh. pub async fn refresh_token(&self) -> Result<(), RefreshTokenError> { let auth_before_reload = self.auth_cached(); + if auth_before_reload + .as_ref() + .is_some_and(CodexAuth::is_api_key_auth) + { + return Ok(()); + } let expected_account_id = auth_before_reload .as_ref() .and_then(CodexAuth::get_account_id); @@ -1285,7 +1344,12 @@ impl AuthManager { Some(auth) => auth, None => return Ok(()), }; - match auth { + if let Some(error) = self.refresh_failure_for_auth(&auth) { + return Err(RefreshTokenError::Permanent(error)); + } + + let attempted_auth = auth.clone(); + let result = match auth { CodexAuth::ChatgptAuthTokens(_) => { self.refresh_external_auth(ExternalAuthRefreshReason::Unauthorized) .await @@ -1297,11 +1361,14 @@ impl AuthManager { )) })?; self.refresh_and_persist_chatgpt_token(&chatgpt_auth, token_data.refresh_token) - .await?; - Ok(()) + .await } CodexAuth::ApiKey(_) => Ok(()), + }; + if let Err(RefreshTokenError::Permanent(error)) = &result { + self.record_permanent_refresh_failure_if_unchanged(&attempted_auth, error); } + result } /// Log out by deleting the on‑disk auth.json (if present). Returns Ok(true)