KAFKA-13469: Block for in-flight record delivery before end-of-life source task offset commit#11524

Merged
rhauch merged 5 commits into apache:trunk from C0urante:kafka-13469
Nov 30, 2021
Conversation

@C0urante
Contributor

Jira

Although committing source task offsets without blocking on the delivery of all in-flight records is beneficial most of the time, it can lead to duplicate record delivery if there are in-flight records at the time of the task's end-of-life offset commit.

A best-effort attempt is made here to wait for any such in-flight records to be delivered before proceeding with the end-of-life offset commit for source tasks. Connect will block for up to offset.flush.timeout.ms milliseconds before calculating the latest committable offsets for the task and flushing those to the persistent offset store.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

        // Ignore and allow to exit.
    } finally {
        submittedRecords.awaitAllMessages(
                workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG),
Contributor Author

Waiting for up to offset.flush.timeout.ms milliseconds here may cause shutdown to block for double that time (since the subsequent call to WorkerSourceTask::commitOffsets may also block for offset.flush.timeout.ms milliseconds while flushing offset information to the persistent backing store).

I initially considered an approach where we would require the entire combination of (awaiting in-flight messages, computing committable offsets, committing offsets to backing store) to complete in offset.flush.timeout.ms milliseconds, but this came with the unfortunate drawback that, if any in-flight record were undeliverable within the flush timeout, it would prevent any offsets from being committed, which seems counter to the original motivation of the changes we made for KAFKA-12226 in #11323.

We could also consider blocking for at most half of offset.flush.timeout.ms here and then blocking for the remaining time in commitOffsets if we want to honor the docstring for offset.flush.timeout.ms as strictly as possible:

Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt.

But I don't know if that's really necessary. In this case, these offsets have no chance of being committed in a future attempt (at least, not by the same task instance), and the offset commit takes place on the task's work thread instead of the shared source task offset commit thread, so there's no risk of monopolizing that thread and blocking other tasks from committing offsets while it's in use.
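For illustration, the "split the budget" alternative could be sketched roughly like this; the names (`flushTimeoutMs`, `inFlight`) are hypothetical stand-ins rather than Connect's actual fields, and the point is only that the two blocking phases share one deadline:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class TimeoutBudget {
    public static void main(String[] args) throws InterruptedException {
        long flushTimeoutMs = 5000;                       // stand-in for offset.flush.timeout.ms
        CountDownLatch inFlight = new CountDownLatch(0);  // pretend all records are already acked

        // Phase 1: wait for in-flight records, consuming part of the budget
        long start = System.nanoTime();
        boolean drained = inFlight.await(flushTimeoutMs, TimeUnit.MILLISECONDS);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);

        // Phase 2: whatever remains of the budget goes to the offset flush itself,
        // so the two phases together never exceed flushTimeoutMs
        long remainingMs = Math.max(0, flushTimeoutMs - elapsedMs);
        System.out.println("drained=" + drained + ", withinBudget=" + (remainingMs <= flushTimeoutMs));
    }
}
```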

Contributor

Yeah, I think I agree. The previous behavior called commitOffsets() on shutdown, and that method blocked for up to offset.flush.timeout.ms anyway. So IMO it makes sense to block up to that amount of time before calling updateCommittableOffsets() and commitOffsets() on the subsequent lines.

@C0urante
Contributor Author

@rhauch would you mind taking a look at this 3.1 blocker? There was an issue in #11323 that causes a regression in some fairly common cases.

Contributor

@rhauch rhauch left a comment

Thanks, @C0urante. I like the approach you've taken here, and as noted below it does seem consistent with the previous logic that blocked in commitOffsets() for up to offset.flush.timeout.ms milliseconds upon shutdown.

I do have a few suggestions, mostly about removing what I think is unnecessary synchronization on the already-concurrent types used in the new fields.

Comment on lines +154 to +160
// Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization
// on an instance variable when invoking CountDownLatch::await outside a synchronized block
CountDownLatch messageDrainLatch;
synchronized (this) {
messageDrainLatch = new CountDownLatch(numUnackedMessages.get());
this.messageDrainLatch = messageDrainLatch;
}
Contributor

Could this code be simplified and the synchronized block removed altogether by changing the messageDrainLatch field to:

    private final AtomicReference<CountDownLatch> messageDrainLatch = new AtomicReference<>();

and then changing these lines above to:

Suggested change
// Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization
// on an instance variable when invoking CountDownLatch::await outside a synchronized block
CountDownLatch messageDrainLatch;
synchronized (this) {
messageDrainLatch = new CountDownLatch(numUnackedMessages.get());
this.messageDrainLatch = messageDrainLatch;
}
// Create a new message drain latch as a local variable to avoid SpotBugs warnings about inconsistent synchronization
// on an instance variable when invoking CountDownLatch::await outside a synchronized block
CountDownLatch messageDrainLatch = this.messageDrainLatch.updateAndGet(existing -> new CountDownLatch(numUnackedMessages.get()));

This synchronized block ensures that the latch is initialized with the number of unacked messages at the time this method is called, but it does not prevent new messages from being added. Using concurrent types for the two fields solves the first issue, while the second is prevented by having the WorkerSourceTask#execute() method call submit(...), which increments numUnackedMessages, and then only call awaitAllMessages() after the task has been told to stop (at which point submit(...) will not be called again and numUnackedMessages will not be incremented).

And calling out that last assumption would be good. Might be as simple as adding the following to the submitMessages(...) JavaDoc:

     *
     * <p>This method should never be called after {@link #awaitAllMessages(long, TimeUnit)} has been called.

You'd also have to change the messageAcked() method to use the atomic reference:

    private void messageAcked() {
        numUnackedMessages.decrementAndGet();
        CountDownLatch messageDrainLatch = this.messageDrainLatch.get();
        if (messageDrainLatch != null) {
            messageDrainLatch.countDown();
        }
    }

and remove the synchronized keyword. Again, I don't think we need to update the number of unacked messages and count down the latch as one atomic operation; we really just need each of them to be updated consistently.

There are a few reasons why this might be better:

  1. We avoid synchronized keyword, which might be unclear to our future selves ("Is the documentation about not being thread-safe wrong?").
  2. We can make the fields be final, which IMO makes the logic a bit easier to follow.
  3. We don't need stricter synchronization than individual atomic updates of each field.

Contributor Author

Thanks Randall. I agree that the synchronization here is, even if necessary, inelegant, and I hope that we can improve things. But I'm worried that the proposal here may be prone to a race condition.

Imagine we restructure the code with your suggestions and the result is this:

    class SubmittedRecords {

        private final AtomicReference<CountDownLatch> messageDrainLatch = new AtomicReference<>();

        private boolean awaitAllMessages(long timeout, TimeUnit timeUnit) {
            // (2)
            CountDownLatch messageDrainLatch = this.messageDrainLatch.updateAndGet(existing -> new CountDownLatch(numUnackedMessages.get()));
            try {
                return messageDrainLatch.await(timeout, timeUnit);
            } catch (InterruptedException e) {
                return false;
            }
        }

        private void messageAcked() {
            // (1)
            numUnackedMessages.decrementAndGet();
            // (3)
            CountDownLatch messageDrainLatch = this.messageDrainLatch.get();
            if (messageDrainLatch != null) {
                messageDrainLatch.countDown();
            }
        }
    }

Isn't it still possible that the lines marked (1), (2), and (3) could execute in that order? And in that case, wouldn't it cause awaitAllMessages to return early since the CountDownLatch created in part (2) would use the already-decremented value of numUnackedMessages after part (1) was executed, but then also be counted down for the same message in part (3)?
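That interleaving can be replayed deterministically in a single thread (a hypothetical two-message scenario, with the same (1)/(2)/(3) markers) to see the early return:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchRaceReplay {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger numUnackedMessages = new AtomicInteger(2); // two in-flight messages

        // (1) the acking thread decrements for message A
        numUnackedMessages.decrementAndGet(); // now 1

        // (2) the awaiting thread sizes the latch from the already-decremented count
        CountDownLatch latch = new CountDownLatch(numUnackedMessages.get()); // count = 1

        // (3) the same ack for message A also counts down the latch
        latch.countDown(); // count = 0

        // await() now succeeds even though message B was never acked
        boolean drained = latch.await(0, TimeUnit.MILLISECONDS);
        System.out.println("drained=" + drained + ", stillInFlight=" + numUnackedMessages.get());
    }
}
```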

FWIW, I originally used a volatile int for the numUnackedMessages field, but got a SpotBugs warning about incrementing a volatile field being a non-atomic operation for lines like numUnackedMessages++; in SubmittedRecords::submit. If we synchronize every access to that field, it shouldn't matter that increments/decrements are non-atomic, and we can consider adding an exemption to spotbugs-exclude.xml.

Contributor Author

I've just pushed an update that simplifies things a little bit but still utilizes synchronized blocks. I realized that, since we're synchronizing around every read of the numUnackedMessages field, it doesn't even need to be declared volatile, and so we can just replace it with a regular, primitive int.
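A rough sketch of that shape (not the merged code verbatim): every access to the plain int happens under the object monitor, and the latch is created and published in the same synchronized block as the reads of the counter, so a concurrent ack cannot slip between sizing the latch and counting it down:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class DrainCounter {
    // plain int, guarded by this object's monitor; no volatile needed
    private int numUnackedMessages = 0;
    private CountDownLatch messageDrainLatch;

    public synchronized void submit() {
        numUnackedMessages++;
    }

    public synchronized void messageAcked() {
        numUnackedMessages--;
        if (messageDrainLatch != null) {
            messageDrainLatch.countDown();
        }
    }

    public boolean awaitAllMessages(long timeout, TimeUnit unit) {
        CountDownLatch latch;
        synchronized (this) {
            // sized and published atomically with respect to acks
            latch = new CountDownLatch(numUnackedMessages);
            this.messageDrainLatch = latch;
        }
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        DrainCounter counter = new DrainCounter();
        counter.submit();
        counter.submit();
        counter.messageAcked();
        counter.messageAcked();
        System.out.println("drained=" + counter.awaitAllMessages(10, TimeUnit.MILLISECONDS));
    }
}
```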

C0urante and others added 2 commits November 23, 2021 11:28
…/SubmittedRecords.java

Co-authored-by: Randall Hauch <rhauch@gmail.com>
…ssages field final, move initialization of final fields to their declarations
@C0urante
Contributor Author

Thanks for the rapid turnaround, @rhauch. I'm not convinced yet that restructuring SubmittedRecords to remove the synchronized blocks will result in correct behavior, but I'm happy to make that change (or others that make the logic easier to follow) if we can verify that it's correct.

Contributor

@rhauch rhauch left a comment

Thanks, @C0urante. This is a bit cleaner. Two relatively minor comments below.

Comment on lines +206 to +208
    if (this.acked.compareAndSet(false, true)) {
        messageAcked();
    }
Contributor

Nice simplification.
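As an aside on why the compareAndSet guard works: only one caller ever wins the false-to-true transition, so the underlying ack logic runs exactly once even under repeated or concurrent acks. A standalone, hypothetical illustration:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class IdempotentAck {
    public static void main(String[] args) {
        AtomicBoolean acked = new AtomicBoolean(false);
        int calls = 0;
        // repeated acks for the same record: only the first CAS succeeds
        for (int i = 0; i < 3; i++) {
            if (acked.compareAndSet(false, true)) {
                calls++; // stands in for messageAcked()
            }
        }
        System.out.println("messageAcked invoked " + calls + " time(s)");
    }
}
```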

    try {
        return messageDrainLatch.await(timeout, timeUnit);
    } catch (InterruptedException e) {
        return false;
Contributor

Should this clear the interrupted flag before returning, since we're handling the interruption here?

Contributor Author

@C0urante C0urante Nov 23, 2021

I believe this is already accomplished. According to the Javadocs for CountDownLatch::await:

If the current thread:
     • has its interrupted status set on entry to this method; or
     • is interrupted while waiting,
then InterruptedException is thrown and the current thread's interrupted status is cleared.
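The quoted behavior is easy to confirm: if a thread enters await with its interrupted status already set, InterruptedException is thrown and the status comes back cleared:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class InterruptDemo {
    public static void main(String[] args) {
        // set the interrupted status before calling await
        Thread.currentThread().interrupt();
        CountDownLatch latch = new CountDownLatch(1);
        boolean threw = false;
        try {
            latch.await(10, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            threw = true; // per the Javadoc, interrupted status is now cleared
        }
        System.out.println("threw=" + threw + " interrupted=" + Thread.currentThread().isInterrupted());
    }
}
```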

@C0urante
Contributor Author

I ran the connect_distributed_test.ConnectDistributedTest.test_bounce system test repeatedly with the changes from this PR and, with clean=True (the configuration that causes the tests to verify that no duplicates are produced), there were 24 green runs.

Since this test was the one that originally surfaced the regression addressed here, and all unit and integration tests for Connect are passing, I believe this gives us reasonable confidence that the fix here works as expected and does not introduce another regression.

Contributor

@rhauch rhauch left a comment

Thanks for the fix, @C0urante. Looks good, given some successful builds and all failures that appear to be unrelated to Connect.

@rhauch rhauch merged commit f875576 into apache:trunk Nov 30, 2021
@C0urante C0urante deleted the kafka-13469 branch November 30, 2021 16:38
@C0urante
Contributor Author

Thanks Randall, appreciate the turnaround.

rhauch pushed a commit that referenced this pull request Nov 30, 2021
…ource task offset commit (#11524)

Although committing source task offsets without blocking on the delivery of all in-flight records is beneficial most of the time, it can lead to duplicate record delivery if there are in-flight records at the time of the task's end-of-life offset commit.

A best-effort attempt is made here to wait for any such in-flight records to be delivered before proceeding with the end-of-life offset commit for source tasks. Connect will block for up to offset.flush.timeout.ms milliseconds before calculating the latest committable offsets for the task and flushing those to the persistent offset store.

Author: Chris Egerton <chrise@confluent.io>
Reviewer: Randall Hauch <rhauch@gmail.com>
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
lmr3796 pushed a commit to lmr3796/kafka that referenced this pull request Jun 2, 2022