KAFKA-12195 Fix synchronization issue happening in KafkaStreams #9887
chia7712 merged 8 commits into apache:trunk
…ted to flaky AdjustStreamThreadCountTest)
resizeThreadCache(cacheSizePerThread);
return Optional.of(streamThread.getName());
}
threads.remove(streamThread);
Removing an element from a collection while looping over it is error-prone, so I rewrote the loop to use an iterator.
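A minimal sketch of the iterator-based removal described in this comment. The class name, the method name, and the use of plain strings in place of StreamThread are assumptions for illustration; this is not the code from the patch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for the thread list; strings replace StreamThread.
final class IteratorRemovalSketch {

    // Removes the first entry equal to target and reports whether one was removed.
    static boolean removeFirst(List<String> names, String target) {
        Iterator<String> it = names.iterator();
        while (it.hasNext()) {
            if (it.next().equals(target)) {
                // Removal goes through the iterator, so the iteration state stays valid.
                it.remove();
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> names = new ArrayList<>(Arrays.asList("thread-1", "thread-2", "thread-3"));
        System.out.println(removeFirst(names, "thread-2") + " " + names);
    }
}
```

Calling remove on the collection itself inside a for-each loop only works if the loop exits right afterwards; going through the iterator keeps the loop correct even if it later continues past the removal.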
@@ -148,7 +146,6 @@ public void shouldAddAndRemoveThreads() throws InterruptedException {
one.start();
latch.await(30, TimeUnit.SECONDS);
assertThat(kafkaStreams.localThreadsMetadata().size(), equalTo(oldThreadCount));
This assertion fails because of the synchronization issue:
java.lang.AssertionError:
Expected: <2>
but: was <1>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
at org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.shouldAddAndRemoveThreads(AdjustStreamThreadCountTest.java:149)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
}

@Test
public void testConcurrentlyAccessThreads() throws InterruptedException {
Without this patch, this test case can fail with the following error:
org.apache.kafka.streams.integration.AdjustStreamThreadCountTest > testConcurrentlyAccessThreads FAILED
java.lang.AssertionError: expected null, but was:<java.util.ConcurrentModificationException>
at org.junit.Assert.fail(Assert.java:89)
at org.junit.Assert.failNotNull(Assert.java:756)
at org.junit.Assert.assertNull(Assert.java:738)
at org.junit.Assert.assertNull(Assert.java:748)
at org.apache.kafka.streams.integration.AdjustStreamThreadCountTest.testConcurrentlyAccessThreads(AdjustStreamThreadCountTest.java:264)
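For context, a list returned by Collections.synchronizedList only guards individual calls; iteration has to be wrapped in a synchronized block on the list itself, as its javadoc requires. The sketch below illustrates that rule under assumed names (SynchronizedIterationSketch, countAlive, string states); it is not the test or the fix from this PR.

```java
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

// Hypothetical example: counts entries that are not "DEAD" while another thread appends.
final class SynchronizedIterationSketch {

    static int countAlive(List<String> states) {
        // The caller must hold the list's monitor for the whole traversal.
        synchronized (states) {
            int alive = 0;
            for (String state : states) {
                if (!"DEAD".equals(state)) {
                    alive++;
                }
            }
            return alive;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> states = Collections.synchronizedList(new LinkedList<>());
        Thread writer = new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                states.add("RUNNING");
            }
        });
        writer.start();
        // Concurrent traversals cannot throw ConcurrentModificationException,
        // because add() and the traversal contend for the same monitor.
        while (writer.isAlive()) {
            countAlive(states);
        }
        writer.join();
        System.out.println(countAlive(states));
    }
}
```

Dropping the synchronized block in countAlive is what makes a ConcurrentModificationException possible when the writer runs at the same time.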
resizeThreadCache(cacheSizePerThread);
// Creating thread should hold the lock in order to avoid duplicate thread index.
// If the duplicate index happen, the metadata of thread may be duplicate too.
streamThread = createAndAddStreamThread(cacheSizePerThread, threadIdx);
@cadonna Could you take a look at these concurrency issues (see the description)? I'm still digging to see whether there are more. Thanks!
Hey @wcarlson5, can you also take a look at this?
wcarlson5 left a comment
This doesn't seem to change the actual logic, so that shouldn't be a problem. Copying the thread list to avoid an extra lock was a good idea. Together with #9888, this should fix the issues we are seeing, and I don't see any other problems with it. @ableegoldman the synchronization of the thread list iterators was a possible issue we talked about. I didn't think it would be a problem as long as we were not altering the list, but it appears that I was wrong. I only have one concern; otherwise LGTM. @chia7712 thanks for cleaning this up!
streamThread.shutdown();
if (!streamThread.getName().equals(Thread.currentThread().getName())) {
streamThread.waitOnThreadState(StreamThread.State.DEAD);
synchronized (threads) {
Why are we using the threads object as a lock here? Is that what Collections.synchronizedList(new LinkedList<>()); uses internally?
I am a little concerned that we are holding changeThreadCount while waiting for threads.
Makes sense. Let me use a copy of the list to handle the lock issue.
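One way to read the "copy" approach: take a snapshot of the list while briefly holding its monitor, then do the slow work (such as waiting for a thread to die) on the snapshot with no lock held. The sketch below assumes hypothetical names (SnapshotSketch, snapshot) and uses strings in place of StreamThread; it is not the code merged in this PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

// Hypothetical stand-in for the KafkaStreams thread list.
final class SnapshotSketch {
    private final List<String> threads = Collections.synchronizedList(new LinkedList<>());

    void add(String name) {
        threads.add(name);
    }

    boolean remove(String name) {
        return threads.remove(name);
    }

    // Copy the list while holding its monitor, then release the lock immediately.
    List<String> snapshot() {
        synchronized (threads) {
            return new ArrayList<>(threads);
        }
    }

    public static void main(String[] args) {
        SnapshotSketch sketch = new SnapshotSketch();
        sketch.add("thread-1");
        sketch.add("thread-2");
        // Iterating the copy stays safe even though the live list changes underneath.
        for (String name : sketch.snapshot()) {
            sketch.remove(name); // stands in for a slow shutdown-and-wait step
        }
        System.out.println(sketch.snapshot().isEmpty());
    }
}
```

The trade-off is that the snapshot can be stale by the time it is used, so callers should treat it as a point-in-time view rather than the current membership.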
The unit tests may not be stable until these PRs are merged.
Merged #9888 to run QA again.
issue: https://issues.apache.org/jira/browse/KAFKA-12195
related to aedb53a
Root Cause
It seems to me there are two issues in AdjustStreamThreadCountTest.
1) synchronization issue (the number of threads is inconsistent)
A synchronized list requires us to manually synchronize on the list while iterating over it. Iterating without that synchronization produces inconsistent results and consequently makes AdjustStreamThreadCountTest unstable.
2) duplicate thread index when adding a new stream thread
createAndAddStreamThread is not called while holding changeThreadCount, so it is possible to create two threads with the same thread index. Different threads then have the same metadata, and localThreadsMetadata returns an incorrect number of metadata entries.
3) assertion on an intermediate state (it fails if the test runs too fast)
This PR has already reverted that fix; it is tracked by #9888.
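To illustrate issue 2, the sketch below picks a thread index and registers it inside a single critical section, so two concurrent callers cannot observe the same free index. The class, its methods, and the "smallest unused index" policy are assumptions made for the example; only the lock name changeThreadCount comes from the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical index allocator; not the KafkaStreams implementation.
final class ThreadIndexSketch {
    private final Object changeThreadCount = new Object();
    private final List<Integer> indexes = new ArrayList<>();

    // Chooses the smallest unused index (starting at 1) and records it under one lock.
    int addThread() {
        synchronized (changeThreadCount) {
            int idx = 1;
            while (indexes.contains(idx)) {
                idx++;
            }
            indexes.add(idx);
            return idx;
        }
    }

    void removeThread(int idx) {
        synchronized (changeThreadCount) {
            // Integer.valueOf selects remove(Object) rather than remove(int position).
            indexes.remove(Integer.valueOf(idx));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ThreadIndexSketch sketch = new ThreadIndexSketch();
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        Thread[] callers = new Thread[8];
        for (int i = 0; i < callers.length; i++) {
            callers[i] = new Thread(() -> seen.add(sketch.addThread()));
            callers[i].start();
        }
        for (Thread caller : callers) {
            caller.join();
        }
        // Eight callers yield eight distinct indexes because lookup and insert are atomic together.
        System.out.println(seen.size());
    }
}
```

If the index were computed outside the lock and only the insertion were guarded, two callers could both compute the same index before either one registered it.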
Test Result from PR
Looped the test 100 times locally; all runs passed.