diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 267a52873d037..354818a270bab 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -463,6 +463,10 @@ void maybeUpdateSubscriptionMetadata() { } } + private boolean coordinatorUnknownAndUnready(Timer timer) { + return coordinatorUnknown() && !ensureCoordinatorReady(timer); + } + /** * Poll for coordinator events. This ensures that the coordinator is known and that the consumer * has joined the group (if it is using group management). This also handles periodic offset commits @@ -488,7 +492,7 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) { // Always update the heartbeat last poll time so that the heartbeat thread does not leave the // group proactively due to application inactivity even if (say) the coordinator cannot be found. pollHeartbeat(timer.currentTimeMs()); - if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { + if (coordinatorUnknownAndUnready(timer)) { return false; } @@ -525,15 +529,13 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) { } } } else { - // For manually assigned partitions, if there are no ready nodes, await metadata. + // For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata. // If connections to all nodes fail, wakeups triggered while attempting to send fetch // requests result in polls returning immediately, causing a tight loop of polls. Without // the wakeup, poll() with no channels would block for the timeout, delaying re-connection. - // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop. 
- // When group management is used, metadata wait is already performed for this scenario as - // coordinator is unknown, hence this check is not required. - if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) { - client.awaitMetadataUpdate(timer); + // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop. + if (coordinatorUnknownAndUnready(timer)) { + return false; } } @@ -1030,7 +1032,7 @@ public boolean commitOffsetsSync(Map offsets, return true; do { - if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) { + if (coordinatorUnknownAndUnready(timer)) { return false; } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 9b79473d89e4a..f8b86b83cd9b7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -192,6 +192,9 @@ public class KafkaConsumerTest { private final String partitionLost = "Hit partition lost "; private final Collection singleTopicPartition = Collections.singleton(new TopicPartition(topic, 0)); + private final Time time = new MockTime(); + private final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); + private final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); @Test public void testMetricsReporterAutoGeneratedClientId() { @@ -266,12 +269,9 @@ private KafkaConsumer setUpConsumerWithRecordsToPoll(TopicPartit } private KafkaConsumer setUpConsumerWithRecordsToPoll(TopicPartition tp, int recordCount, Deserializer deserializer) { - Time time = new MockTime(); Cluster cluster = TestUtils.singletonCluster(tp.topic(), 1); Node node = cluster.nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - 
SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -538,16 +538,12 @@ private KafkaConsumer newConsumer(Properties props) { @Test public void verifyHeartbeatSent() throws Exception { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -573,14 +569,11 @@ public void verifyHeartbeatSent() throws Exception { @Test public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); @@ -607,16 +600,12 @@ public void verifyHeartbeatSentWhenFetchedDataReady() throws Exception { @Test public void verifyPollTimesOutDuringMetadataUpdate() { - final Time time = new MockTime(); - final SubscriptionState subscription = 
new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); final ConsumerMetadata metadata = createMetadata(subscription); final MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); // Since we would enable the heartbeat thread after received join-response which could @@ -635,16 +624,12 @@ public void verifyPollTimesOutDuringMetadataUpdate() { @SuppressWarnings("deprecation") @Test public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() { - final Time time = new MockTime(); - final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); final ConsumerMetadata metadata = createMetadata(subscription); final MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, singletonList(tp0), null); @@ -660,15 +645,12 @@ public void verifyDeprecatedPollDoesNotTimeOutDuringMetadataUpdate() { @Test public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, 
Collections.singletonMap(topic, 1)); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); + KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, null, groupInstanceId, false); consumer.assign(singleton(tp0)); consumer.seekToBeginning(singleton(tp0)); @@ -683,22 +665,58 @@ public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() { consumer.close(Duration.ofMillis(0)); } + @Test + public void verifyNoCoordinatorLookupForManualAssignmentWithOffsetCommit() { + ConsumerMetadata metadata = createMetadata(subscription); + MockClient client = new MockClient(time, metadata); + + initMetadata(client, Collections.singletonMap(topic, 1)); + Node node = metadata.fetch().nodes().get(0); + + // create a consumer with groupID with manual assignment + KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); + consumer.assign(singleton(tp0)); + + // 1st coordinator error should cause coordinator unknown + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.COORDINATOR_NOT_AVAILABLE, groupId, node), node); + consumer.poll(Duration.ofMillis(0)); + + // 2nd coordinator error should find the correct coordinator and clear the findCoordinatorFuture + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node); + + client.prepareResponse(offsetResponse(Collections.singletonMap(tp0, 50L), Errors.NONE)); + client.prepareResponse(fetchResponse(tp0, 50L, 5)); + + ConsumerRecords records = consumer.poll(Duration.ofMillis(0)); + assertEquals(5, records.count()); + assertEquals(55L, consumer.position(tp0)); + + // after coordinator found, consumer should be able to commit the offset successfully + client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp0, Errors.NONE))); + 
consumer.commitSync(Collections.singletonMap(tp0, new OffsetAndMetadata(55L))); + + // verify the offset is committed + client.prepareResponse(offsetResponse(Collections.singletonMap(tp0, 55L), Errors.NONE)); + assertEquals(55, consumer.committed(Collections.singleton(tp0), Duration.ZERO).get(tp0).offset()); + consumer.close(Duration.ofMillis(0)); + } + @Test public void testFetchProgressWithMissingPartitionPosition() { // Verifies that we can make progress on one partition while we are awaiting // a reset on another partition. - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 2)); + Node node = metadata.fetch().nodes().get(0); KafkaConsumer consumer = newConsumerNoAutoCommit(time, client, subscription, metadata); consumer.assign(Arrays.asList(tp0, tp1)); consumer.seekToEnd(singleton(tp0)); consumer.seekToBeginning(singleton(tp1)); + client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node); client.prepareResponse(body -> { ListOffsetsRequest request = (ListOffsetsRequest) body; List partitions = request.topics().stream().flatMap(t -> { @@ -742,7 +760,6 @@ private void initMetadata(MockClient mockClient, Map partitionC @Test public void testMissingOffsetNoResetPolicy() { - Time time = new MockTime(); SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -750,8 +767,6 @@ public void testMissingOffsetNoResetPolicy() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = 
newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, false); consumer.assign(singletonList(tp0)); @@ -766,7 +781,6 @@ public void testMissingOffsetNoResetPolicy() { @Test public void testResetToCommittedOffset() { - Time time = new MockTime(); SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.NONE); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -774,8 +788,6 @@ public void testResetToCommittedOffset() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, false); consumer.assign(singletonList(tp0)); @@ -791,7 +803,6 @@ public void testResetToCommittedOffset() { @Test public void testResetUsingAutoResetPolicy() { - Time time = new MockTime(); SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -799,8 +810,6 @@ public void testResetUsingAutoResetPolicy() { initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, false); consumer.assign(singletonList(tp0)); @@ -818,15 +827,12 @@ public void testResetUsingAutoResetPolicy() { @Test public void testOffsetIsValidAfterSeek() { - Time time = new MockTime(); SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.LATEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, 
metadata); initMetadata(client, Collections.singletonMap(topic, 1)); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupId, Optional.empty(), false); consumer.assign(singletonList(tp0)); @@ -840,16 +846,12 @@ public void testCommitsFetchedDuringAssign() { long offset1 = 10000; long offset2 = 20000; - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 2)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(singletonList(tp0)); @@ -894,8 +896,6 @@ public void testFetchStableOffsetThrowInPosition() { private KafkaConsumer setupThrowableConsumer() { long offset1 = 10000; - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -904,8 +904,6 @@ private KafkaConsumer setupThrowableConsumer() { Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer( time, client, subscription, metadata, assignor, true, groupId, groupInstanceId, true); consumer.assign(singletonList(tp0)); @@ -922,16 +920,12 @@ private KafkaConsumer setupThrowableConsumer() { public void testNoCommittedOffsets() { long offset1 = 10000; - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata 
metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 2)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(Arrays.asList(tp0, tp1)); @@ -951,16 +945,12 @@ public void testNoCommittedOffsets() { @Test public void testAutoCommitSentBeforePositionUpdate() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null); @@ -988,8 +978,6 @@ public void testAutoCommitSentBeforePositionUpdate() { @Test public void testRegexSubscription() { String unmatchedTopic = "unmatched"; - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1000,8 +988,6 @@ public void testRegexSubscription() { initMetadata(client, partitionCounts); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); prepareRebalance(client, node, singleton(topic), 
assignor, singletonList(tp0), null); @@ -1018,13 +1004,9 @@ public void testRegexSubscription() { @Test public void testChangingRegexSubscription() { - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - String otherTopic = "other"; TopicPartition otherTopicPartition = new TopicPartition(otherTopic, 0); - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1057,16 +1039,12 @@ public void testChangingRegexSubscription() { @Test public void testWakeupWithFetchDataAvailable() throws Exception { - final Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, singletonList(tp0), null); @@ -1098,16 +1076,12 @@ public void testWakeupWithFetchDataAvailable() throws Exception { @Test public void testPollThrowsInterruptExceptionIfInterrupted() { - final Time time = new MockTime(); - final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); final ConsumerMetadata metadata = createMetadata(subscription); final MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - 
KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); prepareRebalance(client, node, assignor, singletonList(tp0), null); @@ -1128,16 +1102,12 @@ public void testPollThrowsInterruptExceptionIfInterrupted() { @Test public void fetchResponseWithUnexpectedPartitionIsIgnored() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RangeAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singletonList(topic), getConsumerRebalanceListener(consumer)); @@ -1164,8 +1134,6 @@ public void fetchResponseWithUnexpectedPartitionIsIgnored() { */ @Test public void testSubscriptionChangesWithAutoCommitEnabled() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1279,8 +1247,6 @@ public void testSubscriptionChangesWithAutoCommitEnabled() { */ @Test public void testSubscriptionChangesWithAutoCommitDisabled() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1336,8 +1302,6 @@ public void testSubscriptionChangesWithAutoCommitDisabled() { @Test public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration() { - Time time = 
new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1361,8 +1325,6 @@ public void testUnsubscribeShouldTriggerPartitionsRevokedWithValidGeneration() { @Test public void testUnsubscribeShouldTriggerPartitionsLostWithNoGeneration() throws Exception { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1398,8 +1360,6 @@ private void initializeSubscriptionWithSingleTopic(KafkaConsumer @Test public void testManualAssignmentChangeWithAutoCommitEnabled() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1455,8 +1415,6 @@ public void testManualAssignmentChangeWithAutoCommitEnabled() { @Test public void testManualAssignmentChangeWithAutoCommitDisabled() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1512,8 +1470,6 @@ public void testManualAssignmentChangeWithAutoCommitDisabled() { @Test public void testOffsetOfPausedPartitions() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -1707,16 +1663,12 @@ public void testMetricConfigRecordingLevel() { @Test public void 
testShouldAttemptToRejoinGroupAfterSyncGroupFailed() throws Exception { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, groupInstanceId); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); client.prepareResponseFrom(FindCoordinatorResponse.prepareResponse(Errors.NONE, groupId, node), node); @@ -1779,16 +1731,12 @@ private void consumerCloseTest(final long closeTimeoutMs, List responses, long waitMs, boolean interrupt) throws Exception { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, false, Optional.empty()); consumer.subscribe(singleton(topic), getConsumerRebalanceListener(consumer)); Node coordinator = prepareRebalance(client, node, assignor, singletonList(tp0), null); @@ -1865,8 +1813,6 @@ private void consumerCloseTest(final long closeTimeoutMs, @Test public void testPartitionsForNonExistingTopic() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, 
metadata); @@ -1879,8 +1825,6 @@ public void testPartitionsForNonExistingTopic() { Collections.emptyList()); client.prepareResponse(updateResponse); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); assertEquals(Collections.emptyList(), consumer.partitionsFor("non-exist-topic")); } @@ -1954,7 +1898,6 @@ public void testMeasureCommitSyncDuration() { MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 2)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(singletonList(tp0)); @@ -1999,7 +1942,6 @@ public void testMeasureCommittedDuration() { MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 2)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.assign(singletonList(tp0)); @@ -2022,16 +1964,12 @@ public void testMeasureCommittedDuration() { @Test public void testRebalanceException() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Node node = metadata.fetch().nodes().get(0); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singleton(topic), getExceptionConsumerRebalanceListener()); @@ 
-2069,7 +2007,6 @@ public void testRebalanceException() { @Test public void testReturnRecordsDuringRebalance() throws InterruptedException { Time time = new MockTime(1L); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); ConsumerPartitionAssignor assignor = new CooperativeStickyAssignor(); @@ -2194,16 +2131,12 @@ public void testReturnRecordsDuringRebalance() throws InterruptedException { @Test public void testGetGroupMetadata() { - final Time time = new MockTime(); - final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); final ConsumerMetadata metadata = createMetadata(subscription); final MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); final Node node = metadata.fetch().nodes().get(0); - final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); final ConsumerGroupMetadata groupMetadataOnStart = consumer.groupMetadata(); @@ -2228,8 +2161,6 @@ public void testGetGroupMetadata() { @Test public void testInvalidGroupMetadata() throws InterruptedException { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -2257,13 +2188,10 @@ public void testInvalidGroupMetadata() throws InterruptedException { @Test public void testCurrentLag() { - final Time time = new MockTime(); - final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); final ConsumerMetadata metadata = 
createMetadata(subscription); final MockClient client = new MockClient(time, metadata); initMetadata(client, singletonMap(topic, 1)); - final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); @@ -2315,13 +2243,10 @@ public void testCurrentLag() { @Test public void testListOffsetShouldUpateSubscriptions() { - final Time time = new MockTime(); - final SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); final ConsumerMetadata metadata = createMetadata(subscription); final MockClient client = new MockClient(time, metadata); initMetadata(client, singletonMap(topic, 1)); - final ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); final KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); @@ -2343,7 +2268,6 @@ public void testListOffsetShouldUpateSubscriptions() { } private KafkaConsumer consumerWithPendingAuthenticationError(final Time time) { - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); @@ -2641,14 +2565,16 @@ private KafkaConsumer newConsumer(Time time, ConsumerNetworkClient consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time, retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs); - GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, + ConsumerCoordinator consumerCoordinator = null; + if (groupId != null) { + GroupRebalanceConfig rebalanceConfig = new GroupRebalanceConfig(sessionTimeoutMs, rebalanceTimeoutMs, heartbeatIntervalMs, groupId, groupInstanceId, retryBackoffMs, true); - ConsumerCoordinator consumerCoordinator = new ConsumerCoordinator(rebalanceConfig, + consumerCoordinator = new 
ConsumerCoordinator(rebalanceConfig, loggerFactory, consumerClient, assignors, @@ -2661,6 +2587,7 @@ private KafkaConsumer newConsumer(Time time, autoCommitIntervalMs, interceptors, throwOnStableOffsetNotSupported); + } Fetcher fetcher = new Fetcher<>( loggerFactory, consumerClient, @@ -2723,16 +2650,12 @@ private static class FetchInfo { @Test public void testSubscriptionOnInvalidTopic() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); Cluster cluster = metadata.fetch(); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - String invalidTopicName = "topic abc"; // Invalid topic name due to space List topicMetadata = new ArrayList<>(); @@ -2752,14 +2675,10 @@ public void testSubscriptionOnInvalidTopic() { @Test public void testPollTimeMetrics() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); consumer.subscribe(singletonList(topic)); // MetricName objects to check @@ -2801,14 +2720,10 @@ public void testPollTimeMetrics() { @Test public void testPollIdleRatio() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); - 
ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); - KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); // MetricName object to check Metrics metrics = consumer.metrics; @@ -2851,8 +2766,6 @@ private static boolean consumerMetricPresent(KafkaConsumer consu @Test public void testClosingConsumerUnregistersConsumerMetrics() { - Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); initMetadata(client, Collections.singletonMap(topic, 1)); @@ -2879,10 +2792,8 @@ public void testEnforceRebalanceWithManualAssignment() { @Test public void testEnforceRebalanceTriggersRebalanceOnNextPoll() { Time time = new MockTime(1L); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); - ConsumerPartitionAssignor assignor = new RoundRobinAssignor(); KafkaConsumer consumer = newConsumer(time, client, subscription, metadata, assignor, true, groupInstanceId); MockRebalanceListener countingRebalanceListener = new MockRebalanceListener(); initMetadata(client, Utils.mkMap(Utils.mkEntry(topic, 1), Utils.mkEntry(topic2, 1), Utils.mkEntry(topic3, 1))); @@ -2976,8 +2887,6 @@ public void testEndOffsetsTimeout() { } private KafkaConsumer consumerForCheckingTimeoutException() { - final Time time = new MockTime(); - SubscriptionState subscription = new SubscriptionState(new LogContext(), OffsetResetStrategy.EARLIEST); ConsumerMetadata metadata = createMetadata(subscription); MockClient client = new MockClient(time, metadata); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 10d2a83460060..1962c94820fb6 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -477,6 +477,21 @@ public void testGroupReadUnauthorized() { assertThrows(GroupAuthorizationException.class, () -> coordinator.poll(time.timer(Long.MAX_VALUE))); } + @Test + public void testCoordinatorNotAvailableWithUserAssignedType() { + subscriptions.assignFromUser(Collections.singleton(t1p)); + // should mark coordinator unknown after COORDINATOR_NOT_AVAILABLE error + client.prepareResponse(groupCoordinatorResponse(node, Errors.COORDINATOR_NOT_AVAILABLE)); + // set timeout to 0 because we don't want to retry after the error + coordinator.poll(time.timer(0)); + assertTrue(coordinator.coordinatorUnknown()); + + // should find an available node in next find coordinator request + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); + coordinator.poll(time.timer(Long.MAX_VALUE)); + assertFalse(coordinator.coordinatorUnknown()); + } + @Test public void testCoordinatorNotAvailable() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE)); diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 538e40195f588..7f9ddb2e80034 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1038,7 +1038,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { addAndVerifyAcls(Set(new AccessControlEntry(clientPrincipalString, WildcardHost, READ, ALLOW)), topicResource) // in this case, we do an explicit seek, so there should be no need to query the coordinator at all - val consumer = 
createConsumer() + // remove the group.id config to avoid creating a coordinator + val consumer = createConsumer(configsToRemove = List(ConsumerConfig.GROUP_ID_CONFIG)) consumer.assign(List(tp).asJava) consumer.seekToBeginning(List(tp).asJava) consumeRecords(consumer)