Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/fila.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
require_relative 'fila/version'
require_relative 'fila/errors'
require_relative 'fila/consume_message'
require_relative 'fila/batch_enqueue_result'
require_relative 'fila/enqueue_result'
require_relative 'fila/batcher'
require_relative 'fila/client'
84 changes: 33 additions & 51 deletions lib/fila/batcher.rb
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# frozen_string_literal: true

module Fila
# Background batcher that collects enqueue requests and flushes them
# in batches via BatchEnqueue RPC. Supports auto (opportunistic) and
# linger (timer-based) modes.
# Background batcher that collects enqueue messages and flushes them
# in batches via the unified Enqueue RPC. Supports auto (opportunistic)
# and linger (timer-based) modes.
#
# @api private
class Batcher
class Batcher # rubocop:disable Metrics/ClassLength
# An item queued for batching, pairing a message with its result slot.
BatchItem = Struct.new(:request, :result_queue, keyword_init: true)
BatchItem = Struct.new(:message, :result_queue, keyword_init: true)

# @param stub [Fila::V1::FilaService::Stub] gRPC stub
# @param metadata [Hash] call metadata (auth headers)
Expand All @@ -33,13 +33,13 @@ def initialize(stub:, metadata:, mode:, max_batch_size: 100, batch_size: 100, li
# Submit a message for batched sending. Blocks until the batch
# containing this message is flushed and the result is available.
#
# @param request [Fila::V1::EnqueueRequest] the enqueue request
# @param message [Fila::V1::EnqueueMessage] the enqueue message
# @return [String] message ID on success
# @raise [Fila::QueueNotFoundError] if the queue does not exist
# @raise [Fila::RPCError] for unexpected gRPC failures
def submit(request)
def submit(message)
result_queue = Queue.new
item = BatchItem.new(request: request, result_queue: result_queue)
item = BatchItem.new(message: message, result_queue: result_queue)

@mutex.synchronize do
raise Fila::Error, 'batcher is closed' if @stopped
Expand Down Expand Up @@ -128,51 +128,35 @@ def drain_nonblocking(batch)
end
end

# Flush a batch of items. Uses single Enqueue RPC for 1 message
# (preserves exact error types like QueueNotFoundError), and
# BatchEnqueue RPC for 2+ messages.
# Flush a batch of items via the unified Enqueue RPC.
def flush_batch(items)
if items.size == 1
flush_single(items.first)
else
flush_multi(items)
end
end
req = ::Fila::V1::EnqueueRequest.new(messages: items.map(&:message))
results = @stub.enqueue(req, metadata: @metadata).results

def flush_single(item)
resp = @stub.enqueue(item.request, metadata: @metadata)
item.result_queue.push(resp.message_id)
rescue GRPC::NotFound => e
item.result_queue.push(QueueNotFoundError.new("enqueue: #{e.details}"))
items.each_with_index do |item, idx|
item.result_queue.push(result_to_outcome(results[idx]))
end
rescue GRPC::BadStatus => e
item.result_queue.push(RPCError.new(e.code, e.details))
broadcast_error(items, RPCError.new(e.code, e.details))
rescue StandardError => e
item.result_queue.push(Fila::Error.new(e.message))
broadcast_error(items, Fila::Error.new(e.message))
end

def flush_multi(items)
req = ::Fila::V1::BatchEnqueueRequest.new(
messages: items.map(&:request)
)
resp = @stub.batch_enqueue(req, metadata: @metadata)
results = resp.results
# Convert a single proto EnqueueResult into a String (message_id) or Exception.
def result_to_outcome(result)
return Fila::Error.new('no result from server') if result.nil?
return result.message_id if result.result == :message_id

items.each_with_index do |item, idx|
result = results[idx]
if result.nil?
item.result_queue.push(Fila::Error.new('no result from server'))
elsif result.result == :success
item.result_queue.push(result.success.message_id)
else
item.result_queue.push(RPCError.new(GRPC::Core::StatusCodes::INTERNAL, result.error))
end
err = result.error
case err.code
when :ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND
QueueNotFoundError.new("enqueue: #{err.message}")
else
RPCError.new(GRPC::Core::StatusCodes::INTERNAL, err.message)
end
rescue GRPC::BadStatus => e
# Transport-level failure: all messages in this batch get the error.
err = RPCError.new(e.code, e.details)
items.each { |item| item.result_queue.push(err) }
rescue StandardError => e
err = Fila::Error.new(e.message)
end

def broadcast_error(items, err)
items.each { |item| item.result_queue.push(err) }
end

Expand All @@ -185,13 +169,11 @@ def current_time_ms
def pop_with_timeout(timeout_ms)
deadline = current_time_ms + timeout_ms
loop do
begin
return @queue.pop(true)
rescue ThreadError
raise if current_time_ms >= deadline
return @queue.pop(true)
rescue ThreadError
raise if current_time_ms >= deadline

sleep(0.001) # 1ms polling interval
end
sleep(0.001) # 1ms polling interval
end
end
end
Expand Down
112 changes: 67 additions & 45 deletions lib/fila/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
require_relative 'proto/fila/v1/service_services_pb'
require_relative 'errors'
require_relative 'consume_message'
require_relative 'batch_enqueue_result'
require_relative 'enqueue_result'
require_relative 'batcher'

module Fila
Expand Down Expand Up @@ -69,7 +69,7 @@ def close
@batcher = nil
end

# Enqueue a message to a queue.
# Enqueue a single message to a queue.
#
# When batching is enabled (default), the message is submitted to
# the background batcher. At low load each message is sent
Expand All @@ -82,59 +82,57 @@ def close
# @raise [QueueNotFoundError] if the queue does not exist
# @raise [RPCError] for unexpected gRPC failures
def enqueue(queue:, payload:, headers: nil)
req = ::Fila::V1::EnqueueRequest.new(
msg = ::Fila::V1::EnqueueMessage.new(
queue: queue,
headers: headers || {},
payload: payload
)

if @batcher
@batcher.submit(req)
@batcher.submit(msg)
else
enqueue_direct(req)
enqueue_single(msg)
end
end

# Enqueue a batch of messages in a single RPC call.
# Enqueue multiple messages in a single RPC call.
#
# Each message is independently validated and processed. A failed
# message does not affect the others. Returns an array of
# BatchEnqueueResult with one result per input message, in order.
# EnqueueResult with one result per input message, in order.
#
# This bypasses the background batcher and always uses the
# BatchEnqueue RPC directly.
# Enqueue RPC directly.
#
# @param messages [Array<Hash>] messages to enqueue; each hash has
# keys :queue (String), :payload (String), and optionally
# :headers (Hash<String,String>)
# @return [Array<BatchEnqueueResult>]
# @return [Array<EnqueueResult>]
# @raise [RPCError] for transport-level gRPC failures
def batch_enqueue(messages)
def enqueue_many(messages)
proto_messages = messages.map do |m|
::Fila::V1::EnqueueRequest.new(
::Fila::V1::EnqueueMessage.new(
queue: m[:queue],
headers: m[:headers] || {},
payload: m[:payload]
)
end

req = ::Fila::V1::BatchEnqueueRequest.new(messages: proto_messages)
resp = @stub.batch_enqueue(req, metadata: call_metadata)
req = ::Fila::V1::EnqueueRequest.new(messages: proto_messages)
resp = @stub.enqueue(req, metadata: call_metadata)

resp.results.map do |r|
if r.result == :success
BatchEnqueueResult.new(message_id: r.success.message_id)
if r.result == :message_id
EnqueueResult.new(message_id: r.message_id)
else
BatchEnqueueResult.new(error: r.error)
EnqueueResult.new(error: r.error.message)
end
end
rescue GRPC::BadStatus => e
raise RPCError.new(e.code, e.details)
end

# Open a streaming consumer. Yields messages as they arrive.
# Transparently unpacks batched delivery (repeated messages field)
# with fallback to singular message field.
# Returns an Enumerator if no block given.
def consume(queue:, &block)
return enum_for(:consume, queue: queue) unless block
Expand All @@ -149,11 +147,21 @@ def consume(queue:, &block)
# @raise [MessageNotFoundError] if the message does not exist
# @raise [RPCError] for unexpected gRPC failures
def ack(queue:, msg_id:)
req = ::Fila::V1::AckRequest.new(queue: queue, message_id: msg_id)
@stub.ack(req, metadata: call_metadata)
nil
rescue GRPC::NotFound => e
raise MessageNotFoundError, "ack: #{e.details}"
msg = ::Fila::V1::AckMessage.new(queue: queue, message_id: msg_id)
req = ::Fila::V1::AckRequest.new(messages: [msg])
resp = @stub.ack(req, metadata: call_metadata)

result = resp.results.first
raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, 'no result from server') if result.nil?
return if result.result == :success

err = result.error
case err.code
when :ACK_ERROR_CODE_MESSAGE_NOT_FOUND
raise MessageNotFoundError, "ack: #{err.message}"
else
raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, "ack: #{err.message}")
end
rescue GRPC::BadStatus => e
raise RPCError.new(e.code, e.details)
end
Expand All @@ -166,11 +174,21 @@ def ack(queue:, msg_id:)
# @raise [MessageNotFoundError] if the message does not exist
# @raise [RPCError] for unexpected gRPC failures
def nack(queue:, msg_id:, error:)
req = ::Fila::V1::NackRequest.new(queue: queue, message_id: msg_id, error: error)
@stub.nack(req, metadata: call_metadata)
nil
rescue GRPC::NotFound => e
raise MessageNotFoundError, "nack: #{e.details}"
msg = ::Fila::V1::NackMessage.new(queue: queue, message_id: msg_id, error: error)
req = ::Fila::V1::NackRequest.new(messages: [msg])
resp = @stub.nack(req, metadata: call_metadata)

result = resp.results.first
raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, 'no result from server') if result.nil?
return if result.result == :success

