feat: 30.2 — unified api surface #4
Conversation
- update service.proto to unified api (no more BatchEnqueue rpc)
- enqueue/ack/nack use repeated message fields
- consume uses only repeated messages field (singular removed)
- rename BatchMode to AccumulatorMode, batch.go to accumulator.go
- add EnqueueMany() replacing BatchEnqueue()
- ItemError wraps sentinel errors (ErrQueueNotFound, ErrMessageNotFound) so errors.Is works on per-item results
- all 20 tests pass (5 unit + 15 integration)
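The ItemError point is the mechanism worth spelling out: `errors.Is` only matches a wrapped sentinel if the error type exposes it through the standard `Unwrap()` chain. A minimal sketch of that shape — only `ItemError`, the sentinel names, and `Unwrap()` are taken from this PR; the field layout is an assumption:

```go
package fila

import "errors"

// Sentinel errors named in this PR.
var (
	ErrQueueNotFound   = errors.New("fila: queue not found")
	ErrMessageNotFound = errors.New("fila: message not found")
)

// ItemError carries a per-item failure. The sentinel field name is
// assumed; Unwrap is what makes errors.Is work through the wrapper.
type ItemError struct {
	Message  string
	sentinel error
}

func (e *ItemError) Error() string { return e.Message }

// Unwrap lets errors.Is(err, ErrQueueNotFound) match the wrapped sentinel.
func (e *ItemError) Unwrap() error { return e.sentinel }
```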
4 issues found across 13 files
Prompt for AI agents (unresolved issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="client.go">
<violation number="1" location="client.go:190">
P2: Disabled mode detection misses pointer values (`*AccumulatorModeDisabled`), so accumulation can be enabled unexpectedly.</violation>
</file>
<file name="proto/fila/v1/service.proto">
<violation number="1" location="proto/fila/v1/service.proto:25">
P2: Field numbers 2 and 3 from the old `EnqueueRequest` (and similarly in `AckRequest`, `NackRequest`, `ConsumeResponse`) are removed without `reserved` directives. If a future edit accidentally reuses these numbers, old serialized data will be silently misinterpreted. Add `reserved 2, 3;` to guard against that.</violation>
</file>
<file name="accumulator.go">
<violation number="1" location="accumulator.go:124">
P2: `runAuto` has no cap on the number of items drained into a single batch, unlike `runLinger` which respects `maxSize`. Under high throughput the entire 4096-item channel buffer can be flushed in one RPC, which risks exceeding gRPC's default 4 MB max message size and failing with `ResourceExhausted`. Consider adding a `maxSize` cap to the drain loop, consistent with `runLinger`.</violation>
</file>
<file name="enqueue.go">
<violation number="1" location="enqueue.go:71">
P2: `EnqueueMany` does not validate that response result count matches input message count, so missing per-item results are silently dropped.</violation>
</file>
Reply with feedback, questions, or to request a fix. Tag @cubic-dev-ai to re-run a review.
```diff
- if _, disabled := batchMode.(BatchModeDisabled); !disabled {
- 	c.batcher = newBatcher(svc, batchMode)
+ // Start accumulator unless disabled.
+ if _, disabled := accMode.(AccumulatorModeDisabled); !disabled {
```
P2: Disabled mode detection misses pointer values (*AccumulatorModeDisabled), so accumulation can be enabled unexpectedly.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At client.go, line 190:
<comment>Disabled mode detection misses pointer values (`*AccumulatorModeDisabled`), so accumulation can be enabled unexpectedly.</comment>
<file context>
@@ -186,19 +186,19 @@ func Dial(addr string, opts ...DialOption) (*Client, error) {
- if _, disabled := batchMode.(BatchModeDisabled); !disabled {
- c.batcher = newBatcher(svc, batchMode)
+ // Start accumulator unless disabled.
+ if _, disabled := accMode.(AccumulatorModeDisabled); !disabled {
+ c.accumulator = newAccumulator(svc, accMode)
}
</file context>
Suggested change:

```diff
- if _, disabled := accMode.(AccumulatorModeDisabled); !disabled {
+ disabled := false
+ switch accMode.(type) {
+ case AccumulatorModeDisabled, *AccumulatorModeDisabled:
+ 	disabled = true
+ }
+ if !disabled {
```
Thanks for the update! Noted that it’s handled via a type switch covering both value and pointer types.
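For readers following along, the failure mode is easy to reproduce in isolation: a value-type assertion never matches a pointer to that type. A self-contained sketch (the `AccumulatorMode` shapes here are assumed stand-ins, not the SDK's definitions):

```go
package main

import "fmt"

// Assumed stand-ins: the real AccumulatorMode types live in client.go.
type AccumulatorMode interface{ isAccumulatorMode() }

type AccumulatorModeDisabled struct{}

func (AccumulatorModeDisabled) isAccumulatorMode() {}

func main() {
	var accMode AccumulatorMode = &AccumulatorModeDisabled{} // caller passed a pointer

	// Old check: the value-type assertion does not match the pointer,
	// so "disabled" reads false and the accumulator would start.
	_, disabled := accMode.(AccumulatorModeDisabled)
	fmt.Println("value assertion:", disabled) // false

	// Fixed check: the type switch covers both forms.
	switch accMode.(type) {
	case AccumulatorModeDisabled, *AccumulatorModeDisabled:
		disabled = true
	}
	fmt.Println("type switch:", disabled) // true
}
```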
```proto
}

// Enqueue one or more messages.
message EnqueueRequest {
```
P2: Field numbers 2 and 3 from the old EnqueueRequest (and similarly in AckRequest, NackRequest, ConsumeResponse) are removed without reserved directives. If a future edit accidentally reuses these numbers, old serialized data will be silently misinterpreted. Add reserved 2, 3; to guard against that.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At proto/fila/v1/service.proto, line 25:
<comment>Field numbers 2 and 3 from the old `EnqueueRequest` (and similarly in `AckRequest`, `NackRequest`, `ConsumeResponse`) are removed without `reserved` directives. If a future edit accidentally reuses these numbers, old serialized data will be silently misinterpreted. Add `reserved 2, 3;` to guard against that.</comment>
<file context>
@@ -8,57 +8,137 @@ import "fila/v1/messages.proto";
}
+// Enqueue one or more messages.
+message EnqueueRequest {
+ repeated EnqueueMessage messages = 1;
+}
</file context>
Thanks for addressing this—adding reserved numbers for removed fields helps prevent accidental reuse.
```go
drain:
	for {
		select {
		case item := <-a.ch:
```
P2: runAuto has no cap on the number of items drained into a single batch, unlike runLinger which respects maxSize. Under high throughput the entire 4096-item channel buffer can be flushed in one RPC, which risks exceeding gRPC's default 4 MB max message size and failing with ResourceExhausted. Consider adding a maxSize cap to the drain loop, consistent with runLinger.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At accumulator.go, line 124:
<comment>`runAuto` has no cap on the number of items drained into a single batch, unlike `runLinger` which respects `maxSize`. Under high throughput the entire 4096-item channel buffer can be flushed in one RPC, which risks exceeding gRPC's default 4 MB max message size and failing with `ResourceExhausted`. Consider adding a `maxSize` cap to the drain loop, consistent with `runLinger`.</comment>
<file context>
@@ -0,0 +1,266 @@
+ drain:
+ for {
+ select {
+ case item := <-a.ch:
+ batch = append(batch, item)
+ default:
</file context>
Thanks for addressing this and aligning runAuto with runLinger’s behavior!
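For illustration, a hedged sketch of what the capped drain might look like. Only the `maxAutoBatchSize` value of 1000 comes from the follow-up fix commit further down; the types and field names are assumed stand-ins for accumulator.go:

```go
package main

// maxAutoBatchSize (1000) is named in the fix commit; everything else
// here is an assumed stand-in for the real accumulator.
const maxAutoBatchSize = 1000

type item struct{ payload []byte }

type accumulator struct {
	ch    chan item
	flush func(batch []item) // stand-in for the real multi-message RPC
}

// drainBatch drains without blocking but stops at maxAutoBatchSize, so a
// full 4096-item buffer is split across RPCs instead of sent in one.
func (a *accumulator) drainBatch(first item) {
	batch := []item{first}
drain:
	for len(batch) < maxAutoBatchSize {
		select {
		case it := <-a.ch:
			batch = append(batch, it)
		default:
			break drain // channel empty: flush what we have
		}
	}
	a.flush(batch)
}

func main() {
	a := &accumulator{ch: make(chan item, 4096)}
	a.flush = func(batch []item) { println("flush of", len(batch)) }
	for i := 0; i < 2500; i++ {
		a.ch <- item{}
	}
	a.drainBatch(<-a.ch) // flush of 1000
	a.drainBatch(<-a.ch) // flush of 1000; 500 remain for the next pass
}
```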
```diff
  results := make([]EnqueueManyResult, len(resp.Results))
  for i, r := range resp.Results {
  	switch v := r.Result.(type) {
- 	case *filav1.BatchEnqueueResult_Success:
- 		results[i] = BatchEnqueueResult{MessageID: v.Success.MessageId}
- 	case *filav1.BatchEnqueueResult_Error:
- 		results[i] = BatchEnqueueResult{Err: &BatchItemError{Message: v.Error}}
+ 	case *filav1.EnqueueResult_MessageId:
+ 		results[i] = EnqueueManyResult{MessageID: v.MessageId}
+ 	case *filav1.EnqueueResult_Error:
+ 		results[i] = EnqueueManyResult{Err: enqueueErrorToItemError(v.Error)}
  	default:
- 		results[i] = BatchEnqueueResult{Err: &BatchItemError{Message: "unknown result type"}}
+ 		results[i] = EnqueueManyResult{Err: &ItemError{Message: "unknown result type"}}
  	}
  }
```
P2: EnqueueMany does not validate that response result count matches input message count, so missing per-item results are silently dropped.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At enqueue.go, line 71:
<comment>`EnqueueMany` does not validate that response result count matches input message count, so missing per-item results are silently dropped.</comment>
<file context>
@@ -11,66 +12,116 @@ import (
}
- results := make([]BatchEnqueueResult, len(resp.Results))
+ results := make([]EnqueueManyResult, len(resp.Results))
for i, r := range resp.Results {
switch v := r.Result.(type) {
</file context>
Suggested change:

```go
results := make([]EnqueueManyResult, len(messages))
for i := range messages {
	if i >= len(resp.Results) {
		results[i] = EnqueueManyResult{Err: fmt.Errorf("enqueue: server returned fewer results than messages sent")}
		continue
	}
	r := resp.Results[i]
	switch v := r.Result.(type) {
	case *filav1.EnqueueResult_MessageId:
		results[i] = EnqueueManyResult{MessageID: v.MessageId}
	case *filav1.EnqueueResult_Error:
		results[i] = EnqueueManyResult{Err: enqueueErrorToItemError(v.Error)}
	default:
		results[i] = EnqueueManyResult{Err: &ItemError{Message: "unknown result type"}}
	}
}
```
Thanks for addressing this in PR #5—allocating by input size and filling missing results with errors will prevent silent drops.
fix: address cubic review findings from PR #4

- client.go: handle both value and pointer AccumulatorModeDisabled in type assertion to prevent accumulator from starting unexpectedly
- accumulator.go: cap runAuto drain loop at maxAutoBatchSize (1000) to prevent exceeding gRPC 4MB max message size under high throughput
- enqueue.go: validate EnqueueMany response result count matches input message count, fill missing results with explicit errors
- proto: add reserved directives for removed field numbers in EnqueueRequest, ConsumeResponse, AckRequest, NackRequest to prevent accidental field number reuse
Summary
- Removed `BatchEnqueue` RPC and `BatchEnqueue()` method
- `Enqueue`, `Ack`, `Nack` now use repeated message fields in the proto
- `Consume` uses only the repeated `messages` field (singular `message` removed)
- `BatchMode` → `AccumulatorMode`, `batch.go` → `accumulator.go`
- Added `EnqueueMany()` replacing `BatchEnqueue()` for explicit multi-message enqueue
- `ItemError` wraps sentinel errors (`ErrQueueNotFound`, `ErrMessageNotFound`) via `Unwrap()` so `errors.Is` works on per-item results

Test plan
- `TestExtractMessages*`, `TestProtoToConsumeMessageNil`
- `TestEnqueueManyExplicit`, `TestEnqueueManyPartialFailure`, `TestEnqueueManyEmpty`, `TestEnqueueManyNonexistentQueue`
- `TestEnqueueWith*Accumulator*`, `TestEnqueueConcurrentAccumulation`, `TestCloseFlushesPendingMessages`
- `TestEnqueueConsumeAck`, `TestEnqueueConsumeNackRedeliver`, `TestEnqueueNonexistentQueue`
- `TestTLSConnection`, `TestAPIKeyAuth`, `TestAPIKeyAuthRejected`, `TestNoAuthBackwardCompatible`

Summary by cubic
Unifies the Go SDK with the broker's unified API. Removes `BatchEnqueue`, adds an internal accumulator and `EnqueueMany()`, and moves Enqueue/Ack/Nack/Consume to repeated message fields for multi-message operations.

New Features
- Unified API surface in `proto/fila/v1/service.proto`.
- `AccumulatorMode` (default `AccumulatorModeAuto`) for opportunistic multi-message sends.
- `EnqueueMany()` for explicit multi-message enqueue.
- `ItemError` wraps sentinel errors (e.g., `ErrQueueNotFound`, `ErrMessageNotFound`) so `errors.Is` works per item.
- `Consume` now delivers only via `messages` (singular `message` removed).

Migration
- Replace `BatchEnqueue()` with `EnqueueMany()`.
- Replace `WithBatchMode(...)` and all `BatchMode*` types with `WithAccumulatorMode(...)` and `AccumulatorMode*`.
- Update readers of `ConsumeResponse.message` to use `ConsumeResponse.messages`.
- With `EnqueueMany()`, check results with `errors.Is(err, fila.ErrQueueNotFound)` etc.

Written for commit a18e208. Summary will update on new commits.
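To close the loop on the migration notes, a hedged usage sketch of per-item error checking. `Dial`, `Close`, `EnqueueMany`, `EnqueueMessage`, and the error names appear in this PR; the exact signatures, the import path, the queue argument, and the `Payload` field are assumptions:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"log"

	fila "example.com/fila" // assumed import path
)

func main() {
	// Dial and Close are shown in the PR's file context; the address
	// and option-free call are placeholders.
	client, err := fila.Dial("localhost:50051")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// The queue argument and EnqueueMessage's Payload field are assumed.
	results, err := client.EnqueueMany(context.Background(), "jobs", []fila.EnqueueMessage{
		{Payload: []byte("a")},
		{Payload: []byte("b")},
	})
	if err != nil {
		log.Fatal(err) // call-level failure
	}
	for i, r := range results {
		switch {
		case r.Err == nil:
			fmt.Println("enqueued:", r.MessageID)
		case errors.Is(r.Err, fila.ErrQueueNotFound):
			fmt.Println("message", i, "rejected: queue missing")
		default:
			fmt.Println("message", i, "failed:", r.Err)
		}
	}
}
```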