diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index b2289493acff..430a400a2c2e 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -1031,6 +1031,31 @@ impl McpProcess { Ok(notification) } + pub async fn read_stream_until_matching_notification( + &mut self, + description: &str, + predicate: F, + ) -> anyhow::Result + where + F: Fn(&JSONRPCNotification) -> bool, + { + eprintln!("in read_stream_until_matching_notification({description})"); + + let message = self + .read_stream_until_message(|message| { + matches!( + message, + JSONRPCMessage::Notification(notification) if predicate(notification) + ) + }) + .await?; + + let JSONRPCMessage::Notification(notification) = message else { + unreachable!("expected JSONRPCMessage::Notification, got {message:?}"); + }; + Ok(notification) + } + pub async fn read_next_message(&mut self) -> anyhow::Result { self.read_stream_until_message(|_| true).await } @@ -1043,6 +1068,16 @@ impl McpProcess { self.pending_messages.clear(); } + pub fn pending_notification_methods(&self) -> Vec { + self.pending_messages + .iter() + .filter_map(|message| match message { + JSONRPCMessage::Notification(notification) => Some(notification.method.clone()), + _ => None, + }) + .collect() + } + /// Reads the stream until a message matches `predicate`, buffering any non-matching messages /// for later reads. async fn read_stream_until_message(&mut self, predicate: F) -> anyhow::Result diff --git a/codex-rs/app-server/tests/suite/fuzzy_file_search.rs b/codex-rs/app-server/tests/suite/fuzzy_file_search.rs index 7341a5a5f7ad..0070c2b30b83 100644 --- a/codex-rs/app-server/tests/suite/fuzzy_file_search.rs +++ b/codex-rs/app-server/tests/suite/fuzzy_file_search.rs @@ -52,54 +52,86 @@ async fn wait_for_session_updated( query: &str, file_expectation: FileExpectation, ) -> Result { - for _ in 0..20 { - let notification = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message(SESSION_UPDATED_METHOD), - ) - .await??; - let params = notification - .params - .ok_or_else(|| anyhow!("missing notification params"))?; - let payload = serde_json::from_value::(params)?; - if payload.session_id != session_id || payload.query != query { - continue; - } - let files_match = match file_expectation { - FileExpectation::Any => true, - FileExpectation::Empty => payload.files.is_empty(), - FileExpectation::NonEmpty => !payload.files.is_empty(), - }; - if files_match { - return Ok(payload); + let description = format!("session update for sessionId={session_id}, query={query}"); + let notification = match timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_matching_notification(&description, |notification| { + if notification.method != SESSION_UPDATED_METHOD { + return false; + } + let Some(params) = notification.params.as_ref() else { + return false; + }; + let Ok(payload) = + serde_json::from_value::(params.clone()) + else { + return false; + }; + let files_match = match file_expectation { + FileExpectation::Any => true, + FileExpectation::Empty => payload.files.is_empty(), + FileExpectation::NonEmpty => !payload.files.is_empty(), + }; + payload.session_id == session_id && payload.query == query && files_match + }), + ) + .await + { + Ok(result) => result?, + Err(_) => { + anyhow::bail!( + "timed out waiting for {description}; buffered notifications={:?}", + mcp.pending_notification_methods() + ) } - } - anyhow::bail!( - "did not receive expected session update for sessionId={session_id}, query={query}" - ); + }; + let params = notification + .params + .ok_or_else(|| anyhow!("missing notification params"))?; + Ok(serde_json::from_value::< + FuzzyFileSearchSessionUpdatedNotification, + >(params)?) } async fn wait_for_session_completed( mcp: &mut McpProcess, session_id: &str, ) -> Result { - for _ in 0..20 { - let notification = timeout( - DEFAULT_READ_TIMEOUT, - mcp.read_stream_until_notification_message(SESSION_COMPLETED_METHOD), - ) - .await??; - let params = notification - .params - .ok_or_else(|| anyhow!("missing notification params"))?; - let payload = - serde_json::from_value::(params)?; - if payload.session_id == session_id { - return Ok(payload); + let description = format!("session completion for sessionId={session_id}"); + let notification = match timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_matching_notification(&description, |notification| { + if notification.method != SESSION_COMPLETED_METHOD { + return false; + } + let Some(params) = notification.params.as_ref() else { + return false; + }; + let Ok(payload) = serde_json::from_value::( + params.clone(), + ) else { + return false; + }; + payload.session_id == session_id + }), + ) + .await + { + Ok(result) => result?, + Err(_) => { + anyhow::bail!( + "timed out waiting for {description}; buffered notifications={:?}", + mcp.pending_notification_methods() + ) } - } - - anyhow::bail!("did not receive expected session completion for sessionId={session_id}"); + }; + + let params = notification + .params + .ok_or_else(|| anyhow!("missing notification params"))?; + Ok(serde_json::from_value::< + FuzzyFileSearchSessionCompletedNotification, + >(params)?) } async fn assert_update_request_fails_for_missing_session(