Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,77 @@ try (FilaClient client = FilaClient.builder("localhost:5555").build()) {
}
```

## TLS

### System trust store (public CAs)

If the Fila server uses a certificate issued by a public CA (e.g., Let's Encrypt), enable TLS with the JVM's default trust store:

```java
try (FilaClient client = FilaClient.builder("localhost:5555")
.withTls()
.build()) {
// use client...
}
```

### Custom CA certificate

For servers using self-signed or private CA certificates, provide the CA cert explicitly:

```java
byte[] caCert = Files.readAllBytes(Path.of("ca.pem"));

try (FilaClient client = FilaClient.builder("localhost:5555")
.withTlsCaCert(caCert)
.build()) {
// use client...
}
```

### Mutual TLS (mTLS)

For mutual TLS, also provide the client certificate and key. This works with both trust modes:

```java
byte[] caCert = Files.readAllBytes(Path.of("ca.pem"));
byte[] clientCert = Files.readAllBytes(Path.of("client.pem"));
byte[] clientKey = Files.readAllBytes(Path.of("client-key.pem"));

try (FilaClient client = FilaClient.builder("localhost:5555")
.withTlsCaCert(caCert)
.withTlsClientCert(clientCert, clientKey)
.build()) {
// use client...
}
```

## API Key Authentication

When the server has auth enabled, provide an API key:

```java
try (FilaClient client = FilaClient.builder("localhost:5555")
.withApiKey("your-api-key")
.build()) {
// use client...
}
```

The key is sent as a `Bearer` token in the `authorization` metadata header on every RPC.

TLS and API key auth can be combined:

```java
try (FilaClient client = FilaClient.builder("localhost:5555")
.withTlsCaCert(caCert)
.withTlsClientCert(clientCert, clientKey)
.withApiKey("your-api-key")
.build()) {
// use client...
}
```

## API Reference

### `FilaClient`
Expand All @@ -56,6 +127,17 @@ FilaClient client = FilaClient.builder("localhost:5555").build();

`FilaClient` implements `AutoCloseable` for use with try-with-resources.

#### Builder Methods

| Method | Description |
|--------|-------------|
| `withTls()` | Enable TLS using JVM's default trust store (cacerts) |
| `withTlsCaCert(byte[] caCertPem)` | CA certificate for TLS server verification (implies `withTls()`) |
| `withTlsClientCert(byte[] certPem, byte[] keyPem)` | Client cert + key for mTLS |
| `withApiKey(String apiKey)` | API key sent as `Bearer` token on every RPC |

All builder methods are optional. When none are set, the client connects over plaintext without authentication (backward compatible).

#### `enqueue(String queue, Map<String, String> headers, byte[] payload) -> String`

Enqueue a message. Returns the broker-assigned message ID (UUIDv7).
Expand Down
83 changes: 83 additions & 0 deletions proto/fila/v1/admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,15 @@ service FilaAdmin {
rpc GetStats(GetStatsRequest) returns (GetStatsResponse);
rpc Redrive(RedriveRequest) returns (RedriveResponse);
rpc ListQueues(ListQueuesRequest) returns (ListQueuesResponse);

// API key management. CreateApiKey bypasses auth (bootstrap); others require a valid key.
Copy link
Copy Markdown

@cubic-dev-ai cubic-dev-ai Bot Mar 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Unauthenticated CreateApiKey can create superadmin keys. Since this RPC bypasses auth (per the comment), any network-reachable client can call CreateApiKey(is_superadmin=true) and obtain a key that bypasses all ACL checks. Consider restricting the unauthenticated bootstrap path—e.g., only allow it when no keys exist yet, or disallow is_superadmin on unauthenticated calls—so that an open bootstrap endpoint cannot be exploited post-setup.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At proto/fila/v1/admin.proto, line 15:

<comment>Unauthenticated `CreateApiKey` can create superadmin keys. Since this RPC bypasses auth (per the comment), any network-reachable client can call `CreateApiKey(is_superadmin=true)` and obtain a key that bypasses all ACL checks. Consider restricting the unauthenticated bootstrap path—e.g., only allow it when no keys exist yet, or disallow `is_superadmin` on unauthenticated calls—so that an open bootstrap endpoint cannot be exploited post-setup.</comment>

<file context>
@@ -11,6 +11,15 @@ service FilaAdmin {
   rpc Redrive(RedriveRequest) returns (RedriveResponse);
   rpc ListQueues(ListQueuesRequest) returns (ListQueuesResponse);
+
+  // API key management. CreateApiKey bypasses auth (bootstrap); others require a valid key.
+  rpc CreateApiKey(CreateApiKeyRequest) returns (CreateApiKeyResponse);
+  rpc RevokeApiKey(RevokeApiKeyRequest) returns (RevokeApiKeyResponse);
</file context>
Fix with Cubic

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a server-side authorization concern, not a client SDK issue. The Fila server handles this through the bootstrap_apikey mechanism: when auth is enabled, CreateApiKey requires a valid bootstrap key (configured via config or env var). The proto definition must match the server's proto — changing it here would break compatibility. The SDK is just a client; it doesn't make authorization decisions.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the feedback! I've saved this as a new learning to improve future reviews.

rpc CreateApiKey(CreateApiKeyRequest) returns (CreateApiKeyResponse);
rpc RevokeApiKey(RevokeApiKeyRequest) returns (RevokeApiKeyResponse);
rpc ListApiKeys(ListApiKeysRequest) returns (ListApiKeysResponse);

// Per-key ACL management.
rpc SetAcl(SetAclRequest) returns (SetAclResponse);
rpc GetAcl(GetAclRequest) returns (GetAclResponse);
}

message CreateQueueRequest {
Expand Down Expand Up @@ -89,6 +98,9 @@ message GetStatsResponse {
uint32 quantum = 5;
repeated PerFairnessKeyStats per_key_stats = 6;
repeated PerThrottleKeyStats per_throttle_stats = 7;
// Cluster fields (0 when not in cluster mode).
uint64 leader_node_id = 8;
uint32 replication_count = 9;
}

message RedriveRequest {
Expand All @@ -107,8 +119,79 @@ message QueueInfo {
uint64 depth = 2;
uint64 in_flight = 3;
uint32 active_consumers = 4;
uint64 leader_node_id = 5;
}

message ListQueuesResponse {
repeated QueueInfo queues = 1;
uint32 cluster_node_count = 2;
}

// --- API Key Management ---

message CreateApiKeyRequest {
/// Human-readable label for the key.
string name = 1;
/// Optional Unix timestamp (milliseconds) after which the key expires.
/// 0 means no expiration.
uint64 expires_at_ms = 2;
/// When true, the key bypasses all ACL checks (superadmin).
bool is_superadmin = 3;
}

message CreateApiKeyResponse {
/// Opaque key ID for management operations (revoke, list, set-acl).
string key_id = 1;
/// Plaintext API key. Returned once — store it securely.
string key = 2;
/// Whether this key has superadmin privileges.
bool is_superadmin = 3;
}

message RevokeApiKeyRequest {
string key_id = 1;
}

message RevokeApiKeyResponse {}

message ListApiKeysRequest {}

message ApiKeyInfo {
string key_id = 1;
string name = 2;
uint64 created_at_ms = 3;
/// 0 means no expiration.
uint64 expires_at_ms = 4;
bool is_superadmin = 5;
}

message ListApiKeysResponse {
repeated ApiKeyInfo keys = 1;
}

// --- ACL Management ---

/// A single permission grant: kind (produce/consume/admin) + queue pattern.
message AclPermission {
/// One of: "produce", "consume", "admin".
string kind = 1;
/// Queue name or wildcard ("*" or "orders.*").
string pattern = 2;
}

message SetAclRequest {
string key_id = 1;
repeated AclPermission permissions = 2;
}

message SetAclResponse {}

message GetAclRequest {
string key_id = 1;
}

message GetAclResponse {
string key_id = 1;
repeated AclPermission permissions = 2;
bool is_superadmin = 3;
}
36 changes: 36 additions & 0 deletions src/main/java/dev/faisca/fila/ApiKeyInterceptor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package dev.faisca.fila;

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall.SimpleForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;

/**
* gRPC client interceptor that attaches a {@code Bearer} API key to the {@code authorization}
* metadata header on every outgoing RPC.
*/
final class ApiKeyInterceptor implements ClientInterceptor {
private static final Metadata.Key<String> AUTH_KEY =
Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);

private final String headerValue;

ApiKeyInterceptor(String apiKey) {
this.headerValue = "Bearer " + apiKey;
}

@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new SimpleForwardingClientCall<>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(AUTH_KEY, headerValue);
super.start(responseListener, headers);
}
};
}
}
Loading
Loading