diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java index 77b7495b49217..67fb7267f024f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatRequestManagerTest.java @@ -30,8 +30,6 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData; import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData; -import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData.Assignment; -import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; @@ -46,7 +44,6 @@ import org.apache.kafka.common.utils.Timer; import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource; -import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; @@ -58,33 +55,22 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Properties; -import java.util.Random; +import java.util.Set; import java.util.SortedSet; -import java.util.concurrent.TimeUnit; - -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MAX_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS; + import static org.apache.kafka.common.utils.Utils.mkSortedSet; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; -import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.clearInvocations; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -94,9 +80,16 @@ public class HeartbeatRequestManagerTest { private static final String DEFAULT_GROUP_ID = "groupId"; - private static final String CONSUMER_COORDINATOR_METRICS = "consumer-coordinator-metrics"; + private static final String DEFAULT_REMOTE_ASSIGNOR = "uniform"; + private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id"; + private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; + private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000; + private static final long DEFAULT_RETRY_BACKOFF_MS = 80; + private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000; + private static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0; + private static final String DEFAULT_MEMBER_ID = "member-id"; + private static final int DEFAULT_MEMBER_EPOCH = 1; - private ConsumerTestBuilder testBuilder; private Time time; private Timer pollTimer; private CoordinatorRequestManager coordinatorRequestManager; @@ -106,60 +99,85 @@ public class HeartbeatRequestManagerTest { private MembershipManager membershipManager; private HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState; private HeartbeatRequestManager.HeartbeatState heartbeatState; - private final String memberId = "member-id"; - private final int memberEpoch = 1; private BackgroundEventHandler backgroundEventHandler; - private Metrics metrics; + private LogContext logContext; @BeforeEach public void setUp() { - setUp(ConsumerTestBuilder.createDefaultGroupInformation()); - } + this.time = new MockTime(); + this.logContext = new LogContext(); + this.pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS)); + this.coordinatorRequestManager = mock(CoordinatorRequestManager.class); + this.heartbeatState = mock(HeartbeatState.class); + this.backgroundEventHandler = mock(BackgroundEventHandler.class); + this.subscriptions = mock(SubscriptionState.class); + this.membershipManager = mock(MembershipManagerImpl.class); + this.metadata = mock(ConsumerMetadata.class); + Metrics metrics = new Metrics(time); + ConsumerConfig config = mock(ConsumerConfig.class); + + this.heartbeatRequestState = spy(new HeartbeatRequestState( + logContext, + time, + DEFAULT_HEARTBEAT_INTERVAL_MS, + DEFAULT_RETRY_BACKOFF_MS, + DEFAULT_RETRY_BACKOFF_MAX_MS, + DEFAULT_HEARTBEAT_JITTER_MS)); - private void setUp(Optional groupInfo) { - testBuilder = new ConsumerTestBuilder(groupInfo, true, false); - time = testBuilder.time; - coordinatorRequestManager = testBuilder.coordinatorRequestManager.orElseThrow(IllegalStateException::new); - heartbeatRequestManager = testBuilder.heartbeatRequestManager.orElseThrow(IllegalStateException::new); - heartbeatRequestState = testBuilder.heartbeatRequestState.orElseThrow(IllegalStateException::new); - heartbeatState = testBuilder.heartbeatState.orElseThrow(IllegalStateException::new); - backgroundEventHandler = testBuilder.backgroundEventHandler; - subscriptions = testBuilder.subscriptions; - membershipManager = testBuilder.membershipManager.orElseThrow(IllegalStateException::new); - metadata = testBuilder.metadata; - metrics = new Metrics(time); + this.heartbeatRequestManager = new HeartbeatRequestManager( + logContext, + pollTimer, + config, + coordinatorRequestManager, + membershipManager, + heartbeatState, + heartbeatRequestState, + backgroundEventHandler, + metrics); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(mock(Node.class))); } - private void resetWithZeroHeartbeatInterval(Optional groupInstanceId) { - cleanup(); - - ConsumerTestBuilder.GroupInformation gi = new ConsumerTestBuilder.GroupInformation( - DEFAULT_GROUP_ID, - groupInstanceId, + private void createHeartbeatRequestStateWithZeroHeartbeatInterval() { + this.heartbeatRequestState = spy(new HeartbeatRequestState( + logContext, + time, 0, - 0.0, - Optional.of(DEFAULT_REMOTE_ASSIGNOR) - ); + DEFAULT_RETRY_BACKOFF_MS, + DEFAULT_RETRY_BACKOFF_MAX_MS, + DEFAULT_HEARTBEAT_JITTER_MS)); - setUp(Optional.of(gi)); + this.heartbeatRequestManager = createHeartbeatRequestManager( + coordinatorRequestManager, + membershipManager, + heartbeatState, + heartbeatRequestState, + backgroundEventHandler); } - @AfterEach - public void cleanup() { - if (testBuilder != null) { - testBuilder.close(); - } + private void createHeartbeatStatAndRequestManager() { + this.heartbeatState = new HeartbeatState( + subscriptions, + membershipManager, + DEFAULT_MAX_POLL_INTERVAL_MS + ); + + this.heartbeatRequestManager = createHeartbeatRequestManager( + coordinatorRequestManager, + membershipManager, + heartbeatState, + heartbeatRequestState, + backgroundEventHandler + ); } + @Test public void testHeartbeatOnStartup() { NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size()); - resetWithZeroHeartbeatInterval(Optional.empty()); - mockStableMember(); + createHeartbeatRequestStateWithZeroHeartbeatInterval(); assertEquals(0, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); @@ -171,7 +189,6 @@ public void testHeartbeatOnStartup() { @Test public void testSuccessfulHeartbeatTiming() { - mockStableMember(); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while interval has not expired"); @@ -201,12 +218,16 @@ public void testSuccessfulHeartbeatTiming() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(short version) { - resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); + createHeartbeatStatAndRequestManager(); + createHeartbeatRequestStateWithZeroHeartbeatInterval(); + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); String topic = "topic1"; - subscriptions.subscribe(Collections.singleton(topic), Optional.empty()); - membershipManager.onSubscriptionUpdated(); + Set set = Collections.singleton(topic); + when(subscriptions.subscription()).thenReturn(set); + subscriptions.subscribe(set, Optional.empty()); // Create a ConsumerHeartbeatRequest and verify the payload + mockJoiningMemberData(DEFAULT_GROUP_INSTANCE_ID); assertEquals(0, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); @@ -231,11 +252,10 @@ public void testFirstHeartbeatIncludesRequiredInfoToJoinGroupAndGetAssignments(s @ValueSource(booleans = {true, false}) public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) { // The initial heartbeatInterval is set to 0 - resetWithZeroHeartbeatInterval(Optional.empty()); + createHeartbeatRequestStateWithZeroHeartbeatInterval(); // Mocking notInGroup when(membershipManager.shouldSkipHeartbeat()).thenReturn(shouldSkipHeartbeat); - when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); @@ -251,23 +271,22 @@ public void testSkippingHeartbeat(final boolean shouldSkipHeartbeat) { @Test public void testTimerNotDue() { - mockStableMember(); time.sleep(100); // time elapsed < heartbeatInterval, no heartbeat should be sent NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); + assertEquals(0, result.unsentRequests.size()); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, result.timeUntilNextPollMs); assertEquals(DEFAULT_HEARTBEAT_INTERVAL_MS - 100, heartbeatRequestManager.maximumTimeToWait(time.milliseconds())); // Member in state where it should not send Heartbeat anymore when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); - membershipManager.transitionToFatal(); + when(membershipManager.shouldSkipHeartbeat()).thenReturn(true); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(Long.MAX_VALUE, result.timeUntilNextPollMs); } @Test public void testHeartbeatNotSentIfAnotherOneInFlight() { - mockStableMember(); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); // Heartbeat sent (no response received) @@ -275,14 +294,15 @@ public void testHeartbeatNotSentIfAnotherOneInFlight() { assertEquals(1, result.unsentRequests.size()); NetworkClientDelegate.UnsentRequest inflightReq = result.unsentRequests.get(0); + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a " + - "previous one is in-flight"); + "previous one is in-flight"); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent when the " + - "interval expires if there is a previous HB request in-flight"); + "interval expires if there is a previous HB request in-flight"); // Receive response for the inflight after the interval expired. The next HB should be sent // on the next poll waiting only for the minimal backoff. @@ -312,9 +332,7 @@ public void testHeartbeatOutsideInterval() { @Test public void testNetworkTimeout() { // The initial heartbeatInterval is set to 0 - resetWithZeroHeartbeatInterval(Optional.empty()); - mockStableMember(); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); + createHeartbeatRequestStateWithZeroHeartbeatInterval(); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); // Mimic network timeout @@ -335,10 +353,8 @@ public void testNetworkTimeout() { @Test public void testFailureOnFatalException() { // The initial heartbeatInterval is set to 0 - resetWithZeroHeartbeatInterval(Optional.empty()); - mockStableMember(); + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new KafkaException("fatal")); @@ -360,33 +376,36 @@ public void testNoCoordinator() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testValidateConsumerGroupHeartbeatRequest(final short version) { + createHeartbeatStatAndRequestManager(); + // The initial heartbeatInterval is set to 0, but we're testing - resetWithZeroHeartbeatInterval(Optional.of(DEFAULT_GROUP_INSTANCE_ID)); - mockStableMember(); + time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - List subscribedTopics = Collections.singletonList("topic"); - subscriptions.subscribe(new HashSet<>(subscribedTopics), Optional.empty()); + String subscribedTopic = "topic"; + when(subscriptions.subscription()).thenReturn(Collections.singleton(subscribedTopic)); // Update membershipManager's memberId and memberEpoch ConsumerGroupHeartbeatResponse result = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() - .setMemberId(memberId) - .setMemberEpoch(memberEpoch)); + .setMemberId(DEFAULT_MEMBER_ID) + .setMemberEpoch(DEFAULT_MEMBER_EPOCH)); membershipManager.onHeartbeatSuccess(result.data()); // Create a ConsumerHeartbeatRequest and verify the payload + mockStableMemberData(DEFAULT_GROUP_INSTANCE_ID); NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); NetworkClientDelegate.UnsentRequest request = pollResult.unsentRequests.get(0); assertInstanceOf(Builder.class, request.requestBuilder()); ConsumerGroupHeartbeatRequest heartbeatRequest = - (ConsumerGroupHeartbeatRequest) request.requestBuilder().build(version); + (ConsumerGroupHeartbeatRequest) request.requestBuilder().build(version); + assertEquals(DEFAULT_GROUP_ID, heartbeatRequest.data().groupId()); - assertEquals(memberId, heartbeatRequest.data().memberId()); - assertEquals(memberEpoch, heartbeatRequest.data().memberEpoch()); + assertEquals(DEFAULT_MEMBER_ID, heartbeatRequest.data().memberId()); + assertEquals(DEFAULT_MEMBER_EPOCH, heartbeatRequest.data().memberEpoch()); assertEquals(10000, heartbeatRequest.data().rebalanceTimeoutMs()); - assertEquals(subscribedTopics, heartbeatRequest.data().subscribedTopicNames()); + assertEquals(subscribedTopic, heartbeatRequest.data().subscribedTopicNames().get(0)); assertEquals(DEFAULT_GROUP_INSTANCE_ID, heartbeatRequest.data().instanceId()); assertEquals(DEFAULT_REMOTE_ASSIGNOR, heartbeatRequest.data().serverAssignor()); } @@ -394,23 +413,9 @@ public void testValidateConsumerGroupHeartbeatRequest(final short version) { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testValidateConsumerGroupHeartbeatRequestAssignmentSentWhenLocalEpochChanges(final short version) { - CoordinatorRequestManager coordinatorRequestManager = mock(CoordinatorRequestManager.class); - MembershipManager membershipManager = mock(MembershipManager.class); - BackgroundEventHandler backgroundEventHandler = mock(BackgroundEventHandler.class); - SubscriptionState subscriptionState = mock(SubscriptionState.class); - HeartbeatRequestState requestState = mock(HeartbeatRequestState.class); - HeartbeatState heartbeatState = new HeartbeatState(subscriptionState, membershipManager, DEFAULT_MAX_POLL_INTERVAL_MS); - - HeartbeatRequestManager heartbeatRequestManager = createHeartbeatRequestManager( - coordinatorRequestManager, - membershipManager, - heartbeatState, - requestState, - backgroundEventHandler - ); + createHeartbeatStatAndRequestManager(); when(membershipManager.shouldHeartbeatNow()).thenReturn(true); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); Uuid topicId = Uuid.randomUuid(); ConsumerGroupHeartbeatRequestData.TopicPartitions expectedTopicPartitions = @@ -428,6 +433,7 @@ topicId, mkSortedSet(0) assertEquals(Collections.singletonList(expectedTopicPartitions), heartbeatRequest1.data().topicPartitions()); // Assignment did not change, so no assignment should be sent + when(heartbeatRequestState.canSendRequest(anyLong())).thenReturn(true); ConsumerGroupHeartbeatRequest heartbeatRequest2 = getHeartbeatRequest(heartbeatRequestManager, version); assertNull(heartbeatRequest2.data().topicPartitions()); @@ -439,7 +445,6 @@ topicId, mkSortedSet(0) } private ConsumerGroupHeartbeatRequest getHeartbeatRequest(HeartbeatRequestManager heartbeatRequestManager, final short version) { - // Create a ConsumerHeartbeatRequest and verify the payload -- no assignment should be sent NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); NetworkClientDelegate.UnsentRequest request = pollResult.unsentRequests.get(0); @@ -450,8 +455,6 @@ private ConsumerGroupHeartbeatRequest getHeartbeatRequest(HeartbeatRequestManage @ParameterizedTest @MethodSource("errorProvider") public void testHeartbeatResponseOnErrorHandling(final Errors error, final boolean isFatal) { - mockStableMember(); - // Handling errors on the second heartbeat time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); @@ -489,6 +492,7 @@ public void testHeartbeatResponseOnErrorHandling(final Errors error, final boole break; default: if (isFatal) { + when(coordinatorRequestManager.coordinator()).thenReturn(Optional.empty()); ensureFatalError(error); } else { verify(backgroundEventHandler, never()).add(any()); @@ -521,62 +525,66 @@ private void assertNextHeartbeatTiming(long expectedTimeToNextHeartbeatMs) { @Test public void testHeartbeatState() { + mockJoiningMemberData(null); + + heartbeatState = new HeartbeatState( + subscriptions, + membershipManager, + DEFAULT_MAX_POLL_INTERVAL_MS + ); + + createHeartbeatRequestStateWithZeroHeartbeatInterval(); + // The initial ConsumerGroupHeartbeatRequest sets most fields to their initial empty values ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData(); - assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); + assertEquals(DEFAULT_GROUP_ID, data.groupId()); assertEquals("", data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); - assertEquals(ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); + assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.emptyList(), data.subscribedTopicNames()); - assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); + assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); - membershipManager.onHeartbeatRequestGenerated(); - assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state()); // Mock a response from the group coordinator, that supplies the member ID and a new epoch - mockStableMember(); + when(membershipManager.state()).thenReturn(MemberState.STABLE); + when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); + when(subscriptions.rebalanceListener()).thenReturn(Optional.empty()); + mockStableMemberData(null); data = heartbeatState.buildRequestData(); - assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); - assertEquals(memberId, data.memberId()); + assertEquals(DEFAULT_GROUP_ID, data.groupId()); + assertEquals(DEFAULT_MEMBER_ID, data.memberId()); assertEquals(1, data.memberEpoch()); assertNull(data.instanceId()); assertEquals(-1, data.rebalanceTimeoutMs()); assertNull(data.subscribedTopicNames()); assertNull(data.serverAssignor()); - assertEquals(data.topicPartitions(), Collections.emptyList()); - membershipManager.onHeartbeatRequestGenerated(); - assertEquals(MemberState.STABLE, membershipManager.state()); + assertEquals(Collections.emptyList(), data.topicPartitions()); // Join the group and subscribe to a topic, but the response has not yet been received String topic = "topic1"; subscriptions.subscribe(Collections.singleton(topic), Optional.empty()); - membershipManager.onSubscriptionUpdated(); - membershipManager.transitionToFenced(); // And indirect way of moving to JOINING state + when(subscriptions.subscription()).thenReturn(Collections.singleton(topic)); + mockRejoiningMemberData(); data = heartbeatState.buildRequestData(); - assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); - assertEquals(memberId, data.memberId()); + assertEquals(DEFAULT_GROUP_ID, data.groupId()); + assertEquals(DEFAULT_MEMBER_ID, data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.singletonList(topic), data.subscribedTopicNames()); - assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); + assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); - membershipManager.onHeartbeatRequestGenerated(); - assertEquals(MemberState.JOINING, membershipManager.state()); - membershipManager.transitionToFenced(); data = heartbeatState.buildRequestData(); - assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); - assertEquals(memberId, data.memberId()); + assertEquals(DEFAULT_GROUP_ID, data.groupId()); + assertEquals(DEFAULT_MEMBER_ID, data.memberId()); assertEquals(0, data.memberEpoch()); assertNull(data.instanceId()); assertEquals(DEFAULT_MAX_POLL_INTERVAL_MS, data.rebalanceTimeoutMs()); assertEquals(Collections.singletonList(topic), data.subscribedTopicNames()); - assertEquals(ConsumerTestBuilder.DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); + assertEquals(DEFAULT_REMOTE_ASSIGNOR, data.serverAssignor()); assertEquals(Collections.emptyList(), data.topicPartitions()); - membershipManager.onHeartbeatRequestGenerated(); - assertEquals(MemberState.JOINING, membershipManager.state()); // Mock the response from the group coordinator which returns an assignment ConsumerGroupHeartbeatResponseData.TopicPartitions tpTopic1 = @@ -587,39 +595,17 @@ public void testHeartbeatState() { ConsumerGroupHeartbeatResponseData.Assignment assignmentTopic1 = new ConsumerGroupHeartbeatResponseData.Assignment(); assignmentTopic1.setTopicPartitions(Collections.singletonList(tpTopic1)); - ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() - .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) - .setMemberId(memberId) - .setMemberEpoch(1) - .setAssignment(assignmentTopic1)); when(metadata.topicNames()).thenReturn(Collections.singletonMap(topicId, "topic1")); - membershipManager.onHeartbeatSuccess(rs1.data()); - - // We remain in RECONCILING state, as the assignment will be reconciled on the next poll - assertEquals(MemberState.RECONCILING, membershipManager.state()); } @Test public void testPollTimerExpiration() { - coordinatorRequestManager = mock(CoordinatorRequestManager.class); - membershipManager = mock(MembershipManager.class); - heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class); - heartbeatRequestState = spy(new HeartbeatRequestManager.HeartbeatRequestState( - new LogContext(), - time, - DEFAULT_HEARTBEAT_INTERVAL_MS, - DEFAULT_RETRY_BACKOFF_MS, - DEFAULT_RETRY_BACKOFF_MAX_MS, - 0)); - backgroundEventHandler = mock(BackgroundEventHandler.class); - heartbeatRequestManager = createHeartbeatRequestManager( coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, backgroundEventHandler); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); // On poll timer expiration, the member should send a last heartbeat to leave the group @@ -631,7 +617,6 @@ public void testPollTimerExpiration() { verify(heartbeatRequestState).reset(); verify(membershipManager).onHeartbeatRequestGenerated(); - when(membershipManager.state()).thenReturn(MemberState.STALE); when(membershipManager.shouldSkipHeartbeat()).thenReturn(true); assertNoHeartbeat(heartbeatRequestManager); heartbeatRequestManager.resetPollTimer(time.milliseconds()); @@ -666,76 +651,30 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin @Test public void testisExpiredByUsedForLogging() { - Timer pollTimer = spy(time.timer(DEFAULT_MAX_POLL_INTERVAL_MS)); - heartbeatRequestManager = new HeartbeatRequestManager(new LogContext(), pollTimer, config(), - coordinatorRequestManager, membershipManager, heartbeatState, heartbeatRequestState, - backgroundEventHandler, metrics); when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); int exceededTimeMs = 5; time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS + exceededTimeMs); + when(membershipManager.isLeavingGroup()).thenReturn(false); NetworkClientDelegate.PollResult pollResult = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, pollResult.unsentRequests.size()); verify(membershipManager).transitionToSendingLeaveGroup(true); verify(pollTimer, never()).isExpiredBy(); - assertEquals(exceededTimeMs, pollTimer.isExpiredBy()); clearInvocations(pollTimer); heartbeatRequestManager.resetPollTimer(time.milliseconds()); verify(pollTimer).isExpiredBy(); } - @Test - public void testHeartbeatMetrics() { - // setup - coordinatorRequestManager = mock(CoordinatorRequestManager.class); - membershipManager = mock(MembershipManager.class); - heartbeatState = mock(HeartbeatRequestManager.HeartbeatState.class); - time = new MockTime(); - metrics = new Metrics(time); - heartbeatRequestState = new HeartbeatRequestManager.HeartbeatRequestState( - new LogContext(), - time, - 0, // This initial interval should be 0 to ensure heartbeat on the clock - DEFAULT_RETRY_BACKOFF_MS, - DEFAULT_RETRY_BACKOFF_MAX_MS, - 0); - backgroundEventHandler = mock(BackgroundEventHandler.class); - heartbeatRequestManager = createHeartbeatRequestManager( - coordinatorRequestManager, - membershipManager, - heartbeatState, - heartbeatRequestState, - backgroundEventHandler); - when(coordinatorRequestManager.coordinator()).thenReturn(Optional.of(new Node(1, "localhost", 9999))); - when(membershipManager.state()).thenReturn(MemberState.STABLE); - - assertNotNull(getMetric("heartbeat-response-time-max")); - assertNotNull(getMetric("heartbeat-rate")); - assertNotNull(getMetric("heartbeat-total")); - assertNotNull(getMetric("last-heartbeat-seconds-ago")); - - // test poll - assertHeartbeat(heartbeatRequestManager, 0); - time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - assertEquals(1.0, getMetric("heartbeat-total").metricValue()); - assertEquals((double) TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), getMetric("last-heartbeat-seconds-ago").metricValue()); - - assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS); - assertEquals(0.06d, (double) getMetric("heartbeat-rate").metricValue(), 0.005d); - assertEquals(2.0, getMetric("heartbeat-total").metricValue()); - - // Randomly sleep for some time - Random rand = new Random(); - int randomSleepS = rand.nextInt(11); - time.sleep(randomSleepS * 1000); - assertEquals((double) randomSleepS, getMetric("last-heartbeat-seconds-ago").metricValue()); - } - @Test public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { - mockStableMember(); + heartbeatRequestManager = createHeartbeatRequestManager( + coordinatorRequestManager, + membershipManager, + heartbeatState, + heartbeatRequestState, + backgroundEventHandler); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); @@ -743,7 +682,6 @@ public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { // Receive HB response fencing member when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); - doNothing().when(membershipManager).transitionToFenced(); ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.FENCED_MEMBER_EPOCH); result.unsentRequests.get(0).handler().onComplete(response); @@ -751,11 +689,11 @@ public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { verify(heartbeatRequestState).onFailedAttempt(anyLong()); verify(heartbeatRequestState).reset(); - when(membershipManager.state()).thenReturn(MemberState.FENCED); + when(membershipManager.shouldSkipHeartbeat()).thenReturn(true); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "Member should not send heartbeats while FENCED"); - when(membershipManager.state()).thenReturn(MemberState.JOINING); + when(membershipManager.shouldSkipHeartbeat()).thenReturn(false); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size(), "Fenced member should resume heartbeat after transitioning to JOINING"); } @@ -763,18 +701,18 @@ public void testFencedMemberStopHeartbeatUntilItReleasesAssignmentToRejoin() { @ParameterizedTest @ApiKeyVersionsSource(apiKey = ApiKeys.CONSUMER_GROUP_HEARTBEAT) public void testSendingLeaveGroupHeartbeatWhenPreviousOneInFlight(final short version) { - mockStableMember(); time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(1, result.unsentRequests.size()); result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size(), "No heartbeat should be sent while a previous one is in-flight"); - membershipManager.leaveGroup(); - + when(membershipManager.state()).thenReturn(MemberState.LEAVING); + when(heartbeatState.buildRequestData()).thenReturn(new ConsumerGroupHeartbeatRequestData().setMemberEpoch(-1)); ConsumerGroupHeartbeatRequest heartbeatToLeave = getHeartbeatRequest(heartbeatRequestManager, version); assertEquals(ConsumerGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH, heartbeatToLeave.data().memberEpoch()); + when(membershipManager.shouldSkipHeartbeat()).thenReturn(true); NetworkClientDelegate.PollResult pollAgain = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, pollAgain.unsentRequests.size()); } @@ -792,23 +730,6 @@ private void assertNoHeartbeat(HeartbeatRequestManager hrm) { assertEquals(0, pollResult.unsentRequests.size()); } - private void mockStableMember() { - membershipManager.onSubscriptionUpdated(); - // Heartbeat response without assignment to set the state to STABLE. - when(subscriptions.hasAutoAssignedPartitions()).thenReturn(true); - when(subscriptions.rebalanceListener()).thenReturn(Optional.empty()); - ConsumerGroupHeartbeatResponse rs1 = new ConsumerGroupHeartbeatResponse(new ConsumerGroupHeartbeatResponseData() - .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) - .setMemberId(memberId) - .setMemberEpoch(memberEpoch) - .setAssignment(new Assignment()) - ); - membershipManager.onHeartbeatSuccess(rs1.data()); - membershipManager.poll(time.milliseconds()); - membershipManager.onHeartbeatRequestGenerated(); - assertEquals(MemberState.STABLE, membershipManager.state()); - } - private void ensureFatalError(Errors expectedError) { verify(membershipManager).transitionToFatal(); @@ -823,7 +744,6 @@ private void ensureFatalError(Errors expectedError) { private void ensureHeartbeatStopped() { time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS); - assertEquals(MemberState.FATAL, membershipManager.state()); NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds()); assertEquals(0, result.unsentRequests.size()); } @@ -853,8 +773,8 @@ private ClientResponse createHeartbeatResponse( ConsumerGroupHeartbeatResponseData data = new ConsumerGroupHeartbeatResponseData() .setErrorCode(error.code()) .setHeartbeatIntervalMs(DEFAULT_HEARTBEAT_INTERVAL_MS) - .setMemberId(memberId) - .setMemberEpoch(memberEpoch); + .setMemberId(DEFAULT_MEMBER_ID) + .setMemberEpoch(DEFAULT_MEMBER_EPOCH); if (error != Errors.NONE) { data.setErrorMessage("stubbed error message"); } @@ -884,10 +804,6 @@ private ConsumerConfig config() { return new ConsumerConfig(prop); } - private KafkaMetric getMetric(final String name) { - return metrics.metrics().get(metrics.metricName(name, CONSUMER_COORDINATOR_METRICS)); - } - private HeartbeatRequestManager createHeartbeatRequestManager( final CoordinatorRequestManager coordinatorRequestManager, final MembershipManager membershipManager, @@ -905,6 +821,31 @@ private HeartbeatRequestManager createHeartbeatRequestManager( heartbeatState, heartbeatRequestState, backgroundEventHandler, - metrics); + new Metrics()); + } + + private void mockJoiningMemberData(String instanceId) { + when(membershipManager.state()).thenReturn(MemberState.JOINING); + when(membershipManager.groupInstanceId()).thenReturn(Optional.ofNullable(instanceId)); + when(membershipManager.memberId()).thenReturn(""); + when(membershipManager.memberEpoch()).thenReturn(0); + when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID); + when(membershipManager.currentAssignment()).thenReturn(LocalAssignment.NONE); + when(membershipManager.serverAssignor()).thenReturn(Optional.of(DEFAULT_REMOTE_ASSIGNOR)); + } + + private void mockRejoiningMemberData() { + when(membershipManager.state()).thenReturn(MemberState.JOINING); + when(membershipManager.memberEpoch()).thenReturn(0); + when(membershipManager.groupInstanceId()).thenReturn(Optional.empty()); + } + + private void mockStableMemberData(String instanceId) { + when(membershipManager.groupInstanceId()).thenReturn(Optional.ofNullable(instanceId)); + when(membershipManager.currentAssignment()).thenReturn(new LocalAssignment(0, Collections.emptyMap())); + when(membershipManager.groupId()).thenReturn(DEFAULT_GROUP_ID); + when(membershipManager.memberId()).thenReturn(DEFAULT_MEMBER_ID); + when(membershipManager.memberEpoch()).thenReturn(DEFAULT_MEMBER_EPOCH); + when(membershipManager.serverAssignor()).thenReturn(Optional.of(DEFAULT_REMOTE_ASSIGNOR)); } }