Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,14 @@ public List<TopicPartition> partitions() {
public ByteBuffer userData() {
return userData;
}

@Override
public String toString() {
return "Assignment(" +
"partitions=" + partitions +
(userData == null ? "" : ", userDataSize=" + userData.remaining()) +
')';
}
}

final class GroupSubscription {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1370,9 +1370,6 @@ boolean matches(MetadataSnapshot other) {
return version == other.version || partitionsPerTopic.equals(other.partitionsPerTopic);
}

Map<String, Integer> partitionsPerTopic() {
return partitionsPerTopic;
}
}

private static class OffsetCommitCompletion {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,15 +186,17 @@ private boolean changeSubscription(Set<String> topicsToSubscribe) {
}

/**
* Add topics to the current group subscription. This is used by the group leader to ensure
* Set the current group subscription. This is used by the group leader to ensure
* that it receives metadata updates for all topics that the group is interested in.
* @param topics The topics to add to the group subscription
*
* @param topics All topics from the group subscription
* @return true if the group subscription contains topics which are not part of the local subscription
*/
synchronized boolean groupSubscribe(Collection<String> topics) {
if (!hasAutoAssignedPartitions())
throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);
groupSubscription = new HashSet<>(groupSubscription);
return groupSubscription.addAll(topics);
groupSubscription = new HashSet<>(topics);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were accumulating topics earlier, but now we are replacing groupSubscription. I guess that is fine since we are passing in all the topics. May be worth updating the comment and also verifying that we are no longer accumulating in the unit test.

return !subscription.containsAll(groupSubscription);
}

/**
Expand Down Expand Up @@ -328,7 +330,7 @@ public synchronized Set<TopicPartition> pausedPartitions() {
}

/**
* Get the subcription topics for which metadata is required . For the leader, this will include
* Get the subscription topics for which metadata is required. For the leader, this will include
* the union of the subscriptions of all group members. For followers, it is just that member's
* subscription. This is used when querying topic metadata to detect the metadata changes which would
* require rebalancing. The leader fetches metadata for all topics in the group so that it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,14 +105,30 @@ public void partitionAssignmentChangeOnTopicSubscription() {
assertEquals(0, state.numAssignedPartitions());
}

@Test
public void testGroupSubscribe() {
state.subscribe(singleton(topic1), rebalanceListener);
assertEquals(singleton(topic1), state.metadataTopics());

assertFalse(state.groupSubscribe(singleton(topic1)));
assertEquals(singleton(topic1), state.metadataTopics());

assertTrue(state.groupSubscribe(Utils.mkSet(topic, topic1)));
assertEquals(Utils.mkSet(topic, topic1), state.metadataTopics());

// `groupSubscribe` does not accumulate
assertFalse(state.groupSubscribe(singleton(topic1)));
assertEquals(singleton(topic1), state.metadataTopics());
}

@Test
public void partitionAssignmentChangeOnPatternSubscription() {
state.subscribe(Pattern.compile(".*"), rebalanceListener);
// assigned partitions should remain unchanged
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());

state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic)));
state.subscribeFromPattern(Collections.singleton(topic));
// assigned partitions should remain unchanged
assertTrue(state.assignedPartitions().isEmpty());
assertEquals(0, state.numAssignedPartitions());
Expand Down Expand Up @@ -244,7 +260,7 @@ public void cantAssignPartitionForUnsubscribedTopics() {
@Test
public void cantAssignPartitionForUnmatchedPattern() {
state.subscribe(Pattern.compile(".*t"), rebalanceListener);
state.subscribeFromPattern(new HashSet<>(Collections.singletonList(topic)));
state.subscribeFromPattern(Collections.singleton(topic));
assertFalse(state.checkAssignmentMatchedSubscription(Collections.singletonList(t1p0)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,24 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness {
}

def committedOffsets(topic: String = topic, group: String = group): Map[TopicPartition, Long] = {
val props = new Properties
props.put("bootstrap.servers", brokerList)
props.put("group.id", group)
val consumer = new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
val consumer = createNoAutoCommitConsumer(group)
try {
val partitions: Set[TopicPartition] = consumer.partitionsFor(topic)
.asScala.toSet.map {partitionInfo : PartitionInfo => new TopicPartition(partitionInfo.topic, partitionInfo.partition)}

consumer.committed(partitions.asJava).asScala.filter(_._2 != null).mapValues(_.offset()).toMap
} finally {
consumer.close()
}
}

def createNoAutoCommitConsumer(group: String): KafkaConsumer[String, String] = {
val props = new Properties
props.put("bootstrap.servers", brokerList)
props.put("group.id", group)
props.put("enable.auto.commit", "false")
new KafkaConsumer(props, new StringDeserializer, new StringDeserializer)
}

def getConsumerGroupService(args: Array[String]): ConsumerGroupService = {
val opts = new ConsumerGroupCommandOptions(args)
val service = new ConsumerGroupService(opts, Map(AdminClientConfig.RETRIES_CONFIG -> Int.MaxValue.toString))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ import java.io.{BufferedWriter, File, FileWriter}
import java.text.{ParseException, SimpleDateFormat}
import java.util.{Calendar, Date, Properties}

import scala.collection.Seq

import joptsimple.OptionException
import kafka.admin.ConsumerGroupCommand.ConsumerGroupService
import kafka.server.KafkaConfig
Expand All @@ -28,6 +26,9 @@ import org.apache.kafka.test
import org.junit.Assert._
import org.junit.Test

import scala.collection.JavaConverters._
import scala.collection.Seq

class TimeConversionTests {

@Test
Expand Down Expand Up @@ -462,12 +463,28 @@ class ResetConsumerGroupOffsetTest extends ConsumerGroupCommandTest {
executor.shutdown()
}

private def awaitConsumerProgress(topic: String = topic, group: String = group, count: Long): Unit = {
TestUtils.waitUntilTrue(() => {
val offsets = committedOffsets(topic = topic, group = group).values
count == offsets.sum
}, "Expected that consumer group has consumed all messages from topic/partition. " +
s"Expected offset: $count. Actual offset: ${committedOffsets(topic, group).values.sum}")
private def awaitConsumerProgress(topic: String = topic,
group: String = group,
count: Long): Unit = {
val consumer = createNoAutoCommitConsumer(group)
try {
val partitions = consumer.partitionsFor(topic).asScala.map { partitionInfo =>
new TopicPartition(partitionInfo.topic, partitionInfo.partition)
}.toSet

TestUtils.waitUntilTrue(() => {
val committed = consumer.committed(partitions.asJava).values.asScala
val total = committed.foldLeft(0L) { case (currentSum, offsetAndMetadata) =>
currentSum + Option(offsetAndMetadata).map(_.offset).getOrElse(0L)
}
total == count
}, "Expected that consumer group has consumed all messages from topic/partition. " +
s"Expected offset: $count. Actual offset: ${committedOffsets(topic, group).values.sum}")

} finally {
consumer.close()
}

}

private def resetAndAssertOffsets(args: Array[String],
Expand Down