Skip to content

KAFKA-4692: Make testNo thread safe#2761

Closed
original-brownbear wants to merge 1 commit intoapache:trunkfrom
original-brownbear:KAFKA-4692
Closed

KAFKA-4692: Make testNo thread safe#2761
original-brownbear wants to merge 1 commit intoapache:trunkfrom
original-brownbear:KAFKA-4692

Conversation

@original-brownbear
Copy link
Copy Markdown
Member

https://issues.apache.org/jira/browse/KAFKA-4692 should be resolved by this.

The problem here (or at least one problem, causing trouble) appears to have been that the testNo field is not thread-safe the way it is used here (volatile doesn't help anything really) to generate a unique test number (for creating non-conflicting topic names).
If two instances of this class are set up inside the same fork and testNo++; is run in one just as the other runs e.g.

    private void createTopics() throws InterruptedException {
        streamOneInput = "stream-one-" + testNo;
        outputTopic = "output-" + testNo;
        CLUSTER.createTopic(streamOneInput, 3, 1);
        CLUSTER.createTopic(outputTopic);
    }

then both will have the same value for the topic names while sharing the same static CLUSTER field too.
I fixed this by simply making the actually used testNo an instance variable that is derived from an increasing static counter for each test in an atomic way. I think this should resolve any naming collisions here.

@original-brownbear
Copy link
Copy Markdown
Member Author

@guozhangwang ping :)

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 29, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2512/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 29, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2508/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang
Copy link
Copy Markdown
Contributor

retest this please.

@guozhangwang
Copy link
Copy Markdown
Contributor

@original-brownbear thanks for the patch!

@mjsax could you take a look?

@ijuma
Copy link
Copy Markdown
Member

ijuma commented Mar 29, 2017

Why are multiple threads accessing that field though? In theory, there should be a single thread.

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 29, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2508/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 29, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/2512/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 29, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/2516/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Mar 29, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2512/
Test FAILed (JDK 8 and Scala 2.12).

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Mar 30, 2017

I agree with @ijuma ... @original-brownbear Can you elaborate?

@original-brownbear
Copy link
Copy Markdown
Member Author

@ijuma @mjsax phew after some research I agree, the doc for Gradle and JUnit clearly indicates sequential execution in each fork. So in theory, I agree.

In practice, I'm not so sure. I could see naming collisions for the topic names (will edit in the exact error here later, don't have it handy right now). Also what I can see if I turn the logging up is this exception thrown in the Gradle runners repeatedly:

java.lang.UnsupportedOperationException: Cannot nest operations in the same thread. Each nested operation must run in its own thread.
	at org.gradle.internal.operations.DefaultBuildOperationWorkerRegistry.doStartOperation(DefaultBuildOperationWorkerRegistry.java:65)
	at org.gradle.internal.operations.DefaultBuildOperationWorkerRegistry.access$400(DefaultBuildOperationWorkerRegistry.java:30)
	at org.gradle.internal.operations.DefaultBuildOperationWorkerRegistry$DefaultOperation.operationStart(DefaultBuildOperationWorkerRegistry.java:163)
	at org.gradle.api.internal.tasks.testing.worker.ForkingTestClassProcessor.processTestClass(ForkingTestClassProcessor.java:68)
	at org.gradle.api.internal.tasks.testing.processors.RestartEveryNTestClassProcessor.processTestClass(RestartEveryNTestClassProcessor.java:47)
	at sun.reflect.GeneratedMethodAccessor118.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
	at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
	at org.gradle.internal.dispatch.FailureHandlingDispatch.dispatch(FailureHandlingDispatch.java:29)
	at org.gradle.internal.dispatch.AsyncDispatch.dispatchMessages(AsyncDispatch.java:132)
	at org.gradle.internal.dispatch.AsyncDispatch.access$000(AsyncDispatch.java:33)
	at org.gradle.internal.dispatch.AsyncDispatch$1.run(AsyncDispatch.java:72)
	at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
	at org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

... not sure if this is related. Will try to research this later today.

Also, and I know that only having a sample size of ~30 runs each with 2 ^= 1/15 failing in trunk and none with this fix is pretty small, but so far it appears to work. I'll try to instrument threads and process id in the before method and try to get more "proof" later :)

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Mar 30, 2017

I was looking into this, and I am not sure if reusing topic might be the root case. I would suggest, to delete and recreate user topics for each test. Note, that topic create/delete happens async so you need to add a check to avoid race conditions (cf. #2757)

@original-brownbear
Copy link
Copy Markdown
Member Author

original-brownbear commented Mar 30, 2017

@mjsax good timing, fully agree! I'm on the wrong path here. I tried this original-brownbear@ceaed37 in a loop today and couldn't get this to fail => disproved my own approach grml :) This will need a fix like you say too I think. I'll add a separate PR for that, this one just got confusing.
Sorry for wasting time here, I mistakenly assumed JUnit+Gradle to work similar to Surefire when it didn't!

@original-brownbear original-brownbear deleted the KAFKA-4692 branch March 30, 2017 17:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants