
MINOR: fix KafkaStreams#cleanUp(): should throw and fail for lock conflict#4713

Closed
mjsax wants to merge 4 commits into apache:trunk from mjsax:minor-improve-cleanup-test

Conversation

@mjsax
Member

@mjsax mjsax commented Mar 14, 2018

No description provided.

@mjsax mjsax added the streams label Mar 14, 2018
@mjsax mjsax requested review from dguy and guozhangwang March 14, 2018 21:14
@mjsax
Member Author

mjsax commented Mar 14, 2018

\cc @bbejeck @vvcephei @tedyu @gbloggs

While digging into KAFKA-6647 I found this issue. Related to #4702

@mjsax mjsax changed the title MINOR: fix KafkaStreams#cleanUp() to should throw and fail for lock conflict MINOR: fix KafkaStreams#cleanUp(): should throw and fail for lock conflict Mar 14, 2018
@tedyu
Contributor

tedyu commented Mar 14, 2018

lgtm

Member

@bbejeck bbejeck left a comment


Thanks @mjsax, LGTM

Contributor


Could we add two more positive tests and see if we would reproduce the issue reported in KAFKA-6647?

  1. in thread A: create the file, open the channel, grab the lock, and then release the lock, and close the channel; and then call cleanup which should do the above again, and we should not expect any exception.

  2. in thread A: create the file, open the channel, grab the lock, and then release the lock, and close the channel; and then in a different thread B, call cleanup which should do the above again, and we should not expect any exception.
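The two positive scenarios above can be sketched roughly as follows. This is a hypothetical, self-contained illustration (class and method names are mine, not from the Kafka Streams test suite), assuming the lock/release cycle is what `cleanUp()` performs internally:

```java
import java.io.File;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.StandardOpenOption;

public class LockCycleSketch {

    // Create the file if needed, open the channel, grab the lock,
    // then release the lock and close the channel.
    static void lockAndRelease(final File lockFile) throws Exception {
        try (FileChannel channel = FileChannel.open(
                lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            final FileLock lock = channel.tryLock();
            if (lock == null) {
                throw new IllegalStateException("could not acquire lock on " + lockFile);
            }
            lock.release();
        } // channel is closed here by try-with-resources
    }

    public static void main(final String[] args) throws Exception {
        final File lockFile = File.createTempFile("state", ".lock");
        lockFile.deleteOnExit();

        // Scenario 1: the same thread runs the cycle twice -- no exception expected.
        lockAndRelease(lockFile);
        lockAndRelease(lockFile);

        // Scenario 2: a second thread repeats the cycle after the first thread
        // has released the lock -- again, no exception expected.
        final Thread other = new Thread(() -> {
            try {
                lockAndRelease(lockFile);
            } catch (final Exception e) {
                throw new RuntimeException(e);
            }
        });
        other.start();
        other.join();
    }
}
```

Both scenarios should complete without any `OverlappingFileLockException` or lock conflict, because each cycle fully releases the lock and closes the channel before the next one starts.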

Contributor

@guozhangwang guozhangwang left a comment


Thanks @mjsax, the code change makes sense to me. I left a comment about whether we can reproduce the issue in KAFKA-6647.

@mjsax
Member Author

mjsax commented Mar 15, 2018

Added more tests -- please take a detailed look to check whether the tests cover all scenarios we want to cover and that I did not mess up the setup. Locally, the tests pass.

@gbloggs Could you check out this branch and run the tests in your environment to see if they pass or fail there? It's hard to fix this as long as we can't reproduce it.

Contributor

@vvcephei vvcephei left a comment


This looks good to me, but I do agree with getting @gbloggs to verify it works as expected.

@gbloggs

gbloggs commented Mar 19, 2018

I've tried running the changes in this PR and it seems to have made things worse. Streams will not start at all anymore if KafkaStreams.cleanUp() is called before start(). I'm getting the following error:

Caused by: org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException: C:\tmp\kafka-streams\depot-query-stream-processor.local-dev\global
at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:262)
at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931)
...
Caused by: java.nio.file.DirectoryNotEmptyException: C:\tmp\kafka-streams\depot-query-stream-processor.local-dev\global
at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
at java.nio.file.Files.delete(Files.java:1126)
at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651)
at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634)
at java.nio.file.Files.walkFileTree(Files.java:2688)
at java.nio.file.Files.walkFileTree(Files.java:2742)
at org.apache.kafka.common.utils.Utils.delete(Utils.java:634)
at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:255)
... 20 more

If I add cleanUp() to the shutdownHook I get:
Exception in thread "Thread-24" org.apache.kafka.streams.errors.StreamsException: java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\depot-query-stream-processor.local-dev\1_0
at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:250)
at org.apache.kafka.streams.KafkaStreams.cleanUp(KafkaStreams.java:931)
at <foo.bar>.KafkaStreamsConfiguration$StreamCloser.run(KafkaStreamsConfiguration.java:160)
Caused by: java.nio.file.DirectoryNotEmptyException: \tmp\kafka-streams\depot-query-stream-processor.local-dev\1_0
at sun.nio.fs.WindowsFileSystemProvider.implDelete(WindowsFileSystemProvider.java:266)
at sun.nio.fs.AbstractFileSystemProvider.delete(AbstractFileSystemProvider.java:103)
at java.nio.file.Files.delete(Files.java:1126)
at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:651)
at org.apache.kafka.common.utils.Utils$1.postVisitDirectory(Utils.java:634)
at java.nio.file.Files.walkFileTree(Files.java:2688)
at java.nio.file.Files.walkFileTree(Files.java:2742)
at org.apache.kafka.common.utils.Utils.delete(Utils.java:634)
at org.apache.kafka.streams.processor.internals.StateDirectory.cleanRemovedTasks(StateDirectory.java:319)
at org.apache.kafka.streams.processor.internals.StateDirectory.clean(StateDirectory.java:247)
... 2 more

@mjsax
Member Author

mjsax commented Mar 19, 2018

Thanks for looking into that. It actually makes sense that it is "worse" now, because this PR fixes a swallowed exception and also adds the check for the global state directory.

The good thing is that we can now reproduce the issue reliably. In the implementation, we still hold the .lock file when we try to delete the directory. This works fine on Linux, but it seems that Windows does not allow it. Maybe we should move the lock file from the state directory into the top-level directory and use a different lock-file name for each task to fix this.

What do others think? If we agree that this should be the right fix, I can add it to this PR.
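To make the ordering problem concrete, here is a minimal, hypothetical sketch (not the actual StateDirectory code; `cleanTaskDirectory` is an invented helper): the directory can only be deleted reliably on Windows if the lock is released and the channel closed before the delete, since Windows refuses to delete a file that is still open and locked.

```java
import java.io.File;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;

public class CleanUpOrderingSketch {

    // Hypothetical clean-up: release the lock and close the channel BEFORE
    // deleting, so the directory can be removed on Windows as well. Deleting
    // while the .lock file is still held works on Linux but fails on Windows.
    static void cleanTaskDirectory(final File taskDir) throws Exception {
        final File lockFile = new File(taskDir, ".lock");
        try (FileChannel channel = FileChannel.open(
                lockFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            final FileLock lock = channel.tryLock();
            if (lock == null) {
                throw new IllegalStateException("task directory is locked by another process");
            }
            lock.release();
        } // channel closed here; only now is deletion safe on Windows
        Files.delete(lockFile.toPath());
        Files.delete(taskDir.toPath());
    }

    public static void main(final String[] args) throws Exception {
        final File taskDir = Files.createTempDirectory("task-0_0").toFile();
        cleanTaskDirectory(taskDir);
    }
}
```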

@tedyu
Contributor

tedyu commented Mar 19, 2018

Isn't that what I did in #4702?

@mjsax
Member Author

mjsax commented Mar 19, 2018

@tedyu Yes.

@gbloggs Could try to cherry-pick @tedyu PR into you test env to see if it fixes the issues?

@guozhangwang
Contributor

Maybe, we should move the lock-file from the state directory into the top-level directory and use a different lock-file name for each task to fix this.

I left a comment on the ticket about thinking carefully for moving the file lock:

Regarding your PR: changing the state directory structure, such as lifting the lock file one level up, may not be a backward-compatible change, since if users upgrade their Streams library version to one that includes this change, their code will be broken.

Because it is possible that two threads will conflict, as they are holding two different lock files.

A slightly different and more complicated approach, if we want to keep the lock file in the same directory, is to delete the other files and folders under that task id except the lock file:

/<state.dir>/<application.id>/<task.id>/.lock   // keep this file, delete all others.
/<state.dir>/<application.id>/<task.id>/.checkpoint 
/<state.dir>/<application.id>/<task.id>/storeName1/
/<state.dir>/<application.id>/<task.id>/storeName2/
/<state.dir>/<application.id>/<task.id>/rocksdb/storeName3/
/<state.dir>/<application.id>/<task.id>/rocksdb/storeName4/

Depending on manualUserCall, we can then safely delete the file and the folder after releasing the lock if that flag is true; or write some bytes to the lock file indicating that this task-id directory is deleted, so that it can eventually be removed in the cleanup() call. In other words, only in the KafkaStreams.cleanUp() call will we try to completely wipe out the task-id directory, while the background cleanup thread will only focus on freeing up space.
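A rough sketch of the "delete everything except the lock file" idea (hypothetical code, not the actual StateDirectory implementation; `wipeExceptLock` is an invented helper name):

```java
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;

public class WipeExceptLockSketch {

    // Delete all files and sub-directories under taskDir, keeping only the
    // .lock file (and taskDir itself, since the lock file still lives in it).
    static void wipeExceptLock(final Path taskDir) throws IOException {
        final Path lockFile = taskDir.resolve(".lock");
        Files.walkFileTree(taskDir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs)
                    throws IOException {
                if (!file.equals(lockFile)) {
                    Files.delete(file);
                }
                return FileVisitResult.CONTINUE;
            }

            @Override
            public FileVisitResult postVisitDirectory(final Path dir, final IOException exc)
                    throws IOException {
                if (!dir.equals(taskDir)) {
                    Files.delete(dir); // sub-directories are empty by now
                }
                return FileVisitResult.CONTINUE;
            }
        });
    }

    public static void main(final String[] args) throws Exception {
        final Path taskDir = Files.createTempDirectory("task-1_0");
        Files.createFile(taskDir.resolve(".lock"));
        Files.createFile(taskDir.resolve(".checkpoint"));
        Files.createDirectories(taskDir.resolve("rocksdb").resolve("storeName3"));

        wipeExceptLock(taskDir);
        // .lock survives; .checkpoint and the store directories are gone
    }
}
```

Since the .lock file is never deleted while the lock is held, this sidesteps the Windows restriction while still freeing the space used by state stores and checkpoints.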

@mjsax
Member Author

mjsax commented Mar 19, 2018

That's a good point @guozhangwang -- I originally thought that this cannot happen, because different instances need to be configured with different state directories -- but this is only true if a global store is used. If no global store is used, the state directory can be shared, and I agree that it would be problematic as you pointed out.

@gbloggs

gbloggs commented Mar 20, 2018

I am having issues building Kafka clean from master. Keep getting:

:streams:test-utils:build
:jar_core_2_11
Building project 'core' with Scala version 2.11.12
:jar_core_2_11 FAILED

I have tried many different approaches. I have IntelliJ with Scala, but that fails too. I am using Gradle 4.6, Scala 2.12.4, and Java 8 on Windows with gitbash/mingw64. I have tried setting SCALA_VERSION, and tried updating the version in both kafka-run-class.bat & .sh and gradle.properties to 2.12.4. I was hoping to add some tests but can't get anything built. Does anybody know how to resolve this?

@mjsax
Member Author

mjsax commented Mar 20, 2018

Not sure... Can you build on the command line? Did you try ./gradlew -PscalaVersion=2.12.4 jar ?

@gbloggs

gbloggs commented Mar 20, 2018

I tried all the options including the -P. All on command line via git bash or the terminal in intellij. 😢

@mjsax
Member Author

mjsax commented Sep 7, 2018

@guozhangwang @bbejeck @vvcephei @tedyu @gbloggs

Was wondering what the status of this is? Do we wait for @tedyu to finish #4702 ?

@mjsax mjsax force-pushed the minor-improve-cleanup-test branch from 32d3ee1 to d9e12ed Compare November 23, 2018 21:16
@mjsax
Member Author

mjsax commented Nov 23, 2018

I just rebased this to make sure we can eventually merge it. Blocked on #5650, which should get merged first.

@mjsax mjsax force-pushed the minor-improve-cleanup-test branch from d9e12ed to f27f524 Compare August 21, 2019 20:51
@mjsax
Member Author

mjsax commented Mar 17, 2020

@guozhangwang Given that #8267 is merged, do we still need this PR? Or can we drop it?

@guozhangwang
Contributor

@mjsax I think we can drop it now.

@mjsax mjsax closed this Mar 17, 2020
@mjsax mjsax deleted the minor-improve-cleanup-test branch March 17, 2020 03:37