diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 2a040ba508f76..8de5d2304d05a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -52,14 +52,14 @@ public abstract class AbstractTask { /** * @throws ProcessorStateException if the state manager cannot be created */ - protected AbstractTask(TaskId id, - String applicationId, - Collection partitions, - ProcessorTopology topology, - Consumer consumer, - Consumer restoreConsumer, - boolean isStandby, - StateDirectory stateDirectory, + protected AbstractTask(final TaskId id, + final String applicationId, + final Collection partitions, + final ProcessorTopology topology, + final Consumer consumer, + final Consumer restoreConsumer, + final boolean isStandby, + final StateDirectory stateDirectory, final ThreadCache cache) { this.id = id; this.applicationId = applicationId; @@ -70,7 +70,7 @@ protected AbstractTask(TaskId id, // create the processor state manager try { - this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic()); + stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic()); } catch (IOException e) { throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java new file mode 100644 index 0000000000000..7b02d5b934365 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Map; + +// Interface to indicate that an object has associated partition offsets that can be checkpointed +interface Checkpointable { + void checkpoint(final Map offsets); + Map checkpointed(); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 75349931aa199..3819bb5a09d70 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -58,11 +58,11 @@ public class GlobalStateManagerImpl implements GlobalStateManager { private final File baseDir; private final OffsetCheckpoint checkpoint; private final Set globalStoreNames = new HashSet<>(); - private HashMap checkpointableOffsets; + private final Map checkpointableOffsets = new HashMap<>(); public GlobalStateManagerImpl(final ProcessorTopology topology, - final Consumer consumer, - final StateDirectory stateDirectory) { + final Consumer consumer, + final StateDirectory stateDirectory) { this.topology = topology; this.consumer = consumer; this.stateDirectory = stateDirectory; @@ -81,8 +81,7 @@ public Set initialize(final InternalProcessorContext processorContext) { } try { - this.checkpointableOffsets = new HashMap<>(checkpoint.read()); - checkpoint.delete(); + this.checkpointableOffsets.putAll(checkpoint.read()); } catch (IOException e) { try { stateDirectory.unlockGlobalState(); @@ -220,13 +219,14 @@ public void close(final Map offsets) throws IOException { if (closeFailed.length() > 0) { throw new ProcessorStateException("Exceptions caught during close of 1 or more global state stores\n" + closeFailed); } - writeCheckpoints(offsets); + checkpoint(offsets); } finally { stateDirectory.unlockGlobalState(); } } - private void writeCheckpoints(final Map offsets) { + @Override + public void checkpoint(final Map offsets) { if (!offsets.isEmpty()) { checkpointableOffsets.putAll(offsets); try { @@ -238,7 +238,7 @@ private void writeCheckpoints(final Map offsets) { } @Override - public Map checkpointedOffsets() { + public Map checkpointed() { return Collections.unmodifiableMap(checkpointableOffsets); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 40f2a3c826aa8..6da37e4c1c003 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -67,7 +67,7 @@ public Map initialize() { } initTopology(); processorContext.initialized(); - return stateMgr.checkpointedOffsets(); + return stateMgr.checkpointed(); } @@ -89,6 +89,7 @@ public void update(final ConsumerRecord record) { public void flushState() { stateMgr.flush(processorContext); + stateMgr.checkpoint(offsets); } public void close() throws IOException { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 1c786e30a88ea..0e8caa2b76ed8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -68,6 +68,7 @@ public class ProcessorStateManager implements StateManager { // TODO: this map does not work with customized grouper where multiple partitions // of the same topic can be assigned to the same topic. private final Map partitionForTopic; + private final OffsetCheckpoint checkpoint; /** * @throws LockException if the state directory cannot be locked because another thread holds the lock @@ -103,11 +104,8 @@ public ProcessorStateManager(final TaskId taskId, } // load the checkpoint information - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)); + checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)); this.checkpointedOffsets = new HashMap<>(checkpoint.read()); - - // delete the checkpoint file after finish loading its stored offsets - checkpoint.delete(); } @@ -250,7 +248,7 @@ private void restoreActiveState(String topicName, StateRestoreCallback stateRest } } - public Map checkpointedOffsets() { + public Map checkpointed() { Map partitionsAndOffsets = new HashMap<>(); for (Map.Entry entry : restoreCallbacks.entrySet()) { @@ -347,29 +345,7 @@ public void close(Map ackedOffsets) throws IOException { } if (ackedOffsets != null) { - Map checkpointOffsets = new HashMap<>(); - for (String storeName : stores.keySet()) { - // only checkpoint the offset to the offsets file if - // it is persistent AND changelog enabled - if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) { - String changelogTopic = storeToChangelogTopic.get(storeName); - TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName)); - Long offset = ackedOffsets.get(topicPartition); - - if (offset != null) { - // store the last offset + 1 (the log position after restoration) - checkpointOffsets.put(topicPartition, offset + 1); - } else { - // if no record was produced. we need to check the restored offset. - offset = restoredOffsets.get(topicPartition); - if (offset != null) - checkpointOffsets.put(topicPartition, offset); - } - } - } - // write the checkpoint file before closing, to indicate clean shutdown - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)); - checkpoint.write(checkpointOffsets); + checkpoint(ackedOffsets); } } @@ -379,6 +355,31 @@ public void close(Map ackedOffsets) throws IOException { } } + // write the checkpoint + @Override + public void checkpoint(final Map ackedOffsets) { + for (String storeName : stores.keySet()) { + // only checkpoint the offset to the offsets file if + // it is persistent AND changelog enabled + if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) { + final String changelogTopic = storeToChangelogTopic.get(storeName); + final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName)); + if (ackedOffsets.containsKey(topicPartition)) { + // store the last offset + 1 (the log position after restoration) + checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1); + } else if (restoredOffsets.containsKey(topicPartition)) { + checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition)); + } + } + } + // write the checkpoint file before closing, to indicate clean shutdown + try { + checkpoint.write(checkpointedOffsets); + } catch (IOException e) { + log.warn("Failed to write checkpoint file to {}", new File(baseDir, CHECKPOINT_FILE_NAME), e); + } + } + private int getPartition(String topic) { TopicPartition partition = partitionForTopic.get(topic); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 4437a1955e700..a27098c34f22e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -51,14 +51,15 @@ public class StandbyTask extends AbstractTask { * @param metrics the {@link StreamsMetrics} created by the thread * @param stateDirectory the {@link StateDirectory} created by the thread */ - public StandbyTask(TaskId id, - String applicationId, - Collection partitions, - ProcessorTopology topology, - Consumer consumer, - Consumer restoreConsumer, - StreamsConfig config, - StreamsMetrics metrics, final StateDirectory stateDirectory) { + public StandbyTask(final TaskId id, + final String applicationId, + final Collection partitions, + final ProcessorTopology topology, + final Consumer consumer, + final Consumer restoreConsumer, + final StreamsConfig config, + final StreamsMetrics metrics, + final StateDirectory stateDirectory) { super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null); // initialize the topology with its own context @@ -67,9 +68,9 @@ public StandbyTask(TaskId id, log.info("standby-task [{}] Initializing state stores", id()); initializeStateStores(); - ((StandbyContextImpl) this.processorContext).initialized(); + this.processorContext.initialized(); - this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets()); + this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed()); } public Map checkpointedOffsets() { @@ -92,7 +93,7 @@ public List> update(TopicPartition partition, Lis public void commit() { log.debug("standby-task [{}] Committing its state", id()); stateMgr.flush(processorContext); - + stateMgr.checkpoint(Collections.emptyMap()); // reinitialize offset limits initializeOffsetLimits(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java index 7343c857fa68b..3102b77163086 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.util.Map; -interface StateManager { +interface StateManager extends Checkpointable { File baseDir(); void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback); @@ -36,6 +36,4 @@ interface StateManager { StateStore getGlobalStore(final String name); StateStore getStore(final String name); - - Map checkpointedOffsets(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index be77856450c07..d95ac4bac6fc1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -76,8 +76,9 @@ public void run() { log.trace("{} Start flushing its producer's sent records upon committing its state", logPrefix); // 2) flush produced records in the downstream and change logs of local states recordCollector.flush(); - - // 3) commit consumed offsets if it is dirty already + // 3) write checkpoints for any local state + stateMgr.checkpoint(recordCollectorOffsets()); + // 4) commit consumed offsets if it is dirty already commitOffsets(); } }; diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java index fe501523af1aa..28dea79acf226 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java @@ -39,7 +39,7 @@ public class InMemoryKeyValueStore implements KeyValueStore { private StateSerdes serdes; - InMemoryKeyValueStore(final String name, final Serde keySerde, final Serde valueSerde) { + public InMemoryKeyValueStore(final String name, final Serde keySerde, final Serde valueSerde) { this.name = name; this.keySerde = keySerde; this.valueSerde = valueSerde; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index fc0953bb13e39..f288f984d1ab1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -60,6 +60,7 @@ public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() t } private AbstractTask createTask(final Consumer consumer) { + final MockTime time = new MockTime(); return new AbstractTask(new TaskId(0, 0), "app", Collections.singletonList(new TopicPartition("t", 0)), @@ -72,7 +73,7 @@ private AbstractTask createTask(final Consumer consumer) { consumer, consumer, false, - new StateDirectory("app", TestUtils.tempDirectory().getPath(), new MockTime()), + new StateDirectory("app", TestUtils.tempDirectory().getPath(), time), new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))) { @Override public void commit() { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index 168b3004eeb86..062079fefcbea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -50,6 +50,8 @@ import java.util.Map; import java.util.Set; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -125,15 +127,15 @@ public void shouldReadCheckpointOffsets() throws Exception { final Map expected = writeCheckpoint(); stateManager.initialize(context); - final Map offsets = stateManager.checkpointedOffsets(); + final Map offsets = stateManager.checkpointed(); assertEquals(expected, offsets); } @Test - public void shouldDeleteCheckpointFileAfteLoaded() throws Exception { + public void shouldNotDeleteCheckpointFileAfterLoaded() throws Exception { writeCheckpoint(); stateManager.initialize(context); - assertFalse(checkpointFile.exists()); + assertTrue(checkpointFile.exists()); } @Test(expected = StreamsException.class) @@ -168,7 +170,7 @@ public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() t } @Test - public void shouldThrowIllegalArgumenExceptionIfAttemptingToRegisterStoreTwice() throws Exception { + public void shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() throws Exception { stateManager.initialize(context); initializeConsumer(2, 1, t1); stateManager.register(store1, false, new TheStateRestoreCallback()); @@ -271,9 +273,7 @@ public void shouldWriteCheckpointsOnClose() throws Exception { stateManager.register(store1, false, stateRestoreCallback); final Map expected = Collections.singletonMap(t1, 25L); stateManager.close(expected); - final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), - ProcessorStateManager.CHECKPOINT_FILE_NAME)); - final Map result = offsetCheckpoint.read(); + final Map result = readOffsetsCheckpoint(); assertEquals(expected, result); } @@ -376,6 +376,41 @@ public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws Exceptio } } + @Test + public void shouldCheckpointOffsets() throws Exception { + final Map offsets = Collections.singletonMap(t1, 25L); + stateManager.initialize(context); + + stateManager.checkpoint(offsets); + + final Map result = readOffsetsCheckpoint(); + assertThat(result, equalTo(offsets)); + assertThat(stateManager.checkpointed(), equalTo(offsets)); + } + + @Test + public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() throws Exception { + stateManager.initialize(context); + final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); + initializeConsumer(10, 1, t1); + stateManager.register(store1, false, stateRestoreCallback); + initializeConsumer(20, 1, t2); + stateManager.register(store2, false, stateRestoreCallback); + + final Map initialCheckpoint = stateManager.checkpointed(); + stateManager.checkpoint(Collections.singletonMap(t1, 101L)); + + final Map updatedCheckpoint = stateManager.checkpointed(); + assertThat(updatedCheckpoint.get(t2), equalTo(initialCheckpoint.get(t2))); + assertThat(updatedCheckpoint.get(t1), equalTo(101L)); + } + + private Map readOffsetsCheckpoint() throws IOException { + final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), + ProcessorStateManager.CHECKPOINT_FILE_NAME)); + return offsetCheckpoint.read(); + } + @Test public void shouldThrowLockExceptionIfIOExceptionCaughtWhenTryingToLockStateDir() throws Exception { stateManager = new GlobalStateManagerImpl(topology, consumer, new StateDirectory("appId", stateDirPath, time) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index df0b73ca8a4cd..66999bc8efbe1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -38,6 +38,8 @@ import java.util.Map; import java.util.Set; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -137,7 +139,19 @@ public void shouldCloseStateManagerWithOffsets() throws Exception { globalStateTask.initialize(); globalStateTask.update(new ConsumerRecord<>("t1", 1, 51, "foo".getBytes(), "foo".getBytes())); globalStateTask.close(); - assertEquals(expectedOffsets, stateMgr.checkpointedOffsets()); + assertEquals(expectedOffsets, stateMgr.checkpointed()); assertTrue(stateMgr.closed); } + + @Test + public void shouldCheckpointOffsetsWhenStateIsFlushed() throws Exception { + final Map expectedOffsets = new HashMap<>(); + expectedOffsets.put(t1, 102L); + expectedOffsets.put(t2, 100L); + globalStateTask.initialize(); + globalStateTask.update(new ConsumerRecord<>("t1", 1, 101, "foo".getBytes(), "foo".getBytes())); + globalStateTask.flushState(); + assertThat(stateMgr.checkpointed(), equalTo(expectedOffsets)); + } + } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index b8d51baff8de2..f1d3090b2c026 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; @@ -58,6 +59,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -71,7 +74,6 @@ public static class MockRestoreConsumer extends MockConsumer { private final Serializer serializer = new IntegerSerializer(); private TopicPartition assignedPartition = null; - private TopicPartition seekPartition = null; private long seekOffset = -1L; private boolean seekToBeginingCalled = false; private boolean seekToEndCalled = false; @@ -162,7 +164,6 @@ public synchronized void seek(TopicPartition partition, long offset) { if (seekOffset >= 0) throw new IllegalStateException("RestoreConsumer: offset already seeked"); - seekPartition = partition; seekOffset = offset; currentOffset = offset; super.seek(partition, offset); @@ -203,6 +204,9 @@ public synchronized void seekToEnd(Collection partitions) { private final String nonPersistentStoreName = "nonPersistentStore"; private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName); private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName); + private final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); + private final MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); + private final TopicPartition persistentStorePartition = new TopicPartition(persistentStoreTopicName, 1); private final String storeName = "mockStateStore"; private final String changelogTopic = ProcessorStateManager.storeChangelogTopic(applicationId, storeName); private final TopicPartition changelogTopicPartition = new TopicPartition(changelogTopic, 0); @@ -210,6 +214,8 @@ public synchronized void seekToEnd(Collection partitions) { private final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); private final MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(storeName, true); private File baseDir; + private File checkpointFile; + private OffsetCheckpoint checkpoint; private StateDirectory stateDirectory; @@ -217,6 +223,14 @@ public synchronized void seekToEnd(Collection partitions) { public void setup() { baseDir = TestUtils.tempDirectory(); stateDirectory = new StateDirectory(applicationId, baseDir.getPath(), new MockTime()); + checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME); + checkpoint = new OffsetCheckpoint(checkpointFile); + restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( + new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) + )); + restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( + new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) + )); } @After @@ -300,8 +314,6 @@ public void testRegisterPersistentStore() throws IOException { public void testRegisterNonPersistentStore() throws IOException { long lastCheckpointedOffset = 10L; - MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset)); @@ -313,8 +325,6 @@ public void testRegisterNonPersistentStore() throws IOException { TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2); restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L)); - MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { { put(persistentStoreName, persistentStoreTopicName); @@ -325,7 +335,7 @@ public void testRegisterNonPersistentStore() throws IOException { restoreConsumer.reset(); ArrayList expectedKeys = new ArrayList<>(); - long offset = -1L; + long offset; for (int i = 1; i <= 3; i++) { offset = (long) (i + 100); int key = i; @@ -346,12 +356,13 @@ public void testRegisterNonPersistentStore() throws IOException { } finally { stateMgr.close(Collections.emptyMap()); } - } @Test public void testChangeLogOffsets() throws IOException { final TaskId taskId = new TaskId(0, 0); + final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint( + new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME)); long lastCheckpointedOffset = 10L; String storeName1 = "store1"; String storeName2 = "store2"; @@ -366,10 +377,7 @@ public void testChangeLogOffsets() throws IOException { storeToChangelogTopic.put(storeName2, storeTopicName2); storeToChangelogTopic.put(storeName3, storeTopicName3); - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME)); - checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset)); - - MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); + offsetCheckpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset)); restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList( new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0]) @@ -406,7 +414,7 @@ public void testChangeLogOffsets() throws IOException { stateMgr.register(store2, true, store2.stateRestoreCallback); stateMgr.register(store3, true, store3.stateRestoreCallback); - Map changeLogOffsets = stateMgr.checkpointedOffsets(); + Map changeLogOffsets = stateMgr.checkpointed(); assertEquals(3, changeLogOffsets.size()); assertTrue(changeLogOffsets.containsKey(partition1)); @@ -424,20 +432,12 @@ public void testChangeLogOffsets() throws IOException { @Test public void testGetStore() throws IOException { - MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - - restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( - new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) - )); - - MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - - ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); + final ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); try { - stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); + stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); assertNull(stateMgr.getStore("noSuchStore")); - assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName)); + assertEquals(nonPersistentStore, stateMgr.getStore(nonPersistentStoreName)); } finally { stateMgr.close(Collections.emptyMap()); @@ -446,30 +446,15 @@ public void testGetStore() throws IOException { @Test public void testFlushAndClose() throws IOException { - final TaskId taskId = new TaskId(0, 1); - File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME); // write an empty checkpoint file - OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile); - oldCheckpoint.write(Collections.emptyMap()); - - MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - - restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( - new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) - )); - restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( - new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) - )); + checkpoint.write(Collections.emptyMap()); // set up ack'ed offsets - HashMap ackedOffsets = new HashMap<>(); + final HashMap ackedOffsets = new HashMap<>(); ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L); ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L); ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L); - MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); - MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { { put(persistentStoreName, persistentStoreTopicName); @@ -477,8 +462,8 @@ public void testFlushAndClose() throws IOException { } }); try { - // make sure the checkpoint file is deleted - assertFalse(checkpointFile.exists()); + // make sure the checkpoint file isn't deleted + assertTrue(checkpointFile.exists()); restoreConsumer.reset(); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); @@ -499,39 +484,122 @@ public void testFlushAndClose() throws IOException { assertTrue(checkpointFile.exists()); // the checkpoint file should contain an offset from the persistent store only. - OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile); - Map checkpointedOffsets = newCheckpoint.read(); + final Map checkpointedOffsets = checkpoint.read(); assertEquals(1, checkpointedOffsets.size()); assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1))); } @Test public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception { - MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.emptyMap()); - stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback); + stateMgr.register(nonPersistentStore, false, nonPersistentStore.stateRestoreCallback); assertNotNull(stateMgr.getStore(nonPersistentStoreName)); } @Test - public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws Exception { - final File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME); - // write an empty checkpoint file - final OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile); - oldCheckpoint.write(Collections.emptyMap()); + public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws Exception { + final Map offsets = Collections.singletonMap(persistentStorePartition, 99L); + checkpoint.write(offsets); - restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( - new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) - )); + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, + noPartitions, + restoreConsumer, + false, + stateDirectory, + Collections.emptyMap()); + + restoreConsumer.reset(); + stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); + stateMgr.close(null); + final Map read = checkpoint.read(); + assertThat(read, equalTo(offsets)); + } + + @Test + public void shouldWriteCheckpointForPersistentLogEnabledStore() throws Exception { + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, + noPartitions, + restoreConsumer, + false, + stateDirectory, + Collections.singletonMap(persistentStore.name(), + persistentStoreTopicName)); + restoreConsumer.reset(); + stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); - final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); - final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); + stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L)); + final Map read = checkpoint.read(); + assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 11L))); + } + + @Test + public void shouldWriteCheckpointForStandbyReplica() throws Exception { + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, + noPartitions, + restoreConsumer, + true, + stateDirectory, + Collections.singletonMap(persistentStore.name(), + persistentStoreTopicName)); restoreConsumer.reset(); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); - stateMgr.close(null); - assertFalse(checkpointFile.exists()); + final byte[] bytes = Serdes.Integer().serializer().serialize("", 10); + stateMgr.updateStandbyStates(persistentStorePartition, + Collections.singletonList( + new ConsumerRecord<>(persistentStorePartition.topic(), + persistentStorePartition.partition(), + 888L, + bytes, + bytes))); + + stateMgr.checkpoint(Collections.emptyMap()); + + final Map read = checkpoint.read(); + assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 889L))); + + } + + @Test + public void shouldNotWriteCheckpointForNonPersistent() throws Exception { + final TopicPartition topicPartition = new TopicPartition(nonPersistentStoreTopicName, 1); + + restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( + new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) + )); + + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, + noPartitions, + restoreConsumer, + true, + stateDirectory, + Collections.singletonMap(nonPersistentStoreName, + nonPersistentStoreTopicName)); + + restoreConsumer.reset(); + stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); + stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L)); + + final Map read = checkpoint.read(); + assertThat(read, equalTo(Collections.emptyMap())); + } + + @Test + public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws Exception { + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, + noPartitions, + restoreConsumer, + true, + stateDirectory, + Collections.emptyMap()); + + stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); + + stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 987L)); + + final Map read = checkpoint.read(); + assertThat(read, equalTo(Collections.emptyMap())); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 629e5219a263d..ef4ebcc7c6a8d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; @@ -54,6 +55,8 @@ import java.util.Set; import static java.util.Collections.singleton; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -316,7 +319,7 @@ public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStor final String changelogName = "test-application-my-store-changelog"; final List partitions = Utils.mkList(new TopicPartition(changelogName, 0)); consumer.assign(partitions); - Map committedOffsets = new HashMap<>(); + final Map committedOffsets = new HashMap<>(); committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L)); consumer.commitSync(committedOffsets); @@ -327,9 +330,53 @@ public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStor final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0); StreamsConfig config = createConfig(baseDir); new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config, - new MockStreamsMetrics(new Metrics()), stateDirectory); + new MockStreamsMetrics(new Metrics()), stateDirectory); } + + @Test + public void shouldCheckpointStoreOffsetsOnCommit() throws Exception { + consumer.assign(Utils.mkList(ktable)); + final Map committedOffsets = new HashMap<>(); + committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(100L)); + consumer.commitSync(committedOffsets); + + restoreStateConsumer.updatePartitions("ktable1", Utils.mkList( + new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0]))); + + final TaskId taskId = new TaskId(0, 0); + final MockTime time = new MockTime(); + final StreamsConfig config = createConfig(baseDir); + final StandbyTask task = new StandbyTask(taskId, + applicationId, + ktablePartitions, + ktableTopology, + consumer, + restoreStateConsumer, + config, + null, + stateDirectory + ); + + + restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); + + final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1); + task.update(ktable, Collections.singletonList(new ConsumerRecord<>(ktable.topic(), + ktable.partition(), + 50L, + serializedValue, + serializedValue))); + + time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); + task.commit(); + + final Map checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), + ProcessorStateManager.CHECKPOINT_FILE_NAME)).read(); + assertThat(checkpoint, equalTo(Collections.singletonMap(ktable, 51L))); + + } + private List> records(ConsumerRecord... recs) { return Arrays.asList(recs); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java index 3f480596d62e2..f4aca9fa04aaf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java @@ -57,7 +57,12 @@ public StateStore getStore(final String name) { } @Override - public Map checkpointedOffsets() { + public Map checkpointed() { return null; } + + @Override + public void checkpoint(final Map offsets) { + + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 15b1d25694457..5c72fc9bff78c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -22,6 +22,8 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.TimestampType; @@ -39,6 +41,8 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore; +import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.MockProcessorNode; import org.apache.kafka.test.MockSourceNode; @@ -415,6 +419,62 @@ restoreStateConsumer, createConfig(baseDir), streamsMetrics, assertTrue(flushed.get()); } + @SuppressWarnings("unchecked") + @Test + public void shouldCheckpointOffsetsOnCommit() throws Exception { + final String storeName = "test"; + final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", storeName); + final InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName, null, null) { + @Override + public void init(final ProcessorContext context, final StateStore root) { + context.register(root, true, null); + } + + @Override + public boolean persistent() { + return true; + } + }; + final ProcessorTopology topology = new ProcessorTopology(Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.singletonList(inMemoryStore), + Collections.singletonMap(storeName, changelogTopic), + Collections.emptyList()); + + final TopicPartition partition = new TopicPartition(changelogTopic, 0); + final NoOpRecordCollector recordCollector = new NoOpRecordCollector() { + @Override + public Map offsets() { + + return Collections.singletonMap(partition, 543L); + } + }; + + restoreStateConsumer.updatePartitions(changelogTopic, + Collections.singletonList( + new PartitionInfo(changelogTopic, 0, null, new Node[0], new Node[0]))); + restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L)); + restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L)); + + final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics()); + final TaskId taskId = new TaskId(0, 0); + final MockTime time = new MockTime(); + final StreamsConfig config = createConfig(baseDir); + final StreamTask streamTask = new StreamTask(taskId, "appId", partitions, topology, consumer, + restoreStateConsumer, config, streamsMetrics, + stateDirectory, new ThreadCache("testCache", 0, streamsMetrics), + time, recordCollector); + + time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); + + streamTask.commit(); + final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), + ProcessorStateManager.CHECKPOINT_FILE_NAME)); + + assertThat(checkpoint.read(), equalTo(Collections.singletonMap(partition, 544L))); + } + @Test public void shouldThrowIllegalStateExceptionIfCurrentNodeIsNotNullWhenPunctuateCalled() throws Exception { ((ProcessorContextImpl) task.processorContext()).setCurrentNode(processor); diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java index 2f3ef26635ecf..612a0da47e6d4 100644 --- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java @@ -66,6 +66,11 @@ public void close(final Map offsets) throws IOException { closed = true; } + @Override + public void checkpoint(final Map offsets) { + this.offsets.putAll(offsets); + } + @Override public StateStore getGlobalStore(final String name) { return null; @@ -77,7 +82,7 @@ public StateStore getStore(final String name) { } @Override - public Map checkpointedOffsets() { + public Map checkpointed() { return offsets; } } diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index b50ff347b297e..ac8933dcb6aac 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -211,7 +211,8 @@ public List partitionsFor(String topic) { final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory); globalStateTask = new GlobalStateUpdateTask(globalTopology, new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache), - stateManager); + stateManager + ); globalStateTask.initialize(); }