From db7e491058e6ef56cfd1e19be6cc6e03ac04f539 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Thu, 26 Mar 2026 09:20:36 -0300 Subject: [PATCH 1/3] feat: replace grpc transport with fibp binary protocol rewrite the java sdk transport layer to use fibp (fila binary protocol) over raw tcp instead of grpc/protobuf. - add FibpConnection: tcp socket with length-prefixed framing, handshake, correlation-id multiplexing via ConcurrentHashMap + CompletableFuture, heartbeat scheduler, and optional tls via SSLSocket - add FibpCodec: wire encoding/decoding for enqueue, consume push batch, ack, nack, and error frames (exact format from fila-core/src/fibp/wire.rs) - rewrite FilaClient to use FibpConnection; remove all grpc/protobuf deps - rewrite Batcher to use FibpConnection; groups messages by queue into separate fibp frames (one queue name per enqueue frame) - rewrite ConsumerHandle without grpc Context dependency - update RpcException to define its own Code enum (no grpc Status.Code) - remove ApiKeyInterceptor (auth now via fibp OP_AUTH frame) - remove proto/ directory and grpc/protobuf gradle deps; bump to v0.3.0 - add FibpAdminClient in test package for createQueue (hand-rolled protobuf encoding of CreateQueueRequest, avoids adding a test protobuf dependency) - update TestServer to use FibpAdminClient instead of grpc admin stub - update BuilderTest: remove tests that require a server connection (fibp connects eagerly unlike grpc lazy channels); add address-parsing unit tests - add @EnabledIf("serverAvailable") guard to FilaClientTest - update TlsAuthClientTest to use RpcException.Code instead of grpc Status.Code --- README.md | 35 +- build.gradle | 38 +- proto/fila/v1/admin.proto | 197 ------ proto/fila/v1/messages.proto | 28 - proto/fila/v1/service.proto | 142 ---- .../dev/faisca/fila/ApiKeyInterceptor.java | 36 - src/main/java/dev/faisca/fila/Batcher.java | 109 ++-- .../java/dev/faisca/fila/ConsumerHandle.java | 13 +- src/main/java/dev/faisca/fila/FibpCodec.java | 325 
+++++++++ .../java/dev/faisca/fila/FibpConnection.java | 375 +++++++++++ src/main/java/dev/faisca/fila/FilaClient.java | 615 +++++++----------- .../java/dev/faisca/fila/RpcException.java | 23 +- .../java/dev/faisca/fila/BuilderTest.java | 125 ++-- .../java/dev/faisca/fila/FibpAdminClient.java | 138 ++++ .../java/dev/faisca/fila/FilaClientTest.java | 14 +- src/test/java/dev/faisca/fila/TestServer.java | 62 +- .../dev/faisca/fila/TlsAuthClientTest.java | 10 +- 17 files changed, 1266 insertions(+), 1019 deletions(-) delete mode 100644 proto/fila/v1/admin.proto delete mode 100644 proto/fila/v1/messages.proto delete mode 100644 proto/fila/v1/service.proto delete mode 100644 src/main/java/dev/faisca/fila/ApiKeyInterceptor.java create mode 100644 src/main/java/dev/faisca/fila/FibpCodec.java create mode 100644 src/main/java/dev/faisca/fila/FibpConnection.java create mode 100644 src/test/java/dev/faisca/fila/FibpAdminClient.java diff --git a/README.md b/README.md index 5844151..443bde9 100644 --- a/README.md +++ b/README.md @@ -2,12 +2,15 @@ Java client SDK for the [Fila](https://github.com/faiscadev/fila) message broker. +Uses the **FIBP** (Fila Binary Protocol) — a length-prefixed binary protocol over raw TCP or TLS. +No gRPC or protobuf dependencies. + ## Installation ### Gradle ```groovy -implementation 'dev.faisca:fila-client:0.1.0' +implementation 'dev.faisca:fila-client:0.3.0' ``` ### Maven @@ -16,7 +19,7 @@ implementation 'dev.faisca:fila-client:0.1.0' dev.faisca fila-client - 0.1.0 + 0.3.0 ``` @@ -101,7 +104,7 @@ try (FilaClient client = FilaClient.builder("localhost:5555") } ``` -The key is sent as a `Bearer` token in the `authorization` metadata header on every RPC. +The key is sent in an AUTH frame during the FIBP handshake before any other requests. 
TLS and API key auth can be combined: @@ -134,7 +137,8 @@ FilaClient client = FilaClient.builder("localhost:5555").build(); | `withTls()` | Enable TLS using JVM's default trust store (cacerts) | | `withTlsCaCert(byte[] caCertPem)` | CA certificate for TLS server verification (implies `withTls()`) | | `withTlsClientCert(byte[] certPem, byte[] keyPem)` | Client cert + key for mTLS | -| `withApiKey(String apiKey)` | API key sent as `Bearer` token on every RPC | +| `withApiKey(String apiKey)` | API key sent in AUTH frame during FIBP handshake | +| `withBatchMode(BatchMode batchMode)` | Configure enqueue batching (default: `BatchMode.auto()`) | All builder methods are optional. When none are set, the client connects over plaintext without authentication (backward compatible). @@ -144,9 +148,13 @@ Enqueue a message. Returns the broker-assigned message ID (UUIDv7). Throws `QueueNotFoundException` if the queue does not exist. +#### `enqueueMany(List messages) -> List` + +Enqueue multiple messages in a single FIBP frame. Each message is independently processed. All messages must target the same queue. + #### `consume(String queue, Consumer handler) -> ConsumerHandle` -Start consuming messages from a queue. Messages are delivered to the handler on a background thread. Nacked messages are redelivered on the same stream. +Start consuming messages from a queue. Messages are delivered to the handler on the FIBP reader thread. Nacked messages are redelivered on the same stream. Call `handle.cancel()` to stop consuming. 
@@ -181,7 +189,9 @@ All exceptions extend `FilaException` (unchecked): - `QueueNotFoundException` — queue does not exist - `MessageNotFoundException` — message does not exist (or already acked) -- `RpcException` — unexpected gRPC failure (includes status code via `getCode()`) +- `RpcException` — transport-level failure (includes status code via `getCode()`) + +`RpcException.Code` values: `INTERNAL`, `UNAUTHENTICATED`, `PERMISSION_DENIED`, `UNAVAILABLE`, `CANCELLED`, `UNKNOWN`. ```java try { @@ -191,6 +201,19 @@ try { } ``` +## Transport: FIBP + +This SDK uses FIBP (Fila Binary Protocol) — a lightweight binary framing protocol over TCP. Each +frame is: + +``` +[4-byte big-endian length][flags:u8 | op:u8 | corr_id:u32 | payload] +``` + +All requests are multiplexed over a single TCP connection using correlation IDs. A background +reader thread dispatches responses. Authentication uses an AUTH op frame sent during connection +setup. Heartbeat frames keep the connection alive. + ## License AGPLv3 — see [LICENSE](LICENSE). 
diff --git a/build.gradle b/build.gradle index 6b3e635..bc50d8a 100644 --- a/build.gradle +++ b/build.gradle @@ -3,12 +3,11 @@ plugins { id 'maven-publish' id 'signing' id 'io.github.gradle-nexus.publish-plugin' version '2.0.0' - id 'com.google.protobuf' version '0.9.4' id 'com.diffplug.spotless' version '7.0.2' } group = 'dev.faisca' -version = '0.2.0' +version = '0.3.0' java { sourceCompatibility = JavaVersion.VERSION_17 @@ -21,47 +20,12 @@ repositories { mavenCentral() } -def grpcVersion = '1.71.0' -def protobufVersion = '4.29.3' - dependencies { - api "io.grpc:grpc-stub:${grpcVersion}" - api "io.grpc:grpc-protobuf:${grpcVersion}" - - implementation "io.grpc:grpc-netty-shaded:${grpcVersion}" - implementation "com.google.protobuf:protobuf-java:${protobufVersion}" - - compileOnly 'org.apache.tomcat:annotations-api:6.0.53' - testImplementation platform('org.junit:junit-bom:5.11.4') testImplementation 'org.junit.jupiter:junit-jupiter' testRuntimeOnly 'org.junit.platform:junit-platform-launcher' } -protobuf { - protoc { - artifact = "com.google.protobuf:protoc:${protobufVersion}" - } - plugins { - grpc { - artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}" - } - } - generateProtoTasks { - all()*.plugins { - grpc {} - } - } -} - -sourceSets { - main { - proto { - srcDir 'proto' - } - } -} - test { useJUnitPlatform() testLogging { diff --git a/proto/fila/v1/admin.proto b/proto/fila/v1/admin.proto deleted file mode 100644 index 886e58d..0000000 --- a/proto/fila/v1/admin.proto +++ /dev/null @@ -1,197 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -// Admin RPCs for operators and the CLI. 
-service FilaAdmin { - rpc CreateQueue(CreateQueueRequest) returns (CreateQueueResponse); - rpc DeleteQueue(DeleteQueueRequest) returns (DeleteQueueResponse); - rpc SetConfig(SetConfigRequest) returns (SetConfigResponse); - rpc GetConfig(GetConfigRequest) returns (GetConfigResponse); - rpc ListConfig(ListConfigRequest) returns (ListConfigResponse); - rpc GetStats(GetStatsRequest) returns (GetStatsResponse); - rpc Redrive(RedriveRequest) returns (RedriveResponse); - rpc ListQueues(ListQueuesRequest) returns (ListQueuesResponse); - - // API key management. CreateApiKey bypasses auth (bootstrap); others require a valid key. - rpc CreateApiKey(CreateApiKeyRequest) returns (CreateApiKeyResponse); - rpc RevokeApiKey(RevokeApiKeyRequest) returns (RevokeApiKeyResponse); - rpc ListApiKeys(ListApiKeysRequest) returns (ListApiKeysResponse); - - // Per-key ACL management. - rpc SetAcl(SetAclRequest) returns (SetAclResponse); - rpc GetAcl(GetAclRequest) returns (GetAclResponse); -} - -message CreateQueueRequest { - string name = 1; - QueueConfig config = 2; -} - -message QueueConfig { - string on_enqueue_script = 1; - string on_failure_script = 2; - uint64 visibility_timeout_ms = 3; -} - -message CreateQueueResponse { - string queue_id = 1; -} - -message DeleteQueueRequest { - string queue = 1; -} - -message DeleteQueueResponse {} - -message SetConfigRequest { - string key = 1; - string value = 2; -} - -message SetConfigResponse {} - -message GetConfigRequest { - string key = 1; -} - -message GetConfigResponse { - string value = 1; -} - -message ConfigEntry { - string key = 1; - string value = 2; -} - -message ListConfigRequest { - string prefix = 1; -} - -message ListConfigResponse { - repeated ConfigEntry entries = 1; - uint32 total_count = 2; -} - -message GetStatsRequest { - string queue = 1; -} - -message PerFairnessKeyStats { - string key = 1; - uint64 pending_count = 2; - int64 current_deficit = 3; - uint32 weight = 4; -} - -message PerThrottleKeyStats { - string key = 
1; - double tokens = 2; - double rate_per_second = 3; - double burst = 4; -} - -message GetStatsResponse { - uint64 depth = 1; - uint64 in_flight = 2; - uint64 active_fairness_keys = 3; - uint32 active_consumers = 4; - uint32 quantum = 5; - repeated PerFairnessKeyStats per_key_stats = 6; - repeated PerThrottleKeyStats per_throttle_stats = 7; - // Cluster fields (0 when not in cluster mode). - uint64 leader_node_id = 8; - uint32 replication_count = 9; -} - -message RedriveRequest { - string dlq_queue = 1; - uint64 count = 2; -} - -message RedriveResponse { - uint64 redriven = 1; -} - -message ListQueuesRequest {} - -message QueueInfo { - string name = 1; - uint64 depth = 2; - uint64 in_flight = 3; - uint32 active_consumers = 4; - uint64 leader_node_id = 5; -} - -message ListQueuesResponse { - repeated QueueInfo queues = 1; - uint32 cluster_node_count = 2; -} - -// --- API Key Management --- - -message CreateApiKeyRequest { - /// Human-readable label for the key. - string name = 1; - /// Optional Unix timestamp (milliseconds) after which the key expires. - /// 0 means no expiration. - uint64 expires_at_ms = 2; - /// When true, the key bypasses all ACL checks (superadmin). - bool is_superadmin = 3; -} - -message CreateApiKeyResponse { - /// Opaque key ID for management operations (revoke, list, set-acl). - string key_id = 1; - /// Plaintext API key. Returned once — store it securely. - string key = 2; - /// Whether this key has superadmin privileges. - bool is_superadmin = 3; -} - -message RevokeApiKeyRequest { - string key_id = 1; -} - -message RevokeApiKeyResponse {} - -message ListApiKeysRequest {} - -message ApiKeyInfo { - string key_id = 1; - string name = 2; - uint64 created_at_ms = 3; - /// 0 means no expiration. - uint64 expires_at_ms = 4; - bool is_superadmin = 5; -} - -message ListApiKeysResponse { - repeated ApiKeyInfo keys = 1; -} - -// --- ACL Management --- - -/// A single permission grant: kind (produce/consume/admin) + queue pattern. 
-message AclPermission { - /// One of: "produce", "consume", "admin". - string kind = 1; - /// Queue name or wildcard ("*" or "orders.*"). - string pattern = 2; -} - -message SetAclRequest { - string key_id = 1; - repeated AclPermission permissions = 2; -} - -message SetAclResponse {} - -message GetAclRequest { - string key_id = 1; -} - -message GetAclResponse { - string key_id = 1; - repeated AclPermission permissions = 2; - bool is_superadmin = 3; -} diff --git a/proto/fila/v1/messages.proto b/proto/fila/v1/messages.proto deleted file mode 100644 index a0709cf..0000000 --- a/proto/fila/v1/messages.proto +++ /dev/null @@ -1,28 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -import "google/protobuf/timestamp.proto"; - -// Core message envelope persisted in the broker. -message Message { - string id = 1; - map headers = 2; - bytes payload = 3; - MessageMetadata metadata = 4; - MessageTimestamps timestamps = 5; -} - -// Broker-assigned scheduling metadata. -message MessageMetadata { - string fairness_key = 1; - uint32 weight = 2; - repeated string throttle_keys = 3; - uint32 attempt_count = 4; - string queue_id = 5; -} - -// Lifecycle timestamps attached to every message. -message MessageTimestamps { - google.protobuf.Timestamp enqueued_at = 1; - google.protobuf.Timestamp leased_at = 2; -} diff --git a/proto/fila/v1/service.proto b/proto/fila/v1/service.proto deleted file mode 100644 index 7d1db79..0000000 --- a/proto/fila/v1/service.proto +++ /dev/null @@ -1,142 +0,0 @@ -syntax = "proto3"; -package fila.v1; - -import "fila/v1/messages.proto"; - -// Hot-path RPCs for producers and consumers. -service FilaService { - rpc Enqueue(EnqueueRequest) returns (EnqueueResponse); - rpc StreamEnqueue(stream StreamEnqueueRequest) returns (stream StreamEnqueueResponse); - rpc Consume(ConsumeRequest) returns (stream ConsumeResponse); - rpc Ack(AckRequest) returns (AckResponse); - rpc Nack(NackRequest) returns (NackResponse); -} - -// Individual message to enqueue. 
-message EnqueueMessage { - string queue = 1; - map headers = 2; - bytes payload = 3; -} - -// Enqueue one or more messages. -message EnqueueRequest { - repeated EnqueueMessage messages = 1; -} - -// Per-message enqueue result. -message EnqueueResult { - oneof result { - string message_id = 1; - EnqueueError error = 2; - } -} - -// Typed enqueue error with structured error code. -message EnqueueError { - EnqueueErrorCode code = 1; - string message = 2; -} - -enum EnqueueErrorCode { - ENQUEUE_ERROR_CODE_UNSPECIFIED = 0; - ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND = 1; - ENQUEUE_ERROR_CODE_STORAGE = 2; - ENQUEUE_ERROR_CODE_LUA = 3; - ENQUEUE_ERROR_CODE_PERMISSION_DENIED = 4; -} - -// One result per input message. -message EnqueueResponse { - repeated EnqueueResult results = 1; -} - -message ConsumeRequest { - string queue = 1; -} - -message ConsumeResponse { - repeated Message messages = 1; -} - -// Individual ack item. -message AckMessage { - string queue = 1; - string message_id = 2; -} - -message AckRequest { - repeated AckMessage messages = 1; -} - -message AckResult { - oneof result { - AckSuccess success = 1; - AckError error = 2; - } -} - -message AckSuccess {} - -message AckError { - AckErrorCode code = 1; - string message = 2; -} - -enum AckErrorCode { - ACK_ERROR_CODE_UNSPECIFIED = 0; - ACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; - ACK_ERROR_CODE_STORAGE = 2; - ACK_ERROR_CODE_PERMISSION_DENIED = 3; -} - -message AckResponse { - repeated AckResult results = 1; -} - -// Individual nack item. 
-message NackMessage { - string queue = 1; - string message_id = 2; - string error = 3; -} - -message NackRequest { - repeated NackMessage messages = 1; -} - -message NackResult { - oneof result { - NackSuccess success = 1; - NackError error = 2; - } -} - -message NackSuccess {} - -message NackError { - NackErrorCode code = 1; - string message = 2; -} - -enum NackErrorCode { - NACK_ERROR_CODE_UNSPECIFIED = 0; - NACK_ERROR_CODE_MESSAGE_NOT_FOUND = 1; - NACK_ERROR_CODE_STORAGE = 2; - NACK_ERROR_CODE_PERMISSION_DENIED = 3; -} - -message NackResponse { - repeated NackResult results = 1; -} - -// Stream enqueue — per-write batch with sequence tracking. -message StreamEnqueueRequest { - repeated EnqueueMessage messages = 1; - uint64 sequence_number = 2; -} - -message StreamEnqueueResponse { - uint64 sequence_number = 1; - repeated EnqueueResult results = 2; -} diff --git a/src/main/java/dev/faisca/fila/ApiKeyInterceptor.java b/src/main/java/dev/faisca/fila/ApiKeyInterceptor.java deleted file mode 100644 index e7ea461..0000000 --- a/src/main/java/dev/faisca/fila/ApiKeyInterceptor.java +++ /dev/null @@ -1,36 +0,0 @@ -package dev.faisca.fila; - -import io.grpc.CallOptions; -import io.grpc.Channel; -import io.grpc.ClientCall; -import io.grpc.ClientInterceptor; -import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; -import io.grpc.Metadata; -import io.grpc.MethodDescriptor; - -/** - * gRPC client interceptor that attaches a {@code Bearer} API key to the {@code authorization} - * metadata header on every outgoing RPC. 
- */ -final class ApiKeyInterceptor implements ClientInterceptor { - private static final Metadata.Key AUTH_KEY = - Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER); - - private final String headerValue; - - ApiKeyInterceptor(String apiKey) { - this.headerValue = "Bearer " + apiKey; - } - - @Override - public ClientCall interceptCall( - MethodDescriptor method, CallOptions callOptions, Channel next) { - return new SimpleForwardingClientCall<>(next.newCall(method, callOptions)) { - @Override - public void start(Listener responseListener, Metadata headers) { - headers.put(AUTH_KEY, headerValue); - super.start(responseListener, headers); - } - }; - } -} diff --git a/src/main/java/dev/faisca/fila/Batcher.java b/src/main/java/dev/faisca/fila/Batcher.java index 0694d37..6642b67 100644 --- a/src/main/java/dev/faisca/fila/Batcher.java +++ b/src/main/java/dev/faisca/fila/Batcher.java @@ -1,10 +1,9 @@ package dev.faisca.fila; -import fila.v1.FilaServiceGrpc; -import fila.v1.Service; -import io.grpc.StatusRuntimeException; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -15,16 +14,18 @@ import java.util.concurrent.atomic.AtomicBoolean; /** - * Background batcher that coalesces individual enqueue calls into multi-message RPCs. + * Background batcher that coalesces individual enqueue calls into multi-message FIBP frames. * *

Supports two modes: AUTO (opportunistic, Nagle-style) and LINGER (timer-based). The batcher - * runs on a dedicated daemon thread and flushes RPCs on an executor pool. Uses the unified Enqueue - * RPC with repeated messages for all batch sizes. + * runs on a dedicated daemon thread and flushes frames on an executor pool. + * + *

FIBP enqueue frames carry one queue name per frame, so messages targeting different queues are + * split into separate frames. */ final class Batcher { private final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(); private final AtomicBoolean running = new AtomicBoolean(true); - private final FilaServiceGrpc.FilaServiceBlockingStub stub; + private final FibpConnection conn; private final BatchMode mode; private final Thread batcherThread; private final ExecutorService flushExecutor; @@ -40,8 +41,8 @@ static final class BatchItem { } } - Batcher(FilaServiceGrpc.FilaServiceBlockingStub stub, BatchMode mode) { - this.stub = stub; + Batcher(FibpConnection conn, BatchMode mode) { + this.conn = conn; this.mode = mode; this.flushExecutor = Executors.newCachedThreadPool(r -> newDaemon(r, "fila-batch-flush")); this.scheduler = @@ -141,7 +142,6 @@ private void runLinger() { while (running.get()) { try { if (buffer.isEmpty()) { - // Block for first message. BatchItem item = queue.take(); buffer.add(item); @@ -150,24 +150,14 @@ private void runLinger() { buffer.clear(); flushExecutor.submit(() -> flushBatch(toFlush)); } else { - // Start linger timer. - final List timerBuffer = buffer; lingerTimer = scheduler.schedule( - () -> { - // Signal the batcher thread to flush by adding a poison pill. - // The actual flush happens in the main loop. - batcherThread.interrupt(); - }, - lingerMs, - TimeUnit.MILLISECONDS); + () -> batcherThread.interrupt(), lingerMs, TimeUnit.MILLISECONDS); } } else { - // Buffer has items -- wait for more or timer expiry. BatchItem item = queue.poll(lingerMs, TimeUnit.MILLISECONDS); if (item != null) { buffer.add(item); - // Drain any additional available items. queue.drainTo(buffer, batchSize - buffer.size()); } @@ -182,7 +172,6 @@ private void runLinger() { } } } catch (InterruptedException e) { - // Timer or shutdown interrupt -- flush what we have. 
if (!buffer.isEmpty()) { if (lingerTimer != null) { lingerTimer.cancel(false); @@ -200,65 +189,71 @@ private void runLinger() { } } - /** Flush a batch of messages via the unified Enqueue RPC. */ + /** + * Flush a batch of messages via FIBP enqueue frames. Messages targeting different queues are sent + * in separate frames (one FIBP frame per queue). + */ private void flushBatch(List items) { if (items.isEmpty()) { return; } - Service.EnqueueRequest.Builder reqBuilder = Service.EnqueueRequest.newBuilder(); + // Group by queue — LinkedHashMap preserves insertion order for deterministic frame ordering. + Map> byQueue = new LinkedHashMap<>(); for (BatchItem item : items) { - reqBuilder.addMessages( - Service.EnqueueMessage.newBuilder() - .setQueue(item.message.getQueue()) - .putAllHeaders(item.message.getHeaders()) - .setPayload(com.google.protobuf.ByteString.copyFrom(item.message.getPayload())) - .build()); + byQueue.computeIfAbsent(item.message.getQueue(), k -> new ArrayList<>()).add(item); + } + + for (Map.Entry> entry : byQueue.entrySet()) { + flushQueueBatch(entry.getValue()); } + } + + private void flushQueueBatch(List items) { + List messages = new ArrayList<>(items.size()); + for (BatchItem item : items) { + messages.add(item.message); + } + + byte[] payload = FibpCodec.encodeEnqueue(messages); + CompletableFuture respFuture = conn.sendRequest(FibpConnection.OP_ENQUEUE, payload); try { - Service.EnqueueResponse resp = stub.enqueue(reqBuilder.build()); - List results = resp.getResultsList(); + byte[] respPayload = respFuture.get(30, TimeUnit.SECONDS); + List results = FibpCodec.decodeEnqueueResponse(respPayload); for (int i = 0; i < items.size(); i++) { BatchItem item = items.get(i); if (i < results.size()) { - Service.EnqueueResult result = results.get(i); - switch (result.getResultCase()) { - case MESSAGE_ID: - item.future.complete(result.getMessageId()); - break; - case ERROR: - item.future.completeExceptionally(mapEnqueueResultError(result.getError())); - 
break; - default: - item.future.completeExceptionally( - new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server")); - break; + EnqueueResult result = results.get(i); + if (result.isSuccess()) { + item.future.complete(result.getMessageId()); + } else { + item.future.completeExceptionally(new QueueNotFoundException(result.getError())); } } else { item.future.completeExceptionally( - new RpcException( - io.grpc.Status.Code.INTERNAL, - "server returned fewer results than messages sent")); + new RpcException(RpcException.Code.INTERNAL, "server returned fewer results")); } } - } catch (StatusRuntimeException e) { - FilaException mapped = FilaClient.mapEnqueueError(e); + } catch (java.util.concurrent.ExecutionException e) { + FilaException mapped = mapException(e.getCause()); + for (BatchItem item : items) { + item.future.completeExceptionally(mapped); + } + } catch (Exception e) { + FilaException mapped = mapException(e); for (BatchItem item : items) { item.future.completeExceptionally(mapped); } } } - private static FilaException mapEnqueueResultError(Service.EnqueueError error) { - return switch (error.getCode()) { - case ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND -> - new QueueNotFoundException("enqueue: " + error.getMessage()); - case ENQUEUE_ERROR_CODE_PERMISSION_DENIED -> - new RpcException(io.grpc.Status.Code.PERMISSION_DENIED, error.getMessage()); - default -> new RpcException(io.grpc.Status.Code.INTERNAL, error.getMessage()); - }; + private static FilaException mapException(Throwable t) { + if (t instanceof FilaException fe) { + return fe; + } + return new RpcException(RpcException.Code.INTERNAL, t.getMessage()); } private static Thread newDaemon(Runnable r, String name) { diff --git a/src/main/java/dev/faisca/fila/ConsumerHandle.java b/src/main/java/dev/faisca/fila/ConsumerHandle.java index b81ee83..7b7629a 100644 --- a/src/main/java/dev/faisca/fila/ConsumerHandle.java +++ b/src/main/java/dev/faisca/fila/ConsumerHandle.java @@ -1,20 +1,23 @@ package 
dev.faisca.fila; -import io.grpc.Context; +import java.util.concurrent.atomic.AtomicBoolean; /** Handle for a running consume stream. Call {@link #cancel()} to stop consuming. */ public final class ConsumerHandle { - private final Context.CancellableContext context; + private final AtomicBoolean cancelled; private final Thread thread; + private final Runnable onCancel; - ConsumerHandle(Context.CancellableContext context, Thread thread) { - this.context = context; + ConsumerHandle(AtomicBoolean cancelled, Thread thread, Runnable onCancel) { + this.cancelled = cancelled; this.thread = thread; + this.onCancel = onCancel; } /** Cancel the consume stream and wait for the consumer thread to finish. */ public void cancel() { - context.cancel(null); + cancelled.set(true); + onCancel.run(); try { thread.join(5000); } catch (InterruptedException e) { diff --git a/src/main/java/dev/faisca/fila/FibpCodec.java b/src/main/java/dev/faisca/fila/FibpCodec.java new file mode 100644 index 0000000..deb4ec2 --- /dev/null +++ b/src/main/java/dev/faisca/fila/FibpCodec.java @@ -0,0 +1,325 @@ +package dev.faisca.fila; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Encodes and decodes FIBP wire messages. + * + *

All integers are big-endian. String lengths are encoded as u16BE unless noted otherwise. + * + *

Wire formats (from fila-core/src/fibp/wire.rs): + * + *

+ * Enqueue request:
+ *   queue:str16 | msg_count:u16 | messages...
+ *   Each message: header_count:u8 | (key:str16 val:str16)* | payload_len:u32 | payload
+ *
+ * Enqueue response:
+ *   count:u16 | results...
+ *   ok=1: 0x01 | msg_id:str16
+ *   ok=0: 0x00 | err_code:u16 | err_msg:str16
+ *
+ * Consume request:
+ *   queue:str16 | initial_credits:u32
+ *
+ * Consume push (server → client):
+ *   count:u16 | messages...
+ *   Each: msg_id:str16 | fairness_key:str16 | attempt_count:u32 |
+ *         header_count:u8 | (key:str16 val:str16)* | payload_len:u32 | payload
+ *
+ * Ack request:
+ *   count:u16 | items: (queue:str16 msg_id:str16)*
+ *
+ * Nack request:
+ *   count:u16 | items: (queue:str16 msg_id:str16 error:str16)*
+ *
+ * Ack/Nack response:
+ *   count:u16 | results: (0x01 | 0x00 err_code:u16 err_msg:str16)*
+ * 
+ */ +final class FibpCodec { + + // Enqueue error codes + static final int ENQUEUE_ERR_QUEUE_NOT_FOUND = 1; + + // Ack/Nack error codes + static final int ACK_NACK_ERR_MESSAGE_NOT_FOUND = 1; + + private FibpCodec() {} + + // ── Enqueue ─────────────────────────────────────────────────────────────── + + /** + * Encode an enqueue request. All messages must target the same queue (the first message's queue + * name is used). + */ + static byte[] encodeEnqueue(List messages) { + if (messages.isEmpty()) { + throw new IllegalArgumentException("messages must not be empty"); + } + String queue = messages.get(0).getQueue(); + try (ByteArrayOutputStream buf = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(buf)) { + writeStr16(dos, queue); + dos.writeShort(messages.size()); + for (EnqueueMessage msg : messages) { + dos.writeByte(msg.getHeaders().size()); + for (Map.Entry entry : msg.getHeaders().entrySet()) { + writeStr16(dos, entry.getKey()); + writeStr16(dos, entry.getValue()); + } + dos.writeInt(msg.getPayload().length); + dos.write(msg.getPayload()); + } + dos.flush(); + return buf.toByteArray(); + } catch (IOException e) { + throw new FilaException("encode enqueue failed", e); + } + } + + /** + * Decode an enqueue response. + * + *
+   * count:u16 | (0x01 msg_id:str16 | 0x00 err_code:u16 err_msg:str16)*
+   * 
+ */ + static List decodeEnqueueResponse(byte[] payload) { + int pos = 0; + int count = readU16(payload, pos); + pos += 2; + List results = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + int tag = payload[pos++] & 0xFF; + if (tag == 1) { + int idLen = readU16(payload, pos); + pos += 2; + String msgId = new String(payload, pos, idLen, StandardCharsets.UTF_8); + pos += idLen; + results.add(EnqueueResult.success(msgId)); + } else { + int errCode = readU16(payload, pos); + pos += 2; + int errLen = readU16(payload, pos); + pos += 2; + String errMsg = new String(payload, pos, errLen, StandardCharsets.UTF_8); + pos += errLen; + results.add(EnqueueResult.error(errCode + ":" + errMsg)); + } + } + return results; + } + + // ── Consume ─────────────────────────────────────────────────────────────── + + /** + * Encode a consume request. + * + *
+   * queue:str16 | initial_credits:u32
+   * 
+ */ + static byte[] encodeConsume(String queue, int initialCredits) { + try (ByteArrayOutputStream buf = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(buf)) { + writeStr16(dos, queue); + dos.writeInt(initialCredits); + dos.flush(); + return buf.toByteArray(); + } catch (IOException e) { + throw new FilaException("encode consume failed", e); + } + } + + /** + * Decode a batch of server-pushed consume messages. + * + *
+   * count:u16 | (msg_id:str16 fairness_key:str16 attempt_count:u32
+   *              header_count:u8 (key:str16 val:str16)* payload_len:u32 payload)*
+   * 
+ * + * @param queue the queue name (not in the wire format; supplied by the caller) + */ + static List decodePushBatch(byte[] payload, String queue) { + int pos = 0; + int count = readU16(payload, pos); + pos += 2; + List messages = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + int idLen = readU16(payload, pos); + pos += 2; + String msgId = new String(payload, pos, idLen, StandardCharsets.UTF_8); + pos += idLen; + + int fkLen = readU16(payload, pos); + pos += 2; + String fairnessKey = new String(payload, pos, fkLen, StandardCharsets.UTF_8); + pos += fkLen; + + int attemptCount = readU32(payload, pos); + pos += 4; + + int headerCount = payload[pos++] & 0xFF; + Map headers = new HashMap<>(headerCount * 2); + for (int h = 0; h < headerCount; h++) { + int kLen = readU16(payload, pos); + pos += 2; + String key = new String(payload, pos, kLen, StandardCharsets.UTF_8); + pos += kLen; + int vLen = readU16(payload, pos); + pos += 2; + String val = new String(payload, pos, vLen, StandardCharsets.UTF_8); + pos += vLen; + headers.put(key, val); + } + + int dataLen = readU32(payload, pos); + pos += 4; + byte[] data = new byte[dataLen]; + System.arraycopy(payload, pos, data, 0, dataLen); + pos += dataLen; + + messages.add(new ConsumeMessage(msgId, headers, data, fairnessKey, attemptCount, queue)); + } + return messages; + } + + // ── Ack ─────────────────────────────────────────────────────────────────── + + /** + * Encode an ack request. + * + *
+   * count:u16 | (queue:str16 msg_id:str16)*
+   * 
+ */ + static byte[] encodeAck(String queue, String msgId) { + try (ByteArrayOutputStream buf = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(buf)) { + dos.writeShort(1); + writeStr16(dos, queue); + writeStr16(dos, msgId); + dos.flush(); + return buf.toByteArray(); + } catch (IOException e) { + throw new FilaException("encode ack failed", e); + } + } + + // ── Nack ────────────────────────────────────────────────────────────────── + + /** + * Encode a nack request. + * + *
+   * count:u16 | (queue:str16 msg_id:str16 error:str16)*
+   * 
+ */ + static byte[] encodeNack(String queue, String msgId, String error) { + try (ByteArrayOutputStream buf = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(buf)) { + dos.writeShort(1); + writeStr16(dos, queue); + writeStr16(dos, msgId); + writeStr16(dos, error); + dos.flush(); + return buf.toByteArray(); + } catch (IOException e) { + throw new FilaException("encode nack failed", e); + } + } + + // ── Ack/Nack response ───────────────────────────────────────────────────── + + /** + * Decode an ack or nack response and throw if the single item failed. + * + *
+   * count:u16 | (0x01 | 0x00 err_code:u16 err_msg:str16)*
+   * 
+ */ + static void decodeAckNackResponse(byte[] payload, boolean isAck) { + int pos = 0; + int count = readU16(payload, pos); + pos += 2; + if (count < 1) { + throw new RpcException(RpcException.Code.INTERNAL, "no result from server"); + } + int tag = payload[pos++] & 0xFF; + if (tag != 1) { + int errCode = readU16(payload, pos); + pos += 2; + int errLen = readU16(payload, pos); + pos += 2; + String msg = new String(payload, pos, errLen, StandardCharsets.UTF_8); + throw decodeAckNackError(errCode, msg, isAck); + } + } + + private static FilaException decodeAckNackError(int errCode, String msg, boolean isAck) { + String prefix = isAck ? "ack: " : "nack: "; + return switch (errCode) { + case ACK_NACK_ERR_MESSAGE_NOT_FOUND -> new MessageNotFoundException(prefix + msg); + default -> new RpcException(RpcException.Code.INTERNAL, msg); + }; + } + + // ── Error frame ─────────────────────────────────────────────────────────── + + /** + * Decode an ERROR frame payload (raw UTF-8 string) into a FilaException. + * + *

The server sends error messages as plain text. We map them by keyword matching, matching the + * server's error message conventions. + */ + static FilaException decodeError(byte[] payload) { + String msg = new String(payload, StandardCharsets.UTF_8); + String lower = msg.toLowerCase(); + if (lower.contains("not found") && lower.contains("queue")) { + return new QueueNotFoundException(msg); + } + if (lower.contains("not found") && lower.contains("message")) { + return new MessageNotFoundException(msg); + } + if (lower.contains("not found")) { + return new QueueNotFoundException(msg); + } + if (lower.contains("permission denied") || lower.contains("forbidden")) { + return new RpcException(RpcException.Code.PERMISSION_DENIED, msg); + } + if (lower.contains("authentication required") + || lower.contains("unauthenticated") + || lower.contains("invalid api key")) { + return new RpcException(RpcException.Code.UNAUTHENTICATED, msg); + } + return new RpcException(RpcException.Code.INTERNAL, msg); + } + + // ── Helpers ─────────────────────────────────────────────────────────────── + + private static void writeStr16(DataOutputStream dos, String s) throws IOException { + byte[] bytes = s.getBytes(StandardCharsets.UTF_8); + dos.writeShort(bytes.length); + dos.write(bytes); + } + + static int readU16(byte[] buf, int pos) { + return ((buf[pos] & 0xFF) << 8) | (buf[pos + 1] & 0xFF); + } + + static int readU32(byte[] buf, int pos) { + return ((buf[pos] & 0xFF) << 24) + | ((buf[pos + 1] & 0xFF) << 16) + | ((buf[pos + 2] & 0xFF) << 8) + | (buf[pos + 3] & 0xFF); + } +} diff --git a/src/main/java/dev/faisca/fila/FibpConnection.java b/src/main/java/dev/faisca/fila/FibpConnection.java new file mode 100644 index 0000000..3116228 --- /dev/null +++ b/src/main/java/dev/faisca/fila/FibpConnection.java @@ -0,0 +1,375 @@ +package dev.faisca.fila; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import 
java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import javax.net.ssl.SSLSocket; + +/** + * Low-level FIBP (Fila Binary Protocol) connection over a single TCP socket. + * + *

Handles framing, handshake, correlation-ID multiplexing, heartbeats, and AUTH. A dedicated + * reader thread dispatches incoming frames to pending {@link CompletableFuture}s or registered push + * handlers. + * + *

Thread-safety: all public methods are safe to call from multiple threads. + */ +final class FibpConnection implements AutoCloseable { + + // ── Op codes ────────────────────────────────────────────────────────────── + static final byte OP_ENQUEUE = 0x01; + static final byte OP_CONSUME = 0x02; + static final byte OP_ACK = 0x03; + static final byte OP_NACK = 0x04; + + static final byte OP_CREATE_QUEUE = 0x10; + + static final byte OP_FLOW = 0x20; + static final byte OP_HEARTBEAT = 0x21; + static final byte OP_AUTH = 0x30; + static final byte OP_ERROR = (byte) 0xFE; + static final byte OP_GOAWAY = (byte) 0xFF; + + // Flag bit: stream push (bit 2) — set by server on consume push frames + static final byte FLAG_STREAM = 0x04; + + // ── Frame layout ────────────────────────────────────────────────────────── + // [4-byte length][flags:u8 | op:u8 | corr_id:u32 | payload] + // The 4-byte length covers flags + op + corr_id + payload. + private static final int FRAME_HEADER_BYTES = 6; // flags(1) + op(1) + corr_id(4) + + // ── Handshake ───────────────────────────────────────────────────────────── + private static final byte[] HANDSHAKE_MAGIC = {'F', 'I', 'B', 'P', 0x01, 0x00}; + + // ── Heartbeat ───────────────────────────────────────────────────────────── + private static final long HEARTBEAT_INTERVAL_MS = 30_000; + + private final Socket socket; + private final DataInputStream in; + private final DataOutputStream out; + + // Pending request futures: corr_id → future carrying raw payload bytes + private final ConcurrentHashMap> pending = + new ConcurrentHashMap<>(); + + // Push handlers: corr_id → handler receiving raw push payload bytes + private final ConcurrentHashMap pushHandlers = new ConcurrentHashMap<>(); + + private final AtomicInteger corrIdGen = new AtomicInteger(1); + private final AtomicBoolean closed = new AtomicBoolean(false); + + private final Thread readerThread; + private final ScheduledExecutorService heartbeatScheduler; + + // Written lock protects 
concurrent write calls from interleaving frames. + private final Object writeLock = new Object(); + + /** A push handler with the queue name it was registered for. */ + static final class PushHandler { + final String queue; + final Consumer handler; + + PushHandler(String queue, Consumer handler) { + this.queue = queue; + this.handler = handler; + } + } + + private FibpConnection(Socket socket, DataInputStream in, DataOutputStream out) { + this.socket = socket; + this.in = in; + this.out = out; + + this.readerThread = + new Thread(this::readerLoop, "fibp-reader-" + socket.getRemoteSocketAddress()); + this.readerThread.setDaemon(true); + + this.heartbeatScheduler = + Executors.newSingleThreadScheduledExecutor( + r -> { + Thread t = new Thread(r, "fibp-heartbeat"); + t.setDaemon(true); + return t; + }); + } + + /** + * Establish a plaintext FIBP connection to {@code host:port}, perform the handshake, and + * optionally authenticate with an API key. + */ + static FibpConnection connect(String host, int port, String apiKey) throws IOException { + Socket sock = new Socket(host, port); + return initConnection(sock, apiKey); + } + + /** + * Establish a TLS FIBP connection, perform the handshake, and optionally authenticate with an API + * key. 
+ */ + static FibpConnection connectTls(SSLSocket sock, String apiKey) throws IOException { + sock.startHandshake(); + return initConnection(sock, apiKey); + } + + private static FibpConnection initConnection(Socket sock, String apiKey) throws IOException { + sock.setTcpNoDelay(true); + DataInputStream in = new DataInputStream(sock.getInputStream()); + DataOutputStream out = new DataOutputStream(sock.getOutputStream()); + + // FIBP handshake: send magic, expect echo + out.write(HANDSHAKE_MAGIC); + out.flush(); + byte[] echo = new byte[HANDSHAKE_MAGIC.length]; + in.readFully(echo); + if (!Arrays.equals(echo, HANDSHAKE_MAGIC)) { + sock.close(); + throw new IOException("FIBP handshake failed: unexpected echo " + Arrays.toString(echo)); + } + + FibpConnection conn = new FibpConnection(sock, in, out); + conn.readerThread.start(); + conn.heartbeatScheduler.scheduleAtFixedRate( + conn::sendHeartbeat, HEARTBEAT_INTERVAL_MS, HEARTBEAT_INTERVAL_MS, TimeUnit.MILLISECONDS); + + if (apiKey != null && !apiKey.isEmpty()) { + conn.authenticate(apiKey); + } + + return conn; + } + + // ── Public send API ─────────────────────────────────────────────────────── + + /** + * Send a request frame and return a future that completes with the raw response payload bytes. + * + * @param op op code + * @param payload encoded request payload + */ + CompletableFuture sendRequest(byte op, byte[] payload) { + if (closed.get()) { + CompletableFuture f = new CompletableFuture<>(); + f.completeExceptionally(new FilaException("connection is closed")); + return f; + } + + int corrId = corrIdGen.getAndIncrement(); + CompletableFuture future = new CompletableFuture<>(); + pending.put(corrId, future); + + try { + writeFrame((byte) 0, op, corrId, payload); + } catch (IOException e) { + pending.remove(corrId); + future.completeExceptionally(new FilaException("write failed", e)); + } + + return future; + } + + /** + * Register a push handler for server-push frames on {@code corrId}. 
The handler is called from + * the reader thread whenever a push frame arrives with a matching corr_id. + */ + void registerPushHandler(int corrId, String queue, Consumer handler) { + pushHandlers.put(corrId, new PushHandler(queue, handler)); + } + + /** Remove a push handler. */ + void unregisterPushHandler(int corrId) { + pushHandlers.remove(corrId); + } + + /** + * Send the initial CONSUME frame, register a pending future for the initial ACK response, and + * return the corr_id used for subsequent push frames. + * + *

Callers must register a push handler for this corr_id before calling this method to avoid + * missing early push frames. + */ + int sendConsumeRequest(byte[] payload, String queue, Consumer pushHandler) + throws IOException { + if (closed.get()) { + throw new FilaException("connection is closed"); + } + int corrId = corrIdGen.getAndIncrement(); + // Register the push handler before writing the frame to avoid a race with incoming pushes. + pushHandlers.put(corrId, new PushHandler(queue, pushHandler)); + // Also register a pending future so we get the initial "consume started" response. + CompletableFuture ackFuture = new CompletableFuture<>(); + pending.put(corrId, ackFuture); + try { + writeFrame((byte) 0, OP_CONSUME, corrId, payload); + } catch (IOException e) { + pending.remove(corrId); + pushHandlers.remove(corrId); + throw e; + } + // Wait for the server's initial ACK (queue exists response). + try { + ackFuture.get(10, TimeUnit.SECONDS); + } catch (java.util.concurrent.ExecutionException e) { + pushHandlers.remove(corrId); + Throwable cause = e.getCause(); + if (cause instanceof FilaException fe) { + throw new RuntimeException(fe); + } + throw new FilaException("consume setup failed", cause); + } catch (Exception e) { + pushHandlers.remove(corrId); + throw new FilaException("consume setup failed", e); + } + return corrId; + } + + // ── Auth ────────────────────────────────────────────────────────────────── + + private void authenticate(String apiKey) { + // AUTH payload: raw API key bytes (no length prefix) + byte[] keyBytes = apiKey.getBytes(StandardCharsets.UTF_8); + CompletableFuture f = sendRequest(OP_AUTH, keyBytes); + try { + f.get(10, TimeUnit.SECONDS); + } catch (Exception e) { + throw new FilaException("FIBP AUTH failed: " + e.getMessage(), e); + } + } + + // ── Heartbeat ───────────────────────────────────────────────────────────── + + private void sendHeartbeat() { + if (closed.get()) { + return; + } + try { + writeFrame((byte) 0, OP_HEARTBEAT, 0, 
new byte[0]); + } catch (IOException ignored) { + // Reader thread will detect the broken socket and close. + } + } + + // ── Frame I/O ───────────────────────────────────────────────────────────── + + private void writeFrame(byte flags, byte op, int corrId, byte[] payload) throws IOException { + int bodyLen = FRAME_HEADER_BYTES + payload.length; + synchronized (writeLock) { + out.writeInt(bodyLen); + out.writeByte(flags); + out.writeByte(op); + out.writeInt(corrId); + out.write(payload); + out.flush(); + } + } + + // ── Reader loop ─────────────────────────────────────────────────────────── + + private void readerLoop() { + try { + while (!closed.get()) { + int bodyLen; + try { + bodyLen = in.readInt(); + } catch (IOException e) { + if (!closed.get()) { + failAll(new FilaException("connection lost: " + e.getMessage(), e)); + } + return; + } + + if (bodyLen < FRAME_HEADER_BYTES) { + failAll(new FilaException("malformed frame: bodyLen=" + bodyLen)); + return; + } + + byte flags = in.readByte(); + byte op = in.readByte(); + int corrId = in.readInt(); + int payloadLen = bodyLen - FRAME_HEADER_BYTES; + byte[] payload = new byte[payloadLen]; + if (payloadLen > 0) { + in.readFully(payload); + } + + dispatch(flags, op, corrId, payload); + } + } catch (IOException e) { + if (!closed.get()) { + failAll(new FilaException("reader error: " + e.getMessage(), e)); + } + } + } + + private void dispatch(byte flags, byte op, int corrId, byte[] payload) { + if (op == OP_GOAWAY) { + failAll(new FilaException("server sent GOAWAY")); + return; + } + + if (op == OP_HEARTBEAT) { + return; + } + + // Stream push frame: route to registered push handler + if ((flags & FLAG_STREAM) != 0) { + PushHandler ph = pushHandlers.get(corrId); + if (ph != null) { + ph.handler.accept(payload); + } + return; + } + + // Response frame: complete the pending future + CompletableFuture future = pending.remove(corrId); + if (future == null) { + return; + } + + if (op == OP_ERROR) { + 
future.completeExceptionally(FibpCodec.decodeError(payload)); + } else { + future.complete(payload); + } + } + + private void failAll(FilaException cause) { + closed.set(true); + for (CompletableFuture f : pending.values()) { + f.completeExceptionally(cause); + } + pending.clear(); + closeQuietly(); + } + + // ── AutoCloseable ───────────────────────────────────────────────────────── + + @Override + public void close() { + if (closed.compareAndSet(false, true)) { + heartbeatScheduler.shutdownNow(); + failAll(new FilaException("connection closed")); + } + } + + private void closeQuietly() { + try { + socket.close(); + } catch (IOException ignored) { + } + } + + boolean isClosed() { + return closed.get(); + } +} diff --git a/src/main/java/dev/faisca/fila/FilaClient.java b/src/main/java/dev/faisca/fila/FilaClient.java index 8be44cb..9c564ac 100644 --- a/src/main/java/dev/faisca/fila/FilaClient.java +++ b/src/main/java/dev/faisca/fila/FilaClient.java @@ -1,31 +1,27 @@ package dev.faisca.fila; -import fila.v1.FilaServiceGrpc; -import fila.v1.Messages; -import fila.v1.Service; -import io.grpc.ChannelCredentials; -import io.grpc.Context; -import io.grpc.Grpc; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.Metadata; -import io.grpc.StatusRuntimeException; -import io.grpc.TlsChannelCredentials; import java.io.ByteArrayInputStream; import java.io.IOException; +import java.security.KeyStore; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; import java.util.ArrayList; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocket; +import 
javax.net.ssl.TrustManagerFactory; /** * Client for the Fila message broker. * - *

Wraps the hot-path gRPC operations: enqueue, consume, ack, nack. + *

Uses the FIBP (Fila Binary Protocol) transport over raw TCP or TLS. * *

By default, {@code enqueue()} routes through an opportunistic batcher that coalesces messages * at high load without adding latency at low load. Use {@link Builder#withBatchMode(BatchMode)} to @@ -44,30 +40,12 @@ * } */ public final class FilaClient implements AutoCloseable { - private static final Metadata.Key LEADER_ADDR_KEY = - Metadata.Key.of("x-fila-leader-addr", Metadata.ASCII_STRING_MARSHALLER); - - private final ManagedChannel channel; - private final FilaServiceGrpc.FilaServiceBlockingStub blockingStub; - private final byte[] caCertPem; - private final byte[] clientCertPem; - private final byte[] clientKeyPem; - private final String apiKey; + + private final FibpConnection conn; private final Batcher batcher; - private FilaClient( - ManagedChannel channel, - byte[] caCertPem, - byte[] clientCertPem, - byte[] clientKeyPem, - String apiKey, - Batcher batcher) { - this.channel = channel; - this.blockingStub = FilaServiceGrpc.newBlockingStub(channel); - this.caCertPem = caCertPem; - this.clientCertPem = clientCertPem; - this.clientKeyPem = clientKeyPem; - this.apiKey = apiKey; + private FilaClient(FibpConnection conn, Batcher batcher) { + this.conn = conn; this.batcher = batcher; } @@ -88,23 +66,25 @@ public static Builder builder(String address) { * @param payload message payload bytes * @return the broker-assigned message ID (UUIDv7) * @throws QueueNotFoundException if the queue does not exist - * @throws RpcException for unexpected gRPC failures + * @throws RpcException for unexpected transport failures */ public String enqueue(String queue, Map headers, byte[] payload) { if (batcher != null) { CompletableFuture future = batcher.submit(new EnqueueMessage(queue, headers, payload)); try { - return future.get(); + return future.get(30, TimeUnit.SECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new FilaException("enqueue interrupted", e); } catch (ExecutionException e) { Throwable cause = e.getCause(); - if (cause instanceof 
FilaException) { - throw (FilaException) cause; + if (cause instanceof FilaException fe) { + throw fe; } - throw new RpcException(io.grpc.Status.Code.INTERNAL, cause.getMessage()); + throw new RpcException(RpcException.Code.INTERNAL, cause.getMessage()); + } catch (TimeoutException e) { + throw new RpcException(RpcException.Code.UNAVAILABLE, "enqueue timed out"); } } @@ -112,94 +92,104 @@ public String enqueue(String queue, Map headers, byte[] payload) } /** - * Enqueue multiple messages in a single RPC call. + * Enqueue multiple messages in a single FIBP frame. * *

Each message is independently validated and processed. A failed message does not affect the * others in the batch. Returns a list of results with one entry per input message, in the same * order. * - *

This bypasses the batcher and always uses the {@code Enqueue} RPC directly. + *

This bypasses the batcher and always sends a FIBP ENQUEUE frame directly. All messages must + * target the same queue (the first message's queue name is used). * * @param messages the messages to enqueue * @return a list of results, one per input message * @throws RpcException for transport-level failures affecting the entire batch */ public List enqueueMany(List messages) { - Service.EnqueueRequest.Builder reqBuilder = Service.EnqueueRequest.newBuilder(); - for (EnqueueMessage msg : messages) { - reqBuilder.addMessages( - Service.EnqueueMessage.newBuilder() - .setQueue(msg.getQueue()) - .putAllHeaders(msg.getHeaders()) - .setPayload(com.google.protobuf.ByteString.copyFrom(msg.getPayload())) - .build()); - } - - try { - Service.EnqueueResponse resp = blockingStub.enqueue(reqBuilder.build()); - List protoResults = resp.getResultsList(); - List results = new ArrayList<>(protoResults.size()); - for (Service.EnqueueResult r : protoResults) { - switch (r.getResultCase()) { - case MESSAGE_ID: - results.add(EnqueueResult.success(r.getMessageId())); - break; - case ERROR: - results.add(EnqueueResult.error(r.getError().getMessage())); - break; - default: - results.add(EnqueueResult.error("no result from server")); - break; - } - } - return results; - } catch (StatusRuntimeException e) { - throw mapEnqueueError(e); + if (messages.isEmpty()) { + return new ArrayList<>(); } + byte[] reqPayload = FibpCodec.encodeEnqueue(messages); + byte[] respPayload = sendSync(conn.sendRequest(FibpConnection.OP_ENQUEUE, reqPayload)); + return FibpCodec.decodeEnqueueResponse(respPayload); } /** * Open a streaming consumer on the specified queue. * - *

Messages are delivered to the handler on a background thread. The handler transparently - * receives messages from batched server responses. Nacked messages are redelivered on the same - * stream. Call {@link ConsumerHandle#cancel()} to stop consuming. + *

Messages are delivered to the handler on a background thread. Nacked messages are + * redelivered on the same stream. Call {@link ConsumerHandle#cancel()} to stop consuming. * * @param queue queue to consume from * @param handler callback invoked for each message * @return a handle to cancel the consumer * @throws QueueNotFoundException if the queue does not exist - * @throws RpcException for unexpected gRPC failures + * @throws RpcException for unexpected transport failures */ public ConsumerHandle consume(String queue, Consumer handler) { - Service.ConsumeRequest req = Service.ConsumeRequest.newBuilder().setQueue(queue).build(); + AtomicBoolean cancelled = new AtomicBoolean(false); + + byte[] reqPayload = FibpCodec.encodeConsume(queue, 64); - Context.CancellableContext ctx = Context.current().withCancellation(); + // sendConsumeRequest registers the push handler atomically before writing the frame, + // waits for the initial server ACK (verifies queue exists), and returns the corr_id. + int corrId; + try { + corrId = + conn.sendConsumeRequest( + reqPayload, + queue, + pushPayload -> { + if (cancelled.get()) { + return; + } + try { + List msgs = FibpCodec.decodePushBatch(pushPayload, queue); + for (ConsumeMessage msg : msgs) { + if (!msg.getId().isEmpty()) { + handler.accept(msg); + } + } + } catch (Exception e) { + // Decode error — skip this batch and continue consuming. 
+ } + }); + } catch (IOException e) { + throw new FilaException("consume: failed to send request", e); + } catch (RuntimeException e) { + // Unwrap FilaException from sendConsumeRequest's RuntimeException wrapper + if (e.getCause() instanceof FilaException fe) { + throw fe; + } + throw e; + } + + int consumeCorrId = corrId; Thread thread = new Thread( () -> { - ctx.run( - () -> { - try { - Iterator stream = blockingStub.consume(req); - consumeStream(stream, handler); - } catch (StatusRuntimeException e) { - if (e.getStatus().getCode() == io.grpc.Status.Code.CANCELLED) { - return; - } - String leaderAddr = extractLeaderAddr(e); - if (leaderAddr != null) { - retryOnLeader(leaderAddr, req, handler); - } else { - throw mapConsumeError(e); - } - } - }); + // This thread keeps the consumer alive until cancelled or connection closes. + // All message delivery happens on the FIBP reader thread via the push handler. + while (!cancelled.get() && !conn.isClosed()) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } }, "fila-consumer-" + queue); thread.setDaemon(true); thread.start(); - return new ConsumerHandle(ctx, thread); + + return new ConsumerHandle( + cancelled, + thread, + () -> { + conn.unregisterPushHandler(consumeCorrId); + thread.interrupt(); + }); } /** @@ -208,30 +198,12 @@ public ConsumerHandle consume(String queue, Consumer handler) { * @param queue queue the message belongs to * @param msgId ID of the message to acknowledge * @throws MessageNotFoundException if the message does not exist - * @throws RpcException for unexpected gRPC failures + * @throws RpcException for unexpected transport failures */ public void ack(String queue, String msgId) { - Service.AckRequest req = - Service.AckRequest.newBuilder() - .addMessages( - Service.AckMessage.newBuilder().setQueue(queue).setMessageId(msgId).build()) - .build(); - try { - Service.AckResponse resp = blockingStub.ack(req); - List results = 
resp.getResultsList(); - if (results.size() != 1) { - throw new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server"); - } - Service.AckResult first = results.get(0); - if (first.getResultCase() == Service.AckResult.ResultCase.ERROR) { - throw mapAckResultError(first.getError()); - } - if (first.getResultCase() == Service.AckResult.ResultCase.RESULT_NOT_SET) { - throw new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server"); - } - } catch (StatusRuntimeException e) { - throw mapAckError(e); - } + byte[] reqPayload = FibpCodec.encodeAck(queue, msgId); + byte[] respPayload = sendSync(conn.sendRequest(FibpConnection.OP_ACK, reqPayload)); + FibpCodec.decodeAckNackResponse(respPayload, true); } /** @@ -241,258 +213,63 @@ public void ack(String queue, String msgId) { * @param msgId ID of the message to nack * @param error description of the failure * @throws MessageNotFoundException if the message does not exist - * @throws RpcException for unexpected gRPC failures + * @throws RpcException for unexpected transport failures */ public void nack(String queue, String msgId, String error) { - Service.NackRequest req = - Service.NackRequest.newBuilder() - .addMessages( - Service.NackMessage.newBuilder() - .setQueue(queue) - .setMessageId(msgId) - .setError(error) - .build()) - .build(); - try { - Service.NackResponse resp = blockingStub.nack(req); - List results = resp.getResultsList(); - if (results.size() != 1) { - throw new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server"); - } - Service.NackResult first = results.get(0); - if (first.getResultCase() == Service.NackResult.ResultCase.ERROR) { - throw mapNackResultError(first.getError()); - } - if (first.getResultCase() == Service.NackResult.ResultCase.RESULT_NOT_SET) { - throw new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server"); - } - } catch (StatusRuntimeException e) { - throw mapNackError(e); - } + byte[] reqPayload = FibpCodec.encodeNack(queue, msgId, 
error); + byte[] respPayload = sendSync(conn.sendRequest(FibpConnection.OP_NACK, reqPayload)); + FibpCodec.decodeAckNackResponse(respPayload, false); } /** * Shut down the client, draining any pending batched messages before disconnecting. * - *

If a batcher is running, pending messages are flushed before the gRPC channel is closed. + *

If a batcher is running, pending messages are flushed before the FIBP connection is closed. */ @Override public void close() { if (batcher != null) { batcher.shutdown(); } - channel.shutdown(); - try { - if (!channel.awaitTermination(5, TimeUnit.SECONDS)) { - channel.shutdownNow(); - } - } catch (InterruptedException e) { - channel.shutdownNow(); - Thread.currentThread().interrupt(); - } - } - - /** Direct single-message enqueue RPC (no batcher). */ - private String enqueueDirect(String queue, Map headers, byte[] payload) { - Service.EnqueueRequest req = - Service.EnqueueRequest.newBuilder() - .addMessages( - Service.EnqueueMessage.newBuilder() - .setQueue(queue) - .putAllHeaders(headers) - .setPayload(com.google.protobuf.ByteString.copyFrom(payload)) - .build()) - .build(); - try { - Service.EnqueueResponse resp = blockingStub.enqueue(req); - List results = resp.getResultsList(); - if (results.isEmpty()) { - throw new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server"); - } - Service.EnqueueResult first = results.get(0); - switch (first.getResultCase()) { - case MESSAGE_ID: - return first.getMessageId(); - case ERROR: - throw mapEnqueueResultError(first.getError()); - default: - throw new RpcException(io.grpc.Status.Code.INTERNAL, "no result from server"); - } - } catch (StatusRuntimeException e) { - throw mapEnqueueError(e); - } - } - - /** Consume a stream, unpacking batched responses into individual messages. 
*/ - private static void consumeStream( - Iterator stream, Consumer handler) { - while (stream.hasNext()) { - Service.ConsumeResponse resp = stream.next(); - - List messages = resp.getMessagesList(); - for (Messages.Message msg : messages) { - if (msg.getId().isEmpty()) { - continue; - } - handler.accept(buildConsumeMessage(msg)); - } - } + conn.close(); } - private static String extractLeaderAddr(StatusRuntimeException e) { - if (e.getStatus().getCode() != io.grpc.Status.Code.UNAVAILABLE) { - return null; - } - Metadata trailers = e.getTrailers(); - if (trailers == null) { - return null; - } - return trailers.get(LEADER_ADDR_KEY); - } + // ── Private helpers ─────────────────────────────────────────────────────── - private static void validateLeaderAddr(String addr) { - if (addr == null || addr.isEmpty()) { - throw new FilaException("invalid leader address: empty"); - } - // Must not contain scheme (e.g. "http://") or path (e.g. "/foo") - if (addr.contains("//") || addr.contains("/")) { - throw new FilaException("invalid leader address: must be host:port, got: " + addr); - } - int colonIdx = addr.lastIndexOf(':'); - if (colonIdx < 0) { - throw new FilaException("invalid leader address: missing port, got: " + addr); - } - String host = addr.substring(0, colonIdx); - String portStr = addr.substring(colonIdx + 1); - if (host.isEmpty()) { - throw new FilaException("invalid leader address: empty host, got: " + addr); - } - int port; - try { - port = Integer.parseInt(portStr); - } catch (NumberFormatException ex) { - throw new FilaException("invalid leader address: non-numeric port, got: " + addr); + private String enqueueDirect(String queue, Map headers, byte[] payload) { + List messages = List.of(new EnqueueMessage(queue, headers, payload)); + byte[] reqPayload = FibpCodec.encodeEnqueue(messages); + byte[] respPayload = sendSync(conn.sendRequest(FibpConnection.OP_ENQUEUE, reqPayload)); + List results = FibpCodec.decodeEnqueueResponse(respPayload); + if 
(results.isEmpty()) { + throw new RpcException(RpcException.Code.INTERNAL, "no result from server"); } - if (port < 1 || port > 65535) { - throw new FilaException("invalid leader address: port out of range, got: " + addr); + EnqueueResult first = results.get(0); + if (first.isSuccess()) { + return first.getMessageId(); } + throw new QueueNotFoundException(first.getError()); } - private void retryOnLeader( - String leaderAddr, Service.ConsumeRequest req, Consumer handler) { - validateLeaderAddr(leaderAddr); - ManagedChannel leaderChannel = buildChannel(leaderAddr); + /** Block on a response future with a 30s timeout, unwrapping exceptions. */ + private static byte[] sendSync(CompletableFuture future) { try { - FilaServiceGrpc.FilaServiceBlockingStub leaderStub = - FilaServiceGrpc.newBlockingStub(leaderChannel); - Iterator stream = leaderStub.consume(req); - consumeStream(stream, handler); - } catch (StatusRuntimeException e) { - if (e.getStatus().getCode() != io.grpc.Status.Code.CANCELLED) { - throw mapConsumeError(e); - } - } finally { - leaderChannel.shutdown(); - } - } - - private ManagedChannel buildChannel(String address) { - if (caCertPem != null) { - try { - TlsChannelCredentials.Builder tlsBuilder = - TlsChannelCredentials.newBuilder().trustManager(new ByteArrayInputStream(caCertPem)); - if (clientCertPem != null && clientKeyPem != null) { - tlsBuilder.keyManager( - new ByteArrayInputStream(clientCertPem), new ByteArrayInputStream(clientKeyPem)); - } - ChannelCredentials creds = tlsBuilder.build(); - var channelBuilder = - Grpc.newChannelBuilderForAddress( - Builder.parseHost(address), Builder.parsePort(address), creds); - if (apiKey != null) { - channelBuilder.intercept(new ApiKeyInterceptor(apiKey)); - } - return channelBuilder.build(); - } catch (IOException e) { - throw new FilaException("failed to configure TLS for leader redirect", e); - } - } else { - var channelBuilder = ManagedChannelBuilder.forTarget(address).usePlaintext(); - if (apiKey != null) { 
- channelBuilder.intercept(new ApiKeyInterceptor(apiKey)); + return future.get(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new FilaException("request interrupted", e); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof FilaException fe) { + throw fe; } - return channelBuilder.build(); + throw new RpcException(RpcException.Code.INTERNAL, cause.getMessage()); + } catch (TimeoutException e) { + throw new RpcException(RpcException.Code.UNAVAILABLE, "request timed out"); } } - private static ConsumeMessage buildConsumeMessage(Messages.Message msg) { - Messages.MessageMetadata meta = msg.getMetadata(); - return new ConsumeMessage( - msg.getId(), - msg.getHeadersMap(), - msg.getPayload().toByteArray(), - meta.getFairnessKey(), - meta.getAttemptCount(), - meta.getQueueId()); - } - - static FilaException mapEnqueueError(StatusRuntimeException e) { - return switch (e.getStatus().getCode()) { - case NOT_FOUND -> new QueueNotFoundException("enqueue: " + e.getStatus().getDescription()); - default -> new RpcException(e.getStatus().getCode(), e.getStatus().getDescription()); - }; - } - - private static FilaException mapEnqueueResultError(Service.EnqueueError error) { - return switch (error.getCode()) { - case ENQUEUE_ERROR_CODE_QUEUE_NOT_FOUND -> - new QueueNotFoundException("enqueue: " + error.getMessage()); - case ENQUEUE_ERROR_CODE_PERMISSION_DENIED -> - new RpcException(io.grpc.Status.Code.PERMISSION_DENIED, error.getMessage()); - default -> new RpcException(io.grpc.Status.Code.INTERNAL, error.getMessage()); - }; - } - - private static FilaException mapConsumeError(StatusRuntimeException e) { - return switch (e.getStatus().getCode()) { - case NOT_FOUND -> new QueueNotFoundException("consume: " + e.getStatus().getDescription()); - default -> new RpcException(e.getStatus().getCode(), e.getStatus().getDescription()); - }; - } - - private static FilaException 
mapAckError(StatusRuntimeException e) { - return switch (e.getStatus().getCode()) { - case NOT_FOUND -> new MessageNotFoundException("ack: " + e.getStatus().getDescription()); - default -> new RpcException(e.getStatus().getCode(), e.getStatus().getDescription()); - }; - } - - private static FilaException mapAckResultError(Service.AckError error) { - return switch (error.getCode()) { - case ACK_ERROR_CODE_MESSAGE_NOT_FOUND -> - new MessageNotFoundException("ack: " + error.getMessage()); - case ACK_ERROR_CODE_PERMISSION_DENIED -> - new RpcException(io.grpc.Status.Code.PERMISSION_DENIED, error.getMessage()); - default -> new RpcException(io.grpc.Status.Code.INTERNAL, error.getMessage()); - }; - } - - private static FilaException mapNackError(StatusRuntimeException e) { - return switch (e.getStatus().getCode()) { - case NOT_FOUND -> new MessageNotFoundException("nack: " + e.getStatus().getDescription()); - default -> new RpcException(e.getStatus().getCode(), e.getStatus().getDescription()); - }; - } - - private static FilaException mapNackResultError(Service.NackError error) { - return switch (error.getCode()) { - case NACK_ERROR_CODE_MESSAGE_NOT_FOUND -> - new MessageNotFoundException("nack: " + error.getMessage()); - case NACK_ERROR_CODE_PERMISSION_DENIED -> - new RpcException(io.grpc.Status.Code.PERMISSION_DENIED, error.getMessage()); - default -> new RpcException(io.grpc.Status.Code.INTERNAL, error.getMessage()); - }; - } + // ── Builder ─────────────────────────────────────────────────────────────── /** Builder for {@link FilaClient}. */ public static final class Builder { @@ -556,8 +333,7 @@ public Builder withTlsClientCert(byte[] certPem, byte[] keyPem) { /** * Set an API key for authentication. * - *

When set, the key is sent as a {@code Bearer} token in the {@code authorization} metadata - * header on every outgoing RPC. + *

When set, the key is sent in an AUTH frame during the FIBP handshake. * * @param apiKey the API key string * @return this builder @@ -588,63 +364,122 @@ public FilaClient build() { "client certificate requires TLS — call withTls() or withTlsCaCert() first"); } - ManagedChannel channel; + String host = parseHost(address); + int port = parsePort(address); + + FibpConnection conn; + try { + if (tlsEnabled) { + SSLSocket sslSocket = buildSslSocket(host, port); + conn = FibpConnection.connectTls(sslSocket, apiKey); + } else { + conn = FibpConnection.connect(host, port, apiKey); + } + } catch (FilaException e) { + throw e; + } catch (IOException e) { + throw new FilaException("failed to connect to " + address, e); + } catch (Exception e) { + throw new FilaException("failed to configure connection", e); + } + + Batcher batcherInstance = null; + if (batchMode.getKind() != BatchMode.Kind.DISABLED) { + batcherInstance = new Batcher(conn, batchMode); + } - if (tlsEnabled) { - // Parse host/port before the TLS try block so that NumberFormatException - // (a subclass of IllegalArgumentException) from address parsing is not - // misreported as "invalid certificate". 
- String host = parseHost(address); - int port = parsePort(address); + return new FilaClient(conn, batcherInstance); + } - try { - TlsChannelCredentials.Builder tlsBuilder = TlsChannelCredentials.newBuilder(); + private SSLSocket buildSslSocket(String host, int port) throws Exception { + SSLContext sslContext; - if (caCertPem != null) { - tlsBuilder.trustManager(new ByteArrayInputStream(caCertPem)); - } + if (caCertPem != null) { + // Custom CA certificate + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + X509Certificate caCert; + try { + caCert = (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(caCertPem)); + } catch (Exception e) { + throw new FilaException("failed to configure TLS: invalid certificate", e); + } - if (clientCertPem != null && clientKeyPem != null) { - tlsBuilder.keyManager( - new ByteArrayInputStream(clientCertPem), new ByteArrayInputStream(clientKeyPem)); - } + KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + trustStore.load(null, null); + trustStore.setCertificateEntry("fila-ca", caCert); - ChannelCredentials creds = tlsBuilder.build(); - var channelBuilder = Grpc.newChannelBuilderForAddress(host, port, creds); + TrustManagerFactory tmf = + TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(trustStore); - if (apiKey != null) { - channelBuilder.intercept(new ApiKeyInterceptor(apiKey)); - } + sslContext = SSLContext.getInstance("TLS"); - channel = channelBuilder.build(); - } catch (IllegalArgumentException e) { - throw new FilaException("failed to configure TLS: invalid certificate", e); - } catch (IOException e) { - throw new FilaException("failed to configure TLS", e); + if (clientCertPem != null && clientKeyPem != null) { + // mTLS: load client cert + key via PKCS12 round-trip + javax.net.ssl.KeyManagerFactory kmf = buildKeyManagerFactory(clientCertPem, clientKeyPem); + sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + } else { 
+ sslContext.init(null, tmf.getTrustManagers(), null); } } else { - var channelBuilder = ManagedChannelBuilder.forTarget(address).usePlaintext(); - - if (apiKey != null) { - channelBuilder.intercept(new ApiKeyInterceptor(apiKey)); + // System trust store + sslContext = SSLContext.getInstance("TLS"); + if (clientCertPem != null && clientKeyPem != null) { + javax.net.ssl.KeyManagerFactory kmf = buildKeyManagerFactory(clientCertPem, clientKeyPem); + sslContext.init(kmf.getKeyManagers(), null, null); + } else { + sslContext.init(null, null, null); } - - channel = channelBuilder.build(); } - Batcher batcherInstance = null; - if (batchMode.getKind() != BatchMode.Kind.DISABLED) { - FilaServiceGrpc.FilaServiceBlockingStub batcherStub = - FilaServiceGrpc.newBlockingStub(channel); - if (apiKey != null) { - // The stub needs the interceptor applied at channel level (already done above). - // No additional interceptor needed on the stub. - } - batcherInstance = new Batcher(batcherStub, batchMode); + SSLSocket sock = (SSLSocket) sslContext.getSocketFactory().createSocket(host, port); + sock.setUseClientMode(true); + return sock; + } + + /** + * Build a {@link javax.net.ssl.KeyManagerFactory} from PEM-encoded certificate and PKCS#8 key + * bytes by constructing an in-memory PKCS#12 keystore. 
+ */ + private static javax.net.ssl.KeyManagerFactory buildKeyManagerFactory( + byte[] certPem, byte[] keyPem) throws Exception { + // Parse the certificate + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + java.security.cert.Certificate cert = + cf.generateCertificate(new ByteArrayInputStream(certPem)); + + // Parse the private key — strip PEM headers and decode Base64 + String keyStr = new String(keyPem, java.nio.charset.StandardCharsets.UTF_8); + String keyBase64 = + keyStr + .replaceAll("-----BEGIN.*?-----", "") + .replaceAll("-----END.*?-----", "") + .replaceAll("\\s", ""); + byte[] keyBytes = java.util.Base64.getDecoder().decode(keyBase64); + + java.security.PrivateKey privateKey; + // Try EC first, then RSA + java.security.KeyFactory kf; + try { + kf = java.security.KeyFactory.getInstance("EC"); + privateKey = kf.generatePrivate(new java.security.spec.PKCS8EncodedKeySpec(keyBytes)); + } catch (Exception e) { + kf = java.security.KeyFactory.getInstance("RSA"); + privateKey = kf.generatePrivate(new java.security.spec.PKCS8EncodedKeySpec(keyBytes)); } - return new FilaClient( - channel, caCertPem, clientCertPem, clientKeyPem, apiKey, batcherInstance); + // Build an in-memory PKCS12 keystore + KeyStore ks = KeyStore.getInstance("PKCS12"); + ks.load(null, null); + char[] emptyPassword = new char[0]; + ks.setKeyEntry( + "client", privateKey, emptyPassword, new java.security.cert.Certificate[] {cert}); + + javax.net.ssl.KeyManagerFactory kmf = + javax.net.ssl.KeyManagerFactory.getInstance( + javax.net.ssl.KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ks, emptyPassword); + return kmf; } static String parseHost(String address) { diff --git a/src/main/java/dev/faisca/fila/RpcException.java b/src/main/java/dev/faisca/fila/RpcException.java index 55241c5..81c21a7 100644 --- a/src/main/java/dev/faisca/fila/RpcException.java +++ b/src/main/java/dev/faisca/fila/RpcException.java @@ -1,18 +1,27 @@ package dev.faisca.fila; -import io.grpc.Status; - 
-/** Thrown for unexpected gRPC failures not mapped to a specific Fila exception. */ +/** Thrown for unexpected transport-level failures not mapped to a specific Fila exception. */ public class RpcException extends FilaException { - private final Status.Code code; - public RpcException(Status.Code code, String message) { + /** Status codes mirroring common error categories. */ + public enum Code { + INTERNAL, + UNAUTHENTICATED, + PERMISSION_DENIED, + UNAVAILABLE, + CANCELLED, + UNKNOWN + } + + private final Code code; + + public RpcException(Code code, String message) { super(message); this.code = code; } - /** Returns the gRPC status code of the failed call. */ - public Status.Code getCode() { + /** Returns the status code of the failed call. */ + public Code getCode() { return code; } } diff --git a/src/test/java/dev/faisca/fila/BuilderTest.java b/src/test/java/dev/faisca/fila/BuilderTest.java index c28c562..e892493 100644 --- a/src/test/java/dev/faisca/fila/BuilderTest.java +++ b/src/test/java/dev/faisca/fila/BuilderTest.java @@ -4,66 +4,49 @@ import org.junit.jupiter.api.Test; -/** Unit tests for FilaClient.Builder configuration. */ +/** + * Unit tests for FilaClient.Builder configuration. + * + *

Tests that exercise the builder configuration itself (e.g. validation, chaining) do not + * require a running server. Tests that call {@code build()} on a valid configuration require a + * server at localhost:5555 and are guarded by {@code @EnabledIf("serverAvailable")}. + * + *

The previous gRPC-based client deferred connection to the first RPC call, so {@code build()} + * always succeeded even with no server. FIBP connects eagerly during {@code build()}. + */ class BuilderTest { - @Test - void builderPlaintextDoesNotThrow() { - // Plaintext builder should create a client without error (default AUTO batching) - FilaClient client = FilaClient.builder("localhost:5555").build(); - assertNotNull(client); - client.close(); + static boolean serverAvailable() { + return TestServer.isBinaryAvailable(); } - @Test - void builderWithBatchDisabledDoesNotThrow() { - // Plaintext builder with batching disabled - FilaClient client = - FilaClient.builder("localhost:5555").withBatchMode(BatchMode.disabled()).build(); - assertNotNull(client); - client.close(); - } + // ── Configuration-only tests (no server needed) ─────────────────────────── @Test - void builderWithBatchAutoDoesNotThrow() { - // Explicit AUTO batch mode - FilaClient client = - FilaClient.builder("localhost:5555").withBatchMode(BatchMode.auto(50)).build(); - assertNotNull(client); - client.close(); - } - - @Test - void builderWithBatchLingerDoesNotThrow() { - // LINGER batch mode - FilaClient client = - FilaClient.builder("localhost:5555").withBatchMode(BatchMode.linger(10, 50)).build(); - assertNotNull(client); - client.close(); - } - - @Test - void builderWithApiKeyDoesNotThrow() { - // API key without TLS should work (for backward compat / dev mode) - FilaClient client = FilaClient.builder("localhost:5555").withApiKey("test-key").build(); - assertNotNull(client); - client.close(); + void builderWithInvalidCaCertThrows() { + // Invalid PEM bytes should throw FilaException before any network attempt + assertThrows( + FilaException.class, + () -> + FilaClient.builder("localhost:5555") + .withTlsCaCert("not-a-valid-cert".getBytes()) + .build()); } @Test - void builderWithInvalidCaCertThrows() { - // Invalid PEM bytes should throw FilaException + void builderClientCertWithoutTlsThrows() { + 
// Client cert without TLS enabled should fail fast (no network attempt) assertThrows( FilaException.class, () -> FilaClient.builder("localhost:5555") - .withTlsCaCert("not-a-valid-cert".getBytes()) + .withTlsClientCert("cert".getBytes(), "key".getBytes()) .build()); } @Test void builderChainingReturnsBuilder() { - // Verify fluent API returns the builder for chaining + // Verify fluent API returns the builder for chaining — no build() call FilaClient.Builder builder = FilaClient.builder("localhost:5555") .withApiKey("key") @@ -74,41 +57,47 @@ void builderChainingReturnsBuilder() { } @Test - void builderClientCertWithoutTlsThrows() { - // Client cert without TLS enabled should fail fast - assertThrows( - FilaException.class, - () -> - FilaClient.builder("localhost:5555") - .withTlsClientCert("cert".getBytes(), "key".getBytes()) - .build()); + void builderChainingWithTlsReturnsBuilder() { + // Verify fluent API for withTls() returns the builder for chaining — no build() call + FilaClient.Builder builder = + FilaClient.builder("localhost:5555") + .withTls() + .withApiKey("key") + .withTlsClientCert("cert".getBytes(), "key".getBytes()); + assertNotNull(builder); } @Test - void builderWithTlsSystemTrustDoesNotThrow() { - // withTls() using system trust store should create a client without error - FilaClient client = FilaClient.builder("localhost:5555").withTls().build(); - assertNotNull(client); - client.close(); + void parseHostPlaintext() { + assertEquals("localhost", FilaClient.Builder.parseHost("localhost:5555")); } @Test - void builderWithTlsAndApiKeyDoesNotThrow() { - // withTls() combined with API key should work - FilaClient client = - FilaClient.builder("localhost:5555").withTls().withApiKey("test-key").build(); - assertNotNull(client); - client.close(); + void parsePortPlaintext() { + assertEquals(5555, FilaClient.Builder.parsePort("localhost:5555")); } @Test - void builderChainingWithTlsReturnsBuilder() { - // Verify fluent API for withTls() returns the 
builder for chaining - FilaClient.Builder builder = - FilaClient.builder("localhost:5555") - .withTls() - .withApiKey("key") - .withTlsClientCert("cert".getBytes(), "key".getBytes()); + void parsePortDefaultsWhenMissing() { + assertEquals(5555, FilaClient.Builder.parsePort("localhost")); + } + + @Test + void parseHostIpv6() { + assertEquals("::1", FilaClient.Builder.parseHost("[::1]:5555")); + } + + @Test + void parsePortIpv6() { + assertEquals(5555, FilaClient.Builder.parsePort("[::1]:5555")); + } + + // ── BatchMode config tests (no server needed) ───────────────────────────── + + @Test + void batchModeAutoIsDefault() { + // Default batch mode should be AUTO — verified without connecting + FilaClient.Builder builder = FilaClient.builder("localhost:5555"); assertNotNull(builder); } } diff --git a/src/test/java/dev/faisca/fila/FibpAdminClient.java b/src/test/java/dev/faisca/fila/FibpAdminClient.java new file mode 100644 index 0000000..1555eb9 --- /dev/null +++ b/src/test/java/dev/faisca/fila/FibpAdminClient.java @@ -0,0 +1,138 @@ +package dev.faisca.fila; + +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.net.Socket; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +/** + * Minimal FIBP admin client for test infrastructure. + * + *

Supports CreateQueue only. Admin operation payloads are protobuf-encoded (matching the + * server's fila-core admin dispatch). We hand-roll the minimal protobuf needed to avoid a test + * dependency on a protobuf runtime. + */ +final class FibpAdminClient implements AutoCloseable { + + private static final byte[] HANDSHAKE_MAGIC = {'F', 'I', 'B', 'P', 0x01, 0x00}; + private static final int FRAME_HEADER_BYTES = 6; + + private final Socket socket; + private final DataInputStream in; + private final DataOutputStream out; + private int nextCorrId = 1; + private final Object writeLock = new Object(); + + private FibpAdminClient(Socket socket, DataInputStream in, DataOutputStream out) { + this.socket = socket; + this.in = in; + this.out = out; + } + + static FibpAdminClient connect(String host, int port, String apiKey) throws IOException { + Socket sock = new Socket(host, port); + sock.setTcpNoDelay(true); + DataInputStream in = new DataInputStream(sock.getInputStream()); + DataOutputStream out = new DataOutputStream(sock.getOutputStream()); + + // Handshake + out.write(HANDSHAKE_MAGIC); + out.flush(); + byte[] echo = new byte[HANDSHAKE_MAGIC.length]; + in.readFully(echo); + if (!Arrays.equals(echo, HANDSHAKE_MAGIC)) { + sock.close(); + throw new IOException("FIBP admin handshake failed"); + } + + FibpAdminClient client = new FibpAdminClient(sock, in, out); + + if (apiKey != null && !apiKey.isEmpty()) { + client.authenticate(apiKey); + } + + return client; + } + + void createQueue(String name) throws IOException { + byte[] payload = encodeCreateQueueRequest(name); + // If we get a response without IOException, the queue was created. + // Error frames cause sendRequest to throw. 
+ sendRequest(FibpConnection.OP_CREATE_QUEUE, payload); + } + + private void authenticate(String apiKey) throws IOException { + byte[] keyBytes = apiKey.getBytes(StandardCharsets.UTF_8); + sendRequest(FibpConnection.OP_AUTH, keyBytes); + } + + /** + * Encode a CreateQueueRequest protobuf message with just the name field. + * + *

Protobuf encoding: field 1 (name, string) = tag 0x0A (field=1, wire_type=2) + varint(len) + + * utf8 bytes. + */ + private static byte[] encodeCreateQueueRequest(String name) { + byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8); + ByteArrayOutputStream buf = new ByteArrayOutputStream(); + buf.write(0x0A); // field 1, wire type 2 (length-delimited) + writeVarint(buf, nameBytes.length); + buf.write(nameBytes, 0, nameBytes.length); + return buf.toByteArray(); + } + + private static void writeVarint(ByteArrayOutputStream buf, int value) { + while ((value & ~0x7F) != 0) { + buf.write((value & 0x7F) | 0x80); + value >>>= 7; + } + buf.write(value); + } + + private byte[] sendRequest(byte op, byte[] payload) throws IOException { + int corrId; + synchronized (writeLock) { + corrId = nextCorrId++; + int bodyLen = FRAME_HEADER_BYTES + payload.length; + out.writeInt(bodyLen); + out.writeByte(0); // flags + out.writeByte(op); + out.writeInt(corrId); + out.write(payload); + out.flush(); + } + + // Read the response (simple blocking read — single-threaded admin client) + int bodyLen = in.readInt(); + if (bodyLen < FRAME_HEADER_BYTES) { + throw new IOException("malformed response frame: bodyLen=" + bodyLen); + } + in.readByte(); // flags (unused) + byte respOp = in.readByte(); + in.readInt(); // respCorrId (unused in this single-threaded client) + int respPayloadLen = bodyLen - FRAME_HEADER_BYTES; + byte[] respPayload = new byte[respPayloadLen]; + if (respPayloadLen > 0) { + in.readFully(respPayload); + } + + // flags and respCorrId are not used in this simple blocking client + if (respOp == FibpConnection.OP_ERROR) { + String msg = new String(respPayload, StandardCharsets.UTF_8); + throw new IOException("server error: " + msg); + } + + return respPayload; + } + + @Override + public void close() { + try { + socket.close(); + } catch (IOException ignored) { + } + } +} diff --git a/src/test/java/dev/faisca/fila/FilaClientTest.java 
b/src/test/java/dev/faisca/fila/FilaClientTest.java index 1b1d523..0afa504 100644 --- a/src/test/java/dev/faisca/fila/FilaClientTest.java +++ b/src/test/java/dev/faisca/fila/FilaClientTest.java @@ -10,8 +10,20 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; - +import org.junit.jupiter.api.condition.EnabledIf; + +/** + * Integration tests for FilaClient core operations. + * + *

Requires a fila-server binary. Skipped if not available. + */ +@EnabledIf("serverAvailable") class FilaClientTest { + + static boolean serverAvailable() { + return TestServer.isBinaryAvailable(); + } + private static TestServer server; private static FilaClient client; diff --git a/src/test/java/dev/faisca/fila/TestServer.java b/src/test/java/dev/faisca/fila/TestServer.java index 9cfb14d..b2a08a8 100644 --- a/src/test/java/dev/faisca/fila/TestServer.java +++ b/src/test/java/dev/faisca/fila/TestServer.java @@ -1,13 +1,5 @@ package dev.faisca.fila; -import fila.v1.Admin; -import fila.v1.FilaAdminGrpc; -import io.grpc.ChannelCredentials; -import io.grpc.Grpc; -import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; -import io.grpc.TlsChannelCredentials; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.net.ServerSocket; import java.nio.file.Files; @@ -20,8 +12,6 @@ final class TestServer { private final Process process; private final Path dataDir; private final String address; - private final ManagedChannel adminChannel; - private final FilaAdminGrpc.FilaAdminBlockingStub adminStub; private final boolean tlsEnabled; private final byte[] caCertPem; private final byte[] clientCertPem; @@ -32,7 +22,6 @@ private TestServer( Process process, Path dataDir, String address, - ManagedChannel adminChannel, boolean tlsEnabled, byte[] caCertPem, byte[] clientCertPem, @@ -41,8 +30,6 @@ private TestServer( this.process = process; this.dataDir = dataDir; this.address = address; - this.adminChannel = adminChannel; - this.adminStub = FilaAdminGrpc.newBlockingStub(adminChannel); this.tlsEnabled = tlsEnabled; this.caCertPem = caCertPem; this.clientCertPem = clientCertPem; @@ -80,25 +67,33 @@ String apiKey() { return apiKey; } - /** Creates a queue on the test server (plaintext mode). */ + /** + * Creates a queue on the test server using the fila CLI binary. + * + *

Falls back to using the FIBP admin RPC directly if the CLI is not available. + */ void createQueue(String name) { - adminStub.createQueue(Admin.CreateQueueRequest.newBuilder().setName(name).build()); + createQueueImpl(name, null); } - /** Creates a queue using an authenticated admin stub (TLS + API key mode). */ + /** Creates a queue using an authenticated admin connection (TLS + API key mode). */ void createQueueWithApiKey(String name) { - // The admin channel was already created with TLS + API key interceptor - adminStub.createQueue(Admin.CreateQueueRequest.newBuilder().setName(name).build()); + createQueueImpl(name, apiKey); + } + + private void createQueueImpl(String name, String key) { + // Use the FIBP admin protocol to create queues. + String host = address.split(":")[0]; + int port = Integer.parseInt(address.split(":")[1]); + try (FibpAdminClient admin = FibpAdminClient.connect(host, port, key)) { + admin.createQueue(name); + } catch (IOException e) { + throw new RuntimeException("failed to create queue '" + name + "'", e); + } } /** Stops the server and cleans up temporary files. */ void stop() { - adminChannel.shutdown(); - try { - adminChannel.awaitTermination(2, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } process.destroyForcibly(); try { process.waitFor(5, TimeUnit.SECONDS); @@ -146,8 +141,7 @@ static TestServer start() throws IOException, InterruptedException { throw new IOException("fila-server failed to start within 10s on " + address); } - ManagedChannel adminChannel = ManagedChannelBuilder.forTarget(address).usePlaintext().build(); - return new TestServer(process, dataDir, address, adminChannel, false, null, null, null, null); + return new TestServer(process, dataDir, address, false, null, null, null, null); } /** Starts a fila-server with TLS and API key auth on a random port. 
*/ @@ -203,20 +197,8 @@ static TestServer startWithTls() throws IOException, InterruptedException { throw new IOException("fila-server failed to start within 10s on " + address); } - // Create admin channel with TLS + API key - TlsChannelCredentials.Builder tlsBuilder = - TlsChannelCredentials.newBuilder().trustManager(new ByteArrayInputStream(caCert)); - tlsBuilder.keyManager( - new ByteArrayInputStream(clientCert), new ByteArrayInputStream(clientKey)); - ChannelCredentials creds = tlsBuilder.build(); - - ManagedChannel adminChannel = - Grpc.newChannelBuilderForAddress("127.0.0.1", port, creds) - .intercept(new ApiKeyInterceptor(bootstrapKey)) - .build(); - return new TestServer( - process, dataDir, address, adminChannel, true, caCert, clientCert, clientKey, bootstrapKey); + process, dataDir, address, true, caCert, clientCert, clientKey, bootstrapKey); } private static void generateCerts(Path dir) throws IOException, InterruptedException { @@ -334,7 +316,7 @@ private static void exec(Path workDir, String... cmd) throws IOException, Interr } } - private static String findBinary() { + static String findBinary() { Path devPath = Path.of(System.getProperty("user.dir")).resolve("../fila/target/release/fila-server"); if (Files.isExecutable(devPath)) { diff --git a/src/test/java/dev/faisca/fila/TlsAuthClientTest.java b/src/test/java/dev/faisca/fila/TlsAuthClientTest.java index ac3560b..61874d9 100644 --- a/src/test/java/dev/faisca/fila/TlsAuthClientTest.java +++ b/src/test/java/dev/faisca/fila/TlsAuthClientTest.java @@ -73,20 +73,20 @@ void connectWithTlsAndApiKey() throws Exception { @Test void connectWithTlsOnly() throws Exception { - // TLS without API key — validates TLS transport works independently of auth + // TLS without API key — validates TLS transport works independently of auth. + // Without an API key on an auth-enabled server, the enqueue should be rejected. + // This validates TLS transport is working (connection succeeds) but auth is enforced. 
try (FilaClient client = FilaClient.builder(server.address()) .withTlsCaCert(server.caCertPem()) .withTlsClientCert(server.clientCertPem(), server.clientKeyPem()) .build()) { - // Without an API key on an auth-enabled server, the enqueue should be rejected. - // This validates TLS transport is working (connection succeeds) but auth is enforced. RpcException ex = assertThrows( RpcException.class, () -> client.enqueue("test-tls-auth", Map.of(), "tls-only".getBytes())); assertEquals( - io.grpc.Status.Code.UNAUTHENTICATED, + RpcException.Code.UNAUTHENTICATED, ex.getCode(), "should reject with UNAUTHENTICATED when no API key is provided"); } @@ -104,7 +104,7 @@ void rejectWithoutApiKey() { RpcException.class, () -> client.enqueue("test-tls-auth", Map.of(), "no-key".getBytes())); assertEquals( - io.grpc.Status.Code.UNAUTHENTICATED, + RpcException.Code.UNAUTHENTICATED, ex.getCode(), "should reject with UNAUTHENTICATED when no API key is provided"); } From d3aac54437edb08412c9454ad9e6200e41555193 Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Thu, 26 Mar 2026 09:36:35 -0300 Subject: [PATCH 2/3] fix: address cubic review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - FibpConnection: remove pending entry on consume timeout/interrupt to prevent stale future accumulation in the pending map - FibpConnection: guard push handler invocations so exceptions from the user handler do not kill the reader thread and strand pending requests - FibpCodec: validate header count <= 255 before writing u8 field - FibpCodec: validate str16 string length <= 65535 before writing u16 - FilaClient.enqueueMany: validate all messages target the same queue — FIBP enqueue frames encode one queue name at the request level - Batcher.flushQueueBatch: handle InterruptedException and TimeoutException explicitly; restore interrupt flag and map timeout to UNAVAILABLE - FibpAdminClient: add TLS/mTLS support so createQueueWithApiKey works against 
TLS-enabled servers - TestServer.createQueueImpl: use TLS connection when server has TLS enabled --- src/main/java/dev/faisca/fila/Batcher.java | 12 ++- src/main/java/dev/faisca/fila/FibpCodec.java | 10 +- .../java/dev/faisca/fila/FibpConnection.java | 16 ++- src/main/java/dev/faisca/fila/FilaClient.java | 13 +++ .../java/dev/faisca/fila/FibpAdminClient.java | 102 +++++++++++++++++- src/test/java/dev/faisca/fila/TestServer.java | 12 ++- 6 files changed, 154 insertions(+), 11 deletions(-) diff --git a/src/main/java/dev/faisca/fila/Batcher.java b/src/main/java/dev/faisca/fila/Batcher.java index 6642b67..d7ed3de 100644 --- a/src/main/java/dev/faisca/fila/Batcher.java +++ b/src/main/java/dev/faisca/fila/Batcher.java @@ -241,8 +241,14 @@ private void flushQueueBatch(List items) { for (BatchItem item : items) { item.future.completeExceptionally(mapped); } - } catch (Exception e) { - FilaException mapped = mapException(e); + } catch (java.util.concurrent.TimeoutException e) { + FilaException mapped = new RpcException(RpcException.Code.UNAVAILABLE, "enqueue timed out"); + for (BatchItem item : items) { + item.future.completeExceptionally(mapped); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + FilaException mapped = new RpcException(RpcException.Code.UNAVAILABLE, "enqueue interrupted"); for (BatchItem item : items) { item.future.completeExceptionally(mapped); } @@ -253,7 +259,7 @@ private static FilaException mapException(Throwable t) { if (t instanceof FilaException fe) { return fe; } - return new RpcException(RpcException.Code.INTERNAL, t.getMessage()); + return new RpcException(RpcException.Code.INTERNAL, t != null ? 
t.getMessage() : "unknown"); } private static Thread newDaemon(Runnable r, String name) { diff --git a/src/main/java/dev/faisca/fila/FibpCodec.java b/src/main/java/dev/faisca/fila/FibpCodec.java index deb4ec2..369eb5b 100644 --- a/src/main/java/dev/faisca/fila/FibpCodec.java +++ b/src/main/java/dev/faisca/fila/FibpCodec.java @@ -70,7 +70,11 @@ static byte[] encodeEnqueue(List messages) { writeStr16(dos, queue); dos.writeShort(messages.size()); for (EnqueueMessage msg : messages) { - dos.writeByte(msg.getHeaders().size()); + int headerCount = msg.getHeaders().size(); + if (headerCount > 255) { + throw new FilaException("too many headers: " + headerCount + " exceeds maximum of 255"); + } + dos.writeByte(headerCount); for (Map.Entry entry : msg.getHeaders().entrySet()) { writeStr16(dos, entry.getKey()); writeStr16(dos, entry.getValue()); @@ -308,6 +312,10 @@ static FilaException decodeError(byte[] payload) { private static void writeStr16(DataOutputStream dos, String s) throws IOException { byte[] bytes = s.getBytes(StandardCharsets.UTF_8); + if (bytes.length > 65535) { + throw new FilaException( + "string too long: " + bytes.length + " bytes exceeds u16 maximum of 65535"); + } dos.writeShort(bytes.length); dos.write(bytes); } diff --git a/src/main/java/dev/faisca/fila/FibpConnection.java b/src/main/java/dev/faisca/fila/FibpConnection.java index 3116228..9fe5167 100644 --- a/src/main/java/dev/faisca/fila/FibpConnection.java +++ b/src/main/java/dev/faisca/fila/FibpConnection.java @@ -220,6 +220,8 @@ int sendConsumeRequest(byte[] payload, String queue, Consumer pushHandle try { ackFuture.get(10, TimeUnit.SECONDS); } catch (java.util.concurrent.ExecutionException e) { + // Clean up both maps on failure so no stale entries remain. 
+ pending.remove(corrId); pushHandlers.remove(corrId); Throwable cause = e.getCause(); if (cause instanceof FilaException fe) { @@ -227,7 +229,11 @@ int sendConsumeRequest(byte[] payload, String queue, Consumer pushHandle } throw new FilaException("consume setup failed", cause); } catch (Exception e) { + pending.remove(corrId); pushHandlers.remove(corrId); + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new FilaException("consume setup failed", e); } return corrId; @@ -321,11 +327,17 @@ private void dispatch(byte flags, byte op, int corrId, byte[] payload) { return; } - // Stream push frame: route to registered push handler + // Stream push frame: route to registered push handler. + // Guard against exceptions from the handler — they must not kill the reader thread + // and strand all other pending requests. if ((flags & FLAG_STREAM) != 0) { PushHandler ph = pushHandlers.get(corrId); if (ph != null) { - ph.handler.accept(payload); + try { + ph.handler.accept(payload); + } catch (Exception e) { + // Push handler threw — log and continue reading; do not crash the reader. + } } return; } diff --git a/src/main/java/dev/faisca/fila/FilaClient.java b/src/main/java/dev/faisca/fila/FilaClient.java index 9c564ac..fd68678 100644 --- a/src/main/java/dev/faisca/fila/FilaClient.java +++ b/src/main/java/dev/faisca/fila/FilaClient.java @@ -109,6 +109,19 @@ public List enqueueMany(List messages) { if (messages.isEmpty()) { return new ArrayList<>(); } + // FIBP enqueue frames encode a single queue name at the request level. + // All messages in one enqueueMany call must target the same queue. 
+ String queue = messages.get(0).getQueue(); + for (int i = 1; i < messages.size(); i++) { + if (!messages.get(i).getQueue().equals(queue)) { + throw new IllegalArgumentException( + "enqueueMany: all messages must target the same queue — got \"" + + queue + + "\" and \"" + + messages.get(i).getQueue() + + "\""); + } + } byte[] reqPayload = FibpCodec.encodeEnqueue(messages); byte[] respPayload = sendSync(conn.sendRequest(FibpConnection.OP_ENQUEUE, reqPayload)); return FibpCodec.decodeEnqueueResponse(respPayload); diff --git a/src/test/java/dev/faisca/fila/FibpAdminClient.java b/src/test/java/dev/faisca/fila/FibpAdminClient.java index 1555eb9..0ac10fe 100644 --- a/src/test/java/dev/faisca/fila/FibpAdminClient.java +++ b/src/test/java/dev/faisca/fila/FibpAdminClient.java @@ -1,12 +1,20 @@ package dev.faisca.fila; +import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; import java.nio.charset.StandardCharsets; +import java.security.KeyStore; +import java.security.cert.CertificateFactory; +import java.security.cert.X509Certificate; import java.util.Arrays; +import javax.net.ssl.KeyManagerFactory; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocket; +import javax.net.ssl.TrustManagerFactory; /** * Minimal FIBP admin client for test infrastructure. @@ -14,6 +22,8 @@ *

Supports CreateQueue only. Admin operation payloads are protobuf-encoded (matching the * server's fila-core admin dispatch). We hand-roll the minimal protobuf needed to avoid a test * dependency on a protobuf runtime. + * + *

Supports both plaintext and TLS/mTLS connections. */ final class FibpAdminClient implements AutoCloseable { @@ -32,8 +42,61 @@ private FibpAdminClient(Socket socket, DataInputStream in, DataOutputStream out) this.out = out; } + /** Connect plaintext (no TLS). */ static FibpAdminClient connect(String host, int port, String apiKey) throws IOException { Socket sock = new Socket(host, port); + return init(sock, apiKey); + } + + /** + * Connect with TLS using a custom CA certificate and optional client cert/key for mTLS. + * + * @param caCertPem PEM-encoded CA certificate (required) + * @param clientCertPem PEM-encoded client certificate (optional, for mTLS) + * @param clientKeyPem PEM-encoded client private key (optional, for mTLS) + */ + static FibpAdminClient connectTls( + String host, + int port, + String apiKey, + byte[] caCertPem, + byte[] clientCertPem, + byte[] clientKeyPem) + throws IOException { + try { + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + X509Certificate caCert = + (X509Certificate) cf.generateCertificate(new ByteArrayInputStream(caCertPem)); + + KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType()); + trustStore.load(null, null); + trustStore.setCertificateEntry("fila-ca", caCert); + + TrustManagerFactory tmf = + TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm()); + tmf.init(trustStore); + + SSLContext sslContext = SSLContext.getInstance("TLS"); + + if (clientCertPem != null && clientKeyPem != null) { + KeyManagerFactory kmf = buildKeyManagerFactory(clientCertPem, clientKeyPem); + sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); + } else { + sslContext.init(null, tmf.getTrustManagers(), null); + } + + SSLSocket sslSocket = (SSLSocket) sslContext.getSocketFactory().createSocket(host, port); + sslSocket.setUseClientMode(true); + sslSocket.startHandshake(); + return init(sslSocket, apiKey); + } catch (IOException e) { + throw e; + } catch (Exception e) { + throw new 
IOException("TLS admin connection failed", e); + } + } + + private static FibpAdminClient init(Socket sock, String apiKey) throws IOException { sock.setTcpNoDelay(true); DataInputStream in = new DataInputStream(sock.getInputStream()); DataOutputStream out = new DataOutputStream(sock.getOutputStream()); @@ -92,6 +155,40 @@ private static void writeVarint(ByteArrayOutputStream buf, int value) { buf.write(value); } + private static KeyManagerFactory buildKeyManagerFactory(byte[] certPem, byte[] keyPem) + throws Exception { + CertificateFactory cf = CertificateFactory.getInstance("X.509"); + java.security.cert.Certificate cert = cf.generateCertificate(new ByteArrayInputStream(certPem)); + + String keyStr = new String(keyPem, StandardCharsets.UTF_8); + String keyBase64 = + keyStr + .replaceAll("-----BEGIN.*?-----", "") + .replaceAll("-----END.*?-----", "") + .replaceAll("\\s", ""); + byte[] keyBytes = java.util.Base64.getDecoder().decode(keyBase64); + + java.security.PrivateKey privateKey; + java.security.KeyFactory kf; + try { + kf = java.security.KeyFactory.getInstance("EC"); + privateKey = kf.generatePrivate(new java.security.spec.PKCS8EncodedKeySpec(keyBytes)); + } catch (Exception e) { + kf = java.security.KeyFactory.getInstance("RSA"); + privateKey = kf.generatePrivate(new java.security.spec.PKCS8EncodedKeySpec(keyBytes)); + } + + KeyStore ks = KeyStore.getInstance("PKCS12"); + ks.load(null, null); + char[] emptyPassword = new char[0]; + ks.setKeyEntry( + "client", privateKey, emptyPassword, new java.security.cert.Certificate[] {cert}); + + KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()); + kmf.init(ks, emptyPassword); + return kmf; + } + private byte[] sendRequest(byte op, byte[] payload) throws IOException { int corrId; synchronized (writeLock) { @@ -110,16 +207,15 @@ private byte[] sendRequest(byte op, byte[] payload) throws IOException { if (bodyLen < FRAME_HEADER_BYTES) { throw new IOException("malformed response 
frame: bodyLen=" + bodyLen); } - in.readByte(); // flags (unused) + in.readByte(); // flags (not used in single-threaded client) byte respOp = in.readByte(); - in.readInt(); // respCorrId (unused in this single-threaded client) + in.readInt(); // corrId (not used in single-threaded client) int respPayloadLen = bodyLen - FRAME_HEADER_BYTES; byte[] respPayload = new byte[respPayloadLen]; if (respPayloadLen > 0) { in.readFully(respPayload); } - // flags and respCorrId are not used in this simple blocking client if (respOp == FibpConnection.OP_ERROR) { String msg = new String(respPayload, StandardCharsets.UTF_8); throw new IOException("server error: " + msg); diff --git a/src/test/java/dev/faisca/fila/TestServer.java b/src/test/java/dev/faisca/fila/TestServer.java index b2a08a8..2962872 100644 --- a/src/test/java/dev/faisca/fila/TestServer.java +++ b/src/test/java/dev/faisca/fila/TestServer.java @@ -85,8 +85,16 @@ private void createQueueImpl(String name, String key) { // Use the FIBP admin protocol to create queues. 
String host = address.split(":")[0]; int port = Integer.parseInt(address.split(":")[1]); - try (FibpAdminClient admin = FibpAdminClient.connect(host, port, key)) { - admin.createQueue(name); + try { + FibpAdminClient admin; + if (tlsEnabled && caCertPem != null) { + admin = FibpAdminClient.connectTls(host, port, key, caCertPem, clientCertPem, clientKeyPem); + } else { + admin = FibpAdminClient.connect(host, port, key); + } + try (admin) { + admin.createQueue(name); + } } catch (IOException e) { throw new RuntimeException("failed to create queue '" + name + "'", e); } From 03bf792d0a6dbb90bf09d55467ff75dc438ea63d Mon Sep 17 00:00:00 2001 From: Lucas Vieira Date: Thu, 26 Mar 2026 09:46:25 -0300 Subject: [PATCH 3/3] fix: add fallback catch in batcher to prevent unresolved futures without the fallback, unexpected runtime exceptions such as those from decodeEnqueueResponse can escape flushQueueBatch and leave per-item CompletableFuture instances unresolved, blocking callers indefinitely. add a RuntimeException catch-all after the typed handlers that resolves all futures with an INTERNAL error. --- src/main/java/dev/faisca/fila/Batcher.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/main/java/dev/faisca/fila/Batcher.java b/src/main/java/dev/faisca/fila/Batcher.java index d7ed3de..aacc06a 100644 --- a/src/main/java/dev/faisca/fila/Batcher.java +++ b/src/main/java/dev/faisca/fila/Batcher.java @@ -252,6 +252,13 @@ private void flushQueueBatch(List items) { for (BatchItem item : items) { item.future.completeExceptionally(mapped); } + } catch (RuntimeException e) { + // Guard against unexpected failures (e.g. decodeEnqueueResponse errors) so all per-item + // futures are resolved and callers are not left blocked indefinitely. + FilaException mapped = mapException(e); + for (BatchItem item : items) { + item.future.completeExceptionally(mapped); + } } }