Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions xmtp_api/src/debug_wrapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,20 @@ where
.await
}

async fn subscribe_group_messages_with_cursors(
&self,
groups_with_cursors: &[(&GroupId, GlobalCursor)],
) -> Result<Self::GroupMessageStream, Self::Error> {
wrap_err(
|| {
self.inner
.subscribe_group_messages_with_cursors(groups_with_cursors)
},
|| self.inner.aggregate_stats(),
)
.await
}

async fn subscribe_welcome_messages(
&self,
installations: &[&InstallationId],
Expand Down
17 changes: 17 additions & 0 deletions xmtp_api/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,23 @@ where
.map_err(crate::dyn_err)
}

pub async fn subscribe_group_messages_with_cursors(
&self,
groups_with_cursors: &[(&GroupId, xmtp_proto::types::GlobalCursor)],
) -> Result<<ApiClient as XmtpMlsStreams>::GroupMessageStream>
where
ApiClient: XmtpMlsStreams,
{
tracing::debug!(
inbox_id = self.inbox_id,
"subscribing to group messages with cursors"
);
self.api_client
.subscribe_group_messages_with_cursors(groups_with_cursors)
.await
.map_err(crate::dyn_err)
}

pub async fn subscribe_welcome_messages(
&self,
installation_key: &InstallationId,
Expand Down
10 changes: 10 additions & 0 deletions xmtp_api_d14n/src/queries/api_stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,16 @@ where
self.inner.subscribe_group_messages(group_ids).await
}

async fn subscribe_group_messages_with_cursors(
&self,
groups_with_cursors: &[(&GroupId, xmtp_proto::types::GlobalCursor)],
) -> Result<Self::GroupMessageStream, Self::Error> {
self.stats.subscribe_messages.count_request();
self.inner
.subscribe_group_messages_with_cursors(groups_with_cursors)
.await
}

async fn subscribe_welcome_messages(
&self,
installations: &[&InstallationId],
Expand Down
11 changes: 11 additions & 0 deletions xmtp_api_d14n/src/queries/boxed_streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,17 @@ where
Ok(Box::pin(s) as Pin<Box<_>>)
}

async fn subscribe_group_messages_with_cursors(
&self,
groups_with_cursors: &[(&GroupId, xmtp_proto::types::GlobalCursor)],
) -> Result<Self::GroupMessageStream, Self::Error> {
let s = self
.inner
.subscribe_group_messages_with_cursors(groups_with_cursors)
.await?;
Ok(Box::pin(s) as Pin<Box<_>>)
}

async fn subscribe_welcome_messages(
&self,
installations: &[&InstallationId],
Expand Down
39 changes: 37 additions & 2 deletions xmtp_api_d14n/src/queries/d14n/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ use crate::protocol::{GroupMessageExtractor, WelcomeMessageExtractor};
use crate::queries::stream;

use super::D14nClient;
use std::collections::HashMap;
use xmtp_common::{MaybeSend, RetryableError};
use xmtp_proto::api::{ApiClientError, Client, QueryStream, XmtpStream};
use xmtp_proto::api_client::XmtpMlsStreams;
use xmtp_proto::types::{GroupId, InstallationId, TopicKind};
use xmtp_proto::types::{GlobalCursor, GroupId, InstallationId, TopicKind};
use xmtp_proto::xmtp::xmtpv4::message_api::SubscribeEnvelopesResponse;

#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
Expand Down Expand Up @@ -43,7 +44,41 @@ where
.cursor_store
.load()
.lcc_maybe_missing(&topics.iter().collect::<Vec<_>>())?;
tracing::info!("subscribing to messages @cursor={}", lcc);
tracing::debug!("subscribing to messages @cursor={}", lcc);
let s = SubscribeEnvelopes::builder()
.topics(topics)
.last_seen(lcc)
.build()?
.stream(&self.message_client)
.await?;
Ok(stream::try_extractor(s))
}

