diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index 6772d6f92a5..e6c679b4186 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -197,8 +197,12 @@ impl ThreadHistoryBuilder { if !payload.message.trim().is_empty() { content.push(UserInput::Text { text: payload.message.clone(), - // TODO: Thread text element ranges into thread history. Empty keeps old behavior. - text_elements: Vec::new(), + text_elements: payload + .text_elements + .iter() + .cloned() + .map(Into::into) + .collect(), }); } if let Some(images) = &payload.images { @@ -206,6 +210,9 @@ impl ThreadHistoryBuilder { content.push(UserInput::Image { url: image.clone() }); } } + for path in &payload.local_images { + content.push(UserInput::LocalImage { path: path.clone() }); + } content } } diff --git a/codex-rs/app-server-protocol/src/protocol/v1.rs b/codex-rs/app-server-protocol/src/protocol/v1.rs index ecc9d7c07de..a5bc69be5df 100644 --- a/codex-rs/app-server-protocol/src/protocol/v1.rs +++ b/codex-rs/app-server-protocol/src/protocol/v1.rs @@ -16,6 +16,8 @@ use codex_protocol::protocol::ReviewDecision; use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::TurnAbortReason; +use codex_protocol::user_input::ByteRange as CoreByteRange; +use codex_protocol::user_input::TextElement as CoreTextElement; use codex_utils_absolute_path::AbsolutePathBuf; use schemars::JsonSchema; use serde::Deserialize; @@ -444,9 +446,74 @@ pub struct RemoveConversationListenerParams { #[serde(rename_all = "camelCase")] #[serde(tag = "type", content = "data")] pub enum InputItem { - Text { text: String }, - Image { image_url: String }, - LocalImage { path: PathBuf }, + Text { + text: String, + /// UI-defined spans within `text` used to render or persist special elements. + #[serde(default)] + text_elements: Vec, + }, + Image { + image_url: String, + }, + LocalImage { + path: PathBuf, + }, +} + +#[derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(rename = "ByteRange")] +pub struct V1ByteRange { + /// Start byte offset (inclusive) within the UTF-8 text buffer. + pub start: usize, + /// End byte offset (exclusive) within the UTF-8 text buffer. + pub end: usize, +} + +impl From for V1ByteRange { + fn from(value: CoreByteRange) -> Self { + Self { + start: value.start, + end: value.end, + } + } +} + +impl From for CoreByteRange { + fn from(value: V1ByteRange) -> Self { + Self { + start: value.start, + end: value.end, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(rename = "TextElement")] +pub struct V1TextElement { + /// Byte range in the parent `text` buffer that this element occupies. + pub byte_range: V1ByteRange, + /// Optional human-readable placeholder for the element, displayed in the UI. + pub placeholder: Option, +} + +impl From for V1TextElement { + fn from(value: CoreTextElement) -> Self { + Self { + byte_range: value.byte_range.into(), + placeholder: value.placeholder, + } + } +} + +impl From for CoreTextElement { + fn from(value: V1TextElement) -> Self { + Self { + byte_range: value.byte_range.into(), + placeholder: value.placeholder, + } + } } #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 4ff12915449..ac930e1f429 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -30,6 +30,8 @@ use codex_protocol::protocol::SkillMetadata as CoreSkillMetadata; use codex_protocol::protocol::SkillScope as CoreSkillScope; use codex_protocol::protocol::TokenUsage as CoreTokenUsage; use codex_protocol::protocol::TokenUsageInfo as CoreTokenUsageInfo; +use codex_protocol::user_input::ByteRange as CoreByteRange; +use codex_protocol::user_input::TextElement as CoreTextElement; use codex_protocol::user_input::UserInput as CoreUserInput; use codex_utils_absolute_path::AbsolutePathBuf; use mcp_types::ContentBlock as McpContentBlock; @@ -1589,6 +1591,24 @@ pub struct ByteRange { pub end: usize, } +impl From for ByteRange { + fn from(value: CoreByteRange) -> Self { + Self { + start: value.start, + end: value.end, + } + } +} + +impl From for CoreByteRange { + fn from(value: ByteRange) -> Self { + Self { + start: value.start, + end: value.end, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] @@ -1599,6 +1619,24 @@ pub struct TextElement { pub placeholder: Option, } +impl From for TextElement { + fn from(value: CoreTextElement) -> Self { + Self { + byte_range: value.byte_range.into(), + placeholder: value.placeholder, + } + } +} + +impl From for CoreTextElement { + fn from(value: TextElement) -> Self { + Self { + byte_range: value.byte_range.into(), + placeholder: value.placeholder, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(tag = "type", rename_all = "camelCase")] #[ts(tag = "type")] @@ -1625,10 +1663,12 @@ pub enum UserInput { impl UserInput { pub fn into_core(self) -> CoreUserInput { match self { - UserInput::Text { text, .. } => CoreUserInput::Text { + UserInput::Text { text, - // TODO: Thread text element ranges into v2 inputs. Empty keeps old behavior. - text_elements: Vec::new(), + text_elements, + } => CoreUserInput::Text { + text, + text_elements: text_elements.into_iter().map(Into::into).collect(), }, UserInput::Image { url } => CoreUserInput::Image { image_url: url }, UserInput::LocalImage { path } => CoreUserInput::LocalImage { path }, @@ -1640,10 +1680,12 @@ impl UserInput { impl From for UserInput { fn from(value: CoreUserInput) -> Self { match value { - CoreUserInput::Text { text, .. } => UserInput::Text { + CoreUserInput::Text { + text, + text_elements, + } => UserInput::Text { text, - // TODO: Thread text element ranges from core into v2 inputs. - text_elements: Vec::new(), + text_elements: text_elements.into_iter().map(Into::into).collect(), }, CoreUserInput::Image { image_url } => UserInput::Image { url: image_url }, CoreUserInput::LocalImage { path } => UserInput::LocalImage { path }, diff --git a/codex-rs/app-server-test-client/src/main.rs b/codex-rs/app-server-test-client/src/main.rs index 809c2b5a338..9df895a70a8 100644 --- a/codex-rs/app-server-test-client/src/main.rs +++ b/codex-rs/app-server-test-client/src/main.rs @@ -477,6 +477,7 @@ impl CodexClient { conversation_id: *conversation_id, items: vec![InputItem::Text { text: message.to_string(), + text_elements: Vec::new(), }], }, }; diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 770e8de4e35..fc7115447db 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -3125,10 +3125,12 @@ impl CodexMessageProcessor { let mapped_items: Vec = items .into_iter() .map(|item| match item { - WireInputItem::Text { text } => CoreInputItem::Text { + WireInputItem::Text { text, - // TODO: Thread text element ranges into v1 input handling. - text_elements: Vec::new(), + text_elements, + } => CoreInputItem::Text { + text, + text_elements: text_elements.into_iter().map(Into::into).collect(), }, WireInputItem::Image { image_url } => CoreInputItem::Image { image_url }, WireInputItem::LocalImage { path } => CoreInputItem::LocalImage { path }, @@ -3175,10 +3177,12 @@ impl CodexMessageProcessor { let mapped_items: Vec = items .into_iter() .map(|item| match item { - WireInputItem::Text { text } => CoreInputItem::Text { + WireInputItem::Text { text, - // TODO: Thread text element ranges into v1 input handling. - text_elements: Vec::new(), + text_elements, + } => CoreInputItem::Text { + text, + text_elements: text_elements.into_iter().map(Into::into).collect(), }, WireInputItem::Image { image_url } => CoreInputItem::Image { image_url }, WireInputItem::LocalImage { path } => CoreInputItem::LocalImage { path }, @@ -3341,6 +3345,7 @@ impl CodexMessageProcessor { id: turn_id.clone(), content: vec![V2UserInput::Text { text: display_text.to_string(), + // Review prompt display text is synthesized; no UI element ranges to preserve. text_elements: Vec::new(), }], }] diff --git a/codex-rs/app-server/tests/common/lib.rs b/codex-rs/app-server/tests/common/lib.rs index af4982b846b..48577db110c 100644 --- a/codex-rs/app-server/tests/common/lib.rs +++ b/codex-rs/app-server/tests/common/lib.rs @@ -29,6 +29,7 @@ pub use responses::create_exec_command_sse_response; pub use responses::create_final_assistant_message_sse_response; pub use responses::create_shell_command_sse_response; pub use rollout::create_fake_rollout; +pub use rollout::create_fake_rollout_with_text_elements; use serde::de::DeserializeOwned; pub fn to_response(response: JSONRPCResponse) -> anyhow::Result { diff --git a/codex-rs/app-server/tests/common/rollout.rs b/codex-rs/app-server/tests/common/rollout.rs index b5829716af6..784b79b98d7 100644 --- a/codex-rs/app-server/tests/common/rollout.rs +++ b/codex-rs/app-server/tests/common/rollout.rs @@ -87,3 +87,75 @@ pub fn create_fake_rollout( fs::write(file_path, lines.join("\n") + "\n")?; Ok(uuid_str) } + +pub fn create_fake_rollout_with_text_elements( + codex_home: &Path, + filename_ts: &str, + meta_rfc3339: &str, + preview: &str, + text_elements: Vec, + model_provider: Option<&str>, + git_info: Option, +) -> Result { + let uuid = Uuid::new_v4(); + let uuid_str = uuid.to_string(); + let conversation_id = ThreadId::from_string(&uuid_str)?; + + // sessions/YYYY/MM/DD derived from filename_ts (YYYY-MM-DDThh-mm-ss) + let year = &filename_ts[0..4]; + let month = &filename_ts[5..7]; + let day = &filename_ts[8..10]; + let dir = codex_home.join("sessions").join(year).join(month).join(day); + fs::create_dir_all(&dir)?; + + let file_path = dir.join(format!("rollout-{filename_ts}-{uuid}.jsonl")); + + // Build JSONL lines + let meta = SessionMeta { + id: conversation_id, + timestamp: meta_rfc3339.to_string(), + cwd: PathBuf::from("/"), + originator: "codex".to_string(), + cli_version: "0.0.0".to_string(), + instructions: None, + source: SessionSource::Cli, + model_provider: model_provider.map(str::to_string), + }; + let payload = serde_json::to_value(SessionMetaLine { + meta, + git: git_info, + })?; + + let lines = [ + json!( { + "timestamp": meta_rfc3339, + "type": "session_meta", + "payload": payload + }) + .to_string(), + json!( { + "timestamp": meta_rfc3339, + "type":"response_item", + "payload": { + "type":"message", + "role":"user", + "content":[{"type":"input_text","text": preview}] + } + }) + .to_string(), + json!( { + "timestamp": meta_rfc3339, + "type":"event_msg", + "payload": { + "type":"user_message", + "message": preview, + "text_elements": text_elements, + "local_images": [] + } + }) + .to_string(), + ]; + + fs::write(file_path, lines.join("\n") + "\n")?; + Ok(uuid_str) +} diff --git a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs index 456206af896..b97acfc40c0 100644 --- a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs +++ b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs @@ -114,6 +114,7 @@ async fn test_codex_jsonrpc_conversation_flow() -> Result<()> { conversation_id, items: vec![codex_app_server_protocol::InputItem::Text { text: "text".to_string(), + text_elements: Vec::new(), }], }) .await?; @@ -241,6 +242,7 @@ async fn test_send_user_turn_changes_approval_policy_behavior() -> Result<()> { conversation_id, items: vec![codex_app_server_protocol::InputItem::Text { text: "run python".to_string(), + text_elements: Vec::new(), }], }) .await?; @@ -296,6 +298,7 @@ async fn test_send_user_turn_changes_approval_policy_behavior() -> Result<()> { conversation_id, items: vec![codex_app_server_protocol::InputItem::Text { text: "run python again".to_string(), + text_elements: Vec::new(), }], cwd: working_directory.clone(), approval_policy: AskForApproval::Never, @@ -405,6 +408,7 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() -> Result<( conversation_id, items: vec![InputItem::Text { text: "first turn".to_string(), + text_elements: Vec::new(), }], cwd: first_cwd.clone(), approval_policy: AskForApproval::Never, @@ -437,6 +441,7 @@ async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() -> Result<( conversation_id, items: vec![InputItem::Text { text: "second turn".to_string(), + text_elements: Vec::new(), }], cwd: second_cwd.clone(), approval_policy: AskForApproval::Never, diff --git a/codex-rs/app-server/tests/suite/create_thread.rs b/codex-rs/app-server/tests/suite/create_thread.rs index 9709af03bf6..8ad33425393 100644 --- a/codex-rs/app-server/tests/suite/create_thread.rs +++ b/codex-rs/app-server/tests/suite/create_thread.rs @@ -77,6 +77,7 @@ async fn test_conversation_create_and_send_message_ok() -> Result<()> { conversation_id, items: vec![InputItem::Text { text: "Hello".to_string(), + text_elements: Vec::new(), }], }) .await?; diff --git a/codex-rs/app-server/tests/suite/interrupt.rs b/codex-rs/app-server/tests/suite/interrupt.rs index 6248581e28c..f8dc2a7e8e1 100644 --- a/codex-rs/app-server/tests/suite/interrupt.rs +++ b/codex-rs/app-server/tests/suite/interrupt.rs @@ -105,6 +105,7 @@ async fn shell_command_interruption() -> anyhow::Result<()> { conversation_id, items: vec![codex_app_server_protocol::InputItem::Text { text: "run first sleep command".to_string(), + text_elements: Vec::new(), }], }) .await?; diff --git a/codex-rs/app-server/tests/suite/output_schema.rs b/codex-rs/app-server/tests/suite/output_schema.rs index 4ec500a245c..c120a7fe2da 100644 --- a/codex-rs/app-server/tests/suite/output_schema.rs +++ b/codex-rs/app-server/tests/suite/output_schema.rs @@ -80,6 +80,7 @@ async fn send_user_turn_accepts_output_schema_v1() -> Result<()> { conversation_id, items: vec![InputItem::Text { text: "Hello".to_string(), + text_elements: Vec::new(), }], cwd: codex_home.path().to_path_buf(), approval_policy: AskForApproval::Never, @@ -181,6 +182,7 @@ async fn send_user_turn_output_schema_is_per_turn_v1() -> Result<()> { conversation_id, items: vec![InputItem::Text { text: "Hello".to_string(), + text_elements: Vec::new(), }], cwd: codex_home.path().to_path_buf(), approval_policy: AskForApproval::Never, @@ -228,6 +230,7 @@ async fn send_user_turn_output_schema_is_per_turn_v1() -> Result<()> { conversation_id, items: vec![InputItem::Text { text: "Hello again".to_string(), + text_elements: Vec::new(), }], cwd: codex_home.path().to_path_buf(), approval_policy: AskForApproval::Never, diff --git a/codex-rs/app-server/tests/suite/send_message.rs b/codex-rs/app-server/tests/suite/send_message.rs index 83e809f48eb..0c713de87ce 100644 --- a/codex-rs/app-server/tests/suite/send_message.rs +++ b/codex-rs/app-server/tests/suite/send_message.rs @@ -101,6 +101,7 @@ async fn send_message( conversation_id, items: vec![InputItem::Text { text: message.to_string(), + text_elements: Vec::new(), }], }) .await?; @@ -194,6 +195,7 @@ async fn test_send_message_raw_notifications_opt_in() -> Result<()> { conversation_id, items: vec![InputItem::Text { text: "Hello".to_string(), + text_elements: Vec::new(), }], }) .await?; @@ -245,6 +247,7 @@ async fn test_send_message_session_not_found() -> Result<()> { conversation_id: unknown, items: vec![InputItem::Text { text: "ping".to_string(), + text_elements: Vec::new(), }], }) .await?; @@ -425,7 +428,7 @@ fn content_texts(content: &[ContentItem]) -> Vec<&str> { content .iter() .filter_map(|item| match item { - ContentItem::InputText { text } | ContentItem::OutputText { text } => { + ContentItem::InputText { text, .. } | ContentItem::OutputText { text } => { Some(text.as_str()) } _ => None, diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index cfcfcedf70f..be2971397de 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -1,6 +1,6 @@ use anyhow::Result; use app_test_support::McpProcess; -use app_test_support::create_fake_rollout; +use app_test_support::create_fake_rollout_with_text_elements; use app_test_support::create_mock_responses_server_repeating_assistant; use app_test_support::to_response; use codex_app_server_protocol::JSONRPCResponse; @@ -15,6 +15,9 @@ use codex_app_server_protocol::TurnStatus; use codex_app_server_protocol::UserInput; use codex_protocol::models::ContentItem; use codex_protocol::models::ResponseItem; +use codex_protocol::user_input::ByteRange; +use codex_protocol::user_input::TextElement; +use pretty_assertions::assert_eq; use std::path::PathBuf; use tempfile::TempDir; use tokio::time::timeout; @@ -71,11 +74,19 @@ async fn thread_resume_returns_rollout_history() -> Result<()> { create_config_toml(codex_home.path(), &server.uri())?; let preview = "Saved user message"; - let conversation_id = create_fake_rollout( + let text_elements = vec![TextElement { + byte_range: ByteRange { start: 0, end: 5 }, + placeholder: Some("".into()), + }]; + let conversation_id = create_fake_rollout_with_text_elements( codex_home.path(), "2025-01-05T12-00-00", "2025-01-05T12:00:00Z", preview, + text_elements + .iter() + .map(|elem| serde_json::to_value(elem).expect("serialize text element")) + .collect(), Some("mock_provider"), None, )?; @@ -119,7 +130,7 @@ async fn thread_resume_returns_rollout_history() -> Result<()> { content, &vec![UserInput::Text { text: preview.to_string(), - text_elements: Vec::new(), + text_elements: text_elements.clone().into_iter().map(Into::into).collect(), }] ); } diff --git a/codex-rs/app-server/tests/suite/v2/turn_start.rs b/codex-rs/app-server/tests/suite/v2/turn_start.rs index 406f80328ba..e3b758a4c64 100644 --- a/codex-rs/app-server/tests/suite/v2/turn_start.rs +++ b/codex-rs/app-server/tests/suite/v2/turn_start.rs @@ -8,6 +8,7 @@ use app_test_support::create_mock_responses_server_sequence_unchecked; use app_test_support::create_shell_command_sse_response; use app_test_support::format_with_current_shell_display; use app_test_support::to_response; +use codex_app_server_protocol::ByteRange; use codex_app_server_protocol::ClientInfo; use codex_app_server_protocol::CommandExecutionApprovalDecision; use codex_app_server_protocol::CommandExecutionRequestApprovalResponse; @@ -23,6 +24,7 @@ use codex_app_server_protocol::PatchApplyStatus; use codex_app_server_protocol::PatchChangeKind; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::TextElement; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadStartParams; use codex_app_server_protocol::ThreadStartResponse; @@ -113,6 +115,87 @@ async fn turn_start_sends_originator_header() -> Result<()> { Ok(()) } +#[tokio::test] +async fn turn_start_emits_user_message_item_with_text_elements() -> Result<()> { + let responses = vec![create_final_assistant_message_sse_response("Done")?]; + let server = create_mock_responses_server_sequence_unchecked(responses).await; + + let codex_home = TempDir::new()?; + create_config_toml(codex_home.path(), &server.uri(), "never")?; + + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()).await??; + + let thread_req = mcp + .send_thread_start_request(ThreadStartParams { + model: Some("mock-model".to_string()), + ..Default::default() + }) + .await?; + let thread_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(thread_req)), + ) + .await??; + let ThreadStartResponse { thread, .. } = to_response::(thread_resp)?; + + let text_elements = vec![TextElement { + byte_range: ByteRange { start: 0, end: 5 }, + placeholder: Some("".to_string()), + }]; + let turn_req = mcp + .send_turn_start_request(TurnStartParams { + thread_id: thread.id.clone(), + input: vec![V2UserInput::Text { + text: "Hello".to_string(), + text_elements: text_elements.clone(), + }], + ..Default::default() + }) + .await?; + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(turn_req)), + ) + .await??; + + let user_message_item = timeout(DEFAULT_READ_TIMEOUT, async { + loop { + let notification = mcp + .read_stream_until_notification_message("item/started") + .await?; + let params = notification.params.expect("item/started params"); + let item_started: ItemStartedNotification = + serde_json::from_value(params).expect("deserialize item/started notification"); + if let ThreadItem::UserMessage { .. } = item_started.item { + return Ok::(item_started.item); + } + } + }) + .await??; + + match user_message_item { + ThreadItem::UserMessage { content, .. } => { + assert_eq!( + content, + vec![V2UserInput::Text { + text: "Hello".to_string(), + text_elements, + }] + ); + } + other => panic!("expected user message item, got {other:?}"), + } + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("turn/completed"), + ) + .await??; + + Ok(()) +} + #[tokio::test] async fn turn_start_emits_notifications_and_accepts_model_override() -> Result<()> { // Provide a mock server and config so model wiring is valid. diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 59f19dc9bec..06000a345c7 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -36,6 +36,7 @@ use codex_protocol::ThreadId; use codex_protocol::approvals::ExecPolicyAmendment; use codex_protocol::config_types::WebSearchMode; use codex_protocol::items::TurnItem; +use codex_protocol::items::UserMessageItem; use codex_protocol::openai_models::ModelInfo; use codex_protocol::protocol::FileChange; use codex_protocol::protocol::HasLegacyEvent; @@ -1525,6 +1526,22 @@ impl Session { } } + pub(crate) async fn record_user_prompt_and_emit_turn_item( + &self, + turn_context: &TurnContext, + input: &[UserInput], + response_item: ResponseItem, + ) { + // Persist the user message to history, but emit the turn item from `UserInput` so + // UI-only `text_elements` are preserved. `ResponseItem::Message` does not carry + // those spans, and `record_response_item_and_emit_turn_item` would drop them. + self.record_conversation_items(turn_context, std::slice::from_ref(&response_item)) + .await; + let turn_item = TurnItem::UserMessage(UserMessageItem::new(input)); + self.emit_turn_item_started(turn_context, &turn_item).await; + self.emit_turn_item_completed(turn_context, turn_item).await; + } + pub(crate) async fn notify_background_event( &self, turn_context: &TurnContext, @@ -2563,9 +2580,9 @@ pub(crate) async fn run_turn( .await; } - let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input); + let initial_input_for_turn: ResponseInputItem = ResponseInputItem::from(input.clone()); let response_item: ResponseItem = initial_input_for_turn.clone().into(); - sess.record_response_item_and_emit_turn_item(turn_context.as_ref(), response_item) + sess.record_user_prompt_and_emit_turn_item(turn_context.as_ref(), &input, response_item) .await; if !skill_items.is_empty() { diff --git a/codex-rs/core/tests/suite/items.rs b/codex-rs/core/tests/suite/items.rs index ce9b0f13407..20739ab0dbe 100644 --- a/codex-rs/core/tests/suite/items.rs +++ b/codex-rs/core/tests/suite/items.rs @@ -6,6 +6,8 @@ use codex_core::protocol::ItemCompletedEvent; use codex_core::protocol::ItemStartedEvent; use codex_core::protocol::Op; use codex_protocol::items::TurnItem; +use codex_protocol::user_input::ByteRange; +use codex_protocol::user_input::TextElement; use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; @@ -38,12 +40,18 @@ async fn user_message_item_is_emitted() -> anyhow::Result<()> { let first_response = sse(vec![ev_response_created("resp-1"), ev_completed("resp-1")]); mount_sse_once(&server, first_response).await; + let text_elements = vec![TextElement { + byte_range: ByteRange { start: 0, end: 6 }, + placeholder: Some("".into()), + }]; + let expected_input = UserInput::Text { + text: "please inspect sample.txt".into(), + text_elements: text_elements.clone(), + }; + codex .submit(Op::UserInput { - items: (vec![UserInput::Text { - text: "please inspect sample.txt".into(), - text_elements: Vec::new(), - }]), + items: vec![expected_input.clone()], final_output_json_schema: None, }) .await?; @@ -66,20 +74,16 @@ async fn user_message_item_is_emitted() -> anyhow::Result<()> { .await; assert_eq!(started_item.id, completed_item.id); - assert_eq!( - started_item.content, - vec![UserInput::Text { - text: "please inspect sample.txt".into(), - text_elements: Vec::new(), - }] - ); - assert_eq!( - completed_item.content, - vec![UserInput::Text { - text: "please inspect sample.txt".into(), - text_elements: Vec::new(), - }] - ); + assert_eq!(started_item.content, vec![expected_input.clone()]); + assert_eq!(completed_item.content, vec![expected_input]); + + let legacy_message = wait_for_event_match(&codex, |ev| match ev { + EventMsg::UserMessage(event) => Some(event.clone()), + _ => None, + }) + .await; + assert_eq!(legacy_message.message, "please inspect sample.txt"); + assert_eq!(legacy_message.text_elements, text_elements); Ok(()) } diff --git a/codex-rs/core/tests/suite/resume.rs b/codex-rs/core/tests/suite/resume.rs index 8424146d65b..8912870e5eb 100644 --- a/codex-rs/core/tests/suite/resume.rs +++ b/codex-rs/core/tests/suite/resume.rs @@ -1,6 +1,8 @@ use anyhow::Result; use codex_core::protocol::EventMsg; use codex_core::protocol::Op; +use codex_protocol::user_input::ByteRange; +use codex_protocol::user_input::TextElement; use codex_protocol::user_input::UserInput; use core_test_support::responses::ev_assistant_message; use core_test_support::responses::ev_completed; @@ -12,6 +14,7 @@ use core_test_support::responses::start_mock_server; use core_test_support::skip_if_no_network; use core_test_support::test_codex::test_codex; use core_test_support::wait_for_event; +use pretty_assertions::assert_eq; use std::sync::Arc; #[tokio::test(flavor = "multi_thread", worker_threads = 2)] @@ -32,11 +35,16 @@ async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> { ]); mount_sse_once(&server, initial_sse).await; + let text_elements = vec![TextElement { + byte_range: ByteRange { start: 0, end: 6 }, + placeholder: Some("".into()), + }]; + codex .submit(Op::UserInput { items: vec![UserInput::Text { text: "Record some messages".into(), - text_elements: Vec::new(), + text_elements: text_elements.clone(), }], final_output_json_schema: None, }) @@ -57,6 +65,7 @@ async fn resume_includes_initial_messages_from_rollout_events() -> Result<()> { EventMsg::TokenCount(_), ] => { assert_eq!(first_user.message, "Record some messages"); + assert_eq!(first_user.text_elements, text_elements); assert_eq!(assistant_message.message, "Completed first turn"); } other => panic!("unexpected initial messages after resume: {other:#?}"), diff --git a/codex-rs/protocol/src/items.rs b/codex-rs/protocol/src/items.rs index 9d6b484d920..cb04d427b9d 100644 --- a/codex-rs/protocol/src/items.rs +++ b/codex-rs/protocol/src/items.rs @@ -4,6 +4,8 @@ use crate::protocol::AgentReasoningRawContentEvent; use crate::protocol::EventMsg; use crate::protocol::UserMessageEvent; use crate::protocol::WebSearchEndEvent; +use crate::user_input::ByteRange; +use crate::user_input::TextElement; use crate::user_input::UserInput; use schemars::JsonSchema; use serde::Deserialize; @@ -62,13 +64,13 @@ impl UserMessageItem { } pub fn as_legacy_event(&self) -> EventMsg { + // Legacy user-message events flatten only text inputs into `message` and + // rebase text element ranges onto that concatenated text. EventMsg::UserMessage(UserMessageEvent { message: self.message(), images: Some(self.image_urls()), - // TODO: Thread text element ranges into legacy user message events. - text_elements: Vec::new(), - // TODO: Thread local image paths into legacy user message events. - local_images: Vec::new(), + local_images: self.local_image_paths(), + text_elements: self.text_elements(), }) } @@ -83,6 +85,32 @@ impl UserMessageItem { .join("") } + pub fn text_elements(&self) -> Vec { + let mut out = Vec::new(); + let mut offset = 0usize; + for input in &self.content { + if let UserInput::Text { + text, + text_elements, + } = input + { + // Text element ranges are relative to each text chunk; offset them so they align + // with the concatenated message returned by `message()`. + for elem in text_elements { + out.push(TextElement { + byte_range: ByteRange { + start: offset + elem.byte_range.start, + end: offset + elem.byte_range.end, + }, + placeholder: elem.placeholder.clone(), + }); + } + offset += text.len(); + } + } + out + } + pub fn image_urls(&self) -> Vec { self.content .iter() @@ -92,6 +120,16 @@ impl UserMessageItem { }) .collect() } + + pub fn local_image_paths(&self) -> Vec { + self.content + .iter() + .filter_map(|c| match c { + UserInput::LocalImage { path } => Some(path.clone()), + _ => None, + }) + .collect() + } } impl AgentMessageItem {