KAFKA-4633: Always using regex pattern subscription in StreamThread#2379

Closed
guozhangwang wants to merge 17 commits into apache:trunk from guozhangwang:K4633-regex-pattern

Conversation

@guozhangwang
Contributor

@guozhangwang guozhangwang commented Jan 14, 2017

  1. In StreamThread, always use the subscribe(Pattern, ..) function in order to avoid sending a MetadataRequest with specific topic names, which could cause brokers to auto-create the subscribed topics; the pattern is generated as "topic-1|topic-2..|topic-n".

  2. In ConsumerCoordinator, let the leader refresh its metadata if the generated assignment contains some topics that are not contained in the subscribed topics. Also, in SubscriptionState, modified the verification for regex subscriptions to check against the regex pattern instead of the matched topics, since the returned assignment may contain some topics that were not yet created when joining the group but exist after the rebalance. Also modified some unit tests in KafkaConsumerTest to accommodate the above changes.

  3. Minor cleanup: changed String[] to List<String> to avoid overloaded functions.

  4. Minor cleanup: enforced strong typing in SinkNodeFactory and removed unnecessary unchecked tags.

  5. Minor cleanup: augmented unit test error message and fixed a potential transient failure in KafkaStreamTest.
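As a rough illustration of point 1, the subscription pattern could be built from the topic names like this. This is only a sketch with hypothetical names, not the actual StreamThread code; in particular, whether topic names are regex-quoted is an assumption here.

```java
import java.util.List;
import java.util.regex.Pattern;

public class TopicPatternSketch {
    // Build a single regex alternation that matches exactly the given topic
    // names. Pattern.quote makes any regex metacharacters in a topic name
    // literal (hypothetical helper; the real code may differ).
    static Pattern buildSubscriptionPattern(List<String> topics) {
        StringBuilder sb = new StringBuilder();
        for (String topic : topics) {
            if (sb.length() > 0)
                sb.append('|');
            sb.append(Pattern.quote(topic));
        }
        return Pattern.compile(sb.toString());
    }

    public static void main(String[] args) {
        Pattern p = buildSubscriptionPattern(List.of("topic-1", "topic-2"));
        System.out.println(p.matcher("topic-1").matches()); // true
        System.out.println(p.matcher("topic-3").matches()); // false
    }
}
```

Subscribing with such a pattern avoids naming topics in the MetadataRequest, so brokers with auto-creation enabled will not create them as a side effect.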

@asfbot

asfbot commented Jan 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/885/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/885/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 14, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/887/
Test FAILed (JDK 8 and Scala 2.11).

for (TopicPartition tp : assignments)
    if (!this.subscription.contains(tp.topic()))
        throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic.");
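Point 2 of the description changes this verification to check assigned topics against the subscribed pattern itself rather than the currently matched topic set. A minimal sketch of that idea (field and method names are hypothetical, not the actual SubscriptionState code):

```java
import java.util.List;
import java.util.regex.Pattern;

public class PatternAssignmentCheck {
    // With a regex subscription, validate assigned topics against the
    // subscribed pattern rather than the matched-topic set, so topics
    // created after joining the group still pass the check.
    static void checkAssignment(List<String> assignedTopics, Pattern subscribedPattern) {
        for (String topic : assignedTopics)
            if (!subscribedPattern.matcher(topic).matches())
                throw new IllegalArgumentException(
                        "Assigned topic " + topic + " does not match the subscribed pattern.");
    }

    public static void main(String[] args) {
        // "topic-3" did not exist when the group was joined, but it still
        // matches the pattern, so the assignment is accepted.
        checkAssignment(List.of("topic-1", "topic-3"), Pattern.compile("topic-\\d+"));
        System.out.println("assignment accepted");
    }
}
```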
Member

Why remove this check? We still know all topics we want to subscribe to.

Contributor Author

I'm still pondering it; the issue is that during a rebalance the Streams leader can possibly assign internal topic partitions while the consumer is not yet aware of the topic (since these topics are not created in Kafka yet), and the above check will fail. This was actually not introduced in this PR; we could hit this issue even before, but it was not exposed by tests.

I think in general this check is valid, and in that case maybe I have to add an extended consumer class as we discussed offline.

Contributor Author

@hachikuji @mjsax Updated to use the subscriptionPattern if possible; this was discussed offline, please see if it looks "simple enough".

@guozhangwang
Contributor Author

Also, by doing this PR I now see more of the failures that are being worked on in #2371. This is because the Streams rebalance now gets shorter and the race condition is more likely to happen.

@guozhangwang
Contributor Author

retest this please.

@asfbot

asfbot commented Jan 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/889/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/891/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 15, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/889/
Test PASSed (JDK 8 and Scala 2.12).

@guozhangwang
Contributor Author

retest this please

@asfbot

asfbot commented Jan 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/901/
Test FAILed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/903/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 16, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/901/
Test FAILed (JDK 7 and Scala 2.10).

private final Deserializer<?> valDeserializer;

private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer keyDeserializer, Deserializer valDeserializer) {
private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer) {
Contributor

Any reason why you don't make the topics param a list?

Contributor Author

The input parameters come directly from the addSource calls, whose param is String[]. I feel like deferring the asList to its constructor rather than letting the caller do it. No strong opinions though.

@guozhangwang
Contributor Author

retest this please

@asfbot

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/949/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/949/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 17, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/951/
Test FAILed (JDK 8 and Scala 2.11).


Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);

// check if the assignor has realized some new topics, if yes update the snapshot
Contributor Author

@hachikuji This is just to show my current thoughts about how to work around the unnecessary rebalance without changing the Assignor APIs. It is really not that elegant, just a demo, and I'm happy to revert it for a future general solution; my take is that if we allow assign() functions to update the metadata snapshot, then we probably need to change the API.

@asfbot

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/967/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/967/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/969/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/975/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 18, 2017

Build finished. 4117 tests run, 0 skipped, 0 failed.
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/973/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 18, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/973/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1067/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 20, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1067/
Test PASSed (JDK 8 and Scala 2.12).


// update partition assignment
subscriptions.assignFromSubscribed(assignment.partitions());
this.joinedSubscription = subscriptions.subscription();
Contributor

@hachikuji hachikuji Jan 20, 2017

I'm not sure this works. The purpose of the joinedSubscription field is to remember the exact list of topics that were used when joining the group. If a metadata update arrives after the rebalance has begun, then we can notice the fact that the joined subscription does not match the current subscription and we can trigger another rebalance. With this change, we will no longer be able to detect this case, which means that consumption from a topic matching the subscribed regex will be delayed (perhaps indefinitely).

It seems the behavior we want is to only add to joinedSubscription those topics which were added to the assignment by the leader.

Contributor Author

@hachikuji Here is my understanding, and correct me if I'm wrong:

  1. In assignFromSubscribed, we will only change the subscription if it is a regex pattern and the collected topics in topicsToSubscribe are a superset of this.subscription.

  2. Besides this added line, joinedSubscription is only set to this.subscription before sending the join group request, and this.subscription itself is only modified either inside subscribe() calls or inside the previous line, if the assigned topics are a superset of the subscription itself.

So given these two, I think this line will only update joinedSubscription with those topics that were added to the assignment by the leader, if that does happen, right?

Contributor

@guozhangwang A metadata fetch could return in the middle of a rebalance (i.e. with the JoinGroup on the wire). In this case, we may end up discovering a new topic that we didn't know about before and adding it to our subscription (see subscribeFromPattern). We are able to catch this case currently because although the subscription has changed, joinedSubscription has not. With this patch, this is no longer true.
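The check described above boils down to comparing the live subscription against the snapshot taken at join time. A minimal sketch (hypothetical names; the real coordinator tracks more state than this):

```java
import java.util.Set;

public class RejoinCheckSketch {
    // If a metadata refresh matched a new topic after the JoinGroup was
    // sent, the live subscription drifts from the join-time snapshot and
    // the consumer requests another rebalance.
    static boolean rejoinNeeded(Set<String> subscription, Set<String> joinedSubscription) {
        return !subscription.equals(joinedSubscription);
    }

    public static void main(String[] args) {
        Set<String> joined = Set.of("A");
        Set<String> current = Set.of("A", "B"); // topic B discovered mid-rebalance
        System.out.println(rejoinNeeded(current, joined)); // true
    }
}
```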

Contributor Author

That makes sense. I can go ahead and tackle this.

Just clarifying: after the group has formed, both leader and followers can still trigger a rebalance: the leader will trigger a rebalance if an existing topic's number of partitions has changed (including when the topic is deleted); a follower will trigger a rebalance if its subscription has changed (either due to a metadata refresh with a regex pattern or because the user called subscribe again). Is that right?

And if we change the consumer coordinator to allow passing the regex to the leader, I think joinedSubscription can be removed completely and only the leader needs to trigger rebalances unless users call subscribe again on any consumer member, is that right?

Contributor

Just clarifying: after the group has formed, both leader and followers can still trigger a rebalance: the leader will trigger a rebalance if an existing topic's number of partitions has changed (including when the topic is deleted); a follower will trigger a rebalance if its subscription has changed (either due to a metadata refresh with a regex pattern or because the user called subscribe again). Is that right?

Yes, right.

And if we change the consumer coordinator to allow passing the regex to the leader, I think joinedSubscription can be removed completely and only the leader needs to trigger rebalances unless users call subscribe again on any consumer member, is that right?

The leader will still have to deal with the potential for a metadata update during a rebalance, so I'm not sure we can remove joinedSubscription. At least we won't need this funky logic to try to change joinedSubscription after the rebalance though.

// TODO this part of the logic should be removed once we allow regex on leader assign and remove joinedSubscription
Set<String> addedTopics = new HashSet<>();
for (TopicPartition tp : subscriptions.assignedPartitions()) {
    if (!subscriptions.subscription().contains(tp.topic()))
Contributor

I'm puzzling a bit over whether we should be using joinedSubscription here instead of subscriptions.subscription(), or whether it matters.

Seems it should be joinedSubscription. Suppose that joinedSubscription contained only [A] at the time of rebalance. Topic B is then created by the leader and assigned. Before the rebalance completes, the consumer discovers topic B, and subscription is updated to [A, B], while joinedSubscription is still [A]. The consumer then receives the assignment containing partitions from both A and B, but addedTopics will be empty (since B was already added to subscription). The consumer will then notice that joinedSubscription is [A], while subscription is [A, B], and request an unneeded rebalance.


if (!addedTopics.isEmpty()) {
Set<String> newSubscription = new HashSet<>(subscriptions.subscription());
Set<String> newJoinedSubsciprtion = new HashSet<>(joinedSubscription);
Contributor

nit: typo newJoinedSubsciprtion

@guozhangwang
Contributor Author

@hachikuji Found an issue with ConsumerCoordinatorTest while working on it: the subscription is updated on the leader, and hence addedTopics will be empty; thus we need to use joinedSubscription to determine the added topics and add them to both subscriptions.

Could you please take another look?
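The fix described above can be sketched as follows: compute the topics the leader added to the assignment by comparing against joinedSubscription (not the live subscription, which may already contain them via a metadata refresh), then add them to both sets. Names are hypothetical and this omits the pattern-matching guard in the real code.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AddedTopicsSketch {
    // Topics in the assignment that were not part of the join-time
    // subscription snapshot were added by the leader.
    static Set<String> addedTopics(List<String> assignedTopics, Set<String> joinedSubscription) {
        Set<String> added = new HashSet<>();
        for (String topic : assignedTopics)
            if (!joinedSubscription.contains(topic))
                added.add(topic);
        return added;
    }

    public static void main(String[] args) {
        Set<String> joined = new HashSet<>(Set.of("A"));
        Set<String> subscription = new HashSet<>(Set.of("A", "B")); // B already matched via metadata
        Set<String> added = addedTopics(List.of("A", "B"), joined);
        subscription.addAll(added);
        joined.addAll(added); // both sets now agree, so no spurious rebalance
        System.out.println(added);                       // [B]
        System.out.println(subscription.equals(joined)); // true
    }
}
```

Diffing against joinedSubscription is what avoids the scenario in the review comment above, where B is already in the live subscription and addedTopics would otherwise come out empty.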


// check if the assignment contains some topics that is not in the original
// subscription, if yes we will obey what leader has decided and add these topics
// into the subscriptions
Contributor

Add to the end, "as long as they still match the subscribed pattern"

// update partition assignment
subscriptions.assignFromSubscribed(assignment.partitions());

// check if the assignment contains some topics that is not in the original
Contributor

nit: "is" -> "were"

@asfbot

asfbot commented Jan 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1090/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1088/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 21, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1088/
Test PASSed (JDK 7 and Scala 2.10).

@guozhangwang
Contributor Author

@hachikuji Addressed, thanks.

@asfbot

asfbot commented Jan 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1094/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1092/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1092/
Test PASSed (JDK 7 and Scala 2.10).

Contributor

@hachikuji hachikuji left a comment

Minor comments. This is looking as good as it will, I guess.

final String consumerId = "leader";

subscriptions.subscribe(singleton(topicName), rebalanceListener);
subscriptions.subscribe(Pattern.compile("test.*"), rebalanceListener);
Contributor

Not sure why this was changed. Do we have "normal" subscription covered elsewhere?

Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topicName));
partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(tp)));
Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1));
partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(t1p, t2p)));
Contributor

Related to comment above. Can we do this in a separate test case? This is not the "normal" case, but it should be covered somewhere.


Map<String, Assignment> assignment = assignor.assign(metadata.fetch(), subscriptions);

// user-customized assignor may have created some topics that are not in the subscription list
Contributor

Can we mention in this comment that this is a hack and not something we want to support long-term unless we push regex into the protocol?

@guozhangwang
Contributor Author

@hachikuji addressed.

@asfbot

asfbot commented Jan 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1098/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1096/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 22, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1096/
Test PASSed (JDK 7 and Scala 2.10).

Contributor

@hachikuji hachikuji left a comment

LGTM

asfgit pushed a commit that referenced this pull request Jan 23, 2017
1. In StreamThread, always use subscribe(Pattern, ..) function in order to avoid sending MetadataRequest with specific topic names and cause brokers to possibly auto-create subscribed topics; the pattern is generated as "topic-1|topic-2..|topic-n".

2. In ConsumerCoordinator, let the leader to refresh its metadata if the generated assignment contains some topics that is not contained in the subscribed topics; also in SubscriptionState, modified the verification for regex subscription to against the regex pattern instead of the matched topics since the returned assignment may contain some topics not yet created when joining the group but existed after the rebalance; also modified some unit tests in `KafkaConsumerTest` to accommodate the above changes.

3. Minor cleanup: changed String[] to List<String> to avoid overloaded functions.

4. Minor cleanup: enforced strong typing in SinkNodeFactory and removed unnecessary unchecked tags.

5. Minor cleanup: augmented unit test error message and fixed a potential transient failure in KafkaStreamTest.

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>, Jason Gustafson <jason@confluent.io>

Closes #2379 from guozhangwang/K4633-regex-pattern

(cherry picked from commit 3400d0c)
Signed-off-by: Jason Gustafson <jason@confluent.io>
@asfgit asfgit closed this in 3400d0c Jan 23, 2017
soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
@guozhangwang guozhangwang deleted the K4633-regex-pattern branch July 15, 2017 22:07