diff --git a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java index 871a317f832..45062f28f1b 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterImplLoadBalancer.java @@ -76,6 +76,8 @@ final class ClusterImplLoadBalancer extends LoadBalancer { private static final Attributes.Key ATTR_CLUSTER_LOCALITY_STATS = Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityStats"); + private static final Attributes.Key ATTR_CLUSTER_LOCALITY_NAME = + Attributes.Key.create("io.grpc.xds.ClusterImplLoadBalancer.clusterLocalityName"); private final XdsLogger logger; private final Helper helper; @@ -209,11 +211,14 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) { List addresses = withAdditionalAttributes(args.getAddresses()); Locality locality = args.getAddresses().get(0).getAttributes().get( InternalXdsAttributes.ATTR_LOCALITY); // all addresses should be in the same locality + String localityName = args.getAddresses().get(0).getAttributes().get( + InternalXdsAttributes.ATTR_LOCALITY_NAME); // Endpoint addresses resolved by ClusterResolverLoadBalancer should always contain // attributes with its locality, including endpoints in LOGICAL_DNS clusters. // In case of not (which really shouldn't), loads are aggregated under an empty locality. if (locality == null) { locality = Locality.create("", "", ""); + localityName = ""; } final ClusterLocalityStats localityStats = (lrsServerInfo == null) @@ -221,8 +226,10 @@ public Subchannel createSubchannel(CreateSubchannelArgs args) { : xdsClient.addClusterLocalityStats(lrsServerInfo, cluster, edsServiceName, locality); - Attributes attrs = args.getAttributes().toBuilder().set( - ATTR_CLUSTER_LOCALITY_STATS, localityStats).build(); + Attributes attrs = args.getAttributes().toBuilder() + .set(ATTR_CLUSTER_LOCALITY_STATS, localityStats) + .set(ATTR_CLUSTER_LOCALITY_NAME, localityName) + .build(); args = args.toBuilder().setAddresses(addresses).setAttributes(attrs).build(); final Subchannel subchannel = delegate().createSubchannel(args); @@ -344,6 +351,10 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { final ClusterLocalityStats stats = result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_STATS); if (stats != null) { + String localityName = + result.getSubchannel().getAttributes().get(ATTR_CLUSTER_LOCALITY_NAME); + args.getPickDetailsConsumer().addOptionalLabel("grpc.lb.locality", localityName); + ClientStreamTracer.Factory tracerFactory = new CountingStreamTracerFactory( stats, inFlights, result.getStreamTracerFactory()); ClientStreamTracer.Factory orcaTracerFactory = OrcaPerRequestUtil.getInstance() diff --git a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java index 3ba08a23fbc..88162878469 100644 --- a/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/ClusterResolverLoadBalancer.java @@ -412,17 +412,18 @@ public void run() { if (endpoint.loadBalancingWeight() != 0) { weight *= endpoint.loadBalancingWeight(); } + String localityName = localityName(locality); Attributes attr = endpoint.eag().getAttributes().toBuilder() .set(InternalXdsAttributes.ATTR_LOCALITY, locality) + .set(InternalXdsAttributes.ATTR_LOCALITY_NAME, localityName) .set(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT, localityLbInfo.localityWeight()) .set(InternalXdsAttributes.ATTR_SERVER_WEIGHT, weight) .build(); EquivalentAddressGroup eag = new EquivalentAddressGroup( endpoint.eag().getAddresses(), attr); - eag = AddressFilter.setPathFilter( - eag, Arrays.asList(priorityName, localityName(locality))); + eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); addresses.add(eag); } } @@ -612,11 +613,13 @@ public void run() { for (EquivalentAddressGroup eag : resolutionResult.getAddresses()) { // No weight attribute is attached, all endpoint-level LB policy should be able // to handle such it. - Attributes attr = eag.getAttributes().toBuilder().set( - InternalXdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY).build(); + String localityName = localityName(LOGICAL_DNS_CLUSTER_LOCALITY); + Attributes attr = eag.getAttributes().toBuilder() + .set(InternalXdsAttributes.ATTR_LOCALITY, LOGICAL_DNS_CLUSTER_LOCALITY) + .set(InternalXdsAttributes.ATTR_LOCALITY_NAME, localityName) + .build(); eag = new EquivalentAddressGroup(eag.getAddresses(), attr); - eag = AddressFilter.setPathFilter( - eag, Arrays.asList(priorityName, LOGICAL_DNS_CLUSTER_LOCALITY.toString())); + eag = AddressFilter.setPathFilter(eag, Arrays.asList(priorityName, localityName)); addresses.add(eag); } PriorityChildConfig priorityChildConfig = generateDnsBasedPriorityChildConfig( @@ -844,6 +847,9 @@ private static String priorityName(String cluster, int priority) { * across all localities in all clusters. */ private static String localityName(Locality locality) { - return locality.toString(); + return "{region=\"" + locality.region() + + "\", zone=\"" + locality.zone() + + "\", sub_zone=\"" + locality.subZone() + + "\"}"; } } diff --git a/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java b/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java index 1497eff048a..aaaeb198d21 100644 --- a/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java +++ b/xds/src/main/java/io/grpc/xds/InternalXdsAttributes.java @@ -77,6 +77,13 @@ public final class InternalXdsAttributes { static final Attributes.Key ATTR_LOCALITY = Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.locality"); + /** + * The name of the locality that this EquivalentAddressGroup is in. + */ + @EquivalentAddressGroup.Attr + static final Attributes.Key ATTR_LOCALITY_NAME = + Attributes.Key.create("io.grpc.xds.InternalXdsAttributes.localityName"); + /** * Endpoint weight for load balancing purposes. */ diff --git a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java index f108ac899c1..9cf2c73fded 100644 --- a/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WeightedTargetLoadBalancer.java @@ -23,6 +23,7 @@ import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import com.google.common.collect.ImmutableMap; +import io.grpc.Attributes; import io.grpc.ConnectivityState; import io.grpc.InternalLogId; import io.grpc.LoadBalancer; @@ -42,6 +43,8 @@ /** Load balancer for weighted_target policy. */ final class WeightedTargetLoadBalancer extends LoadBalancer { + public static final Attributes.Key CHILD_NAME = + Attributes.Key.create("io.grpc.xds.WeightedTargetLoadBalancer.CHILD_NAME"); private final XdsLogger logger; private final Map childBalancers = new HashMap<>(); @@ -95,6 +98,9 @@ public Status acceptResolvedAddressesInternal(ResolvedAddresses resolvedAddresse resolvedAddresses.toBuilder() .setAddresses(AddressFilter.filter(resolvedAddresses.getAddresses(), targetName)) .setLoadBalancingPolicyConfig(targets.get(targetName).policySelection.getConfig()) + .setAttributes(resolvedAddresses.getAttributes().toBuilder() + .set(CHILD_NAME, targetName) + .build()) .build()); } diff --git a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java index e7463cd3710..bfb9db4fd24 100644 --- a/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/WrrLocalityLoadBalancer.java @@ -31,7 +31,6 @@ import io.grpc.util.GracefulSwitchLoadBalancer; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; -import io.grpc.xds.client.Locality; import io.grpc.xds.client.XdsLogger; import io.grpc.xds.client.XdsLogger.XdsLogLevel; import java.util.HashMap; @@ -73,10 +72,10 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { = (WrrLocalityConfig) resolvedAddresses.getLoadBalancingPolicyConfig(); // A map of locality weights is built up from the locality weight attributes in each address. - Map localityWeights = new HashMap<>(); + Map localityWeights = new HashMap<>(); for (EquivalentAddressGroup eag : resolvedAddresses.getAddresses()) { Attributes eagAttrs = eag.getAttributes(); - Locality locality = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY); + String locality = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY_NAME); Integer localityWeight = eagAttrs.get(InternalXdsAttributes.ATTR_LOCALITY_WEIGHT); if (locality == null) { @@ -106,8 +105,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { // Weighted target LB expects a WeightedPolicySelection for each locality as it will create a // child LB for each. Map weightedPolicySelections = new HashMap<>(); - for (Locality locality : localityWeights.keySet()) { - weightedPolicySelections.put(locality.toString(), + for (String locality : localityWeights.keySet()) { + weightedPolicySelections.put(locality, new WeightedPolicySelection(localityWeights.get(locality), wrrLocalityConfig.childPolicy)); } diff --git a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java index 662430bef52..fa08d5eddea 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterImplLoadBalancerTest.java @@ -19,12 +19,14 @@ import static com.google.common.truth.Truth.assertThat; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.github.xds.data.orca.v3.OrcaLoadReport; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import io.grpc.Attributes; +import io.grpc.CallOptions; import io.grpc.ClientStreamTracer; import io.grpc.ConnectivityState; import io.grpc.EquivalentAddressGroup; @@ -32,6 +34,7 @@ import io.grpc.LoadBalancer; import io.grpc.LoadBalancer.CreateSubchannelArgs; import io.grpc.LoadBalancer.Helper; +import io.grpc.LoadBalancer.PickDetailsConsumer; import io.grpc.LoadBalancer.PickResult; import io.grpc.LoadBalancer.PickSubchannelArgs; import io.grpc.LoadBalancer.ResolvedAddresses; @@ -45,8 +48,10 @@ import io.grpc.SynchronizationContext; import io.grpc.internal.FakeClock; import io.grpc.internal.ObjectPool; +import io.grpc.internal.PickSubchannelArgsImpl; import io.grpc.internal.ServiceConfigUtil.PolicySelection; import io.grpc.protobuf.ProtoUtils; +import io.grpc.testing.TestMethodDescriptors; import io.grpc.xds.ClusterImplLoadBalancerProvider.ClusterImplConfig; import io.grpc.xds.Endpoints.DropOverload; import io.grpc.xds.EnvoyServerProtoData.DownstreamTlsContext; @@ -141,6 +146,9 @@ public AtomicLong getOrCreate(String cluster, @Nullable String edsServiceName) { } }; private final Helper helper = new FakeLbHelper(); + private PickSubchannelArgs pickSubchannelArgs = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT, + new PickDetailsConsumer() {}); @Mock private ThreadSafeRandom mockRandom; private int xdsClientRefs; @@ -218,7 +226,7 @@ public void handleResolvedAddresses_childPolicyChanges() { public void nameResolutionError_beforeChildPolicyInstantiated_returnErrorPickerToUpstream() { loadBalancer.handleNameResolutionError(Status.UNIMPLEMENTED.withDescription("not found")); assertThat(currentState).isEqualTo(ConnectivityState.TRANSIENT_FAILURE); - PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); assertThat(result.getStatus().isOk()).isFalse(); assertThat(result.getStatus().getCode()).isEqualTo(Code.UNIMPLEMENTED); assertThat(result.getStatus().getDescription()).isEqualTo("not found"); @@ -243,6 +251,32 @@ public void nameResolutionError_afterChildPolicyInstantiated_propagateToDownstre .isEqualTo("cannot reach server"); } + @Test + public void pick_addsLocalityLabel() { + LoadBalancerProvider weightedTargetProvider = new WeightedTargetLoadBalancerProvider(); + WeightedTargetConfig weightedTargetConfig = + buildWeightedTargetConfig(ImmutableMap.of(locality, 10)); + ClusterImplConfig config = new ClusterImplConfig(CLUSTER, EDS_SERVICE_NAME, LRS_SERVER_INFO, + null, Collections.emptyList(), + new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); + EquivalentAddressGroup endpoint = makeAddress("endpoint-addr", locality); + deliverAddressesAndConfig(Collections.singletonList(endpoint), config); + FakeLoadBalancer leafBalancer = Iterables.getOnlyElement(downstreamBalancers); + Subchannel subchannel = leafBalancer.helper.createSubchannel( + CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build()); + leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); + assertThat(currentState).isEqualTo(ConnectivityState.READY); + + PickDetailsConsumer detailsConsumer = mock(PickDetailsConsumer.class); + pickSubchannelArgs = new PickSubchannelArgsImpl( + TestMethodDescriptors.voidMethod(), new Metadata(), CallOptions.DEFAULT, detailsConsumer); + PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); + assertThat(result.getStatus().isOk()).isTrue(); + // The value will be determined by the parent policy, so can be different than the value used in + // makeAddress() for the test. + verify(detailsConsumer).addOptionalLabel("grpc.lb.locality", locality.toString()); + } + @Test public void recordLoadStats() { LoadBalancerProvider weightedTargetProvider = new WeightedTargetLoadBalancerProvider(); @@ -258,7 +292,7 @@ public void recordLoadStats() { CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build()); leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); assertThat(currentState).isEqualTo(ConnectivityState.READY); - PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); assertThat(result.getStatus().isOk()).isTrue(); ClientStreamTracer streamTracer1 = result.getStreamTracerFactory().newClientStreamTracer( ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata()); // first RPC call @@ -347,7 +381,7 @@ public void dropRpcsWithRespectToLbConfigDropCategories() { CreateSubchannelArgs.newBuilder().setAddresses(leafBalancer.addresses).build()); leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); assertThat(currentState).isEqualTo(ConnectivityState.READY); - PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); assertThat(result.getStatus().isOk()).isFalse(); assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(result.getStatus().getDescription()).isEqualTo("Dropped: throttle"); @@ -373,7 +407,7 @@ public void dropRpcsWithRespectToLbConfigDropCategories() { .build()) .setLoadBalancingPolicyConfig(config) .build()); - result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + result = currentPicker.pickSubchannel(pickSubchannelArgs); assertThat(result.getStatus().isOk()).isFalse(); assertThat(result.getStatus().getCode()).isEqualTo(Code.UNAVAILABLE); assertThat(result.getStatus().getDescription()).isEqualTo("Dropped: lb"); @@ -386,7 +420,7 @@ public void dropRpcsWithRespectToLbConfigDropCategories() { .isEqualTo(1L); assertThat(clusterStats.totalDroppedRequests()).isEqualTo(1L); - result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + result = currentPicker.pickSubchannel(pickSubchannelArgs); assertThat(result.getStatus().isOk()).isTrue(); } @@ -423,7 +457,7 @@ private void subtest_maxConcurrentRequests_appliedByLbConfig(boolean enableCircu leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); assertThat(currentState).isEqualTo(ConnectivityState.READY); for (int i = 0; i < maxConcurrentRequests; i++) { - PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); assertThat(result.getStatus().isOk()).isTrue(); ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory(); streamTracerFactory.newClientStreamTracer( @@ -434,7 +468,7 @@ private void subtest_maxConcurrentRequests_appliedByLbConfig(boolean enableCircu assertThat(clusterStats.clusterServiceName()).isEqualTo(EDS_SERVICE_NAME); assertThat(clusterStats.totalDroppedRequests()).isEqualTo(0L); - PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER)); assertThat(clusterStats.clusterServiceName()).isEqualTo(EDS_SERVICE_NAME); if (enableCircuitBreaking) { @@ -455,7 +489,7 @@ private void subtest_maxConcurrentRequests_appliedByLbConfig(boolean enableCircu new PolicySelection(weightedTargetProvider, weightedTargetConfig), null); deliverAddressesAndConfig(Collections.singletonList(endpoint), config); - result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + result = currentPicker.pickSubchannel(pickSubchannelArgs); assertThat(result.getStatus().isOk()).isTrue(); result.getStreamTracerFactory().newClientStreamTracer( ClientStreamTracer.StreamInfo.newBuilder().build(), new Metadata()); // 101th request @@ -463,7 +497,7 @@ private void subtest_maxConcurrentRequests_appliedByLbConfig(boolean enableCircu assertThat(clusterStats.clusterServiceName()).isEqualTo(EDS_SERVICE_NAME); assertThat(clusterStats.totalDroppedRequests()).isEqualTo(0L); - result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); // 102th request + result = currentPicker.pickSubchannel(pickSubchannelArgs); // 102th request clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER)); assertThat(clusterStats.clusterServiceName()).isEqualTo(EDS_SERVICE_NAME); if (enableCircuitBreaking) { @@ -511,7 +545,7 @@ private void subtest_maxConcurrentRequests_appliedWithDefaultValue( leafBalancer.deliverSubchannelState(subchannel, ConnectivityState.READY); assertThat(currentState).isEqualTo(ConnectivityState.READY); for (int i = 0; i < ClusterImplLoadBalancer.DEFAULT_PER_CLUSTER_MAX_CONCURRENT_REQUESTS; i++) { - PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); assertThat(result.getStatus().isOk()).isTrue(); ClientStreamTracer.Factory streamTracerFactory = result.getStreamTracerFactory(); streamTracerFactory.newClientStreamTracer( @@ -522,7 +556,7 @@ private void subtest_maxConcurrentRequests_appliedWithDefaultValue( assertThat(clusterStats.clusterServiceName()).isEqualTo(EDS_SERVICE_NAME); assertThat(clusterStats.totalDroppedRequests()).isEqualTo(0L); - PickResult result = currentPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + PickResult result = currentPicker.pickSubchannel(pickSubchannelArgs); clusterStats = Iterables.getOnlyElement(loadStatsManager.getClusterStatsReports(CLUSTER)); assertThat(clusterStats.clusterServiceName()).isEqualTo(EDS_SERVICE_NAME); if (enableCircuitBreaking) { @@ -697,7 +731,11 @@ public String toString() { } EquivalentAddressGroup eag = new EquivalentAddressGroup(new FakeSocketAddress(name), - Attributes.newBuilder().set(InternalXdsAttributes.ATTR_LOCALITY, locality).build()); + Attributes.newBuilder() + .set(InternalXdsAttributes.ATTR_LOCALITY, locality) + // Unique but arbitrary string + .set(InternalXdsAttributes.ATTR_LOCALITY_NAME, locality.toString()) + .build()); return AddressFilter.setPathFilter(eag, Collections.singletonList(locality.toString())); } diff --git a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java index ddc7ef56d90..99b8605b4e9 100644 --- a/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/ClusterResolverLoadBalancerTest.java @@ -21,6 +21,7 @@ import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.WEIGHTED_TARGET_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.WRR_LOCALITY_POLICY_NAME; +import static java.util.stream.Collectors.toList; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -909,11 +910,12 @@ public void edsClustersAndLogicalDnsCluster_receivedEndpoints() { assertAddressesEqual(Arrays.asList(endpoint3, endpoint1, endpoint2), childBalancer.addresses); // ordered by cluster then addresses assertAddressesEqual(AddressFilter.filter(AddressFilter.filter( - childBalancer.addresses, CLUSTER1 + "[child1]"), locality1.toString()), + childBalancer.addresses, CLUSTER1 + "[child1]"), + "{region=\"test-region-1\", zone=\"test-zone-1\", sub_zone=\"test-subzone-1\"}"), Collections.singletonList(endpoint3)); assertAddressesEqual(AddressFilter.filter(AddressFilter.filter( childBalancer.addresses, CLUSTER_DNS + "[child0]"), - Locality.create("", "", "").toString()), + "{region=\"\", zone=\"\", sub_zone=\"\"}"), Arrays.asList(endpoint1, endpoint2)); } @@ -1142,10 +1144,11 @@ private static void assertClusterImplConfig(ClusterImplConfig config, String clu /** Asserts two list of EAGs contains same addresses, regardless of attributes. */ private static void assertAddressesEqual( List expected, List actual) { - assertThat(actual.size()).isEqualTo(expected.size()); - for (int i = 0; i < actual.size(); i++) { - assertThat(actual.get(i).getAddresses()).isEqualTo(expected.get(i).getAddresses()); - } + List> expectedAddresses + = expected.stream().map(EquivalentAddressGroup::getAddresses).collect(toList()); + List> actualAddresses + = actual.stream().map(EquivalentAddressGroup::getAddresses).collect(toList()); + assertThat(actualAddresses).isEqualTo(expectedAddresses); } private static EquivalentAddressGroup makeAddress(final String name) { diff --git a/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java index fa80c8d6e12..50864d724c6 100644 --- a/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WeightedTargetLoadBalancerTest.java @@ -220,6 +220,8 @@ public void handleResolvedAddresses() { ResolvedAddresses resolvedAddresses = resolvedAddressesCaptor.getValue(); assertThat(resolvedAddresses.getLoadBalancingPolicyConfig()).isEqualTo(configs[i]); assertThat(resolvedAddresses.getAttributes().get(fakeKey)).isEqualTo(fakeValue); + assertThat(resolvedAddresses.getAttributes().get(WeightedTargetLoadBalancer.CHILD_NAME)) + .isEqualTo("target" + i); assertThat(Iterables.getOnlyElement(resolvedAddresses.getAddresses()).getAddresses()) .containsExactly(socketAddresses[i]); } diff --git a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java index 87ad876a182..e3483bc92b1 100644 --- a/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/WrrLocalityLoadBalancerTest.java @@ -42,7 +42,6 @@ import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedPolicySelection; import io.grpc.xds.WeightedTargetLoadBalancerProvider.WeightedTargetConfig; import io.grpc.xds.WrrLocalityLoadBalancer.WrrLocalityConfig; -import io.grpc.xds.client.Locality; import java.net.SocketAddress; import java.util.Collections; import java.util.List; @@ -111,8 +110,8 @@ public void setUp() { @Test public void handleResolvedAddresses() { // A two locality cluster with a mock child LB policy. - Locality localityOne = Locality.create("region1", "zone1", "subzone1"); - Locality localityTwo = Locality.create("region2", "zone2", "subzone2"); + String localityOne = "localityOne"; + String localityTwo = "localityTwo"; PolicySelection childPolicy = new PolicySelection(mockChildProvider, null); // The child config is delivered wrapped in the wrr_locality config and the locality weights @@ -130,9 +129,9 @@ public void handleResolvedAddresses() { assertThat(config).isInstanceOf(WeightedTargetConfig.class); WeightedTargetConfig wtConfig = (WeightedTargetConfig) config; assertThat(wtConfig.targets).hasSize(2); - assertThat(wtConfig.targets).containsEntry(localityOne.toString(), + assertThat(wtConfig.targets).containsEntry(localityOne, new WeightedPolicySelection(1, childPolicy)); - assertThat(wtConfig.targets).containsEntry(localityTwo.toString(), + assertThat(wtConfig.targets).containsEntry(localityTwo, new WeightedPolicySelection(2, childPolicy)); } @@ -144,8 +143,7 @@ public void handleResolvedAddresses_noLocalityWeights() { // The child config is delivered wrapped in the wrr_locality config and the locality weights // in a ResolvedAddresses attribute. WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy); - deliverAddresses(wlConfig, ImmutableList.of( - makeAddress("addr", Locality.create("test-region", "test-zone", "test-subzone"), null))); + deliverAddresses(wlConfig, ImmutableList.of(makeAddress("addr", "test-locality", null))); // With no locality weights, we should get a TRANSIENT_FAILURE. verify(mockHelper).getAuthority(); @@ -166,8 +164,7 @@ public void handleNameResolutionError_noChildLb() { @Test public void handleNameResolutionError_withChildLb() { deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockChildProvider, null)), - ImmutableList.of( - makeAddress("addr1", Locality.create("test-region1", "test-zone", "test-subzone"), 1))); + ImmutableList.of(makeAddress("addr1", "test-locality", 1))); Status status = Status.DEADLINE_EXCEEDED.withDescription("too slow"); loadBalancer.handleNameResolutionError(status); @@ -181,8 +178,7 @@ public void localityWeightAttributeNotPropagated() { PolicySelection childPolicy = new PolicySelection(mockChildProvider, null); WrrLocalityConfig wlConfig = new WrrLocalityConfig(childPolicy); - deliverAddresses(wlConfig, ImmutableList.of( - makeAddress("addr1", Locality.create("test-region1", "test-zone", "test-subzone"), 1))); + deliverAddresses(wlConfig, ImmutableList.of(makeAddress("addr1", "test-locality", 1))); // Assert that the child policy and the locality weights were correctly mapped to a // WeightedTargetConfig. @@ -195,8 +191,7 @@ public void localityWeightAttributeNotPropagated() { @Test public void shutdown() { deliverAddresses(new WrrLocalityConfig(new PolicySelection(mockChildProvider, null)), - ImmutableList.of( - makeAddress("addr", Locality.create("test-region", "test-zone", "test-subzone"), 1))); + ImmutableList.of(makeAddress("addr", "test-locality", 1))); loadBalancer.shutdown(); verify(mockWeightedTargetLb).shutdown(); @@ -224,7 +219,7 @@ private void deliverAddresses(WrrLocalityConfig config, List