Skip to content

Conversation

@neekolas
Copy link
Contributor

@neekolas neekolas commented Oct 29, 2025

TL;DR

Refactored cursor store API to efficiently handle multiple topics at once with the new latest_for_topics method.

What changed?

  • Replaced latest_maybe_missing_per with a more efficient latest_for_topics method that handles multiple topics in a single call
  • Updated implementations in InMemoryCursorStore and MLS cursor store to support batch processing of topics
  • Modified the streams implementation to use the new API when subscribing to group and welcome messages
  • Made TopicKind derive Hash to support using it as a key in HashMaps

How to test?

  • Verify that subscribing to group messages and welcome messages works correctly
  • Test with multiple topics to ensure the batch processing functions as expected
  • Check that cursor retrieval maintains the same behavior as before but with improved efficiency

Why make this change?

This change improves performance by reducing the number of database calls when working with multiple topics. Instead of making separate calls for each topic, we now batch them together, which is especially beneficial when subscribing to multiple groups or installations simultaneously. The new API is also cleaner and more intuitive for handling collections of topics.

Copy link
Contributor Author

neekolas commented Oct 29, 2025


How to use the Graphite Merge Queue

Add the label mergequeue to this PR to add it to the merge queue.

You must have a Graphite account in order to use the merge queue. Sign up using this link.

An organization admin has enabled the Graphite Merge Queue in this repository.

Please do not merge from GitHub as this will restart CI on PRs being processed by the merge queue.

This stack of pull requests is managed by Graphite. Learn more about stacking.

@claude
Copy link

claude bot commented Oct 29, 2025

Claude finished @neekolas's task —— View job


Code Review - November 3, 2025, 7:44 PM UTC

  • Read changed files and understand implementation
  • Analyze code quality and potential issues
  • Provide review feedback

Review Findings

1. Behavior Change: Missing Topics Not Returned (Critical)

The new latest_for_topics implementation differs from latest in how it handles missing cursors:

In xmtp_mls/src/cursor_store.rs:118-124 and 137-143:

Ok(topics_of_kind
    .into_iter()
    .map(|topic| {
        let cursor = cursors.remove(topic.identifier()).unwrap_or_default();
        (topic.clone(), cursor)
    })
    .collect())

Good: This correctly returns default cursors for missing topics.

However, in xmtp_api_d14n/src/protocol/in_memory_cursor_store.rs:92-94:

Ok(topics
    .map(|topic| (topic.clone(), self.latest(topic).unwrap_or_default()))
    .collect())

Good: This also returns default cursors.

But in usage at xmtp_api_d14n/src/queries/v3/streams.rs:40 and 73:

let cursor = cursors.get(topic).cloned().unwrap_or_default().max();

⚠️ Potential Issue: While the code uses unwrap_or_default(), this defensive pattern suggests there might be an expectation that topics could be missing from the map. The implementations appear correct, but this warrants testing to ensure consistency.

2. Missing Test Coverage for latest_for_topics

The PR introduces a new core API method but doesn't add tests for it:

  • No unit tests verify latest_for_topics behavior with multiple topics
  • No tests verify the batch processing works correctly for different TopicKind combinations
  • No tests verify missing topics return default cursors

Recommendation: Add tests similar to the existing get_last_cursor_for_ids_* tests in refresh_state.rs:1270-1447, covering:

  • Empty topics list
  • Single topic
  • Multiple topics with mixed states (some with cursors, some without)
  • Multiple topics of different TopicKinds
  • Large batches to test chunking behavior

3. Iterator Consumption Pattern (Minor Performance Concern)

In xmtp_mls/src/cursor_store.rs:105:

let partitions = topics.into_group_map_by(|t| t.kind());

This consumes the entire iterator upfront and collects all topics into a HashMap, which could be memory-intensive for large topic sets. While this is probably acceptable given typical use cases, it's worth noting that the batching efficiency gains at the database level come at the cost of upfront collection in memory.

4. Inconsistent Error Handling in IdentityUpdatesV1 (Minor)

In xmtp_mls/src/cursor_store.rs:145-156:

TopicKind::IdentityUpdatesV1 => topics_of_kind
    .into_iter()
    .map(|topic| {
        let sid = self
            .db
            .get_latest_sequence_id_for_inbox(&hex::encode(topic.identifier()))
            .map_err(|e| CursorStoreError::Other(Box::new(e) as Box<_>))?;
        // ...
    })
    .collect(),

