From c632ccda0192bc3821a00dbd3108b5a96240acd7 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Wed, 23 Mar 2016 19:13:03 -0700 Subject: [PATCH] KAFKA-3418: add javadoc section describing consumer failure detection --- .../kafka/clients/consumer/KafkaConsumer.java | 71 +++++++++++++++---- 1 file changed, 57 insertions(+), 14 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index b15d07f80969e..011e351b0850e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -74,7 +74,7 @@ *

Offsets and Consumer Position

* Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of * a record within that partition, and also denotes the position of the consumer in the partition. That is, a consumer - * which has position 5 has consumed records with offsets 0 through 4 and will next receive record with offset 5. There + * which has position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There * are actually two notions of position relevant to the user of the consumer. *

* The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given @@ -93,22 +93,23 @@ * *

Consumer Groups and Topic Subscriptions

* - * Kafka uses the concept of consumer groups to allow a pool of processes to divide up the work of consuming and + * Kafka uses the concept of consumer groups to allow a pool of processes to divide the work of consuming and * processing records. These processes can either be running on the same machine or, as is more likely, they can be - * distributed over many machines to provide additional scalability and fault tolerance for processing. + * distributed over many machines to provide scalability and fault tolerance for processing. *

- * Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the - * list of topics it wants to subscribe to through {@link #subscribe(Collection, ConsumerRebalanceListener)}, - * or subscribe to all topics matching certain pattern through {@link #subscribe(Pattern, ConsumerRebalanceListener)}. - * Kafka will deliver each message in the - * subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic - * over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two - * processes, each process would consume from two partitions. This group membership is maintained dynamically: if a - * process fails the partitions assigned to it will be reassigned to other processes in the same group, and if a new - * process joins the group, partitions will be moved from existing consumers to this new process. + * Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the list + * of topics it wants to subscribe to through one of the {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe} + * APIs. Kafka will deliver each message in the subscribed topics to one process in each consumer group. + * This is achieved by balancing the partitions between all members in the consumer group so that each partition is + * assigned to exactly one consumer in the group. So if there is a topic with four partitions, and a consumer group with two + * processes, each process would consume from two partitions. *

- * So if two processes subscribe to a topic both specifying different groups they will each get all the records in that - * topic; if they both specify the same group they will each get about half the records. + * Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will + * be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved + * from existing consumers to the new one. This is known as rebalancing the group and is discussed in more + * detail below. Note that the same process is also used when new partitions are added + * to one of the subscribed topics: the group automatically detects the new partitions and rebalances the group so + * that every new partition is assigned to one of the members. *

* Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of * multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a @@ -128,6 +129,48 @@ * It is also possible for the consumer to manually specify the partitions that are assigned to it through {@link #assign(Collection)}, * which disables this dynamic partition assignment. * + *

Detecting Consumer Failures

+ * + * After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(long)} is + * invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer + * will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers, + * the poll API sends periodic heartbeats to the server; when you stop calling poll (perhaps because an exception was thrown), + * then no heartbeats will be sent. If a period of the configured session timeout elapses before the server + * has received a heartbeat, then the consumer will be kicked out of the group and its partitions will be reassigned. + * This is designed to prevent situations where the consumer has failed, yet continues to hold onto the partitions + * it was assigned (thus preventing active consumers in the group from taking them). To stay in the group, you + * have to prove you are still alive by calling poll. + *

+ * The implication of this design is that message processing time in the poll loop must be bounded so that + * heartbeats can be sent before expiration of the session timeout. What typically happens when processing time + * exceeds the session timeout is that the consumer won't be able to commit offsets for any of the processed records. + * For example, this is indicated by a {@link CommitFailedException} thrown from {@link #commitSync()}. This + * guarantees that only active members of the group are allowed to commit offsets. If the consumer + * has been kicked out of the group, then its partitions will have been assigned to another member, which will be + * committing its own offsets as it handles new records. This gives offset commits an isolation guarantee. + *

+ * The consumer provides two configuration settings to control this behavior: + *

    + *
  1. session.timeout.ms: By increasing the session timeout, you can give the consumer more + * time to handle a batch of records returned from {@link #poll(long)}. The only drawback is that it + * will take longer for the server to detect hard consumer failures, which can cause a delay before + * a rebalance can be completed. However, clean shutdown with {@link #close()} is not impacted since + * the consumer will send an explicit message to the server to leave the group and cause an immediate + * rebalance.
  2. + *
  3. max.poll.records: Processing time in the poll loop is typically proportional to the number + * of records processed, so it's natural to want to set a limit on the number of records handled at once. + * This setting provides that. By default, there is essentially no limit.
  4. + *
+ *

+ * For use cases where message processing time varies unpredictably, neither of these options may be viable. + * The recommended way to handle these cases is to move message processing to another thread, which allows + * the consumer to continue sending heartbeats while the processor is still working. Some care must be taken + * to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic + * commits and manually commit processed offsets for records only after the thread has finished handling them + * (depending on the delivery semantics you need). Note also that you will generally need to {@link #pause(Collection)} + * the partition so that no new records are received from poll until after thread has finished handling those + * previously returned. + * *

Usage Examples

* The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to * demonstrate how to use them.