diff --git a/providers/flagd/README.md b/providers/flagd/README.md index f2d680c2c..5194e3a87 100644 --- a/providers/flagd/README.md +++ b/providers/flagd/README.md @@ -47,6 +47,46 @@ FlagdProvider flagdProvider = new FlagdProvider( In the above example, in-process handlers attempt to connect to a sync service on address `localhost:8013` to obtain [flag definitions](https://github.com/open-feature/schemas/blob/main/json/flags.json). +#### Selector filtering (In-process mode only) + +The `selector` option allows filtering flag configurations from flagd based on source identifiers when using the in-process resolver. This is useful when flagd is configured with multiple flag sources and you want to sync only a specific subset. + +##### Usage + +To use selector filtering, simply configure the `selector` option when creating the provider: + +```java +FlagdProvider flagdProvider = new FlagdProvider( + FlagdOptions.builder() + .resolverType(Config.Resolver.IN_PROCESS) + .selector("source=my-app") + .build()); +``` + +Or via environment variable: +```bash +export FLAGD_SOURCE_SELECTOR="source=my-app" +``` + +##### Implementation details + +> [!IMPORTANT] +> **Selector normalization (flagd issue #1814)** +> +> As part of [flagd issue #1814](https://github.com/open-feature/flagd/issues/1814), the flagd project is normalizing selector handling across all services to use the `flagd-selector` gRPC metadata header. +> +> **Current implementation:** +> - The Java SDK **automatically passes the selector via the `flagd-selector` header** (preferred approach) +> - For backward compatibility with older flagd versions, the selector is **also sent in the request body** +> - Both methods work with current flagd versions +> - In a future major version of flagd, the request body selector field may be removed +> +> **No migration needed:** +> +> Users do not need to make any code changes. The SDK handles selector normalization automatically. + +For more details on selector normalization, see the [flagd selector normalization issue](https://github.com/open-feature/flagd/issues/1814). + #### Sync-metadata To support the injection of contextual data configured in flagd for in-process evaluation, the provider exposes a `getSyncMetadata` accessor which provides the most recent value returned by the [GetMetadata RPC](https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata). @@ -106,30 +146,33 @@ variables. Given below are the supported configurations: -| Option name | Environment variable name | Type & Values | Default | Compatible resolver | -|-----------------------|------------------------------------------------------------------------|--------------------------|-------------------------------|-------------------------| -| resolver | FLAGD_RESOLVER | String - rpc, in-process | rpc | | -| host | FLAGD_HOST | String | localhost | rpc & in-process | -| port | FLAGD_PORT (rpc), FLAGD_SYNC_PORT (in-process, FLAGD_PORT as fallback) | int | 8013 (rpc), 8015 (in-process) | rpc & in-process | -| targetUri | FLAGD_TARGET_URI | string | null | rpc & in-process | -| tls | FLAGD_TLS | boolean | false | rpc & in-process | -| defaultAuthority | FLAGD_DEFAULT_AUTHORITY | String | null | rpc & in-process | -| socketPath | FLAGD_SOCKET_PATH | String | null | rpc & in-process | -| certPath | FLAGD_SERVER_CERT_PATH | String | null | rpc & in-process | -| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process & file | -| streamDeadlineMs | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process | -| keepAliveTime | FLAGD_KEEP_ALIVE_TIME_MS | long | 0 | rpc & in-process | -| selector | FLAGD_SOURCE_SELECTOR | String | null | in-process | -| providerId | FLAGD_SOURCE_PROVIDER_ID | String | null | in-process | -| cache | FLAGD_CACHE | String - lru, disabled | lru | rpc | -| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | rpc | -| maxEventStreamRetries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc | -| retryBackoffMs | FLAGD_RETRY_BACKOFF_MS | int | 1000 | rpc | -| offlineFlagSourcePath | FLAGD_OFFLINE_FLAG_SOURCE_PATH | String | null | file | -| offlinePollIntervalMs | FLAGD_OFFLINE_POLL_MS | int | 5000 | file | +| Option name | Environment variable name | Type & Values | Default | Compatible resolver | +| --------------------- | ---------------------------------------------------------------------- | ------------------------ | ----------------------------- | ------------------------------------------------------------------------------- | +| resolver | FLAGD_RESOLVER | String - rpc, in-process | rpc | | +| host | FLAGD_HOST | String | localhost | rpc & in-process | +| port | FLAGD_PORT (rpc), FLAGD_SYNC_PORT (in-process, FLAGD_PORT as fallback) | int | 8013 (rpc), 8015 (in-process) | rpc & in-process | +| targetUri | FLAGD_TARGET_URI | string | null | rpc & in-process | +| tls | FLAGD_TLS | boolean | false | rpc & in-process | +| defaultAuthority | FLAGD_DEFAULT_AUTHORITY | String | null | rpc & in-process | +| socketPath | FLAGD_SOCKET_PATH | String | null | rpc & in-process | +| certPath | FLAGD_SERVER_CERT_PATH | String | null | rpc & in-process | +| deadline | FLAGD_DEADLINE_MS | int | 500 | rpc & in-process & file | +| streamDeadlineMs | FLAGD_STREAM_DEADLINE_MS | int | 600000 | rpc & in-process | +| keepAliveTime | FLAGD_KEEP_ALIVE_TIME_MS | long | 0 | rpc & in-process | +| selector | FLAGD_SOURCE_SELECTOR | String | null | in-process (see [migration guidance](#selector-filtering-in-process-mode-only)) | +| providerId | FLAGD_SOURCE_PROVIDER_ID | String | null | in-process | +| cache | FLAGD_CACHE | String - lru, disabled | lru | rpc | +| maxCacheSize | FLAGD_MAX_CACHE_SIZE | int | 1000 | rpc | +| maxEventStreamRetries | FLAGD_MAX_EVENT_STREAM_RETRIES | int | 5 | rpc | +| retryBackoffMs | FLAGD_RETRY_BACKOFF_MS | int | 1000 | rpc | +| offlineFlagSourcePath | FLAGD_OFFLINE_FLAG_SOURCE_PATH | String | null | file | +| offlinePollIntervalMs | FLAGD_OFFLINE_POLL_MS | int | 5000 | file | > [!NOTE] > Some configurations are only applicable for RPC resolver. + +> [!NOTE] +> The `selector` option automatically uses the `flagd-selector` header (the preferred approach per [flagd issue #1814](https://github.com/open-feature/flagd/issues/1814)) while maintaining backward compatibility with older flagd versions. See [Selector filtering](#selector-filtering-in-process-mode-only) for details. > ### Unix socket support @@ -189,6 +232,9 @@ FlagdProvider flagdProvider = new FlagdProvider( The `clientInterceptors` and `defaultAuthority` are meant for connection of the in-process resolver to a Sync API implementation on a host/port, that might require special credentials or headers. +> [!NOTE] +> The SDK automatically handles the `flagd-selector` header when the `selector` option is configured. Custom interceptors are not needed for selector filtering. See [Selector filtering](#selector-filtering-in-process-mode-only) for details. + ```java private static ClientInterceptor createHeaderInterceptor() { return new ClientInterceptor() { diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java index 4cda34df4..0eebe16c9 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java @@ -124,6 +124,14 @@ public class FlagdOptions { fallBackToEnvOrDefault(Config.STREAM_RETRY_GRACE_PERIOD, Config.DEFAULT_STREAM_RETRY_GRACE_PERIOD); /** * Selector to be used with flag sync gRPC contract. + * + *

The SDK automatically passes the selector via the {@code flagd-selector} gRPC metadata header + * (the preferred approach per flagd issue #1814). + * For backward compatibility with older flagd versions, the selector is also sent in the request body. + * + *

Only applicable for in-process resolver mode. + * + * @see Selector filtering documentation **/ @Builder.Default private String selector = fallBackToEnvOrDefault(Config.SOURCE_SELECTOR_ENV_VAR_NAME, null); diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java index 39f33af27..72cda613b 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java @@ -3,7 +3,14 @@ import dev.openfeature.contrib.providers.flagd.FlagdOptions; import dev.openfeature.contrib.providers.flagd.resolver.common.nameresolvers.EnvoyResolverProvider; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import io.grpc.CallOptions; +import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ForwardingClientCall; import io.grpc.ManagedChannel; +import io.grpc.Metadata; +import io.grpc.MethodDescriptor; import io.grpc.NameResolverRegistry; import io.grpc.Status.Code; import io.grpc.netty.GrpcSslContexts; @@ -26,6 +33,10 @@ /** gRPC channel builder helper. */ @SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "we don't care to serialize this") public class ChannelBuilder { + + private static final Metadata.Key FLAGD_SELECTOR_KEY = + Metadata.Key.of("flagd-selector", Metadata.ASCII_STRING_MARSHALLER); + /** * Controls retry (not-reconnection) policy for failed RPCs. */ @@ -94,14 +105,19 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) { if (!Epoll.isAvailable()) { throw new IllegalStateException("unix socket cannot be used", Epoll.unavailabilityCause()); } - return NettyChannelBuilder.forAddress(new DomainSocketAddress(options.getSocketPath())) + var channelBuilder = NettyChannelBuilder.forAddress(new DomainSocketAddress(options.getSocketPath())) .keepAliveTime(keepAliveMs, TimeUnit.MILLISECONDS) .eventLoopGroup(new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory())) .channelType(EpollDomainSocketChannel.class) .usePlaintext() .defaultServiceConfig(buildRetryPolicy(options)) - .enableRetry() - .build(); + .enableRetry(); + + // add header-based selector interceptor if selector is provided + if (options.getSelector() != null) { + channelBuilder.intercept(createSelectorInterceptor(options.getSelector())); + } + return channelBuilder.build(); } // build a TCP socket @@ -116,14 +132,14 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) { final String defaultTarget = String.format("%s:%s", options.getHost(), options.getPort()); final String targetUri = isValidTargetUri(options.getTargetUri()) ? options.getTargetUri() : defaultTarget; - final NettyChannelBuilder builder = + final NettyChannelBuilder channelBuilder = NettyChannelBuilder.forTarget(targetUri).keepAliveTime(keepAliveMs, TimeUnit.MILLISECONDS); if (options.getDefaultAuthority() != null) { - builder.overrideAuthority(options.getDefaultAuthority()); + channelBuilder.overrideAuthority(options.getDefaultAuthority()); } if (options.getClientInterceptors() != null) { - builder.intercept(options.getClientInterceptors()); + channelBuilder.intercept(options.getClientInterceptors()); } if (options.isTls()) { SslContextBuilder sslContext = GrpcSslContexts.forClient(); @@ -135,17 +151,22 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) { } } - builder.sslContext(sslContext.build()); + channelBuilder.sslContext(sslContext.build()); } else { - builder.usePlaintext(); + channelBuilder.usePlaintext(); } // telemetry interceptor if option is provided if (options.getOpenTelemetry() != null) { - builder.intercept(new FlagdGrpcInterceptor(options.getOpenTelemetry())); + channelBuilder.intercept(new FlagdGrpcInterceptor(options.getOpenTelemetry())); + } + // add header-based selector interceptor if selector is provided + if (options.getSelector() != null) { + channelBuilder.intercept(createSelectorInterceptor(options.getSelector())); } - return builder.defaultServiceConfig(buildRetryPolicy(options)) + return channelBuilder + .defaultServiceConfig(buildRetryPolicy(options)) .enableRetry() .build(); } catch (SSLException ssle) { @@ -160,6 +181,30 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) { } } + /** + * Creates a ClientInterceptor that adds the flagd-selector header to gRPC requests. + * This is the preferred approach for passing selectors as per flagd issue #1814. + * + * @param selector the selector value to pass in the header + * @return a ClientInterceptor that adds the flagd-selector header + */ + private static ClientInterceptor createSelectorInterceptor(String selector) { + return new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + headers.put(FLAGD_SELECTOR_KEY, selector); + super.start(responseListener, headers); + } + }; + } + }; + } + private static boolean isValidTargetUri(String targetUri) { if (targetUri == null) { return false; diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java index 3c1058566..1e2e043d7 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/sync/SyncStreamQueueSource.java @@ -35,7 +35,6 @@ justification = "We need to expose the BlockingQueue to allow consumers to read from it") public class SyncStreamQueueSource implements QueueSource { private static final int QUEUE_SIZE = 5; - private final AtomicBoolean shutdown = new AtomicBoolean(false); private final AtomicBoolean shouldThrottle = new AtomicBoolean(false); private final int streamDeadline; @@ -253,6 +252,8 @@ private void syncFlags(SyncStreamObserver streamObserver) { } final SyncFlagsRequest.Builder syncRequest = SyncFlagsRequest.newBuilder(); + // Selector is now passed via header using ClientInterceptor (see constructor) + // Keeping this for backward compatibility with older flagd versions if (this.selector != null) { syncRequest.setSelector(this.selector); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResourcesTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResourcesTest.java index c7d9672ef..fd7f55111 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResourcesTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/FlagdProviderSyncResourcesTest.java @@ -100,11 +100,13 @@ void callingInitialize_wakesUpWaitingThread() throws InterruptedException { waitingThread.join(); + var wait = MAX_TIME_TOLERANCE * 3; + Assertions.assertTrue( - waitTime.get() < MAX_TIME_TOLERANCE * 2, + waitTime.get() < wait, () -> "Wakeup should be almost instant, but took " + waitTime.get() + " ms, which is more than the max of" - + (MAX_TIME_TOLERANCE * 2) + " ms"); + + wait + " ms"); } @Timeout(2)