The collect() on an iterator returning Result types works but is less explicit than using try_collect() or collecting into Result<Vec<_>, _>. This is consistent with the existing codebase style, so not a blocker, but worth noting for clarity.

5. Hash Derive for TopicKind (Good)

Adding Hash derive to TopicKind in xmtp_proto/src/types/topic.rs:8 is appropriate for use as a HashMap key and doesn't introduce any issues since TopicKind is a simple enum with Copy semantics.


Summary

The refactoring successfully achieves its goal of batching cursor lookups for improved performance. The implementation is generally sound with good consistency across both storage backends. The main concerns are:

  1. Test coverage: No tests for the new latest_for_topics method
  2. Behavior verification: Ensure default cursor behavior is tested end-to-end

The code quality is good and follows existing patterns in the codebase. The failing CI tests mentioned in the PR history should be investigated to determine if they're related to this change or unrelated infrastructure issues.


@macroscopeapp
Copy link
Contributor

macroscopeapp bot commented Oct 29, 2025

Batch-fetch latest cursors per topic to start streaming from the last sync by replacing protocol.traits.cursor_store::CursorStore::latest_maybe_missing_per with CursorStore::latest_for_topics and updating stream subscriptions in queries.v3.streams::V3Client

Introduce a batch latest_for_topics API on CursorStore and remove per-topic latest_maybe_missing_per; update stream subscriptions to use the batch map of Topic -> GlobalCursor, and add a hashed TopicKind to support partitioned lookups in the MLS cursor store.

📍Where to Start

Start with the CursorStore trait changes in cursor_store.rs, then review the MLS implementation in cursor_store.rs, and finally the subscription call sites in streams.rs.


Macroscope summarized 0d5d147.

@codecov
Copy link

codecov bot commented Oct 29, 2025

Codecov Report

❌ Patch coverage is 63.33333% with 22 lines in your changes missing coverage. Please review.
✅ Project coverage is 77.32%. Comparing base (e2b2f4d) to head (cbe0d63).
⚠️ Report is 4 commits behind head on main.

Files with missing lines Patch % Lines
xmtp_mls/src/cursor_store.rs 69.09% 17 Missing ⚠️
...tp_api_d14n/src/protocol/in_memory_cursor_store.rs 0.00% 5 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2697      +/-   ##
==========================================
- Coverage   77.33%   77.32%   -0.02%     
==========================================
  Files         359      361       +2     
  Lines       53002    53161     +159     
==========================================
+ Hits        40991    41105     +114     
- Misses      12011    12056      +45     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@neekolas neekolas force-pushed the 10-28-change_streaming_cursors_to_start_from_last_sync branch from dd9bba8 to e15dcf4 Compare October 29, 2025 00:24
@neekolas neekolas marked this pull request as ready for review October 29, 2025 05:33
@neekolas neekolas requested a review from a team as a code owner October 29, 2025 05:33
@neekolas neekolas force-pushed the 10-28-change_streaming_cursors_to_start_from_last_sync branch from e15dcf4 to 0d5d147 Compare October 29, 2025 21:07
@cameronvoell cameronvoell self-requested a review November 3, 2025 17:16
@graphite-app
Copy link

graphite-app bot commented Nov 3, 2025

