Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions codex-rs/app-server/tests/common/mcp_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,31 @@ impl McpProcess {
Ok(notification)
}

pub async fn read_stream_until_matching_notification<F>(
&mut self,
description: &str,
predicate: F,
) -> anyhow::Result<JSONRPCNotification>
where
F: Fn(&JSONRPCNotification) -> bool,
{
eprintln!("in read_stream_until_matching_notification({description})");

let message = self
.read_stream_until_message(|message| {
matches!(
message,
JSONRPCMessage::Notification(notification) if predicate(notification)
)
})
.await?;

let JSONRPCMessage::Notification(notification) = message else {
unreachable!("expected JSONRPCMessage::Notification, got {message:?}");
};
Ok(notification)
}

pub async fn read_next_message(&mut self) -> anyhow::Result<JSONRPCMessage> {
self.read_stream_until_message(|_| true).await
}
Expand All @@ -1043,6 +1068,16 @@ impl McpProcess {
self.pending_messages.clear();
}

pub fn pending_notification_methods(&self) -> Vec<String> {
self.pending_messages
.iter()
.filter_map(|message| match message {
JSONRPCMessage::Notification(notification) => Some(notification.method.clone()),
_ => None,
})
.collect()
}

/// Reads the stream until a message matches `predicate`, buffering any non-matching messages
/// for later reads.
async fn read_stream_until_message<F>(&mut self, predicate: F) -> anyhow::Result<JSONRPCMessage>
Expand Down
112 changes: 72 additions & 40 deletions codex-rs/app-server/tests/suite/fuzzy_file_search.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,54 +52,86 @@ async fn wait_for_session_updated(
query: &str,
file_expectation: FileExpectation,
) -> Result<FuzzyFileSearchSessionUpdatedNotification> {
for _ in 0..20 {
let notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message(SESSION_UPDATED_METHOD),
)
.await??;
let params = notification
.params
.ok_or_else(|| anyhow!("missing notification params"))?;
let payload = serde_json::from_value::<FuzzyFileSearchSessionUpdatedNotification>(params)?;
if payload.session_id != session_id || payload.query != query {
continue;
}
let files_match = match file_expectation {
FileExpectation::Any => true,
FileExpectation::Empty => payload.files.is_empty(),
FileExpectation::NonEmpty => !payload.files.is_empty(),
};
if files_match {
return Ok(payload);
let description = format!("session update for sessionId={session_id}, query={query}");
let notification = match timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_matching_notification(&description, |notification| {
if notification.method != SESSION_UPDATED_METHOD {
return false;
}
let Some(params) = notification.params.as_ref() else {
return false;
};
let Ok(payload) =
serde_json::from_value::<FuzzyFileSearchSessionUpdatedNotification>(params.clone())
else {
return false;
};
let files_match = match file_expectation {
FileExpectation::Any => true,
FileExpectation::Empty => payload.files.is_empty(),
FileExpectation::NonEmpty => !payload.files.is_empty(),
};
payload.session_id == session_id && payload.query == query && files_match
}),
)
.await
{
Ok(result) => result?,
Err(_) => {
anyhow::bail!(
"timed out waiting for {description}; buffered notifications={:?}",
mcp.pending_notification_methods()
)
}
}
anyhow::bail!(
"did not receive expected session update for sessionId={session_id}, query={query}"
);
};
let params = notification
.params
.ok_or_else(|| anyhow!("missing notification params"))?;
Ok(serde_json::from_value::<
FuzzyFileSearchSessionUpdatedNotification,
>(params)?)
}

async fn wait_for_session_completed(
mcp: &mut McpProcess,
session_id: &str,
) -> Result<FuzzyFileSearchSessionCompletedNotification> {
for _ in 0..20 {
let notification = timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_notification_message(SESSION_COMPLETED_METHOD),
)
.await??;
let params = notification
.params
.ok_or_else(|| anyhow!("missing notification params"))?;
let payload =
serde_json::from_value::<FuzzyFileSearchSessionCompletedNotification>(params)?;
if payload.session_id == session_id {
return Ok(payload);
let description = format!("session completion for sessionId={session_id}");
let notification = match timeout(
DEFAULT_READ_TIMEOUT,
mcp.read_stream_until_matching_notification(&description, |notification| {
if notification.method != SESSION_COMPLETED_METHOD {
return false;
}
let Some(params) = notification.params.as_ref() else {
return false;
};
let Ok(payload) = serde_json::from_value::<FuzzyFileSearchSessionCompletedNotification>(
params.clone(),
) else {
return false;
};
payload.session_id == session_id
}),
)
.await
{
Ok(result) => result?,
Err(_) => {
anyhow::bail!(
"timed out waiting for {description}; buffered notifications={:?}",
mcp.pending_notification_methods()
)
}
}

anyhow::bail!("did not receive expected session completion for sessionId={session_id}");
};

let params = notification
.params
.ok_or_else(|| anyhow!("missing notification params"))?;
Ok(serde_json::from_value::<
FuzzyFileSearchSessionCompletedNotification,
>(params)?)
}

async fn assert_update_request_fails_for_missing_session(
Expand Down
Loading