diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index def3e2bc0d08d..df79a17ccbefd 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -135,9 +135,6 @@ - - diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java index dc16d31c6345e..33d2d6aae1b7e 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java @@ -73,8 +73,6 @@ import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_ID; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_GROUP_INSTANCE_ID; import static org.apache.kafka.test.TestUtils.assertFutureThrows; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -99,6 +97,8 @@ public class CommitRequestManagerTest { private final long retryBackoffMs = 100; private final long retryBackoffMaxMs = 1000; private static final String CONSUMER_COORDINATOR_METRICS = "consumer-coordinator-metrics"; + private static final String DEFAULT_GROUP_ID = "group-id"; + private static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id"; private final Node mockedNode = new Node(1, "host1", 9092); private SubscriptionState subscriptionState; private LogContext logContext; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java deleted file mode 100644 index 0e6ff3ebd9e15..0000000000000 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerTestBuilder.java +++ /dev/null @@ -1,316 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.clients.consumer.internals; - -import org.apache.kafka.clients.ApiVersions; -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.clients.GroupRebalanceConfig; -import org.apache.kafka.clients.MockClient; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent; -import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor; -import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent; -import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler; -import org.apache.kafka.clients.consumer.internals.metrics.RebalanceCallbackMetricsManager; -import org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager; -import org.apache.kafka.common.internals.ClusterResourceListeners; -import org.apache.kafka.common.metrics.Metrics; -import org.apache.kafka.common.requests.MetadataResponse; -import org.apache.kafka.common.requests.RequestTestUtils; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.utils.LogContext; -import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; -import org.apache.kafka.common.utils.Timer; - -import java.io.Closeable; -import java.util.HashMap; -import java.util.Optional; -import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_INSTANCE_ID_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; -import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createFetchMetricsManager; -import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createMetrics; -import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.createSubscriptionState; -import static org.apache.kafka.common.utils.Utils.closeQuietly; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.spy; - -@SuppressWarnings("ClassDataAbstractionCoupling") -public class ConsumerTestBuilder implements Closeable { - - static final long DEFAULT_RETRY_BACKOFF_MS = 80; - static final long DEFAULT_RETRY_BACKOFF_MAX_MS = 1000; - static final int DEFAULT_REQUEST_TIMEOUT_MS = 500; - static final int DEFAULT_MAX_POLL_INTERVAL_MS = 10000; - static final String DEFAULT_GROUP_INSTANCE_ID = "group-instance-id"; - static final String DEFAULT_GROUP_ID = "group-id"; - static final int DEFAULT_HEARTBEAT_INTERVAL_MS = 1000; - static final double DEFAULT_HEARTBEAT_JITTER_MS = 0.0; - static final String DEFAULT_REMOTE_ASSIGNOR = "uniform"; - - final LogContext logContext = new LogContext(); - final Time time; - public final BlockingQueue applicationEventQueue; - public final BlockingQueue backgroundEventQueue; - final ConsumerConfig config; - final long retryBackoffMs; - final SubscriptionState subscriptions; - final ConsumerMetadata metadata; - final FetchConfig fetchConfig; - final FetchBuffer fetchBuffer; - final Metrics metrics; - final Timer pollTimer; - final FetchMetricsManager metricsManager; - final NetworkClientDelegate networkClientDelegate; - final OffsetsRequestManager offsetsRequestManager; - final Optional coordinatorRequestManager; - final Optional commitRequestManager; - final Optional heartbeatRequestManager; - final Optional membershipManager; - final Optional heartbeatState; - final Optional heartbeatRequestState; - final TopicMetadataRequestManager topicMetadataRequestManager; - final FetchRequestManager fetchRequestManager; - final RequestManagers requestManagers; - public final ApplicationEventProcessor applicationEventProcessor; - public final BackgroundEventHandler backgroundEventHandler; - public final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker; - final MockClient client; - final Optional groupInfo; - final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker; - - public ConsumerTestBuilder(Optional groupInfo) { - this(groupInfo, true, true); - } - - public ConsumerTestBuilder(Optional groupInfo, boolean enableAutoCommit, boolean enableAutoTick) { - this.groupInfo = groupInfo; - this.time = enableAutoTick ? new MockTime(1) : new MockTime(); - this.applicationEventQueue = new LinkedBlockingQueue<>(); - this.backgroundEventQueue = new LinkedBlockingQueue<>(); - this.backgroundEventHandler = spy(new BackgroundEventHandler(backgroundEventQueue)); - this.offsetCommitCallbackInvoker = mock(OffsetCommitCallbackInvoker.class); - GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig( - 100, - DEFAULT_MAX_POLL_INTERVAL_MS, - DEFAULT_HEARTBEAT_INTERVAL_MS, - groupInfo.map(gi -> gi.groupId).orElse(null), - groupInfo.flatMap(gi -> gi.groupInstanceId), - DEFAULT_RETRY_BACKOFF_MS, - DEFAULT_RETRY_BACKOFF_MAX_MS, - true); - ApiVersions apiVersions = new ApiVersions(); - - Properties properties = new Properties(); - properties.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - properties.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - properties.put(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG, DEFAULT_RETRY_BACKOFF_MS); - properties.put(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG, DEFAULT_REQUEST_TIMEOUT_MS); - properties.put(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG, DEFAULT_MAX_POLL_INTERVAL_MS); - - if (!enableAutoCommit) - properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - - groupInfo.ifPresent(gi -> { - properties.put(GROUP_ID_CONFIG, gi.groupId); - gi.groupInstanceId.ifPresent(groupInstanceId -> properties.put(GROUP_INSTANCE_ID_CONFIG, groupInstanceId)); - }); - - this.config = new ConsumerConfig(properties); - - this.fetchConfig = new FetchConfig(config); - this.retryBackoffMs = config.getLong(ConsumerConfig.RETRY_BACKOFF_MS_CONFIG); - final long requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); - this.metrics = createMetrics(config, time); - - this.subscriptions = spy(createSubscriptionState(config, logContext)); - this.metadata = spy(new ConsumerMetadata(config, subscriptions, logContext, new ClusterResourceListeners())); - this.metricsManager = createFetchMetricsManager(metrics); - this.pollTimer = time.timer(groupRebalanceConfig.rebalanceTimeoutMs); - - this.client = new MockClient(time, metadata); - MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(1, new HashMap() { - { - String topic1 = "test1"; - put(topic1, 1); - String topic2 = "test2"; - put(topic2, 1); - } - }); - this.client.updateMetadata(metadataResponse); - - this.networkClientDelegate = spy(new NetworkClientDelegate(time, - config, - logContext, - client, - metadata, - backgroundEventHandler)); - this.offsetsRequestManager = spy(new OffsetsRequestManager(subscriptions, - metadata, - fetchConfig.isolationLevel, - time, - retryBackoffMs, - requestTimeoutMs, - apiVersions, - networkClientDelegate, - backgroundEventHandler, - logContext)); - - this.topicMetadataRequestManager = spy(new TopicMetadataRequestManager(logContext, time, config)); - - if (groupInfo.isPresent()) { - GroupInformation gi = groupInfo.get(); - CoordinatorRequestManager coordinator = spy(new CoordinatorRequestManager( - logContext, - DEFAULT_RETRY_BACKOFF_MS, - DEFAULT_RETRY_BACKOFF_MAX_MS, - backgroundEventHandler, - gi.groupId - )); - CommitRequestManager commit = spy(new CommitRequestManager(time, - logContext, - subscriptions, - config, - coordinator, - offsetCommitCallbackInvoker, - gi.groupId, - gi.groupInstanceId, - metrics)); - MembershipManager mm = spy( - new MembershipManagerImpl( - gi.groupId, - gi.groupInstanceId, - groupRebalanceConfig.rebalanceTimeoutMs, - gi.serverAssignor, - subscriptions, - commit, - metadata, - logContext, - Optional.empty(), - backgroundEventHandler, - time, - mock(RebalanceMetricsManager.class) - ) - ); - HeartbeatRequestManager.HeartbeatState heartbeatState = spy(new HeartbeatRequestManager.HeartbeatState( - subscriptions, - mm, - DEFAULT_MAX_POLL_INTERVAL_MS)); - HeartbeatRequestManager.HeartbeatRequestState heartbeatRequestState = spy(new HeartbeatRequestManager.HeartbeatRequestState( - logContext, - time, - gi.heartbeatIntervalMs, - retryBackoffMs, - DEFAULT_RETRY_BACKOFF_MAX_MS, - gi.heartbeatJitterMs)); - HeartbeatRequestManager heartbeat = spy(new HeartbeatRequestManager( - logContext, - pollTimer, - config, - coordinator, - mm, - heartbeatState, - heartbeatRequestState, - backgroundEventHandler, - metrics)); - - this.coordinatorRequestManager = Optional.of(coordinator); - this.commitRequestManager = Optional.of(commit); - this.heartbeatRequestManager = Optional.of(heartbeat); - this.heartbeatState = Optional.of(heartbeatState); - this.heartbeatRequestState = Optional.of(heartbeatRequestState); - this.membershipManager = Optional.of(mm); - } else { - this.coordinatorRequestManager = Optional.empty(); - this.commitRequestManager = Optional.empty(); - this.heartbeatRequestManager = Optional.empty(); - this.heartbeatState = Optional.empty(); - this.heartbeatRequestState = Optional.empty(); - this.membershipManager = Optional.empty(); - } - - this.fetchBuffer = new FetchBuffer(logContext); - this.fetchRequestManager = spy(new FetchRequestManager(logContext, - time, - metadata, - subscriptions, - fetchConfig, - fetchBuffer, - metricsManager, - networkClientDelegate, - apiVersions)); - this.requestManagers = new RequestManagers(logContext, - offsetsRequestManager, - topicMetadataRequestManager, - fetchRequestManager, - coordinatorRequestManager, - commitRequestManager, - heartbeatRequestManager, - membershipManager - ); - this.applicationEventProcessor = spy(new ApplicationEventProcessor( - logContext, - requestManagers, - metadata, - subscriptions - ) - ); - - this.rebalanceListenerInvoker = new ConsumerRebalanceListenerInvoker( - logContext, - subscriptions, - time, - new RebalanceCallbackMetricsManager(metrics) - ); - } - - @Override - public void close() { - closeQuietly(requestManagers, RequestManagers.class.getSimpleName()); - } - - public static class GroupInformation { - final String groupId; - final Optional groupInstanceId; - final int heartbeatIntervalMs; - final double heartbeatJitterMs; - final Optional serverAssignor; - - public GroupInformation(String groupId, Optional groupInstanceId) { - this(groupId, groupInstanceId, DEFAULT_HEARTBEAT_INTERVAL_MS, DEFAULT_HEARTBEAT_JITTER_MS, - Optional.of(DEFAULT_REMOTE_ASSIGNOR)); - } - - public GroupInformation(String groupId, Optional groupInstanceId, int heartbeatIntervalMs, double heartbeatJitterMs, Optional serverAssignor) { - this.heartbeatIntervalMs = heartbeatIntervalMs; - this.heartbeatJitterMs = heartbeatJitterMs; - this.serverAssignor = serverAssignor; - this.groupId = groupId; - this.groupInstanceId = groupInstanceId; - } - } - - static Optional createDefaultGroupInformation() { - return Optional.of(new GroupInformation(DEFAULT_GROUP_ID, Optional.empty())); - } -} diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java index 5fdbc9f6e0d6f..5ea93dae40f74 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManagerTest.java @@ -59,11 +59,11 @@ import java.util.Random; import java.util.concurrent.TimeUnit; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MAX_MS; -import static org.apache.kafka.clients.consumer.internals.ConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS; import static org.apache.kafka.clients.consumer.internals.ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX; +import static org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_HEARTBEAT_INTERVAL_MS; +import static org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_MAX_POLL_INTERVAL_MS; +import static org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MAX_MS; +import static org.apache.kafka.clients.consumer.internals.ShareConsumerTestBuilder.DEFAULT_RETRY_BACKOFF_MS; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -530,7 +530,7 @@ public void testUnknownMemberId(final short version) { public void testHeartbeatState() { // The initial ShareGroupHeartbeatRequest sets most fields to their initial empty values ShareGroupHeartbeatRequestData data = heartbeatState.buildRequestData(); - assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); + assertEquals(ShareConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); assertEquals("", data.memberId()); assertEquals(0, data.memberEpoch()); assertEquals(Collections.emptyList(), data.subscribedTopicNames()); @@ -540,7 +540,7 @@ public void testHeartbeatState() { // Mock a response from the group coordinator, that supplies the member ID and a new epoch mockStableMember(); data = heartbeatState.buildRequestData(); - assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); + assertEquals(ShareConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); assertEquals(memberId, data.memberId()); assertEquals(1, data.memberEpoch()); assertNull(data.subscribedTopicNames()); @@ -553,7 +553,7 @@ public void testHeartbeatState() { membershipManager.onSubscriptionUpdated(); membershipManager.transitionToFenced(); // And indirect way of moving to JOINING state data = heartbeatState.buildRequestData(); - assertEquals(ConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); + assertEquals(ShareConsumerTestBuilder.DEFAULT_GROUP_ID, data.groupId()); assertEquals(memberId, data.memberId()); assertEquals(0, data.memberEpoch()); assertEquals(Collections.singletonList(topic), data.subscribedTopicNames());