From 2073cd23029f804ec91b19ae7a2919c2b8102244 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 9 Jan 2017 21:31:41 -0800 Subject: [PATCH 1/5] refactor topology builder --- .../kafka/streams/kstream/KStreamBuilder.java | 24 ++++--- .../streams/processor/TopologyBuilder.java | 22 ++++--- .../processor/internals/AbstractTask.java | 2 +- .../internals/ProcessorStateManager.java | 63 ++++++++----------- .../internals/ProcessorTopology.java | 29 ++++----- .../processor/internals/StandbyTask.java | 5 ++ .../processor/TopologyBuilderTest.java | 1 - .../processor/internals/AbstractTaskTest.java | 3 +- .../internals/ProcessorStateManagerTest.java | 17 +++-- .../processor/internals/StandbyTaskTest.java | 14 +++-- .../processor/internals/StreamTaskTest.java | 12 ++-- .../apache/kafka/test/KStreamTestDriver.java | 15 +---- .../test/ProcessorTopologyTestDriver.java | 2 +- 13 files changed, 97 insertions(+), 112 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java index 33085c9fa98b0..30ad28dc3b921 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -152,16 +152,20 @@ public KTable table(Serde keySerde, Serde valSerde, String to addSource(source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topic); addProcessor(name, processorSupplier, source); - final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), storeName); - StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName, - keySerde, - valSerde, - false, - Collections.emptyMap(), - true); - - addStateStore(storeSupplier, name); - connectSourceStoreAndTopic(storeName, topic); + final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), storeName); + + // only materialize the KTable into a state store if the storeName is not null + if (storeName != null) { + StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName, + keySerde, + valSerde, + false, + Collections.emptyMap(), + true); + + addStateStore(storeSupplier, name); + connectSourceStoreAndTopic(storeName, topic); + } return kTable; } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index b055d523240de..cae9444f716a6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -89,9 +89,9 @@ public class TopologyBuilder { // are connected to these state stores private final Map> stateStoreNameToSourceTopics = new HashMap<>(); - // map from state store names that are directly associated with source processors to their subscribed topics, + // map from state store names to ths state store's corresponding changelog topic if possible, // this is used in the extended KStreamBuilder. - private final HashMap sourceStoreToSourceTopic = new HashMap<>(); + private final Map storeToChangelogTopic = new HashMap<>(); private final QuickUnion nodeGrouper = new QuickUnion<>(); @@ -607,11 +607,15 @@ public synchronized final TopologyBuilder connectProcessorAndStateStores(String return this; } + /** + * This is used only for KStreamBuilder: when adding a KTable from a source topic, + * we need to add the topic as the KTable's materialized state store's changelog. + */ protected synchronized final TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName, String topic) { - if (sourceStoreToSourceTopic.containsKey(sourceStoreName)) { + if (storeToChangelogTopic.containsKey(sourceStoreName)) { throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added."); } - sourceStoreToSourceTopic.put(sourceStoreName, topic); + storeToChangelogTopic.put(sourceStoreName, topic); return this; } @@ -796,7 +800,6 @@ private ProcessorTopology build(Set nodeGroup) { Map topicSourceMap = new HashMap<>(); Map topicSinkMap = new HashMap<>(); Map stateStoreMap = new LinkedHashMap<>(); - Map storeToProcessorNodeMap = new HashMap<>(); // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) for (NodeFactory factory : nodeFactories.values()) { @@ -814,7 +817,12 @@ private ProcessorTopology build(Set nodeGroup) { final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier; final StateStore stateStore = supplier.get(); stateStoreMap.put(stateStoreName, stateStore); - storeToProcessorNodeMap.put(stateStore, node); + + // remember the changelog topic if this state store is change-logging enabled + if (supplier.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) { + final String changelogTopic = ProcessorStateManager.storeChangelogTopic(this.applicationId, stateStoreName); + storeToChangelogTopic.put(stateStoreName, changelogTopic); + } } } } else if (factory instanceof SourceNodeFactory) { @@ -848,7 +856,7 @@ private ProcessorTopology build(Set nodeGroup) { } } - return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), sourceStoreToSourceTopic, storeToProcessorNodeMap); + return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index b438b75f29e9e..04086336b52fa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -70,7 +70,7 @@ protected AbstractTask(TaskId id, // create the processor state manager try { - this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.sourceStoreToSourceTopic(), topology.storeToProcessorNodeMap()); + this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.sourceStoreToSourceTopic()); } catch (IOException e) { throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index c81df6c350454..b580a82064ead 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -38,11 +38,9 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.Set; import static java.util.Collections.singleton; @@ -53,50 +51,51 @@ public class ProcessorStateManager { public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog"; public static final String CHECKPOINT_FILE_NAME = ".checkpoint"; + private final File baseDir; + private final TaskId taskId; private final String logPrefix; + private final boolean isStandby; private final String applicationId; - private final int defaultPartition; - private final Map partitionForTopic; - private final File baseDir; + private final StateDirectory stateDirectory; private final Map stores; - private final Set loggingEnabled; private final Consumer restoreConsumer; + private final Map offsetLimits; private final Map restoredOffsets; private final Map checkpointedOffsets; - private final Map offsetLimits; - private final boolean isStandby; private final Map restoreCallbacks; // used for standby tasks, keyed by state topic name - private final Map sourceStoreToSourceTopic; - private final TaskId taskId; - private final StateDirectory stateDirectory; - private final Map stateStoreProcessorNodeMap; + private final Map storeToChangelogTopic; + + // TODO: this map does not work with customized grouper where multiple partitions + // of the same topic can be assigned to the same topic. + private final Map partitionForTopic; /** * @throws LockException if the state directory cannot be locked because another thread holds the lock * (this might be recoverable by retrying) * @throws IOException if any severe error happens while creating or locking the state directory */ - public ProcessorStateManager(String applicationId, TaskId taskId, Collection sources, Consumer restoreConsumer, boolean isStandby, - StateDirectory stateDirectory, final Map sourceStoreToSourceTopic, - final Map stateStoreProcessorNodeMap) throws LockException, IOException { + public ProcessorStateManager(final String applicationId, + final TaskId taskId, + final Collection sources, + final Consumer restoreConsumer, + final boolean isStandby, + final StateDirectory stateDirectory, + final Map storeToChangelogTopic) throws LockException, IOException { this.applicationId = applicationId; - this.defaultPartition = taskId.partition; this.taskId = taskId; this.stateDirectory = stateDirectory; - this.stateStoreProcessorNodeMap = stateStoreProcessorNodeMap; + this.baseDir = stateDirectory.directoryForTask(taskId); this.partitionForTopic = new HashMap<>(); for (TopicPartition source : sources) { this.partitionForTopic.put(source.topic(), source); } this.stores = new LinkedHashMap<>(); - this.loggingEnabled = new HashSet<>(); this.restoreConsumer = restoreConsumer; + this.offsetLimits = new HashMap<>(); this.restoredOffsets = new HashMap<>(); this.isStandby = isStandby; this.restoreCallbacks = isStandby ? new HashMap() : null; - this.offsetLimits = new HashMap<>(); - this.baseDir = stateDirectory.directoryForTask(taskId); - this.sourceStoreToSourceTopic = sourceStoreToSourceTopic; + this.storeToChangelogTopic = storeToChangelogTopic; this.logPrefix = String.format("task [%s]", taskId); @@ -124,6 +123,9 @@ public File baseDir() { /** * @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name * (e.g., when it conflicts with the names of internal topics, like the checkpoint file name) + * + * // TODO: parameter loggingEnabled can be removed now + * * @throws StreamsException if the store's change log does not contain the partition */ public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) { @@ -137,17 +139,8 @@ public void register(StateStore store, boolean loggingEnabled, StateRestoreCallb throw new IllegalArgumentException(String.format("%s Store %s has already been registered.", logPrefix, store.name())); } - if (loggingEnabled) { - this.loggingEnabled.add(store.name()); - } - // check that the underlying change log topic exist or not - String topic = null; - if (loggingEnabled) { - topic = storeChangelogTopic(this.applicationId, store.name()); - } else if (sourceStoreToSourceTopic != null && sourceStoreToSourceTopic.containsKey(store.name())) { - topic = sourceStoreToSourceTopic.get(store.name()); - } + String topic = storeToChangelogTopic.get(store.name()); if (topic == null) { this.stores.put(store.name(), store); @@ -325,10 +318,6 @@ public void flush(final InternalProcessorContext context) { if (!this.stores.isEmpty()) { log.debug("{} Flushing all stores registered in the state manager", logPrefix); for (StateStore store : this.stores.values()) { - final ProcessorNode processorNode = stateStoreProcessorNodeMap.get(store); - if (processorNode != null) { - context.setCurrentNode(processorNode); - } try { log.trace("{} Flushing store={}", logPrefix, store.name()); store.flush(); @@ -361,7 +350,7 @@ public void close(Map ackedOffsets) throws IOException { Map checkpointOffsets = new HashMap<>(); for (String storeName : stores.keySet()) { TopicPartition part; - if (loggingEnabled.contains(storeName)) + if (storeToChangelogTopic.containsKey(storeName)) part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName)); else part = new TopicPartition(storeName, getPartition(storeName)); @@ -396,6 +385,6 @@ public void close(Map ackedOffsets) throws IOException { private int getPartition(String topic) { TopicPartition partition = partitionForTopic.get(topic); - return partition == null ? defaultPartition : partition.partition(); + return partition == null ? taskId.partition : partition.partition(); } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index 9ccc25292c65e..5883c8aabb580 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals; + import org.apache.kafka.streams.processor.StateStore; import java.util.Collections; @@ -23,27 +24,25 @@ import java.util.List; import java.util.Map; import java.util.Set; + public class ProcessorTopology { private final List processorNodes; private final Map sourceByTopics; private final Map sinkByTopics; private final List stateStores; - private final Map sourceStoreToSourceTopic; - private final Map storeToProcessorNodeMap; - - public ProcessorTopology(List processorNodes, - Map sourceByTopics, - Map sinkByTopics, - List stateStores, - Map sourceStoreToSourceTopic, - Map storeToProcessorNodeMap) { + private final Map storeToChangelogTopic; + + public ProcessorTopology(final List processorNodes, + final Map sourceByTopics, + final Map sinkByTopics, + final List stateStores, + final Map storeToChangelogTopic) { this.processorNodes = Collections.unmodifiableList(processorNodes); this.sourceByTopics = Collections.unmodifiableMap(sourceByTopics); this.sinkByTopics = Collections.unmodifiableMap(sinkByTopics); - this.stateStores = Collections.unmodifiableList(stateStores); - this.sourceStoreToSourceTopic = sourceStoreToSourceTopic; - this.storeToProcessorNodeMap = Collections.unmodifiableMap(storeToProcessorNodeMap); + this.stateStores = Collections.unmodifiableList(stateStores); + this.storeToChangelogTopic = Collections.unmodifiableMap(storeToChangelogTopic); } public Set sourceTopics() { @@ -79,11 +78,7 @@ public List stateStores() { } public Map sourceStoreToSourceTopic() { - return sourceStoreToSourceTopic; - } - - public Map storeToProcessorNodeMap() { - return storeToProcessorNodeMap; + return storeToChangelogTopic; } private String childrenToString(String indent, List> children) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 4437a1955e700..a543e6504d9ce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -97,6 +97,11 @@ public void commit() { initializeOffsetLimits(); } + @Override + protected Map recordCollectorOffsets() { + return recordCollector.offsets(); + } + @Override public void close() { //no-op diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index c402c9b10b168..d71b32ee88da1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -547,7 +547,6 @@ public void shouldThroughOnUnassignedStateStoreAccess() { final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder, LocalMockProcessorSupplier.STORE_NAME); driver.process("topic", null, null); - } catch (final StreamsException e) { final Throwable cause = e.getCause(); if (cause != null diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index b1c19ba060afe..9b95e7b9aaef3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -65,8 +65,7 @@ private AbstractTask createTask(final Consumer consumer) { Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList(), - Collections.emptyMap(), - Collections.emptyMap() + Collections.emptyMap() ), consumer, consumer, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 7023712871b1b..54ccdc4f95823 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.StateSerdes; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; @@ -214,7 +213,7 @@ public void cleanup() { public void testNoTopic() throws IOException { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.emptyMap()); try { stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); } finally { @@ -242,7 +241,7 @@ public void testRegisterPersistentStore() throws IOException { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); try { restoreConsumer.reset(); @@ -291,7 +290,7 @@ public void testRegisterNonPersistentStore() throws IOException { MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, null, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); try { restoreConsumer.reset(); @@ -364,7 +363,7 @@ public void testChangeLogOffsets() throws IOException { // if there is an source partition, inherit the partition id Set sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1)); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, null, Collections.emptyMap()); // standby + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, Collections.emptyMap()); // standby try { restoreConsumer.reset(); @@ -398,7 +397,7 @@ public void testGetStore() throws IOException { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, null, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); try { stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); @@ -436,7 +435,7 @@ public void testFlushAndClose() throws IOException { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null); try { // make sure the checkpoint file is deleted assertFalse(checkpointFile.exists()); @@ -469,7 +468,7 @@ public void testFlushAndClose() throws IOException { @Test public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, null, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.emptyMap()); stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback); assertNotNull(stateMgr.getStore(nonPersistentStoreName)); } @@ -490,7 +489,7 @@ public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws Exception { final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); - final ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null, Collections.emptyMap()); + final ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); restoreConsumer.reset(); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 10a86fe6db8f6..6d630ffb9a839 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -32,7 +32,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.test.MockStateStoreSupplier; @@ -81,8 +80,12 @@ public class StandbyTaskTest { new MockStateStoreSupplier(storeName1, false).get(), new MockStateStoreSupplier(storeName2, true).get() ), - Collections.emptyMap(), - Collections.emptyMap()); + new HashMap() { + { + put(storeName1, storeChangelogTopicName1); + put(storeName2, storeChangelogTopicName2); + } + }); private final TopicPartition ktable = new TopicPartition("ktable1", 0); private final Set ktablePartitions = Utils.mkSet(ktable); @@ -97,8 +100,7 @@ public class StandbyTaskTest { { put("ktable1", ktable.topic()); } - }, - Collections.emptyMap()); + }); private File baseDir; private StateDirectory stateDirectory; @@ -319,7 +321,7 @@ public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStor new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0]))); final KStreamBuilder builder = new KStreamBuilder(); builder.stream("topic").groupByKey().count("my-store"); - final ProcessorTopology topology = builder.build(0); + final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0); StreamsConfig config = createConfig(baseDir); new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config, new StreamsMetrics() { @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 3d50007f46e2a..3cfb7df3dc527 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -86,8 +86,7 @@ public class StreamTaskTest { }, Collections.emptyMap(), Collections.emptyList(), - Collections.emptyMap(), - Collections.emptyMap()); + Collections.emptyMap()); private File baseDir; private StateDirectory stateDirectory; private RecordCollectorImpl recordCollector; @@ -314,8 +313,7 @@ public void process(final Object key, final Object value) { sourceNodes, Collections.emptyMap(), Collections.emptyList(), - Collections.emptyMap(), - Collections.emptyMap()); + Collections.emptyMap()); final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0), recordCollector); final int offset = 20; streamTask.addRecords(partition1, Collections.singletonList( @@ -362,8 +360,7 @@ public void punctuate(final long timestamp) { Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList(), - Collections.emptyMap(), - Collections.emptyMap()); + Collections.emptyMap()); final StreamTask streamTask = new StreamTask(new TaskId(0, 0), "applicationId", partitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, new ThreadCache(0), recordCollector); try { @@ -382,8 +379,7 @@ public void shouldFlushRecordCollectorOnFlushState() throws Exception { Collections.emptyMap(), Collections.emptyMap(), Collections.emptyList(), - Collections.emptyMap(), - Collections.emptyMap()); + Collections.emptyMap()); final AtomicBoolean flushed = new AtomicBoolean(false); final NoOpRecordCollector recordCollector = new NoOpRecordCollector() { @Override diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index f51cc0e57bd0d..81e8460124450 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -225,20 +225,9 @@ public Map allStateStores() { } public void flushState() { - final ProcessorNode current = currNode; - try { - for (StateStore stateStore : context.allStateStores().values()) { - final ProcessorNode processorNode = topology.storeToProcessorNodeMap().get(stateStore); - if (processorNode != null) { - currNode = processorNode; - } - stateStore.flush(); - } - } finally { - currNode = current; - + for (StateStore stateStore : context.allStateStores().values()) { + stateStore.flush(); } - } public void setCurrentNode(final ProcessorNode currentNode) { diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 7dad4082c4628..823e4223f97f7 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -152,7 +152,7 @@ public class ProcessorTopologyTestDriver { */ public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) { id = new TaskId(0, 0); - topology = builder.setApplicationId("ProcessorTopologyTestDriver").build(null); + topology = builder.setApplicationId(applicationId).build(null); // Set up the consumer and producer ... consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); From 675caca2121f47f846c067365a106d2eb0861a6e Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Mon, 9 Jan 2017 22:18:08 -0800 Subject: [PATCH 2/5] fix unit tests --- .../internals/ProcessorStateManager.java | 22 +++++---- .../processor/internals/StandbyTask.java | 5 --- .../internals/ProcessorStateManagerTest.java | 45 ++++++++++++++----- .../processor/internals/StreamThreadTest.java | 10 ++--- 4 files changed, 49 insertions(+), 33 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index b580a82064ead..8054757f63f8e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -274,7 +274,6 @@ public List> updateStandbyStates(TopicPartition s List> remainingRecords = null; // restore states from changelog records - StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic()); long lastOffset = -1L; @@ -295,6 +294,7 @@ public List> updateStandbyStates(TopicPartition s } count++; } + // record the restored offset for its change log partition restoredOffsets.put(storePartition, lastOffset + 1); @@ -349,24 +349,22 @@ public void close(Map ackedOffsets) throws IOException { if (ackedOffsets != null) { Map checkpointOffsets = new HashMap<>(); for (String storeName : stores.keySet()) { - TopicPartition part; - if (storeToChangelogTopic.containsKey(storeName)) - part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName)); - else - part = new TopicPartition(storeName, getPartition(storeName)); + // only checkpoint the offset to the offsets file if + // it is persistent AND changelog enabled + if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) { + String changelogTopic = storeToChangelogTopic.get(storeName); + TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName)); - // only checkpoint the offset to the offsets file if it is persistent; - if (stores.get(storeName).persistent()) { - Long offset = ackedOffsets.get(part); + Long offset = ackedOffsets.get(topicPartition); if (offset != null) { // store the last offset + 1 (the log position after restoration) - checkpointOffsets.put(part, offset + 1); + checkpointOffsets.put(topicPartition, offset + 1); } else { // if no record was produced. we need to check the restored offset. - offset = restoredOffsets.get(part); + offset = restoredOffsets.get(topicPartition); if (offset != null) - checkpointOffsets.put(part, offset); + checkpointOffsets.put(topicPartition, offset); } } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index a543e6504d9ce..4437a1955e700 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -97,11 +97,6 @@ public void commit() { initializeOffsetLimits(); } - @Override - protected Map recordCollectorOffsets() { - return recordCollector.offsets(); - } - @Override public void close() { //no-op diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 54ccdc4f95823..de547230ae3f9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -63,11 +63,11 @@ public class ProcessorStateManagerTest { public static class MockRestoreConsumer extends MockConsumer { private final Serializer serializer = new IntegerSerializer(); - public TopicPartition assignedPartition = null; - public TopicPartition seekPartition = null; - public long seekOffset = -1L; - public boolean seekToBeginingCalled = false; - public boolean seekToEndCalled = false; + private TopicPartition assignedPartition = null; + private TopicPartition seekPartition = null; + private long seekOffset = -1L; + private boolean seekToBeginingCalled = false; + private boolean seekToEndCalled = false; private long endOffset = 0L; private long currentOffset = 0L; @@ -192,7 +192,6 @@ public synchronized void seekToEnd(Collection partitions) { private final Set noPartitions = Collections.emptySet(); private final String applicationId = "test-application"; - private final String stateDir = "test"; private final String persistentStoreName = "persistentStore"; private final String nonPersistentStoreName = "nonPersistentStore"; private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName); @@ -213,7 +212,11 @@ public void cleanup() { public void testNoTopic() throws IOException { MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, new HashMap() { + { + put(nonPersistentStoreName, nonPersistentStoreName); + } + }); try { stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); } finally { @@ -241,7 +244,12 @@ public void testRegisterPersistentStore() throws IOException { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { + { + put(persistentStoreName, persistentStoreTopicName); + put(nonPersistentStoreName, nonPersistentStoreName); + } + }); try { restoreConsumer.reset(); @@ -290,7 +298,12 @@ public void testRegisterNonPersistentStore() throws IOException { MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { + { + put(persistentStoreName, persistentStoreTopicName); + put(nonPersistentStoreName, nonPersistentStoreTopicName); + } + }); try { restoreConsumer.reset(); @@ -331,6 +344,11 @@ public void testChangeLogOffsets() throws IOException { String storeTopicName2 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName2); String storeTopicName3 = ProcessorStateManager.storeChangelogTopic(applicationId, storeName3); + Map storeToChangelogTopic = new HashMap<>(); + storeToChangelogTopic.put(storeName1, storeTopicName1); + storeToChangelogTopic.put(storeName2, storeTopicName2); + storeToChangelogTopic.put(storeName3, storeTopicName3); + OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME)); checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset)); @@ -363,7 +381,7 @@ public void testChangeLogOffsets() throws IOException { // if there is an source partition, inherit the partition id Set sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1)); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, Collections.emptyMap()); // standby + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby try { restoreConsumer.reset(); @@ -435,7 +453,12 @@ public void testFlushAndClose() throws IOException { MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, null); + ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { + { + put(persistentStoreName, persistentStoreTopicName); + put(nonPersistentStoreName, nonPersistentStoreTopicName); + } + }); try { // make sure the checkpoint file is deleted assertFalse(checkpointFile.exists()); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 442fc3a7c9548..4ad3c883fab93 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -655,7 +655,7 @@ public void testInjectClients() { @Test public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws Exception { final TopologyBuilder builder = new TopologyBuilder(); - builder.setApplicationId("appId") + builder.setApplicationId(applicationId) .addSource("name", "topic") .addSink("out", "output"); @@ -678,7 +678,7 @@ Map> standbyTasks() { @Test public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId("appId"); + builder.setApplicationId(applicationId); builder.stream("t1").groupByKey().count("count-one"); builder.stream("t2").groupByKey().count("count-two"); final StreamsConfig config = new StreamsConfig(configProps()); @@ -729,7 +729,7 @@ Map> standbyTasks() { @Test public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId("appId"); + builder.setApplicationId(applicationId); builder.stream("t1").groupByKey().count("count-one"); builder.stream("t2").groupByKey().count("count-two"); final StreamsConfig config = new StreamsConfig(configProps()); @@ -794,7 +794,7 @@ Map> activeTasks() { @Test public void shouldCloseActiveTasksThatAreAssignedToThisStreamThreadButAssignmentHasChangedBeforeCreatingNewTasks() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId("appId"); + builder.setApplicationId(applicationId); builder.stream(Pattern.compile("t.*")).to("out"); final StreamsConfig config = new StreamsConfig(configProps()); final MockClientSupplier clientSupplier = new MockClientSupplier(); @@ -806,7 +806,7 @@ clientId, processId, new Metrics(), new MockTime(), new StreamsMetadataState(bui @Override protected StreamTask createStreamTask(final TaskId id, final Collection partitions) { final ProcessorTopology topology = builder.build(id.topicGroupId); - final TestStreamTask task = new TestStreamTask(id, "appId", partitions, topology, consumer, producer, restoreConsumer, config, stateDirectory); + final TestStreamTask task = new TestStreamTask(id, applicationId, partitions, topology, consumer, producer, restoreConsumer, config, stateDirectory); createdTasks.put(partitions, task); return task; } From dba8b5d531e06e0ec05044c96a669867381e2b84 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 11 Jan 2017 09:09:43 -0800 Subject: [PATCH 3/5] add unit tests --- .../streams/kstream/KStreamBuilderTest.java | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index 52decf48cc173..ec6a58c3acdfa 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -104,11 +104,35 @@ public void shouldHaveCorrectSourceTopicsForTableFromMergedStream() throws Excep final KStream source1 = builder.stream(topic1); final KStream source2 = builder.stream(topic2); final KStream source3 = builder.stream(topic3); + final KStream processedSource1 = + source1.mapValues(new ValueMapper() { + @Override + public String apply(final String value) { + return value; + } + }).filter(new Predicate() { + @Override + public boolean test(final String key, final String value) { + return true; + } + }); + final KStream processedSource2 = source2.filter(new Predicate() { + @Override + public boolean test(final String key, final String value) { + return true; + } + }); - final KStream merged = builder.merge(source1, source2, source3); + final KStream merged = builder.merge(processedSource1, processedSource2, source3); merged.groupByKey().count("my-table"); final Map> actual = builder.stateStoreNameToSourceTopics(); assertEquals(Utils.mkSet("topic-1", "topic-2", "topic-3"), actual.get("my-table")); + + + final KStream merged = builder.merge(processedSource1, processedSource2); + merged.groupByKey().count("my-table"); + final Map> actual = builder.stateStoreNameToSourceTopics(); + assertEquals(Utils.mkSet("topic-1", "topic-2"), actual.get("my-table")); } @Test From 3d05f0105d64e56744e7581eb63948ff8903a882 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Wed, 11 Jan 2017 11:30:40 -0800 Subject: [PATCH 4/5] minor fix --- .../processor/internals/AbstractTask.java | 2 +- .../internals/ProcessorTopology.java | 2 +- .../streams/kstream/KStreamBuilderTest.java | 55 ++++++------------- 3 files changed, 19 insertions(+), 40 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 04086336b52fa..cb7ae9641ade8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -70,7 +70,7 @@ protected AbstractTask(TaskId id, // create the processor state manager try { - this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.sourceStoreToSourceTopic()); + this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic()); } catch (IOException e) { throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java index 5883c8aabb580..57784d373e795 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java @@ -77,7 +77,7 @@ public List stateStores() { return stateStores; } - public Map sourceStoreToSourceTopic() { + public Map storeToChangelogTopic() { return storeToChangelogTopic; } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index ec6a58c3acdfa..27fa22a2b8d7c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.errors.TopologyBuilderException; +import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockKeyValueMapper; import org.apache.kafka.test.MockProcessorSupplier; @@ -127,44 +128,6 @@ public boolean test(final String key, final String value) { merged.groupByKey().count("my-table"); final Map> actual = builder.stateStoreNameToSourceTopics(); assertEquals(Utils.mkSet("topic-1", "topic-2", "topic-3"), actual.get("my-table")); - - - final KStream merged = builder.merge(processedSource1, processedSource2); - merged.groupByKey().count("my-table"); - final Map> actual = builder.stateStoreNameToSourceTopics(); - assertEquals(Utils.mkSet("topic-1", "topic-2"), actual.get("my-table")); - } - - @Test - public void shouldHaveCorrectSourceTopicsForTableFromMergedStreamWithProcessors() throws Exception { - final String topic1 = "topic-1"; - final String topic2 = "topic-2"; - final KStreamBuilder builder = new KStreamBuilder(); - final KStream source1 = builder.stream(topic1); - final KStream source2 = builder.stream(topic2); - final KStream processedSource1 = - source1.mapValues(new ValueMapper() { - @Override - public String apply(final String value) { - return value; - } - }).filter(new Predicate() { - @Override - public boolean test(final String key, final String value) { - return true; - } - }); - final KStream processedSource2 = source2.filter(new Predicate() { - @Override - public boolean test(final String key, final String value) { - return true; - } - }); - - final KStream merged = builder.merge(processedSource1, processedSource2); - merged.groupByKey().count("my-table"); - final Map> actual = builder.stateStoreNameToSourceTopics(); - assertEquals(Utils.mkSet("topic-1", "topic-2"), actual.get("my-table")); } @Test(expected = TopologyBuilderException.class) @@ -177,6 +140,22 @@ public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception { new KStreamBuilder().stream(Serdes.String(), Serdes.String(), null, null); } + @Test + public void shouldNotMaterializeSourceKTableIfStateNameNotSpecified() throws Exception { + final KStreamBuilder builder = new KStreamBuilder(); + builder.setApplicationId("app-id"); + + builder.table("topic1", "table1"); + builder.table("topic2", null); + + ProcessorTopology topology = builder.build(null); + + assertEquals(1, topology.stateStores().size()); + assertEquals("table1", topology.stateStores().get(0).name()); + assertEquals(1, topology.storeToChangelogTopic().size()); + assertEquals("topic1", topology.storeToChangelogTopic().get("table1")); + } + @Test public void shouldMapStateStoresToCorrectSourceTopics() throws Exception { final KStreamBuilder builder = new KStreamBuilder(); From 07baf066b53a6645ecb0ed537f2be6085ff71fc9 Mon Sep 17 00:00:00 2001 From: Guozhang Wang Date: Fri, 13 Jan 2017 11:27:07 -0800 Subject: [PATCH 5/5] fix typo --- .../org/apache/kafka/streams/processor/TopologyBuilder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 32e0557f2d070..b25fcad3d1f2f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -97,7 +97,7 @@ public class TopologyBuilder { // are connected to these state stores private final Map> stateStoreNameToSourceTopics = new HashMap<>(); - // map from state store names to ths state store's corresponding changelog topic if possible, + // map from state store names to this state store's corresponding changelog topic if possible, // this is used in the extended KStreamBuilder. private final Map storeToChangelogTopic = new HashMap<>();