[Feature] Add batch event consumption #13261
Conversation
Moved here @Megafredo

Hello @Renizmy, thank you for the switch!

Fixed, sorry
Codecov Report — ✅ All modified and coverable lines are covered by tests.

```
@@            Coverage Diff             @@
##           master   #13261        +/-   ##
============================================
+ Coverage   16.26%   30.84%    +14.57%
============================================
  Files        2846     2913        +67
  Lines      412135   192309   -219826
  Branches    11512    39176    +27664
============================================
- Hits        67035    59317     -7718
+ Misses     345100   132992   -212108
```

Flags with carried forward coverage won't be shown.
Megafredo left a comment:
Hello @Renizmy, thanks for your work!
This new method for streams that allows batch processing will make a lot of people happy!
@Renizmy FYI, we'd like to improve it a bit and refactor the code before merging! :)
Hi @Renizmy, thank you for your contribution. As @helene-nguyen mentioned, we'd like the code to be refactored before merging. The main concern is the new class. Instead of creating a new class and method, we suggest implementing an adapter. Then each batch-capable connector (depending on the targeted API) could use this adapter to receive batches of messages instead of individual messages. Usage (assuming the wrapper is named accordingly):

```python
self.helper.listen_stream(message_callback=self.process_message)
```

would become:

```python
batch_callback = self.helper.create_batch_callback(
    self.process_message_batch,
    self.batch_size,
    self.batch_timeout,
    self.max_batches_per_minute,
)
self.helper.listen_stream(message_callback=batch_callback)
```

Would you be open to making this change?
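For illustration, a minimal sketch of what such an adapter could look like. The `create_batch_callback` name comes from the suggestion above; the buffering logic, parameter defaults, and the absence of a timer thread (timeout is only checked on message arrival here) are simplifications, not the final pycti API:

```python
import threading
import time
from typing import Any, Callable, List


def create_batch_callback(
    batch_callback: Callable[[List[Any]], None],
    batch_size: int = 100,
    batch_timeout: float = 5.0,
) -> Callable[[Any], None]:
    """Adapt a batch-oriented callback into a per-message callback.

    Messages are buffered; the batch callback fires when the buffer
    reaches batch_size, or when batch_timeout seconds have elapsed
    since the first buffered message (checked on message arrival;
    a full implementation would add a timer thread for idle streams).
    """
    lock = threading.Lock()
    buffer: List[Any] = []
    first_msg_time = [0.0]  # mutable cell shared with the closure

    def on_message(msg: Any) -> None:
        batch: List[Any] = []
        with lock:
            if not buffer:
                first_msg_time[0] = time.monotonic()
            buffer.append(msg)
            expired = time.monotonic() - first_msg_time[0] >= batch_timeout
            if len(buffer) >= batch_size or expired:
                batch = list(buffer)
                buffer.clear()
        if batch:
            # Run the user callback outside the lock
            batch_callback(batch)

    return on_message
```

A connector would then pass the returned `on_message` wherever a per-message callback is expected, keeping the existing `listen_stream` signature untouched.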
xfournet left a comment:
Thanks for the update! I made some comments; I will resume the review after this first round of feedback has been processed.
| self.batch_timestamps = None | ||
|
|
||
| # Timer thread for timeout-based batch processing | ||
| self._stop_event = threading.Event() |
I don't see where `_stop_event` is set?
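One common shape for wiring this up (illustrative names, not the PR's actual code) is to set the event from a `stop()` method and use `Event.wait` as an interruptible sleep in the timer thread:

```python
import threading
from typing import Callable


class BatchTimer:
    """Minimal example of pairing a timer thread with a stop event."""

    def __init__(self, interval: float, on_tick: Callable[[], None]) -> None:
        self._interval = interval
        self._on_tick = on_tick
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._timer_loop, daemon=True)
        self._thread.start()

    def _timer_loop(self) -> None:
        # Event.wait doubles as an interruptible sleep: it returns
        # True as soon as the event is set, ending the loop.
        while not self._stop_event.wait(self._interval):
            self._on_tick()

    def stop(self) -> None:
        # The counterpart the review asks about: without setting
        # the event somewhere, the loop never exits.
        self._stop_event.set()
        self._thread.join()
```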
| """ | ||
|
|
||
| def timer_loop(): | ||
| while not self._stop_event.is_set(): |
After we exit the while loop, we should process the remaining messages via `_process_batch`, else they will be lost?
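A sketch of the suggested fix: flush whatever is still buffered once the loop exits. The `_process_batch` name follows the review comment; the rest is a simplified stand-in for the PR's class:

```python
import threading
from typing import Any, List


class Batcher:
    def __init__(self, timeout: float = 0.05) -> None:
        self.batch: List[Any] = []
        self.processed: List[List[Any]] = []
        self.timeout = timeout
        self._lock = threading.Lock()
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._timer_loop, daemon=True)
        self._thread.start()

    def add(self, msg: Any) -> None:
        with self._lock:
            self.batch.append(msg)

    def _process_batch(self) -> None:
        with self._lock:
            batch, self.batch = self.batch, []
        if batch:
            self.processed.append(batch)

    def _timer_loop(self) -> None:
        while not self._stop_event.wait(self.timeout):
            self._process_batch()
        # Drain what's left: otherwise messages buffered since the
        # last tick would be silently lost on shutdown.
        self._process_batch()

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join()
```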
```python
if last_msg_id is not None:
    state = self.helper.get_state()
    if state is not None:
        state["start_from"] = str(last_msg_id)
```
You could rather rely on the last batch_data msg.id instead of managing a separate variable for that, which can be error-prone.
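In other words, the checkpoint can be derived from the batch itself rather than tracked in a parallel variable. A hypothetical helper (the `.id` field name is an assumption about the stream message objects):

```python
from typing import Any, List, Optional


def checkpoint_from_batch(batch_data: List[Any]) -> Optional[str]:
    """Take the stream cursor from the last message of the batch."""
    if not batch_data:
        return None
    return str(batch_data[-1].id)
```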
```python
self._start_timeout_timer()

# Heartbeat queue for rate limit waiting
self._heartbeat_queue: Optional[Queue] = None
```
What is the purpose of `_heartbeat_queue`, since it's set but never used?
```python
self.batch_callback(batch_data)
if last_msg_id is not None:
    state = self.helper.get_state()
    if state is not None:
```
If there is no state, it will never be created?
```python
finally:
    self._lock.acquire()

def _wait_for_rate_limit(self) -> float:
```
There is no rate limiting in the non-batch mode; why should we have it only for batch mode?
If needed, it should be separated from BatchCallbackWrapper, so we can set a rate limit for both the batch and non-batch cases.
That would also permit simplifying this class, which currently handles many concerns.
```python
# Reset batch state (still under _lock)
self.batch = []
self.batch_start_time = time.time()
```
This will prevent batch_start_time from being initialized at the first message of the next batch, leading to potential premature execution of the next batch. It should rather be set to None here.
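To illustrate the point: stamping the start time only on the first message of a batch, and resetting it to `None` on flush, keeps the next batch from inheriting a stale start time. A simplified sketch, not the PR code:

```python
import time
from typing import Any, List, Optional


class BatchState:
    def __init__(self, timeout: float) -> None:
        self.timeout = timeout
        self.batch: List[Any] = []
        self.batch_start_time: Optional[float] = None

    def add(self, msg: Any) -> None:
        if not self.batch:
            # Stamp the start time at the FIRST message of the batch
            self.batch_start_time = time.monotonic()
        self.batch.append(msg)

    def timed_out(self) -> bool:
        if self.batch_start_time is None:
            return False  # an empty batch can never time out
        return time.monotonic() - self.batch_start_time >= self.timeout

    def flush(self) -> List[Any]:
        batch, self.batch = self.batch, []
        # Reset to None, not time.time(): otherwise the next batch
        # keeps the old clock and may be flushed prematurely.
        self.batch_start_time = None
        return batch
```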
```python
self.batch_start_time = time.time()

# Release _lock, acquire _processing_lock for callback
self._lock.release()
```
It's an unusual lock strategy and so looks fragile with respect to future changes.
You should rather separate the 'batch extraction' from the 'batch processing', so you can use the lock in the conventional way.
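A conventional version of that split (hypothetical names): swap the batch out while holding the lock, then run the user callback with no lock held, so there is no release/re-acquire dance around the callback:

```python
import threading
from typing import Any, Callable, List


class BatchProcessor:
    def __init__(self, batch_callback: Callable[[List[Any]], None]) -> None:
        self.batch_callback = batch_callback
        self.batch: List[Any] = []
        self._lock = threading.Lock()

    def _extract_batch(self) -> List[Any]:
        # Extraction: the only step that needs the lock
        with self._lock:
            batch, self.batch = self.batch, []
            return batch

    def process(self) -> None:
        # Processing: runs with no lock held, so a slow or blocking
        # user callback cannot stall producers appending to the batch
        batch = self._extract_batch()
        if batch:
            self.batch_callback(batch)
```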
Hi @xfournet, thanks for the review! All points are addressed. Changes to the rate limiter have led to simplifications. I haven't implemented any rate-limiting code for basic (non-batch) stream consumption (out of scope?).
Related to: #13372