From 78aa15d12dd7dd823fe1feb30079b73dfbcd0ad3 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Tue, 24 Mar 2026 10:18:30 -0300 Subject: [PATCH] feat: add batch enqueue, delivery batching, and smart batch modes - Add batch_enqueue() for explicit multi-message BatchEnqueue RPC - Add background batcher with three modes: :auto (default), :linger, :disabled - Auto mode: opportunistic batching via Queue drain (zero latency at low load) - Linger mode: timer-based batching with configurable linger_ms and batch_size - Single-item optimization: 1 message uses Enqueue RPC (preserves error types) - Delivery batching: consume unpacks repeated messages field transparently - close() drains pending messages before disconnecting - Update proto with BatchEnqueue RPC and ConsumeResponse.messages field - Bump version to 0.3.0 --- lib/fila.rb | 2 + lib/fila/batch_enqueue_result.rb | 37 +++ lib/fila/batcher.rb | 198 +++++++++++++ lib/fila/client.rb | 158 +++++++++- lib/fila/proto/fila/v1/service_pb.rb | 5 +- lib/fila/proto/fila/v1/service_services_pb.rb | 1 + lib/fila/version.rb | 2 +- proto/fila/v1/service.proto | 19 +- test/test_batch.rb | 273 ++++++++++++++++++ test/test_client.rb | 1 + 10 files changed, 677 insertions(+), 19 deletions(-) create mode 100644 lib/fila/batch_enqueue_result.rb create mode 100644 lib/fila/batcher.rb create mode 100644 test/test_batch.rb diff --git a/lib/fila.rb b/lib/fila.rb index 91e321c..977a9a9 100644 --- a/lib/fila.rb +++ b/lib/fila.rb @@ -3,4 +3,6 @@ require_relative 'fila/version' require_relative 'fila/errors' require_relative 'fila/consume_message' +require_relative 'fila/batch_enqueue_result' +require_relative 'fila/batcher' require_relative 'fila/client' diff --git a/lib/fila/batch_enqueue_result.rb b/lib/fila/batch_enqueue_result.rb new file mode 100644 index 0000000..be9cf13 --- /dev/null +++ b/lib/fila/batch_enqueue_result.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +module Fila + # Result of a single message within a batch enqueue call. + # + # Each message in a batch is independently validated and processed. + # A failed message does not affect the others. + # + # @example + # results = client.batch_enqueue(messages) + # results.each do |r| + # if r.success? + # puts "Enqueued: #{r.message_id}" + # else + # puts "Failed: #{r.error}" + # end + # end + class BatchEnqueueResult + # @return [String, nil] broker-assigned message ID on success + attr_reader :message_id + + # @return [String, nil] error description on failure + attr_reader :error + + # @param message_id [String, nil] message ID if successful + # @param error [String, nil] error string if failed + def initialize(message_id: nil, error: nil) + @message_id = message_id + @error = error + end + + # @return [Boolean] true if the message was successfully enqueued + def success? + !@message_id.nil? + end + end +end diff --git a/lib/fila/batcher.rb b/lib/fila/batcher.rb new file mode 100644 index 0000000..577f4d4 --- /dev/null +++ b/lib/fila/batcher.rb @@ -0,0 +1,198 @@ +# frozen_string_literal: true + +module Fila + # Background batcher that collects enqueue requests and flushes them + # in batches via BatchEnqueue RPC. Supports auto (opportunistic) and + # linger (timer-based) modes. + # + # @api private + class Batcher + # An item queued for batching, pairing a message with its result slot. + BatchItem = Struct.new(:request, :result_queue, keyword_init: true) + + # @param stub [Fila::V1::FilaService::Stub] gRPC stub + # @param metadata [Hash] call metadata (auth headers) + # @param mode [Symbol] :auto or :linger + # @param max_batch_size [Integer] cap on batch size (auto mode) + # @param batch_size [Integer] batch size threshold (linger mode) + # @param linger_ms [Integer] linger time in ms (linger mode) + def initialize(stub:, metadata:, mode:, max_batch_size: 100, batch_size: 100, linger_ms: 10) + @stub = stub + @metadata = metadata + @mode = mode + @max_batch_size = mode == :auto ? max_batch_size : batch_size + @linger_ms = linger_ms + @queue = Queue.new + @stopped = false + @mutex = Mutex.new + + @thread = Thread.new { run_loop } + @thread.abort_on_exception = true + end + + # Submit a message for batched sending. Blocks until the batch + # containing this message is flushed and the result is available. + # + # @param request [Fila::V1::EnqueueRequest] the enqueue request + # @return [String] message ID on success + # @raise [Fila::QueueNotFoundError] if the queue does not exist + # @raise [Fila::RPCError] for unexpected gRPC failures + def submit(request) + result_queue = Queue.new + item = BatchItem.new(request: request, result_queue: result_queue) + + @mutex.synchronize do + raise Fila::Error, 'batcher is closed' if @stopped + + @queue.push(item) + end + + # Block until the batcher flushes our batch and posts the result. + outcome = result_queue.pop + case outcome + when String then outcome + when Exception then raise outcome + else raise Fila::Error, "unexpected batcher result: #{outcome.inspect}" + end + end + + # Drain pending messages and stop the background thread. + def close + @mutex.synchronize { @stopped = true } + @queue.push(:shutdown) + @thread.join + end + + private + + def run_loop + case @mode + when :auto then run_auto_loop + when :linger then run_linger_loop + end + end + + # Auto mode: block for the first message, then non-blocking drain + # any additional messages that have arrived, flush concurrently. + def run_auto_loop + loop do + first = @queue.pop + break if first == :shutdown + + batch = [first] + drain_nonblocking(batch) + flush_batch(batch) + end + end + + # Linger mode: block for the first message, then wait up to linger_ms + # for more messages or until batch_size is reached. + def run_linger_loop + loop do + first = @queue.pop + break if first == :shutdown + + batch = [first] + deadline = current_time_ms + @linger_ms + + while batch.size < @max_batch_size + remaining_ms = deadline - current_time_ms + break if remaining_ms <= 0 + + begin + item = pop_with_timeout(remaining_ms) + break if item == :shutdown + + batch << item + rescue ThreadError + break + end + end + + flush_batch(batch) + end + end + + def drain_nonblocking(batch) + while batch.size < @max_batch_size + begin + item = @queue.pop(true) # non_block = true + if item == :shutdown + @queue.push(:shutdown) # re-enqueue so the loop sees it + break + end + batch << item + rescue ThreadError + break + end + end + end + + # Flush a batch of items. Uses single Enqueue RPC for 1 message + # (preserves exact error types like QueueNotFoundError), and + # BatchEnqueue RPC for 2+ messages. + def flush_batch(items) + if items.size == 1 + flush_single(items.first) + else + flush_multi(items) + end + end + + def flush_single(item) + resp = @stub.enqueue(item.request, metadata: @metadata) + item.result_queue.push(resp.message_id) + rescue GRPC::NotFound => e + item.result_queue.push(QueueNotFoundError.new("enqueue: #{e.details}")) + rescue GRPC::BadStatus => e + item.result_queue.push(RPCError.new(e.code, e.details)) + rescue StandardError => e + item.result_queue.push(Fila::Error.new(e.message)) + end + + def flush_multi(items) + req = ::Fila::V1::BatchEnqueueRequest.new( + messages: items.map(&:request) + ) + resp = @stub.batch_enqueue(req, metadata: @metadata) + results = resp.results + + items.each_with_index do |item, idx| + result = results[idx] + if result.nil? + item.result_queue.push(Fila::Error.new('no result from server')) + elsif result.result == :success + item.result_queue.push(result.success.message_id) + else + item.result_queue.push(RPCError.new(GRPC::Core::StatusCodes::INTERNAL, result.error)) + end + end + rescue GRPC::BadStatus => e + # Transport-level failure: all messages in this batch get the error. + err = RPCError.new(e.code, e.details) + items.each { |item| item.result_queue.push(err) } + rescue StandardError => e + err = Fila::Error.new(e.message) + items.each { |item| item.result_queue.push(err) } + end + + def current_time_ms + (Process.clock_gettime(Process::CLOCK_MONOTONIC) * 1000).to_i + end + + # Pop from @queue with a timeout in milliseconds. + # Raises ThreadError if nothing is available within the timeout. + def pop_with_timeout(timeout_ms) + deadline = current_time_ms + timeout_ms + loop do + begin + return @queue.pop(true) + rescue ThreadError + raise if current_time_ms >= deadline + + sleep(0.001) # 1ms polling interval + end + end + end + end +end diff --git a/lib/fila/client.rb b/lib/fila/client.rb index 1fdef59..ce4e8a8 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -8,51 +8,133 @@ require_relative 'proto/fila/v1/service_services_pb' require_relative 'errors' require_relative 'consume_message' +require_relative 'batch_enqueue_result' +require_relative 'batcher' module Fila # Client for the Fila message broker. # # Wraps the hot-path gRPC operations: enqueue, consume, ack, nack. # - # @example Plain-text (no auth) + # @example Plain-text, default auto-batching # client = Fila::Client.new("localhost:5555") # + # @example Batching disabled + # client = Fila::Client.new("localhost:5555", batch_mode: :disabled) + # + # @example Linger-based batching + # client = Fila::Client.new("localhost:5555", + # batch_mode: :linger, linger_ms: 10, batch_size: 50) + # # @example TLS with system trust store # client = Fila::Client.new("localhost:5555", tls: true) # - # @example TLS with custom CA - # client = Fila::Client.new("localhost:5555", ca_cert: File.read("ca.pem")) - # # @example mTLS + API key # client = Fila::Client.new("localhost:5555", # ca_cert: File.read("ca.pem"), # client_cert: File.read("client.pem"), # client_key: File.read("client-key.pem"), # api_key: "fila_abc123") - class Client - def initialize(addr, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, api_key: nil) + class Client # rubocop:disable Metrics/ClassLength + # Valid batch mode values. + BATCH_MODES = %i[auto linger disabled].freeze + + private_constant :BATCH_MODES + + # @param addr [String] server address (host:port) + # @param tls [Boolean] enable TLS with system trust store + # @param ca_cert [String, nil] PEM-encoded CA certificate + # @param client_cert [String, nil] PEM-encoded client certificate (mTLS) + # @param client_key [String, nil] PEM-encoded client key (mTLS) + # @param api_key [String, nil] API key for authentication + # @param batch_mode [Symbol] :auto (default), :linger, or :disabled + # @param max_batch_size [Integer] max batch size for auto mode (default: 100) + # @param batch_size [Integer] batch size for linger mode (default: 100) + # @param linger_ms [Integer] linger time in ms for linger mode (default: 10) + def initialize( # rubocop:disable Metrics/ParameterLists + addr, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, + api_key: nil, batch_mode: :auto, max_batch_size: 100, + batch_size: 100, linger_ms: 10 + ) + validate_batch_mode(batch_mode) @api_key = api_key @credentials = build_credentials(tls: tls, ca_cert: ca_cert, client_cert: client_cert, client_key: client_key) @stub = ::Fila::V1::FilaService::Stub.new(addr, @credentials) + @batcher = start_batcher(batch_mode, max_batch_size, batch_size, linger_ms) end - def close; end + # Drain pending batched messages and disconnect. + def close + @batcher&.close + @batcher = nil + end + # Enqueue a message to a queue. + # + # When batching is enabled (default), the message is submitted to + # the background batcher. At low load each message is sent + # individually; at high load messages cluster into batches. + # + # @param queue [String] target queue name + # @param payload [String] message payload + # @param headers [Hash, nil] optional headers + # @return [String] broker-assigned message ID + # @raise [QueueNotFoundError] if the queue does not exist + # @raise [RPCError] for unexpected gRPC failures def enqueue(queue:, payload:, headers: nil) req = ::Fila::V1::EnqueueRequest.new( queue: queue, headers: headers || {}, payload: payload ) - resp = @stub.enqueue(req, metadata: call_metadata) - resp.message_id - rescue GRPC::NotFound => e - raise QueueNotFoundError, "enqueue: #{e.details}" + + if @batcher + @batcher.submit(req) + else + enqueue_direct(req) + end + end + + # Enqueue a batch of messages in a single RPC call. + # + # Each message is independently validated and processed. A failed + # message does not affect the others. Returns an array of + # BatchEnqueueResult with one result per input message, in order. + # + # This bypasses the background batcher and always uses the + # BatchEnqueue RPC directly. + # + # @param messages [Array] messages to enqueue; each hash has + # keys :queue (String), :payload (String), and optionally + # :headers (Hash) + # @return [Array] + # @raise [RPCError] for transport-level gRPC failures + def batch_enqueue(messages) + proto_messages = messages.map do |m| + ::Fila::V1::EnqueueRequest.new( + queue: m[:queue], + headers: m[:headers] || {}, + payload: m[:payload] + ) + end + + req = ::Fila::V1::BatchEnqueueRequest.new(messages: proto_messages) + resp = @stub.batch_enqueue(req, metadata: call_metadata) + + resp.results.map do |r| + if r.result == :success + BatchEnqueueResult.new(message_id: r.success.message_id) + else + BatchEnqueueResult.new(error: r.error) + end + end rescue GRPC::BadStatus => e raise RPCError.new(e.code, e.details) end # Open a streaming consumer. Yields messages as they arrive. + # Transparently unpacks batched delivery (repeated messages field) + # with fallback to singular message field. # Returns an Enumerator if no block given. def consume(queue:, &block) return enum_for(:consume, queue: queue) unless block @@ -99,13 +181,38 @@ def nack(queue:, msg_id:, error:) private - def consume_with_redirect(queue:, redirected:, &block) # rubocop:disable Metrics/AbcSize + def validate_batch_mode(mode) + return if BATCH_MODES.include?(mode) + + raise ArgumentError, "invalid batch_mode: #{mode.inspect}, must be one of #{BATCH_MODES.inspect}" + end + + def start_batcher(mode, max_batch_size, batch_size, linger_ms) + return nil if mode == :disabled + + Batcher.new( + stub: @stub, + metadata: call_metadata, + mode: mode, + max_batch_size: max_batch_size, + batch_size: batch_size, + linger_ms: linger_ms + ) + end + + def enqueue_direct(req) + resp = @stub.enqueue(req, metadata: call_metadata) + resp.message_id + rescue GRPC::NotFound => e + raise QueueNotFoundError, "enqueue: #{e.details}" + rescue GRPC::BadStatus => e + raise RPCError.new(e.code, e.details) + end + + def consume_with_redirect(queue:, redirected:, &block) stream = @stub.consume(::Fila::V1::ConsumeRequest.new(queue: queue), metadata: call_metadata) stream.each do |resp| - msg = resp.message - next if msg.nil? || msg.id.empty? - - block.call(build_consume_message(msg)) + yield_messages_from_response(resp, &block) end rescue GRPC::Cancelled then nil rescue GRPC::NotFound => e @@ -119,6 +226,25 @@ def consume_with_redirect(queue:, redirected:, &block) # rubocop:disable Metrics raise RPCError.new(e.code, e.details) end + # Unpack messages from a ConsumeResponse. Prefers the repeated + # messages field (batched delivery); falls back to singular message + # field for backward compatibility with older servers. + def yield_messages_from_response(resp, &block) + msgs = resp.messages + if msgs && !msgs.empty? + msgs.each do |msg| + next if msg.nil? || msg.id.empty? + + block.call(build_consume_message(msg)) + end + else + msg = resp.message + return if msg.nil? || msg.id.empty? + + block.call(build_consume_message(msg)) + end + end + def extract_leader_addr(err) err.metadata[LEADER_ADDR_KEY] rescue StandardError diff --git a/lib/fila/proto/fila/v1/service_pb.rb b/lib/fila/proto/fila/v1/service_pb.rb index 7eba33f..6120065 100644 --- a/lib/fila/proto/fila/v1/service_pb.rb +++ b/lib/fila/proto/fila/v1/service_pb.rb @@ -7,7 +7,7 @@ require 'fila/v1/messages_pb' -descriptor_data = "\n\x15\x66ila/v1/service.proto\x12\x07\x66ila.v1\x1a\x16\x66ila/v1/messages.proto\"\x97\x01\n\x0e\x45nqueueRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x35\n\x07headers\x18\x02 \x03(\x0b\x32$.fila.v1.EnqueueRequest.HeadersEntry\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"%\n\x0f\x45nqueueResponse\x12\x12\n\nmessage_id\x18\x01 \x01(\t\"\x1f\n\x0e\x43onsumeRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"4\n\x0f\x43onsumeResponse\x12!\n\x07message\x18\x01 \x01(\x0b\x32\x10.fila.v1.Message\"/\n\nAckRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\"\r\n\x0b\x41\x63kResponse\"?\n\x0bNackRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"\x0e\n\x0cNackResponse2\xf2\x01\n\x0b\x46ilaService\x12<\n\x07\x45nqueue\x12\x17.fila.v1.EnqueueRequest\x1a\x18.fila.v1.EnqueueResponse\x12>\n\x07\x43onsume\x12\x17.fila.v1.ConsumeRequest\x1a\x18.fila.v1.ConsumeResponse0\x01\x12\x30\n\x03\x41\x63k\x12\x13.fila.v1.AckRequest\x1a\x14.fila.v1.AckResponse\x12\x33\n\x04Nack\x12\x14.fila.v1.NackRequest\x1a\x15.fila.v1.NackResponseb\x06proto3" +descriptor_data = "\n\x15\x66ila/v1/service.proto\x12\x07\x66ila.v1\x1a\x16\x66ila/v1/messages.proto\"\x97\x01\n\x0e\x45nqueueRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x35\n\x07headers\x18\x02 \x03(\x0b\x32$.fila.v1.EnqueueRequest.HeadersEntry\x12\x0f\n\x07payload\x18\x03 \x01(\x0c\x1a.\n\x0cHeadersEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t:\x02\x38\x01\"%\n\x0f\x45nqueueResponse\x12\x12\n\nmessage_id\x18\x01 \x01(\t\"\x1f\n\x0e\x43onsumeRequest\x12\r\n\x05queue\x18\x01 \x01(\t\"X\n\x0f\x43onsumeResponse\x12!\n\x07message\x18\x01 \x01(\x0b\x32\x10.fila.v1.Message\x12\"\n\x08messages\x18\x02 \x03(\x0b\x32\x10.fila.v1.Message\"/\n\nAckRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\"\r\n\x0b\x41\x63kResponse\"?\n\x0bNackRequest\x12\r\n\x05queue\x18\x01 \x01(\t\x12\x12\n\nmessage_id\x18\x02 \x01(\t\x12\r\n\x05\x65rror\x18\x03 \x01(\t\"\x0e\n\x0cNackResponse\"@\n\x13\x42\x61tchEnqueueRequest\x12)\n\x08messages\x18\x01 \x03(\x0b\x32\x17.fila.v1.EnqueueRequest\"D\n\x14\x42\x61tchEnqueueResponse\x12,\n\x07results\x18\x01 \x03(\x0b\x32\x1b.fila.v1.BatchEnqueueResult\"\\\n\x12\x42\x61tchEnqueueResult\x12+\n\x07success\x18\x01 \x01(\x0b\x32\x18.fila.v1.EnqueueResponseH\x00\x12\x0f\n\x05\x65rror\x18\x02 \x01(\tH\x00\x42\x08\n\x06result2\xbf\x02\n\x0b\x46ilaService\x12<\n\x07\x45nqueue\x12\x17.fila.v1.EnqueueRequest\x1a\x18.fila.v1.EnqueueResponse\x12K\n\x0c\x42\x61tchEnqueue\x12\x1c.fila.v1.BatchEnqueueRequest\x1a\x1d.fila.v1.BatchEnqueueResponse\x12>\n\x07\x43onsume\x12\x17.fila.v1.ConsumeRequest\x1a\x18.fila.v1.ConsumeResponse0\x01\x12\x30\n\x03\x41\x63k\x12\x13.fila.v1.AckRequest\x1a\x14.fila.v1.AckResponse\x12\x33\n\x04Nack\x12\x14.fila.v1.NackRequest\x1a\x15.fila.v1.NackResponseb\x06proto3" pool = ::Google::Protobuf::DescriptorPool.generated_pool pool.add_serialized_file(descriptor_data) @@ -22,5 +22,8 @@ module V1 AckResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.AckResponse").msgclass NackRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackRequest").msgclass NackResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.NackResponse").msgclass + BatchEnqueueRequest = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.BatchEnqueueRequest").msgclass + BatchEnqueueResponse = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.BatchEnqueueResponse").msgclass + BatchEnqueueResult = ::Google::Protobuf::DescriptorPool.generated_pool.lookup("fila.v1.BatchEnqueueResult").msgclass end end diff --git a/lib/fila/proto/fila/v1/service_services_pb.rb b/lib/fila/proto/fila/v1/service_services_pb.rb index 941aba4..0ec0dcb 100644 --- a/lib/fila/proto/fila/v1/service_services_pb.rb +++ b/lib/fila/proto/fila/v1/service_services_pb.rb @@ -17,6 +17,7 @@ class Service self.service_name = 'fila.v1.FilaService' rpc :Enqueue, ::Fila::V1::EnqueueRequest, ::Fila::V1::EnqueueResponse + rpc :BatchEnqueue, ::Fila::V1::BatchEnqueueRequest, ::Fila::V1::BatchEnqueueResponse rpc :Consume, ::Fila::V1::ConsumeRequest, stream(::Fila::V1::ConsumeResponse) rpc :Ack, ::Fila::V1::AckRequest, ::Fila::V1::AckResponse rpc :Nack, ::Fila::V1::NackRequest, ::Fila::V1::NackResponse diff --git a/lib/fila/version.rb b/lib/fila/version.rb index cce5b33..e7c9e84 100644 --- a/lib/fila/version.rb +++ b/lib/fila/version.rb @@ -1,5 +1,5 @@ # frozen_string_literal: true module Fila - VERSION = '0.2.0' + VERSION = '0.3.0' end diff --git a/proto/fila/v1/service.proto b/proto/fila/v1/service.proto index f14fdd0..fc0f710 100644 --- a/proto/fila/v1/service.proto +++ b/proto/fila/v1/service.proto @@ -6,6 +6,7 @@ import "fila/v1/messages.proto"; // Hot-path RPCs for producers and consumers. service FilaService { rpc Enqueue(EnqueueRequest) returns (EnqueueResponse); + rpc BatchEnqueue(BatchEnqueueRequest) returns (BatchEnqueueResponse); rpc Consume(ConsumeRequest) returns (stream ConsumeResponse); rpc Ack(AckRequest) returns (AckResponse); rpc Nack(NackRequest) returns (NackResponse); @@ -26,7 +27,8 @@ message ConsumeRequest { } message ConsumeResponse { - Message message = 1; + Message message = 1; // Single message (backward compatible, used when batch size is 1) + repeated Message messages = 2; // Batched messages (populated when server sends multiple at once) } message AckRequest { @@ -43,3 +45,18 @@ message NackRequest { } message NackResponse {} + +message BatchEnqueueRequest { + repeated EnqueueRequest messages = 1; +} + +message BatchEnqueueResponse { + repeated BatchEnqueueResult results = 1; +} + +message BatchEnqueueResult { + oneof result { + EnqueueResponse success = 1; + string error = 2; + } +} diff --git a/test/test_batch.rb b/test/test_batch.rb new file mode 100644 index 0000000..5a8778d --- /dev/null +++ b/test/test_batch.rb @@ -0,0 +1,273 @@ +# frozen_string_literal: true + +require 'test_helper' + +return unless FILA_SERVER_AVAILABLE + +class TestBatchEnqueue < Minitest::Test + def setup + @server = TestServerHelper.start + @client = Fila::Client.new(@server[:addr], batch_mode: :disabled) + end + + def teardown + @client&.close + TestServerHelper.stop(@server) if @server + end + + def test_batch_enqueue_multiple_messages + TestServerHelper.create_queue(@server, 'batch-test') + + messages = 5.times.map do |i| + { queue: 'batch-test', payload: "batch-msg-#{i}", headers: { 'index' => i.to_s } } + end + + results = @client.batch_enqueue(messages) + + assert_equal 5, results.size + results.each do |r| + assert r.success?, "expected success but got error: #{r.error}" + refute_empty r.message_id + end + + # Verify all messages are consumable. + received_ids = [] + @client.consume(queue: 'batch-test') do |msg| + received_ids << msg.id + @client.ack(queue: 'batch-test', msg_id: msg.id) + break if received_ids.size >= 5 + end + assert_equal 5, received_ids.size + end + + def test_batch_enqueue_single_message + TestServerHelper.create_queue(@server, 'batch-single') + + results = @client.batch_enqueue([ + { queue: 'batch-single', payload: 'solo' } + ]) + + assert_equal 1, results.size + assert results.first.success? + refute_empty results.first.message_id + end + + def test_batch_enqueue_empty_array + results = @client.batch_enqueue([]) + assert_equal 0, results.size + end + + def test_batch_enqueue_mixed_success_and_failure + TestServerHelper.create_queue(@server, 'batch-mixed') + + messages = [ + { queue: 'batch-mixed', payload: 'good-1' }, + { queue: 'no-such-queue-xyz', payload: 'bad' }, + { queue: 'batch-mixed', payload: 'good-2' } + ] + + results = @client.batch_enqueue(messages) + assert_equal 3, results.size + + assert results[0].success?, "first message should succeed" + refute results[1].success?, "second message should fail (nonexistent queue)" + assert results[1].error, "second message should have error description" + assert results[2].success?, "third message should succeed" + end +end + +class TestBatchEnqueueResult < Minitest::Test + def test_success_result + r = Fila::BatchEnqueueResult.new(message_id: 'abc-123') + assert r.success? + assert_equal 'abc-123', r.message_id + assert_nil r.error + end + + def test_error_result + r = Fila::BatchEnqueueResult.new(error: 'queue not found') + refute r.success? + assert_nil r.message_id + assert_equal 'queue not found', r.error + end +end + +class TestAutoBatching < Minitest::Test + def setup + @server = TestServerHelper.start + # Default batch_mode is :auto + @client = Fila::Client.new(@server[:addr]) + end + + def teardown + @client&.close + TestServerHelper.stop(@server) if @server + end + + def test_auto_batch_enqueue_single + TestServerHelper.create_queue(@server, 'auto-single') + + msg_id = @client.enqueue(queue: 'auto-single', payload: 'auto-msg') + assert msg_id + refute_empty msg_id + + received = false + @client.consume(queue: 'auto-single') do |msg| + assert_equal msg_id, msg.id + assert_equal 'auto-msg', msg.payload + @client.ack(queue: 'auto-single', msg_id: msg.id) + received = true + break + end + assert received + end + + def test_auto_batch_enqueue_concurrent + TestServerHelper.create_queue(@server, 'auto-concurrent') + + # Fire multiple enqueues concurrently to exercise batching. + threads = 10.times.map do |i| + Thread.new do + @client.enqueue(queue: 'auto-concurrent', payload: "msg-#{i}") + end + end + ids = threads.map(&:value) + + assert_equal 10, ids.size + ids.each do |id| + assert id + refute_empty id + end + + # Consume all messages. + received = [] + @client.consume(queue: 'auto-concurrent') do |msg| + received << msg.id + @client.ack(queue: 'auto-concurrent', msg_id: msg.id) + break if received.size >= 10 + end + assert_equal 10, received.size + end + + def test_auto_batch_nonexistent_queue_raises + assert_raises(Fila::QueueNotFoundError) do + @client.enqueue(queue: 'no-such-queue-auto', payload: 'fail') + end + end +end + +class TestLingerBatching < Minitest::Test + def setup + @server = TestServerHelper.start + @client = Fila::Client.new(@server[:addr], batch_mode: :linger, linger_ms: 50, batch_size: 10) + end + + def teardown + @client&.close + TestServerHelper.stop(@server) if @server + end + + def test_linger_batch_enqueue + TestServerHelper.create_queue(@server, 'linger-test') + + msg_id = @client.enqueue(queue: 'linger-test', payload: 'linger-msg') + assert msg_id + refute_empty msg_id + end + + def test_linger_batch_concurrent + TestServerHelper.create_queue(@server, 'linger-concurrent') + + threads = 5.times.map do |i| + Thread.new do + @client.enqueue(queue: 'linger-concurrent', payload: "linger-#{i}") + end + end + ids = threads.map(&:value) + + assert_equal 5, ids.size + ids.each { |id| refute_empty id } + end +end + +class TestDisabledBatching < Minitest::Test + def setup + @server = TestServerHelper.start + @client = Fila::Client.new(@server[:addr], batch_mode: :disabled) + end + + def teardown + @client&.close + TestServerHelper.stop(@server) if @server + end + + def test_disabled_batch_enqueue_direct + TestServerHelper.create_queue(@server, 'disabled-test') + + msg_id = @client.enqueue(queue: 'disabled-test', payload: 'direct-msg') + assert msg_id + refute_empty msg_id + end + + def test_disabled_nonexistent_queue_raises + assert_raises(Fila::QueueNotFoundError) do + @client.enqueue(queue: 'no-such-queue-disabled', payload: 'fail') + end + end +end + +class TestBatchModeValidation < Minitest::Test + def test_invalid_batch_mode_raises + assert_raises(ArgumentError) do + Fila::Client.new('localhost:5555', batch_mode: :invalid) + end + end + + def test_valid_batch_modes_accepted + # These should not raise (but won't connect since server isn't on this port). + # Just verify argument validation passes. + %i[auto linger disabled].each do |mode| + client = Fila::Client.new('localhost:19999', batch_mode: mode) + client.close + end + end +end + +class TestCloseFlush < Minitest::Test + def setup + @server = TestServerHelper.start + end + + def teardown + TestServerHelper.stop(@server) if @server + end + + def test_close_drains_pending_messages + TestServerHelper.create_queue(@server, 'close-drain') + client = Fila::Client.new(@server[:addr]) + + # Enqueue a message, then close immediately. + msg_id = client.enqueue(queue: 'close-drain', payload: 'drain-me') + refute_empty msg_id + + client.close + + # Verify the message was persisted. + verify_client = Fila::Client.new(@server[:addr], batch_mode: :disabled) + received = false + verify_client.consume(queue: 'close-drain') do |msg| + assert_equal msg_id, msg.id + verify_client.ack(queue: 'close-drain', msg_id: msg.id) + received = true + break + end + assert received + verify_client.close + end + + def test_double_close_is_safe + client = Fila::Client.new(@server[:addr]) + client.close + client.close # Should not raise. + end +end diff --git a/test/test_client.rb b/test/test_client.rb index 7d0a01f..f39fe44 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -11,6 +11,7 @@ def setup end def teardown + @client&.close TestServerHelper.stop(@server) if @server end