Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,13 @@ enum EnsureConversationListenerResult {
ConnectionClosed,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RefreshTokenRequestOutcome {
NotAttemptedOrSucceeded,
FailedTransiently,
FailedPermanently,
}

pub(crate) struct CodexMessageProcessorArgs {
pub(crate) auth_manager: Arc<AuthManager>,
pub(crate) thread_manager: Arc<ThreadManager>,
Expand Down Expand Up @@ -1338,13 +1345,19 @@ impl CodexMessageProcessor {
}
}

async fn refresh_token_if_requested(&self, do_refresh: bool) {
async fn refresh_token_if_requested(&self, do_refresh: bool) -> RefreshTokenRequestOutcome {
if self.auth_manager.is_external_auth_active() {
return;
return RefreshTokenRequestOutcome::NotAttemptedOrSucceeded;
}
if do_refresh && let Err(err) = self.auth_manager.refresh_token().await {
tracing::warn!("failed to refresh token while getting account: {err}");
let failed_reason = err.failed_reason();
if failed_reason.is_none() {
tracing::warn!("failed to refresh token while getting account: {err}");
return RefreshTokenRequestOutcome::FailedTransiently;
}
return RefreshTokenRequestOutcome::FailedPermanently;
}
RefreshTokenRequestOutcome::NotAttemptedOrSucceeded
}

async fn get_auth_status(&self, request_id: ConnectionRequestId, params: GetAuthStatusParams) {
Expand All @@ -1367,18 +1380,25 @@ impl CodexMessageProcessor {
} else {
match self.auth_manager.auth().await {
Some(auth) => {
let permanent_refresh_failure =
self.auth_manager.refresh_failure_for_auth(&auth).is_some();
let auth_mode = auth.api_auth_mode();
let (reported_auth_method, token_opt) = match auth.get_token() {
Ok(token) if !token.is_empty() => {
let tok = if include_token { Some(token) } else { None };
(Some(auth_mode), tok)
}
Ok(_) => (None, None),
Err(err) => {
tracing::warn!("failed to get token for auth status: {err}");
(None, None)
}
};
let (reported_auth_method, token_opt) =
if include_token && permanent_refresh_failure {
(Some(auth_mode), None)
Comment thread
celia-oai marked this conversation as resolved.
} else {
match auth.get_token() {
Ok(token) if !token.is_empty() => {
let tok = if include_token { Some(token) } else { None };
(Some(auth_mode), tok)
}
Ok(_) => (None, None),
Err(err) => {
tracing::warn!("failed to get token for auth status: {err}");
(None, None)
}
}
};
GetAuthStatusResponse {
auth_method: reported_auth_method,
auth_token: token_opt,
Expand Down
293 changes: 293 additions & 0 deletions codex-rs/app-server/tests/suite/auth.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
use anyhow::Result;
use app_test_support::ChatGptAuthFixture;
use app_test_support::McpProcess;
use app_test_support::to_response;
use app_test_support::write_chatgpt_auth;
use chrono::Duration;
use chrono::Utc;
use codex_app_server_protocol::AuthMode;
use codex_app_server_protocol::GetAuthStatusParams;
use codex_app_server_protocol::GetAuthStatusResponse;
use codex_app_server_protocol::JSONRPCError;
use codex_app_server_protocol::JSONRPCResponse;
use codex_app_server_protocol::LoginAccountResponse;
use codex_app_server_protocol::RequestId;
use codex_core::auth::AuthCredentialsStoreMode;
use codex_core::auth::REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR;
use pretty_assertions::assert_eq;
use std::path::Path;
use tempfile::TempDir;
use tokio::time::timeout;
use wiremock::Mock;
use wiremock::MockServer;
use wiremock::ResponseTemplate;
use wiremock::matchers::method;
use wiremock::matchers::path;

const DEFAULT_READ_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);

Expand Down Expand Up @@ -207,6 +218,288 @@ async fn get_auth_status_with_api_key_no_include_token() -> Result<()> {
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_auth_status_with_api_key_refresh_requested() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;

let mut mcp = McpProcess::new(codex_home.path()).await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

login_with_api_key_via_request(&mut mcp, "sk-test-key").await?;

let request_id = mcp
.send_get_auth_status_request(GetAuthStatusParams {
include_token: Some(true),
refresh_token: Some(true),
})
.await?;

let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let status: GetAuthStatusResponse = to_response(resp)?;
assert_eq!(
status,
GetAuthStatusResponse {
auth_method: Some(AuthMode::ApiKey),
auth_token: Some("sk-test-key".to_string()),
requires_openai_auth: Some(true),
}
);
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_auth_status_omits_token_after_permanent_refresh_failure() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("stale-access-token")
.refresh_token("stale-refresh-token")
.account_id("acct_123")
.email("user@example.com")
.plan_type("pro"),
AuthCredentialsStoreMode::File,
)?;

let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/oauth/token"))
.respond_with(ResponseTemplate::new(401).set_body_json(serde_json::json!({
"error": {
"code": "refresh_token_reused"
}
})))
.expect(1)
.mount(&server)
.await;

let refresh_url = format!("{}/oauth/token", server.uri());
let mut mcp = McpProcess::new_with_env(
codex_home.path(),
&[
("OPENAI_API_KEY", None),
(
REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR,
Some(refresh_url.as_str()),
),
],
)
.await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let request_id = mcp
.send_get_auth_status_request(GetAuthStatusParams {
include_token: Some(true),
refresh_token: Some(true),
})
.await?;

let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let status: GetAuthStatusResponse = to_response(resp)?;
assert_eq!(
status,
GetAuthStatusResponse {
auth_method: Some(AuthMode::Chatgpt),
auth_token: None,
requires_openai_auth: Some(true),
}
);

let second_request_id = mcp
.send_get_auth_status_request(GetAuthStatusParams {
include_token: Some(true),
refresh_token: Some(true),
})
.await?;

let second_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(second_request_id)),
)
.await??;
let second_status: GetAuthStatusResponse = to_response(second_resp)?;
assert_eq!(second_status, status);

server.verify().await;
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_auth_status_omits_token_after_proactive_refresh_failure() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("stale-access-token")
.refresh_token("stale-refresh-token")
.account_id("acct_123")
.email("user@example.com")
.plan_type("pro")
.last_refresh(Some(Utc::now() - Duration::days(9))),
AuthCredentialsStoreMode::File,
)?;

let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/oauth/token"))
.respond_with(ResponseTemplate::new(401).set_body_json(serde_json::json!({
"error": {
"code": "refresh_token_reused"
}
})))
.expect(2)
.mount(&server)
.await;

let refresh_url = format!("{}/oauth/token", server.uri());
let mut mcp = McpProcess::new_with_env(
codex_home.path(),
&[
("OPENAI_API_KEY", None),
(
REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR,
Some(refresh_url.as_str()),
),
],
)
.await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let request_id = mcp
.send_get_auth_status_request(GetAuthStatusParams {
include_token: Some(true),
refresh_token: Some(false),
})
.await?;

let resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(request_id)),
)
.await??;
let status: GetAuthStatusResponse = to_response(resp)?;
assert_eq!(
status,
GetAuthStatusResponse {
auth_method: Some(AuthMode::Chatgpt),
auth_token: None,
requires_openai_auth: Some(true),
}
);

server.verify().await;
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn get_auth_status_returns_token_after_proactive_refresh_recovery() -> Result<()> {
let codex_home = TempDir::new()?;
create_config_toml(codex_home.path())?;
write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("stale-access-token")
.refresh_token("stale-refresh-token")
.account_id("acct_123")
.email("user@example.com")
.plan_type("pro")
.last_refresh(Some(Utc::now() - Duration::days(9))),
AuthCredentialsStoreMode::File,
)?;

let server = MockServer::start().await;
Mock::given(method("POST"))
.and(path("/oauth/token"))
.respond_with(ResponseTemplate::new(401).set_body_json(serde_json::json!({
"error": {
"code": "refresh_token_reused"
}
})))
.expect(2)
.mount(&server)
.await;

let refresh_url = format!("{}/oauth/token", server.uri());
let mut mcp = McpProcess::new_with_env(
codex_home.path(),
&[
("OPENAI_API_KEY", None),
(
REFRESH_TOKEN_URL_OVERRIDE_ENV_VAR,
Some(refresh_url.as_str()),
),
],
)
.await?;
timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??;

let failed_request_id = mcp
.send_get_auth_status_request(GetAuthStatusParams {
include_token: Some(true),
refresh_token: Some(true),
})
.await?;

let failed_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(failed_request_id)),
)
.await??;
let failed_status: GetAuthStatusResponse = to_response(failed_resp)?;
assert_eq!(
failed_status,
GetAuthStatusResponse {
auth_method: Some(AuthMode::Chatgpt),
auth_token: None,
requires_openai_auth: Some(true),
}
);

write_chatgpt_auth(
codex_home.path(),
ChatGptAuthFixture::new("recovered-access-token")
.refresh_token("recovered-refresh-token")
.account_id("acct_123")
.email("user@example.com")
.plan_type("pro")
.last_refresh(Some(Utc::now())),
AuthCredentialsStoreMode::File,
)?;

let recovered_request_id = mcp
.send_get_auth_status_request(GetAuthStatusParams {
include_token: Some(true),
refresh_token: Some(false),
})
.await?;

let recovered_resp: JSONRPCResponse = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_response_message(RequestId::Integer(recovered_request_id)),
)
.await??;
let recovered_status: GetAuthStatusResponse = to_response(recovered_resp)?;
assert_eq!(
recovered_status,
GetAuthStatusResponse {
auth_method: Some(AuthMode::Chatgpt),
auth_token: Some("recovered-access-token".to_string()),
requires_openai_auth: Some(true),
}
);

server.verify().await;
Ok(())
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn login_api_key_rejected_when_forced_chatgpt() -> Result<()> {
let codex_home = TempDir::new()?;
Expand Down
Loading
Loading