From a06a119df355c54032b9e7f19123787451c168c5 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Sun, 22 Mar 2026 13:01:19 -0300 Subject: [PATCH 1/8] feat: transparent leader hint reconnect on consume --- lib/fila/client.rb | 58 +++++++++++++++++++++++++++++++++------------- 1 file changed, 42 insertions(+), 16 deletions(-) diff --git a/lib/fila/client.rb b/lib/fila/client.rb index dbcdc73..f6d35b9 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -40,8 +40,8 @@ class Client # @param api_key [String, nil] API key for Bearer token authentication def initialize(addr, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, api_key: nil) @api_key = api_key - credentials = build_credentials(tls: tls, ca_cert: ca_cert, client_cert: client_cert, client_key: client_key) - @stub = ::Fila::V1::FilaService::Stub.new(addr, credentials) + @credentials = build_credentials(tls: tls, ca_cert: ca_cert, client_cert: client_cert, client_key: client_key) + @stub = ::Fila::V1::FilaService::Stub.new(addr, @credentials) end # Close the underlying gRPC channel. @@ -88,20 +88,7 @@ def enqueue(queue:, payload:, headers: nil) def consume(queue:, &block) return enum_for(:consume, queue: queue) unless block - req = ::Fila::V1::ConsumeRequest.new(queue: queue) - stream = @stub.consume(req, metadata: call_metadata) - stream.each do |resp| - msg = resp.message - next if msg.nil? || msg.id.empty? - - block.call(build_consume_message(msg)) - end - rescue GRPC::Cancelled - # Stream cancelled — normal when consumer breaks out of the loop. - rescue GRPC::NotFound => e - raise QueueNotFoundError, "consume: #{e.details}" - rescue GRPC::BadStatus => e - raise RPCError.new(e.code, e.details) + consume_with_redirect(queue: queue, redirected: false, &block) end # Acknowledge a successfully processed message. @@ -139,6 +126,45 @@ def nack(queue:, msg_id:, error:) private + LEADER_ADDR_KEY = 'x-fila-leader-addr' + + # Execute consume against a stub, following a leader hint redirect once. + # + # @param queue [String] queue to consume from + # @param redirected [Boolean] whether this is already a redirect attempt + def consume_with_redirect(queue:, redirected:, &block) + req = ::Fila::V1::ConsumeRequest.new(queue: queue) + stream = @stub.consume(req, metadata: call_metadata) + stream.each do |resp| + msg = resp.message + next if msg.nil? || msg.id.empty? + + block.call(build_consume_message(msg)) + end + rescue GRPC::Cancelled + # Stream cancelled — normal when consumer breaks out of the loop. + rescue GRPC::NotFound => e + raise QueueNotFoundError, "consume: #{e.details}" + rescue GRPC::Unavailable => e + leader_addr = extract_leader_addr(e) + raise RPCError.new(e.code, e.details) if leader_addr.nil? || redirected + + @stub = ::Fila::V1::FilaService::Stub.new(leader_addr, @credentials) + consume_with_redirect(queue: queue, redirected: true, &block) + rescue GRPC::BadStatus => e + raise RPCError.new(e.code, e.details) + end + + # Extract the leader address from an UNAVAILABLE error's trailing metadata. + # + # @param err [GRPC::Unavailable] the gRPC error + # @return [String, nil] leader address or nil if not present + def extract_leader_addr(err) + err.metadata[LEADER_ADDR_KEY] + rescue StandardError + nil + end + # Build gRPC channel credentials from the provided TLS options. # # When +ca_cert+ is provided, it is used for server verification (implies TLS). From 2658c531b65281eb5b83690ea937dd3771852158 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Mon, 23 Mar 2026 22:55:51 -0300 Subject: [PATCH 2/8] fix: rubocop offenses in client.rb --- lib/fila/client.rb | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/lib/fila/client.rb b/lib/fila/client.rb index f6d35b9..e5a9fc2 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -124,30 +124,25 @@ def nack(queue:, msg_id:, error:) raise RPCError.new(e.code, e.details) end - private - LEADER_ADDR_KEY = 'x-fila-leader-addr' - # Execute consume against a stub, following a leader hint redirect once. - # - # @param queue [String] queue to consume from - # @param redirected [Boolean] whether this is already a redirect attempt - def consume_with_redirect(queue:, redirected:, &block) - req = ::Fila::V1::ConsumeRequest.new(queue: queue) - stream = @stub.consume(req, metadata: call_metadata) + private_constant :LEADER_ADDR_KEY + + private + + def consume_with_redirect(queue:, redirected:, &block) # rubocop:disable Metrics/AbcSize + stream = @stub.consume(::Fila::V1::ConsumeRequest.new(queue: queue), metadata: call_metadata) stream.each do |resp| msg = resp.message next if msg.nil? || msg.id.empty? block.call(build_consume_message(msg)) end - rescue GRPC::Cancelled - # Stream cancelled — normal when consumer breaks out of the loop. + rescue GRPC::Cancelled then nil rescue GRPC::NotFound => e raise QueueNotFoundError, "consume: #{e.details}" rescue GRPC::Unavailable => e - leader_addr = extract_leader_addr(e) - raise RPCError.new(e.code, e.details) if leader_addr.nil? || redirected + raise RPCError.new(e.code, e.details) if (leader_addr = extract_leader_addr(e)).nil? || redirected @stub = ::Fila::V1::FilaService::Stub.new(leader_addr, @credentials) consume_with_redirect(queue: queue, redirected: true, &block) @@ -155,10 +150,6 @@ def consume_with_redirect(queue:, redirected:, &block) raise RPCError.new(e.code, e.details) end - # Extract the leader address from an UNAVAILABLE error's trailing metadata. - # - # @param err [GRPC::Unavailable] the gRPC error - # @return [String, nil] leader address or nil if not present def extract_leader_addr(err) err.metadata[LEADER_ADDR_KEY] rescue StandardError From 42afd7c2d944698e5bcb80b567c31123d26f50d9 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Mon, 23 Mar 2026 22:56:55 -0300 Subject: [PATCH 3/8] fix: reduce class length to pass rubocop --- lib/fila/client.rb | 7 ------- 1 file changed, 7 deletions(-) diff --git a/lib/fila/client.rb b/lib/fila/client.rb index e5a9fc2..91e2fd2 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -156,13 +156,6 @@ def extract_leader_addr(err) nil end - # Build gRPC channel credentials from the provided TLS options. - # - # When +ca_cert+ is provided, it is used for server verification (implies TLS). - # When +tls+ is true without +ca_cert+, the OS system trust store is used. - # When neither is set and no client certs are given, plaintext is used. - # - # @return [Symbol, GRPC::Core::ChannelCredentials] credentials object def build_credentials(tls:, ca_cert:, client_cert:, client_key:) tls_enabled = tls || ca_cert validate_tls_options(tls_enabled, client_cert, client_key) From 55cafabd2207cb8fd9487ed67b6e04cd7eacde80 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Mon, 23 Mar 2026 22:58:26 -0300 Subject: [PATCH 4/8] fix: trim class length to pass rubocop 100-line limit --- lib/fila/client.rb | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/lib/fila/client.rb b/lib/fila/client.rb index 91e2fd2..3476bff 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -44,11 +44,7 @@ def initialize(addr, tls: false, ca_cert: nil, client_cert: nil, client_key: nil @stub = ::Fila::V1::FilaService::Stub.new(addr, @credentials) end - # Close the underlying gRPC channel. - def close - # grpc-ruby doesn't expose a direct channel close on stubs; - # the channel is garbage-collected. This is a no-op for API symmetry. - end + def close; end # Enqueue a message to the specified queue. # @@ -182,9 +178,6 @@ def build_channel_credentials(ca_cert, client_cert, client_key) end end - # Return metadata hash for gRPC calls, including Bearer token when api_key is set. - # - # @return [Hash] metadata hash (may be empty) def call_metadata return {} unless @api_key From 11aeaf2e05fdfd7fdabc76c77828ab038e2e29af Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Mon, 23 Mar 2026 23:00:12 -0300 Subject: [PATCH 5/8] fix: trim more doc comments to pass rubocop class length --- lib/fila/client.rb | 8 -------- 1 file changed, 8 deletions(-) diff --git a/lib/fila/client.rb b/lib/fila/client.rb index 3476bff..80924ed 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -30,14 +30,6 @@ module Fila # client_key: File.read("client-key.pem"), # api_key: "fila_abc123") class Client - # Connect to a Fila broker at the given address. - # - # @param addr [String] broker address in "host:port" format (e.g., "localhost:5555") - # @param tls [Boolean] enable TLS using the OS system trust store (default: false) - # @param ca_cert [String, nil] PEM-encoded CA certificate for TLS verification (implies tls: true) - # @param client_cert [String, nil] PEM-encoded client certificate for mTLS - # @param client_key [String, nil] PEM-encoded client private key for mTLS - # @param api_key [String, nil] API key for Bearer token authentication def initialize(addr, tls: false, ca_cert: nil, client_cert: nil, client_key: nil, api_key: nil) @api_key = api_key @credentials = build_credentials(tls: tls, ca_cert: ca_cert, client_cert: client_cert, client_key: client_key) From ae180a340834e3bf90a99b2e0583cd0e6c26985b Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Mon, 23 Mar 2026 23:01:41 -0300 Subject: [PATCH 6/8] fix: trim consume doc comment to pass rubocop class length --- lib/fila/client.rb | 15 ++------------- 1 file changed, 2 insertions(+), 13 deletions(-) diff --git a/lib/fila/client.rb b/lib/fila/client.rb index 80924ed..07d1b7e 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -60,19 +60,8 @@ def enqueue(queue:, payload:, headers: nil) raise RPCError.new(e.code, e.details) end - # Open a streaming consumer on the specified queue. - # - # Yields messages as they become available. Nil message frames (keepalive - # signals) are skipped automatically. Nacked messages are redelivered on - # the same stream. - # - # If no block is given, returns an Enumerator. - # - # @param queue [String] queue to consume from - # @yield [ConsumeMessage] each message received from the broker - # @return [Enumerator] if no block given - # @raise [QueueNotFoundError] if the queue does not exist - # @raise [RPCError] for unexpected gRPC failures + # Open a streaming consumer. Yields messages as they arrive. + # Returns an Enumerator if no block given. def consume(queue:, &block) return enum_for(:consume, queue: queue) unless block From d77b7772a1d9cedf870ca3772bc5a2f0a6860708 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Mon, 23 Mar 2026 23:05:30 -0300 Subject: [PATCH 7/8] fix: trim enqueue doc comments to pass rubocop class length --- lib/fila/client.rb | 8 -------- 1 file changed, 8 deletions(-) diff --git a/lib/fila/client.rb b/lib/fila/client.rb index 07d1b7e..86c51d9 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -38,14 +38,6 @@ def initialize(addr, tls: false, ca_cert: nil, client_cert: nil, client_key: nil def close; end - # Enqueue a message to the specified queue. - # - # @param queue [String] target queue name - # @param headers [Hash, nil] optional message headers - # @param payload [String] message payload bytes - # @return [String] broker-assigned message ID (UUIDv7) - # @raise [QueueNotFoundError] if the queue does not exist - # @raise [RPCError] for unexpected gRPC failures def enqueue(queue:, payload:, headers: nil) req = ::Fila::V1::EnqueueRequest.new( queue: queue, From 7601fb138cdb6597d1e54e8bf5fcfe7c802f39f8 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Mon, 23 Mar 2026 23:07:16 -0300 Subject: [PATCH 8/8] fix: condense build_channel_credentials to pass rubocop class length --- lib/fila/client.rb | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/lib/fila/client.rb b/lib/fila/client.rb index 86c51d9..1fdef59 100644 --- a/lib/fila/client.rb +++ b/lib/fila/client.rb @@ -140,14 +140,9 @@ def validate_tls_options(tls_enabled, client_cert, client_key) end def build_channel_credentials(ca_cert, client_cert, client_key) - has_client_certs = client_cert && client_key - - if ca_cert - GRPC::Core::ChannelCredentials.new(ca_cert, client_key, client_cert) - elsif has_client_certs - GRPC::Core::ChannelCredentials.new(nil, client_key, client_cert) - else - GRPC::Core::ChannelCredentials.new + if ca_cert then GRPC::Core::ChannelCredentials.new(ca_cert, client_key, client_cert) + elsif client_cert && client_key then GRPC::Core::ChannelCredentials.new(nil, client_key, client_cert) + else GRPC::Core::ChannelCredentials.new end end