KAFKA-6647: Do note delete the lock file while holding the lock#8267
KAFKA-6647: Do note delete the lock file while holding the lock#8267guozhangwang merged 10 commits intoapache:trunkfrom
Conversation
| if (manualUserCall) { | ||
| throw e; | ||
| } | ||
| } catch (final OverlappingFileLockException | IOException e) { |
There was a problem hiding this comment.
This is a minor code clean up.
| throw exc; | ||
| } | ||
|
|
||
| if (rootFile.toPath().equals(path)) { |
There was a problem hiding this comment.
This is a bit tricky: for the root file, we only consider deleting it if there's no specified skipping sub-files; otherwise we never try to delete since it would doom with DirectoryNotEmpty.
There was a problem hiding this comment.
It would be good to avoid the unnecessary conversions in this method (rootFile.toPath, path.toFile). We can do some work at the start of the method to improve efficiency.
| private final Time time; | ||
| private final String appId; | ||
| private final File stateDir; | ||
| private final boolean createStateDirectory; |
There was a problem hiding this comment.
What do you think about renaming this, eg something like isStateful, hasStatefulTopology`, etc?
It's really confusing to reason about in its usage, for example why createStateDirectory => #lock returns true. You have to backtrack to where createStateDirectory is set to understand this. However, I think it's easy to reason about why isStateful => should create state directory.
Alternatively, what if we have a stateless version of the StateDirectory class that just stubs things where appropriate. Then we could get rid of this altogether. WDYT?
There was a problem hiding this comment.
That makes sense, I will do the renaming.
| * @return The list of all the existing local directories for stream tasks | ||
| */ | ||
| File[] listTaskDirectories() { | ||
| File[] lisAllTaskDirectories() { |
There was a problem hiding this comment.
| File[] lisAllTaskDirectories() { | |
| File[] listAllTaskDirectories() { |
|
@guozhangwang I opened a PR for something related some time ago: #4713 Does it overlap? |
Good call. My proposal is to skip trying to delete the lock file, which I think is essentially similar to yours (capture the exception and then skip). |
…-not-delete-lock-filet
|
I've finally able to re-produce the issue with NTFS using this example code: https://github.com/simplesteph/kafka-streams-course/blob/1.1.0/word-count/src/test/java/com/github/simplesteph/udemy/kafka/streams/WordCountAppTest.java And with my PR now it is proven fixed: I'm going to add more unit tests in my PR but the non-testing part should be ready to review now cc @mjsax @vvcephei |
|
Also cc @rodesai |
vvcephei
left a comment
There was a problem hiding this comment.
Thanks, Guozhang!
Just a couple of questions, and I think Sophie’s question is a good one, too.
Otherwise, lgtm!
|
|
||
| @Test | ||
| public void shouldListAllTaskDirectories() { | ||
| public void shouldNotListNonEmptyTaskDirectories() { |
There was a problem hiding this comment.
Is the double negative intentional here?
There was a problem hiding this comment.
Good call, will change.
| } catch (final IOException e) { | ||
| log.error("{} Failed to release the state directory lock.", logPrefix()); | ||
|
|
||
| // for manual user call, stream threads are not running so it is safe to delete |
There was a problem hiding this comment.
This comment makes me wonder why we even bother with locking at all for manual calls. If we know there’s no app running, why not just delete the whole state directory and not bother with locks?
There was a problem hiding this comment.
I thought about that too, and tried it out, it's just that based on manualUserCall to decide whether lock / unlock the block, we ended up with either much more code duplicates or clumsy and finer-grained if else condition, neither of which I like. On the other hand, since streams.cleanUp is usually a one-time thing compared with the periodic internal clean I think having two delete calls are okay to same some code duplication here.
|
@vvcephei @ableegoldman addressed your comments, please take another look. |
ableegoldman
left a comment
There was a problem hiding this comment.
We just merged PR 8246, can you rebase and make the TaskManager#tryToLockAllTaskDirectoriesleverage the newlistNonEmptyTaskDirectories` method?
Otherwise LGTM
|
@ableegoldman Just clarifying on the Also I renamed |
1. Inside StateDirectory#cleanRemovedTasks, skip deleting the lock file (and hence the parent directory) until releasing the lock. And after the lock is released only go ahead and delete the parent directory if manualUserCall == true. That is, this is triggered from KafkaStreams#cleanUp and users are responsible to make sure that Streams instance is not started and hence there are no other threads trying to grab that lock. 2. As a result, during scheduled cleanup the corresponding task.dir would not be empty but be left with only the lock file, so effectively we still achieve the goal of releasing disk spaces. For callers of listTaskDirectories like KIP-441 (cc @ableegoldman to take a look) I've introduced a new listNonEmptyTaskDirectories which excludes such dummy task.dirs with only the lock file left. 3. Also fixed KAFKA-8999 along the way to expose the exception while traversing the directory. Reviewers: A. Sophie Blee-Goldman <sophie@confluent.io>, John Roesler <vvcephei@apache.org>
|
Merged to trunk and 2.5 after tests green locally. |
* apache-github/trunk: (39 commits) MINOR: cleanup and add tests to StateDirectoryTest (apache#8304) HOTFIX: StateDirectoryTest should use Set instead of List (apache#8305) MINOR: Fix build and JavaDoc warnings (apache#8291) MINOR: Fix kafka.server.RequestQuotaTest missing new ApiKeys. (apache#8302) KAFKA-9712: Catch and handle exception thrown by reflections scanner (apache#8289) KAFKA-9670; Reduce allocations in Metadata Response preparation (apache#8236) MINOR: fix Scala 2.13 build error introduced in apache#8083 (apache#8301) MINOR: enforce non-negative invariant for checkpointed offsets (apache#8297) MINOR: comment apikey types in generated switch (apache#8201) MINOR: Fix typo in CreateTopicsResponse.json (apache#8300) KIP-546: Implement describeClientQuotas and alterClientQuotas. (apache#8083) KAFKA-6647: Do note delete the lock file while holding the lock (apache#8267) KAFKA-9677: Fix consumer fetch with small consume bandwidth quotas (apache#8290) KAFKA-9533: Fix JavaDocs of KStream.transformValues (apache#8298) MINOR: reuse pseudo-topic in FKJoin (apache#8296) KAFKA-6145: Pt 2. Include offset sums in subscription (apache#8246) KAFKA-9714; Eliminate unused reference to IBP in `TransactionStateManager` (apache#8293) KAFKA-9718; Don't log passwords for AlterConfigs in request logs (apache#8294) KAFKA-8768: DeleteRecords request/response automated protocol (apache#7957) KAFKA-9685: Solve Set concatenation perf issue in AclAuthorizer ...
|
Thank you so much for this fix!! Any idea when this will get into the next release? or how I could integrate the change sooner? |
|
It is merged into 2.5 and branch, so it will be at least in 2.5.1 and 2.6.0; if 2.5.0 RC0 gets vetoed and a new one is cut, then it would also be in 2.5.0 as well. |


Inside StateDirectory#cleanRemovedTasks, skip deleting the lock file (and hence the parent directory) until releasing the lock. And after the lock is released only go ahead and delete the parent directory if
manualUserCall == true. That is, this is triggered fromKafkaStreams#cleanUpand users are responsible to make sure that Streams instance is not started and hence there are no other threads trying to grab that lock.As a result, during scheduled cleanup the corresponding task.dir would not be empty but be left with only the lock file, so effectively we still achieve the goal of releasing disk spaces. For callers of
listTaskDirectorieslike KIP-441 (cc @ableegoldman to take a look) I've introduced a newlistNonEmptyTaskDirectorieswhich excludes such dummy task.dirs with only the lock file left.Also fixed KAFKA-8999 along the way to expose the exception while traversing the directory.
Committer Checklist (excluded from commit message)