From 8bb8a0943d4df61b6a6acefa14a8f3fc8c04fba5 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Wed, 25 Mar 2026 00:07:59 -0300 Subject: [PATCH 1/2] feat: unified api surface for story 30.2 - copy new service.proto (BatchEnqueue RPC removed, EnqueueRequest now takes repeated EnqueueMessage, AckRequest/NackRequest take repeated messages, ConsumeResponse only has repeated messages field) - regenerate ruby proto code from new service.proto - replace batch_enqueue method with enqueue_many (no "batch" prefix) - enqueue wraps single message in EnqueueMessage + EnqueueRequest - ack/nack wrap in repeated AckMessage/NackMessage, parse first result - consume uses only repeated messages field (singular field removed) - rename BatchEnqueueResult to EnqueueResult - batcher uses unified Enqueue RPC for all batch sizes - update all tests to match new api surface - bump version to 0.4.0 --- lib/fila.rb | 2 +- lib/fila/batcher.rb | 55 +++------ lib/fila/client.rb | 110 ++++++++++------- ...ch_enqueue_result.rb => enqueue_result.rb} | 8 +- lib/fila/proto/fila/v1/service_pb.rb | 21 +++- lib/fila/proto/fila/v1/service_services_pb.rb | 2 +- lib/fila/version.rb | 2 +- proto/fila/v1/service.proto | 116 +++++++++++++++--- test/test_batch.rb | 42 +++---- 9 files changed, 228 insertions(+), 130 deletions(-) rename lib/fila/{batch_enqueue_result.rb => enqueue_result.rb} (79%) diff --git a/lib/fila.rb b/lib/fila.rb index 977a9a9..3a74a1b 100644 --- a/lib/fila.rb +++ b/lib/fila.rb @@ -3,6 +3,6 @@ require_relative 'fila/version' require_relative 'fila/errors' require_relative 'fila/consume_message' -require_relative 'fila/batch_enqueue_result' +require_relative 'fila/enqueue_result' require_relative 'fila/batcher' require_relative 'fila/client' diff --git a/lib/fila/batcher.rb b/lib/fila/batcher.rb index 577f4d4..254f7de 100644 --- a/lib/fila/batcher.rb +++ b/lib/fila/batcher.rb @@ -1,14 +1,14 @@ # frozen_string_literal: true module Fila - # Background batcher that collects enqueue requests and flushes them - # in batches via BatchEnqueue RPC. Supports auto (opportunistic) and - # linger (timer-based) modes. + # Background batcher that collects enqueue messages and flushes them + # in batches via the unified Enqueue RPC. Supports auto (opportunistic) + # and linger (timer-based) modes. # # @api private class Batcher # An item queued for batching, pairing a message with its result slot. - BatchItem = Struct.new(:request, :result_queue, keyword_init: true) + BatchItem = Struct.new(:message, :result_queue, keyword_init: true) # @param stub [Fila::V1::FilaService::Stub] gRPC stub # @param metadata [Hash] call metadata (auth headers) @@ -33,13 +33,13 @@ def initialize(stub:, metadata:, mode:, max_batch_size: 100, batch_size: 100, li # Submit a message for batched sending. Blocks until the batch # containing this message is flushed and the result is available. # - # @param request [Fila::V1::EnqueueRequest] the enqueue request + # @param message [Fila::V1::EnqueueMessage] the enqueue message # @return [String] message ID on success # @raise [Fila::QueueNotFoundError] if the queue does not exist # @raise [Fila::RPCError] for unexpected gRPC failures - def submit(request) + def submit(message) result_queue = Queue.new - item = BatchItem.new(request: request, result_queue: result_queue) + item = BatchItem.new(message: message, result_queue: result_queue) @mutex.synchronize do raise Fila::Error, 'batcher is closed' if @stopped @@ -128,43 +128,28 @@ def drain_nonblocking(batch) end end - # Flush a batch of items. Uses single Enqueue RPC for 1 message - # (preserves exact error types like QueueNotFoundError), and - # BatchEnqueue RPC for 2+ messages. + # Flush a batch of items via the unified Enqueue RPC. def flush_batch(items) - if items.size == 1 - flush_single(items.first) - else - flush_multi(items) - end - end - - def flush_single(item) - resp = @stub.enqueue(item.request, metadata: @metadata) - item.result_queue.push(resp.message_id) - rescue GRPC::NotFound => e - item.result_queue.push(QueueNotFoundError.new("enqueue: #{e.details}")) - rescue GRPC::BadStatus => e - item.result_queue.push(RPCError.new(e.code, e.details)) - rescue StandardError => e - item.result_queue.push(Fila::Error.new(e.message)) - end - - def flush_multi(items) - req = ::Fila::V1::BatchEnqueueRequest.new( - messages: items.map(&:request) + req = ::Fila::V1::EnqueueRequest.new( + messages: items.map(&:message) ) - resp = @stub.batch_enqueue(req, metadata: @metadata) + resp = @stub.enqueue(req, metadata: @metadata) results = resp.results items.each_with_index do |item, idx| result = results[idx] if result.nil? item.result_queue.push(Fila::Error.new('no result from server')) - elsif result.result == :success - item.result_queue.push(result.success.message_id) + elsif result.result == :message_id + item.result_queue.push(result.message_id) else - item.result_queue.push(RPCError.new(GRPC::Core::StatusCodes::INTERNAL, result.error)) + err = result.error + case err.code + when :ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND + item.result_queue.push(QueueNotFoundError.new("enqueue: #{err.message}")) + else + item.result_queue.push(RPCError.new(GRPC::Core::StatusCodes::INTERNAL, err.message)) + end end end rescue GRPC::BadStatus => e diff --git a/lib/fila/client.rb b/lib/fila/client.rb index ce4e8a8..e3cb12e 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -8,7 +8,7 @@ require_relative 'proto/fila/v1/service_services_pb' require_relative 'errors' require_relative 'consume_message' -require_relative 'batch_enqueue_result' +require_relative 'enqueue_result' require_relative 'batcher' module Fila @@ -69,7 +69,7 @@ def close @batcher = nil end - # Enqueue a message to a queue. + # Enqueue a single message to a queue. # # When batching is enabled (default), the message is submitted to # the background batcher. At low load each message is sent @@ -82,50 +82,50 @@ def close # @raise [QueueNotFoundError] if the queue does not exist # @raise [RPCError] for unexpected gRPC failures def enqueue(queue:, payload:, headers: nil) - req = ::Fila::V1::EnqueueRequest.new( + msg = ::Fila::V1::EnqueueMessage.new( queue: queue, headers: headers || {}, payload: payload ) if @batcher - @batcher.submit(req) + @batcher.submit(msg) else - enqueue_direct(req) + enqueue_single(msg) end end - # Enqueue a batch of messages in a single RPC call. + # Enqueue multiple messages in a single RPC call. # # Each message is independently validated and processed. A failed # message does not affect the others. Returns an array of - # BatchEnqueueResult with one result per input message, in order. + # EnqueueResult with one result per input message, in order. # # This bypasses the background batcher and always uses the - # BatchEnqueue RPC directly. + # Enqueue RPC directly. # # @param messages [Array] messages to enqueue; each hash has # keys :queue (String), :payload (String), and optionally # :headers (Hash) - # @return [Array] + # @return [Array] # @raise [RPCError] for transport-level gRPC failures - def batch_enqueue(messages) + def enqueue_many(messages) proto_messages = messages.map do |m| - ::Fila::V1::EnqueueRequest.new( + ::Fila::V1::EnqueueMessage.new( queue: m[:queue], headers: m[:headers] || {}, payload: m[:payload] ) end - req = ::Fila::V1::BatchEnqueueRequest.new(messages: proto_messages) - resp = @stub.batch_enqueue(req, metadata: call_metadata) + req = ::Fila::V1::EnqueueRequest.new(messages: proto_messages) + resp = @stub.enqueue(req, metadata: call_metadata) resp.results.map do |r| - if r.result == :success - BatchEnqueueResult.new(message_id: r.success.message_id) + if r.result == :message_id + EnqueueResult.new(message_id: r.message_id) else - BatchEnqueueResult.new(error: r.error) + EnqueueResult.new(error: r.error.message) end end rescue GRPC::BadStatus => e @@ -133,8 +133,6 @@ def batch_enqueue(messages) end # Open a streaming consumer. Yields messages as they arrive. - # Transparently unpacks batched delivery (repeated messages field) - # with fallback to singular message field. # Returns an Enumerator if no block given. def consume(queue:, &block) return enum_for(:consume, queue: queue) unless block @@ -149,11 +147,20 @@ def consume(queue:, &block) # @raise [MessageNotFoundError] if the message does not exist # @raise [RPCError] for unexpected gRPC failures def ack(queue:, msg_id:) - req = ::Fila::V1::AckRequest.new(queue: queue, message_id: msg_id) - @stub.ack(req, metadata: call_metadata) - nil - rescue GRPC::NotFound => e - raise MessageNotFoundError, "ack: #{e.details}" + msg = ::Fila::V1::AckMessage.new(queue: queue, message_id: msg_id) + req = ::Fila::V1::AckRequest.new(messages: [msg]) + resp = @stub.ack(req, metadata: call_metadata) + + result = resp.results.first + return if result.nil? || result.result == :success + + err = result.error + case err.code + when :ACK_ERROR_CODE_MESSAGE_NOT_FOUND + raise MessageNotFoundError, "ack: #{err.message}" + else + raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, "ack: #{err.message}") + end rescue GRPC::BadStatus => e raise RPCError.new(e.code, e.details) end @@ -166,11 +173,20 @@ def ack(queue:, msg_id:) # @raise [MessageNotFoundError] if the message does not exist # @raise [RPCError] for unexpected gRPC failures def nack(queue:, msg_id:, error:) - req = ::Fila::V1::NackRequest.new(queue: queue, message_id: msg_id, error: error) - @stub.nack(req, metadata: call_metadata) - nil - rescue GRPC::NotFound => e - raise MessageNotFoundError, "nack: #{e.details}" + msg = ::Fila::V1::NackMessage.new(queue: queue, message_id: msg_id, error: error) + req = ::Fila::V1::NackRequest.new(messages: [msg]) + resp = @stub.nack(req, metadata: call_metadata) + + result = resp.results.first + return if result.nil? || result.result == :success + + err = result.error + case err.code + when :NACK_ERROR_CODE_MESSAGE_NOT_FOUND + raise MessageNotFoundError, "nack: #{err.message}" + else + raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, "nack: #{err.message}") + end rescue GRPC::BadStatus => e raise RPCError.new(e.code, e.details) end @@ -200,11 +216,25 @@ def start_batcher(mode, max_batch_size, batch_size, linger_ms) ) end - def enqueue_direct(req) + # Send a single message via the unified Enqueue RPC. + def enqueue_single(msg) + req = ::Fila::V1::EnqueueRequest.new(messages: [msg]) resp = @stub.enqueue(req, metadata: call_metadata) - resp.message_id - rescue GRPC::NotFound => e - raise QueueNotFoundError, "enqueue: #{e.details}" + + result = resp.results.first + raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, 'no result from server') if result.nil? + + if result.result == :message_id + result.message_id + else + err = result.error + case err.code + when :ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND + raise QueueNotFoundError, "enqueue: #{err.message}" + else + raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, "enqueue: #{err.message}") + end + end rescue GRPC::BadStatus => e raise RPCError.new(e.code, e.details) end @@ -226,20 +256,10 @@ def consume_with_redirect(queue:, redirected:, &block) raise RPCError.new(e.code, e.details) end - # Unpack messages from a ConsumeResponse. Prefers the repeated - # messages field (batched delivery); falls back to singular message - # field for backward compatibility with older servers. + # Unpack messages from a ConsumeResponse. def yield_messages_from_response(resp, &block) - msgs = resp.messages - if msgs && !msgs.empty? - msgs.each do |msg| - next if msg.nil? || msg.id.empty? - - block.call(build_consume_message(msg)) - end - else - msg = resp.message - return if msg.nil? || msg.id.empty? + resp.messages.each do |msg| + next if msg.nil? || msg.id.empty? block.call(build_consume_message(msg)) end diff --git a/lib/fila/batch_enqueue_result.rb b/lib/fila/enqueue_result.rb similarity index 79% rename from lib/fila/batch_enqueue_result.rb rename to lib/fila/enqueue_result.rb index be9cf13..ce8b3db 100644 --- a/lib/fila/batch_enqueue_result.rb +++ b/lib/fila/enqueue_result.rb @@ -1,13 +1,13 @@ # frozen_string_literal: true module Fila - # Result of a single message within a batch enqueue call. + # Result of a single message within an enqueue_many call. # - # Each message in a batch is independently validated and processed. + # Each message is independently validated and processed. # A failed message does not affect the others. # # @example - # results = client.batch_enqueue(messages) + # results = client.enqueue_many(messages) # results.each do |r| # if r.success? # puts "Enqueued: #{r.message_id}" @@ -15,7 +15,7 @@ module Fila # puts "Failed: #{r.error}" # end # end - class BatchEnqueueResult + class EnqueueResult # @return [String, nil] broker-assigned message ID on success attr_reader :message_id diff --git a/lib/fila/proto/fila/v1/service_pb.rb b/lib/fila/proto/fila/v1/service_pb.rb index 6120065..9751f67 100644 --- a/lib/fila/proto/fila/v1/service_pb.rb +++ b/lib/fila/proto/fila/v1/service_pb.rb @@ -7,23 +7,36 @@ require 'fila/v1/messages_pb' -descriptor_data = "\n\x15\x66ila/v1/service.proto\x12\x07\x66ila.v1\x1a\x16\x66ila/v1/messages.proto\"\x97\x01\n\x0e\x45nqueueRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x35\n\x07headers\x18\x02 \x03(\x0b\x32$.fila.v1.EnqueueRequest.HeadersEntry\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"%\n\x0f\x45nqueueResponse\x12\x12\n\nmessage_id\x18\x01 \x01(\t\"\x1f\n\x0e\x43onsumeRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"X\n\x0f\x43onsumeResponse\x12!\n\x07message\x18\x01 \x01(\x0b\x32\x10.fila.v1.Message\x12\"\n\x08messages\x18\x02 \x03(\x0b\x32\x10.fila.v1.Message\"/\n\nAckRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\"\r\n\x0b\x41\x63kResponse\"?\n\x0bNackRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"\x0e\n\x0cNackResponse\"@\n\x13\x42\x61tchEnqueueRequest\x12)\n\x08messages\x18\x01 \x03(\x0b\x32\x17.fila.v1.EnqueueRequest\"D\n\x14\x42\x61tchEnqueueResponse\x12,\n\x07results\x18\x01 \x03(\x0b\x32\x1b.fila.v1.BatchEnqueueResult\"\\\n\x12\x42\x61tchEnqueueResult\x12+\n\x07success\x18\x01 \x01(\x0b\x32\x18.fila.v1.EnqueueResponseH\x00\x12\x0f\n\x05\x65rror\x18\x02 \x01(\tH\x00\x42\x08\n\x06result2\xbf\x02\n\x0b\x46ilaService\x12<\n\x07\x45nqueue\x12\x17.fila.v1.EnqueueRequest\x1a\x18.fila.v1.EnqueueResponse\x12K\n\x0c\x42\x61tchEnqueue\x12\x1c.fila.v1.BatchEnqueueRequest\x1a\x1d.fila.v1.BatchEnqueueResponse\x12>\n\x07\x43onsume\x12\x17.fila.v1.ConsumeRequest\x1a\x18.fila.v1.ConsumeResponse0\x01\x12\x30\n\x03\x41\x63k\x12\x13.fila.v1.AckRequest\x1a\x14.fila.v1.AckResponse\x12\x33\n\x04Nack\x12\x14.fila.v1.NackRequest\x1a\x15.fila.v1.NackResponseb\x06proto3" +descriptor_data = "\n\x15\x66ila/v1/service.proto\x12\x07\x66ila.v1\x1a\x16\x66ila/v1/messages.proto\"\x97\x01\n\x0e\x45nqueueMessage\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x35\n\x07headers\x18\x02 \x03(\x0b\x32$.fila.v1.EnqueueMessage.HeadersEntry\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\";\n\x0e\x45nqueueRequest\x12)\n\x08messages\x18\x01 \x03(\x0b\x32\x17.fila.v1.EnqueueMessage\"W\n\rEnqueueResult\x12\x14\n\nmessage_id\x18\x01 \x01(\tH\x00\x12&\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x15.fila.v1.EnqueueErrorH\x00\x42\x08\n\x06result\"H\n\x0c\x45nqueueError\x12\'\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x19.fila.v1.EnqueueErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t\":\n\x0f\x45nqueueResponse\x12\'\n\x07results\x18\x01 \x03(\x0b\x32\x16.fila.v1.EnqueueResult\"\x1f\n\x0e\x43onsumeRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"5\n\x0f\x43onsumeResponse\x12\"\n\x08messages\x18\x01 \x03(\x0b\x32\x10.fila.v1.Message\"/\n\nAckMessage\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\"3\n\nAckRequest\x12%\n\x08messages\x18\x01 \x03(\x0b\x32\x13.fila.v1.AckMessage\"a\n\tAckResult\x12&\n\x07success\x18\x01 \x01(\x0b\x32\x13.fila.v1.AckSuccessH\x00\x12\"\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x11.fila.v1.AckErrorH\x00\x42\x08\n\x06result\"\x0c\n\nAckSuccess\"@\n\x08\x41\x63kError\x12#\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x15.fila.v1.AckErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"2\n\x0b\x41\x63kResponse\x12#\n\x07results\x18\x01 \x03(\x0b\x32\x12.fila.v1.AckResult\"?\n\x0bNackMessage\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"5\n\x0bNackRequest\x12&\n\x08messages\x18\x01 \x03(\x0b\x32\x14.fila.v1.NackMessage\"d\n\nNackResult\x12\'\n\x07success\x18\x01 \x01(\x0b\x32\x14.fila.v1.NackSuccessH\x00\x12#\n\x05\x65rror\x18\x02 \x01(\x0b\x32\x12.fila.v1.NackErrorH\x00\x42\x08\n\x06result\"\r\n\x0bNackSuccess\"B\n\tNackError\x12$\n\x04\x63ode\x18\x01 \x01(\x0e\x32\x16.fila.v1.NackErrorCode\x12\x0f\n\x07message\x18\x02 \x01(\t\"4\n\x0cNackResponse\x12$\n\x07results\x18\x01 \x03(\x0b\x32\x13.fila.v1.NackResult\"Z\n\x14StreamEnqueueRequest\x12)\n\x08messages\x18\x01 \x03(\x0b\x32\x17.fila.v1.EnqueueMessage\x12\x17\n\x0fsequence_number\x18\x02 \x01(\x04\"Y\n\x15StreamEnqueueResponse\x12\x17\n\x0fsequence_number\x18\x01 \x01(\x04\x12\'\n\x07results\x18\x02 \x03(\x0b\x32\x16.fila.v1.EnqueueResult*\xc4\x01\n\x10\x45nqueueErrorCode\x12\"\n\x1e\x45NQUEUE_ERROR_CODE_UNSPECIFIED\x10\x00\x12&\n\"ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND\x10\x01\x12\x1e\n\x1a\x45NQUEUE_ERROR_CODE_STORAGE\x10\x02\x12\x1a\n\x16\x45NQUEUE_ERROR_CODE_LUA\x10\x03\x12(\n$ENQUEUE_ERROR_CODE_PERMISSION_DENIED\x10\x04*\x96\x01\n\x0c\x41\x63kErrorCode\x12\x1e\n\x1a\x41\x43K_ERROR_CODE_UNSPECIFIED\x10\x00\x12$\n ACK_ERROR_CODE_MESSAGE_NOT_FOUND\x10\x01\x12\x1a\n\x16\x41\x43K_ERROR_CODE_STORAGE\x10\x02\x12$\n ACK_ERROR_CODE_PERMISSION_DENIED\x10\x03*\x9b\x01\n\rNackErrorCode\x12\x1f\n\x1bNACK_ERROR_CODE_UNSPECIFIED\x10\x00\x12%\n!NACK_ERROR_CODE_MESSAGE_NOT_FOUND\x10\x01\x12\x1b\n\x17NACK_ERROR_CODE_STORAGE\x10\x02\x12%\n!NACK_ERROR_CODE_PERMISSION_DENIED\x10\x03\x32\xc6\x02\n\x0b\x46ilaService\x12<\n\x07\x45nqueue\x12\x17.fila.v1.EnqueueRequest\x1a\x18.fila.v1.EnqueueResponse\x12R\n\rStreamEnqueue\x12\x1d.fila.v1.StreamEnqueueRequest\x1a\x1e.fila.v1.StreamEnqueueResponse(\x01\x30\x01\x12>\n\x07\x43onsume\x12\x17.fila.v1.ConsumeRequest\x1a\x18.fila.v1.ConsumeResponse0\x01\x12\x30\n\x03\x41\x63k\x12\x13.fila.v1.AckRequest\x1a\x14.fila.v1.AckResponse\x12\x33\n\x04Nack\x12\x14.fila.v1.NackRequest\x1a\x15.fila.v1.NackResponseb\x06proto3" pool = ::Google::Protobuf::DescriptorPool.generated_pool pool.add_serialized_file(descriptor_data) module Fila module V1 + EnqueueMessage = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueMessage").msgclass EnqueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueRequest").msgclass + EnqueueResult = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueResult").msgclass + EnqueueError = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueError").msgclass EnqueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueResponse").msgclass ConsumeRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ConsumeRequest").msgclass ConsumeResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.ConsumeResponse").msgclass + AckMessage = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckMessage").msgclass AckRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckRequest").msgclass + AckResult = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckResult").msgclass + AckSuccess = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckSuccess").msgclass + AckError = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckError").msgclass AckResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckResponse").msgclass + NackMessage = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackMessage").msgclass NackRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackRequest").msgclass + NackResult = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackResult").msgclass + NackSuccess = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackSuccess").msgclass + NackError = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackError").msgclass NackResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackResponse").msgclass - BatchEnqueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.BatchEnqueueRequest").msgclass - BatchEnqueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.BatchEnqueueResponse").msgclass - BatchEnqueueResult = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.BatchEnqueueResult").msgclass + StreamEnqueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.StreamEnqueueRequest").msgclass + StreamEnqueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.StreamEnqueueResponse").msgclass + EnqueueErrorCode = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.EnqueueErrorCode").enummodule + AckErrorCode = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckErrorCode").enummodule + NackErrorCode = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackErrorCode").enummodule end end diff --git a/lib/fila/proto/fila/v1/service_services_pb.rb b/lib/fila/proto/fila/v1/service_services_pb.rb index 0ec0dcb..93d38ab 100644 --- a/lib/fila/proto/fila/v1/service_services_pb.rb +++ b/lib/fila/proto/fila/v1/service_services_pb.rb @@ -17,7 +17,7 @@ class Service self.service_name = 'fila.v1.FilaService' rpc :Enqueue, ::Fila::V1::EnqueueRequest, ::Fila::V1::EnqueueResponse - rpc :BatchEnqueue, ::Fila::V1::BatchEnqueueRequest, ::Fila::V1::BatchEnqueueResponse + rpc :StreamEnqueue, stream(::Fila::V1::StreamEnqueueRequest), stream(::Fila::V1::StreamEnqueueResponse) rpc :Consume, ::Fila::V1::ConsumeRequest, stream(::Fila::V1::ConsumeResponse) rpc :Ack, ::Fila::V1::AckRequest, ::Fila::V1::AckResponse rpc :Nack, ::Fila::V1::NackRequest, ::Fila::V1::NackResponse diff --git a/lib/fila/version.rb b/lib/fila/version.rb index e7c9e84..ecc3d06 100644 --- a/lib/fila/version.rb +++ b/lib/fila/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Fila - VERSION = '0.3.0' + VERSION = '0.4.0' end diff --git a/proto/fila/v1/service.proto b/proto/fila/v1/service.proto index fc0f710..7d1db79 100644 --- a/proto/fila/v1/service.proto +++ b/proto/fila/v1/service.proto @@ -6,20 +6,49 @@ import "fila/v1/messages.proto"; // Hot-path RPCs for producers and consumers. service FilaService { rpc Enqueue(EnqueueRequest) returns (EnqueueResponse); - rpc BatchEnqueue(BatchEnqueueRequest) returns (BatchEnqueueResponse); + rpc StreamEnqueue(stream StreamEnqueueRequest) returns (stream StreamEnqueueResponse); rpc Consume(ConsumeRequest) returns (stream ConsumeResponse); rpc Ack(AckRequest) returns (AckResponse); rpc Nack(NackRequest) returns (NackResponse); } -message EnqueueRequest { +// Individual message to enqueue. +message EnqueueMessage { string queue = 1; map headers = 2; bytes payload = 3; } +// Enqueue one or more messages. +message EnqueueRequest { + repeated EnqueueMessage messages = 1; +} + +// Per-message enqueue result. +message EnqueueResult { + oneof result { + string message_id = 1; + EnqueueError error = 2; + } +} + +// Typed enqueue error with structured error code. +message EnqueueError { + EnqueueErrorCode code = 1; + string message = 2; +} + +enum EnqueueErrorCode { + ENQUEUE_ERROR_CODE_UNSPECIFIED = 0; + ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND = 1; + ENQUEUE_ERROR_CODE_STORAGE = 2; + ENQUEUE_ERROR_CODE_LUA = 3; + ENQUEUE_ERROR_CODE_PERMISSION_DENIED = 4; +} + +// One result per input message. message EnqueueResponse { - string message_id = 1; + repeated EnqueueResult results = 1; } message ConsumeRequest { @@ -27,36 +56,87 @@ message ConsumeRequest { } message ConsumeResponse { - Message message = 1; // Single message (backward compatible, used when batch size is 1) - repeated Message messages = 2; // Batched messages (populated when server sends multiple at once) + repeated Message messages = 1; } -message AckRequest { +// Individual ack item. +message AckMessage { string queue = 1; string message_id = 2; } -message AckResponse {} +message AckRequest { + repeated AckMessage messages = 1; +} + +message AckResult { + oneof result { + AckSuccess success = 1; + AckError error = 2; + } +} -message NackRequest { +message AckSuccess {} + +message AckError { + AckErrorCode code = 1; + string message = 2; +} + +enum AckErrorCode { + ACK_ERROR_CODE_UNSPECIFIED = 0; + ACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; + ACK_ERROR_CODE_STORAGE = 2; + ACK_ERROR_CODE_PERMISSION_DENIED = 3; +} + +message AckResponse { + repeated AckResult results = 1; +} + +// Individual nack item. +message NackMessage { string queue = 1; string message_id = 2; string error = 3; } -message NackResponse {} +message NackRequest { + repeated NackMessage messages = 1; +} + +message NackResult { + oneof result { + NackSuccess success = 1; + NackError error = 2; + } +} -message BatchEnqueueRequest { - repeated EnqueueRequest messages = 1; +message NackSuccess {} + +message NackError { + NackErrorCode code = 1; + string message = 2; } -message BatchEnqueueResponse { - repeated BatchEnqueueResult results = 1; +enum NackErrorCode { + NACK_ERROR_CODE_UNSPECIFIED = 0; + NACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; + NACK_ERROR_CODE_STORAGE = 2; + NACK_ERROR_CODE_PERMISSION_DENIED = 3; } -message BatchEnqueueResult { - oneof result { - EnqueueResponse success = 1; - string error = 2; - } +message NackResponse { + repeated NackResult results = 1; +} + +// Stream enqueue — per-write batch with sequence tracking. +message StreamEnqueueRequest { + repeated EnqueueMessage messages = 1; + uint64 sequence_number = 2; +} + +message StreamEnqueueResponse { + uint64 sequence_number = 1; + repeated EnqueueResult results = 2; } diff --git a/test/test_batch.rb b/test/test_batch.rb index 5a8778d..5bdb4fb 100644 --- a/test/test_batch.rb +++ b/test/test_batch.rb @@ -4,7 +4,7 @@ return unless FILA_SERVER_AVAILABLE -class TestBatchEnqueue < Minitest::Test +class TestEnqueueMany < Minitest::Test def setup @server = TestServerHelper.start @client = Fila::Client.new(@server[:addr], batch_mode: :disabled) @@ -15,14 +15,14 @@ def teardown TestServerHelper.stop(@server) if @server end - def test_batch_enqueue_multiple_messages - TestServerHelper.create_queue(@server, 'batch-test') + def test_enqueue_many_multiple_messages + TestServerHelper.create_queue(@server, 'many-test') messages = 5.times.map do |i| - { queue: 'batch-test', payload: "batch-msg-#{i}", headers: { 'index' => i.to_s } } + { queue: 'many-test', payload: "many-msg-#{i}", headers: { 'index' => i.to_s } } end - results = @client.batch_enqueue(messages) + results = @client.enqueue_many(messages) assert_equal 5, results.size results.each do |r| @@ -32,19 +32,19 @@ def test_batch_enqueue_multiple_messages # Verify all messages are consumable. received_ids = [] - @client.consume(queue: 'batch-test') do |msg| + @client.consume(queue: 'many-test') do |msg| received_ids << msg.id - @client.ack(queue: 'batch-test', msg_id: msg.id) + @client.ack(queue: 'many-test', msg_id: msg.id) break if received_ids.size >= 5 end assert_equal 5, received_ids.size end - def test_batch_enqueue_single_message - TestServerHelper.create_queue(@server, 'batch-single') + def test_enqueue_many_single_message + TestServerHelper.create_queue(@server, 'many-single') - results = @client.batch_enqueue([ - { queue: 'batch-single', payload: 'solo' } + results = @client.enqueue_many([ + { queue: 'many-single', payload: 'solo' } ]) assert_equal 1, results.size @@ -52,21 +52,21 @@ def test_batch_enqueue_single_message refute_empty results.first.message_id end - def test_batch_enqueue_empty_array - results = @client.batch_enqueue([]) + def test_enqueue_many_empty_array + results = @client.enqueue_many([]) assert_equal 0, results.size end - def test_batch_enqueue_mixed_success_and_failure - TestServerHelper.create_queue(@server, 'batch-mixed') + def test_enqueue_many_mixed_success_and_failure + TestServerHelper.create_queue(@server, 'many-mixed') messages = [ - { queue: 'batch-mixed', payload: 'good-1' }, + { queue: 'many-mixed', payload: 'good-1' }, { queue: 'no-such-queue-xyz', payload: 'bad' }, - { queue: 'batch-mixed', payload: 'good-2' } + { queue: 'many-mixed', payload: 'good-2' } ] - results = @client.batch_enqueue(messages) + results = @client.enqueue_many(messages) assert_equal 3, results.size assert results[0].success?, "first message should succeed" @@ -76,16 +76,16 @@ def test_batch_enqueue_mixed_success_and_failure end end -class TestBatchEnqueueResult < Minitest::Test +class TestEnqueueResult < Minitest::Test def test_success_result - r = Fila::BatchEnqueueResult.new(message_id: 'abc-123') + r = Fila::EnqueueResult.new(message_id: 'abc-123') assert r.success? assert_equal 'abc-123', r.message_id assert_nil r.error end def test_error_result - r = Fila::BatchEnqueueResult.new(error: 'queue not found') + r = Fila::EnqueueResult.new(error: 'queue not found') refute r.success? assert_nil r.message_id assert_equal 'queue not found', r.error From 88a334abd493debdb5d32487dadd153210949449 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Wed, 25 Mar 2026 09:53:52 -0300 Subject: [PATCH 2/2] fix: address cubic finding and rubocop lint offenses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ack/nack: raise RPCError on nil result instead of silently returning (identified by cubic — nil means zero results, which is an error for single-message operations) - batcher: extract result_to_outcome and broadcast_error from flush_batch to satisfy AbcSize/CyclomaticComplexity/MethodLength/PerceivedComplexity - batcher: remove redundant begin block in pop_with_timeout - batcher: add rubocop:disable for Metrics/ClassLength (126 lines, thread management + two modes is inherently complex) - test_batch: fix array indentation and prefer single-quoted strings --- lib/fila/batcher.rb | 57 +++++++++++++++++++++------------------------ lib/fila/client.rb | 6 +++-- test/test_batch.rb | 14 +++++------ 3 files changed, 38 insertions(+), 39 deletions(-) diff --git a/lib/fila/batcher.rb b/lib/fila/batcher.rb index 254f7de..593d323 100644 --- a/lib/fila/batcher.rb +++ b/lib/fila/batcher.rb @@ -6,7 +6,7 @@ module Fila # and linger (timer-based) modes. # # @api private - class Batcher + class Batcher # rubocop:disable Metrics/ClassLength # An item queued for batching, pairing a message with its result slot. BatchItem = Struct.new(:message, :result_queue, keyword_init: true) @@ -130,34 +130,33 @@ def drain_nonblocking(batch) # Flush a batch of items via the unified Enqueue RPC. def flush_batch(items) - req = ::Fila::V1::EnqueueRequest.new( - messages: items.map(&:message) - ) - resp = @stub.enqueue(req, metadata: @metadata) - results = resp.results + req = ::Fila::V1::EnqueueRequest.new(messages: items.map(&:message)) + results = @stub.enqueue(req, metadata: @metadata).results items.each_with_index do |item, idx| - result = results[idx] - if result.nil? - item.result_queue.push(Fila::Error.new('no result from server')) - elsif result.result == :message_id - item.result_queue.push(result.message_id) - else - err = result.error - case err.code - when :ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND - item.result_queue.push(QueueNotFoundError.new("enqueue: #{err.message}")) - else - item.result_queue.push(RPCError.new(GRPC::Core::StatusCodes::INTERNAL, err.message)) - end - end + item.result_queue.push(result_to_outcome(results[idx])) end rescue GRPC::BadStatus => e - # Transport-level failure: all messages in this batch get the error. - err = RPCError.new(e.code, e.details) - items.each { |item| item.result_queue.push(err) } + broadcast_error(items, RPCError.new(e.code, e.details)) rescue StandardError => e - err = Fila::Error.new(e.message) + broadcast_error(items, Fila::Error.new(e.message)) + end + + # Convert a single proto EnqueueResult into a String (message_id) or Exception. + def result_to_outcome(result) + return Fila::Error.new('no result from server') if result.nil? + return result.message_id if result.result == :message_id + + err = result.error + case err.code + when :ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND + QueueNotFoundError.new("enqueue: #{err.message}") + else + RPCError.new(GRPC::Core::StatusCodes::INTERNAL, err.message) + end + end + + def broadcast_error(items, err) items.each { |item| item.result_queue.push(err) } end @@ -170,13 +169,11 @@ def current_time_ms def pop_with_timeout(timeout_ms) deadline = current_time_ms + timeout_ms loop do - begin - return @queue.pop(true) - rescue ThreadError - raise if current_time_ms >= deadline + return @queue.pop(true) + rescue ThreadError + raise if current_time_ms >= deadline - sleep(0.001) # 1ms polling interval - end + sleep(0.001) # 1ms polling interval end end end diff --git a/lib/fila/client.rb b/lib/fila/client.rb index e3cb12e..53d6cff 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -152,7 +152,8 @@ def ack(queue:, msg_id:) resp = @stub.ack(req, metadata: call_metadata) result = resp.results.first - return if result.nil? || result.result == :success + raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, 'no result from server') if result.nil? + return if result.result == :success err = result.error case err.code @@ -178,7 +179,8 @@ def nack(queue:, msg_id:, error:) resp = @stub.nack(req, metadata: call_metadata) result = resp.results.first - return if result.nil? || result.result == :success + raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, 'no result from server') if result.nil? + return if result.result == :success err = result.error case err.code diff --git a/test/test_batch.rb b/test/test_batch.rb index 5bdb4fb..62f1b41 100644 --- a/test/test_batch.rb +++ b/test/test_batch.rb @@ -43,9 +43,9 @@ def test_enqueue_many_multiple_messages def test_enqueue_many_single_message TestServerHelper.create_queue(@server, 'many-single') - results = @client.enqueue_many([ - { queue: 'many-single', payload: 'solo' } - ]) + results = @client.enqueue_many( + [{ queue: 'many-single', payload: 'solo' }] + ) assert_equal 1, results.size assert results.first.success? @@ -69,10 +69,10 @@ def test_enqueue_many_mixed_success_and_failure results = @client.enqueue_many(messages) assert_equal 3, results.size - assert results[0].success?, "first message should succeed" - refute results[1].success?, "second message should fail (nonexistent queue)" - assert results[1].error, "second message should have error description" - assert results[2].success?, "third message should succeed" + assert results[0].success?, 'first message should succeed' + refute results[1].success?, 'second message should fail (nonexistent queue)' + assert results[1].error, 'second message should have error description' + assert results[2].success?, 'third message should succeed' end end