KAFKA-12487: Add support for cooperative consumer protocol with sink connectors #10563

Merged
kkonstantine merged 6 commits into apache:trunk from C0urante:kafka-12487
Nov 10, 2021

Conversation

@C0urante
Contributor

@C0urante C0urante commented Apr 19, 2021

Jira

Currently, the WorkerSinkTask's consumer rebalance listener (and related logic) is hardcoded to assume eager rebalancing, which means that all partitions are revoked any time a rebalance occurs and then the set of partitions included in onPartitionsAssigned is assumed to be the complete assignment for the task. Not only does this cause failures when the cooperative consumer protocol is used, it fails to take advantage of the benefits provided by that protocol.

These changes alter framework logic to not only not break when the cooperative consumer protocol is used for a sink connector, but to reap the benefits of it as well, by not revoking partitions unnecessarily from tasks just to reopen them immediately after the rebalance has completed.

This change will be necessary in order to support KIP-726, which currently proposes that the default consumer partition assignor be changed to the CooperativeStickyAssignor.

Two integration tests are added to verify sink task behavior with both eager and cooperative consumer protocols, and new unit tests are added and existing ones adapted as well.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@C0urante
Contributor Author

@ncliang @gharris1727 do either of you have time to take a look?

@ableegoldman @showuon @guozhangwang FYI, this should be the only change to Connect necessary for KIP-726.

Contributor

@ncliang ncliang left a comment

Thanks for the PR @C0urante . The overall approach LG. I have just a couple questions.

task.close(topicPartitions);
if (workerErrantRecordReporter != null) {
log.trace("Cancelling reported errors for {}", topicPartitions);
workerErrantRecordReporter.cancelFutures(topicPartitions);
Contributor

I'm not sure if cancelling the outstanding futures for error reporting is the right thing to do here. Would it be reasonable to await their completion for a reasonable amount of time before giving up?

Contributor Author

@C0urante C0urante Apr 29, 2021

In this case, we've lost the partition assignment and won't be able to commit offsets for them, and since offset commits don't take place before outstanding error reports have completed, we know that no offsets for the records that caused these error reports have been committed either. So, we're guaranteed that the records that the task reported errors for will be redelivered, to whichever task now owns these partitions, regardless of whether we wait here for the reporting to complete.

Contributor

I see. So the distinction is because in the revoked case we get a chance to await the errant record reporter futures before we commit offsets for the revoked partitions, but in the lost case we've already lost the partition and don't get a chance to commit offsets. Since we already do not own the partition, we should not be reporting errors for it and should let the current owner take that responsibility. It should be noted that the cancellation is best-effort, so there is a chance we duplicate reporting for the errant record.

// sure anything we paused that the task didn't request to be paused *and* which we still own is resumed.
// Also make sure our tracking of paused partitions is updated to remove any partitions we no longer own.
pausedForRedelivery = false;
pausedForRedelivery = pausedForRedelivery && !messageBatch.isEmpty();
Contributor

I don't know if this change is required. The way I read the current implementation, we make sure that the paused partitions contain only assigned partitions in the block below, setting the paused partitions on context. We then rely on the code block in iteration() to resume partitions that should not be paused.

            } else if (!pausedForRedelivery) {
                resumeAll();
                onResume();
            }

Setting this to anything other than false causes us not to resume partitions which we own that were not explicitly requested to be paused.

Contributor Author

Setting this to anything other than false causes us not to resume partitions which we own that were not explicitly requested to be paused.

Yep, that's intentional. The retry loop for sink tasks works like this:

  • Grab new batch of records from consumer, store in messageBatch
  • Deliver batch to task via SinkTask::put
  • Catch a RetriableException from the task
  • Pause all partitions on the consumer and set pausedForRedelivery to true
  • Redeliver batch (stored in the messageBatch field) to task via SinkTask::put continuously
  • Once a delivery attempt is successful, clear the batch stored in messageBatch and, since pausedForRedelivery is true, resume all partitions on the consumer and reset pausedForRedelivery to false

When consumer rebalances meant a mass-revocation and mass-reassignment of partitions, we could get away with unconditionally resetting pausedForRedelivery to false during them, since offsets for any message batches going through the retry loop would not be committed and those records would be guaranteed to be redelivered after the rebalance. However, with incremental consumer rebalances, a task might not be forced to commit offsets for some (or even all) records in its current batch (if, for example, the new assignment only included additional partitions and no revocations). In that case, we have to be careful to remain paused so that the current batch can continue to be retried.
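This state machine can be sketched with simplified stand-ins. The class below, its String "records" (keyed by partition name), and all method names are hypothetical simplifications, not the actual WorkerSinkTask code:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Illustrative stand-in for the redelivery state described above; String values
// keyed by partition name stand in for SinkRecord. All names are hypothetical.
class RedeliverySketch {
    final List<String> messageBatch = new ArrayList<>();
    final Set<String> pausedPartitions = new HashSet<>();
    boolean pausedForRedelivery = false;

    // SinkTask::put threw RetriableException: pause everything, mark for redelivery.
    void onRetriableFailure(Set<String> assignment) {
        pausedForRedelivery = true;
        pausedPartitions.addAll(assignment);
    }

    // Cooperative revocation: drop only the revoked partitions' records, and stay
    // paused only if records from retained partitions still await redelivery.
    void onPartitionsRevoked(Set<String> revoked) {
        messageBatch.removeIf(revoked::contains);
        pausedPartitions.removeAll(revoked);
        pausedForRedelivery = pausedForRedelivery && !messageBatch.isEmpty();
    }

    // A delivery attempt succeeded: clear the batch and resume consumption.
    void onDeliverySuccess() {
        messageBatch.clear();
        pausedPartitions.clear();
        pausedForRedelivery = false;
    }
}
```

The key difference from the eager protocol is in onPartitionsRevoked: the paused-for-redelivery flag survives the rebalance whenever any retained partition still has records pending.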

Contributor Author

I'm actually wondering if there's a bug in the existing code; not sure pausedForRedelivery should be reset to false after a consumer rebalance. If the worker is in the middle of retrying a failed batch delivery, all the partitions will be paused, and after the rebalance, nothing explicitly resumes them.

During normal execution (when the connector isn't in the middle of redelivering a failed batch) this doesn't surface because partition consumption hasn't been paused anyways. But that assumption breaks down if we've paused to retry a failed batch.

I think it might be necessary to throw in a call to resumeAll() if pausedForRedelivery transitions from true to false as a result of a consumer rebalance (or, if we're lazy, we can just throw it in unconditionally at the end of each consumer rebalance if pausedForRedelivery is false since Consumer::resume is a no-op for partitions that weren't already paused).

Contributor Author

Oooooh, one more edge case to consider--if new partitions are assigned while in the middle of retrying a failed batch, right now the worker might accidentally receive new records from those partitions, which would violate the assertion that new records can be received from the consumer if and only if the messageBatch field is empty.

I think we can (and should) handle this by invoking pauseAll() if pausedForRedelivery is still true after removing records from revoked partitions from the current batch.
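A minimal sketch of that handling, with hypothetical names and String partition keys standing in for TopicPartition (not the actual patch):

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch: if a rebalance assigns new partitions while the task is
// still mid-redelivery, pause the newcomers too, so the consumer cannot return
// new records while messageBatch is non-empty. Names are hypothetical.
class RebalancePauseSketch {
    final Set<String> paused = new HashSet<>();
    boolean pausedForRedelivery = false;

    void onPartitionsAssigned(Set<String> newlyAssigned) {
        if (pausedForRedelivery) {
            // effectively pauseAll() over the enlarged assignment
            paused.addAll(newlyAssigned);
        }
    }
}
```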

Contributor Author

@C0urante C0urante Apr 29, 2021

I've pushed a change that should address this gap and includes a new unit test for the pause/resume logic while paused for redelivery across consumer rebalances.

Contributor

@ncliang ncliang left a comment

LGTM. Thanks for the PR @C0urante . The changes here are complex with many edge cases, but I think you've covered them with sufficient test coverage.


Contributor

@kkonstantine kkonstantine left a comment

Thanks for the PR @C0urante
Left a few comments

log.debug("{} Rewinding topic partition {} to offset {}", this, entry.getKey(), entry.getValue().offset());
consumer.seek(entry.getKey(), entry.getValue().offset());
}
currentOffsets = new HashMap<>(lastCommittedOffsets);
Contributor

this change means we never create a fresh copy of the offsets map.
Is this correct? Is there a risk for the offsets to keep being added and never removed?

Contributor Author

Good catch. I believe this would lead to incorrect metrics given the use of the currentOffsets field when invoking sinkTaskMetricsGroup::assignedOffsets, and may lead to issues with the end-of-life call to closePartitions as well.

I'll add cleanup logic in the consumer rebalance listener.
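That cleanup could look roughly like this; the class and method names are hypothetical, with String keys standing in for TopicPartition:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Illustrative cleanup sketch: when partitions are closed, drop their entries
// from the offset-tracking maps rather than recreating the maps wholesale, so
// neither map grows without bound across cooperative rebalances.
class OffsetCleanupSketch {
    final Map<String, Long> currentOffsets = new HashMap<>();
    final Map<String, Long> lastCommittedOffsets = new HashMap<>();

    void onPartitionsClosed(Set<String> closed) {
        currentOffsets.keySet().removeAll(closed);
        lastCommittedOffsets.keySet().removeAll(closed);
    }
}
```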

commitStarted = now;
sinkTaskMetricsGroup.recordOffsetSequenceNumber(commitSeqno);

Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets = this.lastCommittedOffsets.entrySet().stream()
Contributor

shadowing of a member field this way is not ideal.
For example there's an older usage of this collection below. I assume it's the new local variable we want to use, but I can't be 100% sure.

Should we call this actuallyCommittedOffsets or similar?

Contributor Author

The "actually committed offsets" (as in, the ones that we pass to the consumer) are computed a bit further on in the method. Maybe lastCommittedOffsetsForPartitions? Bit of a mouthful, will try to think of something better.

log.trace("Awaiting all reported errors to be completed");
workerErrantRecordReporter.awaitAllFutures();
log.trace("Completed all reported errors");
log.trace("Awaiting reported errors for {} to be completed", topicPartitions);
Contributor

I wonder if we want to print the actual list of partitions here, which might be long. And do it twice.
I see the same pattern is applied elsewhere. I understand the value of explicit listing.

Contributor Author

That's fair; it may be useful to list once but probably not more than that. I'll remove the set of partitions from these log lines and add a single TRACE-level line at the top of the method that logs the set of partitions; at that level, I think the verbosity is acceptable, especially after this change goes out in case we need to do some debugging of unexpected behavior. LMK what you think, though.

this, partition, taskProvidedOffset, consumer.assignment());
this, partition, taskProvidedOffset, allAssignedTopicPartitions);
} else {
log.debug("{} Ignoring task provided offset {}/{} -- topic partition not requested, requested={}",
Contributor

what's a requested topic partition?
Also, above we mention just partition

Contributor Author

Good catch, s/topic partition/partition/

"Requested" here means that although the partition is assigned to the task, it is not one of the partitions that we are currently committing offsets for.

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
log.debug("{} Partitions assigned {}", WorkerSinkTask.this, partitions);
lastCommittedOffsets = new HashMap<>();
Contributor

similar question as above. This is a map that only grows now. Is this correct?

Contributor Author

(Addressed above; it is almost certainly not correct)

// Make sure we don't have any leftover data since offsets will be reset to committed positions
messageBatch.clear();
// Make sure we don't have any leftover data since offsets for these partitions will be reset to committed positions
messageBatch.removeIf(record -> partitions.contains(new TopicPartition(record.topic(), record.kafkaPartition())));
Contributor

do you worry that this became more expensive now, especially in cases where we want to remove everything as before?

Contributor Author

It certainly will be more expensive, although this will only occur in an edge case where the task has thrown a RetriableException from SinkTask::put, then before a follow-up invocation of SinkTask::put is able to succeed, a consumer rebalance takes place and partitions are revoked.

A naive improvement might be to convert messageBatch to a Map<TopicPartition, List<SinkRecord>>, which would allow us to quickly filter out records belonging to a given topic partition. But that would also be less efficient since we'd have to re-join those lists together before delivering records to the task, and would have to populate that map in the first place after retrieving the original list of records from Consumer::poll.

Not sure if there's a good way around this; if you believe it's a blocker I'm happy to spend some more time on it, though.
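The trade-off can be sketched as follows; String[] pairs of {partition, value} stand in for SinkRecord, and all names are hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative comparison of the two batch layouts discussed above.
class BatchLayoutSketch {
    // Current layout: one flat list; dropping a revoked partition is an O(n) scan,
    // but it only runs in the rare retry-plus-rebalance case.
    static void dropFlat(List<String[]> batch, String revoked) {
        batch.removeIf(rec -> rec[0].equals(revoked)); // rec[0] = partition
    }

    // Alternative layout: partition-keyed map; dropping a partition is O(1), but
    // every delivery to SinkTask::put must first re-join the per-partition lists.
    static List<String[]> rejoin(Map<String, List<String[]>> byPartition) {
        List<String[]> joined = new ArrayList<>();
        byPartition.values().forEach(joined::addAll);
        return joined;
    }
}
```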


// Visible for testing
protected final LinkedList<Future<Void>> futures;
protected final Map<TopicPartition, Future<Void>> futures;
Contributor

If we want this to be a concurrent map probably good idea to depict this in the declaration.
(same as you do in the tests below)

Suggested change
protected final Map<TopicPartition, Future<Void>> futures;
protected final ConcurrentMap<TopicPartition, Future<Void>> futures;

Contributor Author

I like this style and agree with the suggestion; however, there's actually a bug in the PR right now where we only store one Future at a time per topic-partition, so will address in a follow-up commit instead of just clicking the Commit suggestion button.
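A sketch of what such a follow-up might look like (hypothetical names, String keys standing in for TopicPartition; not the actual commit):

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;

// Illustrative fix for the one-future-per-partition bug: keep every outstanding
// errant-record future per partition so cancelFutures can cancel all of them,
// not just the most recently stored one.
class ReporterFuturesSketch {
    final ConcurrentMap<String, List<Future<Void>>> futures = new ConcurrentHashMap<>();

    void report(String partition, Future<Void> future) {
        futures.computeIfAbsent(partition, p -> new CopyOnWriteArrayList<>()).add(future);
    }

    void cancelFutures(Collection<String> partitions) {
        for (String partition : partitions) {
            List<Future<Void>> pending = futures.remove(partition);
            if (pending != null) {
                pending.forEach(f -> f.cancel(true));
            }
        }
    }
}
```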

@C0urante
Contributor Author

Thanks for taking a look @kkonstantine, this is ready for another round.

@C0urante
Contributor Author

C0urante commented Oct 7, 2021

@kkonstantine Would you mind giving this another pass? It's been over two months.

Contributor

@kkonstantine kkonstantine left a comment

Thanks @C0urante
LGTM.

I reran the tests and there were no relevant failures. Merging to trunk, 3.1 and 3.0

@kkonstantine kkonstantine merged commit 39b1bf4 into apache:trunk Nov 10, 2021
kkonstantine pushed a commit that referenced this pull request Nov 10, 2021
…connectors (#10563)

Currently, the `WorkerSinkTask`'s consumer rebalance listener (and related logic) is hardcoded to assume eager rebalancing, which means that all partitions are revoked any time a rebalance occurs and then the set of partitions included in `onPartitionsAssigned` is assumed to be the complete assignment for the task. Not only does this cause failures when the cooperative consumer protocol is used, it fails to take advantage of the benefits provided by that protocol.

These changes alter framework logic to not only not break when the cooperative consumer protocol is used for a sink connector, but to reap the benefits of it as well, by not revoking partitions unnecessarily from tasks just to reopen them immediately after the rebalance has completed.

This change will be necessary in order to support [KIP-726](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=177048248), which currently proposes that the default consumer partition assignor be changed to the `CooperativeStickyAssignor`.

Two integration tests are added to verify sink task behavior with both eager and cooperative consumer protocols, and new unit tests are added and existing ones adapted as well.

Reviewers: Nigel Liang <nigel@nigelliang.com>, Konstantine Karantasis <k.karantasis@gmail.com>
kkonstantine pushed a commit that referenced this pull request Nov 10, 2021
…connectors (#10563)

stanislavkozlovski added a commit to stanislavkozlovski/kafka that referenced this pull request Nov 11, 2021
…ntegration-11-nov

* ak/trunk: (15 commits)
  KAFKA-13429: ignore bin on new modules (apache#11415)
  KAFKA-12648: introduce TopologyConfig and TaskConfig for topology-level overrides (apache#11272)
  KAFKA-12487: Add support for cooperative consumer protocol with sink connectors (apache#10563)
  MINOR: Log client disconnect events at INFO level (apache#11449)
  MINOR: Remove topic null check from `TopicIdPartition` and adjust constructor order (apache#11403)
  KAFKA-13417; Ensure dynamic reconfigurations set old config properly (apache#11448)
  MINOR: Adding a constant to denote UNKNOWN leader in LeaderAndEpoch (apache#11477)
  KAFKA-10543: Convert KTable joins to new PAPI (apache#11412)
  KAFKA-12226: Commit source task offsets without blocking on batch delivery (apache#11323)
  KAFKA-13396: Allow create topic without partition/replicaFactor (apache#11429)
  ...
@C0urante C0urante deleted the kafka-12487 branch November 12, 2021 19:37
xdgrulez pushed a commit to xdgrulez/kafka that referenced this pull request Dec 22, 2021
…connectors (apache#10563)

lmr3796 pushed a commit to lmr3796/kafka that referenced this pull request Jun 2, 2022
…connectors (apache#10563)
