diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index c1a74e73b7160..aecd8abd744d3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -261,16 +261,20 @@ public KTable table(AutoOffsetReset offsetReset, Serde keySerde, addSource(offsetReset, source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topic); addProcessor(name, processorSupplier, source); - final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), storeName); - StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName, - keySerde, - valSerde, - false, - Collections.emptyMap(), - true); - - addStateStore(storeSupplier, name); - connectSourceStoreAndTopic(storeName, topic); + final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), storeName); + + // only materialize the KTable into a state store if the storeName is not null + if (storeName != null) { + StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName, + keySerde, + valSerde, + false, + Collections.emptyMap(), + true); + + addStateStore(storeSupplier, name); + connectSourceStoreAndTopic(storeName, topic); + } return kTable; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index e8f09944dfca2..b25fcad3d1f2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -97,9 +97,9 @@ public class TopologyBuilder { // are connected to these state stores private final Map> stateStoreNameToSourceTopics = new HashMap<>(); - // map from state store names that are directly associated with source processors to their subscribed topics, + // map from state store names to this state store's corresponding changelog topic if possible, // this is used in the extended KStreamBuilder. - private final HashMap sourceStoreToSourceTopic = new HashMap<>(); + private final Map storeToChangelogTopic = new HashMap<>(); // all global topics private final Set globalTopics = new HashSet<>(); @@ -784,11 +784,15 @@ public synchronized final TopologyBuilder connectProcessorAndStateStores(String return this; } + /** + * This is used only for KStreamBuilder: when adding a KTable from a source topic, + * we need to add the topic as the KTable's materialized state store's changelog. + */ protected synchronized final TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName, String topic) { - if (sourceStoreToSourceTopic.containsKey(sourceStoreName)) { + if (storeToChangelogTopic.containsKey(sourceStoreName)) { throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added."); } - sourceStoreToSourceTopic.put(sourceStoreName, topic); + storeToChangelogTopic.put(sourceStoreName, topic); return this; } @@ -1026,7 +1030,6 @@ private ProcessorTopology build(Set nodeGroup) { Map topicSourceMap = new HashMap<>(); Map topicSinkMap = new HashMap<>(); Map stateStoreMap = new LinkedHashMap<>(); - Map storeToProcessorNodeMap = new HashMap<>(); // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) for (NodeFactory factory : nodeFactories.values()) { @@ -1041,9 +1044,22 @@ private ProcessorTopology build(Set nodeGroup) { } for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) { if (!stateStoreMap.containsKey(stateStoreName)) { - final StateStore stateStore = getStateStore(stateStoreName); + StateStore stateStore; + + if (stateFactories.containsKey(stateStoreName)) { + final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier; + stateStore = supplier.get(); + + // remember the changelog topic if this state store is change-logging enabled + if (supplier.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) { + final String changelogTopic = ProcessorStateManager.storeChangelogTopic(this.applicationId, stateStoreName); + storeToChangelogTopic.put(stateStoreName, changelogTopic); + } + } else { + stateStore = globalStateStores.get(stateStoreName); + } + stateStoreMap.put(stateStoreName, stateStore); - storeToProcessorNodeMap.put(stateStore, node); } } } else if (factory instanceof SourceNodeFactory) { @@ -1077,13 +1093,7 @@ private ProcessorTopology build(Set nodeGroup) { } } - return new ProcessorTopology(processorNodes, - topicSourceMap, - topicSinkMap, - new ArrayList<>(stateStoreMap.values()), - sourceStoreToSourceTopic, - storeToProcessorNodeMap, - new ArrayList<>(globalStateStores.values())); + return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values())); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index bed331140fc4d..0730c68ca0c9f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -70,7 +70,7 @@ protected AbstractTask(TaskId id, // create the processor state manager try { - this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.sourceStoreToSourceTopic(), topology.storeToProcessorNodeMap()); + this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic()); } catch (IOException e) { throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 927f62bfc84ab..75349931aa199 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -144,7 +144,7 @@ public void register(final StateStore store, } private List topicPartitionsForStore(final StateStore store) { - final String sourceTopic = topology.sourceStoreToSourceTopic().get(store.name()); + final String sourceTopic = topology.storeToChangelogTopic().get(store.name()); final List partitionInfos = consumer.partitionsFor(sourceTopic); if (partitionInfos == null || partitionInfos.isEmpty()) { throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", sourceTopic, store.name())); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 9723f3cd80998..40f2a3c826aa8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -59,7 +59,7 @@ public GlobalStateUpdateTask(final ProcessorTopology topology, @SuppressWarnings("unchecked") public Map initialize() { final Set storeNames = stateMgr.initialize(processorContext); - final Map storeNameToTopic = topology.sourceStoreToSourceTopic(); + final Map storeNameToTopic = topology.storeToChangelogTopic(); for (final String storeName : storeNames) { final String sourceTopic = storeNameToTopic.get(storeName); final SourceNode source = topology.source(sourceTopic); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index dca8192a677cb..a21c3e8dc4f92 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -38,11 +38,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import static java.util.Collections.singleton; @@ -53,52 +51,51 @@ public class ProcessorStateManager implements StateManager { public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog"; public static final String CHECKPOINT_FILE_NAME = ".checkpoint"; - private final String logPrefix; - private final String applicationId; - private final int defaultPartition; - private final Map partitionForTopic; private final File baseDir; + private final TaskId taskId; + private final String logPrefix; + private final boolean isStandby; + private final StateDirectory stateDirectory; private final Map stores; private final Map globalStores; - private final Set loggingEnabled; private final Consumer restoreConsumer; + private final Map offsetLimits; private final Map restoredOffsets; private final Map checkpointedOffsets; - private final Map offsetLimits; - private final boolean isStandby; private final Map restoreCallbacks; // used for standby tasks, keyed by state topic name - private final Map sourceStoreToSourceTopic; - private final TaskId taskId; - private final StateDirectory stateDirectory; - private final Map stateStoreProcessorNodeMap; + private final Map storeToChangelogTopic; + + // TODO: this map does not work with customized grouper where multiple partitions + // of the same topic can be assigned to the same topic. + private final Map partitionForTopic; /** * @throws LockException if the state directory cannot be locked because another thread holds the lock * (this might be recoverable by retrying) * @throws IOException if any severe error happens while creating or locking the state directory */ - public ProcessorStateManager(String applicationId, TaskId taskId, Collection sources, Consumer restoreConsumer, boolean isStandby, - StateDirectory stateDirectory, final Map sourceStoreToSourceTopic, - final Map stateStoreProcessorNodeMap) throws LockException, IOException { - this.applicationId = applicationId; - this.defaultPartition = taskId.partition; + public ProcessorStateManager(final String applicationId, + final TaskId taskId, + final Collection sources, + final Consumer restoreConsumer, + final boolean isStandby, + final StateDirectory stateDirectory, + final Map storeToChangelogTopic) throws LockException, IOException { this.taskId = taskId; this.stateDirectory = stateDirectory; - this.stateStoreProcessorNodeMap = stateStoreProcessorNodeMap; + this.baseDir = stateDirectory.directoryForTask(taskId); this.partitionForTopic = new HashMap<>(); for (TopicPartition source : sources) { this.partitionForTopic.put(source.topic(), source); } this.stores = new LinkedHashMap<>(); this.globalStores = new HashMap<>(); - this.loggingEnabled = new HashSet<>(); this.restoreConsumer = restoreConsumer; + this.offsetLimits = new HashMap<>(); this.restoredOffsets = new HashMap<>(); this.isStandby = isStandby; this.restoreCallbacks = isStandby ? new HashMap() : null; - this.offsetLimits = new HashMap<>(); - this.baseDir = stateDirectory.directoryForTask(taskId); - this.sourceStoreToSourceTopic = sourceStoreToSourceTopic; + this.storeToChangelogTopic = storeToChangelogTopic; this.logPrefix = String.format("task [%s]", taskId); @@ -126,6 +123,9 @@ public File baseDir() { /** * @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name * (e.g., when it conflicts with the names of internal topics, like the checkpoint file name) + * + * // TODO: parameter loggingEnabled can be removed now + * * @throws StreamsException if the store's change log does not contain the partition */ public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { @@ -139,17 +139,8 @@ public void register(StateStore store, boolean loggingEnabled, StateRestoreCallb throw new IllegalArgumentException(String.format("%s Store %s has already been registered.", logPrefix, store.name())); } - if (loggingEnabled) { - this.loggingEnabled.add(store.name()); - } - // check that the underlying change log topic exist or not - String topic = null; - if (loggingEnabled) { - topic = storeChangelogTopic(this.applicationId, store.name()); - } else if (sourceStoreToSourceTopic != null && sourceStoreToSourceTopic.containsKey(store.name())) { - topic = sourceStoreToSourceTopic.get(store.name()); - } + String topic = storeToChangelogTopic.get(store.name()); if (topic == null) { this.stores.put(store.name(), store); @@ -283,7 +274,6 @@ public List> updateStandbyStates(TopicPartition s List> remainingRecords = null; // restore states from changelog records - StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic()); long lastOffset = -1L; @@ -304,6 +294,7 @@ public List> updateStandbyStates(TopicPartition s } count++; } + // record the restored offset for its change log partition restoredOffsets.put(storePartition, lastOffset + 1); @@ -328,10 +319,6 @@ public void flush(final InternalProcessorContext context) { if (!this.stores.isEmpty()) { log.debug("{} Flushing all stores registered in the state manager", logPrefix); for (StateStore store : this.stores.values()) { - final ProcessorNode processorNode = stateStoreProcessorNodeMap.get(store); - if (processorNode != null) { - context.setCurrentNode(processorNode); - } try { log.trace("{} Flushing store={}", logPrefix, store.name()); store.flush(); @@ -364,24 +351,22 @@ public void close(Map ackedOffsets) throws IOException { if (ackedOffsets != null) { Map checkpointOffsets = new HashMap<>(); for (String storeName : stores.keySet()) { - TopicPartition part; - if (loggingEnabled.contains(storeName)) - part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName)); - else - part = new TopicPartition(storeName, getPartition(storeName)); + // only checkpoint the offset to the offsets file if + // it is persistent AND changelog enabled + if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) { + String changelogTopic = storeToChangelogTopic.get(storeName); + TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName)); - // only checkpoint the offset to the offsets file if it is persistent; - if (stores.get(storeName).persistent()) { - Long offset = ackedOffsets.get(part); + Long offset = ackedOffsets.get(topicPartition); if (offset != null) { // store the last offset + 1 (the log position after restoration) - checkpointOffsets.put(part, offset + 1); + checkpointOffsets.put(topicPartition, offset + 1); } else { // if no record was produced. we need to check the restored offset. - offset = restoredOffsets.get(part); + offset = restoredOffsets.get(topicPartition); if (offset != null) - checkpointOffsets.put(part, offset); + checkpointOffsets.put(topicPartition, offset); } } } @@ -400,7 +385,7 @@ public void close(Map ackedOffsets) throws IOException { private int getPartition(String topic) { TopicPartition partition = partitionForTopic.get(topic); - return partition == null ? defaultPartition : partition.partition(); + return partition == null ? taskId.partition : partition.partition(); } void registerGlobalStateStores(final List stateStores) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index 1eff35176aa44..10042fbbaec1b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; + import org.apache.kafka.streams.processor.StateStore; import java.util.Collections; @@ -23,29 +24,27 @@ import java.util.List; import java.util.Map; import java.util.Set; + public class ProcessorTopology { private final List processorNodes; - private final Map sourceByTopics; - private final Map sinkByTopics; private final List stateStores; - private final Map sourceStoreToSourceTopic; - private final Map storeToProcessorNodeMap; private final List globalStateStores; + private final Map sourceByTopics; + private final Map sinkByTopics; + private final Map storeToChangelogTopic; public ProcessorTopology(final List processorNodes, final Map sourceByTopics, final Map sinkByTopics, final List stateStores, - final Map sourceStoreToSourceTopic, - final Map storeToProcessorNodeMap, + final Map storeToChangelogTopic, final List globalStateStores) { this.processorNodes = Collections.unmodifiableList(processorNodes); this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics); this.sinkByTopics = Collections.unmodifiableMap(sinkByTopics); - this.stateStores = Collections.unmodifiableList(stateStores); - this.sourceStoreToSourceTopic = sourceStoreToSourceTopic; - this.storeToProcessorNodeMap = Collections.unmodifiableMap(storeToProcessorNodeMap); + this.stateStores = Collections.unmodifiableList(stateStores); + this.storeToChangelogTopic = Collections.unmodifiableMap(storeToChangelogTopic); this.globalStateStores = Collections.unmodifiableList(globalStateStores); } @@ -81,15 +80,10 @@ public List stateStores() { return stateStores; } - public Map sourceStoreToSourceTopic() { - return sourceStoreToSourceTopic; - } - - public Map storeToProcessorNodeMap() { - return storeToProcessorNodeMap; + public Map storeToChangelogTopic() { + return storeToChangelogTopic; } - public List globalStateStores() { return globalStateStores; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index 91b212713e97b..c32082c641c96 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.errors.TopologyBuilderException; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockKeyValueMapper; @@ -110,20 +109,6 @@ public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() throws Excep final KStream source1 = builder.stream(topic1); final KStream source2 = builder.stream(topic2); final KStream source3 = builder.stream(topic3); - - final KStream merged = builder.merge(source1, source2, source3); - merged.groupByKey().count("my-table"); - final Map> actual = builder.stateStoreNameToSourceTopics(); - assertEquals(Utils.mkSet("topic-1", "topic-2", "topic-3"), actual.get("my-table")); - } - - @Test - public void shouldHaveCorrectSourceTopicsForTableFromMergedStreamWithProcessors() throws Exception { - final String topic1 = "topic-1"; - final String topic2 = "topic-2"; - final KStreamBuilder builder = new KStreamBuilder(); - final KStream source1 = builder.stream(topic1); - final KStream source2 = builder.stream(topic2); final KStream processedSource1 = source1.mapValues(new ValueMapper() { @Override @@ -143,10 +128,10 @@ public boolean test(final String key, final String value) { } }); - final KStream merged = builder.merge(processedSource1, processedSource2); + final KStream merged = builder.merge(processedSource1, processedSource2, source3); merged.groupByKey().count("my-table"); final Map> actual = builder.stateStoreNameToSourceTopics(); - assertEquals(Utils.mkSet("topic-1", "topic-2"), actual.get("my-table")); + assertEquals(Utils.mkSet("topic-1", "topic-2", "topic-3"), actual.get("my-table")); } @Test(expected = TopologyBuilderException.class) @@ -160,21 +145,19 @@ public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception { } @Test - public void shouldNotGroupGlobalTableWithOtherStreams() throws Exception { + public void shouldNotMaterializeSourceKTableIfStateNameNotSpecified() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); - final GlobalKTable globalTable = builder.globalTable("table", "globalTable"); - final KStream stream = builder.stream("t1"); - final KeyValueMapper kvMapper = new KeyValueMapper() { - @Override - public String apply(final String key, final String value) { - return value; - } - }; - stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER); - builder.stream("t2"); builder.setApplicationId("app-id"); - final Map> nodeGroups = builder.nodeGroups(); - assertEquals(Utils.mkSet("KTABLE-SOURCE-0000000001", "KSTREAM-SOURCE-0000000000"), nodeGroups.get(0)); + + builder.table("topic1", "table1"); + builder.table("topic2", null); + + ProcessorTopology topology = builder.build(null); + + assertEquals(1, topology.stateStores().size()); + assertEquals("table1", topology.stateStores().get(0).name()); + assertEquals(1, topology.storeToChangelogTopic().size()); + assertEquals("topic1", topology.storeToChangelogTopic().get("table1")); } @Test @@ -182,11 +165,10 @@ public void shouldBuildSimpleGlobalTableTopology() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); builder.globalTable("table", "globalTable"); final ProcessorTopology topology = builder.buildGlobalStateTopology(); - final Map stateStoreProcessorNodeMap = topology.storeToProcessorNodeMap(); - assertEquals(1, stateStoreProcessorNodeMap.size()); - final StateStore store = stateStoreProcessorNodeMap.keySet().iterator().next(); + final List stateStores = topology.globalStateStores(); + final StateStore store = stateStores.iterator().next(); + assertEquals(1, stateStores.size()); assertEquals("globalTable", store.name()); - assertEquals("KTABLE-SOURCE-0000000001", stateStoreProcessorNodeMap.get(store).name()); } @Test @@ -251,5 +233,4 @@ public void shouldMapStateStoresToCorrectSourceTopics() throws Exception { assertEquals(Collections.singleton("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store")); assertEquals(Collections.singleton("app-id-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count")); } - } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index b545005f12d45..4712320d160ed 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -547,7 +547,6 @@ public void shouldThroughOnUnassignedStateStoreAccess() { final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder, LocalMockProcessorSupplier.STORE_NAME); driver.process("topic", null, null); - } catch (final StreamsException e) { final Throwable cause = e.getCause(); if (cause != null diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index 9816b6b256b0a..16967bcd1b0a4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -67,7 +67,6 @@ private AbstractTask createTask(final Consumer consumer) { Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), - Collections.emptyMap(), Collections.emptyList()), consumer, consumer, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 2d10c138e956f..db51cefabf587 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -85,7 +85,6 @@ public void before() throws IOException { Collections.emptyMap(), Collections.emptyList(), storeToTopic, - storeToProcessorNode, Arrays.asList(store1, store2)); context = new NoOpProcessorContext(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index afb8e76ce202a..df0b73ca8a4cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -74,7 +74,6 @@ public void before() { Collections.emptyMap(), Collections.emptyList(), storeToTopic, - Collections.emptyMap(), Collections.emptyList()); context = new NoOpProcessorContext(); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 7023712871b1b..de547230ae3f9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; @@ -64,11 +63,11 @@ public class ProcessorStateManagerTest { public static class MockRestoreConsumer extends MockConsumer { private final Serializer serializer = new IntegerSerializer(); - public TopicPartition assignedPartition = null; - public TopicPartition seekPartition = null; - public long seekOffset = -1L; - public boolean seekToBeginingCalled = false; - public boolean seekToEndCalled = false; + private TopicPartition assignedPartition = null; + private TopicPartition seekPartition = null; + private long seekOffset = -1L; + private boolean seekToBeginingCalled = false; + private boolean seekToEndCalled = false; private long endOffset = 0L; private long currentOffset = 0L; @@ -193,7 +192,6 @@ public synchronized void seekToEnd(Collection partitions) { private final Set noPartitions = Collections.emptySet(); private final String applicationId = "test-application"; - private final String stateDir = "test"; private final String persistentStoreName = "persistentStore"; private final String nonPersistentStoreName = "nonPersistentStore"; private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName); @@ -214,7 +212,11 @@ public void cleanup() { public void testNoTopic() throws IOException { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, new HashMap() { + { + put(nonPersistentStoreName, nonPersistentStoreName); + } + }); try { stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); } finally { @@ -242,7 +244,12 @@ public void testRegisterPersistentStore() throws IOException { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { + { + put(persistentStoreName, persistentStoreTopicName); + put(nonPersistentStoreName, nonPersistentStoreName); + } + }); try { restoreConsumer.reset(); @@ -291,7 +298,12 @@ public void testRegisterNonPersistentStore() throws IOException { MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, null, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { + { + put(persistentStoreName, persistentStoreTopicName); + put(nonPersistentStoreName, nonPersistentStoreTopicName); + } + }); try { restoreConsumer.reset(); @@ -332,6 +344,11 @@ public void testChangeLogOffsets() throws IOException { String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2); String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3); + Map storeToChangelogTopic = new HashMap<>(); + storeToChangelogTopic.put(storeName1, storeTopicName1); + storeToChangelogTopic.put(storeName2, storeTopicName2); + storeToChangelogTopic.put(storeName3, storeTopicName3); + OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME)); checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset)); @@ -364,7 +381,7 @@ public void testChangeLogOffsets() throws IOException { // if there is an source partition, inherit the partition id Set sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1)); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, null, Collections.emptyMap()); // standby + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby try { restoreConsumer.reset(); @@ -398,7 +415,7 @@ public void testGetStore() throws IOException { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, null, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); try { stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); @@ -436,7 +453,12 @@ public void testFlushAndClose() throws IOException { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { + { + put(persistentStoreName, persistentStoreTopicName); + put(nonPersistentStoreName, nonPersistentStoreTopicName); + } + }); try { // make sure the checkpoint file is deleted assertFalse(checkpointFile.exists()); @@ -469,7 +491,7 @@ public void testFlushAndClose() throws IOException { @Test public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.emptyMap()); stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback); assertNotNull(stateMgr.getStore(nonPersistentStoreName)); } @@ -490,7 +512,7 @@ public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws Exception { final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); - final ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.emptyMap()); + final ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); restoreConsumer.reset(); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index fdf0115620171..2d32e780faee3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -80,8 +80,12 @@ public class StandbyTaskTest { new MockStateStoreSupplier(storeName1, false).get(), new MockStateStoreSupplier(storeName2, true).get() ), - Collections.emptyMap(), - Collections.emptyMap(), + new HashMap() { + { + put(storeName1, storeChangelogTopicName1); + put(storeName2, storeChangelogTopicName2); + } + }, Collections.emptyList()); private final TopicPartition ktable = new TopicPartition("ktable1", 0); @@ -94,11 +98,10 @@ public class StandbyTaskTest { new MockStateStoreSupplier(ktable.topic(), true, false).get() ), new HashMap() { - { - put("ktable1", ktable.topic()); - } - }, - Collections.emptyMap(), + { + put("ktable1", ktable.topic()); + } + }, Collections.emptyList()); private File baseDir; private StateDirectory stateDirectory; @@ -320,7 +323,7 @@ public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStor new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0]))); final KStreamBuilder builder = new KStreamBuilder(); builder.stream("topic").groupByKey().count("my-store"); - final ProcessorTopology topology = builder.build(0); + final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0); StreamsConfig config = createConfig(baseDir); new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 4b9f92fa03e3b..4e95911e8d4b4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -91,7 +91,6 @@ public class StreamTaskTest { Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), - Collections.emptyMap(), Collections.emptyList()); private File baseDir; private StateDirectory stateDirectory; @@ -350,10 +349,10 @@ public void process(final Object key, final Object value) { Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), - Collections.emptyMap(), Collections.emptyList()); - final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, streamsMetrics, stateDirectory, new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector); + final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, + topology, consumer, restoreStateConsumer, config, streamsMetrics, stateDirectory, new ThreadCache("testCache", 0, streamsMetrics), new MockTime(), recordCollector); final int offset = 20; streamTask.addRecords(partition1, Collections.singletonList( new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue))); @@ -400,7 +399,6 @@ public void punctuate(final long timestamp) { Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), - Collections.emptyMap(), Collections.emptyList()); final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics()); final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, @@ -424,7 +422,6 @@ public void shouldFlushRecordCollectorOnFlushState() throws Exception { Collections.emptyMap(), Collections.emptyList(), Collections.emptyMap(), - Collections.emptyMap(), Collections.emptyList()); final AtomicBoolean flushed = new AtomicBoolean(false); final NoOpRecordCollector recordCollector = new NoOpRecordCollector() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 8f03e4fb4216e..20d428b16ff5f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -706,7 +706,7 @@ clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(bui @Test public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); - builder.setApplicationId("appId") + builder.setApplicationId(applicationId) .addSource("name", "topic") .addSink("out", "output"); @@ -729,7 +729,7 @@ Map> standbyTasks() { @Test public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId("appId"); + builder.setApplicationId(applicationId); builder.stream("t1").groupByKey().count("count-one"); builder.stream("t2").groupByKey().count("count-two"); final StreamsConfig config = new StreamsConfig(configProps()); @@ -780,7 +780,7 @@ Map> standbyTasks() { @Test public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId("appId"); + builder.setApplicationId(applicationId); builder.stream("t1").groupByKey().count("count-one"); builder.stream("t2").groupByKey().count("count-two"); final StreamsConfig config = new StreamsConfig(configProps()); @@ -845,7 +845,7 @@ Map> activeTasks() { @Test public void shouldCloseActiveTasksThatAreAssignedToThisStreamThreadButAssignmentHasChangedBeforeCreatingNewTasks() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId("appId"); + builder.setApplicationId(applicationId); builder.stream(Pattern.compile("t.*")).to("out"); final StreamsConfig config = new StreamsConfig(configProps()); final MockClientSupplier clientSupplier = new MockClientSupplier(); @@ -857,7 +857,7 @@ clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(bui @Override protected StreamTask createStreamTask(final TaskId id, final Collection partitions) { final ProcessorTopology topology = builder.build(id.topicGroupId); - final TestStreamTask task = new TestStreamTask(id, "appId", partitions, topology, consumer, + final TestStreamTask task = new TestStreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, new MockStreamsMetrics(new Metrics()), stateDirectory); createdTasks.put(partitions, task); return task; diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index b38daf149377c..d51384c12bc72 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -240,20 +240,9 @@ public Map allStateStores() { } public void flushState() { - final ProcessorNode current = currNode; - try { - for (StateStore stateStore : context.allStateStores().values()) { - final ProcessorNode processorNode = topology.storeToProcessorNodeMap().get(stateStore); - if (processorNode != null) { - currNode = processorNode; - } - stateStore.flush(); - } - } finally { - currNode = current; - + for (StateStore stateStore : context.allStateStores().values()) { + stateStore.flush(); } - } public void setCurrentNode(final ProcessorNode currentNode) { diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 0850b6052789e..89ca0df9e584c 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -162,7 +162,7 @@ public class ProcessorTopologyTestDriver { */ public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) { id = new TaskId(0, 0); - topology = builder.setApplicationId("ProcessorTopologyTestDriver").build(null); + topology = builder.setApplicationId(applicationId).build(null); globalTopology = builder.buildGlobalStateTopology(); // Set up the consumer and producer ...