Skip to content

KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll#8682

Merged
guozhangwang merged 9 commits intoapache:trunkfrom
abbccdda:KAFKA-10011
May 19, 2020
Merged

KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll#8682
guozhangwang merged 9 commits intoapache:trunkfrom
abbccdda:KAFKA-10011

Conversation

@abbccdda
Copy link
Copy Markdown

As stated, we couldn't wait for handleRebalanceComplete in the case of handleLostAll, as we already closed the active task as dirty, and could potentially require its offset in the next thread.runOnce call.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@abbccdda
Copy link
Copy Markdown
Author

test this please

@vvcephei
Copy link
Copy Markdown
Contributor

Test this please

@ableegoldman
Copy link
Copy Markdown
Member

Instead of removing the tasks during handleLostAll, can we just clear the lockedTaskDirectories set at the end of releaseLockedUnassignedTaskDirectories? This set is only used to keep track of which task directories we only temporarily locked for the rebalance, so it makes sense that it should be empty outside of a rebalance.

@guozhangwang
Copy link
Copy Markdown
Contributor

I agree with @ableegoldman here, after the while (taskIdIterator.hasNext() loop we can see if there are still remaining tasks, and then log an WARN similar to the end of handleRevocation before clearing them:

if (!remainingPartitions.isEmpty()) {
            log.warn("The following partitions {} are missing from the task partitions. It could potentially " +
                         "due to race condition of consumer detecting the heartbeat failure, or the tasks " +
                         "have been cleaned up by the handleAssignment callback.", remainingPartitions);
        }

@ableegoldman
Copy link
Copy Markdown
Member

ableegoldman commented May 19, 2020

I also think we should reset/clear the set at the beginning of tryToLockAllNonEmptyTaskDirectories, so basically we're always dealing with the current set of actually-locked tasks and don't need to worry about removing them during handleLostAll or handleCorruption/Assignment, etc

@abbccdda
Copy link
Copy Markdown
Author

Will attempt Sophie's suggestion here

Copy link
Copy Markdown
Member

@ableegoldman ableegoldman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool this LGTM, thanks for the fix!

@guozhangwang
Copy link
Copy Markdown
Contributor

I think it is still better to log a warn at the end of releaseLockedUnassignedTaskDirectories if the map is not empty at the end, will add it before merging.

@ableegoldman
Copy link
Copy Markdown
Member

@guozhangwang Why would we warn? You mean if we have any tasks leftover? These are just tasks that are currently assigned

@guozhangwang
Copy link
Copy Markdown
Contributor

@guozhangwang Why would we warn? You mean if we have any tasks leftover? These are just tasks that are currently assigned

I think we would only have leftover tasks if they are closed due to exception during the rebalance.

@ableegoldman
Copy link
Copy Markdown
Member

Well, releaseLockedUnassignedTaskDirectories does exactly that -- it only releases the unassigned task directories (which we just lock in order to report the offset sums in the subscription info). So I would expect there to almost always be leftover tasks at the end of that method

@guozhangwang guozhangwang merged commit 76e0233 into apache:trunk May 19, 2020
Kvicii pushed a commit to Kvicii/kafka that referenced this pull request May 21, 2020
* 'trunk' of github.com:apache/kafka:
  MINOR: Increase gradle daemon’s heap size to 2g (apache#8700)
  KAFKA-9603: Do not turn on bulk loading for segmented stores on stand-by tasks (apache#8661)
  KAFKA-9859 / kafka-streams-application-reset tool doesn't take into account topics generated by KTable foreign key join operation (apache#8671)
  MINOR: Fix redundant typos in comments and javadocs (apache#8693)
  KAFKA-10010: Should make state store registration idempotent (apache#8681)
  KAFKA-10011: Remove task id from lockedTaskDirectories during handleLostAll (apache#8682)
  KAFKA-9992: Eliminate JavaConverters in EmbeddedKafkaCluster (apache#8673)
  KAFKA-6145: Add unit tests to verify fix of bug KAFKA-9173 (apache#8689)
  MINOR: Update stream documentation (apache#8622)
  MINOR: Small fixes in the documentation (apache#8623)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants