Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ private void maybeUpdateJoinedSubscription(Set<TopicPartition> assignedPartition
// into the subscriptions as long as they still match the subscribed pattern

Set<String> addedTopics = new HashSet<>();
// this is a copy because its handed to listener below
// this is a copy because it's handed to listener below
for (TopicPartition tp : assignedPartitions) {
if (!joinedSubscription.contains(tp.topic()))
addedTopics.add(tp.topic());
Expand Down Expand Up @@ -759,7 +759,7 @@ private void validateCooperativeAssignment(final Map<String, List<TopicPartition
protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
log.debug("Executing onJoinPrepare with generation {} and memberId {}", generation, memberId);
if (joinPrepareTimer == null) {
// We should complete onJoinPrepare before rebalanceTimeout,
// We should complete onJoinPrepare before rebalanceTimeoutMs,
// and continue to join group to avoid member got kicked out from group
joinPrepareTimer = time.timer(rebalanceConfig.rebalanceTimeoutMs);
} else {
Expand All @@ -782,10 +782,10 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {

// Keep retrying/waiting the offset commit when:
// 1. offset commit haven't done (and joinPrepareTimer not expired)
// 2. failed with retryable exception (and joinPrepareTimer not expired)
// 2. failed with retriable exception (and joinPrepareTimer not expired)
// Otherwise, continue to revoke partitions, ex:
// 1. if joinPrepareTime has expired
// 2. if offset commit failed with no-retryable exception
// 1. if joinPrepareTimer has expired
// 2. if offset commit failed with non-retriable exception
// 3. if offset commit success
boolean onJoinPrepareAsyncCommitCompleted = true;
if (joinPrepareTimer.isExpired()) {
Expand Down Expand Up @@ -841,7 +841,7 @@ protected boolean onJoinPrepare(Timer timer, int generation, String memberId) {
break;

case COOPERATIVE:
// only revoke those partitions that are not in the subscription any more.
// only revoke those partitions that are not in the subscription anymore.
Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
revokedPartitions.addAll(ownedPartitions.stream()
.filter(tp -> !subscriptions.subscription().contains(tp.topic()))
Expand Down