Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -913,7 +913,8 @@ private boolean isProtocolTypeInconsistent(String protocolType) {
}

private boolean isProtocolNameInconsistent(String protocolName) {
return protocolName != null && !protocolName.equals(generation().protocolName);
return protocolName != null && generation() != Generation.NO_GENERATION
&& !protocolName.equals(generation().protocolName);
Comment on lines +916 to +917
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@guozhangwang @abbccdda Shouldn't we synchronise this or use a local reference of the generation to be 100% safe?

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;

Expand Down Expand Up @@ -365,8 +364,8 @@ public void testJoinGroupRequestWithFencedInstanceIdException() {

@Test
public void testJoinGroupProtocolTypeAndName() {
String wrongProtocolType = "wrong-type";
String wrongProtocolName = "wrong-name";
final String wrongProtocolType = "wrong-type";
final String wrongProtocolName = "wrong-name";

// No Protocol Type in both JoinGroup and SyncGroup responses
assertTrue(joinGroupWithProtocolTypeAndName(null, null, null));
Expand All @@ -391,6 +390,39 @@ public void testJoinGroupProtocolTypeAndName() {
() -> joinGroupWithProtocolTypeAndName(PROTOCOL_TYPE, PROTOCOL_TYPE, wrongProtocolName));
}

@Test
public void testNoGenerationWillNotTriggerProtocolNameCheck() {
final String wrongProtocolName = "wrong-name";

setupCoordinator();
mockClient.reset();
mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
coordinator.ensureCoordinatorReady(mockTime.timer(0));

mockClient.prepareResponse(body -> {
if (!(body instanceof JoinGroupRequest)) {
return false;
}
JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body;
return joinGroupRequest.data().protocolType().equals(PROTOCOL_TYPE);
}, joinGroupFollowerResponse(defaultGeneration, memberId,
"memberid", Errors.NONE, PROTOCOL_TYPE));

mockClient.prepareResponse(body -> {
if (!(body instanceof SyncGroupRequest)) {
return false;
}
coordinator.resetGenerationOnLeaveGroup();

SyncGroupRequest syncGroupRequest = (SyncGroupRequest) body;
return syncGroupRequest.data.protocolType().equals(PROTOCOL_TYPE)
&& syncGroupRequest.data.protocolName().equals(PROTOCOL_NAME);
}, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, wrongProtocolName));

// No exception shall be thrown as the generation is reset.
coordinator.joinGroupIfNeeded(mockTime.timer(100L));
}

private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtocolType,
String syncGroupResponseProtocolType,
String syncGroupResponseProtocolName) {
Expand Down Expand Up @@ -665,7 +697,7 @@ public boolean matches(AbstractRequest body) {
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
} catch (WakeupException e) {
} catch (WakeupException ignored) {
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just side cleanups starting L700

}

assertEquals(1, coordinator.onJoinPrepareInvokes);
Expand Down Expand Up @@ -703,7 +735,7 @@ public boolean matches(AbstractRequest body) {
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
} catch (WakeupException e) {
} catch (WakeupException ignored) {
}

assertEquals(1, coordinator.onJoinPrepareInvokes);
Expand Down Expand Up @@ -738,7 +770,7 @@ public void testWakeupAfterJoinGroupReceived() throws Exception {
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
} catch (WakeupException e) {
} catch (WakeupException ignored) {
}

assertEquals(1, coordinator.onJoinPrepareInvokes);
Expand Down Expand Up @@ -811,7 +843,7 @@ public boolean matches(AbstractRequest body) {
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
} catch (WakeupException e) {
} catch (WakeupException ignored) {
}

assertEquals(1, coordinator.onJoinPrepareInvokes);
Expand Down Expand Up @@ -884,7 +916,7 @@ public void testWakeupAfterSyncGroupReceived() throws Exception {
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
} catch (WakeupException e) {
} catch (WakeupException ignored) {
}

assertEquals(1, coordinator.onJoinPrepareInvokes);
Expand Down Expand Up @@ -945,7 +977,7 @@ public void testWakeupInOnJoinComplete() throws Exception {
try {
coordinator.ensureActiveGroup();
fail("Should have woken up from ensureActiveGroup()");
} catch (WakeupException e) {
} catch (WakeupException ignored) {
}

assertEquals(1, coordinator.onJoinPrepareInvokes);
Expand Down Expand Up @@ -990,12 +1022,8 @@ private AtomicBoolean prepareFirstHeartbeat() {

private void awaitFirstHeartbeat(final AtomicBoolean heartbeatReceived) throws Exception {
mockTime.sleep(HEARTBEAT_INTERVAL_MS);
TestUtils.waitForCondition(new TestCondition() {
@Override
public boolean conditionMet() {
return heartbeatReceived.get();
}
}, 3000, "Should have received a heartbeat request after joining the group");
TestUtils.waitForCondition(heartbeatReceived::get,
3000, "Should have received a heartbeat request after joining the group");
}

private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) {
Expand Down Expand Up @@ -1063,10 +1091,10 @@ public static class DummyCoordinator extends AbstractCoordinator {
private int onJoinCompleteInvokes = 0;
private boolean wakeupOnJoinComplete = false;

public DummyCoordinator(GroupRebalanceConfig rebalanceConfig,
ConsumerNetworkClient client,
Metrics metrics,
Time time) {
DummyCoordinator(GroupRebalanceConfig rebalanceConfig,
ConsumerNetworkClient client,
Metrics metrics,
Time time) {
super(rebalanceConfig, new LogContext(), client, metrics, METRIC_GROUP_PREFIX, time);
}

Expand Down