From c75d3027fca883b4ec4e482656655d080282202a Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Tue, 7 Jan 2020 20:15:55 -0800 Subject: [PATCH 1/6] add consumer config and change subscription type --- .../apache/kafka/streams/StreamsConfig.java | 3 ++ .../internals/InternalTopologyBuilder.java | 37 +++++++++++++++++++ .../processor/internals/StreamThread.java | 12 +++++- 3 files changed, 50 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index d6b2e97b48c57..568520ed952f5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1064,6 +1064,9 @@ public Map getMainConsumerConfigs(final String groupId, final St consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName()); consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); + // disable auto topic creation + consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); + // add admin retries configs for creating topics final AdminClientConfig adminClientDefaultConfig = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames())); consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), adminClientDefaultConfig.getInt(AdminClientConfig.RETRIES_CONFIG)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index a1db8d9b3c9fb..b8ccf580154d9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; @@ -124,6 +125,8 @@ public class InternalTopologyBuilder { private Pattern topicPattern = null; + private Collection topicCollection = null; + private Map> nodeGroups = null; public static class StateStoreFactory { @@ -1213,7 +1216,41 @@ SubscriptionUpdates subscriptionUpdates() { return subscriptionUpdates; } + boolean hasPatternSubscribedTopics() { + return (!nodeToSourcePatterns.isEmpty()); + } + + synchronized Collection sourceTopicCollection() { + if (hasPatternSubscribedTopics() || topicPattern != null) { + log.error("Collection subscription should not be used with hasPatternSubscribedTopics = {}, topicPattern = {}", + hasPatternSubscribedTopics(), topicPattern); + throw new IllegalStateException("Main consumer tried to use collection subscription when it should have used patterns to subscribe source topics."); + } else { + log.debug("No source topics using pattern subscription found, using regular subscription for the main consumer."); + } + + if (topicCollection == null) { + topicCollection = new ArrayList<>(); + if (!nodeToSourceTopics.isEmpty()) { + for (final List topics : nodeToSourceTopics.values()) { + topicCollection.addAll(maybeDecorateInternalSourceTopics(topics)); + } + topicCollection.removeAll(globalTopics); + } + } + + return topicCollection; + } + synchronized Pattern sourceTopicPattern() { + if (!hasPatternSubscribedTopics() || topicCollection != null) { + log.error("Pattern subscription should not be used with hasPatternSubscribedTopics = {}, topicCollection = {}", + hasPatternSubscribedTopics(), topicCollection); + throw new IllegalStateException("Main consumer tried to use pattern subscription when it should have used collections to subscribe source topics."); + } else { + log.debug("Found pattern subscribed source topics, falling back to pattern subscription for the main consumer."); + } + if (topicPattern == null) { final List allSourceTopics = new ArrayList<>(); if (!nodeToSourceTopics.isEmpty()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index dd00420a91e16..db4a08998dfe5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -690,7 +690,7 @@ public void run() { * @throws StreamsException if the store's change log does not contain the partition */ private void runLoop() { - consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); + subscribeConsumer(); while (isRunning()) { try { @@ -713,7 +713,15 @@ private void runLoop() { private void enforceRebalance() { consumer.unsubscribe(); - consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); + subscribeConsumer(); + } + + private void subscribeConsumer() { + if (builder.hasPatternSubscribedTopics()) { + consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); + } else { + consumer.subscribe(builder.sourceTopicCollection(), rebalanceListener); + } } /** From 4bbab82fa3fc19d86e7ba46731007d0fbe857eda Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 15 Jan 2020 19:35:41 -0800 Subject: [PATCH 2/6] side cleanup --- .../internals/graph/StreamSourceNode.java | 2 +- .../internals/graph/TableSourceNode.java | 2 +- .../internals/InternalTopologyBuilder.java | 147 +++++------------- .../processor/internals/StreamThread.java | 2 +- .../internals/StreamsPartitionAssignor.java | 6 +- .../processor/internals/TaskManager.java | 4 +- 6 files changed, 49 insertions(+), 114 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java index 317a95f8aca43..b519237575913 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java @@ -51,7 +51,7 @@ public StreamSourceNode(final String nodeName, this.consumedInternal = consumedInternal; } - public Collection getTopicNames() { + public Collection topicNames() { return new ArrayList<>(topicNames); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java index b1df6ecbea741..cf5d70a7dd65b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java @@ -81,7 +81,7 @@ public static TableSourceNodeBuilder tableSourceNodeBuilder() { @Override @SuppressWarnings("unchecked") public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { - final String topicName = getTopicNames().iterator().next(); + final String topicName = topicNames().iterator().next(); // TODO: we assume source KTables can only be timestamped-key-value stores for now. // should be expanded for other types of stores as well. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index b8ccf580154d9..46267a189ee56 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.processor.internals; -import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.StreamsConfig; @@ -61,10 +60,8 @@ public class InternalTopologyBuilder { // node factories in a topological order private final Map nodeFactories = new LinkedHashMap<>(); - // state factories private final Map stateFactories = new HashMap<>(); - // built global state stores private final Map globalStateBuilders = new LinkedHashMap<>(); // built global state stores @@ -88,10 +85,6 @@ public class InternalTopologyBuilder { // map from sink processor names to sink topic (without application-id prefix for internal topics) private final Map nodeToSinkTopic = new HashMap<>(); - // map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node - // even if it can be matched by multiple regex patterns - private final Map topicToPatterns = new HashMap<>(); - // map from state store names to all the topics subscribed from source processors that // are connected to these state stores private final Map> stateStoreNameToSourceTopics = new HashMap<>(); @@ -119,14 +112,13 @@ public class InternalTopologyBuilder { private final QuickUnion nodeGrouper = new QuickUnion<>(); - private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); + // Used to capture subscribed topics via Patterns discovered during the partition assignment process. + private final Set subscriptionUpdates = new HashSet<>(); private String applicationId = null; private Pattern topicPattern = null; - private Collection topicCollection = null; - private Map> nodeGroups = null; public static class StateStoreFactory { @@ -221,6 +213,10 @@ Processor describe() { } } + // Map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node + // even if it can be matched by multiple regex patterns. Only used by SourceNodeFactory + private static final Map topicToPatterns = new HashMap<>(); + private class SourceNodeFactory extends NodeFactory { private final List topics; private final Pattern pattern; @@ -918,7 +914,7 @@ private void buildSourceNode(final Map topicSourceMap, final SourceNode node) { final List topics = (sourceNodeFactory.pattern != null) ? - sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) : + sourceNodeFactory.getTopics(subscriptionUpdates()) : sourceNodeFactory.topics; for (final String topic : topics) { @@ -1060,24 +1056,23 @@ public synchronized Map topicGroups() { } private void setRegexMatchedTopicsToSourceNodes() { - if (subscriptionUpdates.hasUpdates()) { - for (final Map.Entry stringPatternEntry : nodeToSourcePatterns.entrySet()) { - final SourceNodeFactory sourceNode = - (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey()); - //need to update nodeToSourceTopics with topics matched from given regex - nodeToSourceTopics.put( - stringPatternEntry.getKey(), - sourceNode.getTopics(subscriptionUpdates.getUpdates())); - log.debug("nodeToSourceTopics {}", nodeToSourceTopics); + if (hasSubscriptionUpdates()) { + for (final String nodeName : nodeToSourcePatterns.keySet()) { + //need to update nodeToSourceTopics and sourceTopicNames with topics matched from given regex + final List sourceTopics = ((SourceNodeFactory) nodeFactories.get(nodeName)) + .getTopics(subscriptionUpdates); + nodeToSourceTopics.put(nodeName, sourceTopics); + sourceTopicNames.addAll(sourceTopics); } + log.debug("Updated nodeToSourceTopics: {}", nodeToSourceTopics); } } private void setRegexMatchedTopicToStateStore() { - if (subscriptionUpdates.hasUpdates()) { + if (hasSubscriptionUpdates()) { for (final Map.Entry> storePattern : stateStoreNameToSourceRegex.entrySet()) { final Set updatedTopicsForStateStore = new HashSet<>(); - for (final String subscriptionUpdateTopic : subscriptionUpdates.getUpdates()) { + for (final String subscriptionUpdateTopic : subscriptionUpdates()) { for (final Pattern pattern : storePattern.getValue()) { if (pattern.matcher(subscriptionUpdateTopic).matches()) { updatedTopicsForStateStore.add(subscriptionUpdateTopic); @@ -1120,11 +1115,11 @@ private Pattern resetTopicsPattern(final Set resetTopics, final Set resetPatterns) { final List topics = maybeDecorateInternalSourceTopics(resetTopics); - return buildPatternForOffsetResetTopics(topics, resetPatterns); + return buildPattern(topics, resetPatterns); } - private static Pattern buildPatternForOffsetResetTopics(final Collection sourceTopics, - final Collection sourcePatterns) { + private static Pattern buildPattern(final Collection sourceTopics, + final Collection sourcePatterns) { final StringBuilder builder = new StringBuilder(); for (final String topic : sourceTopics) { @@ -1212,71 +1207,28 @@ private String decorateTopic(final String topic) { return applicationId + "-" + topic; } - SubscriptionUpdates subscriptionUpdates() { - return subscriptionUpdates; - } - - boolean hasPatternSubscribedTopics() { + boolean shouldUsePatternSubscription() { return (!nodeToSourcePatterns.isEmpty()); } synchronized Collection sourceTopicCollection() { - if (hasPatternSubscribedTopics() || topicPattern != null) { - log.error("Collection subscription should not be used with hasPatternSubscribedTopics = {}, topicPattern = {}", - hasPatternSubscribedTopics(), topicPattern); - throw new IllegalStateException("Main consumer tried to use collection subscription when it should have used patterns to subscribe source topics."); - } else { - log.debug("No source topics using pattern subscription found, using regular subscription for the main consumer."); - } + log.debug("No source topics using pattern subscription found, using regular subscription for the main consumer."); - if (topicCollection == null) { - topicCollection = new ArrayList<>(); - if (!nodeToSourceTopics.isEmpty()) { - for (final List topics : nodeToSourceTopics.values()) { - topicCollection.addAll(maybeDecorateInternalSourceTopics(topics)); - } - topicCollection.removeAll(globalTopics); - } - } - - return topicCollection; + return sourceTopicNames; } synchronized Pattern sourceTopicPattern() { - if (!hasPatternSubscribedTopics() || topicCollection != null) { - log.error("Pattern subscription should not be used with hasPatternSubscribedTopics = {}, topicCollection = {}", - hasPatternSubscribedTopics(), topicCollection); - throw new IllegalStateException("Main consumer tried to use pattern subscription when it should have used collections to subscribe source topics."); - } else { - log.debug("Found pattern subscribed source topics, falling back to pattern subscription for the main consumer."); - } + log.debug("Found pattern subscribed source topics, falling back to pattern subscription for the main consumer."); if (topicPattern == null) { - final List allSourceTopics = new ArrayList<>(); - if (!nodeToSourceTopics.isEmpty()) { - for (final List topics : nodeToSourceTopics.values()) { - allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics)); - } - allSourceTopics.removeAll(globalTopics); - } + final List allSourceTopics = new ArrayList<>(sourceTopicNames); Collections.sort(allSourceTopics); - - topicPattern = buildPatternForOffsetResetTopics(allSourceTopics, nodeToSourcePatterns.values()); + topicPattern = buildPattern(allSourceTopics, nodeToSourcePatterns.values()); } return topicPattern; } - // package-private for testing only - synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates, - final String logPrefix) { - log.debug("{}updating builder with {} topic(s) with possible matching regex subscription(s)", - logPrefix, subscriptionUpdates); - this.subscriptionUpdates = subscriptionUpdates; - setRegexMatchedTopicsToSourceNodes(); - setRegexMatchedTopicToStateStore(); - } - private boolean isGlobalSource(final String nodeName) { final NodeFactory nodeFactory = nodeFactories.get(nodeName); @@ -1902,42 +1854,25 @@ private static String nodeNames(final Set nodes) { return sb.toString(); } - /** - * Used to capture subscribed topic via Patterns discovered during the - * partition assignment process. - */ - public static class SubscriptionUpdates { - - private final Set updatedTopicSubscriptions = new HashSet<>(); - - private void updateTopics(final Collection topicNames) { - updatedTopicSubscriptions.clear(); - updatedTopicSubscriptions.addAll(topicNames); - } - - public Collection getUpdates() { - return Collections.unmodifiableSet(updatedTopicSubscriptions); - } - - boolean hasUpdates() { - return !updatedTopicSubscriptions.isEmpty(); - } - - @Override - public String toString() { - return String.format("SubscriptionUpdates{updatedTopicSubscriptions=%s}", updatedTopicSubscriptions); - } + Collection subscriptionUpdates() { + return Collections.unmodifiableSet(subscriptionUpdates); } - void updateSubscribedTopics(final Set topics, - final String logPrefix) { - final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); - log.debug("{}found {} topics possibly matching regex", logPrefix, topics); - // update the topic groups with the returned subscription set for regex pattern subscriptions - subscriptionUpdates.updateTopics(topics); - updateSubscriptions(subscriptionUpdates, logPrefix); + boolean hasSubscriptionUpdates() { + return !subscriptionUpdates.isEmpty(); } + synchronized void updateSubscribedTopics(final Set topics, + final String logPrefix) { + log.debug("{}found {} topics possibly matching subscription", logPrefix, topics); + subscriptionUpdates.clear(); + subscriptionUpdates.addAll(topics); + + log.debug("{}updating builder with {} topic(s) with possible matching regex subscription(s)", + logPrefix, subscriptionUpdates); + setRegexMatchedTopicsToSourceNodes(); + setRegexMatchedTopicToStateStore(); + } // following functions are for test only diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index db4a08998dfe5..cfd671048d373 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -717,7 +717,7 @@ private void enforceRebalance() { } private void subscribeConsumer() { - if (builder.hasPatternSubscribedTopics()) { + if (builder.shouldUsePatternSubscription()) { consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); } else { consumer.subscribe(builder.sourceTopicCollection(), rebalanceListener); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 169ce88a96a05..d3baf6e9951c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -241,9 +241,9 @@ public ByteBuffer subscriptionUserData(final Set topics) { } protected static Set prepareForSubscription(final TaskManager taskManager, - final Set topics, - final Set standbyTasks, - final RebalanceProtocol rebalanceProtocol) { + final Set topics, + final Set standbyTasks, + final RebalanceProtocol rebalanceProtocol) { // Any tasks that are not yet running are counted as standby tasks for assignment purposes, // along with any old tasks for which we still found state on disk final Set activeTasks; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 782d6b2ec79e2..9385ca1fd380f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -517,7 +517,7 @@ public void updateSubscriptionsFromAssignment(final List partiti assignedTopics.add(topicPartition.topic()); } - final Collection existingTopics = builder().subscriptionUpdates().getUpdates(); + final Collection existingTopics = builder().subscriptionUpdates(); if (!existingTopics.containsAll(assignedTopics)) { assignedTopics.addAll(existingTopics); builder().updateSubscribedTopics(assignedTopics, logPrefix); @@ -527,7 +527,7 @@ public void updateSubscriptionsFromAssignment(final List partiti public void updateSubscriptionsFromMetadata(final Set topics) { if (builder().sourceTopicPattern() != null) { - final Collection existingTopics = builder().subscriptionUpdates().getUpdates(); + final Collection existingTopics = builder().subscriptionUpdates(); if (!existingTopics.equals(topics)) { builder().updateSubscribedTopics(topics, logPrefix); } From ad55ffc923b0f87f51277519cfebe9ff602a1927 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 15 Jan 2020 19:40:58 -0800 Subject: [PATCH 3/6] replace null check in other classes with added method --- .../streams/processor/internals/InternalTopologyBuilder.java | 2 +- .../kafka/streams/processor/internals/StreamThread.java | 2 +- .../apache/kafka/streams/processor/internals/TaskManager.java | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 46267a189ee56..f747c36beb64f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1207,7 +1207,7 @@ private String decorateTopic(final String topic) { return applicationId + "-" + topic; } - boolean shouldUsePatternSubscription() { + boolean usesPatternSubscription() { return (!nodeToSourcePatterns.isEmpty()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index cfd671048d373..8bf7ef90c32e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -717,7 +717,7 @@ private void enforceRebalance() { } private void subscribeConsumer() { - if (builder.shouldUsePatternSubscription()) { + if (builder.usesPatternSubscription()) { consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); } else { consumer.subscribe(builder.sourceTopicCollection(), rebalanceListener); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 9385ca1fd380f..64c27e34650a5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -511,7 +511,7 @@ public void setAssignmentMetadata(final Map> activeT } public void updateSubscriptionsFromAssignment(final List partitions) { - if (builder().sourceTopicPattern() != null) { + if (builder().usesPatternSubscription()) { final Set assignedTopics = new HashSet<>(); for (final TopicPartition topicPartition : partitions) { assignedTopics.add(topicPartition.topic()); @@ -526,7 +526,7 @@ public void updateSubscriptionsFromAssignment(final List partiti } public void updateSubscriptionsFromMetadata(final Set topics) { - if (builder().sourceTopicPattern() != null) { + if (builder().usesPatternSubscription()) { final Collection existingTopics = builder().subscriptionUpdates(); if (!existingTopics.equals(topics)) { builder().updateSubscribedTopics(topics, logPrefix); From a9567353de535ee72e9eb82f91e5a47acb0e95a7 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Tue, 21 Jan 2020 17:10:38 -0800 Subject: [PATCH 4/6] fix up tests, add unit test --- .../internals/InternalTopologyBuilder.java | 17 +++++--- .../processor/internals/TaskManager.java | 4 +- .../InternalTopologyBuilderTest.java | 38 +++++++++-------- .../processor/internals/TaskManagerTest.java | 41 ++++++++----------- 4 files changed, 51 insertions(+), 49 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index f747c36beb64f..a98dd458d0a4a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -119,6 +119,8 @@ public class InternalTopologyBuilder { private Pattern topicPattern = null; + private List topicCollection = null; + private Map> nodeGroups = null; public static class StateStoreFactory { @@ -215,7 +217,7 @@ Processor describe() { // Map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node // even if it can be matched by multiple regex patterns. Only used by SourceNodeFactory - private static final Map topicToPatterns = new HashMap<>(); + private final Map topicToPatterns = new HashMap<>(); private class SourceNodeFactory extends NodeFactory { private final List topics; @@ -1208,20 +1210,25 @@ private String decorateTopic(final String topic) { } boolean usesPatternSubscription() { - return (!nodeToSourcePatterns.isEmpty()); + return !nodeToSourcePatterns.isEmpty(); } synchronized Collection sourceTopicCollection() { log.debug("No source topics using pattern subscription found, using regular subscription for the main consumer."); - return sourceTopicNames; + if (topicCollection == null) { + topicCollection = maybeDecorateInternalSourceTopics(sourceTopicNames); + Collections.sort(topicCollection); + } + + return topicCollection; } synchronized Pattern sourceTopicPattern() { log.debug("Found pattern subscribed source topics, falling back to pattern subscription for the main consumer."); if (topicPattern == null) { - final List allSourceTopics = new ArrayList<>(sourceTopicNames); + final List allSourceTopics = maybeDecorateInternalSourceTopics(sourceTopicNames); Collections.sort(allSourceTopics); topicPattern = buildPattern(allSourceTopics, nodeToSourcePatterns.values()); } @@ -1854,7 +1861,7 @@ private static String nodeNames(final Set nodes) { return sb.toString(); } - Collection subscriptionUpdates() { + Set subscriptionUpdates() { return Collections.unmodifiableSet(subscriptionUpdates); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 64c27e34650a5..9385ca1fd380f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -511,7 +511,7 @@ public void setAssignmentMetadata(final Map> activeT } public void updateSubscriptionsFromAssignment(final List partitions) { - if (builder().usesPatternSubscription()) { + if (builder().sourceTopicPattern() != null) { final Set assignedTopics = new HashSet<>(); for (final TopicPartition topicPartition : partitions) { assignedTopics.add(topicPartition.topic()); @@ -526,7 +526,7 @@ public void updateSubscriptionsFromAssignment(final List partiti } public void updateSubscriptionsFromMetadata(final Set topics) { - if (builder().usesPatternSubscription()) { + if (builder().sourceTopicPattern() != null) { final Collection existingTopics = builder().subscriptionUpdates(); if (!existingTopics.equals(topics)) { builder().updateSubscribedTopics(topics, logPrefix); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 766b7dc64ccc2..8008cc14cc198 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Arrays; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; @@ -34,7 +36,6 @@ import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; -import java.lang.reflect.Field; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -231,7 +232,19 @@ public void testAddSinkConnectedWithMultipleParent() { } @Test - public void testSourceTopics() { + public void testOnlyTopicNameSourceTopics() { + builder.setApplicationId("X"); + builder.addSource(null, "source-1", null, null, null, "topic-1"); + builder.addSource(null, "source-2", null, null, null, "topic-2"); + builder.addSource(null, "source-3", null, null, null, "topic-3"); + builder.addInternalTopic("topic-3"); + + assertFalse(builder.usesPatternSubscription()); + assertEquals(Arrays.asList("X-topic-3", "topic-1", "topic-2"), builder.sourceTopicCollection()); + } + + @Test + public void testPatternSourceTopics() { builder.setApplicationId("X"); builder.addSource(null, "source-1", null, null, null, "topic-1"); builder.addSource(null, "source-2", null, null, null, "topic-2"); @@ -667,22 +680,18 @@ public void shouldAddInternalTopicConfigForRepartitionTopics() { @SuppressWarnings("unchecked") @Test - public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception { + public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() { builder.addSource(null, "source-1", null, null, null, "topic-foo"); builder.addSource(null, "source-2", null, null, null, Pattern.compile("topic-[A-C]")); builder.addSource(null, "source-3", null, null, null, Pattern.compile("topic-\\d")); - final InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates = new InternalTopologyBuilder.SubscriptionUpdates(); - final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); - updatedTopicsField.setAccessible(true); - - final Set updatedTopics = (Set) updatedTopicsField.get(subscriptionUpdates); + final Set updatedTopics = new HashSet<>(); updatedTopics.add("topic-B"); updatedTopics.add("topic-3"); updatedTopics.add("topic-A"); - builder.updateSubscriptions(subscriptionUpdates, null); + builder.updateSubscribedTopics(updatedTopics, null); builder.setApplicationId("test-id"); final Map topicGroups = builder.topicGroups(); @@ -690,7 +699,6 @@ public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A")); assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B")); assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3")); - } @Test @@ -754,22 +762,18 @@ public void shouldSortProcessorNodesCorrectly() { @SuppressWarnings("unchecked") @Test - public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception { + public void shouldConnectRegexMatchedTopicsToStateStore() { builder.addSource(null, "ingest", null, null, null, Pattern.compile("topic-\\d+")); builder.addProcessor("my-processor", new MockProcessorSupplier(), "ingest"); builder.addStateStore(storeBuilder, "my-processor"); - final InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates = new InternalTopologyBuilder.SubscriptionUpdates(); - final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); - updatedTopicsField.setAccessible(true); - - final Set updatedTopics = (Set) updatedTopicsField.get(subscriptionUpdates); + final Set updatedTopics = new HashSet<>(); updatedTopics.add("topic-2"); updatedTopics.add("topic-3"); updatedTopics.add("topic-A"); - builder.updateSubscriptions(subscriptionUpdates, "test-thread"); + builder.updateSubscribedTopics(updatedTopics, "test-thread"); builder.setApplicationId("test-app"); final Map> stateStoreAndTopics = builder.stateStoreNameToSourceTopics(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 8e4b2c242166d..b95cec3a5cab7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -70,8 +70,7 @@ public class TaskManagerTest { private final Map> taskId0Assignment = Collections.singletonMap(taskId0, taskId0Partitions); private final Map taskId0PartitionToTaskId = Collections.singletonMap(t1p0, taskId0); - @Mock(type = MockType.STRICT) - private InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates; + @Mock(type = MockType.STRICT) private InternalTopologyBuilder topologyBuilder; @Mock(type = MockType.STRICT) @@ -118,6 +117,7 @@ public class TaskManagerTest { private final Set revokedTasks = new HashSet<>(); private final List revokedPartitions = new ArrayList<>(); private final List revokedChangelogs = Collections.emptyList(); + private Set subscriptionUpdates = Collections.emptySet(); @Rule public final TemporaryFolder testFolder = new TemporaryFolder(); @@ -152,69 +152,61 @@ private void replay() { @Test public void shouldUpdateSubscriptionFromAssignment() { mockTopologyBuilder(); - expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1)); + expect(topologyBuilder.subscriptionUpdates()).andReturn(Utils.mkSet(topic1)); topologyBuilder.updateSubscribedTopics(EasyMock.eq(Utils.mkSet(topic1, topic2)), EasyMock.anyString()); expectLastCall().once(); EasyMock.replay(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); taskManager.updateSubscriptionsFromAssignment(asList(t1p1, t2p1)); EasyMock.verify(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); } @Test public void shouldNotUpdateSubscriptionFromAssignment() { mockTopologyBuilder(); - expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1, topic2)); + expect(topologyBuilder.subscriptionUpdates()).andReturn(Utils.mkSet(topic1, topic2)); EasyMock.replay(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); taskManager.updateSubscriptionsFromAssignment(asList(t1p1)); EasyMock.verify(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); } @Test public void shouldUpdateSubscriptionFromMetadata() { mockTopologyBuilder(); - expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1)); + expect(topologyBuilder.subscriptionUpdates()).andReturn(Utils.mkSet(topic1)); topologyBuilder.updateSubscribedTopics(EasyMock.eq(Utils.mkSet(topic1, topic2)), EasyMock.anyString()); expectLastCall().once(); EasyMock.replay(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); taskManager.updateSubscriptionsFromMetadata(Utils.mkSet(topic1, topic2)); EasyMock.verify(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); } @Test public void shouldNotUpdateSubscriptionFromMetadata() { mockTopologyBuilder(); - expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1)); + expect(topologyBuilder.subscriptionUpdates()).andReturn(Utils.mkSet(topic1)); EasyMock.replay(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); taskManager.updateSubscriptionsFromMetadata(Utils.mkSet(topic1)); EasyMock.verify(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); } @Test @@ -677,7 +669,7 @@ private void mockAssignStandbyPartitions(final long offset) { } private void mockSingleActiveTask() { - expect(activeTaskCreator.createTasks(EasyMock.>anyObject(), + expect(activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(taskId0Assignment))) .andReturn(Collections.singletonList(streamTask)); @@ -685,7 +677,6 @@ private void mockSingleActiveTask() { private void mockTopologyBuilder() { expect(activeTaskCreator.builder()).andReturn(topologyBuilder).anyTimes(); - expect(topologyBuilder.sourceTopicPattern()).andReturn(Pattern.compile("abc")); - expect(topologyBuilder.subscriptionUpdates()).andReturn(subscriptionUpdates); + expect(topologyBuilder.sourceTopicPattern()).andReturn(Pattern.compile("abc")).anyTimes(); } } From 3b52129691c1ced9c1e44656e8df8f7934039890 Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 22 Jan 2020 13:41:37 -0800 Subject: [PATCH 5/6] fix checkstyle --- .../streams/processor/internals/InternalTopologyBuilderTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 8008cc14cc198..d09337aae952d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -20,7 +20,6 @@ import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; From 9ba3ee4e5849cae6c81d5b229846e0507630647f Mon Sep 17 00:00:00 2001 From: ableegoldman Date: Wed, 22 Jan 2020 16:51:26 -0800 Subject: [PATCH 6/6] github comment --- .../streams/processor/internals/InternalTopologyBuilder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index a98dd458d0a4a..409569db5b999 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1060,9 +1060,9 @@ public synchronized Map topicGroups() { private void setRegexMatchedTopicsToSourceNodes() { if (hasSubscriptionUpdates()) { for (final String nodeName : nodeToSourcePatterns.keySet()) { + final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(nodeName); + final List sourceTopics = sourceNode.getTopics(subscriptionUpdates); //need to update nodeToSourceTopics and sourceTopicNames with topics matched from given regex - final List sourceTopics = ((SourceNodeFactory) nodeFactories.get(nodeName)) - .getTopics(subscriptionUpdates); nodeToSourceTopics.put(nodeName, sourceTopics); sourceTopicNames.addAll(sourceTopics); }