
KAFKA-12226: Commit source task offsets without blocking on batch delivery#11323

Merged
rhauch merged 15 commits into apache:trunk from C0urante:kafka-12226 on Nov 7, 2021

Conversation

@C0urante
Contributor

@C0urante C0urante commented Sep 13, 2021

Jira

Replaces #10112

Replaces the current batch-based logic for offset commits with a dynamic, non-blocking approach outlined across several discussion threads on #10112.

Essentially, a deque is kept for every source partition that a source task produces records for, and each element in that deque is a SubmittedRecord with a flag to track whether the producer has ack'd the delivery of that source record to Kafka yet. Periodically, the worker (on the same thread that polls the source task for records and transforms, converts, and dispatches them to the producer) polls acknowledged elements from the beginning of each of these deques and collects the latest offsets from these elements, storing them in a snapshot that is then committed on the separate source task offset thread.

The behavior of the offset.flush.timeout.ms property is retained, but essentially now only applies to the actual writing of offset data to the internal offsets topic (if running in distributed mode) or the offsets file (if running in standalone mode). No time is spent during WorkerSourceTask::commitOffsets blocking on the acknowledgment of records by the producer.

It's possible that memory exhaustion may occur if, for example, a single Kafka partition is offline for an extended period. In cases like this, the collection of deques in the SubmittedRecords class may continue to grow indefinitely until the partition comes back online and the SubmittedRecords in those deques that targeted the formerly-offline Kafka partition are acknowledged and can be removed. Although this may be suboptimal, it is no worse than the existing behavior of the framework in these cases.
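The deque-per-source-partition mechanism described above can be sketched as follows. This is a simplified illustration, not the actual Connect `SubmittedRecords` class; the class and method names here are partly hypothetical, and the real implementation handles concurrency and error cases this sketch omits.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified sketch: one deque of in-flight records per source partition.
class SubmittedRecordsSketch {
    static class SubmittedRecord {
        final Map<String, Object> partition;
        final Map<String, Object> offset;
        volatile boolean acked = false;
        SubmittedRecord(Map<String, Object> partition, Map<String, Object> offset) {
            this.partition = partition;
            this.offset = offset;
        }
        void ack() { acked = true; } // invoked from the producer callback
    }

    private final Map<Map<String, Object>, Deque<SubmittedRecord>> records = new LinkedHashMap<>();

    // Called on the task thread when a record is dispatched to the producer.
    SubmittedRecord submit(Map<String, Object> partition, Map<String, Object> offset) {
        SubmittedRecord record = new SubmittedRecord(partition, offset);
        records.computeIfAbsent(partition, k -> new ArrayDeque<>()).add(record);
        return record;
    }

    // Poll acked records from the head of each deque; the last record polled
    // per partition carries the latest committable offset for that partition.
    Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
        Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
        records.forEach((partition, deque) -> {
            Map<String, Object> offset = null;
            while (!deque.isEmpty() && deque.peek().acked) {
                offset = deque.poll().offset;
            }
            if (offset != null) {
                result.put(partition, offset);
            }
        });
        // Drop empty deques so the map does not grow indefinitely.
        records.values().removeIf(Deque::isEmpty);
        return result;
    }
}
```

Note how an unacked record at the head of a deque blocks commits for its source partition only, which is also why a long-offline Kafka partition can cause a deque to grow, as the description notes.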

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@C0urante
Contributor Author

CC @rhauch; hopefully this is fairly close to what you had in mind.

Contributor

@rhauch rhauch left a comment


Very nice job, @C0urante. Overall this is exactly along the lines of what I was suggesting, and I even like some improvements you made, like calling updateCommittableOffsets() from the WorkerSourceTask.execute() method.

Most of my comments are minor nits to clarify/improve phrasing in JavaDoc, or to expand the unit tests a bit. One comment/question is about saving some effort when there are no offsets.

Otherwise, looks great and almost ready to merge.

C0urante and others added 6 commits October 1, 2021 12:28
…/SubmittedRecords.java

Co-authored-by: Randall Hauch <rhauch@gmail.com>
…/SubmittedRecords.java

Co-authored-by: Randall Hauch <rhauch@gmail.com>
…/SubmittedRecords.java

Co-authored-by: Randall Hauch <rhauch@gmail.com>
…/SubmittedRecords.java

Co-authored-by: Randall Hauch <rhauch@gmail.com>
…/SubmittedRecords.java

Co-authored-by: Randall Hauch <rhauch@gmail.com>
@C0urante
Contributor Author

C0urante commented Oct 1, 2021

Thanks @rhauch, I've addressed all of the comments that seemed straightforward and left a response on the ones where a bit of discussion seems warranted before making changes. This is ready for another round.

Contributor

@rhauch rhauch left a comment


Thanks, @C0urante. This looks really good. I have a few more minor suggestions and a few questions.

}

maybeThrowProducerSendException();
updateCommittableOffsets();
Contributor


Actually, I now have a question: why did you choose to add it before the poll() (a few lines down) rather than after, perhaps after the if (!sendRecords()) {...} block below?

The reason I ask is that if one loop of the while polls for records and sends them (where they are sent to the producer and asynchronously acked), but then the connector is paused at about the same time, the offsets for those records will not be committed until after the connector is resumed. Is that intentional?

C0urante and others added 2 commits October 11, 2021 14:05
…/SubmittedRecords.java

Co-authored-by: Randall Hauch <rhauch@gmail.com>
@C0urante
Contributor Author

Thanks @rhauch. Ready for another round when you have time.

Contributor

@rhauch rhauch left a comment


Thanks, @C0urante. Just a few questions below.

Comment on lines +251 to +253

updateCommittableOffsets();

Contributor


Sorry, maybe I wasn't clear in my [previous comment about this call](https://github.com/apache/kafka/pull/11323#discussion_r724437761). I think there is an edge case here that we could deal with a bit better. Consider the following scenario as we walk through the loop in execute(). The WorkerSourceTask is not paused, and has been sending and committing offsets for records.

On some pass through the execute() while loop:

  1. shouldPause() returns false
  2. maybeThrowProducerSendException() does nothing since no exception was set from the producer
  3. poll() is called to get new records from the source task;
  4. updateCommittableOffsets() is called to update the committableOffsets map for any records sent in previous loops that have been acked
  5. sendRecords() is called with the records retrieved in step 3 earlier in this same pass, which for each of these new records enqueues a SubmittedRecord and calls producer.send(...) on each record with a callback that acks the submitted record.

But just after step 1 in the aforementioned pass, the connector and its tasks are paused. This means that the next pass through the WorkerSourceTask.execute() while loop:

  1. shouldPause() returns true, so
  2. onPause() is called and awaitUnpause() is called.

At that point, the thread blocks. But the records that were sent to the producer in step 5 of the previous pass may have already been acked, meaning we could have updated the offsets just before we paused. That might not have been enough time for all of the records submitted in that step to be acked, but if we were to move the updateCommittableOffsets() call to just before the if (shouldPause()) check, then we would get the offsets for as many acked records as possible just before the thread pauses.

In all other non-paused scenarios, I'm not sure it matters where in this loop we call updateCommittableOffsets(). But for the just-been-paused scenario, moving it to the first (or last) operation in the loop gives us a bit more of a chance to commit the offsets for as many acked records as possible.

WDYT?

Contributor Author


Ugh, sorry. Your initial point was very clear, although I really appreciate the detailed writeup here. It was an implementation snafu. I wanted to handle the case where poll produced no records, which meant invoking updateCommittableOffsets before the if (toSend == null) continue; section. Of course, that didn't actually address the original concern, which is that we may miss a chance to update offsets for records just-dispatched to the producer in sendRecords.

I like the idea of placing updateCommittableOffsets right before the if (shouldPause()) check, at the top of the loop; will do.
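The resulting loop ordering can be sketched as a toy simulation. The method names mirror those discussed for WorkerSourceTask.execute(), but the bodies here are stubs that only record the call order; this is an illustration of the control flow agreed on above, not Connect code.

```java
import java.util.ArrayList;
import java.util.List;

// Stub loop: updateCommittableOffsets() runs first on every pass, so
// offsets for already-acked records are captured even on the pass where
// the task discovers it should pause.
class ExecuteLoopSketch {
    final List<String> calls = new ArrayList<>();
    int iterations = 0;

    boolean shouldPause() { calls.add("shouldPause"); return iterations > 1; }
    void updateCommittableOffsets() { calls.add("updateCommittableOffsets"); }
    void maybeThrowProducerSendException() { calls.add("maybeThrowProducerSendException"); }
    void pollAndSend() { calls.add("pollAndSend"); }

    void execute() {
        while (iterations++ < 3) {
            updateCommittableOffsets();   // capture offsets of acked records first
            if (shouldPause()) {
                break;                    // stand-in for onPause()/awaitUnpause()
            }
            maybeThrowProducerSendException();
            pollAndSend();
        }
    }
}
```

Running it shows that on the pass where shouldPause() returns true, updateCommittableOffsets() has already run, which is the just-been-paused scenario from the review comment.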

@@ -378,7 +370,7 @@ private boolean sendRecords() {
log.trace("{} Failed record: {}", WorkerSourceTask.this, preTransformRecord);
producerSendException.compareAndSet(null, e);
Contributor


We're not calling submittedRecords.removeLastOccurrence(submittedRecord) here. Were you thinking that we're setting the producerSendException, which will cause the execute() method to throw this same exception on the next pass and consequently fail the task?

I think that's the right choice and no changes are required, but I do need to work through it. So pardon my thought process here.

The question is: what happens to records (and SubmittedRecord objects and their offsets) that appear after the record that resulted in the asynchronous exception?

What happens depends on what the producer behavior is, or might be in the future. IIRC the exceptions will often be unrecoverable, but it is possible that records could be sent successfully even if they were submitted to the producer after the record that failed, especially when those records were sent to a different topic partition and were actually sent by the producer before the record that failed. After all, from the producer.send() JavaDoc:

Callbacks for records being sent to the same partition are guaranteed to execute in order.

Unfortunately, we cannot infer a relationship between the topic partition for a record and its source partition. So any subsequent records that were sent to a different topic partition could still have the same source partition, and thus they should be enqueued into the same deque. Those offsets would not be committed, since their SubmittedRecord instances are after the SubmittedRecord for the record that failed to send, and the latter would never be acked (as its send failed).

But if any subsequent records were sent to a different topic partition but had a different source partition, their SubmittedRecord instances would be in a different deque than the SubmittedRecord for the record that failed to send, and their offsets could potentially be committed.

If the committed offsets were moved as suggested in a separate thread above, we'd actually get a chance to commit offsets for acked source records before failing the task. It's not super essential, but it'd be good to commit the offsets for as many of those submitted-and-acked records as possible.
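The deque semantics in the scenario above can be reduced to a tiny illustration (not Connect code): each deque holds per-record ack flags for one source partition, oldest first, and a failed record that is never acked blocks commits for its own source partition only. The helper name echoes the canCommitHead check quoted later in the review.

```java
import java.util.Deque;

// Toy model: a deque of ack flags per source partition, oldest record at
// the head. Offsets for a partition can only advance while the head is acked.
class DequeBlockingSketch {
    static boolean canCommitHead(Deque<Boolean> ackedFlags) {
        return !ackedFlags.isEmpty() && ackedFlags.peek();
    }
}
```

A failed record at the head of one partition's deque leaves other partitions' deques committable, which is why subsequent records with a different source partition can still have their offsets committed.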

Contributor Author


So any subsequent records that were sent to a different topic partition could still have the same source partition, and thus they should be enqueued into the same deque. Those offsets would not be committed, since their SubmittedRecord instances are after the SubmittedRecord for the record that failed to send, and the latter would never be acked (as its send failed).

I think this is the "vital" section and it provides a good rationale for why we intentionally keep the failed record in the queue.

If the committed offsets were moved as suggested in a separate thread above, we'd actually get a chance to commit offsets for acked source records before failing the task. It's not super essential, but it'd be good to commit the offsets for as many of those submitted-and-acked records as possible.

We call commitOffsets in a finally block for execute right now. I think we can address this case by adding another call to updateCommittableOffsets right before this end-of-life call to commitOffsets. I've done this; LMKWYT.

C0urante and others added 2 commits October 13, 2021 09:15
…/SubmittedRecords.java

Co-authored-by: Randall Hauch <rhauch@gmail.com>
Contributor

@kkonstantine kkonstantine left a comment


This is a nice improvement in a part of the code that seemed to really need some modernization. I have one comment regarding the use of multiple deques vs single deque.

Comment on lines +104 to +115
public Map<Map<String, Object>, Map<String, Object>> committableOffsets() {
    Map<Map<String, Object>, Map<String, Object>> result = new HashMap<>();
    records.forEach((partition, queuedRecords) -> {
        if (canCommitHead(queuedRecords)) {
            Map<String, Object> offset = committableOffset(queuedRecords);
            result.put(partition, offset);
        }
    });
    // Clear out all empty deques from the map to keep it from growing indefinitely
    records.values().removeIf(Deque::isEmpty);
    return result;
}
Contributor


@C0urante, right now we have no visibility into the number or size of the deques. We can't add a metric without a KIP, but WDYT about adding some DEBUG and/or TRACE log messages here? The benefit of doing it here rather than in the WorkerSourceTask is that it would be much easier to enable DEBUG or TRACE for only these log messages. One disadvantage is that this committableOffsets() method is called once per iteration in the WorkerSourceTask.execute() method.

I guess an alternative might be to add a method (e.g., toString()?) that outputs this information, and then put the log messages in WorkerSinkTask.commitOffsets().

Thoughts?

Contributor Author


I agree with your concerns about excess logging if a message is added to the WorkerSourceTask::execute loop.

Since we're removing this log message in this PR, I wonder if we can replace it with something similar? I think users may want to know how many total pending (i.e., unacked) messages there are, how many deques there are, and the number of messages in the largest deque (which may be useful for identifying "stuck" topic partitions).

I'll take a shot at this; LMKWYT.
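The statistics discussed here (total pending messages, number of deques, and the largest deque) can be computed in a single pass over the deques. This sketch is illustrative only; the accessor names in the log message quoted later in the thread (totalPendingMessages(), numDeques(), and so on) suggest a similar shape, but this class and its signature are assumptions.

```java
import java.util.Deque;
import java.util.Map;

// One-pass summary of per-source-partition deques for diagnostic logging.
class PendingMetadataSketch {
    static String summarize(Map<String, Deque<String>> deques) {
        int total = 0;
        String largestPartition = null;
        int largestSize = 0;
        for (Map.Entry<String, Deque<String>> entry : deques.entrySet()) {
            int size = entry.getValue().size();
            total += size;                  // total unacked messages
            if (size > largestSize) {       // track the "stuck" candidate
                largestSize = size;
                largestPartition = entry.getKey();
            }
        }
        return total + " pending messages across " + deques.size()
                + " source partitions; largest is " + largestPartition
                + " with " + largestSize;
    }
}
```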

Contributor

@rhauch rhauch left a comment


Good idea with making Pending be a memento of the offset, and with calling out the existing log message that we should replace. A few suggestions below to hopefully simplify things even more.

Comment on lines +491 to +499
log.info("There are currently {} pending messages spread across {} source partitions whose offsets will not be committed. "
+ "The source partition with the most pending messages is {}, with {} pending messages",
pendingMetadataForCommit.totalPendingMessages(),
pendingMetadataForCommit.numDeques(),
pendingMetadataForCommit.largestDequePartition(),
pendingMetadataForCommit.largestDequeSize()
);
} else {
log.info("There are currently no pending messages for this offset commit; all messages since the last commit have been acknowledged");
Contributor


As you point out, the old log message was:

 log.info("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());

This log message had two things it'd be nice to keep:

  1. this as the context; and
  2. the number of records whose offsets were being committed (e.g., the number of acked records).

I think both would be good to include, especially if we're saying the number of records whose offsets are not being committed (yet).

The Pending class seems pretty useful, but computing the number of acked records is not possible here. WDYT about merging the SubmittedRecords.committableOffsets() and pending() methods, by having the former return an object that contains the offset map and the metadata that can be used for logging? This class would be like Pending, though maybe CommittableOffsets is a more apt name. Plus, WorkerSourceTask would only have one volatile field that is updated atomically.

Contributor Author

@C0urante C0urante Nov 2, 2021


👍   SGTM. I've updated the PR accordingly.

One nit: the "flushing outstanding messages for offset commit" message actually refers to the number of unacked messages in the current batch, not the number of acknowledged messages whose offsets will be committed. This has tripped up many of my colleagues, who see "flushing 0 outstanding messages" and think their source connector isn't producing any data, when all it really means is that its producers are keeping up with the throughput of its tasks very well.

I think both pieces of information (number of acked and unacked messages) are useful here so I've included both in the latest draft.

Contributor

@rhauch rhauch left a comment


Thanks, @C0urante, for the recent improvements to logging, and for this PR. Everything else looks great.

@rhauch rhauch merged commit c1bdfa1 into apache:trunk Nov 7, 2021
@C0urante C0urante deleted the kafka-12226 branch November 9, 2021 20:58
stanislavkozlovski added a commit to stanislavkozlovski/kafka that referenced this pull request Nov 11, 2021
…ntegration-11-nov

* ak/trunk: (15 commits)
  KAFKA-13429: ignore bin on new modules (apache#11415)
  KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides (apache#11272)
  KAFKA-12487: Add support for cooperative consumer protocol with sink connectors (apache#10563)
  MINOR: Log client disconnect events at INFO level (apache#11449)
  MINOR: Remove topic null check from `TopicIdPartition` and adjust constructor order (apache#11403)
  KAFKA-13417; Ensure dynamic reconfigurations set old config properly (apache#11448)
  MINOR: Adding a constant to denote UNKNOWN leader in LeaderAndEpoch (apache#11477)
  KAFKA-10543: Convert KTable joins to new PAPI (apache#11412)
  KAFKA-12226: Commit source task offsets without blocking on batch delivery (apache#11323)
  KAFKA-13396: Allow create topic without partition/replicaFactor (apache#11429)
  ...
rhauch pushed a commit that referenced this pull request Nov 15, 2021
…ivery (#11323)

Replaces the current logic for committing source offsets, which is batch-based and blocks until the entirety of the current batch is fully written to and acknowledged by the broker, with a new non-blocking approach that commits source offsets for source records that have been "fully written" by the producer. The new logic considers a record fully written only if that source record and all records before it with the same source partition have all been written to Kafka and acknowledged.

This new logic uses a deque for every source partition that a source task produces records for. Each element in that deque is a SubmittedRecord with a flag to track whether the producer has ack'd the delivery of that source record to Kafka. Periodically, the worker (on the same thread that polls the source task for records and transforms, converts, and dispatches them to the producer) polls acknowledged elements from the beginning of each of these deques and collects the latest offsets from these elements, storing them in a snapshot that is then committed on the separate source task offset thread.

The behavior of the `offset.flush.timeout.ms` property is retained, but essentially now only applies to the actual writing of offset data to the internal offsets topic (if running in distributed mode) or the offsets file (if running in standalone mode). No time is spent during `WorkerSourceTask::commitOffsets` waiting on the acknowledgment of records by the producer.

This behavior also does not change how the records are dispatched to the producer nor how the producer sends or batches those records.

It's possible that memory exhaustion may occur if, for example, a single Kafka partition is offline for an extended period. In cases like this, the collection of deques in the SubmittedRecords class may continue to grow indefinitely until the partition comes back online and the SubmittedRecords in those deques that targeted the formerly-offline Kafka partition are acknowledged and can be removed. Although this may be suboptimal, it is no worse than the existing behavior of the framework in these cases.

Author: Chris Egerton <chrise@confluent.io>
Reviewed: Randall Hauch <rhauch@gmail.com>
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…ivery (apache#11323)

lmr3796 pushed a commit to lmr3796/kafka that referenced this pull request Jun 2, 2022
…ivery (apache#11323)