err = result.error
case err.code
when :NACK_ERROR_CODE_MESSAGE_NOT_FOUND
raise MessageNotFoundError, "nack: #{err.message}"
else
raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, "nack: #{err.message}")
end
rescue GRPC::BadStatus => e
raise RPCError.new(e.code, e.details)
end
Expand Down Expand Up @@ -200,11 +218,25 @@ def start_batcher(mode, max_batch_size, batch_size, linger_ms)
)
end

def enqueue_direct(req)
# Send a single message via the unified Enqueue RPC.
def enqueue_single(msg)
req = ::Fila::V1::EnqueueRequest.new(messages: [msg])
resp = @stub.enqueue(req, metadata: call_metadata)
resp.message_id
rescue GRPC::NotFound => e
raise QueueNotFoundError, "enqueue: #{e.details}"

result = resp.results.first
raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, 'no result from server') if result.nil?

if result.result == :message_id
result.message_id
else
err = result.error
case err.code
when :ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND
raise QueueNotFoundError, "enqueue: #{err.message}"
else
raise RPCError.new(GRPC::Core::StatusCodes::INTERNAL, "enqueue: #{err.message}")
end
end
rescue GRPC::BadStatus => e
raise RPCError.new(e.code, e.details)
end
Expand All @@ -226,20 +258,10 @@ def consume_with_redirect(queue:, redirected:, &block)
raise RPCError.new(e.code, e.details)
end

