diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b15d07f80969e..011e351b0850e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -74,7 +74,7 @@
  *
  * The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
@@ -93,22 +93,23 @@
  *
  *
- * Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the
- * list of topics it wants to subscribe to through {@link #subscribe(Collection, ConsumerRebalanceListener)},
- * or subscribe to all topics matching certain pattern through {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
- * Kafka will deliver each message in the
- * subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic
- * over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two
- * processes, each process would consume from two partitions. This group membership is maintained dynamically: if a
- * process fails the partitions assigned to it will be reassigned to other processes in the same group, and if a new
- * process joins the group, partitions will be moved from existing consumers to this new process.
+ * Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the list
+ * of topics it wants to subscribe to through one of the {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe}
+ * APIs. Kafka will deliver each message in the subscribed topics to one process in each consumer group.
+ * This is achieved by balancing the partitions between all members in the consumer group so that each partition is
+ * assigned to exactly one consumer in the group. So if there is a topic with four partitions, and a consumer group with two
+ * processes, each process would consume from two partitions.
  *
- * So if two processes subscribe to a topic both specifying different groups they will each get all the records in that
- * topic; if they both specify the same group they will each get about half the records.
+ * Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will
+ * be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved
+ * from existing consumers to the new one. This is known as rebalancing the group and is discussed in more
+ * detail below. Note that the same process is also used when new partitions are added
+ * to one of the subscribed topics: the group automatically detects the new partitions and rebalances the group so
+ * that every new partition is assigned to one of the members.
  *
  * Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of
  * multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a
@@ -128,6 +129,48 @@
  * It is also possible for the consumer to manually specify the partitions that are assigned to it through {@link #assign(Collection)},
  * which disables this dynamic partition assignment.
  *
+ *
+ * The implication of this design is that message processing time in the poll loop must be bounded so that
+ * heartbeats can be sent before expiration of the session timeout. What typically happens when processing time
+ * exceeds the session timeout is that the consumer won't be able to commit offsets for any of the processed records.
+ * For example, this is indicated by a {@link CommitFailedException} thrown from {@link #commitSync()}. This
+ * guarantees that only active members of the group are allowed to commit offsets. If the consumer
+ * has been kicked out of the group, then its partitions will have been assigned to another member, which will be
+ * committing its own offsets as it handles new records. This gives offset commits an isolation guarantee.
+ *
+ * The consumer provides two configuration settings to control this behavior:
+ *
+ * session.timeout.ms: By increasing the session timeout, you can give the consumer more
+ * time to handle a batch of records returned from {@link #poll(long)}. The only drawback is that it
+ * will take longer for the server to detect hard consumer failures, which can cause a delay before
+ * a rebalance can be completed. However, clean shutdown with {@link #close()} is not impacted since
+ * the consumer will send an explicit message to the server to leave the group and cause an immediate
+ * rebalance.
+ * max.poll.records: Processing time in the poll loop is typically proportional to the number
+ * of records processed, so it's natural to want to set a limit on the number of records handled at once.
+ * This setting provides that. By default, there is essentially no limit.
+ *
+ * For use cases where message processing time varies unpredictably, neither of these options may be viable.
+ * The recommended way to handle these cases is to move message processing to another thread, which allows
+ * the consumer to continue sending heartbeats while the processor is still working. Some care must be taken
+ * to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic
+ * commits and manually commit processed offsets for records only after the thread has finished handling them
+ * (depending on the delivery semantics you need). Note also that you will generally need to {@link #pause(Collection)}
+ * the partition so that no new records are received from poll until after thread has finished handling those
+ * previously returned.
+ *
  *
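
Reviewer's illustration (not part of the patch): the two settings named in the added Javadoc, plus the "disable automatic commits" advice, could be collected into consumer properties roughly as follows. This is a minimal sketch using only `java.util.Properties`. The class name `ConsumerTimeoutConfig`, the method `build`, and the values 30000 and 100 are assumptions chosen for illustration. A real consumer would also need connection, group, and deserializer settings, which are omitted here.

```java
import java.util.Properties;

/**
 * Hypothetical helper, shown only to illustrate the settings discussed in the
 * Javadoc above. It is not part of the Kafka client or of this patch.
 */
public final class ConsumerTimeoutConfig {

    private ConsumerTimeoutConfig() { }

    /**
     * Builds the subset of consumer properties that bounds poll-loop processing time.
     * Connection, group and deserializer settings are intentionally left out.
     */
    public static Properties build(int sessionTimeoutMs, int maxPollRecords) {
        if (sessionTimeoutMs <= 0 || maxPollRecords <= 0) {
            throw new IllegalArgumentException("both values must be positive");
        }
        Properties props = new Properties();
        // A longer session timeout gives more time to process a batch, at the cost
        // of slower detection of hard consumer failures.
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        // Caps how many records a single poll returns, which bounds per-batch work.
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        // Commit manually, only after records have actually been handled.
        props.setProperty("enable.auto.commit", "false");
        return props;
    }

    public static void main(String[] args) {
        // Example values are assumptions, not recommendations.
        Properties props = build(30000, 100);
        for (String name : props.stringPropertyNames()) {
            System.out.println(name + "=" + props.getProperty(name));
        }
    }
}
```

The resulting `Properties` object would be merged with the rest of the consumer configuration before constructing the consumer. Suitable values depend on how long one batch takes to process in the application at hand.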