From 9e36f497c87f76fb237e56ecc9f2952224b55d98 Mon Sep 17 00:00:00 2001 From: abbccdda Date: Fri, 20 Mar 2020 16:39:08 -0700 Subject: [PATCH 1/2] Check for no generation during sync group response --- .../kafka/clients/consumer/internals/AbstractCoordinator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 2d93766ac90f0..8427b92d781b8 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -913,7 +913,8 @@ private boolean isProtocolTypeInconsistent(String protocolType) { } private boolean isProtocolNameInconsistent(String protocolName) { - return protocolName != null && !protocolName.equals(generation().protocolName); + return protocolName != null && generation() != Generation.NO_GENERATION + && !protocolName.equals(generation().protocolName); } /** From 5fccb32033c53fb3be3ea04fc99bb989913722ea Mon Sep 17 00:00:00 2001 From: abbccdda Date: Fri, 20 Mar 2020 18:17:50 -0700 Subject: [PATCH 2/2] add unit test --- .../internals/AbstractCoordinatorTest.java | 66 +++++++++++++------ 1 file changed, 47 insertions(+), 19 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java index f9588362e31d6..e2315cd478c3c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java @@ -49,7 +49,6 @@ import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Timer; -import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.Test; @@ -365,8 +364,8 @@ public void testJoinGroupRequestWithFencedInstanceIdException() { @Test public void testJoinGroupProtocolTypeAndName() { - String wrongProtocolType = "wrong-type"; - String wrongProtocolName = "wrong-name"; + final String wrongProtocolType = "wrong-type"; + final String wrongProtocolName = "wrong-name"; // No Protocol Type in both JoinGroup and SyncGroup responses assertTrue(joinGroupWithProtocolTypeAndName(null, null, null)); @@ -391,6 +390,39 @@ public void testJoinGroupProtocolTypeAndName() { () -> joinGroupWithProtocolTypeAndName(PROTOCOL_TYPE, PROTOCOL_TYPE, wrongProtocolName)); } + @Test + public void testNoGenerationWillNotTriggerProtocolNameCheck() { + final String wrongProtocolName = "wrong-name"; + + setupCoordinator(); + mockClient.reset(); + mockClient.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.ensureCoordinatorReady(mockTime.timer(0)); + + mockClient.prepareResponse(body -> { + if (!(body instanceof JoinGroupRequest)) { + return false; + } + JoinGroupRequest joinGroupRequest = (JoinGroupRequest) body; + return joinGroupRequest.data().protocolType().equals(PROTOCOL_TYPE); + }, joinGroupFollowerResponse(defaultGeneration, memberId, + "memberid", Errors.NONE, PROTOCOL_TYPE)); + + mockClient.prepareResponse(body -> { + if (!(body instanceof SyncGroupRequest)) { + return false; + } + coordinator.resetGenerationOnLeaveGroup(); + + SyncGroupRequest syncGroupRequest = (SyncGroupRequest) body; + return syncGroupRequest.data.protocolType().equals(PROTOCOL_TYPE) + && syncGroupRequest.data.protocolName().equals(PROTOCOL_NAME); + }, syncGroupResponse(Errors.NONE, PROTOCOL_TYPE, wrongProtocolName)); + + // No exception shall be thrown as the generation is reset. + coordinator.joinGroupIfNeeded(mockTime.timer(100L)); + } + private boolean joinGroupWithProtocolTypeAndName(String joinGroupResponseProtocolType, String syncGroupResponseProtocolType, String syncGroupResponseProtocolName) { @@ -665,7 +697,7 @@ public boolean matches(AbstractRequest body) { try { coordinator.ensureActiveGroup(); fail("Should have woken up from ensureActiveGroup()"); - } catch (WakeupException e) { + } catch (WakeupException ignored) { } assertEquals(1, coordinator.onJoinPrepareInvokes); @@ -703,7 +735,7 @@ public boolean matches(AbstractRequest body) { try { coordinator.ensureActiveGroup(); fail("Should have woken up from ensureActiveGroup()"); - } catch (WakeupException e) { + } catch (WakeupException ignored) { } assertEquals(1, coordinator.onJoinPrepareInvokes); @@ -738,7 +770,7 @@ public void testWakeupAfterJoinGroupReceived() throws Exception { try { coordinator.ensureActiveGroup(); fail("Should have woken up from ensureActiveGroup()"); - } catch (WakeupException e) { + } catch (WakeupException ignored) { } assertEquals(1, coordinator.onJoinPrepareInvokes); @@ -811,7 +843,7 @@ public boolean matches(AbstractRequest body) { try { coordinator.ensureActiveGroup(); fail("Should have woken up from ensureActiveGroup()"); - } catch (WakeupException e) { + } catch (WakeupException ignored) { } assertEquals(1, coordinator.onJoinPrepareInvokes); @@ -884,7 +916,7 @@ public void testWakeupAfterSyncGroupReceived() throws Exception { try { coordinator.ensureActiveGroup(); fail("Should have woken up from ensureActiveGroup()"); - } catch (WakeupException e) { + } catch (WakeupException ignored) { } assertEquals(1, coordinator.onJoinPrepareInvokes); @@ -945,7 +977,7 @@ public void testWakeupInOnJoinComplete() throws Exception { try { coordinator.ensureActiveGroup(); fail("Should have woken up from ensureActiveGroup()"); - } catch (WakeupException e) { + } catch (WakeupException ignored) { } assertEquals(1, coordinator.onJoinPrepareInvokes); @@ -990,12 +1022,8 @@ private AtomicBoolean prepareFirstHeartbeat() { private void awaitFirstHeartbeat(final AtomicBoolean heartbeatReceived) throws Exception { mockTime.sleep(HEARTBEAT_INTERVAL_MS); - TestUtils.waitForCondition(new TestCondition() { - @Override - public boolean conditionMet() { - return heartbeatReceived.get(); - } - }, 3000, "Should have received a heartbeat request after joining the group"); + TestUtils.waitForCondition(heartbeatReceived::get, + 3000, "Should have received a heartbeat request after joining the group"); } private FindCoordinatorResponse groupCoordinatorResponse(Node node, Errors error) { @@ -1063,10 +1091,10 @@ public static class DummyCoordinator extends AbstractCoordinator { private int onJoinCompleteInvokes = 0; private boolean wakeupOnJoinComplete = false; - public DummyCoordinator(GroupRebalanceConfig rebalanceConfig, - ConsumerNetworkClient client, - Metrics metrics, - Time time) { + DummyCoordinator(GroupRebalanceConfig rebalanceConfig, + ConsumerNetworkClient client, + Metrics metrics, + Time time) { super(rebalanceConfig, new LogContext(), client, metrics, METRIC_GROUP_PREFIX, time); }