feat: batch enqueue, delivery batching, and smart batch modes#3
Merged
vieiralucas merged 1 commit intomainfrom Mar 24, 2026
Merged
feat: batch enqueue, delivery batching, and smart batch modes#3vieiralucas merged 1 commit intomainfrom
vieiralucas merged 1 commit intomainfrom
Conversation
- Add batch_enqueue() for explicit multi-message BatchEnqueue RPC - Add background batcher with three modes: :auto (default), :linger, :disabled - Auto mode: opportunistic batching via Queue drain (zero latency at low load) - Linger mode: timer-based batching with configurable linger_ms and batch_size - Single-item optimization: 1 message uses Enqueue RPC (preserves error types) - Delivery batching: consume unpacks repeated messages field transparently - close() drains pending messages before disconnecting - Update proto with BatchEnqueue RPC and ConsumeResponse.messages field - Bump version to 0.3.0
There was a problem hiding this comment.
2 issues found across 10 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="lib/fila/batcher.rb">
<violation number="1" location="lib/fila/batcher.rb:104">
P1: Shutdown signal consumed without re-enqueue causes `close()` to hang in linger mode. When `:shutdown` is received in the inner while loop, it breaks without re-pushing to the queue, so the outer loop blocks forever on `@queue.pop`. Apply the same pattern used in `drain_nonblocking`.</violation>
</file>
<file name="lib/fila/client.rb">
<violation number="1" location="lib/fila/client.rb:91">
P2: Race condition: `@batcher` can become `nil` between the check and the call if another thread invokes `close()`. Capture in a local variable to avoid `NoMethodError`.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
|
|
||
| begin | ||
| item = pop_with_timeout(remaining_ms) | ||
| break if item == :shutdown |
There was a problem hiding this comment.
P1: Shutdown signal consumed without re-enqueue causes close() to hang in linger mode. When :shutdown is received in the inner while loop, it breaks without re-pushing to the queue, so the outer loop blocks forever on @queue.pop. Apply the same pattern used in drain_nonblocking.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At lib/fila/batcher.rb, line 104:
<comment>Shutdown signal consumed without re-enqueue causes `close()` to hang in linger mode. When `:shutdown` is received in the inner while loop, it breaks without re-pushing to the queue, so the outer loop blocks forever on `@queue.pop`. Apply the same pattern used in `drain_nonblocking`.</comment>
<file context>
@@ -0,0 +1,198 @@
+
+ begin
+ item = pop_with_timeout(remaining_ms)
+ break if item == :shutdown
+
+ batch << item
</file context>
Comment on lines
+91
to
+92
| if @batcher | ||
| @batcher.submit(req) |
There was a problem hiding this comment.
P2: Race condition: @batcher can become nil between the check and the call if another thread invokes close(). Capture in a local variable to avoid NoMethodError.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At lib/fila/client.rb, line 91:
<comment>Race condition: `@batcher` can become `nil` between the check and the call if another thread invokes `close()`. Capture in a local variable to avoid `NoMethodError`.</comment>
<file context>
@@ -8,51 +8,133 @@
- rescue GRPC::NotFound => e
- raise QueueNotFoundError, "enqueue: #{e.details}"
+
+ if @batcher
+ @batcher.submit(req)
+ else
</file context>
Suggested change
| if @batcher | |
| @batcher.submit(req) | |
| batcher = @batcher | |
| if batcher | |
| batcher.submit(req) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
batch_enqueue(messages): Explicit batch enqueue method usingBatchEnqueueRPC. Each message gets an individualBatchEnqueueResult(success with message_id or error string).ConsumeResponse.messages(repeated field) into individual messages, falling back to singularmessagefield for backward compatibility.batch_mode:parameter::auto(DEFAULT): Background thread with Queue-based opportunistic batching. At low load, messages sent individually (zero latency penalty). At high load, messages cluster into batches naturally.:linger: Timer-based forced batching withlinger_ms:andbatch_size:controls.:disabled: Eachenqueueis a direct RPC (pre-existing behavior).EnqueueRPC (preserves exact error types likeQueueNotFoundError). For 2+ messages, usesBatchEnqueue.enqueueroutes through the auto-batcher by default.Fila::Client.new(addr, batch_mode: :disabled)overrides.closedrains pending messages before disconnecting.BatchEnqueueRPC,BatchEnqueueRequest/Response/Result, andConsumeResponse.messagesrepeated field.Test plan
TestBatchEnqueue: explicit batch_enqueue with multiple messages, single message, empty array, mixed success/failureTestBatchEnqueueResult: unit tests for success/error result structTestAutoBatching: single enqueue, concurrent enqueues (10 threads), nonexistent queue error propagationTestLingerBatching: single enqueue, concurrent enqueuesTestDisabledBatching: direct enqueue, error propagationTestBatchModeValidation: invalid mode raises ArgumentError, all valid modes acceptedTestCloseFlush: close drains pending messages, double close is safeSummary by cubic
Adds batch enqueue, delivery batching, and smart client-side batching to boost throughput while keeping low latency.
enqueuenow uses the auto-batcher by default;closeflushes pending messages. Version bumped to 0.3.0 and proto adds batch APIs.New Features
batch_enqueue(messages)returns per-message success/error results.:auto(default),:linger(linger_ms,batch_size),:disabled; single message usesEnqueue, 2+ useBatchEnqueue.ConsumeResponse.messageswith fallback tomessage.closedrains pending messages.BatchEnqueueRPC and batched consume; version0.3.0.Migration
enqueueroutes through the auto-batcher. Disable viabatch_mode: :disabled.batch_mode: :lingerwithlinger_msandbatch_sizefor timed batching.Written for commit 78aa15d. Summary will update on new commits.