diff --git a/lib/fila/client.rb b/lib/fila/client.rb index dbcdc73..1fdef59 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -30,34 +30,14 @@ module Fila # client_key: File.read("client-key.pem"), # api_key: "fila_abc123") class Client - # Connect to a Fila broker at the given address. - # - # @param addr [String] broker address in "host:port" format (e.g., "localhost:5555") - # @param tls [Boolean] enable TLS using the OS system trust store (default: false) - # @param ca_cert [String, nil] PEM-encoded CA certificate for TLS verification (implies tls: true) - # @param client_cert [String, nil] PEM-encoded client certificate for mTLS - # @param client_key [String, nil] PEM-encoded client private key for mTLS - # @param api_key [String, nil] API key for Bearer token authentication def initialize(addr, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, api_key: nil) @api_key = api_key - credentials = build_credentials(tls: tls, ca_cert: ca_cert, client_cert: client_cert, client_key: client_key) - @stub = ::Fila::V1::FilaService::Stub.new(addr, credentials) + @credentials = build_credentials(tls: tls, ca_cert: ca_cert, client_cert: client_cert, client_key: client_key) + @stub = ::Fila::V1::FilaService::Stub.new(addr, @credentials) end - # Close the underlying gRPC channel. - def close - # grpc-ruby doesn't expose a direct channel close on stubs; - # the channel is garbage-collected. This is a no-op for API symmetry. - end + def close; end - # Enqueue a message to the specified queue. - # - # @param queue [String] target queue name - # @param headers [Hash, nil] optional message headers - # @param payload [String] message payload bytes - # @return [String] broker-assigned message ID (UUIDv7) - # @raise [QueueNotFoundError] if the queue does not exist - # @raise [RPCError] for unexpected gRPC failures def enqueue(queue:, payload:, headers: nil) req = ::Fila::V1::EnqueueRequest.new( queue: queue, @@ -72,36 +52,12 @@ def enqueue(queue:, payload:, headers: nil) raise RPCError.new(e.code, e.details) end - # Open a streaming consumer on the specified queue. - # - # Yields messages as they become available. Nil message frames (keepalive - # signals) are skipped automatically. Nacked messages are redelivered on - # the same stream. - # - # If no block is given, returns an Enumerator. - # - # @param queue [String] queue to consume from - # @yield [ConsumeMessage] each message received from the broker - # @return [Enumerator] if no block given - # @raise [QueueNotFoundError] if the queue does not exist - # @raise [RPCError] for unexpected gRPC failures + # Open a streaming consumer. Yields messages as they arrive. + # Returns an Enumerator if no block given. def consume(queue:, &block) return enum_for(:consume, queue: queue) unless block - req = ::Fila::V1::ConsumeRequest.new(queue: queue) - stream = @stub.consume(req, metadata: call_metadata) - stream.each do |resp| - msg = resp.message - next if msg.nil? || msg.id.empty? - - block.call(build_consume_message(msg)) - end - rescue GRPC::Cancelled - # Stream cancelled — normal when consumer breaks out of the loop. - rescue GRPC::NotFound => e - raise QueueNotFoundError, "consume: #{e.details}" - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) + consume_with_redirect(queue: queue, redirected: false, &block) end # Acknowledge a successfully processed message. @@ -137,15 +93,38 @@ def nack(queue:, msg_id:, error:) raise RPCError.new(e.code, e.details) end + LEADER_ADDR_KEY = 'x-fila-leader-addr' + + private_constant :LEADER_ADDR_KEY + private - # Build gRPC channel credentials from the provided TLS options. - # - # When +ca_cert+ is provided, it is used for server verification (implies TLS). - # When +tls+ is true without +ca_cert+, the OS system trust store is used. - # When neither is set and no client certs are given, plaintext is used. - # - # @return [Symbol, GRPC::Core::ChannelCredentials] credentials object + def consume_with_redirect(queue:, redirected:, &block) # rubocop:disable Metrics/AbcSize + stream = @stub.consume(::Fila::V1::ConsumeRequest.new(queue: queue), metadata: call_metadata) + stream.each do |resp| + msg = resp.message + next if msg.nil? || msg.id.empty? + + block.call(build_consume_message(msg)) + end + rescue GRPC::Cancelled then nil + rescue GRPC::NotFound => e + raise QueueNotFoundError, "consume: #{e.details}" + rescue GRPC::Unavailable => e + raise RPCError.new(e.code, e.details) if (leader_addr = extract_leader_addr(e)).nil? || redirected + + @stub = ::Fila::V1::FilaService::Stub.new(leader_addr, @credentials) + consume_with_redirect(queue: queue, redirected: true, &block) + rescue GRPC::BadStatus => e + raise RPCError.new(e.code, e.details) + end + + def extract_leader_addr(err) + err.metadata[LEADER_ADDR_KEY] + rescue StandardError + nil + end + def build_credentials(tls:, ca_cert:, client_cert:, client_key:) tls_enabled = tls || ca_cert validate_tls_options(tls_enabled, client_cert, client_key) @@ -161,20 +140,12 @@ def validate_tls_options(tls_enabled, client_cert, client_key) end def build_channel_credentials(ca_cert, client_cert, client_key) - has_client_certs = client_cert && client_key - - if ca_cert - GRPC::Core::ChannelCredentials.new(ca_cert, client_key, client_cert) - elsif has_client_certs - GRPC::Core::ChannelCredentials.new(nil, client_key, client_cert) - else - GRPC::Core::ChannelCredentials.new + if ca_cert then GRPC::Core::ChannelCredentials.new(ca_cert, client_key, client_cert) + elsif client_cert && client_key then GRPC::Core::ChannelCredentials.new(nil, client_key, client_cert) + else GRPC::Core::ChannelCredentials.new end end - # Return metadata hash for gRPC calls, including Bearer token when api_key is set. - # - # @return [Hash] metadata hash (may be empty) def call_metadata return {} unless @api_key