Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions core/src/main/java/io/grpc/internal/PickFirstLeafLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ final class PickFirstLeafLoadBalancer extends LoadBalancer {
private ConnectivityState concludedState = IDLE;
private final boolean enableHappyEyeballs =
PickFirstLoadBalancerProvider.isEnabledHappyEyeballs();
private boolean notAPetiolePolicy = true; // means not under a petiole policy

PickFirstLeafLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
Expand All @@ -80,6 +81,10 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
return Status.FAILED_PRECONDITION.withDescription("Already shut down");
}

// Cache whether or not this is a petiole policy, which is based off of an address attribute
Boolean isPetiolePolicy = resolvedAddresses.getAttributes().get(IS_PETIOLE_POLICY);
this.notAPetiolePolicy = isPetiolePolicy == null || !isPetiolePolicy;

List<EquivalentAddressGroup> servers = resolvedAddresses.getAddresses();

// Validate the address list
Expand Down Expand Up @@ -303,7 +308,8 @@ private void updateHealthCheckedState(SubchannelData subchannelData) {
if (subchannelData.state != READY) {
return;
}
if (subchannelData.getHealthState() == READY) {

if (notAPetiolePolicy || subchannelData.getHealthState() == READY) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can just always set health status to READY when creating the subchannel. Same as L454.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've updated the if at line 453 to use notAPetiolePolicy for auto setting as ready, but I'd like to leave this here to make it clear to people maintaining the code what is happening since understanding that somewhere else set the value to READY and nowhere else should have changed it is difficult.

updateBalancingState(READY,
new FixedResultPicker(PickResult.withSubchannel(subchannelData.subchannel)));
} else if (subchannelData.getHealthState() == TRANSIENT_FAILURE) {
Expand Down Expand Up @@ -444,7 +450,7 @@ private SubchannelData createNewSubchannel(SocketAddress addr, Attributes attrs)
hcListener.subchannelData = subchannelData;
subchannels.put(addr, subchannelData);
Attributes scAttrs = subchannel.getAttributes();
if (scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
if (notAPetiolePolicy || scAttrs.get(LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY) == null) {
subchannelData.healthStateInfo = ConnectivityStateInfo.forNonError(READY);
}
subchannel.start(stateInfo -> processSubchannelState(subchannelData, stateInfo));
Expand All @@ -468,6 +474,13 @@ private final class HealthListener implements SubchannelStateListener {

@Override
public void onSubchannelState(ConnectivityStateInfo newState) {
if (notAPetiolePolicy) {
log.log(Level.WARNING,
"Ignoring health status {0} for subchannel {1} as this is not under a petiole policy",
new Object[]{newState, subchannelData.subchannel});
return;
}

log.log(Level.FINE, "Received health status {0} for subchannel {1}",
new Object[]{newState, subchannelData.subchannel});
subchannelData.healthStateInfo = newState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static io.grpc.ConnectivityState.TRANSIENT_FAILURE;
import static io.grpc.LoadBalancer.HAS_HEALTH_PRODUCER_LISTENER_KEY;
import static io.grpc.LoadBalancer.HEALTH_CONSUMER_LISTENER_ARG_KEY;
import static io.grpc.LoadBalancer.IS_PETIOLE_POLICY;
import static io.grpc.internal.PickFirstLeafLoadBalancer.CONNECTION_DELAY_INTERVAL_MS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
Expand Down Expand Up @@ -389,15 +390,44 @@ public void pickAfterResolvedAndChanged() {
verify(mockSubchannel2).requestConnection();
}

@Test
public void healthCheck_nonPetiolePolicy() {
when(mockSubchannel1.getAttributes()).thenReturn(
Attributes.newBuilder().set(HAS_HEALTH_PRODUCER_LISTENER_KEY, true).build());

// Initialize with one server loadbalancer and both health and state listeners
List<EquivalentAddressGroup> oneServer = Lists.newArrayList(servers.get(0));
loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder().setAddresses(oneServer)
.setAttributes(Attributes.EMPTY).build());
InOrder inOrder = inOrder(mockHelper, mockSubchannel1);
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
inOrder.verify(mockHelper).createSubchannel(createArgsCaptor.capture());
SubchannelStateListener healthListener = createArgsCaptor.getValue()
.getOption(HEALTH_CONSUMER_LISTENER_ARG_KEY);
inOrder.verify(mockSubchannel1).start(stateListenerCaptor.capture());
SubchannelStateListener stateListener = stateListenerCaptor.getValue();

stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
healthListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(mockHelper, never()).updateBalancingState(any(), any());

stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper).updateBalancingState(eq(READY), any()); // health listener ignored

healthListener.onSubchannelState(ConnectivityStateInfo.forTransientFailure(Status.INTERNAL));
inOrder.verify(mockHelper, never()).updateBalancingState(any(), any(SubchannelPicker.class));
}

@Test
public void healthCheckFlow() {
when(mockSubchannel1.getAttributes()).thenReturn(
Attributes.newBuilder().set(HAS_HEALTH_PRODUCER_LISTENER_KEY, true).build());
when(mockSubchannel2.getAttributes()).thenReturn(
Attributes.newBuilder().set(HAS_HEALTH_PRODUCER_LISTENER_KEY, true).build());

List<EquivalentAddressGroup> oneServer = Lists.newArrayList(servers.get(0), servers.get(1));
loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder().setAddresses(oneServer)
.setAttributes(Attributes.EMPTY).build());
.setAttributes(Attributes.newBuilder().set(IS_PETIOLE_POLICY, true).build()).build());

InOrder inOrder = inOrder(mockHelper, mockSubchannel1, mockSubchannel2);
inOrder.verify(mockHelper).updateBalancingState(eq(CONNECTING), any(SubchannelPicker.class));
Expand All @@ -413,13 +443,13 @@ public void healthCheckFlow() {
// subchannel2 | IDLE | IDLE
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
healthListener.onSubchannelState(ConnectivityStateInfo.forNonError(CONNECTING));
inOrder.verify(mockHelper, times(0)).updateBalancingState(any(), any());
inOrder.verify(mockHelper, never()).updateBalancingState(any(), any());

// subchannel | state | health
// subchannel1 | READY | CONNECTING
// subchannel2 | IDLE | IDLE
stateListener.onSubchannelState(ConnectivityStateInfo.forNonError(READY));
inOrder.verify(mockHelper, times(0)).updateBalancingState(any(), any());
inOrder.verify(mockHelper, never()).updateBalancingState(any(), any());

// subchannel | state | health
// subchannel1 | READY | READY
Expand Down