Skip to content

KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation#11526

Merged
kkonstantine merged 2 commits intoapache:trunkfrom
C0urante:kafka-13472
Nov 29, 2021
Merged

KAFKA-13472: Correct last-committed offsets tracking for sink tasks after partial revocation#11526
kkonstantine merged 2 commits intoapache:trunkfrom
C0urante:kafka-13472

Conversation

@C0urante
Copy link
Copy Markdown
Contributor

Jira

The WorkerSinkTask.lastCommittedOffsets field is now added to (via Map::putAll) after a successful offset commit, instead of being completely overwritten. In order to prevent this collection from growing indefinitely, elements are removed from it after topic partitions are revoked from the task's consumer.

Two test cases are added to WorkerSinkTaskTest:

  • A basic test to verify the "rewind for redelivery" behavior when a task throws an exception from SinkTask::preCommit; surprisingly, no existing test cases appear to cover this scenario
  • A more sophisticated test to verify this same behavior, but with a few rounds of cooperative consumer rebalancing beforehand that expose a bug in the current logic for the WorkerSinkTask class

The VerifiableSinkTask class is also updated to only flush the requested topic partitions in its flush method. This is technically unrelated to the issue addressed by this PR and can be moved to a separate PR if necessary; including it here as the original context for identifying this bug was debugging failed system tests and the logic in this part of the tests was originally suspected as a cause of the test failure.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@C0urante
Copy link
Copy Markdown
Contributor Author

@kkonstantine would you mind taking a look at this 3.1 blocker? There was a small issue in #10563 that causes a regression in some edge cases.

Copy link
Copy Markdown
Contributor

@kkonstantine kkonstantine left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix @C0urante.
LGTM overall, I have a comment on the removal method.

Also, by reading your description, I understand this issue was discovered through system tests. Would it be possible to have run these system tests before merging the original PR? Where these AK system tests or connector system tests?

topicPartitions.forEach(currentOffsets::remove);
}
updatePartitionCount();
topicPartitions.forEach(lastCommittedOffsets::remove);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is removeAll a supported operation by the underlying set of the hashmap here? Is it applicable?
(same question above for currentOffsets)

Copy link
Copy Markdown
Contributor Author

@C0urante C0urante Nov 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, neat trick. Ran unit+integration tests locally after making this change and they all passed; looks like the operation is supported, and yes, it does appear applicable.

@C0urante
Copy link
Copy Markdown
Contributor Author

Thanks @kkonstantine. My description may have been a little misleading; to clarify, none of this was caught by automated testing (unit, integration, or system); it was discovered by re-reading the code base while investigating an unrelated failure. I made note of system tests because the fix to the VerifiableSinkTask class seemed like it may have been useful initially, and after discovering that it was not the cause of the failure, still seemed worth keeping around in case a future test relied on that part of its behavior being correct. It is not related to the bug addressed by this PR, though, so it felt worth calling out in case we want to withhold that change for this blocker PR during code freeze.

Copy link
Copy Markdown
Contributor

@kkonstantine kkonstantine left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix @C0urante !
The test failures are not relevant to this PR
LGTM

Merging to trunk and 3.1/3.0 given that this is an approved blocker. cc @dajac