Merge activity

  • Nov 3, 7:08 PM UTC: neekolas added this pull request to the Graphite merge queue.
  • Nov 3, 7:08 PM UTC: CI is running for this pull request on a draft pull request (#2719) due to your merge queue CI optimization settings.
  • Nov 3, 7:14 PM UTC: The Graphite merge queue couldn't merge this PR because it was not satisfying all requirements (Failed CI (Test)).
  • Nov 3, 7:15 PM UTC: neekolas added this pull request to the Graphite merge queue.
  • Nov 3, 7:15 PM UTC: CI is running for this pull request on a draft pull request (#2720) due to your merge queue CI optimization settings.
  • Nov 3, 7:21 PM UTC: The Graphite merge queue couldn't merge this PR because it was not satisfying all requirements (Failed CI (Test)).
  • Nov 3, 7:57 PM UTC: neekolas added this pull request to the Graphite merge queue.
  • Nov 3, 7:57 PM UTC: CI is running for this pull request on a draft pull request (#2722) due to your merge queue CI optimization settings.
  • Nov 3, 8:12 PM UTC: Merged by the Graphite merge queue via draft PR: #2722.

graphite-app bot pushed a commit that referenced this pull request Nov 3, 2025
### TL;DR

Refactored cursor store API to efficiently handle multiple topics at once with the new `latest_for_topics` method.

### What changed?

- Replaced `latest_maybe_missing_per` with a more efficient `latest_for_topics` method that handles multiple topics in a single call
- Updated implementations in `InMemoryCursorStore` and MLS cursor store to support batch processing of topics
- Modified the streams implementation to use the new API when subscribing to group and welcome messages
- Made `TopicKind` derive `Hash` to support using it as a key in HashMaps

### How to test?

- Verify that subscribing to group messages and welcome messages works correctly
- Test with multiple topics to ensure the batch processing functions as expected
- Check that cursor retrieval maintains the same behavior as before but with improved efficiency

### Why make this change?

This change improves performance by reducing the number of database calls when working with multiple topics. Instead of making separate calls for each topic, we now batch them together, which is especially beneficial when subscribing to multiple groups or installations simultaneously. The new API is also cleaner and more intuitive for handling collections of topics.
graphite-app bot pushed a commit that referenced this pull request Nov 3, 2025
### TL;DR

Refactored cursor store API to efficiently handle multiple topics at once with the new `latest_for_topics` method.

### What changed?

- Replaced `latest_maybe_missing_per` with a more efficient `latest_for_topics` method that handles multiple topics in a single call
- Updated implementations in `InMemoryCursorStore` and MLS cursor store to support batch processing of topics
- Modified the streams implementation to use the new API when subscribing to group and welcome messages
- Made `TopicKind` derive `Hash` to support using it as a key in HashMaps

### How to test?

- Verify that subscribing to group messages and welcome messages works correctly
- Test with multiple topics to ensure the batch processing functions as expected
- Check that cursor retrieval maintains the same behavior as before but with improved efficiency

### Why make this change?

This change improves performance by reducing the number of database calls when working with multiple topics. Instead of making separate calls for each topic, we now batch them together, which is especially beneficial when subscribing to multiple groups or installations simultaneously. The new API is also cleaner and more intuitive for handling collections of topics.
@neekolas neekolas force-pushed the 10-28-change_streaming_cursors_to_start_from_last_sync branch from 0d5d147 to cbe0d63 Compare November 3, 2025 19:43
graphite-app bot pushed a commit that referenced this pull request Nov 3, 2025
### TL;DR

Refactored cursor store API to efficiently handle multiple topics at once with the new `latest_for_topics` method.

### What changed?

- Replaced `latest_maybe_missing_per` with a more efficient `latest_for_topics` method that handles multiple topics in a single call
- Updated implementations in `InMemoryCursorStore` and MLS cursor store to support batch processing of topics
- Modified the streams implementation to use the new API when subscribing to group and welcome messages
- Made `TopicKind` derive `Hash` to support using it as a key in HashMaps

### How to test?

- Verify that subscribing to group messages and welcome messages works correctly
- Test with multiple topics to ensure the batch processing functions as expected
- Check that cursor retrieval maintains the same behavior as before but with improved efficiency

### Why make this change?

This change improves performance by reducing the number of database calls when working with multiple topics. Instead of making separate calls for each topic, we now batch them together, which is especially beneficial when subscribing to multiple groups or installations simultaneously. The new API is also cleaner and more intuitive for handling collections of topics.
@graphite-app graphite-app bot closed this Nov 3, 2025
@graphite-app graphite-app bot deleted the 10-28-change_streaming_cursors_to_start_from_last_sync branch November 3, 2025 20:12
cameronvoell pushed a commit that referenced this pull request Nov 4, 2025
### TL;DR

Refactored cursor store API to efficiently handle multiple topics at once with the new `latest_for_topics` method.

### What changed?

- Replaced `latest_maybe_missing_per` with a more efficient `latest_for_topics` method that handles multiple topics in a single call
- Updated implementations in `InMemoryCursorStore` and MLS cursor store to support batch processing of topics
- Modified the streams implementation to use the new API when subscribing to group and welcome messages
- Made `TopicKind` derive `Hash` to support using it as a key in HashMaps

### How to test?

- Verify that subscribing to group messages and welcome messages works correctly
- Test with multiple topics to ensure the batch processing functions as expected
- Check that cursor retrieval maintains the same behavior as before but with improved efficiency

### Why make this change?

This change improves performance by reducing the number of database calls when working with multiple topics. Instead of making separate calls for each topic, we now batch them together, which is especially beneficial when subscribing to multiple groups or installations simultaneously. The new API is also cleaner and more intuitive for handling collections of topics.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants