
KAFKA-4060 and KAFKA-4476 follow up #2404

Closed
mjsax wants to merge 2 commits into apache:trunk from mjsax:kafka-4060-zk-test-follow-up

Conversation

Member

@mjsax mjsax commented Jan 19, 2017

The ZK removal revealed a bug in StreamPartitionAssigner but did not fix it properly. This is a follow-up bug fix.

Issue:

  • If topic metadata is missing, StreamPartitionAssigner should not create any affected tasks that consume topics with missing metadata.
  • Dependent downstream tasks should not be created either.
  • For tasks that are not created, no store changelog topics (if any) should get created.
  • For tasks that write output to not-yet-existing internal repartitioning topics, those repartitioning topics should not get created.
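The skip rules listed above can be sketched roughly as follows. This is a hypothetical illustration, not the actual StreamPartitionAssigner code: the class, method, and topic names are all made up, and only the NOT_AVAILABLE sentinel mirrors a constant that actually appears in the patch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch -- NOT the actual Kafka Streams code.
public class AssignmentSketch {
    // sentinel for "partition count not available" (metadata missing)
    static final int NOT_AVAILABLE = -2;

    // A sub-topology's tasks may only be created if the partition count of
    // every source topic is known; otherwise no tasks, and hence no changelog
    // or repartition topics, should be created for it.
    static boolean canCreateTasks(List<String> sourceTopics, Map<String, Integer> partitionCounts) {
        for (String topic : sourceTopics) {
            Integer partitions = partitionCounts.get(topic);
            if (partitions == null || partitions == NOT_AVAILABLE) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        counts.put("known-topic", 3);
        counts.put("unknown-topic", NOT_AVAILABLE);
        System.out.println(canCreateTasks(Arrays.asList("known-topic"), counts));
        System.out.println(canCreateTasks(Arrays.asList("known-topic", "unknown-topic"), counts));
    }
}
```

In this reading, "dependent downstream tasks" are handled the same way: a sub-topology whose input is a repartition topic that will not be created also fails the check for its own inputs.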

Member Author

mjsax commented Jan 19, 2017

Bug fix discovered by ZK removal (test was not correctly added in KAFKA-4476).

Some additional minor cleanup.

@hjafarpour @guozhangwang @enothereska @dguy


asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1006/
Test FAILed (JDK 7 and Scala 2.10).


asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1008/
Test PASSed (JDK 8 and Scala 2.11).


asfbot commented Jan 19, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1006/
Test FAILed (JDK 8 and Scala 2.12).

metadataWithInternalTopics = metadata;
if (internalTopicManager != null)
metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
metadataWithInternalTopics = metadata.withPartitions(allRepartitionTopicPartitions);
Contributor

Is line 372 still needed?

Member Author

Good point. Will remove it.

throw new TopologyBuilderException(String.format("stream-thread [%s] Topics not co-partitioned: [%s]", streamThread.getName(), Utils.mkString(Arrays.asList(topics), ",")));
}
} else {
if (allRepartitionTopicsNumPartitions.get(topic).numPartitions == NOT_AVAILABLE) {
Contributor

Is this possible to happen? Could you elaborate a bit?

Member Author

Yes. If we have an unknown topic (cf. the test "unknownTopic") that needs repartitioning, we don't know the number of partitions for the (to-be-created) internal repartitioning topic, and then this case hits.

Contributor

Do we then just ignore it if it is NOT_AVAILABLE?

Member Author

Yes. If metadata is missing, we skip over this and hope to get the metadata on the next refresh. But this strategy is not working properly yet. See #2404 (comment)

We might want to add a check for user topics though -- currently, the app would just idle if a user topic does not exist.
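The skip-and-retry strategy described in this thread can be sketched as follows. This is a hypothetical illustration, not the actual code; the class and method names are made up. Internal topics whose partition count is still NOT_AVAILABLE are left out of the creation step and are picked up on a later rebalance, once a metadata refresh fills in the missing count.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch -- not the actual Kafka Streams code.
public class RetrySketch {
    static final int NOT_AVAILABLE = -2;

    // Filter internal topics down to those whose partition count is known.
    static Map<String, Integer> topicsReadyToCreate(Map<String, Integer> internalTopics) {
        Map<String, Integer> ready = new HashMap<>();
        for (Map.Entry<String, Integer> entry : internalTopics.entrySet()) {
            if (entry.getValue() != NOT_AVAILABLE) {
                ready.put(entry.getKey(), entry.getValue());
            }
            // else: skip for now and hope the next metadata refresh resolves it
        }
        return ready;
    }
}
```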

final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);

final Map<String, Integer> expectedCreatedInternalTopics = new HashMap<>();
expectedCreatedInternalTopics.put(applicationId + "-KSTREAM-JOINOTHER-0000000012-store-changelog", 3);
Member Author

Thinking about this once more, I think there is still an issue. As we do not know the number of input topic partitions for one join input, we cannot know the number of tasks for the join, and thus cannot actually create these two changelog topics either.

\cc @guozhangwang WDYT?

Contributor

If that was the case wouldn't we get an exception due to the topics not being co-partitioned? We should have a test for this case.

Contributor

When some expected external topics do not exist or have no metadata available, the current strategy is that Streams will not create any tasks for the "affected" sub-topologies, including their changelog and repartition topics, and will expect another rebalance to trigger once these topics show up. If the source topics are inputs to the "head sub-topologies", then no tasks and no internal topics will be created.

Member Author

As I expected. But this strategy does not work correctly... The tasks for the join sub-topology should not get created (and neither should these two changelog topics). Will investigate and update this PR.

Member Author

@dguy After metadata refresh, we might end up with a problem. With the latest changes, we should throw an exception as the number of partitions would not match the expected number. I am not sure if we can have an extra test for this -- from my understanding, this test should cover the case (and maybe others). I am not sure if we cover all cases...

public final static int UNKNOWN = -1;
public final static int NOT_AVAILABLE = -2;
final static int UNKNOWN = -1;
final static int NOT_AVAILABLE = -2;
Contributor

seeing as you are restricting visibility (thanks!) this one could be private


internalTopicManager.makeReady(topic, numPartitions);
for (Map.Entry<String, InternalTopicMetadata> entry : topicPartitions.entrySet()) {
InternalTopicConfig topic = entry.getValue().config;
Contributor

maybe you could make these locals final while you are at it?


Member Author

mjsax commented Jan 20, 2017

@guozhangwang @dguy @enothereska

Updated this.


asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1076/
Test FAILed (JDK 8 and Scala 2.11).


asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1074/
Test FAILed (JDK 8 and Scala 2.12).


asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1074/
Test FAILed (JDK 7 and Scala 2.10).


asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1077/
Test FAILed (JDK 8 and Scala 2.11).


asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1075/
Test FAILed (JDK 8 and Scala 2.12).


asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1075/
Test FAILed (JDK 7 and Scala 2.10).

do not create tasks with unknown input partitions
@mjsax mjsax force-pushed the kafka-4060-zk-test-follow-up branch from a6b6194 to 2b489a5 Compare January 20, 2017 23:20

asfbot commented Jan 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1079/
Test PASSed (JDK 8 and Scala 2.11).


asfbot commented Jan 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1077/
Test PASSed (JDK 7 and Scala 2.10).


asfbot commented Jan 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1077/
Test PASSed (JDK 8 and Scala 2.12).

asfgit pushed a commit that referenced this pull request Jan 21, 2017
…Manager branch and fixed one copartitioning bug

The ZK removal revealed a bug in `StreamPartitionAssigner` but did not fix it properly. This is a follow-up bug fix.

Issue:
 - If topic metadata is missing, `StreamPartitionAssigner` should not create any affected tasks that consume topics with missing metadata.
 - Dependent downstream tasks should not be created either.
 - For tasks that are not created, no store changelog topics (if any) should get created.
 - For tasks that write output to not-yet-existing internal repartitioning topics, those repartitioning topics should not get created.

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes #2404 from mjsax/kafka-4060-zk-test-follow-up

(cherry picked from commit 0b99bea)
Signed-off-by: Guozhang Wang <wangguoz@gmail.com>
@guozhangwang
Contributor

LGTM and merged to trunk, 0.10.2

@asfgit asfgit closed this in 0b99bea Jan 21, 2017

if (partitions == null) {
log.info("Skipping assigning topic {} to tasks since its metadata is not available yet", topic);
maxNumPartitions = StreamPartitionAssignor.NOT_AVAILABLE;
Member Author

@guozhangwang I think I forgot to insert a break; here. Otherwise this only works if the "last" topic is the one that is not known. Or actually, we can replace this statement with return StreamPartitionAssignor.NOT_AVAILABLE;
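The early-return variant suggested here could look like the following. This is a hypothetical sketch, not the actual patch; the class and method names are made up. Without the early return, the loop fell through and only behaved correctly when the unknown topic happened to be the last one inspected.

```java
import java.util.Collection;
import java.util.List;
import java.util.Map;

// Hypothetical sketch -- not the actual Kafka Streams code.
public class CopartitionSketch {
    static final int NOT_AVAILABLE = -2;

    // Compute the max partition count across a copartition group, bailing out
    // as soon as any topic in the group has no metadata available.
    static int maxNumPartitions(Collection<String> topics, Map<String, List<Integer>> metadata) {
        int max = 0;
        for (String topic : topics) {
            List<Integer> partitions = metadata.get(topic);
            if (partitions == null) {
                return NOT_AVAILABLE; // metadata missing: return immediately
            }
            max = Math.max(max, partitions.size());
        }
        return max;
    }
}
```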

Member Author

@mjsax mjsax Jan 22, 2017

Just opened a PR for this: #2418

soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
…Manager branch and fixed one copartitioning bug

The ZK removal revealed a bug in `StreamPartitionAssigner` but did not fix it properly. This is a follow-up bug fix.

Issue:
 - If topic metadata is missing, `StreamPartitionAssigner` should not create any affected tasks that consume topics with missing metadata.
 - Dependent downstream tasks should not be created either.
 - For tasks that are not created, no store changelog topics (if any) should get created.
 - For tasks that write output to not-yet-existing internal repartitioning topics, those repartitioning topics should not get created.

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Damian Guy, Guozhang Wang

Closes apache#2404 from mjsax/kafka-4060-zk-test-follow-up