# Unpack messages from a ConsumeResponse. Prefers the repeated
# messages field (batched delivery); falls back to singular message
# field for backward compatibility with older servers.
# Unpack messages from a ConsumeResponse.
def yield_messages_from_response(resp, &block)
msgs = resp.messages
if msgs && !msgs.empty?
msgs.each do |msg|
next if msg.nil? || msg.id.empty?

block.call(build_consume_message(msg))
end
else
msg = resp.message
return if msg.nil? || msg.id.empty?
resp.messages.each do |msg|
next if msg.nil? || msg.id.empty?

block.call(build_consume_message(msg))
end
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,21 @@
# frozen_string_literal: true

module Fila
# Result of a single message within a batch enqueue call.
# Result of a single message within an enqueue_many call.
#
# Each message in a batch is independently validated and processed.
# Each message is independently validated and processed.
# A failed message does not affect the others.
#
# @example
# results = client.batch_enqueue(messages)
# results = client.enqueue_many(messages)
# results.each do |r|
# if r.success?
# puts "Enqueued: #{r.message_id}"
# else
# puts "Failed: #{r.error}"
# end
# end
class BatchEnqueueResult
class EnqueueResult
# @return [String, nil] broker-assigned message ID on success
attr_reader :message_id

Expand Down
Loading
Loading