diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index d6b2e97b48c57..568520ed952f5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -1064,6 +1064,9 @@ public Map getMainConsumerConfigs(final String groupId, final St consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamsPartitionAssignor.class.getName()); consumerProps.put(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)); + // disable auto topic creation + consumerProps.put(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false"); + // add admin retries configs for creating topics final AdminClientConfig adminClientDefaultConfig = new AdminClientConfig(getClientPropsWithPrefix(ADMIN_CLIENT_PREFIX, AdminClientConfig.configNames())); consumerProps.put(adminClientPrefix(AdminClientConfig.RETRIES_CONFIG), adminClientDefaultConfig.getInt(AdminClientConfig.RETRIES_CONFIG)); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java index 317a95f8aca43..b519237575913 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamSourceNode.java @@ -51,7 +51,7 @@ public StreamSourceNode(final String nodeName, this.consumedInternal = consumedInternal; } - public Collection getTopicNames() { + public Collection topicNames() { return new ArrayList<>(topicNames); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java index b1df6ecbea741..cf5d70a7dd65b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/TableSourceNode.java @@ -81,7 +81,7 @@ public static TableSourceNodeBuilder tableSourceNodeBuilder() { @Override @SuppressWarnings("unchecked") public void writeToTopology(final InternalTopologyBuilder topologyBuilder) { - final String topicName = getTopicNames().iterator().next(); + final String topicName = topicNames().iterator().next(); // TODO: we assume source KTables can only be timestamped-key-value stores for now. // should be expanded for other types of stores as well. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index a1db8d9b3c9fb..409569db5b999 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -60,10 +60,8 @@ public class InternalTopologyBuilder { // node factories in a topological order private final Map nodeFactories = new LinkedHashMap<>(); - // state factories private final Map stateFactories = new HashMap<>(); - // built global state stores private final Map globalStateBuilders = new LinkedHashMap<>(); // built global state stores @@ -87,10 +85,6 @@ public class InternalTopologyBuilder { // map from sink processor names to sink topic (without application-id prefix for internal topics) private final Map nodeToSinkTopic = new HashMap<>(); - // map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node - // even if it can be matched by multiple regex patterns - private final Map topicToPatterns = new HashMap<>(); - // map from state store names to all the topics subscribed from source processors that // are connected to these state stores private final Map> stateStoreNameToSourceTopics = new HashMap<>(); @@ -118,12 +112,15 @@ public class InternalTopologyBuilder { private final QuickUnion nodeGrouper = new QuickUnion<>(); - private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); + // Used to capture subscribed topics via Patterns discovered during the partition assignment process. + private final Set subscriptionUpdates = new HashSet<>(); private String applicationId = null; private Pattern topicPattern = null; + private List topicCollection = null; + private Map> nodeGroups = null; public static class StateStoreFactory { @@ -218,6 +215,10 @@ Processor describe() { } } + // Map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node + // even if it can be matched by multiple regex patterns. Only used by SourceNodeFactory + private final Map topicToPatterns = new HashMap<>(); + private class SourceNodeFactory extends NodeFactory { private final List topics; private final Pattern pattern; @@ -915,7 +916,7 @@ private void buildSourceNode(final Map topicSourceMap, final SourceNode node) { final List topics = (sourceNodeFactory.pattern != null) ? - sourceNodeFactory.getTopics(subscriptionUpdates.getUpdates()) : + sourceNodeFactory.getTopics(subscriptionUpdates()) : sourceNodeFactory.topics; for (final String topic : topics) { @@ -1057,24 +1058,23 @@ public synchronized Map topicGroups() { } private void setRegexMatchedTopicsToSourceNodes() { - if (subscriptionUpdates.hasUpdates()) { - for (final Map.Entry stringPatternEntry : nodeToSourcePatterns.entrySet()) { - final SourceNodeFactory sourceNode = - (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey()); - //need to update nodeToSourceTopics with topics matched from given regex - nodeToSourceTopics.put( - stringPatternEntry.getKey(), - sourceNode.getTopics(subscriptionUpdates.getUpdates())); - log.debug("nodeToSourceTopics {}", nodeToSourceTopics); + if (hasSubscriptionUpdates()) { + for (final String nodeName : nodeToSourcePatterns.keySet()) { + final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(nodeName); + final List sourceTopics = sourceNode.getTopics(subscriptionUpdates); + //need to update nodeToSourceTopics and sourceTopicNames with topics matched from given regex + nodeToSourceTopics.put(nodeName, sourceTopics); + sourceTopicNames.addAll(sourceTopics); } + log.debug("Updated nodeToSourceTopics: {}", nodeToSourceTopics); } } private void setRegexMatchedTopicToStateStore() { - if (subscriptionUpdates.hasUpdates()) { + if (hasSubscriptionUpdates()) { for (final Map.Entry> storePattern : stateStoreNameToSourceRegex.entrySet()) { final Set updatedTopicsForStateStore = new HashSet<>(); - for (final String subscriptionUpdateTopic : subscriptionUpdates.getUpdates()) { + for (final String subscriptionUpdateTopic : subscriptionUpdates()) { for (final Pattern pattern : storePattern.getValue()) { if (pattern.matcher(subscriptionUpdateTopic).matches()) { updatedTopicsForStateStore.add(subscriptionUpdateTopic); @@ -1117,11 +1117,11 @@ private Pattern resetTopicsPattern(final Set resetTopics, final Set resetPatterns) { final List topics = maybeDecorateInternalSourceTopics(resetTopics); - return buildPatternForOffsetResetTopics(topics, resetPatterns); + return buildPattern(topics, resetPatterns); } - private static Pattern buildPatternForOffsetResetTopics(final Collection sourceTopics, - final Collection sourcePatterns) { + private static Pattern buildPattern(final Collection sourceTopics, + final Collection sourcePatterns) { final StringBuilder builder = new StringBuilder(); for (final String topic : sourceTopics) { @@ -1209,37 +1209,33 @@ private String decorateTopic(final String topic) { return applicationId + "-" + topic; } - SubscriptionUpdates subscriptionUpdates() { - return subscriptionUpdates; + boolean usesPatternSubscription() { + return !nodeToSourcePatterns.isEmpty(); + } + + synchronized Collection sourceTopicCollection() { + log.debug("No source topics using pattern subscription found, using regular subscription for the main consumer."); + + if (topicCollection == null) { + topicCollection = maybeDecorateInternalSourceTopics(sourceTopicNames); + Collections.sort(topicCollection); + } + + return topicCollection; } synchronized Pattern sourceTopicPattern() { + log.debug("Found pattern subscribed source topics, falling back to pattern subscription for the main consumer."); + if (topicPattern == null) { - final List allSourceTopics = new ArrayList<>(); - if (!nodeToSourceTopics.isEmpty()) { - for (final List topics : nodeToSourceTopics.values()) { - allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics)); - } - allSourceTopics.removeAll(globalTopics); - } + final List allSourceTopics = maybeDecorateInternalSourceTopics(sourceTopicNames); Collections.sort(allSourceTopics); - - topicPattern = buildPatternForOffsetResetTopics(allSourceTopics, nodeToSourcePatterns.values()); + topicPattern = buildPattern(allSourceTopics, nodeToSourcePatterns.values()); } return topicPattern; } - // package-private for testing only - synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates, - final String logPrefix) { - log.debug("{}updating builder with {} topic(s) with possible matching regex subscription(s)", - logPrefix, subscriptionUpdates); - this.subscriptionUpdates = subscriptionUpdates; - setRegexMatchedTopicsToSourceNodes(); - setRegexMatchedTopicToStateStore(); - } - private boolean isGlobalSource(final String nodeName) { final NodeFactory nodeFactory = nodeFactories.get(nodeName); @@ -1865,42 +1861,25 @@ private static String nodeNames(final Set nodes) { return sb.toString(); } - /** - * Used to capture subscribed topic via Patterns discovered during the - * partition assignment process. - */ - public static class SubscriptionUpdates { - - private final Set updatedTopicSubscriptions = new HashSet<>(); - - private void updateTopics(final Collection topicNames) { - updatedTopicSubscriptions.clear(); - updatedTopicSubscriptions.addAll(topicNames); - } - - public Collection getUpdates() { - return Collections.unmodifiableSet(updatedTopicSubscriptions); - } - - boolean hasUpdates() { - return !updatedTopicSubscriptions.isEmpty(); - } - - @Override - public String toString() { - return String.format("SubscriptionUpdates{updatedTopicSubscriptions=%s}", updatedTopicSubscriptions); - } + Set subscriptionUpdates() { + return Collections.unmodifiableSet(subscriptionUpdates); } - void updateSubscribedTopics(final Set topics, - final String logPrefix) { - final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); - log.debug("{}found {} topics possibly matching regex", logPrefix, topics); - // update the topic groups with the returned subscription set for regex pattern subscriptions - subscriptionUpdates.updateTopics(topics); - updateSubscriptions(subscriptionUpdates, logPrefix); + boolean hasSubscriptionUpdates() { + return !subscriptionUpdates.isEmpty(); } + synchronized void updateSubscribedTopics(final Set topics, + final String logPrefix) { + log.debug("{}found {} topics possibly matching subscription", logPrefix, topics); + subscriptionUpdates.clear(); + subscriptionUpdates.addAll(topics); + + log.debug("{}updating builder with {} topic(s) with possible matching regex subscription(s)", + logPrefix, subscriptionUpdates); + setRegexMatchedTopicsToSourceNodes(); + setRegexMatchedTopicToStateStore(); + } // following functions are for test only diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index dd00420a91e16..8bf7ef90c32e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -690,7 +690,7 @@ public void run() { * @throws StreamsException if the store's change log does not contain the partition */ private void runLoop() { - consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); + subscribeConsumer(); while (isRunning()) { try { @@ -713,7 +713,15 @@ private void runLoop() { private void enforceRebalance() { consumer.unsubscribe(); - consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); + subscribeConsumer(); + } + + private void subscribeConsumer() { + if (builder.usesPatternSubscription()) { + consumer.subscribe(builder.sourceTopicPattern(), rebalanceListener); + } else { + consumer.subscribe(builder.sourceTopicCollection(), rebalanceListener); + } } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java index 169ce88a96a05..d3baf6e9951c3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java @@ -241,9 +241,9 @@ public ByteBuffer subscriptionUserData(final Set topics) { } protected static Set prepareForSubscription(final TaskManager taskManager, - final Set topics, - final Set standbyTasks, - final RebalanceProtocol rebalanceProtocol) { + final Set topics, + final Set standbyTasks, + final RebalanceProtocol rebalanceProtocol) { // Any tasks that are not yet running are counted as standby tasks for assignment purposes, // along with any old tasks for which we still found state on disk final Set activeTasks; diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 782d6b2ec79e2..9385ca1fd380f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -517,7 +517,7 @@ public void updateSubscriptionsFromAssignment(final List partiti assignedTopics.add(topicPartition.topic()); } - final Collection existingTopics = builder().subscriptionUpdates().getUpdates(); + final Collection existingTopics = builder().subscriptionUpdates(); if (!existingTopics.containsAll(assignedTopics)) { assignedTopics.addAll(existingTopics); builder().updateSubscribedTopics(assignedTopics, logPrefix); @@ -527,7 +527,7 @@ public void updateSubscriptionsFromAssignment(final List partiti public void updateSubscriptionsFromMetadata(final Set topics) { if (builder().sourceTopicPattern() != null) { - final Collection existingTopics = builder().subscriptionUpdates().getUpdates(); + final Collection existingTopics = builder().subscriptionUpdates(); if (!existingTopics.equals(topics)) { builder().updateSubscribedTopics(topics, logPrefix); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 766b7dc64ccc2..d09337aae952d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; +import java.util.Arrays; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -34,7 +35,6 @@ import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; -import java.lang.reflect.Field; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -231,7 +231,19 @@ public void testAddSinkConnectedWithMultipleParent() { } @Test - public void testSourceTopics() { + public void testOnlyTopicNameSourceTopics() { + builder.setApplicationId("X"); + builder.addSource(null, "source-1", null, null, null, "topic-1"); + builder.addSource(null, "source-2", null, null, null, "topic-2"); + builder.addSource(null, "source-3", null, null, null, "topic-3"); + builder.addInternalTopic("topic-3"); + + assertFalse(builder.usesPatternSubscription()); + assertEquals(Arrays.asList("X-topic-3", "topic-1", "topic-2"), builder.sourceTopicCollection()); + } + + @Test + public void testPatternSourceTopics() { builder.setApplicationId("X"); builder.addSource(null, "source-1", null, null, null, "topic-1"); builder.addSource(null, "source-2", null, null, null, "topic-2"); @@ -667,22 +679,18 @@ public void shouldAddInternalTopicConfigForRepartitionTopics() { @SuppressWarnings("unchecked") @Test - public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception { + public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() { builder.addSource(null, "source-1", null, null, null, "topic-foo"); builder.addSource(null, "source-2", null, null, null, Pattern.compile("topic-[A-C]")); builder.addSource(null, "source-3", null, null, null, Pattern.compile("topic-\\d")); - final InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates = new InternalTopologyBuilder.SubscriptionUpdates(); - final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); - updatedTopicsField.setAccessible(true); - - final Set updatedTopics = (Set) updatedTopicsField.get(subscriptionUpdates); + final Set updatedTopics = new HashSet<>(); updatedTopics.add("topic-B"); updatedTopics.add("topic-3"); updatedTopics.add("topic-A"); - builder.updateSubscriptions(subscriptionUpdates, null); + builder.updateSubscribedTopics(updatedTopics, null); builder.setApplicationId("test-id"); final Map topicGroups = builder.topicGroups(); @@ -690,7 +698,6 @@ public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A")); assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B")); assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3")); - } @Test @@ -754,22 +761,18 @@ public void shouldSortProcessorNodesCorrectly() { @SuppressWarnings("unchecked") @Test - public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception { + public void shouldConnectRegexMatchedTopicsToStateStore() { builder.addSource(null, "ingest", null, null, null, Pattern.compile("topic-\\d+")); builder.addProcessor("my-processor", new MockProcessorSupplier(), "ingest"); builder.addStateStore(storeBuilder, "my-processor"); - final InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates = new InternalTopologyBuilder.SubscriptionUpdates(); - final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); - updatedTopicsField.setAccessible(true); - - final Set updatedTopics = (Set) updatedTopicsField.get(subscriptionUpdates); + final Set updatedTopics = new HashSet<>(); updatedTopics.add("topic-2"); updatedTopics.add("topic-3"); updatedTopics.add("topic-A"); - builder.updateSubscriptions(subscriptionUpdates, "test-thread"); + builder.updateSubscribedTopics(updatedTopics, "test-thread"); builder.setApplicationId("test-app"); final Map> stateStoreAndTopics = builder.stateStoreNameToSourceTopics(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 8e4b2c242166d..b95cec3a5cab7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -70,8 +70,7 @@ public class TaskManagerTest { private final Map> taskId0Assignment = Collections.singletonMap(taskId0, taskId0Partitions); private final Map taskId0PartitionToTaskId = Collections.singletonMap(t1p0, taskId0); - @Mock(type = MockType.STRICT) - private InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates; + @Mock(type = MockType.STRICT) private InternalTopologyBuilder topologyBuilder; @Mock(type = MockType.STRICT) @@ -118,6 +117,7 @@ public class TaskManagerTest { private final Set revokedTasks = new HashSet<>(); private final List revokedPartitions = new ArrayList<>(); private final List revokedChangelogs = Collections.emptyList(); + private Set subscriptionUpdates = Collections.emptySet(); @Rule public final TemporaryFolder testFolder = new TemporaryFolder(); @@ -152,69 +152,61 @@ private void replay() { @Test public void shouldUpdateSubscriptionFromAssignment() { mockTopologyBuilder(); - expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1)); + expect(topologyBuilder.subscriptionUpdates()).andReturn(Utils.mkSet(topic1)); topologyBuilder.updateSubscribedTopics(EasyMock.eq(Utils.mkSet(topic1, topic2)), EasyMock.anyString()); expectLastCall().once(); EasyMock.replay(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); taskManager.updateSubscriptionsFromAssignment(asList(t1p1, t2p1)); EasyMock.verify(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); } @Test public void shouldNotUpdateSubscriptionFromAssignment() { mockTopologyBuilder(); - expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1, topic2)); + expect(topologyBuilder.subscriptionUpdates()).andReturn(Utils.mkSet(topic1, topic2)); EasyMock.replay(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); taskManager.updateSubscriptionsFromAssignment(asList(t1p1)); EasyMock.verify(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); } @Test public void shouldUpdateSubscriptionFromMetadata() { mockTopologyBuilder(); - expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1)); + expect(topologyBuilder.subscriptionUpdates()).andReturn(Utils.mkSet(topic1)); topologyBuilder.updateSubscribedTopics(EasyMock.eq(Utils.mkSet(topic1, topic2)), EasyMock.anyString()); expectLastCall().once(); EasyMock.replay(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); taskManager.updateSubscriptionsFromMetadata(Utils.mkSet(topic1, topic2)); EasyMock.verify(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); } @Test public void shouldNotUpdateSubscriptionFromMetadata() { mockTopologyBuilder(); - expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1)); + expect(topologyBuilder.subscriptionUpdates()).andReturn(Utils.mkSet(topic1)); EasyMock.replay(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); taskManager.updateSubscriptionsFromMetadata(Utils.mkSet(topic1)); EasyMock.verify(activeTaskCreator, - topologyBuilder, - subscriptionUpdates); + topologyBuilder); } @Test @@ -677,7 +669,7 @@ private void mockAssignStandbyPartitions(final long offset) { } private void mockSingleActiveTask() { - expect(activeTaskCreator.createTasks(EasyMock.>anyObject(), + expect(activeTaskCreator.createTasks(EasyMock.anyObject(), EasyMock.eq(taskId0Assignment))) .andReturn(Collections.singletonList(streamTask)); @@ -685,7 +677,6 @@ private void mockSingleActiveTask() { private void mockTopologyBuilder() { expect(activeTaskCreator.builder()).andReturn(topologyBuilder).anyTimes(); - expect(topologyBuilder.sourceTopicPattern()).andReturn(Pattern.compile("abc")); - expect(topologyBuilder.subscriptionUpdates()).andReturn(subscriptionUpdates); + expect(topologyBuilder.sourceTopicPattern()).andReturn(Pattern.compile("abc")).anyTimes(); } }