From 4edbe6ff7895f9b70495bf7d9205c5bfac1d1abf Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Sun, 15 Mar 2026 13:23:49 -0300 Subject: [PATCH 01/12] fix(tui): restore remote resume and fork history Preserve the app-server `Thread` snapshot returned during start, resume, and fork, then seed the primary thread store from its `turns` before replaying the rebuilt transcript. This makes `tui_app_server` restore remote session history without relying on historical websocket notifications being replayed after `thread/resume` or `thread/fork`. --- codex-rs/tui_app_server/src/app.rs | 174 +++++++++++++++--- .../src/app/app_server_adapter.rs | 153 +++++++++++++++ .../tui_app_server/src/app_server_session.rs | 8 +- 3 files changed, 311 insertions(+), 24 deletions(-) diff --git a/codex-rs/tui_app_server/src/app.rs b/codex-rs/tui_app_server/src/app.rs index 9fd3f1bd3dd5..7096278b01b3 100644 --- a/codex-rs/tui_app_server/src/app.rs +++ b/codex-rs/tui_app_server/src/app.rs @@ -118,6 +118,7 @@ mod pending_interactive_replay; use self::agent_navigation::AgentNavigationDirection; use self::agent_navigation::AgentNavigationState; +use self::app_server_adapter::thread_snapshot_events; use self::app_server_requests::PendingAppServerRequests; use self::pending_interactive_replay::PendingInteractiveReplayState; @@ -2050,6 +2051,7 @@ impl App { self.active_thread_id = None; self.active_thread_rx = None; self.primary_thread_id = None; + self.primary_session_configured = None; self.pending_primary_events.clear(); self.pending_app_server_requests.clear(); self.chat_widget.set_pending_thread_approvals(Vec::new()); @@ -2117,11 +2119,41 @@ impl App { let init = self.chatwidget_init_for_forked_or_resumed_thread(tui, self.config.clone()); self.chat_widget = ChatWidget::new_with_app_event(init); self.reset_thread_event_state(); - self.enqueue_primary_event(Event { + self.restore_started_app_server_thread(started).await + } + + async fn restore_started_app_server_thread( + &mut self, + started: AppServerStartedThread, + ) -> Result<()> { + let session_configured = started.session_configured; + let thread_id = session_configured.session_id; + let session_event = Event { id: String::new(), - msg: EventMsg::SessionConfigured(started.session_configured), - }) - .await + msg: EventMsg::SessionConfigured(session_configured.clone()), + }; + let history_events = thread_snapshot_events(&started.thread); + + self.primary_thread_id = Some(thread_id); + self.primary_session_configured = Some(session_configured); + self.upsert_agent_picker_thread(thread_id, None, None, false); + + let store = { + let channel = self.ensure_thread_channel(thread_id); + Arc::clone(&channel.store) + }; + let snapshot = { + let mut store = store.lock().await; + store.push_event(session_event); + for event in history_events { + store.push_event(event); + } + store.snapshot() + }; + + self.activate_thread_channel(thread_id).await; + self.replay_thread_snapshot(snapshot, false); + Ok(()) } fn fresh_session_config(&self) -> Config { @@ -2313,7 +2345,7 @@ impl App { let enhanced_keys_supported = tui.enhanced_keys_supported(); let wait_for_initial_session_configured = Self::should_wait_for_initial_session(&session_selection); - let (mut chat_widget, initial_session_configured) = match session_selection { + let (mut chat_widget, initial_started_thread) = match session_selection { SessionSelection::StartFresh | SessionSelection::Exit => { let started = app_server.start_thread(&config).await?; let startup_tooltip_override = @@ -2342,10 +2374,7 @@ impl App { status_line_invalid_items_warned: status_line_invalid_items_warned.clone(), session_telemetry: session_telemetry.clone(), }; - ( - ChatWidget::new_with_app_event(init), - Some(started.session_configured), - ) + (ChatWidget::new_with_app_event(init), started) } SessionSelection::Resume(target_session) => { let resumed = app_server @@ -2378,10 +2407,7 @@ impl App { status_line_invalid_items_warned: status_line_invalid_items_warned.clone(), session_telemetry: session_telemetry.clone(), }; - ( - ChatWidget::new_with_app_event(init), - Some(resumed.session_configured), - ) + (ChatWidget::new_with_app_event(init), resumed) } SessionSelection::Fork(target_session) => { session_telemetry.counter( @@ -2419,10 +2445,7 @@ impl App { status_line_invalid_items_warned: status_line_invalid_items_warned.clone(), session_telemetry: session_telemetry.clone(), }; - ( - ChatWidget::new_with_app_event(init), - Some(forked.session_configured), - ) + (ChatWidget::new_with_app_event(init), forked) } }; @@ -2474,13 +2497,8 @@ impl App { pending_primary_events: VecDeque::new(), pending_app_server_requests: PendingAppServerRequests::default(), }; - if let Some(session_configured) = initial_session_configured { - app.enqueue_primary_event(Event { - id: String::new(), - msg: EventMsg::SessionConfigured(session_configured), - }) + app.restore_started_app_server_thread(initial_started_thread) .await?; - } // On startup, if Agent mode (workspace-write) or ReadOnly is active, warn about world-writable dirs on Windows. #[cfg(target_os = "windows")] @@ -4427,6 +4445,7 @@ mod tests { use crate::app_backtrack::BacktrackSelection; use crate::app_backtrack::BacktrackState; use crate::app_backtrack::user_count; + use crate::app_server_session::AppServerStartedThread; use crate::chatwidget::tests::make_chatwidget_manual_with_sender; use crate::chatwidget::tests::set_chatgpt_auth; use crate::file_search::FileSearchManager; @@ -4437,6 +4456,11 @@ mod tests { use crate::multi_agents::AgentPickerThreadEntry; use assert_matches::assert_matches; + use codex_app_server_protocol::Thread; + use codex_app_server_protocol::ThreadItem; + use codex_app_server_protocol::ThreadStatus; + use codex_app_server_protocol::Turn; + use codex_app_server_protocol::TurnStatus; use codex_core::config::ConfigBuilder; use codex_core::config::ConfigOverrides; use codex_core::config::types::ModelAvailabilityNuxConfig; @@ -6680,6 +6704,110 @@ guardian_approval = true ) } + #[tokio::test] + async fn restore_started_app_server_thread_replays_remote_history() -> Result<()> { + let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await; + let thread_id = ThreadId::new(); + + app.restore_started_app_server_thread(AppServerStartedThread { + thread: Thread { + id: thread_id.to_string(), + preview: "hello".to_string(), + ephemeral: false, + model_provider: "test-provider".to_string(), + created_at: 0, + updated_at: 0, + status: ThreadStatus::Idle, + path: None, + cwd: PathBuf::from("/tmp/project"), + cli_version: "test".to_string(), + source: SessionSource::Cli.into(), + agent_nickname: None, + agent_role: None, + git_info: None, + name: Some("restored".to_string()), + turns: vec![Turn { + id: "turn-1".to_string(), + items: vec![ + ThreadItem::UserMessage { + id: "user-1".to_string(), + content: vec![codex_app_server_protocol::UserInput::Text { + text: "hello from remote".to_string(), + text_elements: Vec::new(), + }], + }, + ThreadItem::AgentMessage { + id: "assistant-1".to_string(), + text: "restored response".to_string(), + phase: None, + }, + ], + status: TurnStatus::Completed, + error: None, + }], + }, + session_configured: SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: Some("restored".to_string()), + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + service_tier: None, + approval_policy: AskForApproval::Never, + approvals_reviewer: ApprovalsReviewer::User, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }, + }) + .await?; + + while let Ok(event) = app_event_rx.try_recv() { + if let AppEvent::InsertHistoryCell(cell) = event { + let cell: Arc = cell.into(); + app.transcript_cells.push(cell); + } + } + + assert_eq!(app.primary_thread_id, Some(thread_id)); + assert_eq!(app.active_thread_id, Some(thread_id)); + + let user_messages: Vec = app + .transcript_cells + .iter() + .filter_map(|cell| { + cell.as_any() + .downcast_ref::() + .map(|cell| cell.message.clone()) + }) + .collect(); + let agent_messages: Vec = app + .transcript_cells + .iter() + .filter_map(|cell| { + cell.as_any() + .downcast_ref::() + .map(|cell| { + cell.display_lines(80) + .into_iter() + .map(|line| line.to_string()) + .collect::>() + .join("\n") + }) + }) + .collect(); + + assert_eq!(user_messages, vec!["hello from remote".to_string()]); + assert_eq!(agent_messages, vec!["• restored response".to_string()]); + + Ok(()) + } + #[test] fn thread_event_store_tracks_active_turn_lifecycle() { let mut store = ThreadEventStore::new(8); diff --git a/codex-rs/tui_app_server/src/app/app_server_adapter.rs b/codex-rs/tui_app_server/src/app/app_server_adapter.rs index d21542b8bf0c..d89d21d21502 100644 --- a/codex-rs/tui_app_server/src/app/app_server_adapter.rs +++ b/codex-rs/tui_app_server/src/app/app_server_adapter.rs @@ -19,7 +19,10 @@ use crate::app_server_session::status_account_display_from_auth_mode; use codex_app_server_client::AppServerEvent; use codex_app_server_protocol::JSONRPCErrorError; use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::Thread; use codex_app_server_protocol::ThreadItem; +use codex_app_server_protocol::Turn; +use codex_app_server_protocol::TurnStatus; use codex_protocol::ThreadId; use codex_protocol::config_types::ModeKind; use codex_protocol::items::AgentMessageContent; @@ -48,6 +51,8 @@ use codex_protocol::protocol::ThreadNameUpdatedEvent; use codex_protocol::protocol::TokenCountEvent; use codex_protocol::protocol::TokenUsage; use codex_protocol::protocol::TokenUsageInfo; +use codex_protocol::protocol::TurnAbortReason; +use codex_protocol::protocol::TurnAbortedEvent; use codex_protocol::protocol::TurnCompleteEvent; use codex_protocol::protocol::TurnStartedEvent; use serde_json::Value; @@ -196,6 +201,22 @@ impl App { } } +pub(super) fn thread_snapshot_events(thread: &Thread) -> Vec { + let Ok(thread_id) = ThreadId::from_string(&thread.id) else { + tracing::warn!( + thread_id = %thread.id, + "ignoring app-server thread snapshot with invalid thread id" + ); + return Vec::new(); + }; + + thread + .turns + .iter() + .flat_map(|turn| turn_snapshot_events(thread_id, turn)) + .collect() +} + fn legacy_thread_event(params: Option) -> Option<(ThreadId, Event)> { let Value::Object(mut params) = params? else { return None; @@ -418,6 +439,70 @@ fn token_usage_from_app_server( } } +fn turn_snapshot_events(thread_id: ThreadId, turn: &Turn) -> Vec { + let mut events = vec![Event { + id: String::new(), + msg: EventMsg::TurnStarted(TurnStartedEvent { + turn_id: turn.id.clone(), + model_context_window: None, + collaboration_mode_kind: ModeKind::default(), + }), + }]; + + events.extend(turn.items.iter().filter_map(|item| { + let item = thread_item_to_core(item.clone())?; + Some(Event { + id: String::new(), + msg: EventMsg::ItemCompleted(ItemCompletedEvent { + thread_id, + turn_id: turn.id.clone(), + item, + }), + }) + })); + + match turn.status { + TurnStatus::Completed => events.push(Event { + id: String::new(), + msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: turn.id.clone(), + last_agent_message: None, + }), + }), + TurnStatus::Interrupted => events.push(Event { + id: String::new(), + msg: EventMsg::TurnAborted(TurnAbortedEvent { + turn_id: Some(turn.id.clone()), + reason: TurnAbortReason::Interrupted, + }), + }), + TurnStatus::Failed => { + if let Some(error) = &turn.error { + events.push(Event { + id: String::new(), + msg: EventMsg::Error(ErrorEvent { + message: error.message.clone(), + codex_error_info: error + .codex_error_info + .clone() + .and_then(app_server_codex_error_info_to_core), + }), + }); + } + events.push(Event { + id: String::new(), + msg: EventMsg::TurnComplete(TurnCompleteEvent { + turn_id: turn.id.clone(), + last_agent_message: None, + }), + }); + } + TurnStatus::InProgress => {} + } + + events +} + fn thread_item_to_core(item: ThreadItem) -> Option { match item { ThreadItem::UserMessage { id, content } => Some(TurnItem::UserMessage(UserMessageItem { @@ -504,11 +589,14 @@ fn app_server_codex_error_info_to_core( #[cfg(test)] mod tests { use super::server_notification_thread_events; + use super::thread_snapshot_events; use codex_app_server_protocol::AgentMessageDeltaNotification; use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification; use codex_app_server_protocol::ServerNotification; + use codex_app_server_protocol::Thread; use codex_app_server_protocol::ThreadItem; + use codex_app_server_protocol::ThreadStatus; use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnCompletedNotification; use codex_app_server_protocol::TurnStatus; @@ -518,7 +606,11 @@ mod tests { use codex_protocol::items::TurnItem; use codex_protocol::models::MessagePhase; use codex_protocol::protocol::EventMsg; + use codex_protocol::protocol::SessionSource; + use codex_protocol::protocol::TurnAbortReason; + use codex_protocol::protocol::TurnAbortedEvent; use pretty_assertions::assert_eq; + use std::path::PathBuf; #[test] fn bridges_completed_agent_messages_from_server_notifications() { @@ -642,4 +734,65 @@ mod tests { }; assert_eq!(delta.delta, "Thinking"); } + + #[test] + fn bridges_thread_snapshot_turns_for_resume_restore() { + let thread_id = ThreadId::new(); + let events = thread_snapshot_events(&Thread { + id: thread_id.to_string(), + preview: "hello".to_string(), + ephemeral: false, + model_provider: "openai".to_string(), + created_at: 0, + updated_at: 0, + status: ThreadStatus::Idle, + path: None, + cwd: PathBuf::from("/tmp/project"), + cli_version: "test".to_string(), + source: SessionSource::Cli.into(), + agent_nickname: None, + agent_role: None, + git_info: None, + name: Some("restore".to_string()), + turns: vec![ + Turn { + id: "turn-complete".to_string(), + items: vec![ + ThreadItem::UserMessage { + id: "user-1".to_string(), + content: vec![codex_app_server_protocol::UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }], + }, + ThreadItem::AgentMessage { + id: "assistant-1".to_string(), + text: "hi".to_string(), + phase: Some(MessagePhase::FinalAnswer), + }, + ], + status: TurnStatus::Completed, + error: None, + }, + Turn { + id: "turn-interrupted".to_string(), + items: Vec::new(), + status: TurnStatus::Interrupted, + error: None, + }, + ], + }); + + assert_eq!(events.len(), 6); + assert!(matches!(events[0].msg, EventMsg::TurnStarted(_))); + assert!(matches!(events[1].msg, EventMsg::ItemCompleted(_))); + assert!(matches!(events[2].msg, EventMsg::ItemCompleted(_))); + assert!(matches!(events[3].msg, EventMsg::TurnComplete(_))); + assert!(matches!(events[4].msg, EventMsg::TurnStarted(_))); + let EventMsg::TurnAborted(TurnAbortedEvent { turn_id, reason }) = &events[5].msg else { + panic!("expected interrupted turn replay"); + }; + assert_eq!(turn_id.as_deref(), Some("turn-interrupted")); + assert_eq!(*reason, TurnAbortReason::Interrupted); + } } diff --git a/codex-rs/tui_app_server/src/app_server_session.rs b/codex-rs/tui_app_server/src/app_server_session.rs index 19c882caf016..c3ed29c1e7a2 100644 --- a/codex-rs/tui_app_server/src/app_server_session.rs +++ b/codex-rs/tui_app_server/src/app_server_session.rs @@ -124,6 +124,7 @@ impl ThreadParamsMode { } pub(crate) struct AppServerStartedThread { + pub(crate) thread: Thread, pub(crate) session_configured: SessionConfiguredEvent, } @@ -840,7 +841,10 @@ fn started_thread_from_start_response( ) -> Result { let session_configured = session_configured_from_thread_start_response(response) .map_err(color_eyre::eyre::Report::msg)?; - Ok(AppServerStartedThread { session_configured }) + Ok(AppServerStartedThread { + thread: response.thread.clone(), + session_configured, + }) } fn started_thread_from_resume_response( @@ -850,6 +854,7 @@ fn started_thread_from_resume_response( let session_configured = session_configured_from_thread_resume_response(response) .map_err(color_eyre::eyre::Report::msg)?; Ok(AppServerStartedThread { + thread: response.thread.clone(), session_configured: SessionConfiguredEvent { initial_messages: thread_initial_messages( &session_configured.session_id, @@ -868,6 +873,7 @@ fn started_thread_from_fork_response( let session_configured = session_configured_from_thread_fork_response(response) .map_err(color_eyre::eyre::Report::msg)?; Ok(AppServerStartedThread { + thread: response.thread.clone(), session_configured: SessionConfiguredEvent { initial_messages: thread_initial_messages( &session_configured.session_id, From aef587ff41aeb4af760260e655ba3b8e61e55bc4 Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Mon, 16 Mar 2026 20:35:17 -0300 Subject: [PATCH 02/12] fix(tui): preserve restored thread history Build the first replay snapshot for resumed and forked app-server threads from a temporary lossless `ThreadEventStore` before updating the real bounded channel store. This keeps early history visible on first load even when the restored thread is larger than the live event buffer. Add a regression test for over-capacity restores and document why the `TurnStatus::InProgress` snapshot arm intentionally emits no completion event during replay. --- codex-rs/tui_app_server/src/app.rs | 105 +++++++++++++++++- .../src/app/app_server_adapter.rs | 4 +- 2 files changed, 104 insertions(+), 5 deletions(-) diff --git a/codex-rs/tui_app_server/src/app.rs b/codex-rs/tui_app_server/src/app.rs index 7096278b01b3..9480c8add753 100644 --- a/codex-rs/tui_app_server/src/app.rs +++ b/codex-rs/tui_app_server/src/app.rs @@ -2133,6 +2133,14 @@ impl App { msg: EventMsg::SessionConfigured(session_configured.clone()), }; let history_events = thread_snapshot_events(&started.thread); + let replay_snapshot = { + let mut replay_store = ThreadEventStore::new(history_events.len().saturating_add(1)); + replay_store.push_event(session_event.clone()); + for event in &history_events { + replay_store.push_event(event.clone()); + } + replay_store.snapshot() + }; self.primary_thread_id = Some(thread_id); self.primary_session_configured = Some(session_configured); @@ -2142,17 +2150,16 @@ impl App { let channel = self.ensure_thread_channel(thread_id); Arc::clone(&channel.store) }; - let snapshot = { + { let mut store = store.lock().await; store.push_event(session_event); for event in history_events { store.push_event(event); } - store.snapshot() - }; + } self.activate_thread_channel(thread_id).await; - self.replay_thread_snapshot(snapshot, false); + self.replay_thread_snapshot(replay_snapshot, false); Ok(()) } @@ -6808,6 +6815,96 @@ guardian_approval = true Ok(()) } + #[tokio::test] + async fn restore_started_app_server_thread_replays_history_beyond_store_capacity() -> Result<()> + { + let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await; + let thread_id = ThreadId::new(); + let turn_count = THREAD_EVENT_CHANNEL_CAPACITY + 5; + + let turns = (0..turn_count) + .map(|index| Turn { + id: format!("turn-{index}"), + items: vec![ThreadItem::UserMessage { + id: format!("user-{index}"), + content: vec![codex_app_server_protocol::UserInput::Text { + text: format!("message {index}"), + text_elements: Vec::new(), + }], + }], + status: TurnStatus::Completed, + error: None, + }) + .collect(); + + app.restore_started_app_server_thread(AppServerStartedThread { + thread: Thread { + id: thread_id.to_string(), + preview: "hello".to_string(), + ephemeral: false, + model_provider: "test-provider".to_string(), + created_at: 0, + updated_at: 0, + status: ThreadStatus::Idle, + path: None, + cwd: PathBuf::from("/tmp/project"), + cli_version: "test".to_string(), + source: SessionSource::Cli.into(), + agent_nickname: None, + agent_role: None, + git_info: None, + name: Some("restored".to_string()), + turns, + }, + session_configured: SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: Some("restored".to_string()), + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + service_tier: None, + approval_policy: AskForApproval::Never, + approvals_reviewer: ApprovalsReviewer::User, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }, + }) + .await?; + + while let Ok(event) = app_event_rx.try_recv() { + if let AppEvent::InsertHistoryCell(cell) = event { + let cell: Arc = cell.into(); + app.transcript_cells.push(cell); + } + } + + let user_messages: Vec = app + .transcript_cells + .iter() + .filter_map(|cell| { + cell.as_any() + .downcast_ref::() + .map(|cell| cell.message.clone()) + }) + .collect(); + + assert_eq!(user_messages.len(), turn_count); + assert_eq!(user_messages.first().map(String::as_str), Some("message 0")); + let last_message = format!("message {}", turn_count - 1); + assert_eq!( + user_messages.last().map(String::as_str), + Some(last_message.as_str()) + ); + + Ok(()) + } + #[test] fn thread_event_store_tracks_active_turn_lifecycle() { let mut store = ThreadEventStore::new(8); diff --git a/codex-rs/tui_app_server/src/app/app_server_adapter.rs b/codex-rs/tui_app_server/src/app/app_server_adapter.rs index d89d21d21502..1819e4947e2a 100644 --- a/codex-rs/tui_app_server/src/app/app_server_adapter.rs +++ b/codex-rs/tui_app_server/src/app/app_server_adapter.rs @@ -497,7 +497,9 @@ fn turn_snapshot_events(thread_id: ThreadId, turn: &Turn) -> Vec { }), }); } - TurnStatus::InProgress => {} + TurnStatus::InProgress => { + // Preserve unfinished turns during snapshot replay without emitting completion events. + } } events From 28f80f4a4c56f9e21739ab42e1b350eb02293e0d Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Mon, 16 Mar 2026 20:49:09 -0300 Subject: [PATCH 03/12] fix(tui): align restored and live turn terminal states Share terminal turn event mapping between the live app-server bridge and snapshot replay so interrupted and failed turns hydrate with the same `EventMsg` sequence in both paths. Add coverage for interrupted and failed `TurnCompleted` notifications to lock in parity for reconnect and restore flows. --- .../src/app/app_server_adapter.rs | 102 +++++++++++++++--- 1 file changed, 90 insertions(+), 12 deletions(-) diff --git a/codex-rs/tui_app_server/src/app/app_server_adapter.rs b/codex-rs/tui_app_server/src/app/app_server_adapter.rs index 1819e4947e2a..e2198e932e14 100644 --- a/codex-rs/tui_app_server/src/app/app_server_adapter.rs +++ b/codex-rs/tui_app_server/src/app/app_server_adapter.rs @@ -307,16 +307,12 @@ fn server_notification_thread_events( }), }], )), - ServerNotification::TurnCompleted(notification) => Some(( - ThreadId::from_string(¬ification.thread_id).ok()?, - vec![Event { - id: String::new(), - msg: EventMsg::TurnComplete(TurnCompleteEvent { - turn_id: notification.turn.id, - last_agent_message: None, - }), - }], - )), + ServerNotification::TurnCompleted(notification) => { + let thread_id = ThreadId::from_string(¬ification.thread_id).ok()?; + let mut events = Vec::new(); + append_terminal_turn_events(&mut events, ¬ification.turn); + Some((thread_id, events)) + } ServerNotification::ItemStarted(notification) => Some(( ThreadId::from_string(¬ification.thread_id).ok()?, vec![Event { @@ -461,6 +457,12 @@ fn turn_snapshot_events(thread_id: ThreadId, turn: &Turn) -> Vec { }) })); + append_terminal_turn_events(&mut events, turn); + + events +} + +fn append_terminal_turn_events(events: &mut Vec, turn: &Turn) { match turn.status { TurnStatus::Completed => events.push(Event { id: String::new(), @@ -501,8 +503,6 @@ fn turn_snapshot_events(thread_id: ThreadId, turn: &Turn) -> Vec { // Preserve unfinished turns during snapshot replay without emitting completion events. } } - - events } fn thread_item_to_core(item: ThreadItem) -> Option { @@ -593,6 +593,7 @@ mod tests { use super::server_notification_thread_events; use super::thread_snapshot_events; use codex_app_server_protocol::AgentMessageDeltaNotification; + use codex_app_server_protocol::CodexErrorInfo; use codex_app_server_protocol::ItemCompletedNotification; use codex_app_server_protocol::ReasoningSummaryTextDeltaNotification; use codex_app_server_protocol::ServerNotification; @@ -601,6 +602,7 @@ mod tests { use codex_app_server_protocol::ThreadStatus; use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnCompletedNotification; + use codex_app_server_protocol::TurnError; use codex_app_server_protocol::TurnStatus; use codex_protocol::ThreadId; use codex_protocol::items::AgentMessageContent; @@ -695,6 +697,82 @@ mod tests { assert_eq!(completed.last_agent_message, None); } + #[test] + fn bridges_interrupted_turn_completion_from_server_notifications() { + let thread_id = "019cee8c-b993-7e33-88c0-014d4e62612d".to_string(); + let turn_id = "019cee8c-b9b4-7f10-a1b0-38caa876a012".to_string(); + + let (actual_thread_id, events) = server_notification_thread_events( + ServerNotification::TurnCompleted(TurnCompletedNotification { + thread_id: thread_id.clone(), + turn: Turn { + id: turn_id.clone(), + items: Vec::new(), + status: TurnStatus::Interrupted, + error: None, + }, + }), + ) + .expect("notification should bridge"); + + assert_eq!( + actual_thread_id, + ThreadId::from_string(&thread_id).expect("valid thread id") + ); + let [event] = events.as_slice() else { + panic!("expected one bridged event"); + }; + let EventMsg::TurnAborted(aborted) = &event.msg else { + panic!("expected turn aborted event"); + }; + assert_eq!(aborted.turn_id.as_deref(), Some(turn_id.as_str())); + assert_eq!(aborted.reason, TurnAbortReason::Interrupted); + } + + #[test] + fn bridges_failed_turn_completion_from_server_notifications() { + let thread_id = "019cee8c-b993-7e33-88c0-014d4e62612d".to_string(); + let turn_id = "019cee8c-b9b4-7f10-a1b0-38caa876a012".to_string(); + + let (actual_thread_id, events) = server_notification_thread_events( + ServerNotification::TurnCompleted(TurnCompletedNotification { + thread_id: thread_id.clone(), + turn: Turn { + id: turn_id.clone(), + items: Vec::new(), + status: TurnStatus::Failed, + error: Some(TurnError { + message: "request failed".to_string(), + codex_error_info: Some(CodexErrorInfo::Other), + additional_details: None, + }), + }, + }), + ) + .expect("notification should bridge"); + + assert_eq!( + actual_thread_id, + ThreadId::from_string(&thread_id).expect("valid thread id") + ); + let [error_event, complete_event] = events.as_slice() else { + panic!("expected error and completion events"); + }; + let EventMsg::Error(error) = &error_event.msg else { + panic!("expected error event"); + }; + assert_eq!(error.message, "request failed"); + assert_eq!( + error.codex_error_info, + Some(codex_protocol::protocol::CodexErrorInfo::Other) + ); + let EventMsg::TurnComplete(completed) = &complete_event.msg else { + panic!("expected turn complete event"); + }; + assert_eq!(completed.turn_id, turn_id); + assert_eq!(completed.last_agent_message, None); + } + #[test] fn bridges_text_deltas_from_server_notifications() { let thread_id = "019cee8c-b993-7e33-88c0-014d4e62612d".to_string(); From 72fbf9b054cf87d2453825772ee467b66f1fc665 Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Mon, 16 Mar 2026 20:56:09 -0300 Subject: [PATCH 04/12] docs(tui): document thread snapshot restore and replay functions Add rustdoc to the new types and functions introduced by the remote resume/fork history fix so reviewers can understand intent without reading the full diff. --- codex-rs/tui_app_server/src/app.rs | 14 ++++++++++++ .../src/app/app_server_adapter.rs | 22 +++++++++++++++++++ .../tui_app_server/src/app_server_session.rs | 8 +++++++ 3 files changed, 44 insertions(+) diff --git a/codex-rs/tui_app_server/src/app.rs b/codex-rs/tui_app_server/src/app.rs index 9480c8add753..d781a6863f17 100644 --- a/codex-rs/tui_app_server/src/app.rs +++ b/codex-rs/tui_app_server/src/app.rs @@ -2122,6 +2122,20 @@ impl App { self.restore_started_app_server_thread(started).await } + /// Hydrate thread state from an `AppServerStartedThread` returned by the + /// app-server start/resume/fork handshake. + /// + /// This is the single path that every session-start variant funnels + /// through. It performs four things in order: + /// + /// 1. Converts the `Thread` snapshot into protocol-level `Event`s. + /// 2. Builds a **lossless** replay snapshot from a temporary store so that + /// the initial render sees all history even when the thread has more + /// turns than the bounded channel capacity. + /// 3. Pushes the same events into the real channel store for backtrack and + /// navigation. + /// 4. Activates the thread channel and replays the snapshot into the chat + /// widget. async fn restore_started_app_server_thread( &mut self, started: AppServerStartedThread, diff --git a/codex-rs/tui_app_server/src/app/app_server_adapter.rs b/codex-rs/tui_app_server/src/app/app_server_adapter.rs index e2198e932e14..72216b98cc02 100644 --- a/codex-rs/tui_app_server/src/app/app_server_adapter.rs +++ b/codex-rs/tui_app_server/src/app/app_server_adapter.rs @@ -201,6 +201,12 @@ impl App { } } +/// Convert a `Thread` snapshot into a flat sequence of protocol `Event`s +/// suitable for replaying into the TUI event store. +/// +/// Each turn is expanded into `TurnStarted`, zero or more `ItemCompleted`, +/// and a terminal event that matches the turn's `TurnStatus`. Returns an +/// empty vec (with a warning log) if the thread ID is not a valid UUID. pub(super) fn thread_snapshot_events(thread: &Thread) -> Vec { let Ok(thread_id) = ThreadId::from_string(&thread.id) else { tracing::warn!( @@ -435,6 +441,12 @@ fn token_usage_from_app_server( } } +/// Expand a single `Turn` into the event sequence the TUI would have +/// observed if it had been connected for the turn's entire lifetime: +/// `TurnStarted` → `ItemCompleted`* → terminal event. +/// +/// Items that cannot be mapped to a core `TurnItem` (e.g. unknown +/// variants) are silently skipped via `filter_map`. fn turn_snapshot_events(thread_id: ThreadId, turn: &Turn) -> Vec { let mut events = vec![Event { id: String::new(), @@ -462,6 +474,16 @@ fn turn_snapshot_events(thread_id: ThreadId, turn: &Turn) -> Vec { events } +/// Append the terminal event(s) for a turn based on its `TurnStatus`. +/// +/// This function is shared between the live notification bridge +/// (`TurnCompleted` handling) and the snapshot replay path so that both +/// produce identical `EventMsg` sequences for the same turn status. +/// +/// - `Completed` → `TurnComplete` +/// - `Interrupted` → `TurnAborted { reason: Interrupted }` +/// - `Failed` → `Error` (if present) then `TurnComplete` +/// - `InProgress` → no events (the turn is still running) fn append_terminal_turn_events(events: &mut Vec, turn: &Turn) { match turn.status { TurnStatus::Completed => events.push(Event { diff --git a/codex-rs/tui_app_server/src/app_server_session.rs b/codex-rs/tui_app_server/src/app_server_session.rs index c3ed29c1e7a2..deb793052f41 100644 --- a/codex-rs/tui_app_server/src/app_server_session.rs +++ b/codex-rs/tui_app_server/src/app_server_session.rs @@ -123,6 +123,14 @@ impl ThreadParamsMode { } } +/// Result of starting, resuming, or forking an app-server thread. +/// +/// Carries the full `Thread` snapshot returned by the server alongside the +/// derived `SessionConfiguredEvent`. The snapshot's `turns` are used by +/// `App::restore_started_app_server_thread` to seed the event store and +/// replay transcript history — this is the only source of prior-turn data +/// for remote sessions, where historical websocket notifications are not +/// re-sent after the handshake. pub(crate) struct AppServerStartedThread { pub(crate) thread: Thread, pub(crate) session_configured: SessionConfiguredEvent, From 1b99608e5671041bc606ae4eedfd887e520f47d6 Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Mon, 16 Mar 2026 21:10:14 -0300 Subject: [PATCH 05/12] fix(tui): trim restored thread cloning Borrow app-server thread items while bridging replay events and move `response.thread` out of start, resume, and fork responses instead of cloning potentially large restored histories. This keeps the replay behavior unchanged while removing avoidable allocation churn on restore paths and response mapping. --- .../src/app/app_server_adapter.rs | 48 ++++++++++-------- .../tui_app_server/src/app_server_session.rs | 50 ++++++++++--------- 2 files changed, 54 insertions(+), 44 deletions(-) diff --git a/codex-rs/tui_app_server/src/app/app_server_adapter.rs b/codex-rs/tui_app_server/src/app/app_server_adapter.rs index 72216b98cc02..15afbd741942 100644 --- a/codex-rs/tui_app_server/src/app/app_server_adapter.rs +++ b/codex-rs/tui_app_server/src/app/app_server_adapter.rs @@ -326,7 +326,7 @@ fn server_notification_thread_events( msg: EventMsg::ItemStarted(ItemStartedEvent { thread_id: ThreadId::from_string(¬ification.thread_id).ok()?, turn_id: notification.turn_id, - item: thread_item_to_core(notification.item)?, + item: thread_item_to_core(¬ification.item)?, }), }], )), @@ -337,7 +337,7 @@ fn server_notification_thread_events( msg: EventMsg::ItemCompleted(ItemCompletedEvent { thread_id: ThreadId::from_string(¬ification.thread_id).ok()?, turn_id: notification.turn_id, - item: thread_item_to_core(notification.item)?, + item: thread_item_to_core(¬ification.item)?, }), }], )), @@ -458,7 +458,7 @@ fn turn_snapshot_events(thread_id: ThreadId, turn: &Turn) -> Vec { }]; events.extend(turn.items.iter().filter_map(|item| { - let item = thread_item_to_core(item.clone())?; + let item = thread_item_to_core(item)?; Some(Event { id: String::new(), msg: EventMsg::ItemCompleted(ItemCompletedEvent { @@ -527,36 +527,40 @@ fn append_terminal_turn_events(events: &mut Vec, turn: &Turn) { } } -fn thread_item_to_core(item: ThreadItem) -> Option { +fn thread_item_to_core(item: &ThreadItem) -> Option { match item { ThreadItem::UserMessage { id, content } => Some(TurnItem::UserMessage(UserMessageItem { - id, + id: id.clone(), content: content - .into_iter() + .iter() + .cloned() .map(codex_app_server_protocol::UserInput::into_core) .collect(), })), ThreadItem::AgentMessage { id, text, phase } => { Some(TurnItem::AgentMessage(AgentMessageItem { - id, - content: vec![AgentMessageContent::Text { text }], - phase, + id: id.clone(), + content: vec![AgentMessageContent::Text { text: text.clone() }], + phase: phase.clone(), })) } - ThreadItem::Plan { id, text } => Some(TurnItem::Plan(PlanItem { id, text })), + ThreadItem::Plan { id, text } => Some(TurnItem::Plan(PlanItem { + id: id.clone(), + text: text.clone(), + })), ThreadItem::Reasoning { id, summary, content, } => Some(TurnItem::Reasoning(ReasoningItem { - id, - summary_text: summary, - raw_content: content, + id: id.clone(), + summary_text: summary.clone(), + raw_content: content.clone(), })), ThreadItem::WebSearch { id, query, action } => Some(TurnItem::WebSearch(WebSearchItem { - id, - query, - action: app_server_web_search_action_to_core(action?)?, + id: id.clone(), + query: query.clone(), + action: app_server_web_search_action_to_core(action.clone()?)?, })), ThreadItem::ImageGeneration { id, @@ -564,14 +568,16 @@ fn thread_item_to_core(item: ThreadItem) -> Option { revised_prompt, result, } => Some(TurnItem::ImageGeneration(ImageGenerationItem { - id, - status, - revised_prompt, - result, + id: id.clone(), + status: status.clone(), + revised_prompt: revised_prompt.clone(), + result: result.clone(), saved_path: None, })), ThreadItem::ContextCompaction { id } => { - Some(TurnItem::ContextCompaction(ContextCompactionItem { id })) + Some(TurnItem::ContextCompaction(ContextCompactionItem { + id: id.clone(), + })) } ThreadItem::CommandExecution { .. } | ThreadItem::FileChange { .. } diff --git a/codex-rs/tui_app_server/src/app_server_session.rs b/codex-rs/tui_app_server/src/app_server_session.rs index deb793052f41..708c09657973 100644 --- a/codex-rs/tui_app_server/src/app_server_session.rs +++ b/codex-rs/tui_app_server/src/app_server_session.rs @@ -276,7 +276,7 @@ impl AppServerSession { }) .await .wrap_err("thread/start failed during TUI bootstrap")?; - started_thread_from_start_response(&response) + started_thread_from_start_response(response) } pub(crate) async fn resume_thread( @@ -298,7 +298,7 @@ impl AppServerSession { }) .await .wrap_err("thread/resume failed during TUI bootstrap")?; - started_thread_from_resume_response(&response, show_raw_agent_reasoning) + started_thread_from_resume_response(response, show_raw_agent_reasoning) } pub(crate) async fn fork_thread( @@ -320,7 +320,7 @@ impl AppServerSession { }) .await .wrap_err("thread/fork failed during TUI bootstrap")?; - started_thread_from_fork_response(&response, show_raw_agent_reasoning) + started_thread_from_fork_response(response, show_raw_agent_reasoning) } fn thread_params_mode(&self) -> ThreadParamsMode { @@ -845,49 +845,53 @@ fn thread_cwd_from_config(config: &Config, thread_params_mode: ThreadParamsMode) } fn started_thread_from_start_response( - response: &ThreadStartResponse, + response: ThreadStartResponse, ) -> Result { - let session_configured = session_configured_from_thread_start_response(response) + let session_configured = session_configured_from_thread_start_response(&response) .map_err(color_eyre::eyre::Report::msg)?; Ok(AppServerStartedThread { - thread: response.thread.clone(), + thread: response.thread, session_configured, }) } fn started_thread_from_resume_response( - response: &ThreadResumeResponse, + response: ThreadResumeResponse, show_raw_agent_reasoning: bool, ) -> Result { - let session_configured = session_configured_from_thread_resume_response(response) + let session_configured = session_configured_from_thread_resume_response(&response) .map_err(color_eyre::eyre::Report::msg)?; + let thread = response.thread; + let initial_messages = thread_initial_messages( + &session_configured.session_id, + &thread.turns, + show_raw_agent_reasoning, + ); Ok(AppServerStartedThread { - thread: response.thread.clone(), + thread, session_configured: SessionConfiguredEvent { - initial_messages: thread_initial_messages( - &session_configured.session_id, - &response.thread.turns, - show_raw_agent_reasoning, - ), + initial_messages, ..session_configured }, }) } fn started_thread_from_fork_response( - response: &ThreadForkResponse, + response: ThreadForkResponse, show_raw_agent_reasoning: bool, ) -> Result { - let session_configured = session_configured_from_thread_fork_response(response) + let session_configured = session_configured_from_thread_fork_response(&response) .map_err(color_eyre::eyre::Report::msg)?; + let thread = response.thread; + let initial_messages = thread_initial_messages( + &session_configured.session_id, + &thread.turns, + show_raw_agent_reasoning, + ); Ok(AppServerStartedThread { - thread: response.thread.clone(), + thread, session_configured: SessionConfiguredEvent { - initial_messages: thread_initial_messages( - &session_configured.session_id, - &response.thread.turns, - show_raw_agent_reasoning, - ), + initial_messages, ..session_configured }, }) @@ -1268,7 +1272,7 @@ mod tests { }; let started = - started_thread_from_resume_response(&response, /*show_raw_agent_reasoning*/ false) + started_thread_from_resume_response(response, /*show_raw_agent_reasoning*/ false) .expect("resume response should map"); let initial_messages = started .session_configured From d261fbf1332c268dd54cd4acaa7bec5e8b46d418 Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Mon, 16 Mar 2026 21:17:01 -0300 Subject: [PATCH 06/12] fix(tui): preserve web search other actions Keep `WebSearchAction::Other` when bridging app-server thread items so restore replay matches the session mapping path instead of dropping the entire web search item. This preserves history parity for restored threads without changing the handling of supported web search actions. --- codex-rs/tui_app_server/src/app/app_server_adapter.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/codex-rs/tui_app_server/src/app/app_server_adapter.rs b/codex-rs/tui_app_server/src/app/app_server_adapter.rs index 15afbd741942..c0b36364f6d1 100644 --- a/codex-rs/tui_app_server/src/app/app_server_adapter.rs +++ b/codex-rs/tui_app_server/src/app/app_server_adapter.rs @@ -606,7 +606,9 @@ fn app_server_web_search_action_to_core( codex_app_server_protocol::WebSearchAction::FindInPage { url, pattern } => { Some(codex_protocol::models::WebSearchAction::FindInPage { url, pattern }) } - codex_app_server_protocol::WebSearchAction::Other => None, + codex_app_server_protocol::WebSearchAction::Other => { + Some(codex_protocol::models::WebSearchAction::Other) + } } } From b0160cc58b22078a653927075fe311518a1f2d56 Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Mon, 16 Mar 2026 21:33:15 -0300 Subject: [PATCH 07/12] fix(tui): avoid duplicate live turn errors Keep failed-turn error synthesis for snapshot replay, but suppress it when bridging live `TurnCompleted` notifications because the app-server already emits a separate `ServerNotification::Error` for failed turns. Add coverage so live failed turns emit a single completion event while restored failed turns still replay their error state. --- .../src/app/app_server_adapter.rs | 46 +++++++++++++------ 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/codex-rs/tui_app_server/src/app/app_server_adapter.rs b/codex-rs/tui_app_server/src/app/app_server_adapter.rs index c0b36364f6d1..fe9f0e0a693c 100644 --- a/codex-rs/tui_app_server/src/app/app_server_adapter.rs +++ b/codex-rs/tui_app_server/src/app/app_server_adapter.rs @@ -316,7 +316,11 @@ fn server_notification_thread_events( ServerNotification::TurnCompleted(notification) => { let thread_id = ThreadId::from_string(¬ification.thread_id).ok()?; let mut events = Vec::new(); - append_terminal_turn_events(&mut events, ¬ification.turn); + append_terminal_turn_events( + &mut events, + ¬ification.turn, + /*include_failed_error*/ false, + ); Some((thread_id, events)) } ServerNotification::ItemStarted(notification) => Some(( @@ -469,7 +473,7 @@ fn turn_snapshot_events(thread_id: ThreadId, turn: &Turn) -> Vec { }) })); - append_terminal_turn_events(&mut events, turn); + append_terminal_turn_events(&mut events, turn, /*include_failed_error*/ true); events } @@ -484,7 +488,7 @@ fn turn_snapshot_events(thread_id: ThreadId, turn: &Turn) -> Vec { /// - `Interrupted` → `TurnAborted { reason: Interrupted }` /// - `Failed` → `Error` (if present) then `TurnComplete` /// - `InProgress` → no events (the turn is still running) -fn append_terminal_turn_events(events: &mut Vec, turn: &Turn) { +fn append_terminal_turn_events(events: &mut Vec, turn: &Turn, include_failed_error: bool) { match turn.status { TurnStatus::Completed => events.push(Event { id: String::new(), @@ -501,7 +505,7 @@ fn append_terminal_turn_events(events: &mut Vec, turn: &Turn) { }), }), TurnStatus::Failed => { - if let Some(error) = &turn.error { + if include_failed_error && let Some(error) = &turn.error { events.push(Event { id: String::new(), msg: EventMsg::Error(ErrorEvent { @@ -785,17 +789,9 @@ mod tests { actual_thread_id, ThreadId::from_string(&thread_id).expect("valid thread id") ); - let [error_event, complete_event] = events.as_slice() else { - panic!("expected error and completion events"); + let [complete_event] = events.as_slice() else { + panic!("expected turn completion only"); }; - let EventMsg::Error(error) = &error_event.msg else { - panic!("expected error event"); - }; - assert_eq!(error.message, "request failed"); - assert_eq!( - error.codex_error_info, - Some(codex_protocol::protocol::CodexErrorInfo::Other) - ); let EventMsg::TurnComplete(completed) = &complete_event.msg else { panic!("expected turn complete event"); }; @@ -890,10 +886,20 @@ mod tests { status: TurnStatus::Interrupted, error: None, }, + Turn { + id: "turn-failed".to_string(), + items: Vec::new(), + status: TurnStatus::Failed, + error: Some(TurnError { + message: "request failed".to_string(), + codex_error_info: Some(CodexErrorInfo::Other), + additional_details: None, + }), + }, ], }); - assert_eq!(events.len(), 6); + assert_eq!(events.len(), 9); assert!(matches!(events[0].msg, EventMsg::TurnStarted(_))); assert!(matches!(events[1].msg, EventMsg::ItemCompleted(_))); assert!(matches!(events[2].msg, EventMsg::ItemCompleted(_))); @@ -904,5 +910,15 @@ mod tests { }; assert_eq!(turn_id.as_deref(), Some("turn-interrupted")); assert_eq!(*reason, TurnAbortReason::Interrupted); + assert!(matches!(events[6].msg, EventMsg::TurnStarted(_))); + let EventMsg::Error(error) = &events[7].msg else { + panic!("expected failed turn error replay"); + }; + assert_eq!(error.message, "request failed"); + assert_eq!( + error.codex_error_info, + Some(codex_protocol::protocol::CodexErrorInfo::Other) + ); + assert!(matches!(events[8].msg, EventMsg::TurnComplete(_))); } } From 5fc90fd39e21638eb228e7c52e0b05914ef3862c Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Mon, 16 Mar 2026 21:50:45 -0300 Subject: [PATCH 08/12] fix(tui): avoid duplicate restored history replay Stop resume and fork startup mapping from populating `SessionConfiguredEvent.initial_messages` now that restored thread history is replayed through the snapshot restore path. Remove the unused initial-message conversion helpers and update the session test to lock in snapshot-only replay for restored history. --- .../tui_app_server/src/app_server_session.rs | 177 +----------------- 1 file changed, 8 insertions(+), 169 deletions(-) diff --git a/codex-rs/tui_app_server/src/app_server_session.rs b/codex-rs/tui_app_server/src/app_server_session.rs index 708c09657973..1539b2a9cf9a 100644 --- a/codex-rs/tui_app_server/src/app_server_session.rs +++ b/codex-rs/tui_app_server/src/app_server_session.rs @@ -55,15 +55,6 @@ use codex_app_server_protocol::TurnSteerResponse; use codex_core::config::Config; use codex_otel::TelemetryAuthMode; use codex_protocol::ThreadId; -use codex_protocol::items::AgentMessageContent; -use codex_protocol::items::AgentMessageItem; -use codex_protocol::items::ContextCompactionItem; -use codex_protocol::items::ImageGenerationItem; -use codex_protocol::items::PlanItem; -use codex_protocol::items::ReasoningItem; -use codex_protocol::items::TurnItem; -use codex_protocol::items::UserMessageItem; -use codex_protocol::items::WebSearchItem; use codex_protocol::openai_models::ModelAvailabilityNux; use codex_protocol::openai_models::ModelPreset; use codex_protocol::openai_models::ModelUpgrade; @@ -73,8 +64,6 @@ use codex_protocol::protocol::ConversationAudioParams; use codex_protocol::protocol::ConversationStartParams; use codex_protocol::protocol::ConversationTextParams; use codex_protocol::protocol::CreditsSnapshot; -use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::ItemCompletedEvent; use codex_protocol::protocol::RateLimitSnapshot; use codex_protocol::protocol::RateLimitWindow; use codex_protocol::protocol::ReviewRequest; @@ -857,43 +846,27 @@ fn started_thread_from_start_response( fn started_thread_from_resume_response( response: ThreadResumeResponse, - show_raw_agent_reasoning: bool, + _show_raw_agent_reasoning: bool, ) -> Result { let session_configured = session_configured_from_thread_resume_response(&response) .map_err(color_eyre::eyre::Report::msg)?; let thread = response.thread; - let initial_messages = thread_initial_messages( - &session_configured.session_id, - &thread.turns, - show_raw_agent_reasoning, - ); Ok(AppServerStartedThread { thread, - session_configured: SessionConfiguredEvent { - initial_messages, - ..session_configured - }, + session_configured, }) } fn started_thread_from_fork_response( response: ThreadForkResponse, - show_raw_agent_reasoning: bool, + _show_raw_agent_reasoning: bool, ) -> Result { let session_configured = session_configured_from_thread_fork_response(&response) .map_err(color_eyre::eyre::Report::msg)?; let thread = response.thread; - let initial_messages = thread_initial_messages( - &session_configured.session_id, - &thread.turns, - show_raw_agent_reasoning, - ); Ok(AppServerStartedThread { thread, - session_configured: SessionConfiguredEvent { - initial_messages, - ..session_configured - }, + session_configured, }) } @@ -1010,121 +983,6 @@ fn session_configured_from_thread_response( }) } -fn thread_initial_messages( - thread_id: &ThreadId, - turns: &[codex_app_server_protocol::Turn], - show_raw_agent_reasoning: bool, -) -> Option> { - let events: Vec = turns - .iter() - .flat_map(|turn| turn_initial_messages(thread_id, turn, show_raw_agent_reasoning)) - .collect(); - (!events.is_empty()).then_some(events) -} - -fn turn_initial_messages( - thread_id: &ThreadId, - turn: &codex_app_server_protocol::Turn, - show_raw_agent_reasoning: bool, -) -> Vec { - turn.items - .iter() - .cloned() - .filter_map(app_server_thread_item_to_core) - .flat_map(|item| match item { - TurnItem::UserMessage(item) => vec![item.as_legacy_event()], - TurnItem::Plan(item) => vec![EventMsg::ItemCompleted(ItemCompletedEvent { - thread_id: *thread_id, - turn_id: turn.id.clone(), - item: TurnItem::Plan(item), - })], - item => item.as_legacy_events(show_raw_agent_reasoning), - }) - .collect() -} - -fn app_server_thread_item_to_core(item: codex_app_server_protocol::ThreadItem) -> Option { - match item { - codex_app_server_protocol::ThreadItem::UserMessage { id, content } => { - Some(TurnItem::UserMessage(UserMessageItem { - id, - content: content - .into_iter() - .map(codex_app_server_protocol::UserInput::into_core) - .collect(), - })) - } - codex_app_server_protocol::ThreadItem::AgentMessage { id, text, phase } => { - Some(TurnItem::AgentMessage(AgentMessageItem { - id, - content: vec![AgentMessageContent::Text { text }], - phase, - })) - } - codex_app_server_protocol::ThreadItem::Plan { id, text } => { - Some(TurnItem::Plan(PlanItem { id, text })) - } - codex_app_server_protocol::ThreadItem::Reasoning { - id, - summary, - content, - } => Some(TurnItem::Reasoning(ReasoningItem { - id, - summary_text: summary, - raw_content: content, - })), - codex_app_server_protocol::ThreadItem::WebSearch { id, query, action } => { - Some(TurnItem::WebSearch(WebSearchItem { - id, - query, - action: app_server_web_search_action_to_core(action?)?, - })) - } - codex_app_server_protocol::ThreadItem::ImageGeneration { - id, - status, - revised_prompt, - result, - } => Some(TurnItem::ImageGeneration(ImageGenerationItem { - id, - status, - revised_prompt, - result, - saved_path: None, - })), - codex_app_server_protocol::ThreadItem::ContextCompaction { id } => { - Some(TurnItem::ContextCompaction(ContextCompactionItem { id })) - } - codex_app_server_protocol::ThreadItem::CommandExecution { .. } - | codex_app_server_protocol::ThreadItem::FileChange { .. } - | codex_app_server_protocol::ThreadItem::McpToolCall { .. } - | codex_app_server_protocol::ThreadItem::DynamicToolCall { .. } - | codex_app_server_protocol::ThreadItem::CollabAgentToolCall { .. } - | codex_app_server_protocol::ThreadItem::ImageView { .. } - | codex_app_server_protocol::ThreadItem::EnteredReviewMode { .. } - | codex_app_server_protocol::ThreadItem::ExitedReviewMode { .. } => None, - } -} - -fn app_server_web_search_action_to_core( - action: codex_app_server_protocol::WebSearchAction, -) -> Option { - match action { - codex_app_server_protocol::WebSearchAction::Search { query, queries } => { - Some(codex_protocol::models::WebSearchAction::Search { query, queries }) - } - codex_app_server_protocol::WebSearchAction::OpenPage { url } => { - Some(codex_protocol::models::WebSearchAction::OpenPage { url }) - } - codex_app_server_protocol::WebSearchAction::FindInPage { url, pattern } => { - Some(codex_protocol::models::WebSearchAction::FindInPage { url, pattern }) - } - codex_app_server_protocol::WebSearchAction::Other => { - Some(codex_protocol::models::WebSearchAction::Other) - } - } -} - fn app_server_rate_limit_snapshots_to_core( response: GetAccountRateLimitsResponse, ) -> Vec { @@ -1222,7 +1080,7 @@ mod tests { } #[test] - fn resume_response_restores_initial_messages_from_turn_items() { + fn resume_response_relies_on_snapshot_replay_not_initial_messages() { let thread_id = ThreadId::new(); let response = ThreadResumeResponse { thread: codex_app_server_protocol::Thread { @@ -1274,27 +1132,8 @@ mod tests { let started = started_thread_from_resume_response(response, /*show_raw_agent_reasoning*/ false) .expect("resume response should map"); - let initial_messages = started - .session_configured - .initial_messages - .expect("resume response should restore replay history"); - - assert_eq!(initial_messages.len(), 2); - match &initial_messages[0] { - EventMsg::UserMessage(event) => { - assert_eq!(event.message, "hello from history"); - assert_eq!(event.images.as_ref(), Some(&Vec::new())); - assert!(event.local_images.is_empty()); - assert!(event.text_elements.is_empty()); - } - other => panic!("expected replayed user message, got {other:?}"), - } - match &initial_messages[1] { - EventMsg::AgentMessage(event) => { - assert_eq!(event.message, "assistant reply"); - assert_eq!(event.phase, None); - } - other => panic!("expected replayed agent message, got {other:?}"), - } + assert!(started.session_configured.initial_messages.is_none()); + assert_eq!(started.thread.turns.len(), 1); + assert_eq!(started.thread.turns[0].items.len(), 2); } } From ebf5e63d252749a131c0cb029ad5886ee5a65880 Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Mon, 16 Mar 2026 22:12:34 -0300 Subject: [PATCH 09/12] fix(tui): preserve restored snapshot item history Replay restored non-message turn items using their legacy event shapes so resume and fork snapshots keep reasoning, web search, image generation, and context compaction cells visible in the transcript. Keep `ItemCompleted` replay for user, plan, and agent-message items, which still depend on committed-turn semantics in the TUI. Add adapter tests to pin the mixed replay contract. --- .../src/app/app_server_adapter.rs | 109 +++++++++++++++--- 1 file changed, 94 insertions(+), 15 deletions(-) diff --git a/codex-rs/tui_app_server/src/app/app_server_adapter.rs b/codex-rs/tui_app_server/src/app/app_server_adapter.rs index fe9f0e0a693c..4297eff975b2 100644 --- a/codex-rs/tui_app_server/src/app/app_server_adapter.rs +++ b/codex-rs/tui_app_server/src/app/app_server_adapter.rs @@ -446,11 +446,12 @@ fn token_usage_from_app_server( } /// Expand a single `Turn` into the event sequence the TUI would have -/// observed if it had been connected for the turn's entire lifetime: -/// `TurnStarted` → `ItemCompleted`* → terminal event. +/// observed if it had been connected for the turn's entire lifetime. /// -/// Items that cannot be mapped to a core `TurnItem` (e.g. unknown -/// variants) are silently skipped via `filter_map`. +/// Snapshot replay keeps committed-item semantics for user / plan / +/// agent-message items, while replaying the legacy events that still +/// drive rendering for reasoning, web-search, image-generation, and +/// context-compaction history cells. fn turn_snapshot_events(thread_id: ThreadId, turn: &Turn) -> Vec { let mut events = vec![Event { id: String::new(), @@ -461,17 +462,36 @@ fn turn_snapshot_events(thread_id: ThreadId, turn: &Turn) -> Vec { }), }]; - events.extend(turn.items.iter().filter_map(|item| { - let item = thread_item_to_core(item)?; - Some(Event { - id: String::new(), - msg: EventMsg::ItemCompleted(ItemCompletedEvent { - thread_id, - turn_id: turn.id.clone(), - item, - }), - }) - })); + for item in &turn.items { + let Some(item) = thread_item_to_core(item) else { + continue; + }; + match item { + TurnItem::UserMessage(_) | TurnItem::Plan(_) | TurnItem::AgentMessage(_) => { + events.push(Event { + id: String::new(), + msg: EventMsg::ItemCompleted(ItemCompletedEvent { + thread_id, + turn_id: turn.id.clone(), + item, + }), + }); + } + TurnItem::Reasoning(_) + | TurnItem::WebSearch(_) + | TurnItem::ImageGeneration(_) + | TurnItem::ContextCompaction(_) => { + events.extend( + item.as_legacy_events(/*show_raw_agent_reasoning*/ false) + .into_iter() + .map(|msg| Event { + id: String::new(), + msg, + }), + ); + } + } + } append_terminal_turn_events(&mut events, turn, /*include_failed_error*/ true); @@ -626,6 +646,7 @@ fn app_server_codex_error_info_to_core( mod tests { use super::server_notification_thread_events; use super::thread_snapshot_events; + use super::turn_snapshot_events; use codex_app_server_protocol::AgentMessageDeltaNotification; use codex_app_server_protocol::CodexErrorInfo; use codex_app_server_protocol::ItemCompletedNotification; @@ -921,4 +942,62 @@ mod tests { ); assert!(matches!(events[8].msg, EventMsg::TurnComplete(_))); } + + #[test] + fn bridges_non_message_snapshot_items_via_legacy_events() { + let events = turn_snapshot_events( + ThreadId::new(), + &Turn { + id: "turn-complete".to_string(), + items: vec![ + ThreadItem::Reasoning { + id: "reasoning-1".to_string(), + summary: vec!["Need to inspect config".to_string()], + content: vec!["hidden chain".to_string()], + }, + ThreadItem::WebSearch { + id: "search-1".to_string(), + query: "ratatui stylize".to_string(), + action: Some(codex_app_server_protocol::WebSearchAction::Other), + }, + ThreadItem::ImageGeneration { + id: "image-1".to_string(), + status: "completed".to_string(), + revised_prompt: Some("diagram".to_string()), + result: "image.png".to_string(), + }, + ThreadItem::ContextCompaction { + id: "compact-1".to_string(), + }, + ], + status: TurnStatus::Completed, + error: None, + }, + ); + + assert_eq!(events.len(), 6); + assert!(matches!(events[0].msg, EventMsg::TurnStarted(_))); + let EventMsg::AgentReasoning(reasoning) = &events[1].msg else { + panic!("expected reasoning replay"); + }; + assert_eq!(reasoning.text, "Need to inspect config"); + let EventMsg::WebSearchEnd(web_search) = &events[2].msg else { + panic!("expected web search replay"); + }; + assert_eq!(web_search.call_id, "search-1"); + assert_eq!(web_search.query, "ratatui stylize"); + assert_eq!( + web_search.action, + codex_protocol::models::WebSearchAction::Other + ); + let EventMsg::ImageGenerationEnd(image_generation) = &events[3].msg else { + panic!("expected image generation replay"); + }; + assert_eq!(image_generation.call_id, "image-1"); + assert_eq!(image_generation.status, "completed"); + assert_eq!(image_generation.revised_prompt.as_deref(), Some("diagram")); + assert_eq!(image_generation.result, "image.png"); + assert!(matches!(events[4].msg, EventMsg::ContextCompacted(_))); + assert!(matches!(events[5].msg, EventMsg::TurnComplete(_))); + } } From cfe71650e9eeb7df90ebf343cfef7e84b94f15ec Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Mon, 16 Mar 2026 22:36:57 -0300 Subject: [PATCH 10/12] fix(tui): preserve raw reasoning in restored snapshots Carry `show_raw_agent_reasoning` through the app-server start/resume state so snapshot replay keeps raw reasoning history when users enable that transcript mode. Update snapshot replay tests in the adapter and app restore path to pin that resumed and forked threads retain the raw reasoning events instead of dropping them during hydration. --- codex-rs/tui_app_server/src/app.rs | 88 +++++++++- .../src/app/app_server_adapter.rs | 157 +++++++++++------- .../tui_app_server/src/app_server_session.rs | 9 +- 3 files changed, 193 insertions(+), 61 deletions(-) diff --git a/codex-rs/tui_app_server/src/app.rs b/codex-rs/tui_app_server/src/app.rs index d781a6863f17..3e749e45db6e 100644 --- a/codex-rs/tui_app_server/src/app.rs +++ b/codex-rs/tui_app_server/src/app.rs @@ -2146,7 +2146,8 @@ impl App { id: String::new(), msg: EventMsg::SessionConfigured(session_configured.clone()), }; - let history_events = thread_snapshot_events(&started.thread); + let history_events = + thread_snapshot_events(&started.thread, started.show_raw_agent_reasoning); let replay_snapshot = { let mut replay_store = ThreadEventStore::new(history_events.len().saturating_add(1)); replay_store.push_event(session_event.clone()); @@ -6785,6 +6786,7 @@ guardian_approval = true network_proxy: None, rollout_path: Some(PathBuf::new()), }, + show_raw_agent_reasoning: false, }) .await?; @@ -6888,6 +6890,7 @@ guardian_approval = true network_proxy: None, rollout_path: Some(PathBuf::new()), }, + show_raw_agent_reasoning: false, }) .await?; @@ -6919,6 +6922,89 @@ guardian_approval = true Ok(()) } + #[tokio::test] + async fn restore_started_app_server_thread_replays_raw_reasoning_when_enabled() -> Result<()> { + let (mut app, mut app_event_rx, _op_rx) = make_test_app_with_channels().await; + let thread_id = ThreadId::new(); + + app.restore_started_app_server_thread(AppServerStartedThread { + thread: Thread { + id: thread_id.to_string(), + preview: "hello".to_string(), + ephemeral: false, + model_provider: "test-provider".to_string(), + created_at: 0, + updated_at: 0, + status: ThreadStatus::Idle, + path: None, + cwd: PathBuf::from("/tmp/project"), + cli_version: "test".to_string(), + source: SessionSource::Cli.into(), + agent_nickname: None, + agent_role: None, + git_info: None, + name: Some("restored".to_string()), + turns: vec![Turn { + id: "turn-1".to_string(), + items: vec![ThreadItem::Reasoning { + id: "reasoning-1".to_string(), + summary: vec!["summary reasoning".to_string()], + content: vec!["raw reasoning".to_string()], + }], + status: TurnStatus::Completed, + error: None, + }], + }, + session_configured: SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: Some("restored".to_string()), + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + service_tier: None, + approval_policy: AskForApproval::Never, + approvals_reviewer: ApprovalsReviewer::User, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }, + show_raw_agent_reasoning: true, + }) + .await?; + + while let Ok(event) = app_event_rx.try_recv() { + if let AppEvent::InsertHistoryCell(cell) = event { + let cell: Arc = cell.into(); + app.transcript_cells.push(cell); + } + } + + let channel = app + .thread_event_channels + .get(&thread_id) + .expect("restored thread channel should exist"); + let snapshot = channel.store.lock().await.snapshot(); + let replayed_raw_reasoning = snapshot.events.iter().any(|event| { + matches!( + &event.msg, + EventMsg::AgentReasoningRawContent(raw) if raw.text == "raw reasoning" + ) + }); + + assert!( + replayed_raw_reasoning, + "expected restored snapshot to keep raw reasoning event: {:?}", + snapshot.events + ); + + Ok(()) + } + #[test] fn thread_event_store_tracks_active_turn_lifecycle() { let mut store = ThreadEventStore::new(8); diff --git a/codex-rs/tui_app_server/src/app/app_server_adapter.rs b/codex-rs/tui_app_server/src/app/app_server_adapter.rs index 4297eff975b2..0fff49fd20c1 100644 --- a/codex-rs/tui_app_server/src/app/app_server_adapter.rs +++ b/codex-rs/tui_app_server/src/app/app_server_adapter.rs @@ -207,7 +207,10 @@ impl App { /// Each turn is expanded into `TurnStarted`, zero or more `ItemCompleted`, /// and a terminal event that matches the turn's `TurnStatus`. Returns an /// empty vec (with a warning log) if the thread ID is not a valid UUID. -pub(super) fn thread_snapshot_events(thread: &Thread) -> Vec { +pub(super) fn thread_snapshot_events( + thread: &Thread, + show_raw_agent_reasoning: bool, +) -> Vec { let Ok(thread_id) = ThreadId::from_string(&thread.id) else { tracing::warn!( thread_id = %thread.id, @@ -219,7 +222,7 @@ pub(super) fn thread_snapshot_events(thread: &Thread) -> Vec { thread .turns .iter() - .flat_map(|turn| turn_snapshot_events(thread_id, turn)) + .flat_map(|turn| turn_snapshot_events(thread_id, turn, show_raw_agent_reasoning)) .collect() } @@ -452,7 +455,11 @@ fn token_usage_from_app_server( /// agent-message items, while replaying the legacy events that still /// drive rendering for reasoning, web-search, image-generation, and /// context-compaction history cells. -fn turn_snapshot_events(thread_id: ThreadId, turn: &Turn) -> Vec { +fn turn_snapshot_events( + thread_id: ThreadId, + turn: &Turn, + show_raw_agent_reasoning: bool, +) -> Vec { let mut events = vec![Event { id: String::new(), msg: EventMsg::TurnStarted(TurnStartedEvent { @@ -482,7 +489,7 @@ fn turn_snapshot_events(thread_id: ThreadId, turn: &Turn) -> Vec { | TurnItem::ImageGeneration(_) | TurnItem::ContextCompaction(_) => { events.extend( - item.as_legacy_events(/*show_raw_agent_reasoning*/ false) + item.as_legacy_events(show_raw_agent_reasoning) .into_iter() .map(|msg| Event { id: String::new(), @@ -865,60 +872,63 @@ mod tests { #[test] fn bridges_thread_snapshot_turns_for_resume_restore() { let thread_id = ThreadId::new(); - let events = thread_snapshot_events(&Thread { - id: thread_id.to_string(), - preview: "hello".to_string(), - ephemeral: false, - model_provider: "openai".to_string(), - created_at: 0, - updated_at: 0, - status: ThreadStatus::Idle, - path: None, - cwd: PathBuf::from("/tmp/project"), - cli_version: "test".to_string(), - source: SessionSource::Cli.into(), - agent_nickname: None, - agent_role: None, - git_info: None, - name: Some("restore".to_string()), - turns: vec![ - Turn { - id: "turn-complete".to_string(), - items: vec![ - ThreadItem::UserMessage { - id: "user-1".to_string(), - content: vec![codex_app_server_protocol::UserInput::Text { - text: "hello".to_string(), - text_elements: Vec::new(), - }], - }, - ThreadItem::AgentMessage { - id: "assistant-1".to_string(), - text: "hi".to_string(), - phase: Some(MessagePhase::FinalAnswer), - }, - ], - status: TurnStatus::Completed, - error: None, - }, - Turn { - id: "turn-interrupted".to_string(), - items: Vec::new(), - status: TurnStatus::Interrupted, - error: None, - }, - Turn { - id: "turn-failed".to_string(), - items: Vec::new(), - status: TurnStatus::Failed, - error: Some(TurnError { - message: "request failed".to_string(), - codex_error_info: Some(CodexErrorInfo::Other), - additional_details: None, - }), - }, - ], - }); + let events = thread_snapshot_events( + &Thread { + id: thread_id.to_string(), + preview: "hello".to_string(), + ephemeral: false, + model_provider: "openai".to_string(), + created_at: 0, + updated_at: 0, + status: ThreadStatus::Idle, + path: None, + cwd: PathBuf::from("/tmp/project"), + cli_version: "test".to_string(), + source: SessionSource::Cli.into(), + agent_nickname: None, + agent_role: None, + git_info: None, + name: Some("restore".to_string()), + turns: vec![ + Turn { + id: "turn-complete".to_string(), + items: vec![ + ThreadItem::UserMessage { + id: "user-1".to_string(), + content: vec![codex_app_server_protocol::UserInput::Text { + text: "hello".to_string(), + text_elements: Vec::new(), + }], + }, + ThreadItem::AgentMessage { + id: "assistant-1".to_string(), + text: "hi".to_string(), + phase: Some(MessagePhase::FinalAnswer), + }, + ], + status: TurnStatus::Completed, + error: None, + }, + Turn { + id: "turn-interrupted".to_string(), + items: Vec::new(), + status: TurnStatus::Interrupted, + error: None, + }, + Turn { + id: "turn-failed".to_string(), + items: Vec::new(), + status: TurnStatus::Failed, + error: Some(TurnError { + message: "request failed".to_string(), + codex_error_info: Some(CodexErrorInfo::Other), + additional_details: None, + }), + }, + ], + }, + /*show_raw_agent_reasoning*/ false, + ); assert_eq!(events.len(), 9); assert!(matches!(events[0].msg, EventMsg::TurnStarted(_))); @@ -973,6 +983,7 @@ mod tests { status: TurnStatus::Completed, error: None, }, + /*show_raw_agent_reasoning*/ false, ); assert_eq!(events.len(), 6); @@ -1000,4 +1011,34 @@ mod tests { assert!(matches!(events[4].msg, EventMsg::ContextCompacted(_))); assert!(matches!(events[5].msg, EventMsg::TurnComplete(_))); } + + #[test] + fn bridges_raw_reasoning_snapshot_items_when_enabled() { + let events = turn_snapshot_events( + ThreadId::new(), + &Turn { + id: "turn-complete".to_string(), + items: vec![ThreadItem::Reasoning { + id: "reasoning-1".to_string(), + summary: vec!["Need to inspect config".to_string()], + content: vec!["hidden chain".to_string()], + }], + status: TurnStatus::Completed, + error: None, + }, + /*show_raw_agent_reasoning*/ true, + ); + + assert_eq!(events.len(), 4); + assert!(matches!(events[0].msg, EventMsg::TurnStarted(_))); + let EventMsg::AgentReasoning(reasoning) = &events[1].msg else { + panic!("expected reasoning replay"); + }; + assert_eq!(reasoning.text, "Need to inspect config"); + let EventMsg::AgentReasoningRawContent(raw_reasoning) = &events[2].msg else { + panic!("expected raw reasoning replay"); + }; + assert_eq!(raw_reasoning.text, "hidden chain"); + assert!(matches!(events[3].msg, EventMsg::TurnComplete(_))); + } } diff --git a/codex-rs/tui_app_server/src/app_server_session.rs b/codex-rs/tui_app_server/src/app_server_session.rs index 1539b2a9cf9a..514005193bec 100644 --- a/codex-rs/tui_app_server/src/app_server_session.rs +++ b/codex-rs/tui_app_server/src/app_server_session.rs @@ -123,6 +123,7 @@ impl ThreadParamsMode { pub(crate) struct AppServerStartedThread { pub(crate) thread: Thread, pub(crate) session_configured: SessionConfiguredEvent, + pub(crate) show_raw_agent_reasoning: bool, } impl AppServerSession { @@ -841,12 +842,13 @@ fn started_thread_from_start_response( Ok(AppServerStartedThread { thread: response.thread, session_configured, + show_raw_agent_reasoning: false, }) } fn started_thread_from_resume_response( response: ThreadResumeResponse, - _show_raw_agent_reasoning: bool, + show_raw_agent_reasoning: bool, ) -> Result { let session_configured = session_configured_from_thread_resume_response(&response) .map_err(color_eyre::eyre::Report::msg)?; @@ -854,12 +856,13 @@ fn started_thread_from_resume_response( Ok(AppServerStartedThread { thread, session_configured, + show_raw_agent_reasoning, }) } fn started_thread_from_fork_response( response: ThreadForkResponse, - _show_raw_agent_reasoning: bool, + show_raw_agent_reasoning: bool, ) -> Result { let session_configured = session_configured_from_thread_fork_response(&response) .map_err(color_eyre::eyre::Report::msg)?; @@ -867,6 +870,7 @@ fn started_thread_from_fork_response( Ok(AppServerStartedThread { thread, session_configured, + show_raw_agent_reasoning, }) } @@ -1133,6 +1137,7 @@ mod tests { started_thread_from_resume_response(response, /*show_raw_agent_reasoning*/ false) .expect("resume response should map"); assert!(started.session_configured.initial_messages.is_none()); + assert!(!started.show_raw_agent_reasoning); assert_eq!(started.thread.turns.len(), 1); assert_eq!(started.thread.turns[0].items.len(), 2); } From 1df21351a3a748763aa182aeee9a3e1567f8b76b Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Tue, 17 Mar 2026 11:32:17 -0300 Subject: [PATCH 11/12] fix(tui): keep resumed prompts after restored history Defer `initial_user_message` submission while a restored thread snapshot is replayed so resume and fork startup prompts stay after hydrated history in the transcript. Add a regression test that restores remote history with a startup prompt and asserts the restored transcript renders before the prompt submission. --- codex-rs/tui_app_server/src/app.rs | 112 ++++++++++++++++++ codex-rs/tui_app_server/src/chatwidget.rs | 27 ++++- .../tui_app_server/src/chatwidget/tests.rs | 1 + 3 files changed, 139 insertions(+), 1 deletion(-) diff --git a/codex-rs/tui_app_server/src/app.rs b/codex-rs/tui_app_server/src/app.rs index 3e749e45db6e..710ab0c1d49a 100644 --- a/codex-rs/tui_app_server/src/app.rs +++ b/codex-rs/tui_app_server/src/app.rs @@ -2241,6 +2241,8 @@ impl App { snapshot: ThreadEventSnapshot, resume_restored_queue: bool, ) { + self.chat_widget + .set_initial_user_message_submit_suppressed(/*suppressed*/ true); if let Some(event) = snapshot.session_configured { self.handle_codex_event_replay(event); } @@ -2253,6 +2255,9 @@ impl App { } self.chat_widget .set_queue_autosend_suppressed(/*suppressed*/ false); + self.chat_widget + .set_initial_user_message_submit_suppressed(/*suppressed*/ false); + self.chat_widget.submit_initial_user_message_if_pending(); if resume_restored_queue { self.chat_widget.maybe_send_next_queued_input(); } @@ -6831,6 +6836,113 @@ guardian_approval = true Ok(()) } + #[tokio::test] + async fn restore_started_app_server_thread_submits_initial_prompt_after_history_replay() + -> Result<()> { + let (mut app, mut app_event_rx, mut op_rx) = make_test_app_with_channels().await; + let thread_id = ThreadId::new(); + app.chat_widget.set_initial_user_message_for_test( + crate::chatwidget::create_initial_user_message( + Some("resume prompt".to_string()), + Vec::new(), + Vec::new(), + ), + ); + + app.restore_started_app_server_thread(AppServerStartedThread { + thread: Thread { + id: thread_id.to_string(), + preview: "hello".to_string(), + ephemeral: false, + model_provider: "test-provider".to_string(), + created_at: 0, + updated_at: 0, + status: ThreadStatus::Idle, + path: None, + cwd: PathBuf::from("/tmp/project"), + cli_version: "test".to_string(), + source: SessionSource::Cli.into(), + agent_nickname: None, + agent_role: None, + git_info: None, + name: Some("restored".to_string()), + turns: vec![Turn { + id: "turn-1".to_string(), + items: vec![ + ThreadItem::UserMessage { + id: "user-1".to_string(), + content: vec![codex_app_server_protocol::UserInput::Text { + text: "hello from remote".to_string(), + text_elements: Vec::new(), + }], + }, + ThreadItem::AgentMessage { + id: "assistant-1".to_string(), + text: "restored response".to_string(), + phase: None, + }, + ], + status: TurnStatus::Completed, + error: None, + }], + }, + session_configured: SessionConfiguredEvent { + session_id: thread_id, + forked_from_id: None, + thread_name: Some("restored".to_string()), + model: "gpt-test".to_string(), + model_provider_id: "test-provider".to_string(), + service_tier: None, + approval_policy: AskForApproval::Never, + approvals_reviewer: ApprovalsReviewer::User, + sandbox_policy: SandboxPolicy::new_read_only_policy(), + cwd: PathBuf::from("/tmp/project"), + reasoning_effort: None, + history_log_id: 0, + history_entry_count: 0, + initial_messages: None, + network_proxy: None, + rollout_path: Some(PathBuf::new()), + }, + show_raw_agent_reasoning: false, + }) + .await?; + + while let Ok(event) = app_event_rx.try_recv() { + if let AppEvent::InsertHistoryCell(cell) = event { + let cell: Arc = cell.into(); + app.transcript_cells.push(cell); + } + } + + let user_messages: Vec = app + .transcript_cells + .iter() + .filter_map(|cell| { + cell.as_any() + .downcast_ref::() + .map(|cell| cell.message.clone()) + }) + .collect(); + + assert_eq!( + user_messages, + vec!["hello from remote".to_string(), "resume prompt".to_string()] + ); + match next_user_turn_op(&mut op_rx) { + Op::UserTurn { items, .. } => assert_eq!( + items, + vec![UserInput::Text { + text: "resume prompt".to_string(), + text_elements: Vec::new(), + }] + ), + other => panic!("expected resume prompt submission, got {other:?}"), + } + + Ok(()) + } + #[tokio::test] async fn restore_started_app_server_thread_replays_history_beyond_store_capacity() -> Result<()> { diff --git a/codex-rs/tui_app_server/src/chatwidget.rs b/codex-rs/tui_app_server/src/chatwidget.rs index 0b4fb7c184a8..e4101bb11ebd 100644 --- a/codex-rs/tui_app_server/src/chatwidget.rs +++ b/codex-rs/tui_app_server/src/chatwidget.rs @@ -719,6 +719,10 @@ pub(crate) struct ChatWidget { // When resuming an existing session (selected via resume picker), avoid an // immediate redraw on SessionConfigured to prevent a gratuitous UI flicker. suppress_session_configured_redraw: bool, + // During snapshot restore, defer startup prompt submission until replayed + // history has been rendered so resumed/forked prompts keep chronological + // order. + suppress_initial_user_message_submit: bool, // User messages queued while a turn is in progress queued_user_messages: VecDeque, // Steers already submitted to core but not yet committed into history. @@ -1427,7 +1431,11 @@ impl ChatWidget { self.prefetch_connectors(); } if let Some(user_message) = self.initial_user_message.take() { - self.submit_user_message(user_message); + if self.suppress_initial_user_message_submit { + self.initial_user_message = Some(user_message); + } else { + self.submit_user_message(user_message); + } } if let Some(forked_from_id) = forked_from_id { self.emit_forked_thread_event(forked_from_id); @@ -1437,6 +1445,21 @@ impl ChatWidget { } } + pub(crate) fn set_initial_user_message_submit_suppressed(&mut self, suppressed: bool) { + self.suppress_initial_user_message_submit = suppressed; + } + + #[cfg(test)] + pub(crate) fn set_initial_user_message_for_test(&mut self, user_message: Option) { + self.initial_user_message = user_message; + } + + pub(crate) fn submit_initial_user_message_if_pending(&mut self) { + if let Some(user_message) = self.initial_user_message.take() { + self.submit_user_message(user_message); + } + } + fn emit_forked_thread_event(&self, forked_from_id: ThreadId) { let app_event_tx = self.app_event_tx.clone(); let codex_home = self.config.codex_home.clone(); @@ -3624,6 +3647,7 @@ impl ChatWidget { show_welcome_banner: is_first_run, startup_tooltip_override, suppress_session_configured_redraw: false, + suppress_initial_user_message_submit: false, pending_notification: None, quit_shortcut_expires_at: None, quit_shortcut_key: None, @@ -3816,6 +3840,7 @@ impl ChatWidget { show_welcome_banner: false, startup_tooltip_override: None, suppress_session_configured_redraw: true, + suppress_initial_user_message_submit: false, pending_notification: None, quit_shortcut_expires_at: None, quit_shortcut_key: None, diff --git a/codex-rs/tui_app_server/src/chatwidget/tests.rs b/codex-rs/tui_app_server/src/chatwidget/tests.rs index 07770182b224..6468e3de4773 100644 --- a/codex-rs/tui_app_server/src/chatwidget/tests.rs +++ b/codex-rs/tui_app_server/src/chatwidget/tests.rs @@ -1898,6 +1898,7 @@ async fn make_chatwidget_manual( submit_pending_steers_after_interrupt: false, queued_message_edit_binding: crate::key_hint::alt(KeyCode::Up), suppress_session_configured_redraw: false, + suppress_initial_user_message_submit: false, pending_notification: None, quit_shortcut_expires_at: None, quit_shortcut_key: None, From 6151d8a9ea33b8d5ef4e7e3c88a580fbfc91472c Mon Sep 17 00:00:00 2001 From: Felipe Coury Date: Tue, 17 Mar 2026 11:56:35 -0300 Subject: [PATCH 12/12] fix(tui): annotate restore replay literals --- codex-rs/tui_app_server/src/app.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/codex-rs/tui_app_server/src/app.rs b/codex-rs/tui_app_server/src/app.rs index 710ab0c1d49a..c08e74c33743 100644 --- a/codex-rs/tui_app_server/src/app.rs +++ b/codex-rs/tui_app_server/src/app.rs @@ -2159,7 +2159,10 @@ impl App { self.primary_thread_id = Some(thread_id); self.primary_session_configured = Some(session_configured); - self.upsert_agent_picker_thread(thread_id, None, None, false); + self.upsert_agent_picker_thread( + thread_id, /*agent_nickname*/ None, /*agent_role*/ None, + /*is_closed*/ false, + ); let store = { let channel = self.ensure_thread_channel(thread_id); @@ -2174,7 +2177,7 @@ impl App { } self.activate_thread_channel(thread_id).await; - self.replay_thread_snapshot(replay_snapshot, false); + self.replay_thread_snapshot(replay_snapshot, /*resume_restored_queue*/ false); Ok(()) }