Skip to content

KAFKA-4588: Wait for topics to be created in QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable#2371

Closed
dguy wants to merge 5 commits intoapache:trunkfrom
dguy:integration-test-fix
Closed

KAFKA-4588: Wait for topics to be created in QueryableStateIntegrationTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable#2371
dguy wants to merge 5 commits intoapache:trunkfrom
dguy:integration-test-fix

Conversation

@dguy
Copy link
Copy Markdown
Contributor

@dguy dguy commented Jan 13, 2017

After debugging this i can see the times that it fails there is a race between when the topic is actually created/ready on the broker and when the assignment happens. When it fails StreamPartitionAssignor.assign(..) gets called with a Cluster with no topics. Hence the test hangs as no tasks get assigned. To fix this I added a waitForTopics method to EmbeddedKafkaCluster. This will wait until the topics have been created.

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/846/
Test FAILed (JDK 7 and Scala 2.10).

@dguy
Copy link
Copy Markdown
Contributor Author

dguy commented Jan 13, 2017

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/849/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/847/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang
Copy link
Copy Markdown
Contributor

I have a clarification question about the issue, which is not directly related to this test failure: in practice, it is likely that StreamsPartitionAssignor.assign() gets called when the passed in Cluster does not include all the refreshed metadata, if it has been created just before the starting of the app. If it does cause the assignor to fail I think it is a general issue to fix.

So when you say test hangs as no tasks gets assigned, which line of code is specifically causing this issue, is it because of this:

@dguy
Copy link
Copy Markdown
Contributor Author

dguy commented Jan 13, 2017

@guozhangwang it isn't an issue with StreamsPartitionAssignor.assign() - it doesn't fail. It just receives an empty Cluster object and then does the right thing, i.e., assigns nothing. So the StreamThread.ConsumerRebalanceListener.onPartitionsAssigned gets called with an empty assignment and no tasks get created.

What happen is we call KafkaConsumer.subscribe(..) it in turn calls metadata.setTopics(..). This triggres a metadata refresh. The Cluster metadata that comes back and is set in metadata.update(..) is empty. This is due to:
WARN Error while fetching metadata with correlation id 2 : {stream-three-2=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient:695)
Then when ConsumerCoordinator.performAssign happens and it calls metadata.fetch() it gets the empty Cluster object and passes it to StreamPartitionAssignor.assign(...)

The assignment never happens again (at least not within the 30 seconds the test runs for)

@mjsax
Copy link
Copy Markdown
Member

mjsax commented Jan 13, 2017

This might fix queryOnRebalance(), too. Did start to investigate but could not find a cause so far...

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 13, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/847/
Test FAILed (JDK 7 and Scala 2.10).

Copy link
Copy Markdown
Contributor

@guozhangwang guozhangwang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall, left a single comment.

}

public void waitForTopics(final StreamsConfig config, final String...topics) throws InterruptedException, IOException {
final StreamsKafkaClient streamsKafkaClient = new StreamsKafkaClient(config);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of using a separate client, could we migrate the scala TestUtils.waitUntilMetadataIsPropagated code, that looks into broker.apis.metadataCache directly?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by migrate I mean we can add a similar function in the java TestUtils class.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure i'll take a look at it.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, can't add it the the java TestUtils class as clients doesn't have a dependency on core.
Thought i'd add it to StreamsTestUtils, but can't as it breaks checkstyle:

[ant:checkstyle] /Users/damian/dev/apache/kafka/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:18:1: Disallowed import - kafka.api.PartitionStateInfo.
[ant:checkstyle] /Users/damian/dev/apache/kafka/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:19:1: Disallowed import - kafka.api.Request.
[ant:checkstyle] /Users/damian/dev/apache/kafka/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:20:1: Disallowed import - kafka.server.KafkaServer.
[ant:checkstyle] /Users/damian/dev/apache/kafka/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:21:1: Disallowed import - kafka.server.MetadataCache.
[ant:checkstyle] /Users/damian/dev/apache/kafka/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java:26:1: Disallowed import - scala.Option.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok - i've moved it to IntegrationTestUtils as the org.apache.kafka.streams.integration package is already allowed to import most of these. I've had to add 1 more package to the allowed imports.

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/908/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/906/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/906/
Test FAILed (JDK 8 and Scala 2.12).

@guozhangwang
Copy link
Copy Markdown
Contributor

retest this please

@guozhangwang
Copy link
Copy Markdown
Contributor

LGTM overall. Seems ResetIntegrationTest test failure has some other causes (seen this in Jenkins builds, cc @mjsax ).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/918/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/918/
Test FAILed (JDK 8 and Scala 2.12).

@guozhangwang
Copy link
Copy Markdown
Contributor

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/920/
Test FAILed (JDK 8 and Scala 2.11).

@dguy
Copy link
Copy Markdown
Contributor Author

dguy commented Jan 16, 2017 via email

@guozhangwang
Copy link
Copy Markdown
Contributor

@dguy Do you think it makes sense to always enforce waitForTopicPartitions in createTopics, or there are some scenarios that we do not want to wait for that?

@dguy
Copy link
Copy Markdown
Contributor Author

dguy commented Jan 17, 2017

@guozhangwang Sure makes sense. I'll do that

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/923/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/923/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/925/
Test FAILed (JDK 8 and Scala 2.11).

@dguy
Copy link
Copy Markdown
Contributor Author

dguy commented Jan 17, 2017

retest this please

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/929/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/931/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot
Copy link
Copy Markdown

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/929/
Test FAILed (JDK 7 and Scala 2.10).

asfgit pushed a commit that referenced this pull request Jan 17, 2017
…nTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

After debugging this i can see the times that it fails there is a race between when the topic is actually created/ready on the broker and when the assignment happens. When it fails `StreamPartitionAssignor.assign(..)` gets called with a `Cluster` with no topics. Hence the test hangs as no tasks get assigned. To fix this I added a `waitForTopics` method to `EmbeddedKafkaCluster`. This will wait until the topics have been created.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes #2371 from dguy/integration-test-fix

(cherry picked from commit 825f225)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
@guozhangwang
Copy link
Copy Markdown
Contributor

guozhangwang commented Jan 17, 2017

Thanks @dguy . Merged to trunk and piggy-backed to 0.10.2.

@asfgit asfgit closed this in 825f225 Jan 17, 2017
@mjsax
Copy link
Copy Markdown
Member

mjsax commented Jan 17, 2017

@guozhangwang ResetIntegrationTest fails from time to time since ZK dependency got removed. Maybe it is related to this. But as ZK PR still has some follow up tasks, I am not sure if we should talk any action for the test already of just monitor it the next days.

@dguy dguy deleted the integration-test-fix branch January 19, 2017 11:40
soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
…nTest.shouldNotMakeStoreAvailableUntilAllStoresAvailable

After debugging this i can see the times that it fails there is a race between when the topic is actually created/ready on the broker and when the assignment happens. When it fails `StreamPartitionAssignor.assign(..)` gets called with a `Cluster` with no topics. Hence the test hangs as no tasks get assigned. To fix this I added a `waitForTopics` method to `EmbeddedKafkaCluster`. This will wait until the topics have been created.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes apache#2371 from dguy/integration-test-fix
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants