Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@
* <h3>Offsets and Consumer Position</h3>
* Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of
* a record within that partition, and also denotes the position of the consumer in the partition. That is, a consumer
* which has position 5 has consumed records with offsets 0 through 4 and will next receive record with offset 5. There
* which has position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There
* are actually two notions of position relevant to the user of the consumer.
* <p>
* The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
Expand All @@ -93,22 +93,23 @@
*
* <h3>Consumer Groups and Topic Subscriptions</h3>
*
* Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide up the work of consuming and
* Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide the work of consuming and
* processing records. These processes can either be running on the same machine or, as is more likely, they can be
* distributed over many machines to provide additional scalability and fault tolerance for processing.
* distributed over many machines to provide scalability and fault tolerance for processing.
* <p>
* Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the
* list of topics it wants to subscribe to through {@link #subscribe(Collection, ConsumerRebalanceListener)},
* or subscribe to all topics matching certain pattern through {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
* Kafka will deliver each message in the
* subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic
* over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two
* processes, each process would consume from two partitions. This group membership is maintained dynamically: if a
* process fails the partitions assigned to it will be reassigned to other processes in the same group, and if a new
* process joins the group, partitions will be moved from existing consumers to this new process.
* Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the list
* of topics it wants to subscribe to through one of the {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe}
* APIs. Kafka will deliver each message in the subscribed topics to one process in each consumer group.
* This is achieved by balancing the partitions between all members in the consumer group so that each partition is
* assigned to exactly one consumer in the group. So if there is a topic with four partitions, and a consumer group with two
* processes, each process would consume from two partitions.
* <p>
* So if two processes subscribe to a topic both specifying different groups they will each get all the records in that
* topic; if they both specify the same group they will each get about half the records.
* Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will
* be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved
* from existing consumers to the new one. This is known as <i>rebalancing</i> the group and is discussed in more
* detail <a href="#failuredetection">below</a>. Note that the same process is also used when new partitions are added
* to one of the subscribed topics: the group automatically detects the new partitions and rebalances the group so
* that every new partition is assigned to one of the members.
* <p>
* Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of
* multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a
Expand All @@ -128,6 +129,48 @@
* It is also possible for the consumer to manually specify the partitions that are assigned to it through {@link #assign(Collection)},
* which disables this dynamic partition assignment.
*
* <h3><a name="failuredetection">Detecting Consumer Failures</a></h3>
*
* After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(long)} is
* invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer
* will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers,
* the poll API sends periodic heartbeats to the server; when you stop calling poll (perhaps because an exception was thrown),
* then no heartbeats will be sent. If a period of the configured <i>session timeout</i> elapses before the server
* has received a heartbeat, then the consumer will be kicked out of the group and its partitions will be reassigned.
* This is designed to prevent situations where the consumer has failed, yet continues to hold onto the partitions
* it was assigned (thus preventing active consumers in the group from taking them). To stay in the group, you
* have to prove you are still alive by calling poll.
* <p>
* The implication of this design is that message processing time in the poll loop must be bounded so that
* heartbeats can be sent before expiration of the session timeout. What typically happens when processing time
* exceeds the session timeout is that the consumer won't be able to commit offsets for any of the processed records.
* For example, this is indicated by a {@link CommitFailedException} thrown from {@link #commitSync()}. This
* guarantees that only active members of the group are allowed to commit offsets. If the consumer
* has been kicked out of the group, then its partitions will have been assigned to another member, which will be
* committing its own offsets as it handles new records. This gives offset commits an isolation guarantee.
* <p>
* The consumer provides two configuration settings to control this behavior:
* <ol>
* <li><code>session.timeout.ms</code>: By increasing the session timeout, you can give the consumer more
* time to handle a batch of records returned from {@link #poll(long)}. The only drawback is that it
* will take longer for the server to detect hard consumer failures, which can cause a delay before
* a rebalance can be completed. However, clean shutdown with {@link #close()} is not impacted since
* the consumer will send an explicit message to the server to leave the group and cause an immediate
* rebalance.</li>
* <li><code>max.poll.records</code>: Processing time in the poll loop is typically proportional to the number
* of records processed, so it's natural to want to set a limit on the number of records handled at once.
* This setting provides that. By default, there is essentially no limit.</li>
* </ol>
* <p>
* For use cases where message processing time varies unpredictably, neither of these options may be viable.
* The recommended way to handle these cases is to move message processing to another thread, which allows
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it correct that another downside of this approach is that the foreground thread could end up buffering too much data as we have no way to apply backpressure?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ijuma I think this question is addressed below -- pausing partitions in the network thread effectively is backpressure and turns the network thread into a pure heartbeating thread while processing is being performed. You can also, of course, choose to buffer as much or as little as you want by adjusting when you decide to pause the collection.

I'd say the current docs explain this well enough, though I think a few code examples (in the somewhat defunct examples jar) would be the most helpful way to show how to make this work in practice.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense @ewencp, not sure how I missed that sentence when I read it originally.

* the consumer to continue sending heartbeats while the processor is still working. Some care must be taken
* to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic
* commits and manually commit processed offsets for records only after the thread has finished handling them
* (depending on the delivery semantics you need). Note also that you will generally need to {@link #pause(Collection)}
* the partition so that no new records are received from poll until after thread has finished handling those
* previously returned.
*
* <h3>Usage Examples</h3>
* The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
* demonstrate how to use them.
Expand Down