makeNext() {
- if (current == null || !current.hasNext()) {
+ while (current == null || !current.hasNext()) {
if (iters.hasNext())
current = iters.next().iterator();
else
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index d7c8e14cea0cb..7290a38445646 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -16,19 +16,18 @@
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
+import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
-import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -36,6 +35,7 @@
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.SystemTime;
@@ -45,7 +45,6 @@
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
@@ -53,6 +52,7 @@
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
@@ -76,7 +76,7 @@
* Offsets and Consumer Position
* Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of
* a record within that partition, and also denotes the position of the consumer in the partition. That is, a consumer
- * which has position 5 has consumed records with offsets 0 through 4 and will next receive record with offset 5. There
+ * which has position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There
* are actually two notions of position relevant to the user of the consumer.
*
* The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
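To make the distinction concrete, here is a minimal sketch (not part of the patch) of how the two notions of position surface in the API; the topic name "foo", group id "test", and broker address are illustrative assumptions:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

TopicPartition tp = new TopicPartition("foo", 0);
consumer.assign(Arrays.asList(tp));
consumer.poll(100);                                    // fetch some records, advancing the position
long position = consumer.position(tp);                 // offset of the next record to be returned
OffsetAndMetadata committed = consumer.committed(tp);  // last committed offset; may trail the position
```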
@@ -95,22 +95,23 @@
*
*
 * Consumer Groups and Topic Subscriptions
*
- * Kafka uses the concept of consumer groups to allow a pool of processes to divide up the work of consuming and
+ * Kafka uses the concept of consumer groups to allow a pool of processes to divide the work of consuming and
* processing records. These processes can either be running on the same machine or, as is more likely, they can be
- * distributed over many machines to provide additional scalability and fault tolerance for processing.
+ * distributed over many machines to provide scalability and fault tolerance for processing.
*
- * Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the
- * list of topics it wants to subscribe to through {@link #subscribe(List, ConsumerRebalanceListener)},
- * or subscribe to all topics matching certain pattern through {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
- * Kafka will deliver each message in the
- * subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic
- * over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two
- * processes, each process would consume from two partitions. This group membership is maintained dynamically: if a
- * process fails the partitions assigned to it will be reassigned to other processes in the same group, and if a new
- * process joins the group, partitions will be moved from existing consumers to this new process.
+ * Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the list
+ * of topics it wants to subscribe to through one of the {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe}
+ * APIs. Kafka will deliver each message in the subscribed topics to one process in each consumer group.
+ * This is achieved by balancing the partitions between all members in the consumer group so that each partition is
+ * assigned to exactly one consumer in the group. So if there is a topic with four partitions, and a consumer group with two
+ * processes, each process would consume from two partitions.
*
- * So if two processes subscribe to a topic both specifying different groups they will each get all the records in that
- * topic; if they both specify the same group they will each get about half the records.
+ * Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will
+ * be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved
+ * from existing consumers to the new one. This is known as rebalancing the group and is discussed in more
+ * detail below. Note that the same process is also used when new partitions are added
+ * to one of the subscribed topics: the group automatically detects the new partitions and rebalances the group so
+ * that every new partition is assigned to one of the members.
*
* Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of
* multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a
@@ -127,8 +128,51 @@
* commits (note that offsets are always committed for a given consumer group), etc.
* See Storing Offsets Outside Kafka for more details
*
- * It is also possible for the consumer to manually specify the partitions that are assigned to it through {@link #assign(List)},
- * which disables this dynamic partition assignment.
+ * It is also possible for the consumer to manually assign specific partitions
+ * (similar to the older "simple" consumer) using {@link #assign(Collection)}. In this case, dynamic partition
+ * assignment and consumer group coordination will be disabled.
+ *
+ *
+ *
+ * After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(long)} is
+ * invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer
+ * will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers,
+ * the poll API sends periodic heartbeats to the server; when you stop calling poll (perhaps because an exception was thrown),
+ * then no heartbeats will be sent. If a period of the configured session timeout elapses before the server
+ * has received a heartbeat, then the consumer will be kicked out of the group and its partitions will be reassigned.
+ * This is designed to prevent situations where the consumer has failed, yet continues to hold onto the partitions
+ * it was assigned (thus preventing active consumers in the group from taking them). To stay in the group, you
+ * have to prove you are still alive by calling poll.
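A minimal sketch of the poll loop this paragraph describes (the topic names and the `process` handler are assumptions, not part of the patch):

```java
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    // Each poll() call fetches records and, underneath the covers, keeps the
    // heartbeat (and hence the group membership) alive.
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        process(record);  // hypothetical handler; must finish well within session.timeout.ms
}
```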
+ *
+ * The implication of this design is that message processing time in the poll loop must be bounded so that
+ * heartbeats can be sent before expiration of the session timeout. What typically happens when processing time
+ * exceeds the session timeout is that the consumer won't be able to commit offsets for any of the processed records.
+ * For example, this is indicated by a {@link CommitFailedException} thrown from {@link #commitSync()}. This
+ * guarantees that only active members of the group are allowed to commit offsets. If the consumer
+ * has been kicked out of the group, then its partitions will have been assigned to another member, which will be
+ * committing its own offsets as it handles new records. This gives offset commits an isolation guarantee.
+ *
+ * The consumer provides two configuration settings to control this behavior:
+ *
+ * session.timeout.ms: By increasing the session timeout, you can give the consumer more
+ * time to handle a batch of records returned from {@link #poll(long)}. The only drawback is that it
+ * will take longer for the server to detect hard consumer failures, which can cause a delay before
+ * a rebalance can be completed. However, clean shutdown with {@link #close()} is not impacted since
+ * the consumer will send an explicit message to the server to leave the group and cause an immediate
+ * rebalance.
+ * max.poll.records: Processing time in the poll loop is typically proportional to the number
+ * of records processed, so it's natural to want to set a limit on the number of records handled at once.
+ * This setting provides that. By default, there is essentially no limit.
+ *
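Taking the two settings just listed together, a configuration sketch might look as follows (the values are illustrative, not recommendations from the patch):

```java
Properties props = new Properties();
props.put("session.timeout.ms", "30000");  // allow up to 30s between polls before eviction
props.put("max.poll.records", "100");      // bound the work handed back by each poll()
```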
+ *
+ * For use cases where message processing time varies unpredictably, neither of these options may be viable.
+ * The recommended way to handle these cases is to move message processing to another thread, which allows
+ * the consumer to continue sending heartbeats while the processor is still working. Some care must be taken
+ * to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic
+ * commits and manually commit processed offsets for records only after the thread has finished handling them
+ * (depending on the delivery semantics you need). Note also that you will generally need to {@link #pause(Collection)}
+ * the partition so that no new records are received from poll until after the thread has finished handling those
+ * previously returned.
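One possible shape for this pattern, sketched under stated assumptions: `processAll` and the completion check are hypothetical names, and all consumer calls stay on the polling thread since the consumer is not thread-safe.

```java
ExecutorService executor = Executors.newSingleThreadExecutor();
final ConsumerRecords<String, String> records = consumer.poll(100);
if (!records.isEmpty()) {
    final Set<TopicPartition> partitions = consumer.assignment();
    consumer.pause(partitions);          // poll() keeps heartbeating but returns no new records
    Future<?> done = executor.submit(new Runnable() {
        public void run() {
            processAll(records);         // hypothetical long-running handler on another thread
        }
    });
    // ... later in the poll loop, once done.isDone():
    consumer.resume(partitions);
    // commit manually here, only for records the handler actually finished
}
```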
*
*
 * Usage Examples
* The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
@@ -166,11 +210,11 @@
* In this example the client is subscribing to the topics foo and bar as part of a group of consumers
* called test as described above.
*
- * The broker will automatically detect failed processes in the test group by using a heartbeat mechanism. The
- * consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. Note that
- * the consumer is single-threaded, so periodic heartbeats can only be sent when {@link #poll(long)} is called. As long as
- * the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
- * to it. If it stops heartbeating by failing to call {@link #poll(long)} for a period of time longer than session.timeout.ms
+ * The broker will automatically detect failed processes in the test group by using a heartbeat mechanism. The
+ * consumer will automatically ping the cluster periodically, which lets the cluster know that it is alive. Note that
+ * the consumer is single-threaded, so periodic heartbeats can only be sent when {@link #poll(long)} is called. As long as
+ * the consumer is able to do this it is considered alive and retains the right to consume from the partitions assigned
+ * to it. If it stops heartbeating by failing to call {@link #poll(long)} for a period of time longer than session.timeout.ms
* then it will be considered dead and its partitions will be assigned to another process.
*
* The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
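For instance, a hedged configuration sketch using the string deserializers mentioned here:

```java
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Both settings name classes implementing Deserializer; StringDeserializer
// turns the raw key and value bytes back into java.lang.String instances.
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```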
@@ -244,27 +288,23 @@
* Note: The committed offset should always be the offset of the next message that your application will read.
* Thus, when calling {@link #commitSync(Map) commitSync(offsets)} you should add one to the offset of the last message processed.
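A small sketch of this add-one rule, assuming `record` is the last ConsumerRecord processed for its partition (the helper method is illustrative, not from the patch):

```java
static void commitProcessed(KafkaConsumer<String, String> consumer, ConsumerRecord<String, String> record) {
    TopicPartition tp = new TopicPartition(record.topic(), record.partition());
    OffsetAndMetadata next = new OffsetAndMetadata(record.offset() + 1);  // offset of the next message to read
    consumer.commitSync(Collections.singletonMap(tp, next));
}
```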
*
- *
 * Subscribing To Specific Partitions
+ *
*
- * In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
- * a fair share of the partitions for those topics. This provides a simple load balancing mechanism so multiple
- * instances of our program can divided up the work of processing records.
- *
- * In this mode the consumer will just get the partitions it subscribes to and if the consumer instance fails no attempt
- * will be made to rebalance partitions to other instances.
+ * In the previous examples, we subscribed to the topics we were interested in and let Kafka dynamically assign a
+ * fair share of the partitions for those topics based on the active consumers in the group. However, in
+ * some cases you may need finer control over the specific partitions that are assigned. For example:
*
- * There are several cases where this makes sense:
 *
- * The first case is if the process is maintaining some kind of local state associated with that partition (like a
- * local on-disk key-value store) and hence it should only get records for the partition it is maintaining on disk.
- *
- * Another case is if the process itself is highly available and will be restarted if it fails (perhaps using a
+ *
+ * If the process is maintaining some kind of local state associated with that partition (like a
+ * local on-disk key-value store), then it should only get records for the partition it is maintaining on disk.
+ *
+ * If the process itself is highly available and will be restarted if it fails (perhaps using a
 * cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In
- * this case there is no need for Kafka to detect the failure and reassign the partition, rather the consuming process
+ * this case there is no need for Kafka to detect the failure and reassign the partition since the consuming process
 * will be restarted on another machine.
*
*
- * This mode is easy to specify, rather than subscribing to the topic, the consumer just subscribes to particular
- * partitions:
+ * To use this mode, instead of subscribing to the topic using {@link #subscribe(Collection) subscribe}, you just call
+ * {@link #assign(Collection)} with the full list of partitions that you want to consume.
*
*
* String topic = "foo";
@@ -273,11 +313,15 @@
* consumer.assign(Arrays.asList(partition0, partition1));
*
*
- * The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only
- * be changed if the consumer specifies new partitions, and no attempt at failure detection will be made.
+ * Once assigned, you can call {@link #poll(long) poll} in a loop, just as in the preceding examples to consume
+ * records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions
+ * will only change with another call to {@link #assign(Collection) assign}. Manual partition assignment does
+ * not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer
+ * acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should
+ * usually ensure that the groupId is unique for each consumer instance.
*
- * It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load
- * balancing) using the same consumer instance.
+ * Note that it isn't possible to mix manual partition assignment (i.e. using {@link #assign(Collection) assign})
+ * with dynamic partition assignment through topic subscription (i.e. using {@link #subscribe(Collection) subscribe}).
*
*
*
@@ -313,7 +357,7 @@
* This type of usage is simplest when the partition assignment is also done manually (this would be likely in the
* search index use case described above). If the partition assignment is done automatically special care is
* needed to handle the case where partition assignments change. This can be done by providing a
- * {@link ConsumerRebalanceListener} instance in the call to {@link #subscribe(List, ConsumerRebalanceListener)}
+ * {@link ConsumerRebalanceListener} instance in the call to {@link #subscribe(Collection, ConsumerRebalanceListener)}
* and {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
* For example, when partitions are taken from a consumer the consumer will want to commit its offset for those partitions by
* implementing {@link ConsumerRebalanceListener#onPartitionsRevoked(Collection)}. When partitions are assigned to a
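A sketch of that callback pattern, assuming the application tracks its progress in a `currentOffsets` map (the names are illustrative, not from the patch):

```java
final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(Arrays.asList("foo"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        consumer.commitSync(currentOffsets);  // flush progress before losing ownership
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // e.g. look up externally stored offsets here and seek() to them
    }
});
```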
@@ -342,7 +386,7 @@
*
* Kafka allows specifying the position using {@link #seek(TopicPartition, long)} to specify the new position. Special
* methods for seeking to the earliest and latest offset the server maintains are also available (
- * {@link #seekToBeginning(TopicPartition...)} and {@link #seekToEnd(TopicPartition...)} respectively).
+ * {@link #seekToBeginning(Collection)} and {@link #seekToEnd(Collection)} respectively).
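With the Collection-based signatures introduced by this patch, seeking might look like the following (the offset and names are illustrative):

```java
TopicPartition tp = new TopicPartition("foo", 0);
consumer.seek(tp, 42L);                       // position explicitly at offset 42
consumer.seekToBeginning(Arrays.asList(tp));  // earliest offset the server maintains
consumer.seekToEnd(Arrays.asList(tp));        // latest offset
```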
*
*
 * Consumption Flow Control
*
@@ -359,7 +403,7 @@
* fetching other topics.
*
*
- * Kafka supports dynamic controlling of consumption flows by using {@link #pause(TopicPartition...)} and {@link #resume(TopicPartition...)}
+ * Kafka supports dynamic controlling of consumption flows by using {@link #pause(Collection)} and {@link #resume(Collection)}
* to pause the consumption on the specified assigned partitions and resume the consumption
* on the specified paused partitions respectively in the future {@link #poll(long)} calls.
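A hedged sketch of this flow control, using the new Collection-based overloads:

```java
Set<TopicPartition> assigned = consumer.assignment();
consumer.pause(assigned);    // subsequent poll() calls return no records from these partitions
consumer.poll(100);          // still sends heartbeats and fetches any unpaused partitions
consumer.resume(assigned);   // records flow again on the next poll()
```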
*
@@ -453,7 +497,6 @@
* commit.
*
*/
-@InterfaceStability.Unstable
public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final Logger log = LoggerFactory.getLogger(KafkaConsumer.class);
@@ -586,8 +629,9 @@ private KafkaConsumer(ConsumerConfig config,
config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
- this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs);
- OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase());
+ this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
+ config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG));
+ OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase(Locale.ROOT));
this.subscriptions = new SubscriptionState(offsetResetStrategy);
List<PartitionAssignor> assignors = config.getConfiguredInstances(
ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
@@ -612,7 +656,8 @@ private KafkaConsumer(ConsumerConfig config,
new ConsumerCoordinator.DefaultOffsetCommitCallback(),
config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG),
config.getLong(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
- this.interceptors);
+ this.interceptors,
+ config.getBoolean(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG));
if (keyDeserializer == null) {
this.keyDeserializer = config.getConfiguredInstance(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
Deserializer.class);
@@ -659,7 +704,7 @@ private KafkaConsumer(ConsumerConfig config,
/**
* Get the set of partitions currently assigned to this consumer. If subscription happened by directly assigning
- * partitions using {@link #assign(List)} then this will simply return the same partitions that
+ * partitions using {@link #assign(Collection)} then this will simply return the same partitions that
* were assigned. If topic subscription was used, then this will give the set of topic partitions currently assigned
* to the consumer (which may be none if the assignment hasn't happened yet, or the partitions are in the
* process of getting reassigned).
@@ -676,7 +721,7 @@ public Set<TopicPartition> assignment() {
/**
* Get the current subscription. Will return the same topics used in the most recent call to
- * {@link #subscribe(List, ConsumerRebalanceListener)}, or an empty set if no such call has been made.
+ * {@link #subscribe(Collection, ConsumerRebalanceListener)}, or an empty set if no such call has been made.
* @return The set of topics currently subscribed to
*/
public Set<String> subscription() {
@@ -692,7 +737,7 @@ public Set<String> subscription() {
* Subscribe to the given list of topics to get dynamically
* assigned partitions. Topic subscriptions are not incremental. This list will replace the current
* assignment (if there is one). Note that it is not possible to combine topic subscription with group management
- * with manual partition assignment through {@link #assign(List)}.
+ * with manual partition assignment through {@link #assign(Collection)}.
*
* If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
*
@@ -717,7 +762,7 @@ public Set subscription() {
* subscribed topics
*/
@Override
- public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
+ public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
acquire();
try {
if (topics.isEmpty()) {
@@ -737,21 +782,21 @@ public void subscribe(List<String> topics, ConsumerRebalanceListener listener) {
* Subscribe to the given list of topics to get dynamically assigned partitions.
* Topic subscriptions are not incremental. This list will replace the current
* assignment (if there is one). It is not possible to combine topic subscription with group management
- * with manual partition assignment through {@link #assign(List)}.
+ * with manual partition assignment through {@link #assign(Collection)}.
*
* If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
*
*
- * This is a short-hand for {@link #subscribe(List, ConsumerRebalanceListener)}, which
+ * This is a short-hand for {@link #subscribe(Collection, ConsumerRebalanceListener)}, which
 * uses a noop listener. If you need the ability to seek to particular offsets, you should prefer
- * {@link #subscribe(List, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
+ * {@link #subscribe(Collection, ConsumerRebalanceListener)}, since group rebalances will cause partition offsets
* to be reset. You should also prefer to provide your own listener if you are doing your own offset
* management since the listener gives you an opportunity to commit offsets before a rebalance finishes.
*
* @param topics The list of topics to subscribe to
*/
@Override
- public void subscribe(List<String> topics) {
+ public void subscribe(Collection<String> topics) {
subscribe(topics, new NoOpConsumerRebalanceListener());
}
@@ -784,8 +829,8 @@ public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
}
/**
- * Unsubscribe from topics currently subscribed with {@link #subscribe(List)}. This
- * also clears any partitions directly assigned through {@link #assign(List)}.
+ * Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)}. This
+ * also clears any partitions directly assigned through {@link #assign(Collection)}.
*/
public void unsubscribe() {
acquire();
@@ -805,13 +850,13 @@ public void unsubscribe() {
*
* Manual topic assignment through this method does not use the consumer's group management
* functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
- * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(List)}
- * and group assignment with {@link #subscribe(List, ConsumerRebalanceListener)}.
+ * metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
+ * and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
*
* @param partitions The list of partitions to assign this consumer
*/
@Override
- public void assign(List<TopicPartition> partitions) {
+ public void assign(Collection<TopicPartition> partitions) {
acquire();
try {
log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
@@ -870,7 +915,7 @@ public ConsumerRecords<K, V> poll(long timeout) {
// must return these records to users to process before being interrupted or
// auto-committing offsets
fetcher.sendFetches(metadata.fetch());
- client.quickPoll();
+ client.quickPoll(false);
return this.interceptors == null
? new ConsumerRecords<>(records) : this.interceptors.onConsume(new ConsumerRecords<>(records));
}
@@ -930,7 +975,7 @@ private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
* encountered (in which case it is thrown to the caller).
*
* @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
- * This can only occur if you are using automatic group management with {@link #subscribe(List)},
+ * This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
* or if there is an active group with the same groupId which is using group management.
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
@@ -962,7 +1007,7 @@ public void commitSync() {
*
* @param offsets A map of offsets by partition with associated metadata
* @throws org.apache.kafka.clients.consumer.CommitFailedException if the commit failed and cannot be retried.
- * This can only occur if you are using automatic group management with {@link #subscribe(List)},
+ * This can only occur if you are using automatic group management with {@link #subscribe(Collection)},
* or if there is an active group with the same groupId which is using group management.
* @throws org.apache.kafka.common.errors.WakeupException if {@link #wakeup()} is called before or while this
* function is called
@@ -1062,11 +1107,10 @@ public void seek(TopicPartition partition, long offset) {
* first offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
* If no partition is provided, seek to the first offset for all of the currently assigned partitions.
*/
- public void seekToBeginning(TopicPartition... partitions) {
+ public void seekToBeginning(Collection<TopicPartition> partitions) {
acquire();
try {
- Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
- : Arrays.asList(partitions);
+ Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
for (TopicPartition tp : parts) {
log.debug("Seeking to beginning of partition {}", tp);
subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
@@ -1081,11 +1125,10 @@ public void seekToBeginning(TopicPartition... partitions) {
* final offset in all partitions only when {@link #poll(long)} or {@link #position(TopicPartition)} are called.
* If no partition is provided, seek to the final offset for all of the currently assigned partitions.
*/
- public void seekToEnd(TopicPartition... partitions) {
+ public void seekToEnd(Collection<TopicPartition> partitions) {
acquire();
try {
- Collection<TopicPartition> parts = partitions.length == 0 ? this.subscriptions.assignedPartitions()
- : Arrays.asList(partitions);
+ Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
for (TopicPartition tp : parts) {
log.debug("Seeking to end of partition {}", tp);
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
@@ -1191,7 +1234,7 @@ public List<PartitionInfo> partitionsFor(String topic) {
if (parts != null)
return parts;
- Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(Collections.singletonList(topic), requestTimeoutMs);
+ Map<String, List<PartitionInfo>> topicMetadata = fetcher.getTopicMetadata(new MetadataRequest(Collections.singletonList(topic)), requestTimeoutMs);
return topicMetadata.get(topic);
} finally {
release();
@@ -1221,13 +1264,13 @@ public Map<String, List<PartitionInfo>> listTopics() {
/**
* Suspend fetching from the requested partitions. Future calls to {@link #poll(long)} will not return
- * any records from these partitions until they have been resumed using {@link #resume(TopicPartition...)}.
+ * any records from these partitions until they have been resumed using {@link #resume(Collection)}.
* Note that this method does not affect partition subscription. In particular, it does not cause a group
* rebalance when automatic assignment is used.
* @param partitions The partitions which should be paused
*/
@Override
- public void pause(TopicPartition... partitions) {
+ public void pause(Collection<TopicPartition> partitions) {
acquire();
try {
for (TopicPartition partition: partitions) {
@@ -1240,34 +1283,34 @@ public void pause(TopicPartition... partitions) {
}
/**
- * Get the set of partitions that were previously paused by a call to {@link #pause(TopicPartition...)}.
- *
- * @return The set of paused partitions
+ * Resume specified partitions which have been paused with {@link #pause(Collection)}. New calls to
+ * {@link #poll(long)} will return records from these partitions if there are any to be fetched.
+ * If the partitions were not previously paused, this method is a no-op.
+ * @param partitions The partitions which should be resumed
*/
@Override
- public Set<TopicPartition> paused() {
+ public void resume(Collection<TopicPartition> partitions) {
acquire();
try {
- return Collections.unmodifiableSet(subscriptions.pausedPartitions());
+ for (TopicPartition partition: partitions) {
+ log.debug("Resuming partition {}", partition);
+ subscriptions.resume(partition);
+ }
} finally {
release();
}
}
/**
- * Resume specified partitions which have been paused with {@link #pause(TopicPartition...)}. New calls to
- * {@link #poll(long)} will return records from these partitions if there are any to be fetched.
- * If the partitions were not previously paused, this method is a no-op.
- * @param partitions The partitions which should be resumed
+ * Get the set of partitions that were previously paused by a call to {@link #pause(Collection)}.
+ *
+ * @return The set of paused partitions
*/
@Override
- public void resume(TopicPartition... partitions) {
+ public Set<TopicPartition> paused() {
acquire();
try {
- for (TopicPartition partition: partitions) {
- log.debug("Resuming partition {}", partition);
- subscriptions.resume(partition);
- }
+ return Collections.unmodifiableSet(subscriptions.pausedPartitions());
} finally {
release();
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
index c7f0a46754b35..8dce1f1f9411c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java
@@ -67,7 +67,7 @@ public MockConsumer(OffsetResetStrategy offsetResetStrategy) {
this.exception = null;
this.wakeup = new AtomicBoolean(false);
}
-
+
@Override
public Set assignment() {
return this.subscriptions.assignedPartitions();
@@ -86,7 +86,7 @@ public Set subscription() {
}
@Override
- public void subscribe(List<String> topics) {
+ public void subscribe(Collection<String> topics) {
subscribe(topics, new NoOpConsumerRebalanceListener());
}
@@ -105,13 +105,13 @@ public void subscribe(Pattern pattern, final ConsumerRebalanceListener listener)
}
@Override
- public void subscribe(List<String> topics, final ConsumerRebalanceListener listener) {
+ public void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
ensureNotClosed();
this.subscriptions.subscribe(topics, listener);
}
@Override
- public void assign(List<TopicPartition> partitions) {
+ public void assign(Collection<TopicPartition> partitions) {
ensureNotClosed();
this.subscriptions.assignFromUser(partitions);
}
@@ -238,7 +238,7 @@ public long position(TopicPartition partition) {
}
@Override
- public void seekToBeginning(TopicPartition... partitions) {
+ public void seekToBeginning(Collection<TopicPartition> partitions) {
ensureNotClosed();
for (TopicPartition tp : partitions)
subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST);
@@ -249,7 +249,7 @@ public void updateBeginningOffsets(Map<TopicPartition, Long> newOffsets) {
}
@Override
- public void seekToEnd(TopicPartition... partitions) {
+ public void seekToEnd(Collection<TopicPartition> partitions) {
ensureNotClosed();
for (TopicPartition tp : partitions)
subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST);
@@ -287,7 +287,7 @@ public void updatePartitions(String topic, List<PartitionInfo> partitions) {
}
@Override
- public void pause(TopicPartition... partitions) {
+ public void pause(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
subscriptions.pause(partition);
paused.add(partition);
@@ -295,7 +295,7 @@ public void pause(TopicPartition... partitions) {
}
@Override
- public void resume(TopicPartition... partitions) {
+ public void resume(Collection<TopicPartition> partitions) {
for (TopicPartition partition : partitions) {
subscriptions.resume(partition);
paused.remove(partition);
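For test code, the Collection-based MockConsumer methods above might be exercised roughly like this (a sketch; the five-argument ConsumerRecord constructor is assumed from this pre-timestamp era of the API):

```java
MockConsumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
TopicPartition tp = new TopicPartition("foo", 0);
consumer.assign(Arrays.asList(tp));                                // Collection overload from this patch
consumer.updateBeginningOffsets(Collections.singletonMap(tp, 0L));
consumer.addRecord(new ConsumerRecord<>("foo", 0, 0L, "key", "value"));
ConsumerRecords<String, String> records = consumer.poll(0);
consumer.seekToBeginning(Arrays.asList(tp));                       // likewise now takes a Collection
```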
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
index 66b257dba3099..df8bf37adbada 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/OffsetAndMetadata.java
@@ -29,9 +29,6 @@ public class OffsetAndMetadata implements Serializable {
* @param metadata Non-null metadata
*/
public OffsetAndMetadata(long offset, String metadata) {
- if (metadata == null)
- throw new IllegalArgumentException("Metadata cannot be null");
-
this.offset = offset;
this.metadata = metadata;
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index c6492bc66d2a6..15185d7de03a1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -17,6 +17,7 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
+import org.apache.kafka.common.errors.GroupCoordinatorNotAvailableException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
@@ -174,7 +175,7 @@ protected abstract void onJoinComplete(int generation,
*/
public void ensureCoordinatorKnown() {
while (coordinatorUnknown()) {
- RequestFuture<Void> future = sendGroupMetadataRequest();
+ RequestFuture<Void> future = sendGroupCoordinatorRequest();
client.poll(future);
if (future.failed()) {
@@ -216,14 +217,26 @@ public void ensureActiveGroup() {
continue;
}
- RequestFuture<ByteBuffer> future = performGroupJoin();
+ RequestFuture<ByteBuffer> future = sendJoinGroupRequest();
+ future.addListener(new RequestFutureListener<ByteBuffer>() {
+ @Override
+ public void onSuccess(ByteBuffer value) {
+ // handle join completion in the callback so that the callback will be invoked
+ // even if the consumer is woken up before finishing the rebalance
+ onJoinComplete(generation, memberId, protocol, value);
+ needsJoinPrepare = true;
+ heartbeatTask.reset();
+ }
+
+ @Override
+ public void onFailure(RuntimeException e) {
+ // we handle failures below after the request finishes. if the join completes
+ // after having been woken up, the exception is ignored and we will rejoin
+ }
+ });
client.poll(future);
- if (future.succeeded()) {
- onJoinComplete(generation, memberId, protocol, future.value());
- needsJoinPrepare = true;
- heartbeatTask.reset();
- } else {
+ if (future.failed()) {
RuntimeException exception = future.exception();
if (exception instanceof UnknownMemberIdException ||
exception instanceof RebalanceInProgressException ||
@@ -299,12 +312,12 @@ public void onFailure(RuntimeException e) {
* elected leader by the coordinator.
* @return A request future which wraps the assignment returned from the group leader
*/
- private RequestFuture<ByteBuffer> performGroupJoin() {
+ private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
// send a join group request to the coordinator
- log.debug("(Re-)joining group {}", groupId);
+ log.info("(Re-)joining group {}", groupId);
JoinGroupRequest request = new JoinGroupRequest(
groupId,
this.sessionTimeoutMs,
@@ -312,8 +325,7 @@ private RequestFuture performGroupJoin() {
protocolType(),
metadata());
- // create the request for the coordinator
- log.debug("Issuing request ({}: {}) to coordinator {}", ApiKeys.JOIN_GROUP, request, this.coordinator.id());
+ log.debug("Sending JoinGroup ({}) to coordinator {}", request, this.coordinator);
return client.send(coordinator, ApiKeys.JOIN_GROUP, request)
.compose(new JoinGroupResponseHandler());
}
@@ -328,10 +340,9 @@ public JoinGroupResponse parse(ClientResponse response) {
@Override
public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
- // process the response
- short errorCode = joinResponse.errorCode();
- if (errorCode == Errors.NONE.code()) {
- log.debug("Joined group: {}", joinResponse.toStruct());
+ Errors error = Errors.forCode(joinResponse.errorCode());
+ if (error == Errors.NONE) {
+ log.debug("Received successful join group response for group {}: {}", groupId, joinResponse.toStruct());
AbstractCoordinator.this.memberId = joinResponse.memberId();
AbstractCoordinator.this.generation = joinResponse.generationId();
AbstractCoordinator.this.rejoinNeeded = false;
@@ -342,37 +353,33 @@ public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
} else {
onJoinFollower().chain(future);
}
- } else if (errorCode == Errors.GROUP_LOAD_IN_PROGRESS.code()) {
- log.debug("Attempt to join group {} rejected since coordinator is loading the group.", groupId);
+ } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
+ log.debug("Attempt to join group {} rejected since coordinator {} is loading the group.", groupId,
+ coordinator);
// backoff and retry
- future.raise(Errors.forCode(errorCode));
- } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
+ future.raise(error);
+ } else if (error == Errors.UNKNOWN_MEMBER_ID) {
// reset the member id and retry immediately
AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
- log.info("Attempt to join group {} failed due to unknown member id, resetting and retrying.",
- groupId);
+ log.debug("Attempt to join group {} failed due to unknown member id.", groupId);
future.raise(Errors.UNKNOWN_MEMBER_ID);
- } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
- || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
+ } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+ || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
// re-discover the coordinator and retry with backoff
coordinatorDead();
- log.info("Attempt to join group {} failed due to obsolete coordinator information, retrying.",
- groupId);
- future.raise(Errors.forCode(errorCode));
- } else if (errorCode == Errors.INCONSISTENT_GROUP_PROTOCOL.code()
- || errorCode == Errors.INVALID_SESSION_TIMEOUT.code()
- || errorCode == Errors.INVALID_GROUP_ID.code()) {
+ log.debug("Attempt to join group {} failed due to obsolete coordinator information: {}", groupId, error.message());
+ future.raise(error);
+ } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
+ || error == Errors.INVALID_SESSION_TIMEOUT
+ || error == Errors.INVALID_GROUP_ID) {
// log the error and re-throw the exception
- Errors error = Errors.forCode(errorCode);
- log.error("Attempt to join group {} failed due to: {}",
- groupId, error.exception().getMessage());
+ log.error("Attempt to join group {} failed due to fatal error: {}", groupId, error.message());
future.raise(error);
- } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+ } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
// unexpected error, throw the exception
- future.raise(new KafkaException("Unexpected error in join group response: "
- + Errors.forCode(joinResponse.errorCode()).exception().getMessage()));
+ future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
}
}
}
@@ -381,7 +388,7 @@ private RequestFuture<ByteBuffer> onJoinFollower() {
// send follower's sync group with an empty assignment
SyncGroupRequest request = new SyncGroupRequest(groupId, generation,
memberId, Collections.<String, ByteBuffer>emptyMap());
- log.debug("Issuing follower SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
+ log.debug("Sending follower SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
return sendSyncGroupRequest(request);
}
@@ -392,7 +399,7 @@ private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
joinResponse.members());
SyncGroupRequest request = new SyncGroupRequest(groupId, generation, memberId, groupAssignment);
- log.debug("Issuing leader SyncGroup ({}: {}) to coordinator {}", ApiKeys.SYNC_GROUP, request, this.coordinator.id());
+ log.debug("Sending leader SyncGroup for group {} to coordinator {}: {}", groupId, this.coordinator, request);
return sendSyncGroupRequest(request);
} catch (RuntimeException e) {
return RequestFuture.failure(e);
@@ -403,10 +410,10 @@ private RequestFuture<ByteBuffer> sendSyncGroupRequest(SyncGroupRequest request)
if (coordinatorUnknown())
return RequestFuture.coordinatorNotAvailable();
return client.send(coordinator, ApiKeys.SYNC_GROUP, request)
- .compose(new SyncGroupRequestHandler());
+ .compose(new SyncGroupResponseHandler());
}
- private class SyncGroupRequestHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
+ private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
@Override
public SyncGroupResponse parse(ClientResponse response) {
@@ -418,7 +425,7 @@ public void handle(SyncGroupResponse syncResponse,
RequestFuture<ByteBuffer> future) {
Errors error = Errors.forCode(syncResponse.errorCode());
if (error == Errors.NONE) {
- log.debug("Received successful sync group response for group {}: {}", groupId, syncResponse.toStruct());
+ log.info("Successfully joined group {} with generation {}", groupId, generation);
sensors.syncLatency.record(response.requestLatencyMs());
future.complete(syncResponse.memberAssignment());
} else {
@@ -426,20 +433,20 @@ public void handle(SyncGroupResponse syncResponse,
if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else if (error == Errors.REBALANCE_IN_PROGRESS) {
- log.info("SyncGroup for group {} failed due to coordinator rebalance, rejoining the group", groupId);
+ log.debug("SyncGroup for group {} failed due to coordinator rebalance", groupId);
future.raise(error);
} else if (error == Errors.UNKNOWN_MEMBER_ID
|| error == Errors.ILLEGAL_GENERATION) {
- log.info("SyncGroup for group {} failed due to {}, rejoining the group", groupId, error);
+ log.debug("SyncGroup for group {} failed due to {}", groupId, error);
AbstractCoordinator.this.memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
future.raise(error);
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP) {
- log.info("SyncGroup for group {} failed due to {}, will find new coordinator and rejoin", groupId, error);
+ log.debug("SyncGroup for group {} failed due to {}", groupId, error);
coordinatorDead();
future.raise(error);
} else {
- future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.exception().getMessage()));
+ future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
}
}
}
@@ -450,7 +457,7 @@ public void handle(SyncGroupResponse syncResponse,
* one of the brokers. The returned future should be polled to get the result of the request.
* @return A request future which indicates the completion of the metadata request
*/
- private RequestFuture<Void> sendGroupMetadataRequest() {
+ private RequestFuture<Void> sendGroupCoordinatorRequest() {
// initiate the group metadata request
// find a node to ask about the coordinator
Node node = this.client.leastLoadedNode();
@@ -460,7 +467,7 @@ private RequestFuture sendGroupMetadataRequest() {
return RequestFuture.noBrokersAvailable();
} else {
// create a group metadata request
- log.debug("Issuing group metadata request to broker {}", node.id());
+ log.debug("Sending coordinator request for group {} to broker {}", groupId, node);
GroupCoordinatorRequest metadataRequest = new GroupCoordinatorRequest(this.groupId);
return client.send(node, ApiKeys.GROUP_COORDINATOR, metadataRequest)
.compose(new RequestFutureAdapter<ClientResponse, Void>() {
@@ -473,7 +480,7 @@ public void onSuccess(ClientResponse response, RequestFuture<Void> future) {
}
private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
- log.debug("Group metadata response {}", resp);
+ log.debug("Received group coordinator response {}", resp);
if (!coordinatorUnknown()) {
// We already found the coordinator, so ignore the request
@@ -483,22 +490,24 @@ private void handleGroupMetadataResponse(ClientResponse resp, RequestFuture<Void> future) {
if (generation > 0)
heartbeatTask.reset();
future.complete(null);
- } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+ } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
- future.raise(Errors.forCode(errorCode));
+ future.raise(error);
}
}
}
@@ -524,7 +533,8 @@ public boolean coordinatorUnknown() {
*/
protected void coordinatorDead() {
if (this.coordinator != null) {
- log.info("Marking the coordinator {} dead.", this.coordinator.id());
+ log.info("Marking the coordinator {} dead for group {}", this.coordinator, groupId);
+ client.failUnsentRequests(this.coordinator, GroupCoordinatorNotAvailableException.INSTANCE);
this.coordinator = null;
}
}
@@ -566,7 +576,7 @@ public void onSuccess(Void value) {}
@Override
public void onFailure(RuntimeException e) {
- log.info("LeaveGroup request failed with error", e);
+ log.debug("LeaveGroup request for group {} failed with error", groupId, e);
}
});
@@ -608,33 +618,33 @@ public HeartbeatResponse parse(ClientResponse response) {
@Override
public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
sensors.heartbeatLatency.record(response.requestLatencyMs());
- short errorCode = heartbeatResponse.errorCode();
- if (errorCode == Errors.NONE.code()) {
- log.debug("Received successful heartbeat response.");
+ Errors error = Errors.forCode(heartbeatResponse.errorCode());
+ if (error == Errors.NONE) {
+ log.debug("Received successful heartbeat response for group {}", groupId);
future.complete(null);
- } else if (errorCode == Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()
- || errorCode == Errors.NOT_COORDINATOR_FOR_GROUP.code()) {
- log.info("Attempt to heart beat failed since coordinator is either not started or not valid, marking it as dead.");
+ } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
+ || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
+ log.debug("Attempt to heart beat failed for group {} since coordinator {} is either not started or not valid.",
+ groupId, coordinator);
coordinatorDead();
- future.raise(Errors.forCode(errorCode));
- } else if (errorCode == Errors.REBALANCE_IN_PROGRESS.code()) {
- log.info("Attempt to heart beat failed since the group is rebalancing, try to re-join group.");
+ future.raise(error);
+ } else if (error == Errors.REBALANCE_IN_PROGRESS) {
+ log.debug("Attempt to heart beat failed for group {} since it is rebalancing.", groupId);
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.REBALANCE_IN_PROGRESS);
- } else if (errorCode == Errors.ILLEGAL_GENERATION.code()) {
- log.info("Attempt to heart beat failed since generation id is not legal, try to re-join group.");
+ } else if (error == Errors.ILLEGAL_GENERATION) {
+ log.debug("Attempt to heart beat failed for group {} since generation id is not legal.", groupId);
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.ILLEGAL_GENERATION);
- } else if (errorCode == Errors.UNKNOWN_MEMBER_ID.code()) {
- log.info("Attempt to heart beat failed since member id is not valid, reset it and try to re-join group.");
+ } else if (error == Errors.UNKNOWN_MEMBER_ID) {
+ log.debug("Attempt to heart beat failed for group {} since member id is not valid.", groupId);
memberId = JoinGroupRequest.UNKNOWN_MEMBER_ID;
AbstractCoordinator.this.rejoinNeeded = true;
future.raise(Errors.UNKNOWN_MEMBER_ID);
- } else if (errorCode == Errors.GROUP_AUTHORIZATION_FAILED.code()) {
+ } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
future.raise(new GroupAuthorizationException(groupId));
} else {
- future.raise(new KafkaException("Unexpected errorCode in heartbeat response: "
- + Errors.forCode(errorCode).exception().getMessage()));
+ future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
}
}
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index aa39e11929116..887f47c1cde4c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -26,6 +26,7 @@
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.internals.TopicConstants;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
@@ -62,13 +63,16 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
private final List<PartitionAssignor> assignors;
private final org.apache.kafka.clients.Metadata metadata;
- private final MetadataSnapshot metadataSnapshot;
private final ConsumerCoordinatorMetrics sensors;
private final SubscriptionState subscriptions;
private final OffsetCommitCallback defaultOffsetCommitCallback;
private final boolean autoCommitEnabled;
private final AutoCommitTask autoCommitTask;
- private final ConsumerInterceptors interceptors;
+ private final ConsumerInterceptors<?, ?> interceptors;
+ private final boolean excludeInternalTopics;
+
+ private MetadataSnapshot metadataSnapshot;
+ private MetadataSnapshot assignmentSnapshot;
/**
* Initialize the coordination manager.
@@ -87,7 +91,8 @@ public ConsumerCoordinator(ConsumerNetworkClient client,
OffsetCommitCallback defaultOffsetCommitCallback,
boolean autoCommitEnabled,
long autoCommitIntervalMs,
- ConsumerInterceptors interceptors) {
+ ConsumerInterceptors<?, ?> interceptors,
+ boolean excludeInternalTopics) {
super(client,
groupId,
sessionTimeoutMs,
@@ -99,7 +104,7 @@ public ConsumerCoordinator(ConsumerNetworkClient client,
this.metadata = metadata;
this.metadata.requestUpdate();
- this.metadataSnapshot = new MetadataSnapshot();
+ this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
this.subscriptions = subscriptions;
this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
this.autoCommitEnabled = autoCommitEnabled;
@@ -107,9 +112,16 @@ public ConsumerCoordinator(ConsumerNetworkClient client,
addMetadataListener();
- this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
+ if (autoCommitEnabled) {
+ this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
+ this.autoCommitTask.reschedule();
+ } else {
+ this.autoCommitTask = null;
+ }
+
this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
this.interceptors = interceptors;
+ this.excludeInternalTopics = excludeInternalTopics;
}
@Override
@@ -140,7 +152,8 @@ public void onMetadataUpdate(Cluster cluster) {
final List<String> topicsToSubscribe = new ArrayList<>();
for (String topic : cluster.topics())
- if (subscriptions.getSubscribedPattern().matcher(topic).matches())
+ if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
+ !(excludeInternalTopics && TopicConstants.INTERNAL_TOPICS.contains(topic)))
topicsToSubscribe.add(topic);
subscriptions.changeSubscription(topicsToSubscribe);
@@ -148,8 +161,14 @@ public void onMetadataUpdate(Cluster cluster) {
}
// check if there are any changes to the metadata which should trigger a rebalance
- if (metadataSnapshot.update(subscriptions, cluster) && subscriptions.partitionsAutoAssigned())
- subscriptions.needReassignment();
+ if (subscriptions.partitionsAutoAssigned()) {
+ MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
+ if (!snapshot.equals(metadataSnapshot)) {
+ metadataSnapshot = snapshot;
+ subscriptions.needReassignment();
+ }
+ }
+
}
});
}
@@ -167,6 +186,13 @@ protected void onJoinComplete(int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
+ // if we were the assignor, then we need to make sure that there have been no metadata updates
+ // since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
+ if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
+ subscriptions.needReassignment();
+ return;
+ }
+
PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
@@ -182,21 +208,21 @@ protected void onJoinComplete(int generation,
// give the assignor a chance to update internal state based on the received assignment
assignor.onAssignment(assignment);
- // restart the autocommit task if needed
+ // reschedule the auto commit starting from now
if (autoCommitEnabled)
- autoCommitTask.enable();
+ autoCommitTask.reschedule();
// execute the user's callback after rebalance
ConsumerRebalanceListener listener = subscriptions.listener();
- log.debug("Setting newly assigned partitions {}", subscriptions.assignedPartitions());
+ log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
try {
Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsAssigned(assigned);
} catch (WakeupException e) {
throw e;
} catch (Exception e) {
- log.error("User provided listener " + listener.getClass().getName()
- + " failed on partition assignment: ", e);
+ log.error("User provided listener {} for group {} failed on partition assignment",
+ listener.getClass().getName(), groupId, e);
}
}
@@ -220,13 +246,18 @@ protected Map<String, ByteBuffer> performAssignment(String leaderId,
// which ensures that all metadata changes will eventually be seen
this.subscriptions.groupSubscribe(allSubscribedTopics);
metadata.setTopics(this.subscriptions.groupSubscription());
+
+ // update metadata (if needed) and keep track of the metadata used for assignment so that
+ // we can check after rebalance completion whether anything has changed
client.ensureFreshMetadata();
+ assignmentSnapshot = metadataSnapshot;
- log.debug("Performing {} assignment for subscriptions {}", assignor.name(), subscriptions);
+ log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
+ groupId, assignor.name(), subscriptions);
Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);
- log.debug("Finished assignment: {}", assignment);
+ log.debug("Finished assignment for group {}: {}", groupId, assignment);
Map<String, ByteBuffer> groupAssignment = new HashMap<>();
for (Map.Entry<String, Assignment> assignmentEntry : assignment.entrySet()) {
@@ -244,17 +275,18 @@ protected void onJoinPrepare(int generation, String memberId) {
// execute the user's callback before rebalance
ConsumerRebalanceListener listener = subscriptions.listener();
- log.debug("Revoking previously assigned partitions {}", subscriptions.assignedPartitions());
+ log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
try {
Set<TopicPartition> revoked = new HashSet<>(subscriptions.assignedPartitions());
listener.onPartitionsRevoked(revoked);
} catch (WakeupException e) {
throw e;
} catch (Exception e) {
- log.error("User provided listener " + listener.getClass().getName()
- + " failed on partition revocation: ", e);
+ log.error("User provided listener {} for group {} failed on partition revocation",
+ listener.getClass().getName(), groupId, e);
}
+ assignmentSnapshot = null;
subscriptions.needReassignment();
}
@@ -339,6 +371,10 @@ public void onFailure(RuntimeException e) {
cb.onComplete(offsets, e);
}
});
+
+ // ensure commit has a chance to be transmitted (without blocking on its completion)
+ // note that we allow delayed tasks to be executed in case heartbeats need to be sent
+ client.quickPoll(true);
}
/**
@@ -374,59 +410,40 @@ public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
private class AutoCommitTask implements DelayedTask {
private final long interval;
- private boolean enabled = false;
- private boolean requestInFlight = false;
public AutoCommitTask(long interval) {
this.interval = interval;
}
- public void enable() {
- if (!enabled) {
- // there shouldn't be any instances scheduled, but call unschedule anyway to ensure
- // that this task is only ever scheduled once
- client.unschedule(this);
- this.enabled = true;
-
- if (!requestInFlight) {
- long now = time.milliseconds();
- client.schedule(this, interval + now);
- }
- }
- }
-
- public void disable() {
- this.enabled = false;
- client.unschedule(this);
+ private void reschedule() {
+ client.schedule(this, time.milliseconds() + interval);
}
private void reschedule(long at) {
- if (enabled)
- client.schedule(this, at);
+ client.schedule(this, at);
}
public void run(final long now) {
- if (!enabled)
+ if (coordinatorUnknown()) {
+ log.debug("Cannot auto-commit offsets for group {} since the coordinator is unknown", groupId);
+ reschedule(now + retryBackoffMs);
return;
+ }
- if (coordinatorUnknown()) {
- log.debug("Cannot auto-commit offsets now since the coordinator is unknown, will retry after backoff");
- client.schedule(this, now + retryBackoffMs);
+ if (needRejoin()) {
+ // skip the commit when we're rejoining since we'll commit offsets synchronously
+ // before the revocation callback is invoked
+ reschedule(now + interval);
return;
}
- requestInFlight = true;
commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
- requestInFlight = false;
if (exception == null) {
reschedule(now + interval);
- } else if (exception instanceof SendFailedException) {
- log.debug("Failed to send automatic offset commit, will retry immediately");
- reschedule(now);
} else {
- log.warn("Auto offset commit failed: {}", exception.getMessage());
+ log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
reschedule(now + interval);
}
}
@@ -436,10 +453,6 @@ public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
private void maybeAutoCommitOffsetsSync() {
if (autoCommitEnabled) {
- // disable periodic commits prior to committing synchronously. note that they will
- // be re-enabled after a rebalance completes
- autoCommitTask.disable();
-
try {
commitOffsetsSync(subscriptions.allConsumed());
} catch (WakeupException e) {
@@ -447,7 +460,7 @@ private void maybeAutoCommitOffsetsSync() {
throw e;
} catch (Exception e) {
// consistent with async auto-commit failures, we do not propagate the exception
- log.warn("Auto offset commit failed: ", e.getMessage());
+ log.warn("Auto offset commit failed for group {}: {}", groupId, e.getMessage());
}
}
}
@@ -481,7 +494,7 @@ private RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndMetadata> offsets) {
Errors error = Errors.forCode(entry.getValue());
if (error == Errors.NONE) {
- log.debug("Committed offset {} for partition {}", offset, tp);
+ log.debug("Group {} committed offset {} for partition {}", groupId, offset, tp);
if (subscriptions.isAssigned(tp))
// update the local cache only if the partition is still assigned
subscriptions.committed(tp, offsetAndMetadata);
} else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
- log.error("Unauthorized to commit for group {}", groupId);
+ log.error("Not authorized to commit offsets for group {}", groupId);
future.raise(new GroupAuthorizationException(groupId));
return;
} else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
@@ -533,18 +546,18 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
} else if (error == Errors.OFFSET_METADATA_TOO_LARGE
|| error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
// raise the error to the user
- log.info("Offset commit for group {} failed on partition {} due to {}, will retry", groupId, tp, error);
+ log.debug("Offset commit for group {} failed on partition {}: {}", groupId, tp, error.message());
future.raise(error);
return;
} else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
// just retry
- log.info("Offset commit for group {} failed due to {}, will retry", groupId, error);
+ log.debug("Offset commit for group {} failed: {}", groupId, error.message());
future.raise(error);
return;
} else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE
|| error == Errors.NOT_COORDINATOR_FOR_GROUP
|| error == Errors.REQUEST_TIMED_OUT) {
- log.info("Offset commit for group {} failed due to {}, will find new coordinator and retry", groupId, error);
+ log.debug("Offset commit for group {} failed: {}", groupId, error.message());
coordinatorDead();
future.raise(error);
return;
@@ -552,19 +565,24 @@ public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
|| error == Errors.ILLEGAL_GENERATION
|| error == Errors.REBALANCE_IN_PROGRESS) {
// need to re-join group
- log.error("Error {} occurred while committing offsets for group {}", error, groupId);
+ log.debug("Offset commit for group {} failed: {}", groupId, error.message());
subscriptions.needReassignment();
- future.raise(new CommitFailedException("Commit cannot be completed due to group rebalance"));
+ future.raise(new CommitFailedException("Commit cannot be completed since the group has already " +
+ "rebalanced and assigned the partitions to another member. This means that the time " +
+ "between subsequent calls to poll() was longer than the configured session.timeout.ms, " +
+ "which typically implies that the poll loop is spending too much time message processing. " +
+ "You can address this either by increasing the session timeout or by reducing the maximum " +
+ "size of batches returned in poll() with max.poll.records."));
return;
} else {
- log.error("Error committing partition {} at offset {}: {}", tp, offset, error.exception().getMessage());
- future.raise(new KafkaException("Unexpected error in commit: " + error.exception().getMessage()));
+ log.error("Group {} failed to commit partition {} at offset {}: {}", groupId, tp, offset, error.message());
+ future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
return;
}
}
if (!unauthorizedTopics.isEmpty()) {
- log.error("Unauthorized to commit to topics {}", unauthorizedTopics);
+ log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, groupId);
future.raise(new TopicAuthorizationException(unauthorizedTopics));
} else {
future.complete(null);
@@ -583,9 +601,9 @@ private RequestFuture