diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java index f9a42171a0447..8708ea4f7e343 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerPartitionAssignor.java @@ -154,6 +154,14 @@ public List partitions() { public ByteBuffer userData() { return userData; } + + @Override + public String toString() { + return "Assignment(" + + "partitions=" + partitions + + (userData == null ? "" : ", userDataSize=" + userData.remaining()) + + ')'; + } } final class GroupSubscription { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index c59b7fd345fca..27580a2ecc020 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -1370,9 +1370,6 @@ boolean matches(MetadataSnapshot other) { return version == other.version || partitionsPerTopic.equals(other.partitionsPerTopic); } - Map partitionsPerTopic() { - return partitionsPerTopic; - } } private static class OffsetCommitCompletion { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index 9200eb8ad5f4b..6568c9188a408 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -186,15 +186,17 @@ private boolean changeSubscription(Set topicsToSubscribe) { } /** - * Add topics to the current group subscription. This is used by the group leader to ensure + * Set the current group subscription. This is used by the group leader to ensure * that it receives metadata updates for all topics that the group is interested in. - * @param topics The topics to add to the group subscription + * + * @param topics All topics from the group subscription + * @return true if the group subscription contains topics which are not part of the local subscription */ synchronized boolean groupSubscribe(Collection topics) { if (!hasAutoAssignedPartitions()) throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE); - groupSubscription = new HashSet<>(groupSubscription); - return groupSubscription.addAll(topics); + groupSubscription = new HashSet<>(topics); + return !subscription.containsAll(groupSubscription); } /** @@ -328,7 +330,7 @@ public synchronized Set pausedPartitions() { } /** - * Get the subcription topics for which metadata is required . For the leader, this will include + * Get the subscription topics for which metadata is required. For the leader, this will include * the union of the subscriptions of all group members. For followers, it is just that member's * subscription. This is used when querying topic metadata to detect the metadata changes which would * require rebalancing. The leader fetches metadata for all topics in the group so that it diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 74048adb76e64..96f08f52acb3f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -105,6 +105,22 @@ public void partitionAssignmentChangeOnTopicSubscription() { assertEquals(0, state.numAssignedPartitions()); } + @Test + public void testGroupSubscribe() { + state.subscribe(singleton(topic1), rebalanceListener); + assertEquals(singleton(topic1), state.metadataTopics()); + + assertFalse(state.groupSubscribe(singleton(topic1))); + assertEquals(singleton(topic1), state.metadataTopics()); + + assertTrue(state.groupSubscribe(Utils.mkSet(topic, topic1))); + assertEquals(Utils.mkSet(topic, topic1), state.metadataTopics()); + + // `groupSubscribe` does not accumulate + assertFalse(state.groupSubscribe(singleton(topic1))); + assertEquals(singleton(topic1), state.metadataTopics()); + } + @Test public void partitionAssignmentChangeOnPatternSubscription() { state.subscribe(Pattern.compile(".*"), rebalanceListener); @@ -112,7 +128,7 @@ public void partitionAssignmentChangeOnPatternSubscription() { assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); - state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic))); + state.subscribeFromPattern(Collections.singleton(topic)); // assigned partitions should remain unchanged assertTrue(state.assignedPartitions().isEmpty()); assertEquals(0, state.numAssignedPartitions()); @@ -244,7 +260,7 @@ public void cantAssignPartitionForUnsubscribedTopics() { @Test public void cantAssignPartitionForUnmatchedPattern() { state.subscribe(Pattern.compile(".*t"), rebalanceListener); - state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic))); + state.subscribeFromPattern(Collections.singleton(topic)); assertFalse(state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0))); } diff --git a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala index 97b638f8e6ffa..853b2caa7c751 100644 --- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala @@ -65,20 +65,24 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness { } def committedOffsets(topic: String = topic, group: String = group): Map[TopicPartition, Long] = { - val props = new Properties - props.put("bootstrap.servers", brokerList) - props.put("group.id", group) - val consumer = new KafkaConsumer(props, new StringDeserializer, new StringDeserializer) + val consumer = createNoAutoCommitConsumer(group) try { val partitions: Set[TopicPartition] = consumer.partitionsFor(topic) .asScala.toSet.map {partitionInfo : PartitionInfo => new TopicPartition(partitionInfo.topic, partitionInfo.partition)} - consumer.committed(partitions.asJava).asScala.filter(_._2 != null).mapValues(_.offset()).toMap } finally { consumer.close() } } + def createNoAutoCommitConsumer(group: String): KafkaConsumer[String, String] = { + val props = new Properties + props.put("bootstrap.servers", brokerList) + props.put("group.id", group) + props.put("enable.auto.commit", "false") + new KafkaConsumer(props, new StringDeserializer, new StringDeserializer) + } + def getConsumerGroupService(args: Array[String]): ConsumerGroupService = { val opts = new ConsumerGroupCommandOptions(args) val service = new ConsumerGroupService(opts, Map(AdminClientConfig.RETRIES_CONFIG -> Int.MaxValue.toString)) diff --git a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala index 838444ce748f1..f9322b36e76f1 100644 --- a/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ResetConsumerGroupOffsetTest.scala @@ -16,8 +16,6 @@ import java.io.{BufferedWriter, File, FileWriter} import java.text.{ParseException, SimpleDateFormat} import java.util.{Calendar, Date, Properties} -import scala.collection.Seq - import joptsimple.OptionException import kafka.admin.ConsumerGroupCommand.ConsumerGroupService import kafka.server.KafkaConfig @@ -28,6 +26,9 @@ import org.apache.kafka.test import org.junit.Assert._ import org.junit.Test +import scala.collection.JavaConverters._ +import scala.collection.Seq + class TimeConversionTests { @Test @@ -462,12 +463,28 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest { executor.shutdown() } - private def awaitConsumerProgress(topic: String = topic, group: String = group, count: Long): Unit = { - TestUtils.waitUntilTrue(() => { - val offsets = committedOffsets(topic = topic, group = group).values - count == offsets.sum - }, "Expected that consumer group has consumed all messages from topic/partition. " + - s"Expected offset: $count. Actual offset: ${committedOffsets(topic, group).values.sum}") + private def awaitConsumerProgress(topic: String = topic, + group: String = group, + count: Long): Unit = { + val consumer = createNoAutoCommitConsumer(group) + try { + val partitions = consumer.partitionsFor(topic).asScala.map { partitionInfo => + new TopicPartition(partitionInfo.topic, partitionInfo.partition) + }.toSet + + TestUtils.waitUntilTrue(() => { + val committed = consumer.committed(partitions.asJava).values.asScala + val total = committed.foldLeft(0L) { case (currentSum, offsetAndMetadata) => + currentSum + Option(offsetAndMetadata).map(_.offset).getOrElse(0L) + } + total == count + }, "Expected that consumer group has consumed all messages from topic/partition. " + + s"Expected offset: $count. Actual offset: ${committedOffsets(topic, group).values.sum}") + + } finally { + consumer.close() + } + } private def resetAndAssertOffsets(args: Array[String],