@kkonstantine kkonstantine merged commit e8dcbb9 into apache:trunk Nov 29, 2021
kkonstantine pushed a commit that referenced this pull request Nov 29, 2021
…fter partial revocation (#11526)

The `WorkerSinkTask.lastCommittedOffsets` field is now added to (via `Map::putAll`) after a successful offset commit, instead of being completely overwritten. In order to prevent this collection from growing indefinitely, elements are removed from it after topic partitions are revoked from the task's consumer.

Two test cases are added to `WorkerSinkTaskTest`:

- A basic test to verify the "rewind for redelivery" behavior when a task throws an exception from `SinkTask::preCommit`; surprisingly, no existing test cases appear to cover this scenario
- A more sophisticated test to verify this same behavior, but with a few rounds of cooperative consumer rebalancing beforehand that expose a bug in the current logic for the `WorkerSinkTask` class

The `VerifiableSinkTask` class is also updated to only flush the requested topic partitions in its `flush` method. This is technically unrelated to the issue addressed by this PR and can be moved to a separate PR if necessary; including it here as the original context for identifying this bug was debugging failed system tests and the logic in this part of the tests was originally suspected as a cause of the test failure.

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
kkonstantine pushed a commit that referenced this pull request Nov 29, 2021
…fter partial revocation (#11526)

The `WorkerSinkTask.lastCommittedOffsets` field is now added to (via `Map::putAll`) after a successful offset commit, instead of being completely overwritten. In order to prevent this collection from growing indefinitely, elements are removed from it after topic partitions are revoked from the task's consumer.

Two test cases are added to `WorkerSinkTaskTest`:

- A basic test to verify the "rewind for redelivery" behavior when a task throws an exception from `SinkTask::preCommit`; surprisingly, no existing test cases appear to cover this scenario
- A more sophisticated test to verify this same behavior, but with a few rounds of cooperative consumer rebalancing beforehand that expose a bug in the current logic for the `WorkerSinkTask` class

The `VerifiableSinkTask` class is also updated to only flush the requested topic partitions in its `flush` method. This is technically unrelated to the issue addressed by this PR and can be moved to a separate PR if necessary; including it here as the original context for identifying this bug was debugging failed system tests and the logic in this part of the tests was originally suspected as a cause of the test failure.

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
@C0urante C0urante deleted the kafka-13472 branch November 29, 2021 20:17
@C0urante
Copy link
Copy Markdown
Contributor Author

Thanks @kkonstantine, appreciate the turnaround.

xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…fter partial revocation (apache#11526)

The `WorkerSinkTask.lastCommittedOffsets` field is now added to (via `Map::putAll`) after a successful offset commit, instead of being completely overwritten. In order to prevent this collection from growing indefinitely, elements are removed from it after topic partitions are revoked from the task's consumer.

Two test cases are added to `WorkerSinkTaskTest`:

- A basic test to verify the "rewind for redelivery" behavior when a task throws an exception from `SinkTask::preCommit`; surprisingly, no existing test cases appear to cover this scenario
- A more sophisticated test to verify this same behavior, but with a few rounds of cooperative consumer rebalancing beforehand that expose a bug in the current logic for the `WorkerSinkTask` class

The `VerifiableSinkTask` class is also updated to only flush the requested topic partitions in its `flush` method. This is technically unrelated to the issue addressed by this PR and can be moved to a separate PR if necessary; including it here as the original context for identifying this bug was debugging failed system tests and the logic in this part of the tests was originally suspected as a cause of the test failure.

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
lmr3796 pushed a commit to lmr3796/kafka that referenced this pull request Jun 2, 2022
…fter partial revocation (apache#11526)

The `WorkerSinkTask.lastCommittedOffsets` field is now added to (via `Map::putAll`) after a successful offset commit, instead of being completely overwritten. In order to prevent this collection from growing indefinitely, elements are removed from it after topic partitions are revoked from the task's consumer.

Two test cases are added to `WorkerSinkTaskTest`:

- A basic test to verify the "rewind for redelivery" behavior when a task throws an exception from `SinkTask::preCommit`; surprisingly, no existing test cases appear to cover this scenario
- A more sophisticated test to verify this same behavior, but with a few rounds of cooperative consumer rebalancing beforehand that expose a bug in the current logic for the `WorkerSinkTask` class

The `VerifiableSinkTask` class is also updated to only flush the requested topic partitions in its `flush` method. This is technically unrelated to the issue addressed by this PR and can be moved to a separate PR if necessary; including it here as the original context for identifying this bug was debugging failed system tests and the logic in this part of the tests was originally suspected as a cause of the test failure.

Reviewers: Konstantine Karantasis <k.karantasis@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants