diff --git a/codex-rs/cloud-requirements/src/lib.rs b/codex-rs/cloud-requirements/src/lib.rs index 18c95c3d37c..11df536cc87 100644 --- a/codex-rs/cloud-requirements/src/lib.rs +++ b/codex-rs/cloud-requirements/src/lib.rs @@ -399,7 +399,7 @@ impl CloudRequirementsService { "Cloud requirements request was unauthorized; attempting auth recovery" ); match auth_recovery.next().await { - Ok(()) => { + Ok(_) => { let Some(refreshed_auth) = self.auth_manager.auth().await else { tracing::error!( "Auth recovery succeeded but no auth is available for cloud requirements" diff --git a/codex-rs/codex-api/src/endpoint/responses_websocket.rs b/codex-rs/codex-api/src/endpoint/responses_websocket.rs index 28923238b15..d3b578db697 100644 --- a/codex-rs/codex-api/src/endpoint/responses_websocket.rs +++ b/codex-rs/codex-api/src/endpoint/responses_websocket.rs @@ -214,6 +214,7 @@ impl ResponsesWebsocketConnection { pub async fn stream_request( &self, request: ResponsesWsRequest, + connection_reused: bool, ) -> Result { let (tx_event, rx_event) = mpsc::channel::>(1600); @@ -258,6 +259,7 @@ impl ResponsesWebsocketConnection { request_body, idle_timeout, telemetry, + connection_reused, ) .await }; @@ -534,6 +536,7 @@ async fn run_websocket_response_stream( request_body: Value, idle_timeout: Duration, telemetry: Option>, + connection_reused: bool, ) -> Result<(), ApiError> { let mut last_server_model: Option = None; let request_text = match serde_json::to_string(&request_body) { @@ -553,7 +556,11 @@ async fn run_websocket_response_stream( .map_err(|err| ApiError::Stream(format!("failed to send websocket request: {err}"))); if let Some(t) = telemetry.as_ref() { - t.on_ws_request(request_start.elapsed(), result.as_ref().err()); + t.on_ws_request( + request_start.elapsed(), + result.as_ref().err(), + connection_reused, + ); } result?; diff --git a/codex-rs/codex-api/src/telemetry.rs b/codex-rs/codex-api/src/telemetry.rs index 7b04fd2113b..91918a65b92 100644 --- a/codex-rs/codex-api/src/telemetry.rs +++ b/codex-rs/codex-api/src/telemetry.rs @@ -33,7 +33,7 @@ pub trait SseTelemetry: Send + Sync { /// Telemetry for Responses WebSocket transport. pub trait WebsocketTelemetry: Send + Sync { - fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>); + fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>, connection_reused: bool); fn on_ws_event( &self, diff --git a/codex-rs/core/src/api_bridge.rs b/codex-rs/core/src/api_bridge.rs index f363201ae98..2060b78cf76 100644 --- a/codex-rs/core/src/api_bridge.rs +++ b/codex-rs/core/src/api_bridge.rs @@ -1,3 +1,4 @@ +use base64::Engine; use chrono::DateTime; use chrono::Utc; use codex_api::AuthProvider as ApiAuthProvider; @@ -7,6 +8,7 @@ use codex_api::rate_limits::parse_promo_message; use codex_api::rate_limits::parse_rate_limit_for_limit; use http::HeaderMap; use serde::Deserialize; +use serde_json::Value; use crate::auth::CodexAuth; use crate::error::CodexErr; @@ -30,6 +32,8 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr { url: None, cf_ray: None, request_id: None, + identity_authorization_error: None, + identity_error_code: None, }), ApiError::InvalidRequest { message } => CodexErr::InvalidRequest(message), ApiError::Transport(transport) => match transport { @@ -98,6 +102,11 @@ pub(crate) fn map_api_error(err: ApiError) -> CodexErr { url, cf_ray: extract_header(headers.as_ref(), CF_RAY_HEADER), request_id: extract_request_id(headers.as_ref()), + identity_authorization_error: extract_header( + headers.as_ref(), + X_OPENAI_AUTHORIZATION_ERROR_HEADER, + ), + identity_error_code: extract_x_error_json_code(headers.as_ref()), }) } } @@ -118,6 +127,8 @@ const ACTIVE_LIMIT_HEADER: &str = "x-codex-active-limit"; const REQUEST_ID_HEADER: &str = "x-request-id"; const OAI_REQUEST_ID_HEADER: &str = "x-oai-request-id"; const CF_RAY_HEADER: &str = "cf-ray"; +const X_OPENAI_AUTHORIZATION_ERROR_HEADER: &str = "x-openai-authorization-error"; +const X_ERROR_JSON_HEADER: &str = "x-error-json"; #[cfg(test)] #[path = "api_bridge_tests.rs"] @@ -140,6 +151,19 @@ fn extract_header(headers: Option<&HeaderMap>, name: &str) -> Option { }) } +fn extract_x_error_json_code(headers: Option<&HeaderMap>) -> Option { + let encoded = extract_header(headers, X_ERROR_JSON_HEADER)?; + let decoded = base64::engine::general_purpose::STANDARD + .decode(encoded) + .ok()?; + let parsed = serde_json::from_slice::(&decoded).ok()?; + parsed + .get("error") + .and_then(|error| error.get("code")) + .and_then(Value::as_str) + .map(str::to_string) +} + pub(crate) fn auth_provider_from_auth( auth: Option, provider: &ModelProviderInfo, @@ -191,6 +215,26 @@ pub(crate) struct CoreAuthProvider { account_id: Option, } +impl CoreAuthProvider { + pub(crate) fn auth_header_attached(&self) -> bool { + self.token + .as_ref() + .is_some_and(|token| http::HeaderValue::from_str(&format!("Bearer {token}")).is_ok()) + } + + pub(crate) fn auth_header_name(&self) -> Option<&'static str> { + self.auth_header_attached().then_some("authorization") + } + + #[cfg(test)] + pub(crate) fn for_test(token: Option<&str>, account_id: Option<&str>) -> Self { + Self { + token: token.map(str::to_string), + account_id: account_id.map(str::to_string), + } + } +} + impl ApiAuthProvider for CoreAuthProvider { fn bearer_token(&self) -> Option { self.token.clone() diff --git a/codex-rs/core/src/api_bridge_tests.rs b/codex-rs/core/src/api_bridge_tests.rs index e8391021b1f..71d3889915c 100644 --- a/codex-rs/core/src/api_bridge_tests.rs +++ b/codex-rs/core/src/api_bridge_tests.rs @@ -1,4 +1,5 @@ use super::*; +use base64::Engine; use pretty_assertions::assert_eq; #[test] @@ -94,3 +95,49 @@ fn map_api_error_does_not_fallback_limit_name_to_limit_id() { None ); } + +#[test] +fn map_api_error_extracts_identity_auth_details_from_headers() { + let mut headers = HeaderMap::new(); + headers.insert(REQUEST_ID_HEADER, http::HeaderValue::from_static("req-401")); + headers.insert(CF_RAY_HEADER, http::HeaderValue::from_static("ray-401")); + headers.insert( + X_OPENAI_AUTHORIZATION_ERROR_HEADER, + http::HeaderValue::from_static("missing_authorization_header"), + ); + let x_error_json = + base64::engine::general_purpose::STANDARD.encode(r#"{"error":{"code":"token_expired"}}"#); + headers.insert( + X_ERROR_JSON_HEADER, + http::HeaderValue::from_str(&x_error_json).expect("valid x-error-json header"), + ); + + let err = map_api_error(ApiError::Transport(TransportError::Http { + status: http::StatusCode::UNAUTHORIZED, + url: Some("https://chatgpt.com/backend-api/codex/models".to_string()), + headers: Some(headers), + body: Some(r#"{"detail":"Unauthorized"}"#.to_string()), + })); + + let CodexErr::UnexpectedStatus(err) = err else { + panic!("expected CodexErr::UnexpectedStatus, got {err:?}"); + }; + assert_eq!(err.request_id.as_deref(), Some("req-401")); + assert_eq!(err.cf_ray.as_deref(), Some("ray-401")); + assert_eq!( + err.identity_authorization_error.as_deref(), + Some("missing_authorization_header") + ); + assert_eq!(err.identity_error_code.as_deref(), Some("token_expired")); +} + +#[test] +fn core_auth_provider_reports_when_auth_header_will_attach() { + let auth = CoreAuthProvider { + token: Some("access-token".to_string()), + account_id: None, + }; + + assert!(auth.auth_header_attached()); + assert_eq!(auth.auth_header_name(), Some("authorization")); +} diff --git a/codex-rs/core/src/auth.rs b/codex-rs/core/src/auth.rs index 8818aa5c6cf..78aa693c52b 100644 --- a/codex-rs/core/src/auth.rs +++ b/codex-rs/core/src/auth.rs @@ -874,6 +874,17 @@ pub struct UnauthorizedRecovery { mode: UnauthorizedRecoveryMode, } +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct UnauthorizedRecoveryStepResult { + auth_state_changed: Option, +} + +impl UnauthorizedRecoveryStepResult { + pub fn auth_state_changed(&self) -> Option { + self.auth_state_changed + } +} + impl UnauthorizedRecovery { fn new(manager: Arc) -> Self { let cached_auth = manager.auth_cached(); @@ -917,7 +928,46 @@ impl UnauthorizedRecovery { !matches!(self.step, UnauthorizedRecoveryStep::Done) } - pub async fn next(&mut self) -> Result<(), RefreshTokenError> { + pub fn unavailable_reason(&self) -> &'static str { + if !self + .manager + .auth_cached() + .as_ref() + .is_some_and(CodexAuth::is_chatgpt_auth) + { + return "not_chatgpt_auth"; + } + + if self.mode == UnauthorizedRecoveryMode::External + && !self.manager.has_external_auth_refresher() + { + return "no_external_refresher"; + } + + if matches!(self.step, UnauthorizedRecoveryStep::Done) { + return "recovery_exhausted"; + } + + "ready" + } + + pub fn mode_name(&self) -> &'static str { + match self.mode { + UnauthorizedRecoveryMode::Managed => "managed", + UnauthorizedRecoveryMode::External => "external", + } + } + + pub fn step_name(&self) -> &'static str { + match self.step { + UnauthorizedRecoveryStep::Reload => "reload", + UnauthorizedRecoveryStep::RefreshToken => "refresh_token", + UnauthorizedRecoveryStep::ExternalRefresh => "external_refresh", + UnauthorizedRecoveryStep::Done => "done", + } + } + + pub async fn next(&mut self) -> Result { if !self.has_next() { return Err(RefreshTokenError::Permanent(RefreshTokenFailedError::new( RefreshTokenFailedReason::Other, @@ -931,8 +981,17 @@ impl UnauthorizedRecovery { .manager .reload_if_account_id_matches(self.expected_account_id.as_deref()) { - ReloadOutcome::ReloadedChanged | ReloadOutcome::ReloadedNoChange => { + ReloadOutcome::ReloadedChanged => { self.step = UnauthorizedRecoveryStep::RefreshToken; + return Ok(UnauthorizedRecoveryStepResult { + auth_state_changed: Some(true), + }); + } + ReloadOutcome::ReloadedNoChange => { + self.step = UnauthorizedRecoveryStep::RefreshToken; + return Ok(UnauthorizedRecoveryStepResult { + auth_state_changed: Some(false), + }); } ReloadOutcome::Skipped => { self.step = UnauthorizedRecoveryStep::Done; @@ -946,16 +1005,24 @@ impl UnauthorizedRecovery { UnauthorizedRecoveryStep::RefreshToken => { self.manager.refresh_token_from_authority().await?; self.step = UnauthorizedRecoveryStep::Done; + return Ok(UnauthorizedRecoveryStepResult { + auth_state_changed: Some(true), + }); } UnauthorizedRecoveryStep::ExternalRefresh => { self.manager .refresh_external_auth(ExternalAuthRefreshReason::Unauthorized) .await?; self.step = UnauthorizedRecoveryStep::Done; + return Ok(UnauthorizedRecoveryStepResult { + auth_state_changed: Some(true), + }); } UnauthorizedRecoveryStep::Done => {} } - Ok(()) + Ok(UnauthorizedRecoveryStepResult { + auth_state_changed: None, + }) } } diff --git a/codex-rs/core/src/auth_tests.rs b/codex-rs/core/src/auth_tests.rs index 0c4a574f340..3bc5eb6c781 100644 --- a/codex-rs/core/src/auth_tests.rs +++ b/codex-rs/core/src/auth_tests.rs @@ -13,6 +13,7 @@ use codex_protocol::config_types::ForcedLoginMethod; use pretty_assertions::assert_eq; use serde::Serialize; use serde_json::json; +use std::sync::Arc; use tempfile::tempdir; #[tokio::test] @@ -171,6 +172,33 @@ fn logout_removes_auth_file() -> Result<(), std::io::Error> { Ok(()) } +#[test] +fn unauthorized_recovery_reports_mode_and_step_names() { + let dir = tempdir().unwrap(); + let manager = AuthManager::shared( + dir.path().to_path_buf(), + false, + AuthCredentialsStoreMode::File, + ); + let managed = UnauthorizedRecovery { + manager: Arc::clone(&manager), + step: UnauthorizedRecoveryStep::Reload, + expected_account_id: None, + mode: UnauthorizedRecoveryMode::Managed, + }; + assert_eq!(managed.mode_name(), "managed"); + assert_eq!(managed.step_name(), "reload"); + + let external = UnauthorizedRecovery { + manager, + step: UnauthorizedRecoveryStep::ExternalRefresh, + expected_account_id: None, + mode: UnauthorizedRecoveryMode::External, + }; + assert_eq!(external.mode_name(), "external"); + assert_eq!(external.step_name(), "external_refresh"); +} + struct AuthFileParams { openai_api_key: Option, chatgpt_plan_type: Option, diff --git a/codex-rs/core/src/client.rs b/codex-rs/core/src/client.rs index 32c1653ae8e..c438d72741a 100644 --- a/codex-rs/core/src/client.rs +++ b/codex-rs/core/src/client.rs @@ -75,6 +75,7 @@ use http::HeaderValue; use http::StatusCode as HttpStatusCode; use reqwest::StatusCode; use std::time::Duration; +use std::time::Instant; use tokio::sync::mpsc; use tokio::sync::oneshot; use tokio::sync::oneshot::error::TryRecvError; @@ -85,6 +86,7 @@ use tracing::trace; use tracing::warn; use crate::AuthManager; +use crate::auth::AuthMode; use crate::auth::CodexAuth; use crate::auth::RefreshTokenError; use crate::client_common::Prompt; @@ -97,7 +99,14 @@ use crate::error::Result; use crate::flags::CODEX_RS_SSE_FIXTURE; use crate::model_provider_info::ModelProviderInfo; use crate::model_provider_info::WireApi; +use crate::response_debug_context::extract_response_debug_context; +use crate::response_debug_context::extract_response_debug_context_from_api_error; +use crate::response_debug_context::telemetry_api_error_message; +use crate::response_debug_context::telemetry_transport_error_message; use crate::tools::spec::create_tools_json_for_responses_api; +use crate::util::FeedbackRequestTags; +use crate::util::emit_feedback_auth_recovery_tags; +use crate::util::emit_feedback_request_tags; pub const OPENAI_BETA_HEADER: &str = "OpenAI-Beta"; pub const X_CODEX_TURN_STATE_HEADER: &str = "x-codex-turn-state"; @@ -105,7 +114,9 @@ pub const X_CODEX_TURN_METADATA_HEADER: &str = "x-codex-turn-metadata"; pub const X_RESPONSESAPI_INCLUDE_TIMING_METRICS_HEADER: &str = "x-responsesapi-include-timing-metrics"; const RESPONSES_WEBSOCKETS_V2_BETA_HEADER_VALUE: &str = "responses_websockets=2026-02-06"; - +const RESPONSES_ENDPOINT: &str = "/responses"; +const RESPONSES_COMPACT_ENDPOINT: &str = "/responses/compact"; +const MEMORIES_SUMMARIZE_ENDPOINT: &str = "/memories/trace_summarize"; pub fn ws_version_from_features(config: &Config) -> bool { config .features @@ -144,6 +155,17 @@ struct CurrentClientSetup { api_auth: CoreAuthProvider, } +#[derive(Clone, Copy)] +struct RequestRouteTelemetry { + endpoint: &'static str, +} + +impl RequestRouteTelemetry { + fn for_endpoint(endpoint: &'static str) -> Self { + Self { endpoint } + } +} + /// A session-scoped client for model-provider API calls. /// /// This holds configuration and state that should be shared across turns within a Codex session @@ -201,6 +223,23 @@ struct WebsocketSession { connection: Option, last_request: Option, last_response_rx: Option>, + connection_reused: StdMutex, +} + +impl WebsocketSession { + fn set_connection_reused(&self, connection_reused: bool) { + *self + .connection_reused + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) = connection_reused; + } + + fn connection_reused(&self) -> bool { + *self + .connection_reused + .lock() + .unwrap_or_else(std::sync::PoisonError::into_inner) + } } enum WebsocketStreamOutcome { @@ -291,7 +330,15 @@ impl ModelClient { } let client_setup = self.current_client_setup().await?; let transport = ReqwestTransport::new(build_reqwest_client()); - let request_telemetry = Self::build_request_telemetry(session_telemetry); + let request_telemetry = Self::build_request_telemetry( + session_telemetry, + AuthRequestTelemetryContext::new( + client_setup.auth.as_ref().map(CodexAuth::auth_mode), + &client_setup.api_auth, + PendingUnauthorizedRetry::default(), + ), + RequestRouteTelemetry::for_endpoint(RESPONSES_COMPACT_ENDPOINT), + ); let client = ApiCompactClient::new(transport, client_setup.api_provider, client_setup.api_auth) .with_telemetry(Some(request_telemetry)); @@ -351,7 +398,15 @@ impl ModelClient { let client_setup = self.current_client_setup().await?; let transport = ReqwestTransport::new(build_reqwest_client()); - let request_telemetry = Self::build_request_telemetry(session_telemetry); + let request_telemetry = Self::build_request_telemetry( + session_telemetry, + AuthRequestTelemetryContext::new( + client_setup.auth.as_ref().map(CodexAuth::auth_mode), + &client_setup.api_auth, + PendingUnauthorizedRetry::default(), + ), + RequestRouteTelemetry::for_endpoint(MEMORIES_SUMMARIZE_ENDPOINT), + ); let client = ApiMemoriesClient::new(transport, client_setup.api_provider, client_setup.api_auth) .with_telemetry(Some(request_telemetry)); @@ -391,8 +446,16 @@ impl ModelClient { } /// Builds request telemetry for unary API calls (e.g., Compact endpoint). - fn build_request_telemetry(session_telemetry: &SessionTelemetry) -> Arc { - let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone())); + fn build_request_telemetry( + session_telemetry: &SessionTelemetry, + auth_context: AuthRequestTelemetryContext, + request_route_telemetry: RequestRouteTelemetry, + ) -> Arc { + let telemetry = Arc::new(ApiTelemetry::new( + session_telemetry.clone(), + auth_context, + request_route_telemetry, + )); let request_telemetry: Arc = telemetry; request_telemetry } @@ -458,6 +521,7 @@ impl ModelClient { /// /// Both startup prewarm and in-turn `needs_new` reconnects call this path so handshake /// behavior remains consistent across both flows. + #[allow(clippy::too_many_arguments)] async fn connect_websocket( &self, session_telemetry: &SessionTelemetry, @@ -465,17 +529,69 @@ impl ModelClient { api_auth: CoreAuthProvider, turn_state: Option>>, turn_metadata_header: Option<&str>, + auth_context: AuthRequestTelemetryContext, + request_route_telemetry: RequestRouteTelemetry, ) -> std::result::Result { let headers = self.build_websocket_headers(turn_state.as_ref(), turn_metadata_header); - let websocket_telemetry = ModelClientSession::build_websocket_telemetry(session_telemetry); - ApiWebSocketResponsesClient::new(api_provider, api_auth) + let websocket_telemetry = ModelClientSession::build_websocket_telemetry( + session_telemetry, + auth_context, + request_route_telemetry, + ); + let start = Instant::now(); + let result = ApiWebSocketResponsesClient::new(api_provider, api_auth) .connect( headers, crate::default_client::default_headers(), turn_state, Some(websocket_telemetry), ) - .await + .await; + let error_message = result.as_ref().err().map(telemetry_api_error_message); + let response_debug = result + .as_ref() + .err() + .map(extract_response_debug_context_from_api_error) + .unwrap_or_default(); + let status = result.as_ref().err().and_then(api_error_http_status); + session_telemetry.record_websocket_connect( + start.elapsed(), + status, + error_message.as_deref(), + auth_context.auth_header_attached, + auth_context.auth_header_name, + auth_context.retry_after_unauthorized, + auth_context.recovery_mode, + auth_context.recovery_phase, + request_route_telemetry.endpoint, + false, + response_debug.request_id.as_deref(), + response_debug.cf_ray.as_deref(), + response_debug.auth_error.as_deref(), + response_debug.auth_error_code.as_deref(), + ); + emit_feedback_request_tags(&FeedbackRequestTags { + endpoint: request_route_telemetry.endpoint, + auth_header_attached: auth_context.auth_header_attached, + auth_header_name: auth_context.auth_header_name, + auth_mode: auth_context.auth_mode, + auth_retry_after_unauthorized: Some(auth_context.retry_after_unauthorized), + auth_recovery_mode: auth_context.recovery_mode, + auth_recovery_phase: auth_context.recovery_phase, + auth_connection_reused: Some(false), + auth_request_id: response_debug.request_id.as_deref(), + auth_cf_ray: response_debug.cf_ray.as_deref(), + auth_error: response_debug.auth_error.as_deref(), + auth_error_code: response_debug.auth_error_code.as_deref(), + auth_recovery_followup_success: auth_context + .retry_after_unauthorized + .then_some(result.is_ok()), + auth_recovery_followup_status: auth_context + .retry_after_unauthorized + .then_some(status) + .flatten(), + }); + result } /// Builds websocket handshake headers for both prewarm and turn-time reconnect. @@ -718,7 +834,11 @@ impl ModelClientSession { "failed to build websocket prewarm client setup: {err}" )) })?; - + let auth_context = AuthRequestTelemetryContext::new( + client_setup.auth.as_ref().map(CodexAuth::auth_mode), + &client_setup.api_auth, + PendingUnauthorizedRetry::default(), + ); let connection = self .client .connect_websocket( @@ -727,9 +847,12 @@ impl ModelClientSession { client_setup.api_auth, Some(Arc::clone(&self.turn_state)), None, + auth_context, + RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT), ) .await?; self.websocket_session.connection = Some(connection); + self.websocket_session.set_connection_reused(false); Ok(()) } /// Returns a websocket connection for this turn. @@ -742,17 +865,22 @@ impl ModelClientSession { wire_api = %self.client.state.provider.wire_api, transport = "responses_websocket", api.path = "responses", - turn.has_metadata_header = turn_metadata_header.is_some() + turn.has_metadata_header = params.turn_metadata_header.is_some() ) )] async fn websocket_connection( &mut self, - session_telemetry: &SessionTelemetry, - api_provider: codex_api::Provider, - api_auth: CoreAuthProvider, - turn_metadata_header: Option<&str>, - options: &ApiResponsesOptions, + params: WebsocketConnectParams<'_>, ) -> std::result::Result<&ApiWebSocketConnection, ApiError> { + let WebsocketConnectParams { + session_telemetry, + api_provider, + api_auth, + turn_metadata_header, + options, + auth_context, + request_route_telemetry, + } = params; let needs_new = match self.websocket_session.connection.as_ref() { Some(conn) => conn.is_closed().await, None => true, @@ -773,9 +901,14 @@ impl ModelClientSession { api_auth, Some(turn_state), turn_metadata_header, + auth_context, + request_route_telemetry, ) .await?; self.websocket_session.connection = Some(new_conn); + self.websocket_session.set_connection_reused(false); + } else { + self.websocket_session.set_connection_reused(true); } self.websocket_session @@ -840,11 +973,20 @@ impl ModelClientSession { let mut auth_recovery = auth_manager .as_ref() .map(super::auth::AuthManager::unauthorized_recovery); + let mut pending_retry = PendingUnauthorizedRetry::default(); loop { let client_setup = self.client.current_client_setup().await?; let transport = ReqwestTransport::new(build_reqwest_client()); - let (request_telemetry, sse_telemetry) = - Self::build_streaming_telemetry(session_telemetry); + let request_auth_context = AuthRequestTelemetryContext::new( + client_setup.auth.as_ref().map(CodexAuth::auth_mode), + &client_setup.api_auth, + pending_retry, + ); + let (request_telemetry, sse_telemetry) = Self::build_streaming_telemetry( + session_telemetry, + request_auth_context, + RequestRouteTelemetry::for_endpoint(RESPONSES_ENDPOINT), + ); let compression = self.responses_request_compression(client_setup.auth.as_ref()); let options = self.build_responses_options(turn_metadata_header, compression); @@ -872,7 +1014,14 @@ impl ModelClientSession { Err(ApiError::Transport( unauthorized_transport @ TransportError::Http { status, .. }, )) if status == StatusCode::UNAUTHORIZED => { - handle_unauthorized(unauthorized_transport, &mut auth_recovery).await?; + pending_retry = PendingUnauthorizedRetry::from_recovery( + handle_unauthorized( + unauthorized_transport, + &mut auth_recovery, + session_telemetry, + ) + .await?, + ); continue; } Err(err) => return Err(map_api_error(err)), @@ -911,8 +1060,14 @@ impl ModelClientSession { let mut auth_recovery = auth_manager .as_ref() .map(super::auth::AuthManager::unauthorized_recovery); + let mut pending_retry = PendingUnauthorizedRetry::default(); loop { let client_setup = self.client.current_client_setup().await?; + let request_auth_context = AuthRequestTelemetryContext::new( + client_setup.auth.as_ref().map(CodexAuth::auth_mode), + &client_setup.api_auth, + pending_retry, + ); let compression = self.responses_request_compression(client_setup.auth.as_ref()); let options = self.build_responses_options(turn_metadata_header, compression); @@ -933,13 +1088,17 @@ impl ModelClientSession { } match self - .websocket_connection( + .websocket_connection(WebsocketConnectParams { session_telemetry, - client_setup.api_provider, - client_setup.api_auth, + api_provider: client_setup.api_provider, + api_auth: client_setup.api_auth, turn_metadata_header, - &options, - ) + options: &options, + auth_context: request_auth_context, + request_route_telemetry: RequestRouteTelemetry::for_endpoint( + RESPONSES_ENDPOINT, + ), + }) .await { Ok(_) => {} @@ -951,7 +1110,14 @@ impl ModelClientSession { Err(ApiError::Transport( unauthorized_transport @ TransportError::Http { status, .. }, )) if status == StatusCode::UNAUTHORIZED => { - handle_unauthorized(unauthorized_transport, &mut auth_recovery).await?; + pending_retry = PendingUnauthorizedRetry::from_recovery( + handle_unauthorized( + unauthorized_transport, + &mut auth_recovery, + session_telemetry, + ) + .await?, + ); continue; } Err(err) => return Err(map_api_error(err)), @@ -968,7 +1134,7 @@ impl ModelClientSession { "websocket connection is unavailable".to_string(), )) })? - .stream_request(ws_request) + .stream_request(ws_request, self.websocket_session.connection_reused()) .await .map_err(map_api_error)?; let (stream, last_request_rx) = @@ -981,8 +1147,14 @@ impl ModelClientSession { /// Builds request and SSE telemetry for streaming API calls. fn build_streaming_telemetry( session_telemetry: &SessionTelemetry, + auth_context: AuthRequestTelemetryContext, + request_route_telemetry: RequestRouteTelemetry, ) -> (Arc, Arc) { - let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone())); + let telemetry = Arc::new(ApiTelemetry::new( + session_telemetry.clone(), + auth_context, + request_route_telemetry, + )); let request_telemetry: Arc = telemetry.clone(); let sse_telemetry: Arc = telemetry; (request_telemetry, sse_telemetry) @@ -991,8 +1163,14 @@ impl ModelClientSession { /// Builds telemetry for the Responses API WebSocket transport. fn build_websocket_telemetry( session_telemetry: &SessionTelemetry, + auth_context: AuthRequestTelemetryContext, + request_route_telemetry: RequestRouteTelemetry, ) -> Arc { - let telemetry = Arc::new(ApiTelemetry::new(session_telemetry.clone())); + let telemetry = Arc::new(ApiTelemetry::new( + session_telemetry.clone(), + auth_context, + request_route_telemetry, + )); let websocket_telemetry: Arc = telemetry; websocket_telemetry } @@ -1126,6 +1304,7 @@ impl ModelClientSession { self.websocket_session.connection = None; self.websocket_session.last_request = None; self.websocket_session.last_response_rx = None; + self.websocket_session.set_connection_reused(false); } activated } @@ -1264,30 +1443,209 @@ where /// /// When refresh succeeds, the caller should retry the API call; otherwise /// the mapped `CodexErr` is returned to the caller. +#[derive(Clone, Copy, Debug)] +struct UnauthorizedRecoveryExecution { + mode: &'static str, + phase: &'static str, +} + +#[derive(Clone, Copy, Debug, Default)] +struct PendingUnauthorizedRetry { + retry_after_unauthorized: bool, + recovery_mode: Option<&'static str>, + recovery_phase: Option<&'static str>, +} + +impl PendingUnauthorizedRetry { + fn from_recovery(recovery: UnauthorizedRecoveryExecution) -> Self { + Self { + retry_after_unauthorized: true, + recovery_mode: Some(recovery.mode), + recovery_phase: Some(recovery.phase), + } + } +} + +#[derive(Clone, Copy, Debug, Default)] +struct AuthRequestTelemetryContext { + auth_mode: Option<&'static str>, + auth_header_attached: bool, + auth_header_name: Option<&'static str>, + retry_after_unauthorized: bool, + recovery_mode: Option<&'static str>, + recovery_phase: Option<&'static str>, +} + +impl AuthRequestTelemetryContext { + fn new( + auth_mode: Option, + api_auth: &CoreAuthProvider, + retry: PendingUnauthorizedRetry, + ) -> Self { + Self { + auth_mode: auth_mode.map(|mode| match mode { + AuthMode::ApiKey => "ApiKey", + AuthMode::Chatgpt => "Chatgpt", + }), + auth_header_attached: api_auth.auth_header_attached(), + auth_header_name: api_auth.auth_header_name(), + retry_after_unauthorized: retry.retry_after_unauthorized, + recovery_mode: retry.recovery_mode, + recovery_phase: retry.recovery_phase, + } + } +} + +struct WebsocketConnectParams<'a> { + session_telemetry: &'a SessionTelemetry, + api_provider: codex_api::Provider, + api_auth: CoreAuthProvider, + turn_metadata_header: Option<&'a str>, + options: &'a ApiResponsesOptions, + auth_context: AuthRequestTelemetryContext, + request_route_telemetry: RequestRouteTelemetry, +} + async fn handle_unauthorized( transport: TransportError, auth_recovery: &mut Option, -) -> Result<()> { + session_telemetry: &SessionTelemetry, +) -> Result { + let debug = extract_response_debug_context(&transport); if let Some(recovery) = auth_recovery && recovery.has_next() { + let mode = recovery.mode_name(); + let phase = recovery.step_name(); return match recovery.next().await { - Ok(_) => Ok(()), - Err(RefreshTokenError::Permanent(failed)) => Err(CodexErr::RefreshTokenFailed(failed)), - Err(RefreshTokenError::Transient(other)) => Err(CodexErr::Io(other)), + Ok(step_result) => { + session_telemetry.record_auth_recovery( + mode, + phase, + "recovery_succeeded", + debug.request_id.as_deref(), + debug.cf_ray.as_deref(), + debug.auth_error.as_deref(), + debug.auth_error_code.as_deref(), + None, + step_result.auth_state_changed(), + ); + emit_feedback_auth_recovery_tags( + mode, + phase, + "recovery_succeeded", + debug.request_id.as_deref(), + debug.cf_ray.as_deref(), + debug.auth_error.as_deref(), + debug.auth_error_code.as_deref(), + ); + Ok(UnauthorizedRecoveryExecution { mode, phase }) + } + Err(RefreshTokenError::Permanent(failed)) => { + session_telemetry.record_auth_recovery( + mode, + phase, + "recovery_failed_permanent", + debug.request_id.as_deref(), + debug.cf_ray.as_deref(), + debug.auth_error.as_deref(), + debug.auth_error_code.as_deref(), + None, + None, + ); + emit_feedback_auth_recovery_tags( + mode, + phase, + "recovery_failed_permanent", + debug.request_id.as_deref(), + debug.cf_ray.as_deref(), + debug.auth_error.as_deref(), + debug.auth_error_code.as_deref(), + ); + Err(CodexErr::RefreshTokenFailed(failed)) + } + Err(RefreshTokenError::Transient(other)) => { + session_telemetry.record_auth_recovery( + mode, + phase, + "recovery_failed_transient", + debug.request_id.as_deref(), + debug.cf_ray.as_deref(), + debug.auth_error.as_deref(), + debug.auth_error_code.as_deref(), + None, + None, + ); + emit_feedback_auth_recovery_tags( + mode, + phase, + "recovery_failed_transient", + debug.request_id.as_deref(), + debug.cf_ray.as_deref(), + debug.auth_error.as_deref(), + debug.auth_error_code.as_deref(), + ); + Err(CodexErr::Io(other)) + } }; } + let (mode, phase, recovery_reason) = match auth_recovery.as_ref() { + Some(recovery) => ( + recovery.mode_name(), + recovery.step_name(), + Some(recovery.unavailable_reason()), + ), + None => ("none", "none", Some("auth_manager_missing")), + }; + session_telemetry.record_auth_recovery( + mode, + phase, + "recovery_not_run", + debug.request_id.as_deref(), + debug.cf_ray.as_deref(), + debug.auth_error.as_deref(), + debug.auth_error_code.as_deref(), + recovery_reason, + None, + ); + emit_feedback_auth_recovery_tags( + mode, + phase, + "recovery_not_run", + debug.request_id.as_deref(), + debug.cf_ray.as_deref(), + debug.auth_error.as_deref(), + debug.auth_error_code.as_deref(), + ); + Err(map_api_error(ApiError::Transport(transport))) } +fn api_error_http_status(error: &ApiError) -> Option { + match error { + ApiError::Transport(TransportError::Http { status, .. }) => Some(status.as_u16()), + _ => None, + } +} + struct ApiTelemetry { session_telemetry: SessionTelemetry, + auth_context: AuthRequestTelemetryContext, + request_route_telemetry: RequestRouteTelemetry, } impl ApiTelemetry { - fn new(session_telemetry: SessionTelemetry) -> Self { - Self { session_telemetry } + fn new( + session_telemetry: SessionTelemetry, + auth_context: AuthRequestTelemetryContext, + request_route_telemetry: RequestRouteTelemetry, + ) -> Self { + Self { + session_telemetry, + auth_context, + request_route_telemetry, + } } } @@ -1299,13 +1657,50 @@ impl RequestTelemetry for ApiTelemetry { error: Option<&TransportError>, duration: Duration, ) { - let error_message = error.map(std::string::ToString::to_string); + let error_message = error.map(telemetry_transport_error_message); + let status = status.map(|s| s.as_u16()); + let debug = error + .map(extract_response_debug_context) + .unwrap_or_default(); self.session_telemetry.record_api_request( attempt, - status.map(|s| s.as_u16()), + status, error_message.as_deref(), duration, + self.auth_context.auth_header_attached, + self.auth_context.auth_header_name, + self.auth_context.retry_after_unauthorized, + self.auth_context.recovery_mode, + self.auth_context.recovery_phase, + self.request_route_telemetry.endpoint, + debug.request_id.as_deref(), + debug.cf_ray.as_deref(), + debug.auth_error.as_deref(), + debug.auth_error_code.as_deref(), ); + emit_feedback_request_tags(&FeedbackRequestTags { + endpoint: self.request_route_telemetry.endpoint, + auth_header_attached: self.auth_context.auth_header_attached, + auth_header_name: self.auth_context.auth_header_name, + auth_mode: self.auth_context.auth_mode, + auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized), + auth_recovery_mode: self.auth_context.recovery_mode, + auth_recovery_phase: self.auth_context.recovery_phase, + auth_connection_reused: None, + auth_request_id: debug.request_id.as_deref(), + auth_cf_ray: debug.cf_ray.as_deref(), + auth_error: debug.auth_error.as_deref(), + auth_error_code: debug.auth_error_code.as_deref(), + auth_recovery_followup_success: self + .auth_context + .retry_after_unauthorized + .then_some(error.is_none()), + auth_recovery_followup_status: self + .auth_context + .retry_after_unauthorized + .then_some(status) + .flatten(), + }); } } @@ -1323,10 +1718,40 @@ impl SseTelemetry for ApiTelemetry { } impl WebsocketTelemetry for ApiTelemetry { - fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>) { - let error_message = error.map(std::string::ToString::to_string); - self.session_telemetry - .record_websocket_request(duration, error_message.as_deref()); + fn on_ws_request(&self, duration: Duration, error: Option<&ApiError>, connection_reused: bool) { + let error_message = error.map(telemetry_api_error_message); + let status = error.and_then(api_error_http_status); + let debug = error + .map(extract_response_debug_context_from_api_error) + .unwrap_or_default(); + self.session_telemetry.record_websocket_request( + duration, + error_message.as_deref(), + connection_reused, + ); + emit_feedback_request_tags(&FeedbackRequestTags { + endpoint: self.request_route_telemetry.endpoint, + auth_header_attached: self.auth_context.auth_header_attached, + auth_header_name: self.auth_context.auth_header_name, + auth_mode: self.auth_context.auth_mode, + auth_retry_after_unauthorized: Some(self.auth_context.retry_after_unauthorized), + auth_recovery_mode: self.auth_context.recovery_mode, + auth_recovery_phase: self.auth_context.recovery_phase, + auth_connection_reused: Some(connection_reused), + auth_request_id: debug.request_id.as_deref(), + auth_cf_ray: debug.cf_ray.as_deref(), + auth_error: debug.auth_error.as_deref(), + auth_error_code: debug.auth_error_code.as_deref(), + auth_recovery_followup_success: self + .auth_context + .retry_after_unauthorized + .then_some(error.is_none()), + auth_recovery_followup_status: self + .auth_context + .retry_after_unauthorized + .then_some(status) + .flatten(), + }); } fn on_ws_event( diff --git a/codex-rs/core/src/client_tests.rs b/codex-rs/core/src/client_tests.rs index 138b61ffbb5..441a3486457 100644 --- a/codex-rs/core/src/client_tests.rs +++ b/codex-rs/core/src/client_tests.rs @@ -1,4 +1,7 @@ +use super::AuthRequestTelemetryContext; use super::ModelClient; +use super::PendingUnauthorizedRetry; +use super::UnauthorizedRecoveryExecution; use codex_otel::SessionTelemetry; use codex_protocol::ThreadId; use codex_protocol::openai_models::ModelInfo; @@ -94,3 +97,22 @@ async fn summarize_memories_returns_empty_for_empty_input() { .expect("empty summarize request should succeed"); assert_eq!(output.len(), 0); } + +#[test] +fn auth_request_telemetry_context_tracks_attached_auth_and_retry_phase() { + let auth_context = AuthRequestTelemetryContext::new( + Some(crate::auth::AuthMode::Chatgpt), + &crate::api_bridge::CoreAuthProvider::for_test(Some("access-token"), Some("workspace-123")), + PendingUnauthorizedRetry::from_recovery(UnauthorizedRecoveryExecution { + mode: "managed", + phase: "refresh_token", + }), + ); + + assert_eq!(auth_context.auth_mode, Some("Chatgpt")); + assert!(auth_context.auth_header_attached); + assert_eq!(auth_context.auth_header_name, Some("authorization")); + assert!(auth_context.retry_after_unauthorized); + assert_eq!(auth_context.recovery_mode, Some("managed")); + assert_eq!(auth_context.recovery_phase, Some("refresh_token")); +} diff --git a/codex-rs/core/src/error.rs b/codex-rs/core/src/error.rs index f3bb4dc8e59..e8e86defc27 100644 --- a/codex-rs/core/src/error.rs +++ b/codex-rs/core/src/error.rs @@ -292,6 +292,8 @@ pub struct UnexpectedResponseError { pub url: Option, pub cf_ray: Option, pub request_id: Option, + pub identity_authorization_error: Option, + pub identity_error_code: Option, } const CLOUDFLARE_BLOCKED_MESSAGE: &str = @@ -346,6 +348,12 @@ impl UnexpectedResponseError { if let Some(id) = &self.request_id { message.push_str(&format!(", request id: {id}")); } + if let Some(auth_error) = &self.identity_authorization_error { + message.push_str(&format!(", auth error: {auth_error}")); + } + if let Some(error_code) = &self.identity_error_code { + message.push_str(&format!(", auth error code: {error_code}")); + } Some(message) } @@ -368,6 +376,12 @@ impl std::fmt::Display for UnexpectedResponseError { if let Some(id) = &self.request_id { message.push_str(&format!(", request id: {id}")); } + if let Some(auth_error) = &self.identity_authorization_error { + message.push_str(&format!(", auth error: {auth_error}")); + } + if let Some(error_code) = &self.identity_error_code { + message.push_str(&format!(", auth error code: {error_code}")); + } write!(f, "{message}") } } diff --git a/codex-rs/core/src/error_tests.rs b/codex-rs/core/src/error_tests.rs index fa2bd4a6e15..51cbf42dde5 100644 --- a/codex-rs/core/src/error_tests.rs +++ b/codex-rs/core/src/error_tests.rs @@ -328,6 +328,8 @@ fn unexpected_status_cloudflare_html_is_simplified() { url: Some("http://example.com/blocked".to_string()), cf_ray: Some("ray-id".to_string()), request_id: None, + identity_authorization_error: None, + identity_error_code: None, }; let status = StatusCode::FORBIDDEN.to_string(); let url = "http://example.com/blocked"; @@ -345,6 +347,8 @@ fn unexpected_status_non_html_is_unchanged() { url: Some("http://example.com/plain".to_string()), cf_ray: None, request_id: None, + identity_authorization_error: None, + identity_error_code: None, }; let status = StatusCode::FORBIDDEN.to_string(); let url = "http://example.com/plain"; @@ -363,6 +367,8 @@ fn unexpected_status_prefers_error_message_when_present() { url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()), cf_ray: None, request_id: Some("req-123".to_string()), + identity_authorization_error: None, + identity_error_code: None, }; let status = StatusCode::UNAUTHORIZED.to_string(); assert_eq!( @@ -382,6 +388,8 @@ fn unexpected_status_truncates_long_body_with_ellipsis() { url: Some("http://example.com/long".to_string()), cf_ray: None, request_id: Some("req-long".to_string()), + identity_authorization_error: None, + identity_error_code: None, }; let status = StatusCode::BAD_GATEWAY.to_string(); let expected_body = format!("{}...", "x".repeat(UNEXPECTED_RESPONSE_BODY_MAX_BYTES)); @@ -401,6 +409,8 @@ fn unexpected_status_includes_cf_ray_and_request_id() { url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()), cf_ray: Some("9c81f9f18f2fa49d-LHR".to_string()), request_id: Some("req-xyz".to_string()), + identity_authorization_error: None, + identity_error_code: None, }; let status = StatusCode::UNAUTHORIZED.to_string(); assert_eq!( @@ -411,6 +421,26 @@ fn unexpected_status_includes_cf_ray_and_request_id() { ); } +#[test] +fn unexpected_status_includes_identity_auth_details() { + let err = UnexpectedResponseError { + status: StatusCode::UNAUTHORIZED, + body: "plain text error".to_string(), + url: Some("https://chatgpt.com/backend-api/codex/models".to_string()), + cf_ray: Some("cf-ray-auth-401-test".to_string()), + request_id: Some("req-auth".to_string()), + identity_authorization_error: Some("missing_authorization_header".to_string()), + identity_error_code: Some("token_expired".to_string()), + }; + let status = StatusCode::UNAUTHORIZED.to_string(); + assert_eq!( + err.to_string(), + format!( + "unexpected status {status}: plain text error, url: https://chatgpt.com/backend-api/codex/models, cf-ray: cf-ray-auth-401-test, request id: req-auth, auth error: missing_authorization_header, auth error code: token_expired" + ) + ); +} + #[test] fn usage_limit_reached_includes_hours_and_minutes() { let base = Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap(); diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index a57e02ecb23..8ebea67eab1 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -86,6 +86,7 @@ pub use model_provider_info::WireApi; pub use model_provider_info::built_in_model_providers; pub use model_provider_info::create_oss_provider_with_base_url; mod event_mapping; +mod response_debug_context; pub mod review_format; pub mod review_prompts; mod seatbelt_permissions; diff --git a/codex-rs/core/src/models_manager/manager.rs b/codex-rs/core/src/models_manager/manager.rs index fed50cb5f65..49eb1d7221b 100644 --- a/codex-rs/core/src/models_manager/manager.rs +++ b/codex-rs/core/src/models_manager/manager.rs @@ -3,6 +3,7 @@ use crate::api_bridge::auth_provider_from_auth; use crate::api_bridge::map_api_error; use crate::auth::AuthManager; use crate::auth::AuthMode; +use crate::auth::CodexAuth; use crate::config::Config; use crate::default_client::build_reqwest_client; use crate::error::CodexErr; @@ -11,8 +12,15 @@ use crate::model_provider_info::ModelProviderInfo; use crate::models_manager::collaboration_mode_presets::CollaborationModesConfig; use crate::models_manager::collaboration_mode_presets::builtin_collaboration_mode_presets; use crate::models_manager::model_info; +use crate::response_debug_context::extract_response_debug_context; +use crate::response_debug_context::telemetry_transport_error_message; +use crate::util::FeedbackRequestTags; +use crate::util::emit_feedback_request_tags; use codex_api::ModelsClient; +use codex_api::RequestTelemetry; use codex_api::ReqwestTransport; +use codex_api::TransportError; +use codex_otel::TelemetryAuthMode; use codex_protocol::config_types::CollaborationModeMask; use codex_protocol::openai_models::ModelInfo; use codex_protocol::openai_models::ModelPreset; @@ -32,6 +40,82 @@ use tracing::instrument; const MODEL_CACHE_FILE: &str = "models_cache.json"; const DEFAULT_MODEL_CACHE_TTL: Duration = Duration::from_secs(300); const MODELS_REFRESH_TIMEOUT: Duration = Duration::from_secs(5); +const MODELS_ENDPOINT: &str = "/models"; +#[derive(Clone)] +struct ModelsRequestTelemetry { + auth_mode: Option, + auth_header_attached: bool, + auth_header_name: Option<&'static str>, +} + +impl RequestTelemetry for ModelsRequestTelemetry { + fn on_request( + &self, + attempt: u64, + status: Option, + error: Option<&TransportError>, + duration: Duration, + ) { + let success = status.is_some_and(|code| code.is_success()) && error.is_none(); + let error_message = error.map(telemetry_transport_error_message); + let response_debug = error + .map(extract_response_debug_context) + .unwrap_or_default(); + let status = status.map(|status| status.as_u16()); + tracing::event!( + target: "codex_otel.log_only", + tracing::Level::INFO, + event.name = "codex.api_request", + duration_ms = %duration.as_millis(), + http.response.status_code = status, + success = success, + error.message = error_message.as_deref(), + attempt = attempt, + endpoint = MODELS_ENDPOINT, + auth.header_attached = self.auth_header_attached, + auth.header_name = self.auth_header_name, + auth.request_id = response_debug.request_id.as_deref(), + auth.cf_ray = response_debug.cf_ray.as_deref(), + auth.error = response_debug.auth_error.as_deref(), + auth.error_code = response_debug.auth_error_code.as_deref(), + auth.mode = self.auth_mode.as_deref(), + ); + tracing::event!( + target: "codex_otel.trace_safe", + tracing::Level::INFO, + event.name = "codex.api_request", + duration_ms = %duration.as_millis(), + http.response.status_code = status, + success = success, + error.message = error_message.as_deref(), + attempt = attempt, + endpoint = MODELS_ENDPOINT, + auth.header_attached = self.auth_header_attached, + auth.header_name = self.auth_header_name, + auth.request_id = response_debug.request_id.as_deref(), + auth.cf_ray = response_debug.cf_ray.as_deref(), + auth.error = response_debug.auth_error.as_deref(), + auth.error_code = response_debug.auth_error_code.as_deref(), + auth.mode = self.auth_mode.as_deref(), + ); + emit_feedback_request_tags(&FeedbackRequestTags { + endpoint: MODELS_ENDPOINT, + auth_header_attached: self.auth_header_attached, + auth_header_name: self.auth_header_name, + auth_mode: self.auth_mode.as_deref(), + auth_retry_after_unauthorized: None, + auth_recovery_mode: None, + auth_recovery_phase: None, + auth_connection_reused: None, + auth_request_id: response_debug.request_id.as_deref(), + auth_cf_ray: response_debug.cf_ray.as_deref(), + auth_error: response_debug.auth_error.as_deref(), + auth_error_code: response_debug.auth_error_code.as_deref(), + auth_recovery_followup_success: None, + auth_recovery_followup_status: None, + }); + } +} /// Strategy for refreshing available models. #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -313,11 +397,17 @@ impl ModelsManager { let _timer = codex_otel::start_global_timer("codex.remote_models.fetch_update.duration_ms", &[]); let auth = self.auth_manager.auth().await; - let auth_mode = self.auth_manager.auth_mode(); + let auth_mode = auth.as_ref().map(CodexAuth::auth_mode); let api_provider = self.provider.to_api_provider(auth_mode)?; let api_auth = auth_provider_from_auth(auth.clone(), &self.provider)?; let transport = ReqwestTransport::new(build_reqwest_client()); - let client = ModelsClient::new(transport, api_provider, api_auth); + let request_telemetry: Arc = Arc::new(ModelsRequestTelemetry { + auth_mode: auth_mode.map(|mode| TelemetryAuthMode::from(mode).to_string()), + auth_header_attached: api_auth.auth_header_attached(), + auth_header_name: api_auth.auth_header_name(), + }); + let client = ModelsClient::new(transport, api_provider, api_auth) + .with_telemetry(Some(request_telemetry)); let client_version = crate::models_manager::client_version_to_whole(); let (models, etag) = timeout( diff --git a/codex-rs/core/src/response_debug_context.rs b/codex-rs/core/src/response_debug_context.rs new file mode 100644 index 00000000000..bc7eab172bb --- /dev/null +++ b/codex-rs/core/src/response_debug_context.rs @@ -0,0 +1,167 @@ +use base64::Engine; +use codex_api::TransportError; +use codex_api::error::ApiError; + +const REQUEST_ID_HEADER: &str = "x-request-id"; +const OAI_REQUEST_ID_HEADER: &str = "x-oai-request-id"; +const CF_RAY_HEADER: &str = "cf-ray"; +const AUTH_ERROR_HEADER: &str = "x-openai-authorization-error"; +const X_ERROR_JSON_HEADER: &str = "x-error-json"; + +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub(crate) struct ResponseDebugContext { + pub(crate) request_id: Option, + pub(crate) cf_ray: Option, + pub(crate) auth_error: Option, + pub(crate) auth_error_code: Option, +} + +pub(crate) fn extract_response_debug_context(transport: &TransportError) -> ResponseDebugContext { + let mut context = ResponseDebugContext::default(); + + let TransportError::Http { + headers, body: _, .. + } = transport + else { + return context; + }; + + let extract_header = |name: &str| { + headers + .as_ref() + .and_then(|headers| headers.get(name)) + .and_then(|value| value.to_str().ok()) + .map(str::to_string) + }; + + context.request_id = + extract_header(REQUEST_ID_HEADER).or_else(|| extract_header(OAI_REQUEST_ID_HEADER)); + context.cf_ray = extract_header(CF_RAY_HEADER); + context.auth_error = extract_header(AUTH_ERROR_HEADER); + context.auth_error_code = extract_header(X_ERROR_JSON_HEADER).and_then(|encoded| { + let decoded = base64::engine::general_purpose::STANDARD + .decode(encoded) + .ok()?; + let parsed = serde_json::from_slice::(&decoded).ok()?; + parsed + .get("error") + .and_then(|error| error.get("code")) + .and_then(serde_json::Value::as_str) + .map(str::to_string) + }); + + context +} + +pub(crate) fn extract_response_debug_context_from_api_error( + error: &ApiError, +) -> ResponseDebugContext { + match error { + ApiError::Transport(transport) => extract_response_debug_context(transport), + _ => ResponseDebugContext::default(), + } +} + +pub(crate) fn telemetry_transport_error_message(error: &TransportError) -> String { + match error { + TransportError::Http { status, .. } => format!("http {}", status.as_u16()), + TransportError::RetryLimit => "retry limit reached".to_string(), + TransportError::Timeout => "timeout".to_string(), + TransportError::Network(err) => err.to_string(), + TransportError::Build(err) => err.to_string(), + } +} + +pub(crate) fn telemetry_api_error_message(error: &ApiError) -> String { + match error { + ApiError::Transport(transport) => telemetry_transport_error_message(transport), + ApiError::Api { status, .. } => format!("api error {}", status.as_u16()), + ApiError::Stream(err) => err.to_string(), + ApiError::ContextWindowExceeded => "context window exceeded".to_string(), + ApiError::QuotaExceeded => "quota exceeded".to_string(), + ApiError::UsageNotIncluded => "usage not included".to_string(), + ApiError::Retryable { .. } => "retryable error".to_string(), + ApiError::RateLimit(_) => "rate limit".to_string(), + ApiError::InvalidRequest { .. } => "invalid request".to_string(), + ApiError::ServerOverloaded => "server overloaded".to_string(), + } +} + +#[cfg(test)] +mod tests { + use super::ResponseDebugContext; + use super::extract_response_debug_context; + use super::telemetry_api_error_message; + use super::telemetry_transport_error_message; + use codex_api::TransportError; + use codex_api::error::ApiError; + use http::HeaderMap; + use http::HeaderValue; + use http::StatusCode; + use pretty_assertions::assert_eq; + + #[test] + fn extract_response_debug_context_decodes_identity_headers() { + let mut headers = HeaderMap::new(); + headers.insert("x-oai-request-id", HeaderValue::from_static("req-auth")); + headers.insert("cf-ray", HeaderValue::from_static("ray-auth")); + headers.insert( + "x-openai-authorization-error", + HeaderValue::from_static("missing_authorization_header"), + ); + headers.insert( + "x-error-json", + HeaderValue::from_static("eyJlcnJvciI6eyJjb2RlIjoidG9rZW5fZXhwaXJlZCJ9fQ=="), + ); + + let context = extract_response_debug_context(&TransportError::Http { + status: StatusCode::UNAUTHORIZED, + url: Some("https://chatgpt.com/backend-api/codex/models".to_string()), + headers: Some(headers), + body: Some(r#"{"error":{"message":"plain text error"},"status":401}"#.to_string()), + }); + + assert_eq!( + context, + ResponseDebugContext { + request_id: Some("req-auth".to_string()), + cf_ray: Some("ray-auth".to_string()), + auth_error: Some("missing_authorization_header".to_string()), + auth_error_code: Some("token_expired".to_string()), + } + ); + } + + #[test] + fn telemetry_error_messages_omit_http_bodies() { + let transport = TransportError::Http { + status: StatusCode::UNAUTHORIZED, + url: Some("https://chatgpt.com/backend-api/codex/responses".to_string()), + headers: None, + body: Some(r#"{"error":{"message":"secret token leaked"}}"#.to_string()), + }; + + assert_eq!(telemetry_transport_error_message(&transport), "http 401"); + assert_eq!( + telemetry_api_error_message(&ApiError::Transport(transport)), + "http 401" + ); + } + + #[test] + fn telemetry_error_messages_preserve_non_http_details() { + let network = TransportError::Network("dns lookup failed".to_string()); + let build = TransportError::Build("invalid header value".to_string()); + let stream = ApiError::Stream("socket closed".to_string()); + + assert_eq!( + telemetry_transport_error_message(&network), + "dns lookup failed" + ); + assert_eq!( + telemetry_transport_error_message(&build), + "invalid header value" + ); + assert_eq!(telemetry_api_error_message(&stream), "socket closed"); + } +} diff --git a/codex-rs/core/src/util.rs b/codex-rs/core/src/util.rs index 62e872ae6cf..43c6d85222b 100644 --- a/codex-rs/core/src/util.rs +++ b/codex-rs/core/src/util.rs @@ -37,6 +37,111 @@ macro_rules! feedback_tags { }; } +pub(crate) struct FeedbackRequestTags<'a> { + pub endpoint: &'a str, + pub auth_header_attached: bool, + pub auth_header_name: Option<&'a str>, + pub auth_mode: Option<&'a str>, + pub auth_retry_after_unauthorized: Option, + pub auth_recovery_mode: Option<&'a str>, + pub auth_recovery_phase: Option<&'a str>, + pub auth_connection_reused: Option, + pub auth_request_id: Option<&'a str>, + pub auth_cf_ray: Option<&'a str>, + pub auth_error: Option<&'a str>, + pub auth_error_code: Option<&'a str>, + pub auth_recovery_followup_success: Option, + pub auth_recovery_followup_status: Option, +} + +struct Auth401FeedbackSnapshot<'a> { + request_id: &'a str, + cf_ray: &'a str, + error: &'a str, + error_code: &'a str, +} + +impl<'a> Auth401FeedbackSnapshot<'a> { + fn from_optional_fields( + request_id: Option<&'a str>, + cf_ray: Option<&'a str>, + error: Option<&'a str>, + error_code: Option<&'a str>, + ) -> Self { + Self { + request_id: request_id.unwrap_or(""), + cf_ray: cf_ray.unwrap_or(""), + error: error.unwrap_or(""), + error_code: error_code.unwrap_or(""), + } + } +} + +pub(crate) fn emit_feedback_request_tags(tags: &FeedbackRequestTags<'_>) { + let auth_header_name = tags.auth_header_name.unwrap_or(""); + let auth_mode = tags.auth_mode.unwrap_or(""); + let auth_retry_after_unauthorized = tags + .auth_retry_after_unauthorized + .map_or_else(String::new, |value| value.to_string()); + let auth_recovery_mode = tags.auth_recovery_mode.unwrap_or(""); + let auth_recovery_phase = tags.auth_recovery_phase.unwrap_or(""); + let auth_connection_reused = tags + .auth_connection_reused + .map_or_else(String::new, |value| value.to_string()); + let auth_request_id = tags.auth_request_id.unwrap_or(""); + let auth_cf_ray = tags.auth_cf_ray.unwrap_or(""); + let auth_error = tags.auth_error.unwrap_or(""); + let auth_error_code = tags.auth_error_code.unwrap_or(""); + let auth_recovery_followup_success = tags + .auth_recovery_followup_success + .map_or_else(String::new, |value| value.to_string()); + let auth_recovery_followup_status = tags + .auth_recovery_followup_status + .map_or_else(String::new, |value| value.to_string()); + feedback_tags!( + endpoint = tags.endpoint, + auth_header_attached = tags.auth_header_attached, + auth_header_name = auth_header_name, + auth_mode = auth_mode, + auth_retry_after_unauthorized = auth_retry_after_unauthorized, + auth_recovery_mode = auth_recovery_mode, + auth_recovery_phase = auth_recovery_phase, + auth_connection_reused = auth_connection_reused, + auth_request_id = auth_request_id, + auth_cf_ray = auth_cf_ray, + auth_error = auth_error, + auth_error_code = auth_error_code, + auth_recovery_followup_success = auth_recovery_followup_success, + auth_recovery_followup_status = auth_recovery_followup_status + ); +} + +pub(crate) fn emit_feedback_auth_recovery_tags( + auth_recovery_mode: &str, + auth_recovery_phase: &str, + auth_recovery_outcome: &str, + auth_request_id: Option<&str>, + auth_cf_ray: Option<&str>, + auth_error: Option<&str>, + auth_error_code: Option<&str>, +) { + let auth_401 = Auth401FeedbackSnapshot::from_optional_fields( + auth_request_id, + auth_cf_ray, + auth_error, + auth_error_code, + ); + feedback_tags!( + auth_recovery_mode = auth_recovery_mode, + auth_recovery_phase = auth_recovery_phase, + auth_recovery_outcome = auth_recovery_outcome, + auth_401_request_id = auth_401.request_id, + auth_401_cf_ray = auth_401.cf_ray, + auth_401_error = auth_401.error, + auth_401_error_code = auth_401.error_code + ); +} + pub fn backoff(attempt: u64) -> Duration { let exp = BACKOFF_FACTOR.powi(attempt.saturating_sub(1) as i32); let base = (INITIAL_DELAY_MS as f64 * exp) as u64; diff --git a/codex-rs/core/src/util_tests.rs b/codex-rs/core/src/util_tests.rs index dd5956bf615..9df8c67e897 100644 --- a/codex-rs/core/src/util_tests.rs +++ b/codex-rs/core/src/util_tests.rs @@ -1,4 +1,15 @@ use super::*; +use std::collections::BTreeMap; +use std::sync::Arc; +use std::sync::Mutex; +use tracing::Event; +use tracing::Subscriber; +use tracing::field::Visit; +use tracing_subscriber::Layer; +use tracing_subscriber::layer::Context; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::registry::LookupSpan; +use tracing_subscriber::util::SubscriberInitExt; #[test] fn test_try_parse_error_message() { @@ -32,6 +43,298 @@ fn feedback_tags_macro_compiles() { feedback_tags!(model = "gpt-5", cached = true, debug_only = OnlyDebug); } +#[derive(Default)] +struct TagCollectorVisitor { + tags: BTreeMap, +} + +impl Visit for TagCollectorVisitor { + fn record_bool(&mut self, field: &tracing::field::Field, value: bool) { + self.tags + .insert(field.name().to_string(), value.to_string()); + } + + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + self.tags + .insert(field.name().to_string(), value.to_string()); + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + self.tags + .insert(field.name().to_string(), format!("{value:?}")); + } +} + +#[derive(Clone)] +struct TagCollectorLayer { + tags: Arc>>, +} + +impl Layer for TagCollectorLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_event(&self, event: &Event<'_>, _ctx: Context<'_, S>) { + if event.metadata().target() != "feedback_tags" { + return; + } + let mut visitor = TagCollectorVisitor::default(); + event.record(&mut visitor); + self.tags.lock().unwrap().extend(visitor.tags); + } +} + +#[test] +fn emit_feedback_request_tags_records_sentry_feedback_fields() { + let tags = Arc::new(Mutex::new(BTreeMap::new())); + let _guard = tracing_subscriber::registry() + .with(TagCollectorLayer { tags: tags.clone() }) + .set_default(); + + emit_feedback_request_tags(&FeedbackRequestTags { + endpoint: "/responses", + auth_header_attached: true, + auth_header_name: Some("authorization"), + auth_mode: Some("chatgpt"), + auth_retry_after_unauthorized: Some(false), + auth_recovery_mode: Some("managed"), + auth_recovery_phase: Some("refresh_token"), + auth_connection_reused: Some(true), + auth_request_id: Some("req-123"), + auth_cf_ray: Some("ray-123"), + auth_error: Some("missing_authorization_header"), + auth_error_code: Some("token_expired"), + auth_recovery_followup_success: Some(true), + auth_recovery_followup_status: Some(200), + }); + + let tags = tags.lock().unwrap().clone(); + assert_eq!( + tags.get("endpoint").map(String::as_str), + Some("\"/responses\"") + ); + assert_eq!( + tags.get("auth_header_attached").map(String::as_str), + Some("true") + ); + assert_eq!( + tags.get("auth_header_name").map(String::as_str), + Some("\"authorization\"") + ); + assert_eq!( + tags.get("auth_request_id").map(String::as_str), + Some("\"req-123\"") + ); + assert_eq!( + tags.get("auth_error_code").map(String::as_str), + Some("\"token_expired\"") + ); + assert_eq!( + tags.get("auth_recovery_followup_success") + .map(String::as_str), + Some("\"true\"") + ); + assert_eq!( + tags.get("auth_recovery_followup_status") + .map(String::as_str), + Some("\"200\"") + ); +} + +#[test] +fn emit_feedback_auth_recovery_tags_preserves_401_specific_fields() { + let tags = Arc::new(Mutex::new(BTreeMap::new())); + let _guard = tracing_subscriber::registry() + .with(TagCollectorLayer { tags: tags.clone() }) + .set_default(); + + emit_feedback_auth_recovery_tags( + "managed", + "refresh_token", + "recovery_succeeded", + Some("req-401"), + Some("ray-401"), + Some("missing_authorization_header"), + Some("token_expired"), + ); + + let tags = tags.lock().unwrap().clone(); + assert_eq!( + tags.get("auth_401_request_id").map(String::as_str), + Some("\"req-401\"") + ); + assert_eq!( + tags.get("auth_401_cf_ray").map(String::as_str), + Some("\"ray-401\"") + ); + assert_eq!( + tags.get("auth_401_error").map(String::as_str), + Some("\"missing_authorization_header\"") + ); + assert_eq!( + tags.get("auth_401_error_code").map(String::as_str), + Some("\"token_expired\"") + ); +} + +#[test] +fn emit_feedback_auth_recovery_tags_clears_stale_401_fields() { + let tags = Arc::new(Mutex::new(BTreeMap::new())); + let _guard = tracing_subscriber::registry() + .with(TagCollectorLayer { tags: tags.clone() }) + .set_default(); + + emit_feedback_auth_recovery_tags( + "managed", + "refresh_token", + "recovery_failed_transient", + Some("req-401-a"), + Some("ray-401-a"), + Some("missing_authorization_header"), + Some("token_expired"), + ); + emit_feedback_auth_recovery_tags( + "managed", + "done", + "recovery_not_run", + Some("req-401-b"), + None, + None, + None, + ); + + let tags = tags.lock().unwrap().clone(); + assert_eq!( + tags.get("auth_401_request_id").map(String::as_str), + Some("\"req-401-b\"") + ); + assert_eq!( + tags.get("auth_401_cf_ray").map(String::as_str), + Some("\"\"") + ); + assert_eq!(tags.get("auth_401_error").map(String::as_str), Some("\"\"")); + assert_eq!( + tags.get("auth_401_error_code").map(String::as_str), + Some("\"\"") + ); +} + +#[test] +fn emit_feedback_request_tags_preserves_latest_auth_fields_after_unauthorized() { + let tags = Arc::new(Mutex::new(BTreeMap::new())); + let _guard = tracing_subscriber::registry() + .with(TagCollectorLayer { tags: tags.clone() }) + .set_default(); + + emit_feedback_request_tags(&FeedbackRequestTags { + endpoint: "/responses", + auth_header_attached: true, + auth_header_name: Some("authorization"), + auth_mode: Some("chatgpt"), + auth_retry_after_unauthorized: Some(true), + auth_recovery_mode: Some("managed"), + auth_recovery_phase: Some("refresh_token"), + auth_connection_reused: None, + auth_request_id: Some("req-123"), + auth_cf_ray: Some("ray-123"), + auth_error: Some("missing_authorization_header"), + auth_error_code: Some("token_expired"), + auth_recovery_followup_success: Some(false), + auth_recovery_followup_status: Some(401), + }); + + let tags = tags.lock().unwrap().clone(); + assert_eq!( + tags.get("auth_request_id").map(String::as_str), + Some("\"req-123\"") + ); + assert_eq!( + tags.get("auth_cf_ray").map(String::as_str), + Some("\"ray-123\"") + ); + assert_eq!( + tags.get("auth_error").map(String::as_str), + Some("\"missing_authorization_header\"") + ); + assert_eq!( + tags.get("auth_error_code").map(String::as_str), + Some("\"token_expired\"") + ); + assert_eq!( + tags.get("auth_recovery_followup_success") + .map(String::as_str), + Some("\"false\"") + ); +} + +#[test] +fn emit_feedback_request_tags_clears_stale_latest_auth_fields() { + let tags = Arc::new(Mutex::new(BTreeMap::new())); + let _guard = tracing_subscriber::registry() + .with(TagCollectorLayer { tags: tags.clone() }) + .set_default(); + + emit_feedback_request_tags(&FeedbackRequestTags { + endpoint: "/responses", + auth_header_attached: true, + auth_header_name: Some("authorization"), + auth_mode: Some("chatgpt"), + auth_retry_after_unauthorized: Some(false), + auth_recovery_mode: Some("managed"), + auth_recovery_phase: Some("refresh_token"), + auth_connection_reused: Some(true), + auth_request_id: Some("req-123"), + auth_cf_ray: Some("ray-123"), + auth_error: Some("missing_authorization_header"), + auth_error_code: Some("token_expired"), + auth_recovery_followup_success: Some(true), + auth_recovery_followup_status: Some(200), + }); + emit_feedback_request_tags(&FeedbackRequestTags { + endpoint: "/responses", + auth_header_attached: true, + auth_header_name: None, + auth_mode: None, + auth_retry_after_unauthorized: None, + auth_recovery_mode: None, + auth_recovery_phase: None, + auth_connection_reused: None, + auth_request_id: None, + auth_cf_ray: None, + auth_error: None, + auth_error_code: None, + auth_recovery_followup_success: None, + auth_recovery_followup_status: None, + }); + + let tags = tags.lock().unwrap().clone(); + assert_eq!( + tags.get("auth_header_name").map(String::as_str), + Some("\"\"") + ); + assert_eq!(tags.get("auth_mode").map(String::as_str), Some("\"\"")); + assert_eq!( + tags.get("auth_request_id").map(String::as_str), + Some("\"\"") + ); + assert_eq!(tags.get("auth_cf_ray").map(String::as_str), Some("\"\"")); + assert_eq!(tags.get("auth_error").map(String::as_str), Some("\"\"")); + assert_eq!( + tags.get("auth_error_code").map(String::as_str), + Some("\"\"") + ); + assert_eq!( + tags.get("auth_recovery_followup_success") + .map(String::as_str), + Some("\"\"") + ); + assert_eq!( + tags.get("auth_recovery_followup_status") + .map(String::as_str), + Some("\"\"") + ); +} + #[test] fn normalize_thread_name_trims_and_rejects_empty() { assert_eq!(normalize_thread_name(" "), None); diff --git a/codex-rs/otel/src/events/session_telemetry.rs b/codex-rs/otel/src/events/session_telemetry.rs index 327416093a0..c8520ea6c9c 100644 --- a/codex-rs/otel/src/events/session_telemetry.rs +++ b/codex-rs/otel/src/events/session_telemetry.rs @@ -340,17 +340,43 @@ impl SessionTelemetry { Ok(response) => (Some(response.status().as_u16()), None), Err(error) => (error.status().map(|s| s.as_u16()), Some(error.to_string())), }; - self.record_api_request(attempt, status, error.as_deref(), duration); + self.record_api_request( + attempt, + status, + error.as_deref(), + duration, + false, + None, + false, + None, + None, + "unknown", + None, + None, + None, + None, + ); response } + #[allow(clippy::too_many_arguments)] pub fn record_api_request( &self, attempt: u64, status: Option, error: Option<&str>, duration: Duration, + auth_header_attached: bool, + auth_header_name: Option<&str>, + retry_after_unauthorized: bool, + recovery_mode: Option<&str>, + recovery_phase: Option<&str>, + endpoint: &str, + request_id: Option<&str>, + cf_ray: Option<&str>, + auth_error: Option<&str>, + auth_error_code: Option<&str>, ) { let success = status.is_some_and(|code| (200..=299).contains(&code)) && error.is_none(); let success_str = if success { "true" } else { "false" }; @@ -375,13 +401,76 @@ impl SessionTelemetry { http.response.status_code = status, error.message = error, attempt = attempt, + auth.header_attached = auth_header_attached, + auth.header_name = auth_header_name, + auth.retry_after_unauthorized = retry_after_unauthorized, + auth.recovery_mode = recovery_mode, + auth.recovery_phase = recovery_phase, + endpoint = endpoint, + auth.request_id = request_id, + auth.cf_ray = cf_ray, + auth.error = auth_error, + auth.error_code = auth_error_code, }, log: {}, trace: {}, ); } - pub fn record_websocket_request(&self, duration: Duration, error: Option<&str>) { + #[allow(clippy::too_many_arguments)] + pub fn record_websocket_connect( + &self, + duration: Duration, + status: Option, + error: Option<&str>, + auth_header_attached: bool, + auth_header_name: Option<&str>, + retry_after_unauthorized: bool, + recovery_mode: Option<&str>, + recovery_phase: Option<&str>, + endpoint: &str, + connection_reused: bool, + request_id: Option<&str>, + cf_ray: Option<&str>, + auth_error: Option<&str>, + auth_error_code: Option<&str>, + ) { + let success = error.is_none() + && status + .map(|code| (200..=299).contains(&code)) + .unwrap_or(true); + let success_str = if success { "true" } else { "false" }; + log_and_trace_event!( + self, + common: { + event.name = "codex.websocket_connect", + duration_ms = %duration.as_millis(), + http.response.status_code = status, + success = success_str, + error.message = error, + auth.header_attached = auth_header_attached, + auth.header_name = auth_header_name, + auth.retry_after_unauthorized = retry_after_unauthorized, + auth.recovery_mode = recovery_mode, + auth.recovery_phase = recovery_phase, + endpoint = endpoint, + auth.connection_reused = connection_reused, + auth.request_id = request_id, + auth.cf_ray = cf_ray, + auth.error = auth_error, + auth.error_code = auth_error_code, + }, + log: {}, + trace: {}, + ); + } + + pub fn record_websocket_request( + &self, + duration: Duration, + error: Option<&str>, + connection_reused: bool, + ) { let success_str = if error.is_none() { "true" } else { "false" }; self.counter( WEBSOCKET_REQUEST_COUNT_METRIC, @@ -400,6 +489,39 @@ impl SessionTelemetry { duration_ms = %duration.as_millis(), success = success_str, error.message = error, + auth.connection_reused = connection_reused, + }, + log: {}, + trace: {}, + ); + } + + #[allow(clippy::too_many_arguments)] + pub fn record_auth_recovery( + &self, + mode: &str, + step: &str, + outcome: &str, + request_id: Option<&str>, + cf_ray: Option<&str>, + auth_error: Option<&str>, + auth_error_code: Option<&str>, + recovery_reason: Option<&str>, + auth_state_changed: Option, + ) { + log_and_trace_event!( + self, + common: { + event.name = "codex.auth_recovery", + auth.mode = mode, + auth.step = step, + auth.outcome = outcome, + auth.request_id = request_id, + auth.cf_ray = cf_ray, + auth.error = auth_error, + auth.error_code = auth_error_code, + auth.recovery_reason = recovery_reason, + auth.state_changed = auth_state_changed, }, log: {}, trace: {}, diff --git a/codex-rs/otel/tests/suite/otel_export_routing_policy.rs b/codex-rs/otel/tests/suite/otel_export_routing_policy.rs index 317c6a691c3..df5d876b4b5 100644 --- a/codex-rs/otel/tests/suite/otel_export_routing_policy.rs +++ b/codex-rs/otel/tests/suite/otel_export_routing_policy.rs @@ -297,3 +297,462 @@ fn otel_export_routing_policy_routes_tool_result_log_and_trace_events() { assert!(!tool_trace_attrs.contains_key("mcp_server")); assert!(!tool_trace_attrs.contains_key("mcp_server_origin")); } + +#[test] +fn otel_export_routing_policy_routes_auth_recovery_log_and_trace_events() { + let log_exporter = InMemoryLogExporter::default(); + let logger_provider = SdkLoggerProvider::builder() + .with_simple_exporter(log_exporter.clone()) + .build(); + let span_exporter = InMemorySpanExporter::default(); + let tracer_provider = SdkTracerProvider::builder() + .with_simple_exporter(span_exporter.clone()) + .build(); + let tracer = tracer_provider.tracer("sink-split-test"); + + let subscriber = tracing_subscriber::registry() + .with( + opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new( + &logger_provider, + ) + .with_filter(filter_fn(OtelProvider::log_export_filter)), + ) + .with( + tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(filter_fn(OtelProvider::trace_export_filter)), + ); + + tracing::subscriber::with_default(subscriber, || { + tracing::callsite::rebuild_interest_cache(); + let manager = SessionTelemetry::new( + ThreadId::new(), + "gpt-5.1", + "gpt-5.1", + Some("account-id".to_string()), + Some("engineer@example.com".to_string()), + Some(TelemetryAuthMode::Chatgpt), + "codex_exec".to_string(), + true, + "tty".to_string(), + SessionSource::Cli, + ); + let root_span = tracing::info_span!("root"); + let _root_guard = root_span.enter(); + manager.record_auth_recovery( + "managed", + "reload", + "recovery_succeeded", + Some("req-401"), + Some("ray-401"), + Some("missing_authorization_header"), + Some("token_expired"), + None, + Some(true), + ); + }); + + logger_provider.force_flush().expect("flush logs"); + tracer_provider.force_flush().expect("flush traces"); + + let logs = log_exporter.get_emitted_logs().expect("log export"); + let recovery_log = find_log_by_event_name(&logs, "codex.auth_recovery"); + let recovery_log_attrs = log_attributes(&recovery_log.record); + assert_eq!( + recovery_log_attrs.get("auth.mode").map(String::as_str), + Some("managed") + ); + assert_eq!( + recovery_log_attrs.get("auth.step").map(String::as_str), + Some("reload") + ); + assert_eq!( + recovery_log_attrs.get("auth.outcome").map(String::as_str), + Some("recovery_succeeded") + ); + assert_eq!( + recovery_log_attrs + .get("auth.request_id") + .map(String::as_str), + Some("req-401") + ); + assert_eq!( + recovery_log_attrs.get("auth.cf_ray").map(String::as_str), + Some("ray-401") + ); + assert_eq!( + recovery_log_attrs.get("auth.error").map(String::as_str), + Some("missing_authorization_header") + ); + assert_eq!( + recovery_log_attrs + .get("auth.error_code") + .map(String::as_str), + Some("token_expired") + ); + assert_eq!( + recovery_log_attrs + .get("auth.state_changed") + .map(String::as_str), + Some("true") + ); + + let spans = span_exporter.get_finished_spans().expect("span export"); + assert_eq!(spans.len(), 1); + let span_events = &spans[0].events.events; + assert_eq!(span_events.len(), 1); + + let recovery_trace_event = find_span_event_by_name_attr(span_events, "codex.auth_recovery"); + let recovery_trace_attrs = span_event_attributes(recovery_trace_event); + assert_eq!( + recovery_trace_attrs.get("auth.mode").map(String::as_str), + Some("managed") + ); + assert_eq!( + recovery_trace_attrs.get("auth.step").map(String::as_str), + Some("reload") + ); + assert_eq!( + recovery_trace_attrs.get("auth.outcome").map(String::as_str), + Some("recovery_succeeded") + ); + assert_eq!( + recovery_trace_attrs + .get("auth.request_id") + .map(String::as_str), + Some("req-401") + ); + assert_eq!( + recovery_trace_attrs.get("auth.cf_ray").map(String::as_str), + Some("ray-401") + ); + assert_eq!( + recovery_trace_attrs.get("auth.error").map(String::as_str), + Some("missing_authorization_header") + ); + assert_eq!( + recovery_trace_attrs + .get("auth.error_code") + .map(String::as_str), + Some("token_expired") + ); + assert_eq!( + recovery_trace_attrs + .get("auth.state_changed") + .map(String::as_str), + Some("true") + ); +} + +#[test] +fn otel_export_routing_policy_routes_api_request_auth_observability() { + let log_exporter = InMemoryLogExporter::default(); + let logger_provider = SdkLoggerProvider::builder() + .with_simple_exporter(log_exporter.clone()) + .build(); + let span_exporter = InMemorySpanExporter::default(); + let tracer_provider = SdkTracerProvider::builder() + .with_simple_exporter(span_exporter.clone()) + .build(); + let tracer = tracer_provider.tracer("sink-split-test"); + + let subscriber = tracing_subscriber::registry() + .with( + opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new( + &logger_provider, + ) + .with_filter(filter_fn(OtelProvider::log_export_filter)), + ) + .with( + tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(filter_fn(OtelProvider::trace_export_filter)), + ); + + tracing::subscriber::with_default(subscriber, || { + tracing::callsite::rebuild_interest_cache(); + let manager = SessionTelemetry::new( + ThreadId::new(), + "gpt-5.1", + "gpt-5.1", + Some("account-id".to_string()), + Some("engineer@example.com".to_string()), + Some(TelemetryAuthMode::Chatgpt), + "codex_exec".to_string(), + true, + "tty".to_string(), + SessionSource::Cli, + ); + let root_span = tracing::info_span!("root"); + let _root_guard = root_span.enter(); + manager.record_api_request( + 1, + Some(401), + Some("http 401"), + std::time::Duration::from_millis(42), + true, + Some("authorization"), + true, + Some("managed"), + Some("refresh_token"), + "/responses", + Some("req-401"), + Some("ray-401"), + Some("missing_authorization_header"), + Some("token_expired"), + ); + }); + + logger_provider.force_flush().expect("flush logs"); + tracer_provider.force_flush().expect("flush traces"); + + let logs = log_exporter.get_emitted_logs().expect("log export"); + let request_log = find_log_by_event_name(&logs, "codex.api_request"); + let request_log_attrs = log_attributes(&request_log.record); + assert_eq!( + request_log_attrs + .get("auth.header_attached") + .map(String::as_str), + Some("true") + ); + assert_eq!( + request_log_attrs + .get("auth.header_name") + .map(String::as_str), + Some("authorization") + ); + assert_eq!( + request_log_attrs + .get("auth.retry_after_unauthorized") + .map(String::as_str), + Some("true") + ); + assert_eq!( + request_log_attrs + .get("auth.recovery_mode") + .map(String::as_str), + Some("managed") + ); + assert_eq!( + request_log_attrs + .get("auth.recovery_phase") + .map(String::as_str), + Some("refresh_token") + ); + assert_eq!( + request_log_attrs.get("endpoint").map(String::as_str), + Some("/responses") + ); + assert_eq!( + request_log_attrs.get("auth.error").map(String::as_str), + Some("missing_authorization_header") + ); + + let spans = span_exporter.get_finished_spans().expect("span export"); + let request_trace_event = + find_span_event_by_name_attr(&spans[0].events.events, "codex.api_request"); + let request_trace_attrs = span_event_attributes(request_trace_event); + assert_eq!( + request_trace_attrs + .get("auth.header_attached") + .map(String::as_str), + Some("true") + ); + assert_eq!( + request_trace_attrs + .get("auth.header_name") + .map(String::as_str), + Some("authorization") + ); + assert_eq!( + request_trace_attrs + .get("auth.retry_after_unauthorized") + .map(String::as_str), + Some("true") + ); + assert_eq!( + request_trace_attrs.get("endpoint").map(String::as_str), + Some("/responses") + ); +} + +#[test] +fn otel_export_routing_policy_routes_websocket_connect_auth_observability() { + let log_exporter = InMemoryLogExporter::default(); + let logger_provider = SdkLoggerProvider::builder() + .with_simple_exporter(log_exporter.clone()) + .build(); + let span_exporter = InMemorySpanExporter::default(); + let tracer_provider = SdkTracerProvider::builder() + .with_simple_exporter(span_exporter.clone()) + .build(); + let tracer = tracer_provider.tracer("sink-split-test"); + + let subscriber = tracing_subscriber::registry() + .with( + opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new( + &logger_provider, + ) + .with_filter(filter_fn(OtelProvider::log_export_filter)), + ) + .with( + tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(filter_fn(OtelProvider::trace_export_filter)), + ); + + tracing::subscriber::with_default(subscriber, || { + tracing::callsite::rebuild_interest_cache(); + let manager = SessionTelemetry::new( + ThreadId::new(), + "gpt-5.1", + "gpt-5.1", + Some("account-id".to_string()), + Some("engineer@example.com".to_string()), + Some(TelemetryAuthMode::Chatgpt), + "codex_exec".to_string(), + true, + "tty".to_string(), + SessionSource::Cli, + ); + let root_span = tracing::info_span!("root"); + let _root_guard = root_span.enter(); + manager.record_websocket_connect( + std::time::Duration::from_millis(17), + Some(401), + Some("http 401"), + true, + Some("authorization"), + true, + Some("managed"), + Some("reload"), + "/responses", + false, + Some("req-ws-401"), + Some("ray-ws-401"), + Some("missing_authorization_header"), + Some("token_expired"), + ); + }); + + logger_provider.force_flush().expect("flush logs"); + tracer_provider.force_flush().expect("flush traces"); + + let logs = log_exporter.get_emitted_logs().expect("log export"); + let connect_log = find_log_by_event_name(&logs, "codex.websocket_connect"); + let connect_log_attrs = log_attributes(&connect_log.record); + assert_eq!( + connect_log_attrs + .get("auth.header_attached") + .map(String::as_str), + Some("true") + ); + assert_eq!( + connect_log_attrs + .get("auth.header_name") + .map(String::as_str), + Some("authorization") + ); + assert_eq!( + connect_log_attrs.get("auth.error").map(String::as_str), + Some("missing_authorization_header") + ); + assert_eq!( + connect_log_attrs.get("endpoint").map(String::as_str), + Some("/responses") + ); + assert_eq!( + connect_log_attrs + .get("auth.connection_reused") + .map(String::as_str), + Some("false") + ); + + let spans = span_exporter.get_finished_spans().expect("span export"); + let connect_trace_event = + find_span_event_by_name_attr(&spans[0].events.events, "codex.websocket_connect"); + let connect_trace_attrs = span_event_attributes(connect_trace_event); + assert_eq!( + connect_trace_attrs + .get("auth.recovery_phase") + .map(String::as_str), + Some("reload") + ); +} + +#[test] +fn otel_export_routing_policy_routes_websocket_request_transport_observability() { + let log_exporter = InMemoryLogExporter::default(); + let logger_provider = SdkLoggerProvider::builder() + .with_simple_exporter(log_exporter.clone()) + .build(); + let span_exporter = InMemorySpanExporter::default(); + let tracer_provider = SdkTracerProvider::builder() + .with_simple_exporter(span_exporter.clone()) + .build(); + let tracer = tracer_provider.tracer("sink-split-test"); + + let subscriber = tracing_subscriber::registry() + .with( + opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge::new( + &logger_provider, + ) + .with_filter(filter_fn(OtelProvider::log_export_filter)), + ) + .with( + tracing_opentelemetry::layer() + .with_tracer(tracer) + .with_filter(filter_fn(OtelProvider::trace_export_filter)), + ); + + tracing::subscriber::with_default(subscriber, || { + tracing::callsite::rebuild_interest_cache(); + let manager = SessionTelemetry::new( + ThreadId::new(), + "gpt-5.1", + "gpt-5.1", + Some("account-id".to_string()), + Some("engineer@example.com".to_string()), + Some(TelemetryAuthMode::Chatgpt), + "codex_exec".to_string(), + true, + "tty".to_string(), + SessionSource::Cli, + ); + let root_span = tracing::info_span!("root"); + let _root_guard = root_span.enter(); + manager.record_websocket_request( + std::time::Duration::from_millis(23), + Some("stream error"), + true, + ); + }); + + logger_provider.force_flush().expect("flush logs"); + tracer_provider.force_flush().expect("flush traces"); + + let logs = log_exporter.get_emitted_logs().expect("log export"); + let request_log = find_log_by_event_name(&logs, "codex.websocket_request"); + let request_log_attrs = log_attributes(&request_log.record); + assert_eq!( + request_log_attrs + .get("auth.connection_reused") + .map(String::as_str), + Some("true") + ); + assert_eq!( + request_log_attrs.get("error.message").map(String::as_str), + Some("stream error") + ); + + let spans = span_exporter.get_finished_spans().expect("span export"); + let request_trace_event = + find_span_event_by_name_attr(&spans[0].events.events, "codex.websocket_request"); + let request_trace_attrs = span_event_attributes(request_trace_event); + assert_eq!( + request_trace_attrs + .get("auth.connection_reused") + .map(String::as_str), + Some("true") + ); +} diff --git a/codex-rs/otel/tests/suite/runtime_summary.rs b/codex-rs/otel/tests/suite/runtime_summary.rs index c2f252381f1..778ed05783b 100644 --- a/codex-rs/otel/tests/suite/runtime_summary.rs +++ b/codex-rs/otel/tests/suite/runtime_summary.rs @@ -47,8 +47,23 @@ fn runtime_metrics_summary_collects_tool_api_and_streaming_metrics() -> Result<( None, None, ); - manager.record_api_request(1, Some(200), None, Duration::from_millis(300)); - manager.record_websocket_request(Duration::from_millis(400), None); + manager.record_api_request( + 1, + Some(200), + None, + Duration::from_millis(300), + false, + None, + false, + None, + None, + "/responses", + None, + None, + None, + None, + ); + manager.record_websocket_request(Duration::from_millis(400), None, false); let sse_response: std::result::Result< Option>>, tokio::time::error::Elapsed,