diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index b6016f04f78..31804cf2808 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -56,6 +56,7 @@ import com.google.api.pathtemplate.PathTemplate; import com.google.cloud.RetryHelper; import com.google.cloud.RetryHelper.RetryHelperException; +import com.google.cloud.grpc.GcpManagedChannel; import com.google.cloud.grpc.GcpManagedChannelBuilder; import com.google.cloud.grpc.GcpManagedChannelOptions; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; @@ -266,6 +267,8 @@ public class GapicSpannerRpc implements SpannerRpc { private static final ConcurrentMap ADMINISTRATIVE_REQUESTS_RATE_LIMITERS = new ConcurrentHashMap<>(); private final boolean leaderAwareRoutingEnabled; + private final int numChannels; + private final boolean isGrpcGcpExtensionEnabled; public static GapicSpannerRpc create(SpannerOptions options) { return new GapicSpannerRpc(options); @@ -317,6 +320,8 @@ public GapicSpannerRpc(final SpannerOptions options) { this.callCredentialsProvider = options.getCallCredentialsProvider(); this.compressorName = options.getCompressorName(); this.leaderAwareRoutingEnabled = options.isLeaderAwareRoutingEnabled(); + this.numChannels = options.getNumChannels(); + this.isGrpcGcpExtensionEnabled = options.isGrpcGcpExtensionEnabled(); if (initializeStubs) { // First check if SpannerOptions provides a TransportChannelProvider. Create one @@ -1950,7 +1955,20 @@ GrpcCallContext newCallContext( boolean routeToLeader) { GrpcCallContext context = GrpcCallContext.createDefault(); if (options != null) { - context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); + if (this.isGrpcGcpExtensionEnabled) { + // Set channel affinity in gRPC-GCP. + // Compute bounded channel hint to prevent gRPC-GCP affinity map from getting unbounded. + int boundedChannelHint = Option.CHANNEL_HINT.getLong(options).intValue() % this.numChannels; + context = + context.withCallOptions( + context + .getCallOptions() + .withOption( + GcpManagedChannel.AFFINITY_KEY, String.valueOf(boundedChannelHint))); + } else { + // Set channel affinity in GAX. + context = context.withChannelAffinity(Option.CHANNEL_HINT.getLong(options).intValue()); + } } if (compressorName != null) { // This sets the compressor for Client -> Server.