KAFKA-7367: Streams should not create state store directories unless they are needed #5696

Merged
mjsax merged 7 commits into apache:trunk from kamalcph:KAFKA-7367
Nov 27, 2018

Conversation

@kamalcph
Contributor

More detailed description of your change,
if necessary. The PR title and PR message become
the squashed commit message, so use a separate
comment to ping reviewers.

Summary of testing strategy (including rationale)
for the feature or bug fix. Unit and/or integration
tests are expected for any behaviour change and
system tests should be considered for larger changes.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@kamalcph
Contributor Author

@vvcephei @mjsax
Could you please review this PR?

@kamalcph
Contributor Author

kamalcph commented Oct 1, 2018

retest this please

Member

@mjsax mjsax left a comment

Thanks for the PR, and sorry for the late review.

I think we can simplify the PR if we put all the logic into StateDirectory itself.

Member

Do we need this? Might be simpler to update StateDirectory to not acquire the lock() but just return true for this case?

Contributor Author

Assume that a topology has multiple tasks, some with persistent state stores and others with in-memory stores. In this case, StateDirectory has no way of knowing whether to create the TaskId directory or not.

Member

Maybe I am missing something, but if there is no stateful task, we don't create the StateDirectory -- and task directories are nested within StateDirectory:

StateDirectory == /<state.dir>/<application.id>/

overall:
/<state.dir>/<application.id>/<task.id>/<store>

From my understanding, the ticket addresses the case that there is no disk. If there is a disk, and there is one stateful task, we can also create task directories for stateless tasks -- I agree that it would be ok to not create them, but this increases code complexity and the gain seems to be small?

Thoughts?
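
To make the nesting concrete, here is a minimal sketch of how the paths above compose. The class and method names (StateDirLayout, appDir, taskDir, storeDir) and the example values are made up for illustration; this is not the actual StateDirectory code.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

// Hypothetical helper for illustration only; not the real StateDirectory class.
final class StateDirLayout {

    // /<state.dir>/<application.id>
    static Path appDir(String stateDir, String applicationId) {
        return Paths.get(stateDir, applicationId);
    }

    // /<state.dir>/<application.id>/<task.id>
    static Path taskDir(String stateDir, String applicationId, String taskId) {
        return appDir(stateDir, applicationId).resolve(taskId);
    }

    // /<state.dir>/<application.id>/<task.id>/<store>
    static Path storeDir(String stateDir, String applicationId, String taskId, String store) {
        return taskDir(stateDir, applicationId, taskId).resolve(store);
    }

    public static void main(String[] args) {
        // Example values are assumptions, chosen only to show the nesting.
        System.out.println(storeDir("/tmp/kafka-streams", "word-count", "0_1", "counts"));
    }
}
```

The point is only that every task directory lives under the application directory, so if the application directory is never created, no task directory can exist either.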

Member

@mjsax I was thinking the same thing. I understand the goal of not creating the directories for stateless tasks, but I'm not sure the increase in code buys much.

Contributor Author

Updated the code to create the task directory for all tasks if any one of the tasks has a persistent store.

@mjsax mjsax added the streams label Oct 4, 2018
Member

@bbejeck bbejeck left a comment

Thanks, @kamalcph for the PR. Left one minor comment.

For the integration test can we add a test where a stateful task migrates to an instance that didn't have a stateful task on startup?

@kamalcph
Contributor Author

kamalcph commented Oct 5, 2018

@mjsax @bbejeck
Thanks for reviewing the PR. I've updated it as per your comments. Please take another look.

@kamalcph
Contributor Author

kamalcph commented Oct 5, 2018

For the integration test can we add a test where a stateful task migrates to an instance that didn't have a stateful task on startup?

I've done a manual test by running the WordCountDemo program as 2 instances on my machine, configuring different state directories and having only one partition for the input and output topics. If you point me to an existing taskMigration test case for reference, I'm ready to add one.

@mjsax
Member

mjsax commented Oct 5, 2018

@bbejeck Would it be sufficient, to run a regular integration test with only stateless tasks and check for the non-existence of the state directory? What do we gain if we test task migration scenario? Seems I am missing something?
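
As a rough sketch of the kind of check I mean: after starting a stateless topology, the test only needs to assert that the application's state directory is missing or has no entries. The helper name (isAbsentOrEmpty) and the paths below are hypothetical, and the snippet does not start KafkaStreams itself; it only shows the directory assertion.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical test helper for illustration; not part of the Kafka test utilities.
final class StateDirCheck {

    // Returns true if the path does not exist, or is a directory without entries.
    static boolean isAbsentOrEmpty(Path dir) {
        if (Files.notExists(dir)) {
            return true;
        }
        if (!Files.isDirectory(dir)) {
            return false;
        }
        try (Stream<Path> entries = Files.list(dir)) {
            return !entries.findAny().isPresent();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the configured state.dir; a real test would point KafkaStreams at it.
        Path base = Files.createTempDirectory("kafka-");
        Path appDir = base.resolve("appId");
        System.out.println("before any task directory: " + isAbsentOrEmpty(appDir));
        Files.createDirectories(appDir.resolve("0_0"));
        System.out.println("after creating a task directory: " + isAbsentOrEmpty(appDir));
    }
}
```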

@kamalcph
Contributor Author

kamalcph commented Oct 5, 2018

Test cases pass on my local machine with gradle :streams:test. I couldn't find the test case that is failing.

Member

@mjsax mjsax left a comment

Thanks for updating the PR. Couple of follow up comments.

Comment thread streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java Outdated
Comment thread streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java Outdated
@kamalcph
Contributor Author

kamalcph commented Oct 9, 2018

@mjsax
Addressed most of your comments. Please take one more look.

@bbejeck
Member

bbejeck commented Oct 9, 2018

@bbejeck Would it be sufficient, to run a regular integration test with only stateless tasks and check for the non-existence of the state directory? What do we gain if we test task migration scenario? Seems I am missing something?

Yes, I agree with you; maybe I was being overly conservative. But my point was that if a stateful task migrates to an instance that previously had only stateless tasks, we can ensure the directories are created.

Member

@bbejeck bbejeck left a comment

@kamalcph I took another pass; overall it looks good, just two minor comments.

Comment thread streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java Outdated
Member

super nit: maybe change the name as we don't use the EmbeddedBroker anymore? I don't have any good suggestions ATM though.

Contributor Author

Renamed to StatelessTopologyDiskAccessTest and moved to package org.apache.kafka.streams.

Contributor Author

Renamed to TopologyDiskAccessTest to convey the test information clearly.

@bbejeck
Member

bbejeck commented Oct 9, 2018

The failures are related: both the Java 8 and Java 10 builds fail streams:test-utils:checkstyleTest. I've described this in a little more detail in my comments above.

@mjsax
Member

mjsax commented Oct 9, 2018

But my point was if a stateful task migrates to an instance with only stateless tasks previously, we can ensure the directories are created.

That is a very good point @bbejeck -- I think we should be fine though. In the KafkaStreams constructor, the state directory is created if there is at least one stateful sub-topology. Thus, the directory is created independent of task assignment (right?). Thus, the internal flag createStateDirectory is set to true for all KafkaStreams instances and creating the task directories should work correctly.

Does this sound correct?
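
To spell out the reasoning, here is a simplified model of that decision, not the actual KafkaStreams code. The types Store and SubTopology are hypothetical stand-ins, and I am assuming that "stateful" here means "has at least one persistent store" (in-memory stores need no disk). The flag depends only on the topology, never on which tasks an instance is assigned.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Simplified, hypothetical model; only the name createStateDirectory comes from this thread.
final class StateDirectoryDecision {

    // Stand-in for a state store: only its persistence matters here.
    static final class Store {
        final String name;
        final boolean persistent;

        Store(String name, boolean persistent) {
            this.name = name;
            this.persistent = persistent;
        }
    }

    // Stand-in for a sub-topology: just the stores its processors use.
    static final class SubTopology {
        final List<Store> stores;

        SubTopology(Store... stores) {
            this.stores = Arrays.asList(stores);
        }
    }

    // Computed once from the whole topology, before any task is assigned.
    static boolean createStateDirectory(List<SubTopology> topology) {
        for (SubTopology subTopology : topology) {
            for (Store store : subTopology.stores) {
                if (store.persistent) {
                    return true;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<SubTopology> stateless = Collections.singletonList(new SubTopology());
        List<SubTopology> mixed = Arrays.asList(
            new SubTopology(),
            new SubTopology(new Store("in-memory-counts", false)),
            new SubTopology(new Store("persistent-counts", true)));
        System.out.println("stateless: " + createStateDirectory(stateless));
        System.out.println("mixed: " + createStateDirectory(mixed));
    }
}
```

Because every instance evaluates the same topology, every instance arrives at the same flag value, so a stateful task that migrates to an instance that previously ran only stateless tasks would still find the state directory enabled.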

@mjsax
Member

mjsax commented Oct 9, 2018

I got a thumbs up from @bbejeck -- thus, if I interpret Bill's reaction correctly, we should rewrite the tests to use TopologyTestDriver instead of EmbeddedKafka to simplify the code and reduce test runtime.

Can you update the PR accordingly @kamalcph ?

@bbejeck
Member

bbejeck commented Oct 9, 2018

That is a very good point @bbejeck -- I think we should be fine though. In KafkaStreams constructor, the state directory is created if there is at least one stateful sub-topology. Thus, the directory is created independent of task assignment (right?). Thus, internal flag createStateDirectory is set to true for all KafkaStreams instances and creating the task directories should work correctly.

Sorry about not sharing my thoughts in text.

Yes, after going over the PR again, I agree that the createStateDirectory flag should be sufficient for creating state directories for any stateful task in a given sub-topology. Unless I'm mistaken, I think @kamalcph has already updated the tests to use the TopologyTestDriver over the EmbeddedKafka.

@bbejeck
Member

bbejeck commented Oct 9, 2018

failure unrelated

retest this please

@kamalcph
Contributor Author

retest this please

@kamalcph
Contributor Author

@mjsax @bbejeck
Resolved the conflicts against trunk. Test failures are unrelated.

@kamalcph
Contributor Author

@mjsax @bbejeck @vvcephei
Please take a look.

Member

@mjsax mjsax left a comment

@kamalcph Sorry for the long wait... Things were a little crazy recently, and I did not have time to review earlier.

Overall, looks very good already. Couple of minor comments.

Member

As above.

Contributor Author

Fixed

Member

I think we should prefix this with TestUtils.IO_TMP_DIR ?

Contributor Author

Fixed it.

Member

We should add a test into ProcessorTopologyTest for this new method

Contributor Author

test case added.

Member

As above: we should add a test for this method

Contributor Author

test case added.

Member

We should add a new test to KafkaStreamsTest to check that no directory is created for this case

Contributor Author

@kamalcph kamalcph Nov 4, 2018

For the above case, TopologyDiskAccessTest class is added. KafkaStreamsTest tests only the KafkaStreams logic (It doesn't create any topology which is required for the above case).

Member

KafkaStreamsTest tests only the KafkaStreams logic

Well, because KafkaStreams is responsible for creating (or not creating) the directory, this is part of the KafkaStreams logic, isn't it? Thus, IMHO we should test this behavior in KafkaStreamsTest. Nothing prevents you from creating a topology in the test for this.

Also, TopologyDiskAccessTest does not use KafkaStreams but TopologyTestDriver, and thus does not cover this code path.

Contributor Author

Agree, moved the test cases to KafkaStreamsTest.

Member

What do we gain by this test?

Contributor Author

@kamalcph kamalcph Nov 4, 2018

To ensure that the state directory does not get created when using a stateless / in-memory topology with the PAPI / DSL APIs.

Member

Shouldn't this be part of KafkaStreamsTest (cf. my other comment)

Contributor Author

Removed TopologyDiskAccessTest class.

@kamalcph
Contributor Author

kamalcph commented Nov 4, 2018

@mjsax
Thanks for your patience in reviewing this PR. I've addressed your comments. Please take one more look.

@kamalcph
Contributor Author

kamalcph commented Nov 5, 2018

@mjsax
Addressed your second review comments. Please take another look.

@kamalcph
Contributor Author

kamalcph commented Nov 5, 2018

retest this please

Member

We should split this into multiple smaller tests, each testing one scenario:

  • statelessTopologyShouldNotHavePersistentLocalStore()
  • inMemoryStoreShouldNotResultInPersistentLocalStore()
  • persistentLocalStoreShouldBeDetected()

Or similar test method names that explain what the test is doing / expecting / testing.

Contributor Author

Updated the test case.

Contributor Author

Updated the test case.

Member

Similar to above

Contributor Author

Fixed.

Member

nit: X -> anyAppId

Contributor Author

Fixed.

Member

Thanks for the cleanup!

Member

Isn't this case covered by statelessPAPITopologyShouldNotCreateStateDirectory ? DSL compiles down into a Topology anyway.

Similar below.

Contributor Author

DSL tests are removed.

Member

nit: statelessPAPITopologyShouldNotCreateStateDirectory -> statelessTopologyShouldNotCreateStateDirectory

Contributor Author

renamed the test cases.

Member

@mjsax mjsax left a comment

Call for second review @guozhangwang @bbejeck @vvcephei

@mjsax
Member

mjsax commented Nov 7, 2018

Couple of Streams test failures. Do we have tickets for those flaky tests?

java.lang.AssertionError: Condition not met within timeout 30000. Did not receive all [KeyValue(A, 3), KeyValue(B, 3), KeyValue(C, 3)] records from topic outputTopic_0
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:346)
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:323)
	at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:444)
	at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(IntegrationTestUtils.java:411)
	at org.apache.kafka.streams.integration.RepartitionOptimizingIntegrationTest.runIntegrationTest(RepartitionOptimizingIntegrationTest.java:198)
	at org.apache.kafka.streams.integration.RepartitionOptimizingIntegrationTest.shouldSendCorrectRecords_OPTIMIZED(RepartitionOptimizingIntegrationTest.java:119)
java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TopicExistsException: Topic 'outputTopic_0' already exists.
	at org.apache.kafka.streams.integration.utils.KafkaEmbedded.createTopic(KafkaEmbedded.java:185)
	at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopic(EmbeddedKafkaCluster.java:196)
	at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.createTopics(EmbeddedKafkaCluster.java:160)
	at org.apache.kafka.streams.integration.RepartitionOptimizingIntegrationTest.setUp(RepartitionOptimizingIntegrationTest.java:103)
java.lang.AssertionError: Condition not met within timeout 30000. Topics not deleted after 30000 milli seconds.
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:346)
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:323)
	at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteAllTopicsAndWait(EmbeddedKafkaCluster.java:282)
	at org.apache.kafka.streams.integration.RepartitionWithMergeOptimizingIntegrationTest.tearDown(RepartitionWithMergeOptimizingIntegrationTest.java:101)
java.lang.AssertionError: Condition not met within timeout 30000. Topics not deleted after 30000 milli seconds.
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:346)
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:323)
	at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteAllTopicsAndWait(EmbeddedKafkaCluster.java:282)
	at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.cleanStateAfterTest(IntegrationTestUtils.java:134)
	at org.apache.kafka.streams.integration.SuppressionIntegrationTest.shouldSuppressIntermediateEventsWithRecordLimit(SuppressionIntegrationTest.java:279)
java.lang.AssertionError: Condition not met within timeout 10000. Test consumer group abstract-reset-integration-test still active even after waiting 10000 ms.
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:346)
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:323)
	at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.prepareTest(AbstractResetIntegrationTest.java:195)
	at org.apache.kafka.streams.integration.ResetIntegrationTest.before(ResetIntegrationTest.java:62)
java.lang.AssertionError: Condition not met within timeout 10000. Test consumer group abstract-reset-integration-test still active even after waiting 10000 ms.
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:346)
	at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:323)
	at org.apache.kafka.streams.integration.AbstractResetIntegrationTest.prepareTest(AbstractResetIntegrationTest.java:195)
	at org.apache.kafka.streams.integration.ResetIntegrationWithSslTest.before(ResetIntegrationWithSslTest.java:76)

@kamalcph
Contributor Author

@vvcephei @bbejeck @guozhangwang
Please take a look when you get a chance.

@mjsax
Member

mjsax commented Nov 21, 2018

@kamalcph Could you rebase this PR to resolve the merge conflicts?

@kamalcph
Contributor Author

@mjsax
Resolved the conflicts with trunk.

After the rebase, a couple of test cases are failing:

Class org.apache.kafka.streams.KafkaStreamsTest

inMemoryStatefulTopologyShouldNotCreateStateDirectory

org.apache.kafka.streams.errors.StreamsException: Failed to read checkpoints for global state globalStores
	at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:115)
	at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.initialize(GlobalStateUpdateTask.java:61)
	at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.initialize(GlobalStreamThread.java:229)
	at org.apache.kafka.streams.processor.internals.GlobalStreamThread.initialize(GlobalStreamThread.java:345)
	at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:270)
Caused by: java.nio.file.FileSystemException: /var/folders/tt/7fd821cd1l9dsgwf75lrrn5m0000gp/T/kafka-4l8dk/appId/global/.checkpoint: Not a directory
	at sun.nio.fs.UnixException.translateToIOException(UnixException.java:91)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
	at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
	at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
	at java.nio.file.Files.newByteChannel(Files.java:361)
	at java.nio.file.Files.newByteChannel(Files.java:407)
	at java.nio.file.spi.FileSystemProvider.newInputStream(FileSystemProvider.java:384)
	at java.nio.file.Files.newInputStream(Files.java:152)
	at java.nio.file.Files.newBufferedReader(Files.java:2784)
	at java.nio.file.Files.newBufferedReader(Files.java:2816)
	at org.apache.kafka.streams.state.internals.OffsetCheckpoint.read(OffsetCheckpoint.java:127)
	at org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl.initialize(GlobalStateManagerImpl.java:108)
	... 4 more

This one, I'll fix. This may be due to the recent changes in trunk.

statelessTopologyShouldNotCreateStateDirectory

java.lang.AssertionError: Couldn't read the state directory : /var/folders/tt/7fd821cd1l9dsgwf75lrrn5m0000gp/T/kafka-m7Wkm
	at org.junit.Assert.fail(Assert.java:88)
	at org.apache.kafka.streams.KafkaStreamsTest.startStreamsAndCheckDirExists(KafkaStreamsTest.java:724)
	at org.apache.kafka.streams.KafkaStreamsTest.statelessTopologyShouldNotCreateStateDirectory(KafkaStreamsTest.java:613)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
	at sun.reflect.GeneratedMethodAccessor14.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
	at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
	at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
	at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
	at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
	at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
	at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
	at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
	at java.lang.Thread.run(Thread.java:748)

This test passes when run standalone, but when run as part of the test suite it always fails with InterruptedException. I've tried to debug the issue by pre-creating the directory, but the test still fails. Can you help me resolve this?

@mjsax
Member

mjsax commented Nov 21, 2018

The build is green, thus I am a little confused about your statement. I also checked out your PR and ran KafkaStreamsTest locally and it passed... Can you elaborate?

Member

@bbejeck bbejeck left a comment

Overall looks good to me. Once the testing is squared away, I'll review again.
I'll also try running the tests locally and see what I get.

@bbejeck
Member

bbejeck commented Nov 21, 2018

I also checked out the PR and ran ./gradlew streams:clean streams:test locally and got a clean build

@kamalcph
Contributor Author

I'm running the command gradle streams:clean streams:test-utils:clean streams:test streams:test-utils:test. Yesterday, the above 2 test cases failed consistently.

Today I ran the same command three times, with the following results:
inMemoryStatefulTopologyShouldNotCreateStateDirectory - failed only once.
statelessTopologyShouldNotCreateStateDirectory - always fails on my machine (when running in the suite)

@mjsax
Member

mjsax commented Nov 22, 2018

That's interesting. Do you develop on Mac/Linux/Windows?

@kamalcph
Contributor Author

Mac.

$ gradle -version


Gradle 4.10

Build time: 2018-08-27 18:35:06 UTC
Revision: ee3751ed9f2034effc1f0072c2b2ee74b5dce67d

Kotlin DSL: 1.0-rc-3
Kotlin: 1.2.60
Groovy: 2.4.15
Ant: Apache Ant(TM) version 1.9.11 compiled on March 23 2018
JVM: 1.8.0_152 (Oracle Corporation 25.152-b16)
OS: Mac OS X 10.13.6 x86_64

$ java -version
java version "1.8.0_152"
Java(TM) SE Runtime Environment (build 1.8.0_152-b16)
Java HotSpot(TM) 64-Bit Server VM (build 25.152-b16, mixed mode)

@mjsax
Member

mjsax commented Nov 23, 2018

My environment is almost the same. But cannot reproduce... I assume it's only an issue on your machine. Not sure why. Thus, I guess we can safely merge this? Thoughts @bbejeck?

@bbejeck
Member

bbejeck commented Nov 27, 2018

My environment is almost the same. But cannot reproduce... I assume it's only an issue on your machine. Not sure why. Thus, I guess we can safely merge this? Thoughts @bbejeck?

OK, when I tried the command ./gradlew streams:clean streams:test-utils:clean streams:test streams:test-utils:test I did get a failure, but when I run these commands separately, i.e.,

  • ./gradlew streams:test-utils:clean streams:test-utils:test
  • ./gradlew streams:clean streams:test

they both pass. I think this is ok, as when I have multiple modules to test, I run the tests one at a time. The Jenkins build does the same I believe so I think it's ok to merge.

@bbejeck
Member

bbejeck commented Nov 27, 2018

@mjsax ^^ forgot to add you to the previous comment.

Member

@mjsax mjsax left a comment

LGTM

@mjsax mjsax merged commit de24d4a into apache:trunk Nov 27, 2018
@mjsax
Member

mjsax commented Nov 27, 2018

Merged to trunk. Thanks a lot for the PR @kamalcph!

@kamalcph kamalcph deleted the KAFKA-7367 branch November 28, 2018 05:38
pengxiaolong pushed a commit to pengxiaolong/kafka that referenced this pull request Jun 14, 2019
…they are needed (apache#5696)

* KAFKA-7367: Ensure stateless topologies don't require disk access

* KAFKA-7367: Streams should not create state store directories unless they are needed.

* Addressed the review comments.

* Addressed the review-2 comments.

* Fixed FileAlreadyExistsException

* Addressed the review-3 comments.

* Resolved the conflicts.