feat(rust_arroyo): add Broadcast step#164

Merged
bmckerry merged 9 commits into main from
74/broadcast-custom-step
Jul 7, 2025

Conversation

Member

@bmckerry bmckerry commented Jun 27, 2025

This PR:

  • adds a Rust Arroyo Broadcast step, which makes one copy of a message per downstream route and sends each copy to its own branch
  • implements the Clone trait for RoutedValue (and, by extension, for each variant of RoutedValuePayload)
  • updates docs with current progress on implementing watermarks
  • has a massive uv.lock diff for some reason
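
As a rough illustration of the first bullet, here is a minimal sketch of producing one copy per downstream branch. The `Route` and `RoutedValue` structs below are simplified, hypothetical stand-ins (the real `RoutedValue` wraps a Python payload), and `broadcast_copies` is an illustrative name, not the PR's `generate_broadcast_messages()`.

```rust
// Hypothetical, simplified stand-ins for the PR's types (assumption:
// a route is a source plus a list of waypoints).
#[derive(Clone, Debug, PartialEq)]
struct Route {
    source: String,
    waypoints: Vec<String>,
}

#[derive(Clone, Debug, PartialEq)]
struct RoutedValue {
    route: Route,
    payload: String,
}

/// Produce one copy of `message` per downstream branch, appending the
/// branch name to each copy's route so later steps can tell them apart.
fn broadcast_copies(branches: &[String], message: &RoutedValue) -> Vec<RoutedValue> {
    branches
        .iter()
        .map(|branch| {
            let mut copy = message.clone();
            copy.route.waypoints.push(branch.clone());
            copy
        })
        .collect()
}
```

Deriving `Clone` works here only because the payload is a plain `String`; a payload that holds a Python object needs more care.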

My plan going forward is to re-use the logic from generate_broadcast_messages() when making a custom Router step that routes regular messages downstream (to a single downstream branch), but broadcasts watermarks downstream (to all branches).
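
A possible shape for that Router step, sketched under assumptions: `Payload` is a hypothetical two-variant enum standing in for `RoutedValuePayload`, and `pick` is a made-up callback that chooses a branch index for regular messages. None of these names come from the PR.

```rust
// Hypothetical stand-in for RoutedValuePayload (assumption: one variant
// for regular messages, one for watermarks).
#[derive(Clone, Debug, PartialEq)]
enum Payload {
    Message(String),
    Watermark(u64),
}

/// Returns the (branch, payload) pairs to send downstream: watermarks go
/// to every branch, regular messages go to the single branch chosen by `pick`.
fn route_or_broadcast<F>(branches: &[&str], payload: &Payload, pick: F) -> Vec<(String, Payload)>
where
    F: Fn(&str) -> usize,
{
    if branches.is_empty() {
        return Vec::new();
    }
    match payload {
        Payload::Watermark(_) => branches
            .iter()
            .map(|b| (b.to_string(), payload.clone()))
            .collect(),
        Payload::Message(body) => {
            // Wrap the index so a careless `pick` cannot go out of bounds.
            let idx = pick(body.as_str()) % branches.len();
            vec![(branches[idx].to_string(), payload.clone())]
        }
    }
}
```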

For docs on routing/watermarks, please see https://getsentry.github.io/streams/runtime/arroyo.html#routes.

@bmckerry bmckerry changed the title from "feat(API): add Unfold generic step for 1:n message relationships" to "feat(rust_arroyo): add Broadcast step" Jul 2, 2025
@bmckerry bmckerry force-pushed the 74/broadcast-custom-step branch from e354fa3 to 29a67bd July 3, 2025 14:33
@@ -1,50 +0,0 @@
from sentry_streams.examples.broadcast_fn import BroadcastFunctions
Member Author

deleting this as it isn't a practical example and implicit broadcast doesn't work now

@@ -1,166 +1,167 @@
version = 1
Member Author

not sure what happened here, I thought this diff was already on main


impl ProcessingStrategy<RoutedValue> for Broadcaster {
    fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
        self.flush_pending()?;
Member Author

This is a situation I'm not sure about:

  • a watermark message goes through the broadcast step with 2 downstream branches; the first copy receives a MessageRejected backpressure error and goes into the pending buffer
  • before the watermark message is retried, poll() is called which flushes the pending buffer, sending the previously failed message copy downstream
    • commit step receives watermark from branch 1
  • the failed message is retried and successfully passes through the broadcast step, sending 2 more copies of the message downstream
    • commit step receives watermark from branches 1 and 2

In the above situation, 3 watermarks reach the commit step when it only expects 2 (since there are 2 downstream routes). If we're just counting watermarks, we'll commit after receiving the 2nd watermark copy from branch 1, without ever having received the copy from branch 2.

Questions:

  • can arroyo call poll() between when a message receives MessageRejected and when that message is retried?
  • does this mean that in the custom commit step, we'll need to ensure we receive a watermark with a specific committable payload routed for each possible branch, and not just count the number of watermarks with a given committable payload?

Member Author

@bmckerry bmckerry Jul 3, 2025

we'll need to ensure we receive a watermark with a specific committable payload routed for each possible branch, and not just count the number of watermarks with a given committable payload?

talked with @evanh and this is basically the answer to this problem
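
A minimal sketch of that idea, assuming a watermark can be identified by a plain `u64` (a stand-in for its committable payload) and a branch by its name. `WatermarkTracker` and `observe` are hypothetical names, not part of the PR.

```rust
use std::collections::{HashMap, HashSet};

/// Tracks which branches have delivered each watermark, instead of
/// counting how many copies arrived.
struct WatermarkTracker {
    expected: HashSet<String>,
    seen: HashMap<u64, HashSet<String>>,
}

impl WatermarkTracker {
    fn new(branches: &[&str]) -> Self {
        WatermarkTracker {
            expected: branches.iter().map(|b| b.to_string()).collect(),
            seen: HashMap::new(),
        }
    }

    /// Records a watermark copy; returns true only once every expected
    /// branch has delivered this watermark. Duplicate copies from the
    /// same branch are absorbed by the set and do not advance the commit.
    fn observe(&mut self, watermark_id: u64, branch: &str) -> bool {
        if !self.expected.contains(branch) {
            return false;
        }
        let seen = self.seen.entry(watermark_id).or_default();
        seen.insert(branch.to_string());
        let complete = seen.len() == self.expected.len();
        if complete {
            // Drop the bookkeeping once the watermark is committable.
            self.seen.remove(&watermark_id);
        }
        complete
    }
}
```

With two branches, two copies from branch 1 followed by one from branch 2 yield `false`, `false`, `true`, so the duplicate from the scenario above no longer triggers an early commit.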

Collaborator

I am not sure I understand the problem.
Why would we ever send a second copy of the watermark?
If we try to send a watermark, route 1 works and route 2 receives a MessageRejected:

  • The next poll should only re-send the route 2 watermark.
  • The route 1 watermark should not be sent again, just as for any other message.

Are we not using the message identifier to avoid sending another copy of the same watermark?
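
The behaviour described here could look roughly like the sketch below, where only rejected copies are parked and retried. `Downstream` is a hypothetical stand-in for the next step, messages are reduced to a `u64` id, and rejection is modelled with a plain `Err(())` rather than Arroyo's `MessageRejected`.

```rust
use std::collections::VecDeque;

/// Hypothetical downstream: rejects copies for branches marked busy.
struct Downstream {
    busy: Vec<String>,
    delivered: Vec<(String, u64)>,
}

impl Downstream {
    fn submit(&mut self, branch: &str, id: u64) -> Result<(), ()> {
        if self.busy.iter().any(|b| b == branch) {
            return Err(());
        }
        self.delivered.push((branch.to_string(), id));
        Ok(())
    }
}

struct Broadcaster {
    branches: Vec<String>,
    pending: VecDeque<(String, u64)>,
}

impl Broadcaster {
    /// Try each branch once and park only the rejected copies.
    fn submit(&mut self, id: u64, next: &mut Downstream) {
        for branch in &self.branches {
            if next.submit(branch, id).is_err() {
                self.pending.push_back((branch.clone(), id));
            }
        }
    }

    /// Retry parked copies in order, stopping at the first rejection.
    fn flush_pending(&mut self, next: &mut Downstream) {
        while let Some((branch, id)) = self.pending.pop_front() {
            if next.submit(&branch, id).is_err() {
                self.pending.push_front((branch, id));
                break;
            }
        }
    }
}
```

Because the copy for route 1 never enters `pending`, a later flush cannot deliver it a second time.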

@bmckerry bmckerry marked this pull request as ready for review July 3, 2025 16:59
@bmckerry bmckerry requested a review from a team as a code owner July 3, 2025 16:59
Member

@evanh evanh left a comment

I think it looks good!

@bmckerry bmckerry merged commit a4f718e into main Jul 7, 2025
17 checks passed
Comment on lines +325 to +338
impl Clone for PyStreamingMessage {
    fn clone(&self) -> Self {
        match self {
            PyStreamingMessage::PyAnyMessage { ref content } => {
                let py_any_clone = traced_with_gil!(|py| content.clone_ref(py));
                PyStreamingMessage::PyAnyMessage {
                    content: py_any_clone,
                }
            }
            PyStreamingMessage::RawMessage { ref content } => {
                let raw_clone = traced_with_gil!(|py| content.clone_ref(py));
                PyStreamingMessage::RawMessage { content: raw_clone }
            }
        }
Collaborator

This is not creating a clone of the message.
It only clones the reference to the Python payload, which means that, if the payload is mutable, two branches may update the same message.
The only way to clone the payload is to rely on Python code to clone it (maybe using deepcopy).

A unit test would help validate this: the identity of the Python object behind your clone is the same as the original's.
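
The difference can be illustrated with a pure-Rust analogy. This is not PyO3 code: `Rc<RefCell<...>>` stands in for a reference-counted Python object, `reference_clone` plays the role of `clone_ref`, and `deep_clone` plays the role of a Python-side deepcopy. All names are hypothetical.

```rust
use std::cell::RefCell;
use std::rc::Rc;

// Stand-in for a mutable, reference-counted payload.
type Shared = Rc<RefCell<Vec<i32>>>;

/// Reference clone: a second handle to the same payload.
fn reference_clone(payload: &Shared) -> Shared {
    Rc::clone(payload)
}

/// Deep clone: copy the payload's contents into a new allocation.
fn deep_clone(payload: &Shared) -> Shared {
    Rc::new(RefCell::new(payload.borrow().clone()))
}
```

A mutation made through a reference clone is visible through the original, and `Rc::ptr_eq` reports the two handles as identical, which is the kind of identity check a unit test could make.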

Comment on lines +142 to +145
let unfolded_messages = generate_broadcast_messages(&self.downstream_branches, message);
for msg in unfolded_messages {
    self.handle_submit(msg)?;
}
Collaborator

I don't think this works as expected.
This piece of code does not flush the pending messages first. If there are pending messages, you would submit the newer message before the older pending ones, sending messages out of order, which is wrong.

Please ensure this case is covered by your unit tests.
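
One way to preserve ordering, sketched under assumptions: flush the pending buffer at the start of `submit`, and reject the new message if anything is still pending. `OrderedStep`, `Rejected`, and the `downstream_open` toggle are hypothetical; messages are reduced to `u64` ids, and `pending` is assumed to hold copies parked by an earlier failure.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
struct Rejected(u64);

struct OrderedStep {
    pending: VecDeque<u64>,
    downstream_open: bool, // hypothetical switch simulating backpressure
    delivered: Vec<u64>,
}

impl OrderedStep {
    fn send(&mut self, id: u64) -> Result<(), Rejected> {
        if self.downstream_open {
            self.delivered.push(id);
            Ok(())
        } else {
            Err(Rejected(id))
        }
    }

    /// Retry older messages first, stopping at the first rejection.
    fn flush_pending(&mut self) {
        while let Some(id) = self.pending.pop_front() {
            if self.send(id).is_err() {
                self.pending.push_front(id);
                break;
            }
        }
    }

    /// A new message is only forwarded once nothing older is waiting.
    fn submit(&mut self, id: u64) -> Result<(), Rejected> {
        self.flush_pending();
        if !self.pending.is_empty() {
            return Err(Rejected(id));
        }
        self.send(id)
    }
}
```

Rejecting the newer message pushes backpressure upstream, so the caller retries it later and it cannot overtake the parked ones.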


Comment on lines +240 to +249
let _ = step.submit(message.clone());
assert_eq!(step.pending_messages.len(), 1);
let actual_messages = submitted_messages_clone.lock().unwrap();
assert_messages_match(
    py,
    vec![
        "test_message".into_py_any(py).unwrap(),
        "test_message".into_py_any(py).unwrap(),
    ],
    actual_messages.deref(),
Collaborator

This is not enough to test the functionality. It does not check that messages are sent on all routes.
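
A stricter check might compare the branches that actually received a copy against the expected set. The helper below is a hypothetical sketch that assumes the test can record each delivered message as a `(branch, payload)` pair; it does not use the PR's `assert_messages_match`.

```rust
use std::collections::HashMap;

/// Count how many copies each branch received.
fn copies_per_branch(delivered: &[(String, String)]) -> HashMap<String, usize> {
    let mut counts = HashMap::new();
    for (branch, _payload) in delivered {
        *counts.entry(branch.clone()).or_insert(0) += 1;
    }
    counts
}

/// True only if every expected branch received exactly one copy and no
/// unexpected branch received anything.
fn delivered_to_all(branches: &[&str], delivered: &[(String, String)]) -> bool {
    let counts = copies_per_branch(delivered);
    counts.len() == branches.len() && branches.iter().all(|b| counts.get(*b) == Some(&1))
}
```

Two identical payloads on the same route would pass a payload-only comparison but fail this check.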