async fn subscribe_group_messages_with_cursors(
&self,
groups_with_cursors: &[(&GroupId, GlobalCursor)],
) -> Result<Self::GroupMessageStream, Self::Error> {
let topics = groups_with_cursors
.iter()
.map(|(gid, _)| TopicKind::GroupMessagesV1.create(gid))
.collect::<Vec<_>>();

// Compute the lowest common cursor from the provided cursors
let mut min_clock: HashMap<u32, u64> = HashMap::new();
for (_, cursor) in groups_with_cursors {
for (&node_id, &seq_id) in cursor.iter() {
min_clock
.entry(node_id)
.and_modify(|existing| *existing = (*existing).min(seq_id))
.or_insert(seq_id);
}
}
let lcc = GlobalCursor::new(min_clock);

tracing::debug!(
"subscribing to messages with provided cursors @cursor={}",
lcc
);
let s = SubscribeEnvelopes::builder()
.topics(topics)
.last_seen(lcc)
Expand Down
33 changes: 31 additions & 2 deletions xmtp_api_d14n/src/queries/v3/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use xmtp_proto::api::{ApiClientError, Client, QueryStream, XmtpStream};
use xmtp_proto::api_client::XmtpMlsStreams;
use xmtp_proto::mls_v1::subscribe_group_messages_request::Filter as GroupSubscribeFilter;
use xmtp_proto::mls_v1::subscribe_welcome_messages_request::Filter as WelcomeSubscribeFilter;
use xmtp_proto::types::{GroupId, GroupMessage, InstallationId, TopicKind, WelcomeMessage};
use xmtp_proto::types::{
GlobalCursor, GroupId, GroupMessage, InstallationId, TopicKind, WelcomeMessage,
};

#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
Expand Down Expand Up @@ -38,7 +40,7 @@ where
let mut filters = vec![];
for topic in &topics {
let cursor = cursors.get(topic).cloned().unwrap_or_default().max();
tracing::info!("subscribing to {topic} @ {cursor}");
tracing::debug!("subscribing to {topic} @ {cursor}");
filters.push(GroupSubscribeFilter {
group_id: topic.identifier().to_vec(),
id_cursor: cursor,
Expand All @@ -54,6 +56,33 @@ where
))
}

async fn subscribe_group_messages_with_cursors(
&self,
groups_with_cursors: &[(&GroupId, GlobalCursor)],
) -> Result<Self::GroupMessageStream, Self::Error> {
let mut filters = vec![];
for (group_id, cursor) in groups_with_cursors {
let id_cursor = cursor.max();
tracing::debug!(
"subscribing to group {} @ cursor {}",
hex::encode(group_id),
id_cursor
);
filters.push(GroupSubscribeFilter {
group_id: group_id.to_vec(),
id_cursor,
})
}

Ok(try_from_stream(
SubscribeGroupMessages::builder()
.filters(filters)
.build()?
.stream(&self.client)
.await?,
))
}

async fn subscribe_welcome_messages(
&self,
installations: &[&InstallationId],
Expand Down
4 changes: 4 additions & 0 deletions xmtp_api_d14n/src/test/mock_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ mod not_wasm {
#[mockall::concretize]
async fn subscribe_group_messages(&self, group_ids: &[&GroupId]) -> Result<MockGroupStream, MockError>;
#[mockall::concretize]
async fn subscribe_group_messages_with_cursors(&self, groups_with_cursors: &[(&GroupId, xmtp_proto::types::GlobalCursor)]) -> Result<MockGroupStream, MockError>;
#[mockall::concretize]
async fn subscribe_welcome_messages(&self, installations: &[&InstallationId]) -> Result<MockWelcomeStream, MockError>;
}

Expand Down Expand Up @@ -192,6 +194,8 @@ mod wasm {
#[mockall::concretize]
async fn subscribe_group_messages(&self, group_ids: &[&GroupId]) -> Result<MockGroupStream, MockError>;
#[mockall::concretize]
async fn subscribe_group_messages_with_cursors(&self, groups_with_cursors: &[(&GroupId, xmtp_proto::types::GlobalCursor)]) -> Result<MockGroupStream, MockError>;
#[mockall::concretize]
async fn subscribe_welcome_messages(&self, installations: &[&InstallationId]) -> Result<MockWelcomeStream, MockError>;
}

Expand Down
Loading
Loading