Skip to content
88 changes: 67 additions & 21 deletions providers/flagd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,46 @@ FlagdProvider flagdProvider = new FlagdProvider(

In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flags.json).

#### Selector filtering (In-process mode only)

The `selector` option allows filtering flag configurations from flagd based on source identifiers when using the in-process resolver. This is useful when flagd is configured with multiple flag sources and you want to sync only a specific subset.

##### Usage

To use selector filtering, simply configure the `selector` option when creating the provider:

```java
FlagdProvider flagdProvider = new FlagdProvider(
FlagdOptions.builder()
.resolverType(Config.Resolver.IN_PROCESS)
.selector("source=my-app")
.build());
```

Or via environment variable:
```bash
export FLAGD_SOURCE_SELECTOR="source=my-app"
```

##### Implementation details

> [!IMPORTANT]
> **Selector normalization (flagd issue #1814)**
>
> As part of [flagd issue #1814](https://github.com/open-feature/flagd/issues/1814), the flagd project is normalizing selector handling across all services to use the `flagd-selector` gRPC metadata header.
>
> **Current implementation:**
> - The Java SDK **automatically passes the selector via the `flagd-selector` header** (preferred approach)
> - For backward compatibility with older flagd versions, the selector is **also sent in the request body**
> - Both methods work with current flagd versions
> - In a future major version of flagd, the request body selector field may be removed
>
> **No migration needed:**
>
> Users do not need to make any code changes. The SDK handles selector normalization automatically.

For more details on selector normalization, see the [flagd selector normalization issue](https://github.com/open-feature/flagd/issues/1814).

#### Sync-metadata

To support the injection of contextual data configured in flagd for in-process evaluation, the provider exposes a `getSyncMetadata` accessor which provides the most recent value returned by the [GetMetadata RPC](https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata).
Expand Down Expand Up @@ -106,30 +146,33 @@ variables.

Given below are the supported configurations:

| Option name | Environment variable name | Type & Values | Default | Compatible resolver |
|-----------------------|------------------------------------------------------------------------|--------------------------|-------------------------------|-------------------------|
| resolver | FLAGD_RESOLVER | String - rpc, in-process | rpc | |
| host | FLAGD_HOST | String | localhost | rpc & in-process |
| port | FLAGD_PORT (rpc), FLAGD_SYNC_PORT (in-process, FLAGD_PORT as fallback) | int | 8013 (rpc), 8015 (in-process) | rpc & in-process |
| targetUri | FLAGD_TARGET_URI | string | null | rpc & in-process |
| tls | FLAGD_TLS | boolean | false | rpc & in-process |
| defaultAuthority | FLAGD_DEFAULT_AUTHORITY | String | null | rpc & in-process |
| socketPath | FLAGD_SOCKET_PATH | String | null | rpc & in-process |
| certPath | FLAGD_SERVER_CERT_PATH | String | null | rpc & in-process |
| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process & file |
| streamDeadlineMs | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process |
| keepAliveTime | FLAGD_KEEP_ALIVE_TIME_MS | long | 0 | rpc & in-process |
| selector | FLAGD_SOURCE_SELECTOR | String | null | in-process |
| providerId | FLAGD_SOURCE_PROVIDER_ID | String | null | in-process |
| cache | FLAGD_CACHE | String - lru, disabled | lru | rpc |
| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | rpc |
| maxEventStreamRetries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc |
| retryBackoffMs | FLAGD_RETRY_BACKOFF_MS | int | 1000 | rpc |
| offlineFlagSourcePath | FLAGD_OFFLINE_FLAG_SOURCE_PATH | String | null | file |
| offlinePollIntervalMs | FLAGD_OFFLINE_POLL_MS | int | 5000 | file |
| Option name | Environment variable name | Type & Values | Default | Compatible resolver |
| --------------------- | ---------------------------------------------------------------------- | ------------------------ | ----------------------------- | ------------------------------------------------------------------------------- |
| resolver | FLAGD_RESOLVER | String - rpc, in-process | rpc | |
| host | FLAGD_HOST | String | localhost | rpc & in-process |
| port | FLAGD_PORT (rpc), FLAGD_SYNC_PORT (in-process, FLAGD_PORT as fallback) | int | 8013 (rpc), 8015 (in-process) | rpc & in-process |
| targetUri | FLAGD_TARGET_URI | string | null | rpc & in-process |
| tls | FLAGD_TLS | boolean | false | rpc & in-process |
| defaultAuthority | FLAGD_DEFAULT_AUTHORITY | String | null | rpc & in-process |
| socketPath | FLAGD_SOCKET_PATH | String | null | rpc & in-process |
| certPath | FLAGD_SERVER_CERT_PATH | String | null | rpc & in-process |
| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process & file |
| streamDeadlineMs | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process |
| keepAliveTime | FLAGD_KEEP_ALIVE_TIME_MS | long | 0 | rpc & in-process |
| selector | FLAGD_SOURCE_SELECTOR | String | null | in-process (see [migration guidance](#selector-filtering-in-process-mode-only)) |
| providerId | FLAGD_SOURCE_PROVIDER_ID | String | null | in-process |
| cache | FLAGD_CACHE | String - lru, disabled | lru | rpc |
| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | rpc |
| maxEventStreamRetries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc |
| retryBackoffMs | FLAGD_RETRY_BACKOFF_MS | int | 1000 | rpc |
| offlineFlagSourcePath | FLAGD_OFFLINE_FLAG_SOURCE_PATH | String | null | file |
| offlinePollIntervalMs | FLAGD_OFFLINE_POLL_MS | int | 5000 | file |

> [!NOTE]
> Some configurations are only applicable for RPC resolver.

> [!NOTE]
> The `selector` option automatically uses the `flagd-selector` header (the preferred approach per [flagd issue #1814](https://github.com/open-feature/flagd/issues/1814)) while maintaining backward compatibility with older flagd versions. See [Selector filtering](#selector-filtering-in-process-mode-only) for details.
>

### Unix socket support
Expand Down Expand Up @@ -189,6 +232,9 @@ FlagdProvider flagdProvider = new FlagdProvider(

The `clientInterceptors` and `defaultAuthority` are meant for connection of the in-process resolver to a Sync API implementation on a host/port, that might require special credentials or headers.

> [!NOTE]
> The SDK automatically handles the `flagd-selector` header when the `selector` option is configured. Custom interceptors are not needed for selector filtering. See [Selector filtering](#selector-filtering-in-process-mode-only) for details.

```java
private static ClientInterceptor createHeaderInterceptor() {
return new ClientInterceptor() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,14 @@ public class FlagdOptions {
fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD);
/**
* Selector to be used with flag sync gRPC contract.
*
* <p>The SDK automatically passes the selector via the {@code flagd-selector} gRPC metadata header
* (the preferred approach per <a href="https://github.com/open-feature/flagd/issues/1814">flagd issue #1814</a>).
* For backward compatibility with older flagd versions, the selector is also sent in the request body.
*
* <p>Only applicable for in-process resolver mode.
*
* @see <a href="https://github.com/open-feature/java-sdk-contrib/tree/main/providers/flagd#selector-filtering-in-process-mode-only">Selector filtering documentation</a>
**/
@Builder.Default
private String selector = fallBackToEnvOrDefault(Config.SOURCE_SELECTOR_ENV_VAR_NAME, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,14 @@
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
import dev.openfeature.contrib.providers.flagd.resolver.common.nameresolvers.EnvoyResolverProvider;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.ManagedChannel;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.NameResolverRegistry;
import io.grpc.Status.Code;
import io.grpc.netty.GrpcSslContexts;
Expand All @@ -26,6 +33,10 @@
/** gRPC channel builder helper. */
@SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "we don't care to serialize this")
public class ChannelBuilder {

private static final Metadata.Key<String> FLAGD_SELECTOR_KEY =
Metadata.Key.of("flagd-selector", Metadata.ASCII_STRING_MARSHALLER);

/**
* Controls retry (not-reconnection) policy for failed RPCs.
*/
Expand Down Expand Up @@ -94,14 +105,19 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
if (!Epoll.isAvailable()) {
throw new IllegalStateException("unix socket cannot be used", Epoll.unavailabilityCause());
}
return NettyChannelBuilder.forAddress(new DomainSocketAddress(options.getSocketPath()))
var channelBuilder = NettyChannelBuilder.forAddress(new DomainSocketAddress(options.getSocketPath()))
.keepAliveTime(keepAliveMs, TimeUnit.MILLISECONDS)
.eventLoopGroup(new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory()))
.channelType(EpollDomainSocketChannel.class)
.usePlaintext()
.defaultServiceConfig(buildRetryPolicy(options))
.enableRetry()
.build();
.enableRetry();

// add header-based selector interceptor if selector is provided
if (options.getSelector() != null) {
channelBuilder.intercept(createSelectorInterceptor(options.getSelector()));
}
return channelBuilder.build();
}

// build a TCP socket
Expand All @@ -116,14 +132,14 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
final String defaultTarget = String.format("%s:%s", options.getHost(), options.getPort());
final String targetUri = isValidTargetUri(options.getTargetUri()) ? options.getTargetUri() : defaultTarget;

final NettyChannelBuilder builder =
final NettyChannelBuilder channelBuilder =
NettyChannelBuilder.forTarget(targetUri).keepAliveTime(keepAliveMs, TimeUnit.MILLISECONDS);

if (options.getDefaultAuthority() != null) {
builder.overrideAuthority(options.getDefaultAuthority());
channelBuilder.overrideAuthority(options.getDefaultAuthority());
}
if (options.getClientInterceptors() != null) {
builder.intercept(options.getClientInterceptors());
channelBuilder.intercept(options.getClientInterceptors());
}
if (options.isTls()) {
SslContextBuilder sslContext = GrpcSslContexts.forClient();
Expand All @@ -135,17 +151,22 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
}
}

builder.sslContext(sslContext.build());
channelBuilder.sslContext(sslContext.build());
} else {
builder.usePlaintext();
channelBuilder.usePlaintext();
}

// telemetry interceptor if option is provided
if (options.getOpenTelemetry() != null) {
builder.intercept(new FlagdGrpcInterceptor(options.getOpenTelemetry()));
channelBuilder.intercept(new FlagdGrpcInterceptor(options.getOpenTelemetry()));
}
// add header-based selector interceptor if selector is provided
if (options.getSelector() != null) {
channelBuilder.intercept(createSelectorInterceptor(options.getSelector()));
}
Comment thread
toddbaert marked this conversation as resolved.

return builder.defaultServiceConfig(buildRetryPolicy(options))
return channelBuilder
.defaultServiceConfig(buildRetryPolicy(options))
.enableRetry()
.build();
} catch (SSLException ssle) {
Expand All @@ -160,6 +181,30 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
}
}

/**
* Creates a ClientInterceptor that adds the flagd-selector header to gRPC requests.
* This is the preferred approach for passing selectors as per flagd issue #1814.
*
* @param selector the selector value to pass in the header
* @return a ClientInterceptor that adds the flagd-selector header
*/
private static ClientInterceptor createSelectorInterceptor(String selector) {
return new ClientInterceptor() {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method, CallOptions callOptions, Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(
next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(FLAGD_SELECTOR_KEY, selector);
super.start(responseListener, headers);
}
};
}
};
}

private static boolean isValidTargetUri(String targetUri) {
if (targetUri == null) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
justification = "We need to expose the BlockingQueue to allow consumers to read from it")
public class SyncStreamQueueSource implements QueueSource {
private static final int QUEUE_SIZE = 5;

private final AtomicBoolean shutdown = new AtomicBoolean(false);
private final AtomicBoolean shouldThrottle = new AtomicBoolean(false);
private final int streamDeadline;
Expand Down Expand Up @@ -253,6 +252,8 @@ private void syncFlags(SyncStreamObserver streamObserver) {
}

final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder();
// Selector is now passed via header using ClientInterceptor (see constructor)
// Keeping this for backward compatibility with older flagd versions
if (this.selector != null) {
syncRequest.setSelector(this.selector);
}
Expand Down
Loading