Skip to content
107 changes: 39 additions & 68 deletions lib/fila/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,14 @@ module Fila
# client_key: File.read("client-key.pem"),
# api_key: "fila_abc123")
class Client
# Connect to a Fila broker at the given address.
#
# @param addr [String] broker address in "host:port" format (e.g., "localhost:5555")
# @param tls [Boolean] enable TLS using the OS system trust store (default: false)
# @param ca_cert [String, nil] PEM-encoded CA certificate for TLS verification (implies tls: true)
# @param client_cert [String, nil] PEM-encoded client certificate for mTLS
# @param client_key [String, nil] PEM-encoded client private key for mTLS
# @param api_key [String, nil] API key for Bearer token authentication
def initialize(addr, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, api_key: nil)
@api_key = api_key
credentials = build_credentials(tls: tls, ca_cert: ca_cert, client_cert: client_cert, client_key: client_key)
@stub = ::Fila::V1::FilaService::Stub.new(addr, credentials)
@credentials = build_credentials(tls: tls, ca_cert: ca_cert, client_cert: client_cert, client_key: client_key)
@stub = ::Fila::V1::FilaService::Stub.new(addr, @credentials)
end

# Close the underlying gRPC channel.
def close
# grpc-ruby doesn't expose a direct channel close on stubs;
# the channel is garbage-collected. This is a no-op for API symmetry.
end
def close; end

# Enqueue a message to the specified queue.
#
# @param queue [String] target queue name
# @param headers [Hash<String, String>, nil] optional message headers
# @param payload [String] message payload bytes
# @return [String] broker-assigned message ID (UUIDv7)
# @raise [QueueNotFoundError] if the queue does not exist
# @raise [RPCError] for unexpected gRPC failures
def enqueue(queue:, payload:, headers: nil)
req = ::Fila::V1::EnqueueRequest.new(
queue: queue,
Expand All @@ -72,36 +52,12 @@ def enqueue(queue:, payload:, headers: nil)
raise RPCError.new(e.code, e.details)
end

# Open a streaming consumer on the specified queue.
#
# Yields messages as they become available. Nil message frames (keepalive
# signals) are skipped automatically. Nacked messages are redelivered on
# the same stream.
#
# If no block is given, returns an Enumerator.
#
# @param queue [String] queue to consume from
# @yield [ConsumeMessage] each message received from the broker
# @return [Enumerator<ConsumeMessage>] if no block given
# @raise [QueueNotFoundError] if the queue does not exist
# @raise [RPCError] for unexpected gRPC failures
# Open a streaming consumer. Yields messages as they arrive.
# Returns an Enumerator if no block given.
def consume(queue:, &block)
return enum_for(:consume, queue: queue) unless block

req = ::Fila::V1::ConsumeRequest.new(queue: queue)
stream = @stub.consume(req, metadata: call_metadata)
stream.each do |resp|
msg = resp.message
next if msg.nil? || msg.id.empty?

block.call(build_consume_message(msg))
end
rescue GRPC::Cancelled
# Stream cancelled — normal when consumer breaks out of the loop.
rescue GRPC::NotFound => e
raise QueueNotFoundError, "consume: #{e.details}"
rescue GRPC::BadStatus => e
raise RPCError.new(e.code, e.details)
consume_with_redirect(queue: queue, redirected: false, &block)
end

# Acknowledge a successfully processed message.
Expand Down Expand Up @@ -137,15 +93,38 @@ def nack(queue:, msg_id:, error:)
raise RPCError.new(e.code, e.details)
end

LEADER_ADDR_KEY = 'x-fila-leader-addr'

private_constant :LEADER_ADDR_KEY

private

# Build gRPC channel credentials from the provided TLS options.
#
# When +ca_cert+ is provided, it is used for server verification (implies TLS).
# When +tls+ is true without +ca_cert+, the OS system trust store is used.
# When neither is set and no client certs are given, plaintext is used.
#
# @return [Symbol, GRPC::Core::ChannelCredentials] credentials object
def consume_with_redirect(queue:, redirected:, &block) # rubocop:disable Metrics/AbcSize
stream = @stub.consume(::Fila::V1::ConsumeRequest.new(queue: queue), metadata: call_metadata)
stream.each do |resp|
msg = resp.message
next if msg.nil? || msg.id.empty?

block.call(build_consume_message(msg))
end
rescue GRPC::Cancelled then nil
rescue GRPC::NotFound => e
raise QueueNotFoundError, "consume: #{e.details}"
rescue GRPC::Unavailable => e
raise RPCError.new(e.code, e.details) if (leader_addr = extract_leader_addr(e)).nil? || redirected

@stub = ::Fila::V1::FilaService::Stub.new(leader_addr, @credentials)
consume_with_redirect(queue: queue, redirected: true, &block)
rescue GRPC::BadStatus => e
raise RPCError.new(e.code, e.details)
end

def extract_leader_addr(err)
err.metadata[LEADER_ADDR_KEY]
rescue StandardError
nil
end

def build_credentials(tls:, ca_cert:, client_cert:, client_key:)
tls_enabled = tls || ca_cert
validate_tls_options(tls_enabled, client_cert, client_key)
Expand All @@ -161,20 +140,12 @@ def validate_tls_options(tls_enabled, client_cert, client_key)
end

def build_channel_credentials(ca_cert, client_cert, client_key)
has_client_certs = client_cert && client_key

if ca_cert
GRPC::Core::ChannelCredentials.new(ca_cert, client_key, client_cert)
elsif has_client_certs
GRPC::Core::ChannelCredentials.new(nil, client_key, client_cert)
else
GRPC::Core::ChannelCredentials.new
if ca_cert then GRPC::Core::ChannelCredentials.new(ca_cert, client_key, client_cert)
elsif client_cert && client_key then GRPC::Core::ChannelCredentials.new(nil, client_key, client_cert)
else GRPC::Core::ChannelCredentials.new
end
end

# Return metadata hash for gRPC calls, including Bearer token when api_key is set.
#
# @return [Hash] metadata hash (may be empty)
def call_metadata
return {} unless @api_key

Expand Down
Loading