KAFKA-12226: Prevent source task offset failure when producer is overwhelmed #10112

Closed
C0urante wants to merge 4 commits into apache:trunk from C0urante:kafka-12226

@C0urante
Contributor

Jira

When a task fails to commit offsets because not all outstanding records have been ack'd by the broker yet, it's better to retry with that same batch. Otherwise, the set of outstanding records can grow indefinitely and every subsequent offset commit attempt can fail. By retrying the same batch, it becomes possible to eventually commit offsets, even when the producer cannot keep up with the throughput of the records provided to it by the task.

Two unit tests are added to verify this behavior.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@C0urante
Contributor Author

@gharris1727 @tombentley @chia7712 anyone got a moment? 😃

Contributor

@gharris1727 gharris1727 left a comment

I think recordFlushPending is a much better name than flushing, these seem like good changes to me.

It's a bit tough to parse the tests, but that seems par for the course in that file. The two tests you added seem to be testing very similar variants of the same scenario, though, and it's very hard to pick out the difference between them. Is there a way to make the functional differences between the variants clearer to the next person who reads these tests?

@C0urante
Contributor Author

Yeah, that's fair. I found

    @Test
    public void testRevoke() throws TimeoutException {
        revokeAndReassign(false);
    }

    @Test
    public void testIncompleteRebalanceBeforeRevoke() throws TimeoutException {
        revokeAndReassign(true);
    }

    public void revokeAndReassign(boolean incompleteRebalance) throws TimeoutException {
        connectProtocolVersion = CONNECT_PROTOCOL_V1;
        int configOffset = 1;

        // Join group and get initial assignment
        EasyMock.expect(member.memberId()).andStubReturn("member");
        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(connectProtocolVersion);
        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
        // The lists need to be mutable because assignments might be removed
        expectRebalance(configOffset, new ArrayList<>(singletonList(CONN1)), new ArrayList<>(singletonList(TASK1)));
        expectPostRebalanceCatchup(SNAPSHOT);
        Capture<Callback<TargetState>> onFirstStart = newCapture();
        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onFirstStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            onFirstStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
        worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
        PowerMock.expectLastCall().andReturn(true);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        // worker is stable with an existing set of tasks

        if (incompleteRebalance) {
            // Perform a partial re-balance just prior to the revocation
            // bump the configOffset to trigger reading the config topic to the end
            configOffset++;
            expectRebalance(configOffset, Arrays.asList(), Arrays.asList());
            // give it the wrong snapshot, as if we're out of sync/can't reach the broker
            expectPostRebalanceCatchup(SNAPSHOT);
            member.requestRejoin();
            PowerMock.expectLastCall();
            // tick exits early because we failed, and doesn't do the poll at the end of the method
            // the worker did not startWork or reset the rebalanceResolved flag
        }

        // Revoke the connector in the next rebalance
        expectRebalance(Arrays.asList(CONN1), Arrays.asList(),
                ConnectProtocol.Assignment.NO_ERROR, configOffset, Arrays.asList(),
                Arrays.asList());

        if (incompleteRebalance) {
            // Same as SNAPSHOT, except with an updated offset
            // Allow the task to read to the end of the topic and complete the rebalance
            ClusterConfigState secondSnapshot = new ClusterConfigState(
                    configOffset, null, Collections.singletonMap(CONN1, 3),
                    Collections.singletonMap(CONN1, CONN1_CONFIG), Collections.singletonMap(CONN1, TargetState.STARTED),
                    TASK_CONFIGS_MAP, Collections.emptySet());
            expectPostRebalanceCatchup(secondSnapshot);
        }
        member.requestRejoin();
        PowerMock.expectLastCall();

        // re-assign the connector back to the same worker to ensure state was cleaned up
        expectRebalance(configOffset, Arrays.asList(CONN1), Arrays.asList());
        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
        Capture<Callback<TargetState>> onSecondStart = newCapture();
        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(),
                EasyMock.anyObject(),
                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED), capture(onSecondStart));
        PowerMock.expectLastCall().andAnswer(() -> {
            onSecondStart.getValue().onCompletion(null, TargetState.STARTED);
            return true;
        });
        member.wakeup();
        PowerMock.expectLastCall();
        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig))
                .andReturn(TASK_CONFIGS);
        member.poll(EasyMock.anyInt());
        PowerMock.expectLastCall();

        PowerMock.replayAll();

which seems applicable here. I'll refactor the tests to use a similar approach. Thanks Greg!

Contributor

@gharris1727 gharris1727 left a comment

Thanks @C0urante, LGTM!

@C0urante
Contributor Author

The only consistent test failure for the last run appears unrelated to the changes here. See https://github.com/apache/kafka/pull/10140/checks?check_run_id=1917700907 and https://github.com/apache/kafka/pull/10077/checks?check_run_id=1877979511 for other instances of the same failure.

org.apache.kafka.connect.integration.InternalTopicsIntegrationTest.testCreateInternalTopicsWithDefaultSettings

org.opentest4j.AssertionFailedError: Condition not met within timeout 30000. Didn't find the topics [connect-storage-topic-connect-cluster-1, connect-config-topic-connect-cluster-1, connect-offset-topic-connect-cluster-1] ==> expected: <true> but was: <false>

Member

@mimaison mimaison left a comment

Thanks for the PR.
I wonder if the test could be simplified a bit as it currently looks pretty scary! This is in part caused by EasyMock.

Restarting the task is a nice way to ensure we recover correctly, but could we also check that, with send() blocking, we're still able to make progress with the offsets?

@C0urante
Contributor Author

Yeah, the unit tests for the worker classes in general can be a little gnarly. The boilerplate segments (for things like setting up the converter, transformation chain, task context, headers, topic tracking, offset buffering, status tracking, and performing metrics assertions) are in line with the other tests in this class, though. The only difference with the new unit test here is that we want to test task behavior under some pretty fine-grained circumstances, which is accomplished right now by setting up latches and awaiting them to ensure that the task (which is running on a separate thread) has reached certain points in its lifecycle and not gone any further. If you have suggestions for how to improve that I'm all ears!

I'm not sure what you're referring to with task restart--that's not tested for at the moment, and any tests for that would likely be dependent on the success (or lack thereof) of any prior offset commit attempts, which are already tested for. Can you clarify or provide a brief example?

@mimaison
Member

I did not look at the tests closely yet but I thought sourceTask.stop(); (https://github.com/apache/kafka/pull/10112/files#diff-b8da514e331b4d8bff623626ee31a45048b753911193b564a9b65f02b97f53c8R922) was a key part of the test. Maybe it isn't!

I hope to take another look some time next week.

@C0urante
Contributor Author

C0urante commented Feb 19, 2021

Ah, gotcha! Yeah, SourceTask::stop isn't really a key part of the functional changes here or the tests added. The idea is just to prevent high-throughput source tasks whose producers are unable to keep up with them from entering a death spiral where they stop being able to commit offsets completely.

This is because right now, when an offset commit attempt fails because the current batch of source records wasn't flushed to Kafka in time, all backlogged records that were read from SourceTask::poll during that time get added to the batch, which just compounds the problem.

For example:

  1. The source task generates 10000 records and the worker starts sending those to Kafka.
  2. An offset commit is triggered on a separate thread and the worker waits for up to offset.flush.timeout.ms milliseconds for all of those records to be ack'd by the broker. In the meantime, on the main thread, the worker continues polling the source task for data.
  3. The worker times out while waiting for the original batch of 10000 records to be flushed; let's say that 5000 managed to be written successfully but 5000 remain unacknowledged. Additionally, during this time, the worker managed to poll an additional 1000 records from the task.

At this point, the current behavior is:

  1. Abort the offset commit, and start a new batch consisting of the 5000 unacknowledged records from the previous batch and the 1000 records polled from the task during the failed offset commit attempt.
  2. Continue adding newly-polled records to that batch until the next offset commit attempt is triggered.

If the task is generating a steady throughput of 10000 records per offset commit attempt, and the worker's producer is only able to write 5000 of those before the offset commit attempt times out, the worker will never be able to successfully commit offsets for the task, even though there are plenty of records that have been sent to and ack'd by the broker.
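
The arithmetic of this scenario can be checked with a toy model. The sketch below is not Connect code: the class and method names are made up for illustration, and it assumes that every record polled since the previous attempt joins the same batch while the producer gets a fixed number of records acknowledged per attempt.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the current behavior; all names here are hypothetical, not Connect APIs. */
final class CurrentCommitModel {

    /**
     * Returns, for each offset commit attempt, the number of records still
     * unacknowledged when the attempt times out. An attempt succeeds only if that number is 0.
     */
    static List<Integer> unackedAfterEachAttempt(int polledPerAttempt, int ackedPerAttempt, int attempts) {
        List<Integer> result = new ArrayList<>();
        int outstanding = 0;
        for (int i = 0; i < attempts; i++) {
            // Everything polled since the previous attempt joins the same batch.
            outstanding += polledPerAttempt;
            // The producer gets only part of the batch acknowledged before the timeout.
            outstanding -= Math.min(ackedPerAttempt, outstanding);
            result.add(outstanding);
        }
        return result;
    }

    public static void main(String[] args) {
        // 10000 records polled and 5000 acknowledged per attempt, as in the example above.
        System.out.println(unackedAfterEachAttempt(10000, 5000, 4)); // [5000, 10000, 15000, 20000]
    }
}
```

With these inputs the unacknowledged count grows by 5000 on every attempt and never reaches zero, so no attempt in this model ever succeeds.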

The proposed behavior in the PR is:

  1. Abort the offset commit, and keep the old batch of the 5000 unacknowledged records. Add the 1000 records polled during the failed offset commit attempt to a backlog.
  2. Continue adding newly-polled records from the task to that backlog.
  3. On the next offset commit attempt, only wait to flush out the records from the active batch (i.e., the 5000 unacknowledged records), and only write offsets for that batch.
  4. If successful, use the backlog of records as the new batch of records. Otherwise, keep the same batch of records and continue adding newly-polled records to the backlog.
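
The four steps above can be sketched as a small state machine. This is only an illustration under simplifying assumptions, not the code in the PR: records are identified by made-up numeric ids, the timed wait is replaced by an explicit beginFlush()/finishFlush() pair, and every name except recordFlushPending is hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** Illustrative batch/backlog bookkeeping; not the actual WorkerSourceTask implementation. */
final class BatchAndBacklog {
    private Set<Long> batch = new HashSet<>();   // records the next offset commit must wait for
    private Set<Long> backlog = new HashSet<>(); // records polled while a flush is pending
    private boolean recordFlushPending = false;

    /** A record was handed to the producer. */
    synchronized void recordSent(long id) {
        if (recordFlushPending) backlog.add(id); else batch.add(id);
    }

    /** The broker acknowledged a record. */
    synchronized void recordAcked(long id) {
        if (!batch.remove(id)) backlog.remove(id);
    }

    /** Start (or retry) a commit attempt: from now on new records go to the backlog. */
    synchronized void beginFlush() {
        recordFlushPending = true;
    }

    /** State at the end of the attempt's timeout: true if the active batch is fully acknowledged. */
    synchronized boolean finishFlush() {
        if (!batch.isEmpty()) {
            return false; // keep the same batch; the next attempt waits only for these records
        }
        batch = backlog; // the backlog becomes the new active batch
        backlog = new HashSet<>();
        recordFlushPending = false;
        return true;
    }

    synchronized int batchSize() { return batch.size(); }
    synchronized int backlogSize() { return backlog.size(); }

    public static void main(String[] args) {
        BatchAndBacklog task = new BatchAndBacklog();
        for (long id = 1; id <= 10; id++) task.recordSent(id);
        task.beginFlush();
        for (long id = 1; id <= 5; id++) task.recordAcked(id);
        task.recordSent(11);                    // polled during the attempt: goes to the backlog
        System.out.println(task.finishFlush()); // false: 5 records of the batch are still unacked
        task.beginFlush();                      // retry the same batch
        for (long id = 6; id <= 10; id++) task.recordAcked(id);
        System.out.println(task.finishFlush()); // true: the backlog (record 11) is the new batch
    }
}
```

The point of the design is that a failed attempt never enlarges the set of records the next attempt has to wait for.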

private boolean flushing;
private boolean recordFlushPending;
private boolean offsetFlushPending;
private CountDownLatch stopRequestedLatch;
Contributor

nit: while we're at it, this could be final

boolean flushStarted = offsetWriter.beginFlush();
// No need to begin a new offset flush if we timed out waiting for records to be flushed to
// Kafka in a prior attempt.
if (!recordFlushPending) {
Contributor

If I understand it correctly, the main difference in this patch is that we no longer fail the flush if the messages cannot be drained quickly enough from outstandingMessages. A few questions come to mind:

  1. Is the flush timeout still a useful configuration? Was it ever? Even if we time out, we still have to wait for the records that were sent to the producer.
  2. While we are waiting for outstandingMessages to be drained, we are still accumulating messages in outstandingMessagesBacklog. I imagine we can get into a pattern here once we fill up the accumulator. While we're waiting for outstandingMessages to complete, we fill outstandingMessagesBacklog. Once the flush completes, outstandingMessagesBacklog becomes outstandingMessages and we are stuck waiting again. Could this prevent us from satisfying the commit interval?

Overall, I can't shake the feeling that this logic is more complicated than necessary. Why do we need the concept of flushing at all? It would be more intuitive to just commit whatever the latest offsets are. Note that we do not use outstandingMessages for the purpose of retries. Once a request has been handed off to the producer successfully, we rely on the producer to handle retries. Any delivery failure after that is treated as fatal. So then does outstandingMessages serve any purpose other than tracking flushing? I am probably missing something here. It has been a long time since I reviewed this logic.

Contributor Author

@C0urante C0urante Feb 24, 2021

  1. I think it's a necessary evil, since source task offset commits are conducted on a single thread. Without a timeout for offset commits, a single task could block indefinitely and disable offset commits for all other tasks on the cluster.
  2. This is definitely possible; I think the only saving grace here is that the combined size of the outstandingMessages and outstandingMessagesBacklog fields is going to be naturally throttled by the producer's buffer. If too many records are accumulated, the call to Producer::send will block synchronously until space is freed up, at which point the worker can continue polling the task for new records. This isn't ideal, as it will essentially cause the producer's entire buffer to be occupied until the throughput of record production from the task decreases and/or the write throughput of the producer rises to meet it, but it at least establishes an upper bound for how large a single batch of records in the outstandingMessages field ever gets. It may take several offset commit attempts for all of the records in that batch to be ack'd, with all but the last (successful) attempt timing out and failing, but forward progress with offset commits should still be possible.

I share your feelings about the complexity here. I think ultimately it arises from two constraints:

  1. A worker-global producer is used to write source offsets to the internal offsets topic right now. Although this doesn't necessarily require the single-threaded logic for offset commits mentioned above, things become simpler with it.
  2. (Please correct me if I'm wrong on this point; my core knowledge is a little fuzzy and maybe there are stronger guarantees than I'm aware of) Out-of-order acknowledgment of records makes tracking the latest offset for a given source partition a little less trivial than it seems initially. For example, if a task produces two records with the same source partition that end up being delivered to different topic-partitions, the second record may be ack'd before the first, and when it comes time for offset commit, the framework would have to refrain from committing offsets for that second record until the first is also ack'd.

I don't think either of these points makes it impossible to add even finer-grained offset commit behavior and/or remove offset commit timeouts, but the work involved would be a fair amount heavier than this relatively minor patch. If you'd prefer to see something along those lines, could we consider merging this patch for the moment and performing a more serious overhaul of the source task offset commit logic as a follow-up, possibly with a small design discussion on a Jira ticket to make sure there's alignment on the new behavior?

Contributor

(Please correct me if I'm wrong on this point; my core knowledge is a little fuzzy and maybe there are stronger guarantees than I'm aware of) Out-of-order acknowledgment of records makes tracking the latest offset for a given source partition a little less trivial than it seems initially. For example, if a task produces two records with the same source partition that end up being delivered to different topic-partitions, the second record may be ack'd before the first, and when it comes time for offset commit, the framework would have to refrain from committing offsets for that second record until the first is also ack'd.

Ok, that rings a bell. I think I see how the logic works now and I don't see an obvious way to make it simpler. Doing something finer-grained as you said might be the way to go. Anyway, I agree this is something to save for a follow-up improvement.

I think it's a necessary evil, since source task offset commits are conducted on a single thread. Without a timeout for offset commits, a single task could block indefinitely and disable offset commits for all other tasks on the cluster.

Hmm.. This is suspicious. Why do we need to block the executor while we wait for the flush? Would it be simpler to let the worker source task finish the flush and the offset commit in its own event thread? We end up blocking the event thread anyway because of the need to do it under the lock.

Contributor Author

@C0urante C0urante Feb 25, 2021

We end up blocking the event thread anyway because of the need to do it under the lock.

I think we actually keep polling the task for records during the offset commit, which is the entire reason we have the outstandingMessagesBacklog field. Without it, we'd just add everything to outstandingMessages knowing that, if we've made it to the point of adding a record to that collection, we're not in the process of committing offsets, right?

Concretely, we can see that the offset thread relinquishes the lock on the WorkerSourceTask instance while waiting for outstanding messages to be ack'd.
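
A minimal illustration of that locking behavior, assuming the wait is done with Object.wait inside a synchronized method (an assumption made for this sketch; the FlushWaiter class and its methods are hypothetical stand-ins, not WorkerSourceTask itself):

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in showing that a timed wait() releases the object's monitor. */
final class FlushWaiter {
    private int outstanding;

    FlushWaiter(int outstanding) {
        this.outstanding = outstanding;
    }

    /** Producer-callback side: can only enter because the waiting thread released the monitor. */
    synchronized void ack() {
        outstanding--;
        if (outstanding == 0) {
            notifyAll();
        }
    }

    /** Commit side: waits up to timeoutMs for all records to be acknowledged. */
    synchronized boolean awaitFlush(long timeoutMs) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
        while (outstanding > 0) {
            long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            if (remainingMs <= 0) {
                return false; // timed out with records still unacknowledged
            }
            try {
                wait(remainingMs); // the monitor is released while this thread waits
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        FlushWaiter waiter = new FlushWaiter(3);
        // Acknowledge from another thread while the main thread is inside awaitFlush().
        new Thread(() -> { for (int i = 0; i < 3; i++) waiter.ack(); }).start();
        System.out.println(waiter.awaitFlush(5000));
    }
}
```

If the waiting thread held the monitor for the whole timeout, the calls to ack() from the other thread could not run until the wait was over.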

I'm not sure we need to perform offset commits on a separate thread, but it is in line with what we do for sink tasks, where we leverage the Consumer::commitAsync method.

If we want to consider making offset commit synchronous (which is likely going to happen anyways when transactional writes for exactly-once source are introduced), that also might be worth a follow-up. The biggest problem I can think of with that approach would be that a single offline topic-partition would block up the entire task thread when it comes time for offset commit. If we keep the timeout for offset commit, then that'd limit the fallout and allow us to resume polling new records from the task and dispatching them to the producer after the commit attempt timed out. However, there'd still be a non-negligible throughput hit (especially for workers configured with higher offset timeouts).

Contributor

It's mostly the flushing that concerns me, not really the offset commit. I don't think we need to make it synchronous, just that it seems silly to block that shared scheduler to complete it. My thought instead was to let the scheduler trigger the flush, but then let the task be responsible for waiting for its completion. While waiting, of course, it can continue writing to outstandingMessagesBacklog. So I don't think there should be any issue from a throughput perspective.

Contributor Author

I've been ruminating over this for a few days and I think it should be possible to make task offset commits independent of each other by changing the source task offset commit scheduler to use a multi-threaded executor instead of a global single-threaded executor for all tasks. This isn't quite the same thing as what you're proposing since tasks would still not be responsible for waiting for flush completion (the offset scheduler's threads would be), but it's a smaller change and as far as I can tell, the potential downsides only really amount to a few extra threads being created.

The usage of scheduleWithFixedDelay already ensures that two offset commits for the same task won't be active at the same time, as it "Creates and executes a periodic action that becomes enabled first after the given initial delay, and subsequently with the given delay between the termination of one execution and the commencement of the next."
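
That guarantee can be observed with a small experiment. The sketch below is a standalone demo, not Connect code: the FixedDelayDemo class and its parameters are invented for illustration, and the sleep stands in for a slow offset commit. It schedules one periodic action on a multi-threaded ScheduledExecutorService and records how many executions of that action were ever in progress at once.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Standalone demo; names and timings are illustrative only. */
final class FixedDelayDemo {

    /** Runs a slow periodic action on a pool of poolSize threads and returns the peak concurrency seen. */
    static int maxConcurrentRuns(int poolSize, long runMs, long delayMs, long totalMs) {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        scheduler.scheduleWithFixedDelay(() -> {
            int now = active.incrementAndGet();
            maxActive.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(runMs); // stand-in for a slow offset commit
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                active.decrementAndGet();
            }
        }, 0, delayMs, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(totalMs); // let several executions happen
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        scheduler.shutdownNow();
        return maxActive.get();
    }

    public static void main(String[] args) {
        // Four threads are available, yet the same action is never run twice at once.
        System.out.println(maxConcurrentRuns(4, 20, 1, 200));
    }
}
```

Each execution takes far longer than the 1 ms delay, but the next one is only scheduled after the previous one terminates, so the peak stays at 1 even with idle threads in the pool. Separate tasks scheduled on the same pool could still run concurrently with each other, which is the independence being proposed here.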

Beyond that, the only concern that comes to mind is potential races caused by concurrent access of the offset backing store and its underlying resources.

In distributed mode, the KafkaOffsetBackingStore and its usage of the underlying KafkaBasedLog appear to be thread-safe as everything basically boils down to calls to Producer::send, which should be fine.

In standalone mode, the MemoryOffsetBackingStore handles all writes/reads of the local offsets file via a single-threaded executor, so concurrent calls to MemoryOffsetBackingStore::set should also be fine.

Granted, none of this addresses your original concern, which is whether an offset commit timeout is necessary at all. In response to that, I think we may also want to revisit the offset commit logic and possibly do away with a timeout altogether. In sink tasks, for example, offset commit timeouts are almost a cosmetic feature at this point and are really only useful for metrics tracking. However, at the moment it's actually been pretty useful to us to monitor source task offset commit success/failure JMX metrics as a means of tracking overall task health. We might be able to make up the difference by relying on metrics for the number of active records, but it's probably not safe to make that assumption for all users, especially for what is intended to be a bug fix. So, if possible, I'd like to leave a lot of the offset commit logic intact as it is for the moment and try to keep the changes here minimal.

To summarize: I'd like to proceed by keeping the currently-proposed changes, and changing the source task offset committer to use a multi-threaded executor instead of a single-threaded executor. I can file a follow-up ticket to track improvements in offset commit logic (definitely for source tasks, and possibly for sinks) and we can look into that if it becomes a problem in the future. What do you think?

@C0urante C0urante closed this Jul 17, 2021
@C0urante C0urante deleted the kafka-12226 branch July 17, 2021 18:50
@C0urante C0urante restored the kafka-12226 branch August 31, 2021 21:18
@C0urante C0urante reopened this Aug 31, 2021
Contributor

@hachikuji hachikuji left a comment

I had another look at this PR. Here is my understanding:

In the old logic, if we could not flush pending records before the expiration of offset.flush.timeout.ms, then we would give up and try again later. The problem with this is that we may reach a point where the producer has built a big enough backlog of outstanding messages that they cannot be flushed before the expiration of offset.flush.timeout.ms. Basically we are taking records from the connector faster than they can be flushed. And if we ever reach this state, the connector is dead in the water because we are not able to commit offsets any more. So its progress appears to stall even though the data is still being copied.

The patch gets around the problem by relaxing offset.flush.timeout.ms a little bit. Rather than treating expiration of records as a fatal error, we continue to allow more time for outstandingMessages to be drained. This ensures that we do not have to wait for the messages from outstandingMessagesBacklog which are added while the flush is in progress.

Assuming my understanding is right, the only concern I have with this patch is the following. Ultimately this issue comes down to a slow producer which is not keeping up with the connector. When we begin a flush, we have to drain all of the outstanding data before we can commit offsets. For a slow producer, this could take a very long time (even weeks given delivery.timeout.ms of Int.MaxValue). We are eventually able to make progress, but users may still see progress indefinitely stalled. A good fix here I think would either prevent the backlog from reaching this point in the first place, or it would make offset commits more of an asynchronous process which does not depend on flushing all pending data. Intuitively, you expect the worker to commit whatever its progress is regularly without respect to the speed of the producer. A slow worker still goes slow, but at least users can track its progress. My understanding of the semantics here is a bit limited, so I do not know if this is possible.

The problems we have seen related to this issue came about from one slow broker or partition. This is a bad scenario for the producer because the pending data for the slow broker can exhaust the whole buffer. This effectively slows down every other partition since we constantly have to wait for room to free up in the buffer. I think it would be interesting to consider improvements to the partitioning logic in the producer to take into account the size of the pending data. The producer could then compensate for a slow broker by writing less data to it. On the other hand, this would cause data imbalances which might have downstream effects, so it might not be a clear-cut win.

Anyway, I am not very familiar with this logic, so I am hoping for additional reviews from @mimaison , @kkonstantine , and @rhauch to push this patch through. If you folks are happy with it, please do not wait for me.

@rhauch
Contributor

rhauch commented Sep 1, 2021

First of all, thanks for trying to fix this issue, @C0urante.

And thanks for your insight, @hachikuji. I agree that it seems like we should not have to block the offset commits until the full batch of records has been written to Kafka.

I suspect the current logic was written this way because it's the simplest thing to do, given that the source partition map and offset map in the source records are opaque, meaning we can't sort them and have to instead rely upon the order of the source records returned by the connector. And because the producer can make progress writing to some topic partitions while not making progress on others, it's possible that some records in a batch are written before earlier records in the same batch.

The bottom line is that we have to track offsets that can be committed using only the order of the records that were generated by the source task. The current logic simply blocks committing offsets until each "batch" of records is completely flushed. That way we can commit all of the offsets in the batch together, and let the offset writer rely upon ordering to use only the latest offset map for each partition map when we tell it to flush.

But, flushing offsets requires synchronization, and the current logic switches between the outstandingMessages and outstandingMessagesBacklog buffers to track the "batches" of records that have to complete for offset commits. It's really sort of a mess.

@hachikuji wrote:

The patch gets around the problem by relaxing offset.flush.timeout.ms a little bit. Rather than treating expiration of records as a fatal error, we continue to allow more time for outstandingMessages to be drained. This ensures that we do not have to wait for the messages from outstandingMessagesBacklog which are added while the flush is in progress.

That's my understanding, too. And maybe I don't grasp the subtleties of the fix, but it seems like the fix won't necessarily help when a producer is consistently slow. In such cases, outstandingMessages will fill with the records sent to the producer since the previous offset commit, and as soon as we start committing offsets, all subsequent records get added to outstandingMessagesBacklog. If the producer writes records significantly slower than the source task generates them, then outstandingMessagesBacklog could be larger than outstandingMessages by the time the offsets for outstandingMessages are finally committed, especially if we're blocking offset commits even longer with this change. So while we're able to eventually commit those first offsets, if the backlog is larger then it will likely take longer for the producer to flush those records than it took the producer to flush the first batch. The offset commit thread remains blocked for longer and longer periods of time.

Fortunately, we do have back pressure to keep this from getting too out of control: when the producer's buffer fills up, the worker source task's thread will block (up to max.block.ms) on calls to producer.send(...), and the worker source task will retry any sends that fail after that timeout. And since this is the same thread that calls poll(), the worker source task will eventually slow calls to poll().

But I think we can change how offsets are flushed such that we don't have to wait for the producer, and instead we can simply flush the latest offsets for records that have been successfully written at that point. We just need a different mechanism (other than the two "outstanding" lists and the flush-related flags) to track the offsets for the most recently written records.

One way to do that is to use a single concurrent queue that bookkeeps records in the same order as generated by the source task, but in a way that allows us to track which records have been acked and tolerates those records being acked in any order.

For example, we could replace the outstandingMessages and outstandingMessagesBacklog fields in WorkerSourceTask with something like this:

   private final Queue<SubmittedRecord> submittedRecords = new ConcurrentLinkedQueue<>();

An element is appended to this queue just before the record is sent to the producer, and the SubmittedRecord class allows us to track which of these records has been acknowledged:

    protected static class SubmittedRecord {
        private final SourceRecord record;
        private final AtomicBoolean acked = new AtomicBoolean();
        public SubmittedRecord(SourceRecord sourceRecord) {
            record = Objects.requireNonNull(sourceRecord);
        }
        public void acknowledge() {
            acked.set(true);
        }
        public boolean isAcknowledged() {
            return acked.get();
        }
        public SourceRecord record() {
            return record;
        }
    }

and where acknowledge() is called from the producer callback and the commitOffsets() method can safely call isAcknowledged() and record() from the commit thread. The sendRecords() method would add a SubmittedRecord to the end of the queue for each record that will be sent to the producer:

    private boolean sendRecords() {
        ...
        for (final SourceRecord preTransformRecord : toSend) {
            ...
            SubmittedRecord submittedRecord = new SubmittedRecord(record);
            if (!submittedRecords.offer(submittedRecord)) {
                // If a blocking queue, then retry using the existing mechanism in WorkerSourceTask
                log.warn("{} Failed to add record to buffer. Backing off before retrying", this);
                toSend = toSend.subList(processed, toSend.size());
                lastSendFailed = true;
                counter.retryRemaining();
                return false;
            }
            ...

and then have the producer callback call the SubmittedRecord.acknowledge() method:

            try {
                ...
                producer.send(
                    producerRecord,
                    (recordMetadata, e) -> {
                        if (e != null) {
                            ...
                        } else {
                            submittedRecord.acknowledge();
                            ...
                        }
                    });

This effectively replaces the outstandingMessages and outstandingMessagesBacklog fields and the flushing flag, and it simplifies the logic in sendRecords(), which no longer has to know which of those to use.

Then here's the big change: in commitOffsets(), we can dequeue all acked records, then take the snapshot of offsets, and immediately flush offsets without waiting for the producer. And by using a concurrent queue, we don't even need to synchronize between the sendRecords() method adding to the back of the queue and the commitOffsets() pulling from the front of the queue.

    public boolean commitOffsets() {
        ...
        // Dequeue all submitted records that have been acknowledged
        while (!submittedRecords.isEmpty()) {
            SubmittedRecord next = submittedRecords.peek();
            if (!next.isAcknowledged()) {
                // This record is not yet acknowledged, so we can't continue processing any more offsets
                break;
            }
            submittedRecords.poll();
            // The record is acknowledged, so add the offsets to the offset writer
            // Offsets are converted & serialized in the OffsetWriter
            SourceRecord record = next.record();
            offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
        }
        ...
        synchronized (this) {
            boolean flushStarted = offsetWriter.beginFlush();
            if (!flushStarted) {
                ...
            }
        }

        // Now we can actually flush the offsets to user storage.
        Future<Void> flushFuture = offsetWriter.doFlush((error, result) -> {
            if (error != null) {
                log.error("{} Failed to flush offsets to storage: ", WorkerSourceTask.this, error);
            } else {
                log.trace("{} Finished flushing offsets to storage", WorkerSourceTask.this);
            }
        });
        ...
    }
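
To make the head-of-queue behavior concrete, here is a minimal, self-contained sketch of the loop above with acks arriving out of order. The `Submitted` class and the `drainAcked` helper are hypothetical stand-ins rather than Connect classes, and a plain `Map` stands in for the offset writer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;

class HeadOfQueueSketch {
    // Hypothetical stand-in for SubmittedRecord: a source partition, a source offset, and an ack flag.
    static final class Submitted {
        final String partition;
        final long offset;
        private final AtomicBoolean acked = new AtomicBoolean();

        Submitted(String partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }

        void acknowledge() {
            acked.set(true);
        }

        boolean isAcknowledged() {
            return acked.get();
        }
    }

    // Removes acked records from the head of the queue and stops at the first unacked one.
    // Returns the latest offset per partition among the removed records.
    static Map<String, Long> drainAcked(Queue<Submitted> queue) {
        Map<String, Long> committable = new LinkedHashMap<>();
        Submitted next;
        while ((next = queue.peek()) != null && next.isAcknowledged()) {
            queue.poll();
            committable.put(next.partition, next.offset);
        }
        return committable;
    }

    public static void main(String[] args) {
        Queue<Submitted> queue = new ConcurrentLinkedQueue<>();
        Submitted a = new Submitted("P1", 1);
        Submitted b = new Submitted("P1", 2);
        Submitted c = new Submitted("P1", 3);
        queue.add(a);
        queue.add(b);
        queue.add(c);

        // Acks arrive out of order: the third record first, then the first.
        c.acknowledge();
        a.acknowledge();
        System.out.println(drainAcked(queue)); // {P1=1}: b is unacked, so c stays queued behind it

        b.acknowledge();
        System.out.println(drainAcked(queue)); // {P1=3}: b and c drain, and the latest offset wins
    }
}
```

Because only the commit thread removes from the head, the `peek()`/`poll()` pair in this sketch needs no extra locking; that assumes a single consumer, as in the proposal.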

I've shown the snippet above using a non-blocking queue of unlimited size. I think we could do this because the existing WorkerSourceTask logic already handles the possibility that the producer.send(...) blocks when its buffer is full, up to max.block.ms before throwing a retriable exception, and then retrying the send if needed. Since this happens on the same thread that calls SourceTask.poll(), this existing logic already has the backpressure that is based upon the producer setting and that prevents the source task getting too far ahead of the producer.

Alternatively, we could use a blocking queue, but this would require an additional worker configuration, which is not ideal and can't be backported.
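
The practical difference between the two queue choices shows up in `offer()`: an unbounded `ConcurrentLinkedQueue` always accepts the element, so the `!submittedRecords.offer(...)` branch in the earlier snippet would never be taken, whereas a bounded `BlockingQueue` rejects the element once it is full. A small sketch, where the capacity of 2 and the `offerAll` helper are purely illustrative:

```java
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;

class QueueOfferSketch {
    // Offers `count` elements and returns how many the queue accepted.
    static int offerAll(Queue<Integer> queue, int count) {
        int accepted = 0;
        for (int i = 0; i < count; i++) {
            if (queue.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // Unbounded: offer() never fails, so back pressure must come from elsewhere
        // (in the proposal, from producer.send(...) blocking when its buffer is full).
        System.out.println(offerAll(new ConcurrentLinkedQueue<>(), 5)); // 5

        // Bounded: offer() returns false once capacity is reached, which is the case
        // the retry branch in sendRecords() would have to handle.
        System.out.println(offerAll(new ArrayBlockingQueue<>(2), 5)); // 2
    }
}
```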

@C0urante, WDYT?

@rhauch
Contributor

rhauch commented Sep 1, 2021

My previous suggestion simply dequeues all completed records until an unacked record is found. This is really straightforward, but we could try to do better.

We could dequeue all records except those that have a source partition that has not been acked. For example, let's say we have enqueued 10 records when commitOffsets() is called:

1. SubmittedRecord{ SourceRecord{...partition=P1,offset=O1...}, acked=true}
2. SubmittedRecord{ SourceRecord{...partition=P1,offset=O2...}, acked=true}
3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}

This might happen if records 4, 5, and 7-10 were written to different topic partitions than records 1, 2, 3, and 6, and the producer is stuck on the latter partitions.

With the simplistic logic, we'd only dequeue records 1 and 2, add the offsets for these two records to the offset writer, and flush offset partition=P1,offset=O2. We'd end up with the following remaining in the queue (using the same record numbers as before):

3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
4. SubmittedRecord{ SourceRecord{...partition=P2,offset=O1...}, acked=true}
5. SubmittedRecord{ SourceRecord{...partition=P2,offset=O2...}, acked=true}
6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
7. SubmittedRecord{ SourceRecord{...partition=P2,offset=O3...}, acked=true}
8. SubmittedRecord{ SourceRecord{...partition=P2,offset=O4...}, acked=true}
9. SubmittedRecord{ SourceRecord{...partition=P2,offset=O5...}, acked=true}
10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}

There are quite a few records with source partition P2 that were acked but not dequeued, simply because they were behind an unacked record with a different source partition.

However, if we dequeue all acked records whose source partition map does not match that of a previously seen unacked record, then we'd be able to dequeue more records and also flush offsets partition=P1,offset=O2 and partition=P2,offset=O5. We'd end up with a much smaller queue (again, using the same record numbers as before):

3. SubmittedRecord{ SourceRecord{...partition=P1,offset=O3...}, acked=false}
6. SubmittedRecord{ SourceRecord{...partition=P1,offset=O4...}, acked=false}
10. SubmittedRecord{ SourceRecord{...partition=P2,offset=O6...}, acked=false}

This minor change will dramatically improve the ability to commit offsets closer to what has actually been acked.

        // Dequeue all submitted records that have been acknowledged and don't have a source partition with an unacked record
        Set<Map<String, ?>> unackedPartitions = new HashSet<>();
        Iterator<SubmittedRecord> iter = submittedRecords.iterator();
        while (iter.hasNext()) {
            SubmittedRecord next = iter.next();
            SourceRecord record = next.record();
            Map<String, ?> partition = record.sourcePartition();
            if (next.isAcknowledged() && !unackedPartitions.contains(partition)) {
                // The record is acknowledged and does not share a source partition with an unacknowledged record,
                // so we can remove it from the queue and write the offsets to the offset writer.
                iter.remove();
                offsetWriter.offset(partition, record.sourceOffset());
            } else {
                // As soon as we see an unacknowledged record, we have to prevent dequeuing all subsequent records that use that same partition
                unackedPartitions.add(partition);
            }
        }
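
As a sanity check, the partition-aware loop can be run against the ten-record example above. This is a self-contained sketch: the `Submitted` class, its `id` field (the record number from the lists above), and plain `String` partitions and offsets are simplifications introduced here, and a `Map` stands in for the offset writer.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;

class PartitionAwareDequeueSketch {
    // Hypothetical stand-in for SubmittedRecord with a fixed ack state.
    static final class Submitted {
        final int id;
        final String partition;
        final String offset;
        final boolean acked;

        Submitted(int id, String partition, String offset, boolean acked) {
            this.id = id;
            this.partition = partition;
            this.offset = offset;
            this.acked = acked;
        }
    }

    // Removes every acked record whose partition has no earlier unacked record,
    // and returns the latest offset per partition among the removed records.
    static Map<String, String> dequeueCommittable(Queue<Submitted> queue) {
        Map<String, String> offsets = new LinkedHashMap<>();
        Set<String> unackedPartitions = new HashSet<>();
        Iterator<Submitted> iter = queue.iterator();
        while (iter.hasNext()) {
            Submitted next = iter.next();
            if (next.acked && !unackedPartitions.contains(next.partition)) {
                iter.remove();
                offsets.put(next.partition, next.offset);
            } else {
                unackedPartitions.add(next.partition);
            }
        }
        return offsets;
    }

    // Builds the ten-record queue from the example, in the same order.
    static Queue<Submitted> exampleQueue() {
        String[] partitions = {"P1", "P1", "P1", "P2", "P2", "P1", "P2", "P2", "P2", "P2"};
        String[] offsets = {"O1", "O2", "O3", "O1", "O2", "O4", "O3", "O4", "O5", "O6"};
        boolean[] acked = {true, true, false, true, true, false, true, true, true, false};
        Queue<Submitted> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < partitions.length; i++) {
            queue.add(new Submitted(i + 1, partitions[i], offsets[i], acked[i]));
        }
        return queue;
    }

    // Lists the record numbers still in the queue.
    static List<Integer> remainingIds(Queue<Submitted> queue) {
        List<Integer> ids = new ArrayList<>();
        for (Submitted s : queue) {
            ids.add(s.id);
        }
        return ids;
    }

    public static void main(String[] args) {
        Queue<Submitted> queue = exampleQueue();
        System.out.println(dequeueCommittable(queue)); // {P1=O2, P2=O5}
        System.out.println(remainingIds(queue));       // [3, 6, 10]
    }
}
```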

@rhauch
Contributor

rhauch commented Sep 1, 2021

Also, with my proposed approach, the offsetWriter is only used (directly or indirectly) within the commitOffsets() method. That means we could actually do without the synchronized(this) block around the offsetWriter.beginFlush().

@C0urante
Contributor Author

C0urante commented Sep 7, 2021

@rhauch Overall that looks good to me. It's an elegant solution to the tricky problem you noted about the opacity of task-provided source offsets w/r/t ordering.

I'm a little worried about offset commits taking longer and longer with the more sophisticated approach you proposed (where we would unconditionally iterate over every record in the batch, instead of only until the first unacknowledged record). It's true that there would be natural back pressure from the producer as its buffer.memory gets eaten up, but with the default of 32MB, it still seems possible for a large number of unacknowledged records to build up. If this does happen, then offset commits may end up exceeding the offset.flush.timeout.ms for the worker, which may cause issues with the current model where a single shared, worker-global thread is used for offset commits of all tasks.

If this is a valid concern and we'd like to take it into account for now, I can think of a couple ways to handle it off the top of my head:

  1. Use the simpler approach that blocks offset commits across the board if a single record remains unacknowledged for a long period of time (which may realistically be a problem if a single partition out of many is unavailable for some reason).
  2. Enable concurrent offset commits by multiple tasks.
  3. Instead of a single dequeue per task, use a ConcurrentMap<Map<String, ?>, Queue<SubmittedRecord>> that stores a single dequeue per unique source partition. This would allow us to iterate over the bare minimum number of records for every single offset commit and not spend time, for example, on accumulated records for unavailable Kafka partitions. We'd still have to iterate over those records eventually if the Kafka partition came back online, but that iteration would only ever occur once, instead of once for every offset commit.

I think option 3 may be warranted, although it's still possible that offset commits take a long time if 32MB worth of records end up getting queued. Option 2 may be worth implementing or at least considering as a follow-up item to handle this case.
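
A rough sketch of what option 3 could look like, to make the cost argument concrete. Everything here is illustrative: the class and method names are made up, source partitions are simplified to `String` keys instead of `Map<String, ?>`, and empty per-partition queues are never cleaned up.

```java
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;

class PerPartitionQueuesSketch {
    // Hypothetical stand-in for SubmittedRecord.
    static final class Submitted {
        final long offset;
        private final AtomicBoolean acked = new AtomicBoolean();

        Submitted(long offset) {
            this.offset = offset;
        }

        void acknowledge() {
            acked.set(true);
        }

        boolean isAcknowledged() {
            return acked.get();
        }
    }

    // One queue per source partition, each in submission order.
    private final ConcurrentMap<String, Queue<Submitted>> queues = new ConcurrentHashMap<>();

    // Called before the record is handed to the producer.
    Submitted submit(String partition, long offset) {
        Submitted submitted = new Submitted(offset);
        queues.computeIfAbsent(partition, p -> new ConcurrentLinkedQueue<>()).add(submitted);
        return submitted;
    }

    // For each partition, drains acked records from the head and stops at the first
    // unacked one, so a stuck partition costs a single peek() per commit attempt.
    Map<String, Long> committableOffsets() {
        Map<String, Long> offsets = new TreeMap<>();
        for (Map.Entry<String, Queue<Submitted>> entry : queues.entrySet()) {
            Queue<Submitted> queue = entry.getValue();
            Submitted next;
            while ((next = queue.peek()) != null && next.isAcknowledged()) {
                queue.poll();
                offsets.put(entry.getKey(), next.offset);
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        PerPartitionQueuesSketch tracker = new PerPartitionQueuesSketch();
        Submitted p1First = tracker.submit("P1", 1);
        Submitted p1Second = tracker.submit("P1", 2);
        Submitted p2First = tracker.submit("P2", 1);
        Submitted p2Second = tracker.submit("P2", 2);

        // P1 is stuck on its first record; P2 is fully acked.
        p1Second.acknowledge();
        p2First.acknowledge();
        p2Second.acknowledge();
        System.out.println(tracker.committableOffsets()); // {P2=2}

        // Once the head of P1 is acked, both P1 records drain in one pass.
        p1First.acknowledge();
        System.out.println(tracker.committableOffsets()); // {P1=2}
    }
}
```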

Thoughts?

@rhauch
Contributor

rhauch commented Sep 7, 2021

@C0urante, thanks for the feedback on my suggestion. I like your option 3, because it does allow the iteration to stop on each source partition as soon as it encounters the first unacknowledged record in each queue. I also think that the behavior with the suggested approach and your option 3 is still a lot better than the current situation.

One question, though: you mention that it might be a problem if iterating over the submitted records takes longer than offset.flush.timeout.ms. But IIUC the offset.flush.timeout.ms would actually not be used anymore, as there actually are no timeouts as the offset commit thread doesn't block anymore. So, worst case, if task A has a ton of submitted records that have to be iterated over (e.g., fast producer and fast source task), it might slow the committing of offsets for other tasks. (Again, this is not any worse than the current behavior.) But your option 2 would help with this at the risk of using more threads, and so we may also want to consider this to help ensure that no slowly-proceeding producer of a task blocks other offset commits.

Of course, another option might be to incur the iteration on the worker source task thread. That would essentially move the use of the queue(s) to the worker source task thread, tho we still need to get the offsets to the offset commit thread and so would likely have to keep the synchronization blocks around the offset writer snapshot. On one hand, that's putting more work onto the worker source task thread and making the offset thread super straightforward (snapshot and write); on the other it's putting the onus on the worker source task thread.

Thoughts?

@C0urante
Contributor Author

C0urante commented Sep 8, 2021

> I also think that the behavior with the suggested approach and your option 3 is still a lot better than the current situation.

Agreed 👍

> IIUC the offset.flush.timeout.ms would actually not be used anymore, as there actually are no timeouts as the offset commit thread doesn't block anymore.

That's mostly correct--we wouldn't be waiting on a blocking operation while iterating through the dequeue(s), although we might still choose to block on the actual write to the offset topic in the same way that we currently do just for the sake of metrics and allowing users to monitor the health of the connection between the Connect worker and the offsets topic. Not a huge deal though, and the point that we wouldn't be blocking on the task's producer is still valid.

I think the issue is less that we'd end up timing out and more that we'd end up violating the guarantee that's provided right now by the framework that each task gets to take up only offset.flush.timeout.ms milliseconds per offset commit attempt before aborting the attempt and yielding control to the next task. A dequeue-based approach may actually be worse than the current behavior in that regard if there's no check in place to ensure that iterating over the dequeue doesn't exceed the offset flush timeout. Probably worth the tradeoff, but we can probably satisfy both objectives with your suggestion:

> another option might be to incur the iteration on the worker source task thread.

I think this'd be great, especially with the snapshotting logic you mention, which should basically eliminate any blocking between the two threads except to prevent race conditions while simple operations like clearing a hash map or assigning a new value to an instance variable take place.
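
For illustration, the hand-off between the two threads could be as small as the sketch below: the task thread records committable offsets under a lock, and the commit thread takes a snapshot by swapping in a fresh map. The class and method names are hypothetical, and `String`/`Long` stand in for the real source partition and offset maps.

```java
import java.util.HashMap;
import java.util.Map;

class OffsetSnapshotSketch {
    // Offsets that are safe to commit but have not been snapshotted yet.
    private Map<String, Long> pending = new HashMap<>();

    // Called on the task thread after it has dequeued an acknowledged record.
    synchronized void recordCommittable(String partition, long offset) {
        pending.put(partition, offset);
    }

    // Called on the offset commit thread. The lock is held only long enough to
    // swap the map reference, so the task thread is blocked for a trivial amount of time.
    synchronized Map<String, Long> snapshot() {
        Map<String, Long> result = pending;
        pending = new HashMap<>();
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        OffsetSnapshotSketch offsets = new OffsetSnapshotSketch();

        // Simulate the task thread recording offsets for acked records.
        Thread taskThread = new Thread(() -> {
            offsets.recordCommittable("P1", 1L);
            offsets.recordCommittable("P1", 2L);
            offsets.recordCommittable("P2", 7L);
        });
        taskThread.start();
        taskThread.join();

        // The commit thread sees only the latest offset per partition...
        Map<String, Long> toFlush = offsets.snapshot();
        System.out.println(toFlush.get("P1") + " " + toFlush.get("P2")); // 2 7

        // ...and a second snapshot is empty until the task thread records more.
        System.out.println(offsets.snapshot().isEmpty()); // true
    }
}
```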

One thing that gave me pause initially was the realization that we'd be double-iterating over every source record at this point: once to transform, convert, and dispatch the record to the producer, and then once to verify that it had been acknowledged while iterating over the dequeue it's in. But I can't imagine it'd make a serious difference with CPU utilization given that transformation, conversion, and dispatching to a producer are likely to be at least an order of magnitude more expensive than just checking a boolean flag and possibly inserting the record's offset into a hash map. And memory utilization should be very close to the existing approach, which already tracks every single unacknowledged record in the outstandingMessages and outstandingMessagesBacklog fields.

I think this buys us enough that my earlier-mentioned option 2 (multiple threads for offset commits) isn't called for, since the only blocking operation that would be performed during offset commit at this point is a write to the offsets topic. If the offsets topic is unavailable, it's likely that the impact would be the same across all tasks (unless the task is using a separate offsets topic, which will become possible once the changes for KIP-618 are merged), and even if not, things wouldn't be made any worse than they already are: the offset flush timeout would expire, and the next task in line would get its chance to commit offsets.

@rhauch If this is all agreeable I think we're ready to start implementing. Since you've provided a lot of the code yourself I'm happy to let you take on that work if you'd like; otherwise, I'll get started and see if I can have a new PR with these changes out by early next week.

@rhauch
Contributor

rhauch commented Sep 8, 2021

@C0urante wrote:

> @rhauch If this is all agreeable I think we're ready to start implementing. Since you've provided a lot of the code yourself I'm happy to let you take on that work if you'd like; otherwise, I'll get started and see if I can have a new PR with these changes out by early next week.

Sounds good to me! I'm looking forward to your new PR; please link here and ping me. Thanks!

@C0urante
Contributor Author

@rhauch (and, if interested, @hachikuji) new PR is up: #11323

