From 62ab6ca752a316eaeeba1ec03960d9fe021c7c98 Mon Sep 17 00:00:00 2001
From: brenden20 <118419078+brenden20@users.noreply.github.com>
Date: Thu, 25 Jul 2024 17:18:04 -0500
Subject: [PATCH 1/3] Removed all traces of ConsumerTestBuilder
---
checkstyle/suppressions.xml | 3 -
.../internals/CommitRequestManagerTest.java | 4 +-
.../internals/ConsumerTestBuilder.java | 316 ------------------
.../ShareHeartbeatRequestManagerTest.java | 54 ++-
4 files changed, 27 insertions(+), 350 deletions(-)
delete mode 100644 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index ba28341d1a3ac..1a90900924b6b 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -135,9 +135,6 @@
-
-
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
index dc16d31c6345e..33d2d6aae1b7e 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java
@@ -73,8 +73,6 @@
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID;
-import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID;
import static org.apache.kafka.test.TestUtils.assertFutureThrows;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -99,6 +97,8 @@ public class CommitRequestManagerTest {
private final long retryBackoffMs = 100;
private final long retryBackoffMaxMs = 1000;
private static final String CONSUMER_COORDINATOR_METRICS = "consumer-coordinator-metrics";
+ private static final String DEFAULT_GROUP_ID = "group-id";
+ private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id";
private final Node mockedNode = new Node(1, "host1", 9092);
private SubscriptionState subscriptionState;
private LogContext logContext;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
deleted file mode 100644
index 0e6ff3ebd9e15..0000000000000
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java
+++ /dev/null
@@ -1,316 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.clients.consumer.internals;
-
-import org.apache.kafka.clients.ApiVersions;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.GroupRebalanceConfig;
-import org.apache.kafka.clients.MockClient;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
-import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
-import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
-import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager;
-import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.requests.MetadataResponse;
-import org.apache.kafka.common.requests.RequestTestUtils;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Timer;
-
-import java.io.Closeable;
-import java.util.HashMap;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState;
-import static org.apache.kafka.common.utils.Utils.closeQuietly;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-
-@SuppressWarnings("ClassDataAbstractionCoupling")
-public class ConsumerTestBuilder implements Closeable {
-
- static final long DEFAULT_RETRY_BACKOFF_MS = 80;
- static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000;
- static final int DEFAULT_REQUEST_TIMEOUT_MS = 500;
- static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
- static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id";
- static final String DEFAULT_GROUP_ID = "group-id";
- static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
- static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0;
- static final String DEFAULT_REMOTE_ASSIGNOR = "uniform";
-
- final LogContext logContext = new LogContext();
- final Time time;
- public final BlockingQueue applicationEventQueue;
- public final BlockingQueue backgroundEventQueue;
- final ConsumerConfig config;
- final long retryBackoffMs;
- final SubscriptionState subscriptions;
- final ConsumerMetadata metadata;
- final FetchConfig fetchConfig;
- final FetchBuffer fetchBuffer;
- final Metrics metrics;
- final Timer pollTimer;
- final FetchMetricsManager metricsManager;
- final NetworkClientDelegate networkClientDelegate;
- final OffsetsRequestManager offsetsRequestManager;
- final Optional coordinatorRequestManager;
- final Optional commitRequestManager;
- final Optional heartbeatRequestManager;
- final Optional membershipManager;
- final Optional heartbeatState;
- final Optional heartbeatRequestState;
- final TopicMetadataRequestManager topicMetadataRequestManager;
- final FetchRequestManager fetchRequestManager;
- final RequestManagers requestManagers;
- public final ApplicationEventProcessor applicationEventProcessor;
- public final BackgroundEventHandler backgroundEventHandler;
- public final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
- final MockClient client;
- final Optional groupInfo;
- final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
-
- public ConsumerTestBuilder(Optional groupInfo) {
- this(groupInfo, true, true);
- }
-
- public ConsumerTestBuilder(Optional groupInfo, boolean enableAutoCommit, boolean enableAutoTick) {
- this.groupInfo = groupInfo;
- this.time = enableAutoTick ? new MockTime(1) : new MockTime();
- this.applicationEventQueue = new LinkedBlockingQueue<>();
- this.backgroundEventQueue = new LinkedBlockingQueue<>();
- this.backgroundEventHandler = spy(new BackgroundEventHandler(backgroundEventQueue));
- this.offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class);
- GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
- 100,
- DEFAULT_MAX_POLL_INTERVAL_MS,
- DEFAULT_HEARTBEAT_INTERVAL_MS,
- groupInfo.map(gi -> gi.groupId).orElse(null),
- groupInfo.flatMap(gi -> gi.groupInstanceId),
- DEFAULT_RETRY_BACKOFF_MS,
- DEFAULT_RETRY_BACKOFF_MAX_MS,
- true);
- ApiVersions apiVersions = new ApiVersions();
-
- Properties properties = new Properties();
- properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
- properties.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, DEFAULT_RETRY_BACKOFF_MS);
- properties.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_REQUEST_TIMEOUT_MS);
- properties.put(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG, DEFAULT_MAX_POLL_INTERVAL_MS);
-
- if (!enableAutoCommit)
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-
- groupInfo.ifPresent(gi -> {
- properties.put(GROUP_ID_CONFIG, gi.groupId);
- gi.groupInstanceId.ifPresent(groupInstanceId -> properties.put(GROUP_INSTANCE_ID_CONFIG, groupInstanceId));
- });
-
- this.config = new ConsumerConfig(properties);
-
- this.fetchConfig = new FetchConfig(config);
- this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG);
- final long requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
- this.metrics = createMetrics(config, time);
-
- this.subscriptions = spy(createSubscriptionState(config, logContext));
- this.metadata = spy(new ConsumerMetadata(config, subscriptions, logContext, new ClusterResourceListeners()));
- this.metricsManager = createFetchMetricsManager(metrics);
- this.pollTimer = time.timer(groupRebalanceConfig.rebalanceTimeoutMs);
-
- this.client = new MockClient(time, metadata);
- MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap() {
- {
- String topic1 = "test1";
- put(topic1, 1);
- String topic2 = "test2";
- put(topic2, 1);
- }
- });
- this.client.updateMetadata(metadataResponse);
-
- this.networkClientDelegate = spy(new NetworkClientDelegate(time,
- config,
- logContext,
- client,
- metadata,
- backgroundEventHandler));
- this.offsetsRequestManager = spy(new OffsetsRequestManager(subscriptions,
- metadata,
- fetchConfig.isolationLevel,
- time,
- retryBackoffMs,
- requestTimeoutMs,
- apiVersions,
- networkClientDelegate,
- backgroundEventHandler,
- logContext));
-
- this.topicMetadataRequestManager = spy(new TopicMetadataRequestManager(logContext, time, config));
-
- if (groupInfo.isPresent()) {
- GroupInformation gi = groupInfo.get();
- CoordinatorRequestManager coordinator = spy(new CoordinatorRequestManager(
- logContext,
- DEFAULT_RETRY_BACKOFF_MS,
- DEFAULT_RETRY_BACKOFF_MAX_MS,
- backgroundEventHandler,
- gi.groupId
- ));
- CommitRequestManager commit = spy(new CommitRequestManager(time,
- logContext,
- subscriptions,
- config,
- coordinator,
- offsetCommitCallbackInvoker,
- gi.groupId,
- gi.groupInstanceId,
- metrics));
- MembershipManager mm = spy(
- new MembershipManagerImpl(
- gi.groupId,
- gi.groupInstanceId,
- groupRebalanceConfig.rebalanceTimeoutMs,
- gi.serverAssignor,
- subscriptions,
- commit,
- metadata,
- logContext,
- Optional.empty(),
- backgroundEventHandler,
- time,
- mock(RebalanceMetricsManager.class)
- )
- );
- HeartbeatRequestManager.HeartbeatState heartbeatState = spy(new HeartbeatRequestManager.HeartbeatState(
- subscriptions,
- mm,
- DEFAULT_MAX_POLL_INTERVAL_MS));
- HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState = spy(new HeartbeatRequestManager.HeartbeatRequestState(
- logContext,
- time,
- gi.heartbeatIntervalMs,
- retryBackoffMs,
- DEFAULT_RETRY_BACKOFF_MAX_MS,
- gi.heartbeatJitterMs));
- HeartbeatRequestManager heartbeat = spy(new HeartbeatRequestManager(
- logContext,
- pollTimer,
- config,
- coordinator,
- mm,
- heartbeatState,
- heartbeatRequestState,
- backgroundEventHandler,
- metrics));
-
- this.coordinatorRequestManager = Optional.of(coordinator);
- this.commitRequestManager = Optional.of(commit);
- this.heartbeatRequestManager = Optional.of(heartbeat);
- this.heartbeatState = Optional.of(heartbeatState);
- this.heartbeatRequestState = Optional.of(heartbeatRequestState);
- this.membershipManager = Optional.of(mm);
- } else {
- this.coordinatorRequestManager = Optional.empty();
- this.commitRequestManager = Optional.empty();
- this.heartbeatRequestManager = Optional.empty();
- this.heartbeatState = Optional.empty();
- this.heartbeatRequestState = Optional.empty();
- this.membershipManager = Optional.empty();
- }
-
- this.fetchBuffer = new FetchBuffer(logContext);
- this.fetchRequestManager = spy(new FetchRequestManager(logContext,
- time,
- metadata,
- subscriptions,
- fetchConfig,
- fetchBuffer,
- metricsManager,
- networkClientDelegate,
- apiVersions));
- this.requestManagers = new RequestManagers(logContext,
- offsetsRequestManager,
- topicMetadataRequestManager,
- fetchRequestManager,
- coordinatorRequestManager,
- commitRequestManager,
- heartbeatRequestManager,
- membershipManager
- );
- this.applicationEventProcessor = spy(new ApplicationEventProcessor(
- logContext,
- requestManagers,
- metadata,
- subscriptions
- )
- );
-
- this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker(
- logContext,
- subscriptions,
- time,
- new RebalanceCallbackMetricsManager(metrics)
- );
- }
-
- @Override
- public void close() {
- closeQuietly(requestManagers, RequestManagers.class.getSimpleName());
- }
-
- public static class GroupInformation {
- final String groupId;
- final Optional groupInstanceId;
- final int heartbeatIntervalMs;
- final double heartbeatJitterMs;
- final Optional serverAssignor;
-
- public GroupInformation(String groupId, Optional groupInstanceId) {
- this(groupId, groupInstanceId, DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_HEARTBEAT_JITTER_MS,
- Optional.of(DEFAULT_REMOTE_ASSIGNOR));
- }
-
- public GroupInformation(String groupId, Optional groupInstanceId, int heartbeatIntervalMs, double heartbeatJitterMs, Optional serverAssignor) {
- this.heartbeatIntervalMs = heartbeatIntervalMs;
- this.heartbeatJitterMs = heartbeatJitterMs;
- this.serverAssignor = serverAssignor;
- this.groupId = groupId;
- this.groupInstanceId = groupInstanceId;
- }
- }
-
- static Optional createDefaultGroupInformation() {
- return Optional.of(new GroupInformation(DEFAULT_GROUP_ID, Optional.empty()));
- }
-}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
index 5fdbc9f6e0d6f..004ddb615bf9d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
@@ -59,10 +59,6 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;
-import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
-import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS;
-import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MAX_MS;
-import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -80,11 +76,11 @@
import static org.mockito.Mockito.when;
public class ShareHeartbeatRequestManagerTest {
- private final long retryBackoffMs = DEFAULT_RETRY_BACKOFF_MS;
- private final int heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
- private final int maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL_MS;
- private final long retryBackoffMaxMs = DEFAULT_RETRY_BACKOFF_MAX_MS;
- private static final String DEFAULT_GROUP_ID = "groupId";
+ private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
+ private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
+ private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000;
+ private static final long DEFAULT_RETRY_BACKOFF_MS = 80;
+ private static final String DEFAULT_GROUP_ID = "group-id";
private static final String SHARE_CONSUMER_COORDINATOR_METRICS = CONSUMER_SHARE_METRIC_GROUP_PREFIX + "-coordinator-metrics";
private ShareConsumerTestBuilder testBuilder;
@@ -530,7 +526,7 @@ public void testUnknownMemberId(final short version) {
public void testHeartbeatState() {
// The initial ShareGroupHeartbeatRequest sets most fields to their initial empty values
ShareGroupHeartbeatRequestData data = heartbeatState.buildRequestData();
- assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+ assertEquals(DEFAULT_GROUP_ID, data.groupId());
assertEquals("", data.memberId());
assertEquals(0, data.memberEpoch());
assertEquals(Collections.emptyList(), data.subscribedTopicNames());
@@ -540,7 +536,7 @@ public void testHeartbeatState() {
// Mock a response from the group coordinator, that supplies the member ID and a new epoch
mockStableMember();
data = heartbeatState.buildRequestData();
- assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+ assertEquals(DEFAULT_GROUP_ID, data.groupId());
assertEquals(memberId, data.memberId());
assertEquals(1, data.memberEpoch());
assertNull(data.subscribedTopicNames());
@@ -553,7 +549,7 @@ public void testHeartbeatState() {
membershipManager.onSubscriptionUpdated();
membershipManager.transitionToFenced(); // And indirect way of moving to JOINING state
data = heartbeatState.buildRequestData();
- assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
+ assertEquals(DEFAULT_GROUP_ID, data.groupId());
assertEquals(memberId, data.memberId());
assertEquals(0, data.memberEpoch());
assertEquals(Collections.singletonList(topic), data.subscribedTopicNames());
@@ -589,9 +585,9 @@ public void testPollTimerExpiration() {
heartbeatRequestState = spy(new ShareHeartbeatRequestManager.HeartbeatRequestState(
new LogContext(),
time,
- heartbeatIntervalMs,
- retryBackoffMs,
- retryBackoffMaxMs,
+ DEFAULT_HEARTBEAT_INTERVAL_MS,
+ DEFAULT_RETRY_BACKOFF_MS,
+ DEFAULT_RETRY_BACKOFF_MAX_MS,
0));
backgroundEventHandler = mock(BackgroundEventHandler.class);
@@ -606,8 +602,8 @@ public void testPollTimerExpiration() {
// On poll timer expiration, the member should send a last heartbeat to leave the group
// and notify the membership manager
- time.sleep(maxPollIntervalMs);
- assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
+ time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS);
+ assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
verify(membershipManager).transitionToSendingLeaveGroup(true);
verify(heartbeatState).reset();
verify(heartbeatRequestState).reset();
@@ -620,7 +616,7 @@ public void testPollTimerExpiration() {
assertTrue(pollTimer.notExpired());
verify(membershipManager).maybeRejoinStaleMember();
when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
- assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
+ assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
}
/**
@@ -635,7 +631,7 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
when(membershipManager.isLeavingGroup()).thenReturn(true);
- time.sleep(maxPollIntervalMs);
+ time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
// No transition to leave due to stale member should be triggered, because the member is
@@ -658,8 +654,8 @@ public void testHeartbeatMetrics() {
new LogContext(),
time,
0, // This initial interval should be 0 to ensure heartbeat on the clock
- retryBackoffMs,
- retryBackoffMaxMs,
+ DEFAULT_RETRY_BACKOFF_MS,
+ DEFAULT_RETRY_BACKOFF_MAX_MS,
0);
backgroundEventHandler = mock(BackgroundEventHandler.class);
heartbeatRequestManager = createHeartbeatRequestManager(
@@ -678,11 +674,11 @@ public void testHeartbeatMetrics() {
// test poll
assertHeartbeat(heartbeatRequestManager, 0);
- time.sleep(heartbeatIntervalMs);
+ time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
assertEquals(1.0, getMetric("heartbeat-total").metricValue());
- assertEquals((double) TimeUnit.MILLISECONDS.toSeconds(heartbeatIntervalMs), getMetric("last-heartbeat-seconds-ago").metricValue());
+ assertEquals((double) TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), getMetric("last-heartbeat-seconds-ago").metricValue());
- assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
+ assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
assertEquals(0.06d, (double) getMetric("heartbeat-rate").metricValue(), 0.005d);
assertEquals(2.0, getMetric("heartbeat-total").metricValue());
@@ -808,10 +804,10 @@ private ConsumerConfig config() {
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(maxPollIntervalMs));
- prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs));
- prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, String.valueOf(retryBackoffMaxMs));
- prop.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(heartbeatIntervalMs));
+ prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS));
+ prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(DEFAULT_RETRY_BACKOFF_MS));
+ prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, String.valueOf(DEFAULT_RETRY_BACKOFF_MAX_MS));
+ prop.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_HEARTBEAT_INTERVAL_MS));
return new ConsumerConfig(prop);
}
@@ -826,7 +822,7 @@ private ShareHeartbeatRequestManager createHeartbeatRequestManager(
final ShareHeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState,
final BackgroundEventHandler backgroundEventHandler) {
LogContext logContext = new LogContext();
- pollTimer = time.timer(maxPollIntervalMs);
+ pollTimer = time.timer(DEFAULT_MAX_POLL_INTERVAL_MS);
return new ShareHeartbeatRequestManager(
logContext,
pollTimer,
From 4685990e5419ee41b5a6de68486933b2ba14d56d Mon Sep 17 00:00:00 2001
From: brenden20 <118419078+brenden20@users.noreply.github.com>
Date: Fri, 26 Jul 2024 15:49:13 -0500
Subject: [PATCH 2/3] Used ShareConsumerTestBuilder imports
---
.../ShareHeartbeatRequestManagerTest.java | 54 ++++++++++---------
1 file changed, 29 insertions(+), 25 deletions(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
index 004ddb615bf9d..a9eeff6c41109 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
@@ -59,6 +59,10 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import static org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
+import static org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS;
+import static org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MAX_MS;
+import static org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS;
import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -76,11 +80,11 @@
import static org.mockito.Mockito.when;
public class ShareHeartbeatRequestManagerTest {
- private static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000;
- private static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000;
- private static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000;
- private static final long DEFAULT_RETRY_BACKOFF_MS = 80;
- private static final String DEFAULT_GROUP_ID = "group-id";
+ private final long retryBackoffMs = DEFAULT_RETRY_BACKOFF_MS;
+ private final int heartbeatIntervalMs = DEFAULT_HEARTBEAT_INTERVAL_MS;
+ private final int maxPollIntervalMs = DEFAULT_MAX_POLL_INTERVAL_MS;
+ private final long retryBackoffMaxMs = DEFAULT_RETRY_BACKOFF_MAX_MS;
+ private static final String DEFAULT_GROUP_ID = "groupId";
private static final String SHARE_CONSUMER_COORDINATOR_METRICS = CONSUMER_SHARE_METRIC_GROUP_PREFIX + "-coordinator-metrics";
private ShareConsumerTestBuilder testBuilder;
@@ -526,7 +530,7 @@ public void testUnknownMemberId(final short version) {
public void testHeartbeatState() {
// The initial ShareGroupHeartbeatRequest sets most fields to their initial empty values
ShareGroupHeartbeatRequestData data = heartbeatState.buildRequestData();
- assertEquals(DEFAULT_GROUP_ID, data.groupId());
+ assertEquals(ShareConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
assertEquals("", data.memberId());
assertEquals(0, data.memberEpoch());
assertEquals(Collections.emptyList(), data.subscribedTopicNames());
@@ -536,7 +540,7 @@ public void testHeartbeatState() {
// Mock a response from the group coordinator, that supplies the member ID and a new epoch
mockStableMember();
data = heartbeatState.buildRequestData();
- assertEquals(DEFAULT_GROUP_ID, data.groupId());
+ assertEquals(ShareConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
assertEquals(memberId, data.memberId());
assertEquals(1, data.memberEpoch());
assertNull(data.subscribedTopicNames());
@@ -549,7 +553,7 @@ public void testHeartbeatState() {
membershipManager.onSubscriptionUpdated();
membershipManager.transitionToFenced(); // And indirect way of moving to JOINING state
data = heartbeatState.buildRequestData();
- assertEquals(DEFAULT_GROUP_ID, data.groupId());
+ assertEquals(ShareConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId());
assertEquals(memberId, data.memberId());
assertEquals(0, data.memberEpoch());
assertEquals(Collections.singletonList(topic), data.subscribedTopicNames());
@@ -585,9 +589,9 @@ public void testPollTimerExpiration() {
heartbeatRequestState = spy(new ShareHeartbeatRequestManager.HeartbeatRequestState(
new LogContext(),
time,
- DEFAULT_HEARTBEAT_INTERVAL_MS,
- DEFAULT_RETRY_BACKOFF_MS,
- DEFAULT_RETRY_BACKOFF_MAX_MS,
+ heartbeatIntervalMs,
+ retryBackoffMs,
+ retryBackoffMaxMs,
0));
backgroundEventHandler = mock(BackgroundEventHandler.class);
@@ -602,8 +606,8 @@ public void testPollTimerExpiration() {
// On poll timer expiration, the member should send a last heartbeat to leave the group
// and notify the membership manager
- time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS);
- assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
+ time.sleep(maxPollIntervalMs);
+ assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
verify(membershipManager).transitionToSendingLeaveGroup(true);
verify(heartbeatState).reset();
verify(heartbeatRequestState).reset();
@@ -616,7 +620,7 @@ public void testPollTimerExpiration() {
assertTrue(pollTimer.notExpired());
verify(membershipManager).maybeRejoinStaleMember();
when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
- assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
+ assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
}
/**
@@ -631,7 +635,7 @@ public void testPollTimerExpirationShouldNotMarkMemberStaleIfMemberAlreadyLeavin
when(membershipManager.shouldSkipHeartbeat()).thenReturn(false);
when(membershipManager.isLeavingGroup()).thenReturn(true);
- time.sleep(DEFAULT_MAX_POLL_INTERVAL_MS);
+ time.sleep(maxPollIntervalMs);
NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
// No transition to leave due to stale member should be triggered, because the member is
@@ -654,8 +658,8 @@ public void testHeartbeatMetrics() {
new LogContext(),
time,
0, // This initial interval should be 0 to ensure heartbeat on the clock
- DEFAULT_RETRY_BACKOFF_MS,
- DEFAULT_RETRY_BACKOFF_MAX_MS,
+ retryBackoffMs,
+ retryBackoffMaxMs,
0);
backgroundEventHandler = mock(BackgroundEventHandler.class);
heartbeatRequestManager = createHeartbeatRequestManager(
@@ -674,11 +678,11 @@ public void testHeartbeatMetrics() {
// test poll
assertHeartbeat(heartbeatRequestManager, 0);
- time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
+ time.sleep(heartbeatIntervalMs);
assertEquals(1.0, getMetric("heartbeat-total").metricValue());
- assertEquals((double) TimeUnit.MILLISECONDS.toSeconds(DEFAULT_HEARTBEAT_INTERVAL_MS), getMetric("last-heartbeat-seconds-ago").metricValue());
+ assertEquals((double) TimeUnit.MILLISECONDS.toSeconds(heartbeatIntervalMs), getMetric("last-heartbeat-seconds-ago").metricValue());
- assertHeartbeat(heartbeatRequestManager, DEFAULT_HEARTBEAT_INTERVAL_MS);
+ assertHeartbeat(heartbeatRequestManager, heartbeatIntervalMs);
assertEquals(0.06d, (double) getMetric("heartbeat-rate").metricValue(), 0.005d);
assertEquals(2.0, getMetric("heartbeat-total").metricValue());
@@ -804,10 +808,10 @@ private ConsumerConfig config() {
prop.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
prop.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
- prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_MAX_POLL_INTERVAL_MS));
- prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(DEFAULT_RETRY_BACKOFF_MS));
- prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, String.valueOf(DEFAULT_RETRY_BACKOFF_MAX_MS));
- prop.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(DEFAULT_HEARTBEAT_INTERVAL_MS));
+ prop.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, String.valueOf(maxPollIntervalMs));
+ prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG, String.valueOf(retryBackoffMs));
+ prop.setProperty(ConsumerConfig.RETRY_BACKOFF_MAX_MS_CONFIG, String.valueOf(retryBackoffMaxMs));
+ prop.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, String.valueOf(heartbeatIntervalMs));
return new ConsumerConfig(prop);
}
@@ -822,7 +826,7 @@ private ShareHeartbeatRequestManager createHeartbeatRequestManager(
final ShareHeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState,
final BackgroundEventHandler backgroundEventHandler) {
LogContext logContext = new LogContext();
- pollTimer = time.timer(DEFAULT_MAX_POLL_INTERVAL_MS);
+ pollTimer = time.timer(maxPollIntervalMs);
return new ShareHeartbeatRequestManager(
logContext,
pollTimer,
From 10cca29b031a375e3eea9c6456448a5d88d56e80 Mon Sep 17 00:00:00 2001
From: brenden20 <118419078+brenden20@users.noreply.github.com>
Date: Sun, 4 Aug 2024 00:41:10 -0500
Subject: [PATCH 3/3] Update ShareHeartbeatRequestManagerTest.java
---
.../consumer/internals/ShareHeartbeatRequestManagerTest.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
index a9eeff6c41109..5ea93dae40f74 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java
@@ -59,11 +59,11 @@
import java.util.Random;
import java.util.concurrent.TimeUnit;
+import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
import static org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS;
import static org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS;
import static org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MAX_MS;
import static org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS;
-import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;