From 6743dc63293e2d0fca57dcb7d1a0ace5237837b0 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Tue, 31 Jan 2017 13:37:00 +0000 Subject: [PATCH 1/9] checkpoint statestores --- .../apache/kafka/streams/StreamsConfig.java | 13 +- .../processor/internals/AbstractTask.java | 8 +- .../processor/internals/Checkpointable.java | 26 +++ .../processor/internals/Checkpointer.java | 46 +++++ .../internals/GlobalStateManagerImpl.java | 14 +- .../internals/GlobalStateUpdateTask.java | 6 +- .../internals/GlobalStreamThread.java | 5 +- .../internals/ProcessorStateManager.java | 56 ++--- .../processor/internals/StandbyTask.java | 12 +- .../processor/internals/StateManager.java | 2 +- .../processor/internals/StreamTask.java | 5 +- .../processor/internals/StreamThread.java | 2 +- .../streams/state/internals/RocksDBStore.java | 1 + .../processor/internals/AbstractTaskTest.java | 3 +- .../processor/internals/CheckpointerTest.java | 84 ++++++++ .../internals/GlobalStateManagerImplTest.java | 47 ++++- .../internals/GlobalStateTaskTest.java | 21 +- .../internals/ProcessorStateManagerTest.java | 195 ++++++++++++------ .../processor/internals/StandbyTaskTest.java | 62 +++++- .../processor/internals/StreamTaskTest.java | 62 ++++++ .../kafka/test/GlobalStateManagerStub.java | 5 + .../test/ProcessorTopologyTestDriver.java | 5 +- 22 files changed, 553 insertions(+), 127 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java create mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/CheckpointerTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 57db027e83ea0..3d33d76f811b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -214,6 +214,12 @@ public class StreamsConfig extends AbstractConfig { public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; + /** {@code statestore.checkpoint.interval.ms} */ + public static final String STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG = "statestore.checkpoint.interval.ms"; + private static final String STATESTORE_CHECKPOINT_INTERVAL_MS_DOC = "The minimum frequency at which state stores will be checkpointed. " + + "This enables some control over the amount of data that will need to be restored in the event of a hard crash. " + + "A value <= 0 turns checkpointing off."; + static { CONFIG = new ConfigDef() .define(APPLICATION_ID_CONFIG, // required with no default value @@ -383,7 +389,12 @@ public class StreamsConfig extends AbstractConfig { 40 * 1000, atLeast(0), ConfigDef.Importance.MEDIUM, - REQUEST_TIMEOUT_MS_DOC); + REQUEST_TIMEOUT_MS_DOC) + .define(STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG, + Type.LONG, + 5 * 60 * 1000, + Importance.LOW, + STATESTORE_CHECKPOINT_INTERVAL_MS_DOC); } // this is the list of configs for underlying clients diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 55418d5b4e030..1e445d626523d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; import 
org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -47,6 +48,7 @@ public abstract class AbstractTask { protected final Consumer consumer; protected final ProcessorStateManager stateMgr; protected final Set partitions; + protected final Checkpointer checkpointer; protected InternalProcessorContext processorContext; protected final ThreadCache cache; /** @@ -60,7 +62,9 @@ protected AbstractTask(TaskId id, Consumer restoreConsumer, boolean isStandby, StateDirectory stateDirectory, - final ThreadCache cache) { + final ThreadCache cache, + final Time time, + final long checkpointInterval) { this.id = id; this.applicationId = applicationId; this.partitions = new HashSet<>(partitions); @@ -71,7 +75,7 @@ protected AbstractTask(TaskId id, // create the processor state manager try { this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic()); - + this.checkpointer = new Checkpointer(time, stateMgr, checkpointInterval); } catch (IOException e) { throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java new file mode 100644 index 0000000000000..abce6d2659a36 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; + +import java.util.Map; + +// Interface to indicate that an object can be checkpointed +interface Checkpointable { + void checkpoint(final Map offsets); +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java new file mode 100644 index 0000000000000..84c3a0876cb99 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Time; + +import java.util.Map; + +public class Checkpointer { + + private final Time time; + private final Checkpointable checkpointable; + private final long checkpointInterval; + private long lastCheckpointMs; + + public Checkpointer(final Time time, + final Checkpointable checkpointable, + final long checkpointInterval) { + this.time = time; + this.checkpointable = checkpointable; + this.lastCheckpointMs = time.milliseconds(); + this.checkpointInterval = checkpointInterval; + } + + public void checkpoint(final Map offsets) { + if (checkpointInterval > 0 && time.milliseconds() >= lastCheckpointMs + checkpointInterval) { + checkpointable.checkpoint(offsets); + lastCheckpointMs = time.milliseconds(); + } + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 75349931aa199..5b80024becc36 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -58,11 +58,11 @@ public class GlobalStateManagerImpl implements GlobalStateManager { private final File baseDir; private final OffsetCheckpoint checkpoint; private final Set globalStoreNames = new HashSet<>(); - private HashMap checkpointableOffsets; + private final Map checkpointableOffsets = new HashMap<>(); public GlobalStateManagerImpl(final 
ProcessorTopology topology, - final Consumer consumer, - final StateDirectory stateDirectory) { + final Consumer consumer, + final StateDirectory stateDirectory) { this.topology = topology; this.consumer = consumer; this.stateDirectory = stateDirectory; @@ -81,8 +81,7 @@ public Set initialize(final InternalProcessorContext processorContext) { } try { - this.checkpointableOffsets = new HashMap<>(checkpoint.read()); - checkpoint.delete(); + this.checkpointableOffsets.putAll(checkpoint.read()); } catch (IOException e) { try { stateDirectory.unlockGlobalState(); @@ -220,13 +219,14 @@ public void close(final Map offsets) throws IOException { if (closeFailed.length() > 0) { throw new ProcessorStateException("Exceptions caught during close of 1 or more global state stores\n" + closeFailed); } - writeCheckpoints(offsets); + checkpoint(offsets); } finally { stateDirectory.unlockGlobalState(); } } - private void writeCheckpoints(final Map offsets) { + @Override + public void checkpoint(final Map offsets) { if (!offsets.isEmpty()) { checkpointableOffsets.putAll(offsets); try { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 40f2a3c826aa8..2a3d3a886d9c8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -42,6 +42,7 @@ private class SourceNodeAndDeserializer { private final ProcessorTopology topology; private final InternalProcessorContext processorContext; + private final Checkpointer checkpointer; private final Map offsets = new HashMap<>(); private final Map deserializers = new HashMap<>(); private final GlobalStateManager stateMgr; @@ -49,11 +50,13 @@ private class SourceNodeAndDeserializer { public GlobalStateUpdateTask(final ProcessorTopology topology, final 
InternalProcessorContext processorContext, - final GlobalStateManager stateMgr) { + final GlobalStateManager stateMgr, + final Checkpointer checkpointer) { this.topology = topology; this.stateMgr = stateMgr; this.processorContext = processorContext; + this.checkpointer = checkpointer; } @SuppressWarnings("unchecked") @@ -89,6 +92,7 @@ public void update(final ConsumerRecord record) { public void flushState() { stateMgr.flush(processorContext); + checkpointer.checkpoint(offsets); } public void close() throws IOException { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index b4e15f2e573e9..2a37949877fa6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -160,7 +160,10 @@ private StateConsumer initialize() { stateMgr, streamsMetrics, cache), - stateMgr), + stateMgr, + new Checkpointer(time, + stateMgr, + 10L)), time, config.getLong(StreamsConfig.POLL_MS_CONFIG), config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index ad16c77c2a8ba..48ad3f2eb2401 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -68,6 +68,7 @@ public class ProcessorStateManager implements StateManager { // TODO: this map does not work with customized grouper where multiple partitions // of the same topic can be assigned to the same topic. 
private final Map partitionForTopic; + private final OffsetCheckpoint checkpoint; /** * @throws LockException if the state directory cannot be locked because another thread holds the lock @@ -103,11 +104,8 @@ public ProcessorStateManager(final TaskId taskId, } // load the checkpoint information - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)); + checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)); this.checkpointedOffsets = new HashMap<>(checkpoint.read()); - - // delete the checkpoint file after finish loading its stored offsets - checkpoint.delete(); } @@ -348,30 +346,7 @@ public void close(Map ackedOffsets) throws IOException { } if (ackedOffsets != null) { - Map checkpointOffsets = new HashMap<>(); - for (String storeName : stores.keySet()) { - // only checkpoint the offset to the offsets file if - // it is persistent AND changelog enabled - if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) { - String changelogTopic = storeToChangelogTopic.get(storeName); - TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName)); - - Long offset = ackedOffsets.get(topicPartition); - - if (offset != null) { - // store the last offset + 1 (the log position after restoration) - checkpointOffsets.put(topicPartition, offset + 1); - } else { - // if no record was produced. we need to check the restored offset. 
- offset = restoredOffsets.get(topicPartition); - if (offset != null) - checkpointOffsets.put(topicPartition, offset); - } - } - } - // write the checkpoint file before closing, to indicate clean shutdown - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME)); - checkpoint.write(checkpointOffsets); + checkpoint(ackedOffsets); } } @@ -381,6 +356,31 @@ public void close(Map ackedOffsets) throws IOException { } } + // write the checkpoint + @Override + public void checkpoint(final Map ackedOffsets) { + for (String storeName : stores.keySet()) { + // only checkpoint the offset to the offsets file if + // it is persistent AND changelog enabled + if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) { + final String changelogTopic = storeToChangelogTopic.get(storeName); + final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName)); + if (ackedOffsets.containsKey(topicPartition)) { + // store the last offset + 1 (the log position after restoration) + checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1); + } else if (restoredOffsets.containsKey(topicPartition)) { + checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition)); + } + } + } + // write the checkpoint file before closing, to indicate clean shutdown + try { + checkpoint.write(checkpointedOffsets); + } catch (IOException e) { + log.warn("Failed to write checkpoint file to {}", new File(baseDir, CHECKPOINT_FILE_NAME), e); + } + } + private int getPartition(String topic) { TopicPartition partition = partitionForTopic.get(topic); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 4437a1955e700..27d17bed811ce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.TaskId; @@ -50,6 +51,7 @@ public class StandbyTask extends AbstractTask { * @param config the {@link StreamsConfig} specified by the user * @param metrics the {@link StreamsMetrics} created by the thread * @param stateDirectory the {@link StateDirectory} created by the thread + * @param time */ public StandbyTask(TaskId id, String applicationId, @@ -58,8 +60,10 @@ public StandbyTask(TaskId id, Consumer consumer, Consumer restoreConsumer, StreamsConfig config, - StreamsMetrics metrics, final StateDirectory stateDirectory) { - super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null); + StreamsMetrics metrics, + final StateDirectory stateDirectory, + final Time time) { + super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null, Time.SYSTEM, config.getLong(StreamsConfig.STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG)); // initialize the topology with its own context this.processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics); @@ -67,7 +71,7 @@ public StandbyTask(TaskId id, log.info("standby-task [{}] Initializing state stores", id()); initializeStateStores(); - ((StandbyContextImpl) this.processorContext).initialized(); + this.processorContext.initialized(); this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets()); } @@ -92,7 +96,7 @@ public List> update(TopicPartition partition, Lis public void commit() { log.debug("standby-task [{}] Committing its state", id()); 
stateMgr.flush(processorContext); - + checkpointer.checkpoint(Collections.emptyMap()); // reinitialize offset limits initializeOffsetLimits(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java index 7343c857fa68b..96e5b5d835c3c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.util.Map; -interface StateManager { +interface StateManager extends Checkpointable { File baseDir(); void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 7375fb597a0e3..05aba73f89746 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -74,7 +74,8 @@ public void run() { log.trace("{} Start flushing its producer's sent records upon committing its state", logPrefix); // 2) flush produced records in the downstream and change logs of local states recordCollector.flush(); - + // 3) write checkpoints for any local state + checkpointer.checkpoint(recordCollectorOffsets()); // 3) commit consumed offsets if it is dirty already commitOffsets(); } @@ -105,7 +106,7 @@ public StreamTask(TaskId id, ThreadCache cache, Time time, final RecordCollector recordCollector) { - super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory, cache); + super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory, cache, time, 
config.getLong(StreamsConfig.STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG)); this.punctuationQueue = new PunctuationQueue(); this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); this.metrics = new TaskMetrics(metrics); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 01281423cca35..402bd6558f31f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -945,7 +945,7 @@ StandbyTask createStandbyTask(TaskId id, Collection partitions) ProcessorTopology topology = builder.build(id.topicGroupId); if (!topology.stateStores().isEmpty()) { - return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, streamsMetrics, stateDirectory); + return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, streamsMetrics, stateDirectory, Time.SYSTEM); } else { return null; } diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 5c83ddfc6045c..9048069556e34 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -35,6 +35,7 @@ import org.rocksdb.CompactionStyle; import org.rocksdb.CompressionType; import org.rocksdb.FlushOptions; +import org.rocksdb.InfoLogLevel; import org.rocksdb.Options; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index 
16967bcd1b0a4..a715dc73cd3e9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; @@ -72,7 +73,7 @@ private AbstractTask createTask(final Consumer consumer) { consumer, false, new StateDirectory("app", TestUtils.tempDirectory().getPath()), - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))) { + new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())), Time.SYSTEM, 0) { @Override public void commit() { // do nothing diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CheckpointerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CheckpointerTest.java new file mode 100644 index 0000000000000..db0777c44ecdb --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CheckpointerTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.MockTime; +import org.junit.Test; + +import java.util.Collections; +import java.util.Map; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; + +public class CheckpointerTest { + + private static final int CHECKPOINT_INTERVAL = 1000; + private final MockTime time = new MockTime(); + private final CheckpointableStub checkpointable = new CheckpointableStub(); + private final Checkpointer checkpointer = new Checkpointer(time, checkpointable, CHECKPOINT_INTERVAL); + + @Test + public void shouldNotCheckpointIfCheckpointIntervalHasntElapsed() throws Exception { + checkpointer.checkpoint(Collections.emptyMap()); + assertThat(checkpointable.callCount, equalTo(0)); + } + + @Test + public void shouldCheckpointIfIntervalHasElapsed() throws Exception { + time.sleep(CHECKPOINT_INTERVAL); + checkpointer.checkpoint(Collections.emptyMap()); + assertThat(checkpointable.callCount, equalTo(1)); + } + + @Test + public void shouldCheckpointEveryTimeTheIntervalHasElapsed() throws Exception { + time.sleep(CHECKPOINT_INTERVAL); + checkpointer.checkpoint(Collections.emptyMap()); + time.sleep(CHECKPOINT_INTERVAL); + checkpointer.checkpoint(Collections.emptyMap()); + assertThat(checkpointable.callCount, equalTo(2)); + } + + @Test + public void shouldNotCheckpointBetweenIntervals() throws Exception { + time.sleep(CHECKPOINT_INTERVAL); + checkpointer.checkpoint(Collections.emptyMap()); + time.sleep(CHECKPOINT_INTERVAL / 2); + checkpointer.checkpoint(Collections.emptyMap()); 
+ assertThat(checkpointable.callCount, equalTo(1)); + } + + @Test + public void shouldNotCheckpointIfIntervalLessThanOrEqualToZero() throws Exception { + final Checkpointer checkpointer = new Checkpointer(time, checkpointable, 0); + time.sleep(1000); + checkpointer.checkpoint(Collections.emptyMap()); + assertThat(checkpointable.callCount, equalTo(0)); + } + + class CheckpointableStub implements Checkpointable { + int callCount = 0; + @Override + public void checkpoint(final Map offsets) { + callCount++; + } + } + +} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index db51cefabf587..c45356e16ebb1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -49,6 +49,8 @@ import java.util.Map; import java.util.Set; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; @@ -127,10 +129,10 @@ public void shouldReadCheckpointOffsets() throws Exception { } @Test - public void shouldDeleteCheckpointFileAfteLoaded() throws Exception { + public void shouldNotDeleteCheckpointFileAfterLoaded() throws Exception { writeCheckpoint(); stateManager.initialize(context); - assertFalse(checkpointFile.exists()); + assertTrue(checkpointFile.exists()); } @Test(expected = StreamsException.class) @@ -168,7 +170,7 @@ public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() t } @Test - public void shouldThrowIllegalArgumenExceptionIfAttemptingToRegisterStoreTwice() throws Exception { + public void 
shouldThrowIllegalArgumentExceptionIfAttemptingToRegisterStoreTwice() throws Exception { stateManager.initialize(context); initializeConsumer(2, 1, t1); stateManager.register(store1, false, new TheStateRestoreCallback()); @@ -271,9 +273,7 @@ public void shouldWriteCheckpointsOnClose() throws Exception { stateManager.register(store1, false, stateRestoreCallback); final Map expected = Collections.singletonMap(t1, 25L); stateManager.close(expected); - final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), - ProcessorStateManager.CHECKPOINT_FILE_NAME)); - final Map result = offsetCheckpoint.read(); + final Map result = readOffsetsCheckpoint(); assertEquals(expected, result); } @@ -379,6 +379,41 @@ public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws Exceptio } } + @Test + public void shouldCheckpointOffsets() throws Exception { + final Map offsets = Collections.singletonMap(t1, 25L); + stateManager.initialize(context); + + stateManager.checkpoint(offsets); + + final Map result = readOffsetsCheckpoint(); + assertThat(result, equalTo(offsets)); + assertThat(stateManager.checkpointedOffsets(), equalTo(offsets)); + } + + @Test + public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() throws Exception { + stateManager.initialize(context); + final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback(); + initializeConsumer(10, 1, t1); + stateManager.register(store1, false, stateRestoreCallback); + initializeConsumer(20, 1, t2); + stateManager.register(store2, false, stateRestoreCallback); + + final Map initialCheckpoint = stateManager.checkpointedOffsets(); + stateManager.checkpoint(Collections.singletonMap(t1, 101L)); + + final Map updatedCheckpoint = stateManager.checkpointedOffsets(); + assertThat(updatedCheckpoint.get(t2), equalTo(initialCheckpoint.get(t2))); + assertThat(updatedCheckpoint.get(t1), equalTo(101L)); + } + + private Map readOffsetsCheckpoint() throws IOException 
{ + final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), + ProcessorStateManager.CHECKPOINT_FILE_NAME)); + return offsetCheckpoint.read(); + } + private void initializeConsumer(final long numRecords, final long startOffset, final TopicPartition topicPartition) { final HashMap startOffsets = new HashMap<>(); startOffsets.put(topicPartition, 1L); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index df0b73ca8a4cd..4e24b14c2cffc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.test.GlobalStateManagerStub; @@ -38,11 +39,15 @@ import java.util.Map; import java.util.Set; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class GlobalStateTaskTest { + private final MockTime time = new MockTime(0); + private final int checkpointInterval = 10; private Map offsets; private GlobalStateUpdateTask globalStateTask; private GlobalStateManagerStub stateMgr; @@ -83,7 +88,8 @@ public void before() { offsets.put(t1, 50L); offsets.put(t2, 100L); stateMgr = new GlobalStateManagerStub(storeNames, offsets); - globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr); + final Checkpointer checkpointer = new 
Checkpointer(time, stateMgr, checkpointInterval); + globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, checkpointer); } @Test @@ -140,4 +146,17 @@ public void shouldCloseStateManagerWithOffsets() throws Exception { assertEquals(expectedOffsets, stateMgr.checkpointedOffsets()); assertTrue(stateMgr.closed); } + + @Test + public void shouldCheckpointOffsetsWhenStateIsFlushed() throws Exception { + final Map expectedOffsets = new HashMap<>(); + expectedOffsets.put(t1, 102L); + expectedOffsets.put(t2, 100L); + globalStateTask.initialize(); + globalStateTask.update(new ConsumerRecord<>("t1", 1, 101, "foo".getBytes(), "foo".getBytes())); + time.sleep(checkpointInterval); + globalStateTask.flushState(); + assertThat(stateMgr.checkpointedOffsets(), equalTo(expectedOffsets)); + } + } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index 602601a17e731..d14743f8b7409 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -26,6 +26,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; @@ -49,6 +50,8 @@ import java.util.Map; import java.util.Set; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -57,14 
+60,10 @@ public class ProcessorStateManagerTest { - private File baseDir; - private StateDirectory stateDirectory; - public static class MockRestoreConsumer extends MockConsumer { private final Serializer serializer = new IntegerSerializer(); private TopicPartition assignedPartition = null; - private TopicPartition seekPartition = null; private long seekOffset = -1L; private boolean seekToBeginingCalled = false; private boolean seekToEndCalled = false; @@ -155,7 +154,6 @@ public synchronized void seek(TopicPartition partition, long offset) { if (seekOffset >= 0) throw new IllegalStateException("RestoreConsumer: offset already seeked"); - seekPartition = partition; seekOffset = offset; currentOffset = offset; super.seek(partition, offset); @@ -190,17 +188,34 @@ public synchronized void seekToEnd(Collection partitions) { } } + private final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); private final Set noPartitions = Collections.emptySet(); private final String applicationId = "test-application"; private final String persistentStoreName = "persistentStore"; private final String nonPersistentStoreName = "nonPersistentStore"; private final String persistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, persistentStoreName); private final String nonPersistentStoreTopicName = ProcessorStateManager.storeChangelogTopic(applicationId, nonPersistentStoreName); + private final TaskId taskId01 = new TaskId(0, 1); + private final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); + private final MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); + private final TopicPartition persistentStorePartition = new TopicPartition(persistentStoreTopicName, 1); + private File baseDir; + private StateDirectory stateDirectory; + private File checkpointFile; + private OffsetCheckpoint 
checkpoint; @Before public void setup() { baseDir = TestUtils.tempDirectory(); stateDirectory = new StateDirectory(applicationId, baseDir.getPath()); + checkpointFile = new File(stateDirectory.directoryForTask(taskId01), ProcessorStateManager.CHECKPOINT_FILE_NAME); + checkpoint = new OffsetCheckpoint(checkpointFile); + restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( + new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) + )); + restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( + new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) + )); } @After @@ -283,8 +298,6 @@ public void testRegisterPersistentStore() throws IOException { public void testRegisterNonPersistentStore() throws IOException { long lastCheckpointedOffset = 10L; - MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(baseDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); checkpoint.write(Collections.singletonMap(new TopicPartition(persistentStoreTopicName, 2), lastCheckpointedOffset)); @@ -296,8 +309,6 @@ public void testRegisterNonPersistentStore() throws IOException { TopicPartition partition = new TopicPartition(persistentStoreTopicName, 2); restoreConsumer.updateEndOffsets(Collections.singletonMap(partition, 13L)); - MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store - ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { { put(persistentStoreName, persistentStoreTopicName); @@ -308,7 +319,7 @@ public void testRegisterNonPersistentStore() throws IOException { restoreConsumer.reset(); ArrayList expectedKeys = new ArrayList<>(); - long offset = -1L; + long offset; for (int i = 1; i <= 3; i++) { offset = (long) 
(i + 100); int key = i; @@ -329,12 +340,13 @@ public void testRegisterNonPersistentStore() throws IOException { } finally { stateMgr.close(Collections.emptyMap()); } - } @Test public void testChangeLogOffsets() throws IOException { final TaskId taskId = new TaskId(0, 0); + final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint( + new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME)); long lastCheckpointedOffset = 10L; String storeName1 = "store1"; String storeName2 = "store2"; @@ -349,10 +361,7 @@ public void testChangeLogOffsets() throws IOException { storeToChangelogTopic.put(storeName2, storeTopicName2); storeToChangelogTopic.put(storeName3, storeTopicName3); - OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME)); - checkpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset)); - - MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); + offsetCheckpoint.write(Collections.singletonMap(new TopicPartition(storeTopicName1, 0), lastCheckpointedOffset)); restoreConsumer.updatePartitions(storeTopicName1, Utils.mkList( new PartitionInfo(storeTopicName1, 0, Node.noNode(), new Node[0], new Node[0]) @@ -407,20 +416,12 @@ public void testChangeLogOffsets() throws IOException { @Test public void testGetStore() throws IOException { - MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - - restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( - new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) - )); - - MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - - ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); + final 
ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); try { - stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback); + stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); assertNull(stateMgr.getStore("noSuchStore")); - assertEquals(mockStateStore, stateMgr.getStore(nonPersistentStoreName)); + assertEquals(nonPersistentStore, stateMgr.getStore(nonPersistentStoreName)); } finally { stateMgr.close(Collections.emptyMap()); @@ -429,39 +430,24 @@ public void testGetStore() throws IOException { @Test public void testFlushAndClose() throws IOException { - final TaskId taskId = new TaskId(0, 1); - File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME); // write an empty checkpoint file - OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile); - oldCheckpoint.write(Collections.emptyMap()); - - MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); - - restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( - new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) - )); - restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( - new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) - )); + checkpoint.write(Collections.emptyMap()); // set up ack'ed offsets - HashMap ackedOffsets = new HashMap<>(); + final HashMap ackedOffsets = new HashMap<>(); ackedOffsets.put(new TopicPartition(persistentStoreTopicName, 1), 123L); ackedOffsets.put(new TopicPartition(nonPersistentStoreTopicName, 1), 456L); ackedOffsets.put(new TopicPartition(ProcessorStateManager.storeChangelogTopic(applicationId, "otherTopic"), 1), 789L); - MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); 
- MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); - - ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { + ProcessorStateManager stateMgr = new ProcessorStateManager(taskId01, noPartitions, restoreConsumer, false, stateDirectory, new HashMap() { { put(persistentStoreName, persistentStoreTopicName); put(nonPersistentStoreName, nonPersistentStoreTopicName); } }); try { - // make sure the checkpoint file is deleted - assertFalse(checkpointFile.exists()); + // make sure the checkpoint file isn't deleted + assertTrue(checkpointFile.exists()); restoreConsumer.reset(); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); @@ -482,42 +468,123 @@ public void testFlushAndClose() throws IOException { assertTrue(checkpointFile.exists()); // the checkpoint file should contain an offset from the persistent store only. 
- OffsetCheckpoint newCheckpoint = new OffsetCheckpoint(checkpointFile); - Map checkpointedOffsets = newCheckpoint.read(); + final Map checkpointedOffsets = checkpoint.read(); assertEquals(1, checkpointedOffsets.size()); assertEquals(new Long(123L + 1L), checkpointedOffsets.get(new TopicPartition(persistentStoreTopicName, 1))); } @Test public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception { - MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.emptyMap()); - stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback); + stateMgr.register(nonPersistentStore, false, nonPersistentStore.stateRestoreCallback); assertNotNull(stateMgr.getStore(nonPersistentStoreName)); } @Test - public void shouldNotWriteCheckpointsIfAckeOffsetsIsNull() throws Exception { - final TaskId taskId = new TaskId(0, 1); - final File checkpointFile = new File(stateDirectory.directoryForTask(taskId), ProcessorStateManager.CHECKPOINT_FILE_NAME); - // write an empty checkpoint file - final OffsetCheckpoint oldCheckpoint = new OffsetCheckpoint(checkpointFile); - oldCheckpoint.write(Collections.emptyMap()); + public void shouldNotChangeOffsetsIfAckedOffsetsIsNull() throws Exception { + final Map offsets = Collections.singletonMap(persistentStorePartition, 99L); + checkpoint.write(offsets); - final MockRestoreConsumer restoreConsumer = new MockRestoreConsumer(); + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId01, + noPartitions, + restoreConsumer, + false, + stateDirectory, + Collections.emptyMap()); - restoreConsumer.updatePartitions(persistentStoreTopicName, Utils.mkList( - new PartitionInfo(persistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) - )); + restoreConsumer.reset(); + 
stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); + stateMgr.close(null); + final Map read = checkpoint.read(); + assertThat(read, equalTo(offsets)); + } + + @Test + public void shouldWriteCheckpointForPersistentLogEnabledStore() throws Exception { + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId01, + noPartitions, + restoreConsumer, + false, + stateDirectory, + Collections.singletonMap(persistentStore.name(), + persistentStoreTopicName)); + restoreConsumer.reset(); + stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); + + + stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 10L)); + final Map read = checkpoint.read(); + assertThat(read, equalTo(Collections.singletonMap(persistentStorePartition, 11L))); + } - final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true); - final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.emptyMap()); + @Test + public void shouldWriteCheckpointForStandbyReplica() throws Exception { + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId01, + noPartitions, + restoreConsumer, + true, + stateDirectory, + Collections.singletonMap(persistentStore.name(), + persistentStoreTopicName)); restoreConsumer.reset(); stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); - stateMgr.close(null); - assertFalse(checkpointFile.exists()); + final byte[] bytes = Serdes.Integer().serializer().serialize("", 10); + stateMgr.updateStandbyStates(persistentStorePartition, + Collections.singletonList( + new ConsumerRecord<>(persistentStorePartition.topic(), + persistentStorePartition.partition(), + 888L, + bytes, + bytes))); + + stateMgr.checkpoint(Collections.emptyMap()); + + final Map read = checkpoint.read(); + assertThat(read, 
equalTo(Collections.singletonMap(persistentStorePartition, 889L))); + + } + + @Test + public void shouldNotWriteCheckpointForNonPersistent() throws Exception { + final TopicPartition topicPartition = new TopicPartition(nonPersistentStoreTopicName, 1); + + restoreConsumer.updatePartitions(nonPersistentStoreTopicName, Utils.mkList( + new PartitionInfo(nonPersistentStoreTopicName, 1, Node.noNode(), new Node[0], new Node[0]) + )); + + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId01, + noPartitions, + restoreConsumer, + true, + stateDirectory, + Collections.singletonMap(nonPersistentStoreName, + nonPersistentStoreTopicName)); + + restoreConsumer.reset(); + stateMgr.register(nonPersistentStore, true, nonPersistentStore.stateRestoreCallback); + stateMgr.checkpoint(Collections.singletonMap(topicPartition, 876L)); + + final Map read = checkpoint.read(); + assertThat(read, equalTo(Collections.emptyMap())); + } + + @Test + public void shouldNotWriteCheckpointForStoresWithoutChangelogTopic() throws Exception { + final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId01, + noPartitions, + restoreConsumer, + true, + stateDirectory, + Collections.emptyMap()); + + stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback); + + stateMgr.checkpoint(Collections.singletonMap(persistentStorePartition, 987L)); + + final Map read = checkpoint.read(); + assertThat(read, equalTo(Collections.emptyMap())); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 2d32e780faee3..e40876553a0c2 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -27,7 +27,10 @@ import org.apache.kafka.common.metrics.Metrics; import 
org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStreamBuilder; @@ -53,6 +56,8 @@ import java.util.Set; import static java.util.Collections.singleton; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -114,6 +119,7 @@ private StreamsConfig createConfig(final File baseDir) throws Exception { setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); + setProperty(StreamsConfig.STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG, "1"); } }); } @@ -150,7 +156,7 @@ public void cleanup() { @Test public void testStorePartitions() throws Exception { StreamsConfig config = createConfig(baseDir); - StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory); + StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, Time.SYSTEM); assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions())); @@ -160,7 +166,7 @@ public void testStorePartitions() throws Exception { @Test(expected = Exception.class) public void testUpdateNonPersistentStore() throws Exception { StreamsConfig config = createConfig(baseDir); - StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, 
topology, consumer, restoreStateConsumer, config, null, stateDirectory); + StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, Time.SYSTEM); restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); @@ -174,7 +180,7 @@ public void testUpdateNonPersistentStore() throws Exception { @Test public void testUpdate() throws Exception { StreamsConfig config = createConfig(baseDir); - StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory); + StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, Time.SYSTEM); restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); @@ -232,7 +238,7 @@ public void testUpdateKTable() throws Exception { )); StreamsConfig config = createConfig(baseDir); - StandbyTask task = new StandbyTask(taskId, applicationId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null, stateDirectory); + StandbyTask task = new StandbyTask(taskId, applicationId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null, stateDirectory, Time.SYSTEM); restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); @@ -315,7 +321,7 @@ public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStor final String changelogName = "test-application-my-store-changelog"; final List partitions = Utils.mkList(new TopicPartition(changelogName, 0)); consumer.assign(partitions); - Map committedOffsets = new HashMap<>(); + final Map committedOffsets = new HashMap<>(); committedOffsets.put(new TopicPartition(changelogName, 0), new OffsetAndMetadata(0L)); consumer.commitSync(committedOffsets); @@ -326,9 +332,53 @@ public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStor final 
ProcessorTopology topology = builder.setApplicationId(applicationId).build(0); StreamsConfig config = createConfig(baseDir); new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config, - new MockStreamsMetrics(new Metrics()), stateDirectory); + new MockStreamsMetrics(new Metrics()), stateDirectory, Time.SYSTEM); + + } + + @Test + public void shouldCheckpointStoreOffsetsOnCommit() throws Exception { + consumer.assign(Utils.mkList(ktable)); + final Map committedOffsets = new HashMap<>(); + committedOffsets.put(new TopicPartition(ktable.topic(), ktable.partition()), new OffsetAndMetadata(100L)); + consumer.commitSync(committedOffsets); + + restoreStateConsumer.updatePartitions("ktable1", Utils.mkList( + new PartitionInfo("ktable1", 0, Node.noNode(), new Node[0], new Node[0]))); + + final TaskId taskId = new TaskId(0, 0); + final MockTime time = new MockTime(); + final StreamsConfig config = createConfig(baseDir); + final StandbyTask task = new StandbyTask(taskId, + applicationId, + ktablePartitions, + ktableTopology, + consumer, + restoreStateConsumer, + config, + null, + stateDirectory, + time); + + + restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); + + final byte[] serializedValue = Serdes.Integer().serializer().serialize("", 1); + task.update(ktable, Collections.singletonList(new ConsumerRecord<>(ktable.topic(), + ktable.partition(), + 50L, + serializedValue, + serializedValue))); + + time.sleep(config.getLong(StreamsConfig.STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG)); + task.commit(); + + final Map checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), + ProcessorStateManager.CHECKPOINT_FILE_NAME)).read(); + assertThat(checkpoint, equalTo(Collections.singletonMap(ktable, 51L))); } + private List> records(ConsumerRecord... 
recs) { return Arrays.asList(recs); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 0479b9d5e34d4..24072015018b6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -22,6 +22,8 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.record.TimestampType; @@ -39,7 +41,9 @@ import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.state.internals.OffsetCheckpoint; import org.apache.kafka.streams.state.internals.ThreadCache; +import org.apache.kafka.test.InMemoryKeyValueStore; import org.apache.kafka.test.MockProcessorNode; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockTimestampExtractor; @@ -59,6 +63,8 @@ import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -437,6 +443,62 @@ public void flush() { assertTrue(flushed.get()); } + @Test + public void shouldCheckpointOffsetsOnCommit() throws Exception { + final String storeName = "test"; + final String changelogTopic = ProcessorStateManager.storeChangelogTopic("appId", storeName); + final 
InMemoryKeyValueStore inMemoryStore = new InMemoryKeyValueStore(storeName) { + @Override + public void init(final ProcessorContext context, final StateStore root) { + context.register(root, true, null); + } + + @Override + public boolean persistent() { + return true; + } + }; + final ProcessorTopology topology = new ProcessorTopology(Collections.emptyList(), + Collections.emptyMap(), + Collections.emptyMap(), + Collections.singletonList(inMemoryStore), + Collections.singletonMap(storeName, changelogTopic), + Collections.emptyList()); + + final TopicPartition partition = new TopicPartition(changelogTopic, 0); + final NoOpRecordCollector recordCollector = new NoOpRecordCollector() { + @Override + public Map offsets() { + + return Collections.singletonMap(partition, 543L); + } + }; + + restoreStateConsumer.updatePartitions(changelogTopic, + Collections.singletonList( + new PartitionInfo(changelogTopic, 0, null, new Node[0], new Node[0]))); + restoreStateConsumer.updateEndOffsets(Collections.singletonMap(partition, 0L)); + restoreStateConsumer.updateBeginningOffsets(Collections.singletonMap(partition, 0L)); + + final StreamsMetrics streamsMetrics = new MockStreamsMetrics(new Metrics()); + final TaskId taskId = new TaskId(0, 0); + final MockTime time = new MockTime(); + final StreamsConfig config = createConfig(baseDir); + final StreamTask streamTask = new StreamTask(taskId, "appId", partitions, topology, consumer, + restoreStateConsumer, config, streamsMetrics, + stateDirectory, new ThreadCache("testCache", 0, streamsMetrics), + time, recordCollector); + + time.sleep(config.getLong(StreamsConfig.STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG)); + + streamTask.commit(); + final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), + ProcessorStateManager.CHECKPOINT_FILE_NAME)); + + assertThat(checkpoint.read(), equalTo(Collections.singletonMap(partition, 544L))); + + } + private Iterable> records(ConsumerRecord... 
recs) { return Arrays.asList(recs); } diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java index 2f3ef26635ecf..bbec224169268 100644 --- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java @@ -66,6 +66,11 @@ public void close(final Map offsets) throws IOException { closed = true; } + @Override + public void checkpoint(final Map offsets) { + this.offsets.putAll(offsets); + } + @Override public StateStore getGlobalStore(final String name) { return null; diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 89ca0df9e584c..02b271e367e3f 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -38,11 +38,13 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.MockTime; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.processor.internals.Checkpointer; import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl; import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl; import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask; @@ -204,7 +206,8 @@ public List partitionsFor(String topic) { final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, 
stateDirectory); globalStateTask = new GlobalStateUpdateTask(globalTopology, new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache), - stateManager); + stateManager, + new Checkpointer(Time.SYSTEM, stateManager, 0)); globalStateTask.initialize(); } From 291ff370a0bc714bcb4dda1b4fb98eb71e0d967f Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Tue, 31 Jan 2017 18:12:39 +0000 Subject: [PATCH 2/9] fix checkstyle --- .../org/apache/kafka/streams/state/internals/RocksDBStore.java | 1 - 1 file changed, 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java index 9048069556e34..5c83ddfc6045c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java @@ -35,7 +35,6 @@ import org.rocksdb.CompactionStyle; import org.rocksdb.CompressionType; import org.rocksdb.FlushOptions; -import org.rocksdb.InfoLogLevel; import org.rocksdb.Options; import org.rocksdb.RocksDB; import org.rocksdb.RocksDBException; From c908a0dfb85e5e27a7b36e4e5c47beb50bf6f6cd Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Thu, 2 Feb 2017 15:33:34 +0000 Subject: [PATCH 3/9] fix comment --- .../kafka/streams/processor/internals/Checkpointable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java index abce6d2659a36..5fdc3730b5ccc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java @@ -20,7 +20,7 @@ import java.util.Map; -// Interface to indicate that an object can be Check pointed +// Interface to indicate 
that an object can be checkpointed interface Checkpointable { void checkpoint(final Map offsets); } From f7553a31da9b71c5468df0e50abc3888cb2d81c3 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Fri, 3 Feb 2017 09:33:10 +0000 Subject: [PATCH 4/9] feedback --- .../processor/internals/AbstractTask.java | 20 +++++++++---------- .../processor/internals/Checkpointer.java | 6 +++--- .../processor/internals/StandbyTask.java | 18 ++++++++--------- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 1e445d626523d..080582b05f6e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -54,14 +54,14 @@ public abstract class AbstractTask { /** * @throws ProcessorStateException if the state manager cannot be created */ - protected AbstractTask(TaskId id, - String applicationId, - Collection partitions, - ProcessorTopology topology, - Consumer consumer, - Consumer restoreConsumer, - boolean isStandby, - StateDirectory stateDirectory, + protected AbstractTask(final TaskId id, + final String applicationId, + final Collection partitions, + final ProcessorTopology topology, + final Consumer consumer, + final Consumer restoreConsumer, + final boolean isStandby, + final StateDirectory stateDirectory, final ThreadCache cache, final Time time, final long checkpointInterval) { @@ -74,8 +74,8 @@ protected AbstractTask(TaskId id, // create the processor state manager try { - this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic()); - this.checkpointer = new Checkpointer(time, stateMgr, checkpointInterval); + stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, 
stateDirectory, topology.storeToChangelogTopic()); + checkpointer = new Checkpointer(time, stateMgr, checkpointInterval); } catch (IOException e) { throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java index 84c3a0876cb99..bcac8a14d43fa 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java @@ -29,12 +29,12 @@ public class Checkpointer { private long lastCheckpointMs; public Checkpointer(final Time time, - final Checkpointable checkpointable, - final long checkpointInterval) { + final Checkpointable checkpointable, + final long checkpointInterval) { this.time = time; this.checkpointable = checkpointable; - this.lastCheckpointMs = time.milliseconds(); this.checkpointInterval = checkpointInterval; + lastCheckpointMs = time.milliseconds(); } public void checkpoint(final Map offsets) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 27d17bed811ce..08efd3d458dd0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -53,17 +53,17 @@ public class StandbyTask extends AbstractTask { * @param stateDirectory the {@link StateDirectory} created by the thread * @param time */ - public StandbyTask(TaskId id, - String applicationId, - Collection partitions, - ProcessorTopology topology, - Consumer consumer, - Consumer restoreConsumer, - StreamsConfig config, - StreamsMetrics metrics, + public StandbyTask(final TaskId id, + final String 
applicationId, + final Collection partitions, + final ProcessorTopology topology, + final Consumer consumer, + final Consumer restoreConsumer, + final StreamsConfig config, + final StreamsMetrics metrics, final StateDirectory stateDirectory, final Time time) { - super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null, Time.SYSTEM, config.getLong(StreamsConfig.STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG)); + super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null, time, config.getLong(StreamsConfig.STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG)); // initialize the topology with its own context this.processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics); From 2531a3ccc4abd369f0f258b89c9a3aeed80ba407 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Wed, 8 Feb 2017 09:03:26 +0000 Subject: [PATCH 5/9] always checkpoint --- .../java/org/apache/kafka/streams/StreamsConfig.java | 4 ++-- .../streams/processor/internals/Checkpointer.java | 2 +- .../streams/processor/internals/CheckpointerTest.java | 10 +--------- 3 files changed, 4 insertions(+), 12 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 3d33d76f811b7..cca2aa4451d3c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -217,8 +217,7 @@ public class StreamsConfig extends AbstractConfig { /** {@code statestore.checkpoint.interval.ms} */ public static final String STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG = "statestore.checkpoint.interval.ms"; private static final String STATESTORE_CHECKPOINT_INTERVAL_MS_DOC = "The minimum frequency at which state stores will be checkpointed. " + - "This enables some control over the amount of data that will need to be restored in the event of a hard crash. 
" + - "A value <= 0 turns checkpointing off."; + "This enables some control over the amount of data that will need to be restored in the event of a hard crash."; static { CONFIG = new ConfigDef() @@ -393,6 +392,7 @@ public class StreamsConfig extends AbstractConfig { .define(STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG, Type.LONG, 5 * 60 * 1000, + atLeast(0), Importance.LOW, STATESTORE_CHECKPOINT_INTERVAL_MS_DOC); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java index bcac8a14d43fa..dedc6f0d397f0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java @@ -38,7 +38,7 @@ public Checkpointer(final Time time, } public void checkpoint(final Map offsets) { - if (checkpointInterval > 0 && time.milliseconds() >= lastCheckpointMs + checkpointInterval) { + if (time.milliseconds() >= lastCheckpointMs + checkpointInterval) { checkpointable.checkpoint(offsets); lastCheckpointMs = time.milliseconds(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CheckpointerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CheckpointerTest.java index db0777c44ecdb..86f7d6df83a22 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CheckpointerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CheckpointerTest.java @@ -64,15 +64,7 @@ public void shouldNotCheckpointBetweenIntervals() throws Exception { checkpointer.checkpoint(Collections.emptyMap()); assertThat(checkpointable.callCount, equalTo(1)); } - - @Test - public void shouldNotCheckpointIfIntervalLessThanOrEqualToZero() throws Exception { - final Checkpointer checkpointer = new Checkpointer(time, checkpointable, 0); - time.sleep(1000); - 
checkpointer.checkpoint(Collections.emptyMap()); - assertThat(checkpointable.callCount, equalTo(0)); - } - + class CheckpointableStub implements Checkpointable { int callCount = 0; @Override From 52745a5c49c2bbec29d0b6bdb12281f056f2ddd6 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Wed, 8 Feb 2017 10:26:56 +0000 Subject: [PATCH 6/9] feedback --- .../apache/kafka/streams/processor/internals/StreamTask.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 05aba73f89746..0ffea095dc493 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -76,7 +76,7 @@ public void run() { recordCollector.flush(); // 3) write checkpoints for any local state checkpointer.checkpoint(recordCollectorOffsets()); - // 3) commit consumed offsets if it is dirty already + // 4) commit consumed offsets if it is dirty already commitOffsets(); } }; From 05e08c1505c445cddf9e82619d6df44d673ffc06 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Tue, 14 Feb 2017 08:54:06 -0800 Subject: [PATCH 7/9] remove checkpoint interval config --- .../org/apache/kafka/streams/StreamsConfig.java | 13 +------------ .../streams/processor/internals/StandbyTask.java | 2 +- .../streams/processor/internals/StreamTask.java | 2 +- .../processor/internals/StandbyTaskTest.java | 4 ++-- .../streams/processor/internals/StreamTaskTest.java | 2 +- 5 files changed, 6 insertions(+), 17 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index a86a79d1ba825..4af8109a226b3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -214,11 +214,6 @@ public class StreamsConfig extends AbstractConfig { public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG; private static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC; - /** {@code statestore.checkpoint.interval.ms} */ - public static final String STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG = "statestore.checkpoint.interval.ms"; - private static final String STATESTORE_CHECKPOINT_INTERVAL_MS_DOC = "The minimum frequency at which state stores will be checkpointed. " + - "This enables some control over the amount of data that will need to be restored in the event of a hard crash."; - static { CONFIG = new ConfigDef() .define(APPLICATION_ID_CONFIG, // required with no default value @@ -388,13 +383,7 @@ public class StreamsConfig extends AbstractConfig { 40 * 1000, atLeast(0), ConfigDef.Importance.MEDIUM, - REQUEST_TIMEOUT_MS_DOC) - .define(STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG, - Type.LONG, - 5 * 60 * 1000, - atLeast(0), - Importance.LOW, - STATESTORE_CHECKPOINT_INTERVAL_MS_DOC); + REQUEST_TIMEOUT_MS_DOC); } // this is the list of configs for underlying clients diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 08efd3d458dd0..9b8369fec84a3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -63,7 +63,7 @@ public StandbyTask(final TaskId id, final StreamsMetrics metrics, final StateDirectory stateDirectory, final Time time) { - super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null, time, config.getLong(StreamsConfig.STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG)); + super(id, 
applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null, time, config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); // initialize the topology with its own context this.processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 00e228aae2368..d0a770d72ec00 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -108,7 +108,7 @@ public StreamTask(TaskId id, ThreadCache cache, Time time, final RecordCollector recordCollector) { - super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory, cache, time, config.getLong(StreamsConfig.STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG)); + super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory, cache, time, config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); this.punctuationQueue = new PunctuationQueue(); this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); this.metrics = new TaskMetrics(metrics); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 336378533e8db..ed8d7e274ca38 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -119,7 +119,7 @@ private StreamsConfig createConfig(final File baseDir) throws Exception { setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); setProperty(StreamsConfig.STATE_DIR_CONFIG, 
baseDir.getCanonicalPath()); setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); - setProperty(StreamsConfig.STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG, "1"); + setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1"); } }); } @@ -370,7 +370,7 @@ public void shouldCheckpointStoreOffsetsOnCommit() throws Exception { serializedValue, serializedValue))); - time.sleep(config.getLong(StreamsConfig.STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG)); + time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); task.commit(); final Map checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 9d0f58af619ed..5c72fc9bff78c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -466,7 +466,7 @@ public Map offsets() { stateDirectory, new ThreadCache("testCache", 0, streamsMetrics), time, recordCollector); - time.sleep(config.getLong(StreamsConfig.STATESTORE_CHECKPOINT_INTERVAL_MS_CONFIG)); + time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); streamTask.commit(); final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateDirectory.directoryForTask(taskId), From 90ca3e85910bd95df80a3a58ef966617e4d7ac6a Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Thu, 16 Feb 2017 08:34:02 -0800 Subject: [PATCH 8/9] remove Checkpointer as it is no longer needed --- .../processor/internals/AbstractTask.java | 7 +- .../internals/GlobalStateUpdateTask.java | 7 +- .../internals/GlobalStreamThread.java | 6 +- .../processor/internals/StandbyTask.java | 4 +- .../processor/internals/StreamTask.java | 4 +- .../processor/internals/AbstractTaskTest.java | 2 +-
.../processor/internals/CheckpointerTest.java | 76 ------------------- .../internals/GlobalStateTaskTest.java | 3 +- .../test/ProcessorTopologyTestDriver.java | 5 +- 9 files changed, 13 insertions(+), 101 deletions(-) delete mode 100644 streams/src/test/java/org/apache/kafka/streams/processor/internals/CheckpointerTest.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 080582b05f6e2..8de5d2304d05a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -23,7 +23,6 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.AuthorizationException; import org.apache.kafka.common.errors.WakeupException; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; @@ -48,7 +47,6 @@ public abstract class AbstractTask { protected final Consumer consumer; protected final ProcessorStateManager stateMgr; protected final Set partitions; - protected final Checkpointer checkpointer; protected InternalProcessorContext processorContext; protected final ThreadCache cache; /** @@ -62,9 +60,7 @@ protected AbstractTask(final TaskId id, final Consumer restoreConsumer, final boolean isStandby, final StateDirectory stateDirectory, - final ThreadCache cache, - final Time time, - final long checkpointInterval) { + final ThreadCache cache) { this.id = id; this.applicationId = applicationId; this.partitions = new HashSet<>(partitions); @@ -75,7 +71,6 @@ protected AbstractTask(final TaskId id, // create the processor state manager try { stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, 
stateDirectory, topology.storeToChangelogTopic()); - checkpointer = new Checkpointer(time, stateMgr, checkpointInterval); } catch (IOException e) { throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index 2a3d3a886d9c8..cc18f2d4decfd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -42,7 +42,6 @@ private class SourceNodeAndDeserializer { private final ProcessorTopology topology; private final InternalProcessorContext processorContext; - private final Checkpointer checkpointer; private final Map offsets = new HashMap<>(); private final Map deserializers = new HashMap<>(); private final GlobalStateManager stateMgr; @@ -50,13 +49,11 @@ private class SourceNodeAndDeserializer { public GlobalStateUpdateTask(final ProcessorTopology topology, final InternalProcessorContext processorContext, - final GlobalStateManager stateMgr, - final Checkpointer checkpointer) { + final GlobalStateManager stateMgr) { this.topology = topology; this.stateMgr = stateMgr; this.processorContext = processorContext; - this.checkpointer = checkpointer; } @SuppressWarnings("unchecked") @@ -92,7 +89,7 @@ public void update(final ConsumerRecord record) { public void flushState() { stateMgr.flush(processorContext); - checkpointer.checkpoint(offsets); + stateMgr.checkpoint(offsets); } public void close() throws IOException { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index 2a37949877fa6..f222c8e855345 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -160,10 +160,8 @@ private StateConsumer initialize() { stateMgr, streamsMetrics, cache), - stateMgr, - new Checkpointer(time, - stateMgr, - 10L)), + stateMgr + ), time, config.getLong(StreamsConfig.POLL_MS_CONFIG), config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 9b8369fec84a3..fcd3f5fdbaec3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -63,7 +63,7 @@ public StandbyTask(final TaskId id, final StreamsMetrics metrics, final StateDirectory stateDirectory, final Time time) { - super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null, time, config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); + super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null); // initialize the topology with its own context this.processorContext = new StandbyContextImpl(id, applicationId, config, stateMgr, metrics); @@ -96,7 +96,7 @@ public List> update(TopicPartition partition, Lis public void commit() { log.debug("standby-task [{}] Committing its state", id()); stateMgr.flush(processorContext); - checkpointer.checkpoint(Collections.emptyMap()); + stateMgr.checkpoint(Collections.emptyMap()); // reinitialize offset limits initializeOffsetLimits(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index d0a770d72ec00..d95ac4bac6fc1 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -77,7 +77,7 @@ public void run() { // 2) flush produced records in the downstream and change logs of local states recordCollector.flush(); // 3) write checkpoints for any local state - checkpointer.checkpoint(recordCollectorOffsets()); + stateMgr.checkpoint(recordCollectorOffsets()); // 4) commit consumed offsets if it is dirty already commitOffsets(); } @@ -108,7 +108,7 @@ public StreamTask(TaskId id, ThreadCache cache, Time time, final RecordCollector recordCollector) { - super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory, cache, time, config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); + super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory, cache); this.punctuationQueue = new PunctuationQueue(); this.maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); this.metrics = new TaskMetrics(metrics); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java index d0af1694a23b4..f288f984d1ab1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java @@ -74,7 +74,7 @@ private AbstractTask createTask(final Consumer consumer) { consumer, false, new StateDirectory("app", TestUtils.tempDirectory().getPath(), time), - new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics())), time, 0) { + new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))) { @Override public void commit() { // do nothing diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CheckpointerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CheckpointerTest.java deleted file mode 100644 index 86f7d6df83a22..0000000000000 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CheckpointerTest.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor.internals; - -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.MockTime; -import org.junit.Test; - -import java.util.Collections; -import java.util.Map; - -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; - -public class CheckpointerTest { - - private static final int CHECKPOINT_INTERVAL = 1000; - private final MockTime time = new MockTime(); - private final CheckpointableStub checkpointable = new CheckpointableStub(); - private final Checkpointer checkpointer = new Checkpointer(time, checkpointable, CHECKPOINT_INTERVAL); - - @Test - public void shouldNotCheckpointIfCheckpointIntervalHasntElapsed() throws Exception { - checkpointer.checkpoint(Collections.emptyMap()); - assertThat(checkpointable.callCount, equalTo(0)); - } - - @Test - public void shouldCheckpointIfIntervalHasElapsed() throws Exception { - time.sleep(CHECKPOINT_INTERVAL); - checkpointer.checkpoint(Collections.emptyMap()); - assertThat(checkpointable.callCount, equalTo(1)); - } - - @Test - public void shouldCheckpointEveryTimeTheIntervalHasElapsed() throws Exception { - time.sleep(CHECKPOINT_INTERVAL); - checkpointer.checkpoint(Collections.emptyMap()); - time.sleep(CHECKPOINT_INTERVAL); - checkpointer.checkpoint(Collections.emptyMap()); - assertThat(checkpointable.callCount, equalTo(2)); - } - - @Test - public void shouldNotCheckpointBetweenIntervals() throws Exception { - time.sleep(CHECKPOINT_INTERVAL); - checkpointer.checkpoint(Collections.emptyMap()); - time.sleep(CHECKPOINT_INTERVAL / 2); - checkpointer.checkpoint(Collections.emptyMap()); 
- assertThat(checkpointable.callCount, equalTo(1)); - } - - class CheckpointableStub implements Checkpointable { - int callCount = 0; - @Override - public void checkpoint(final Map offsets) { - callCount++; - } - } - -} \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index 4e24b14c2cffc..6d30405a6ca60 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -88,8 +88,7 @@ public void before() { offsets.put(t1, 50L); offsets.put(t2, 100L); stateMgr = new GlobalStateManagerStub(storeNames, offsets); - final Checkpointer checkpointer = new Checkpointer(time, stateMgr, checkpointInterval); - globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr, checkpointer); + globalStateTask = new GlobalStateUpdateTask(topology, context, stateMgr); } @Test diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index b98c777651ffb..ac8933dcb6aac 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -46,7 +46,6 @@ import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.kafka.streams.processor.internals.Checkpointer; import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl; import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl; import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask; @@ -212,8 +211,8 @@ public List 
partitionsFor(String topic) { final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl(globalTopology, globalConsumer, stateDirectory); globalStateTask = new GlobalStateUpdateTask(globalTopology, new GlobalProcessorContextImpl(config, stateManager, streamsMetrics, cache), - stateManager, - new Checkpointer(Time.SYSTEM, stateManager, 0)); + stateManager + ); globalStateTask.initialize(); } From dfe52c643cd735e3784783392834e8152eefa986 Mon Sep 17 00:00:00 2001 From: Damian Guy Date: Thu, 16 Feb 2017 15:11:06 -0800 Subject: [PATCH 9/9] address comments --- .../processor/internals/Checkpointable.java | 3 +- .../processor/internals/Checkpointer.java | 46 ------------------- .../internals/GlobalStateManagerImpl.java | 2 +- .../internals/GlobalStateUpdateTask.java | 2 +- .../internals/GlobalStreamThread.java | 3 +- .../internals/ProcessorStateManager.java | 2 +- .../processor/internals/StandbyTask.java | 7 +-- .../processor/internals/StateManager.java | 2 - .../processor/internals/StreamThread.java | 2 +- .../internals/GlobalStateManagerImplTest.java | 8 ++-- .../internals/GlobalStateTaskTest.java | 8 +--- .../internals/ProcessorStateManagerTest.java | 2 +- .../processor/internals/StandbyTaskTest.java | 16 +++---- .../processor/internals/StateManagerStub.java | 2 +- .../kafka/test/GlobalStateManagerStub.java | 2 +- 15 files changed, 25 insertions(+), 82 deletions(-) delete mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java index 5fdc3730b5ccc..7b02d5b934365 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointable.java @@ -20,7 +20,8 @@ import java.util.Map; -// Interface to indicate that an object 
can be checkpointed +// Interface to indicate that an object has associated partition offsets that can be checkpointed interface Checkpointable { void checkpoint(final Map offsets); + Map checkpointed(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java deleted file mode 100644 index dedc6f0d397f0..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Checkpointer.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals; - -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Time; - -import java.util.Map; - -public class Checkpointer { - - private final Time time; - private final Checkpointable checkpointable; - private final long checkpointInterval; - private long lastCheckpointMs; - - public Checkpointer(final Time time, - final Checkpointable checkpointable, - final long checkpointInterval) { - this.time = time; - this.checkpointable = checkpointable; - this.checkpointInterval = checkpointInterval; - lastCheckpointMs = time.milliseconds(); - } - - public void checkpoint(final Map offsets) { - if (time.milliseconds() >= lastCheckpointMs + checkpointInterval) { - checkpointable.checkpoint(offsets); - lastCheckpointMs = time.milliseconds(); - } - } -} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java index 5b80024becc36..3819bb5a09d70 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java @@ -238,7 +238,7 @@ public void checkpoint(final Map offsets) { } @Override - public Map checkpointedOffsets() { + public Map checkpointed() { return Collections.unmodifiableMap(checkpointableOffsets); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java index cc18f2d4decfd..6da37e4c1c003 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java @@ -67,7 +67,7 @@ public Map initialize() { } initTopology(); processorContext.initialized(); - return stateMgr.checkpointedOffsets(); + return stateMgr.checkpointed(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java index f222c8e855345..b4e15f2e573e9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java @@ -160,8 +160,7 @@ private StateConsumer initialize() { stateMgr, streamsMetrics, cache), - stateMgr - ), + stateMgr), time, config.getLong(StreamsConfig.POLL_MS_CONFIG), config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index baec4a6db2f45..0e8caa2b76ed8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -248,7 +248,7 @@ private void restoreActiveState(String topicName, StateRestoreCallback stateRest } } - public Map checkpointedOffsets() { + public Map checkpointed() { Map partitionsAndOffsets = new HashMap<>(); for (Map.Entry entry : restoreCallbacks.entrySet()) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index fcd3f5fdbaec3..a27098c34f22e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -20,7 +20,6 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.TaskId; @@ -51,7 +50,6 @@ public class StandbyTask extends AbstractTask { * @param config the {@link StreamsConfig} specified by the user * @param metrics the {@link StreamsMetrics} created by the thread * @param stateDirectory the {@link StateDirectory} created by the thread - * @param time */ public StandbyTask(final TaskId id, final String applicationId, @@ -61,8 +59,7 @@ public StandbyTask(final TaskId id, final Consumer restoreConsumer, final StreamsConfig config, final StreamsMetrics metrics, - final StateDirectory stateDirectory, - final Time time) { + final StateDirectory stateDirectory) { super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null); // initialize the topology with its own context @@ -73,7 +70,7 @@ public StandbyTask(final TaskId id, this.processorContext.initialized(); - this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets()); + this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed()); } public Map checkpointedOffsets() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java index 96e5b5d835c3c..3102b77163086 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManager.java @@ -36,6 +36,4 @@ interface StateManager extends Checkpointable { StateStore getGlobalStore(final String name); StateStore getStore(final String name); - - Map checkpointedOffsets(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 1b0fa3de2f203..fba3db573af14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -945,7 +945,7 @@ StandbyTask createStandbyTask(TaskId id, Collection partitions) ProcessorTopology topology = builder.build(id.topicGroupId); if (!topology.stateStores().isEmpty()) { - return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, streamsMetrics, stateDirectory, Time.SYSTEM); + return new StandbyTask(id, applicationId, partitions, topology, consumer, restoreConsumer, config, streamsMetrics, stateDirectory); } else { return null; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java index f817cbfbc8233..062079fefcbea 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java @@ -127,7 +127,7 @@ public void shouldReadCheckpointOffsets() throws Exception { final Map expected = writeCheckpoint(); stateManager.initialize(context); - final Map offsets = stateManager.checkpointedOffsets(); + final Map offsets = stateManager.checkpointed(); 
assertEquals(expected, offsets); } @@ -385,7 +385,7 @@ public void shouldCheckpointOffsets() throws Exception { final Map result = readOffsetsCheckpoint(); assertThat(result, equalTo(offsets)); - assertThat(stateManager.checkpointedOffsets(), equalTo(offsets)); + assertThat(stateManager.checkpointed(), equalTo(offsets)); } @Test @@ -397,10 +397,10 @@ public void shouldNotRemoveOffsetsOfUnUpdatedTablesDuringCheckpoint() throws Exc initializeConsumer(20, 1, t2); stateManager.register(store2, false, stateRestoreCallback); - final Map initialCheckpoint = stateManager.checkpointedOffsets(); + final Map initialCheckpoint = stateManager.checkpointed(); stateManager.checkpoint(Collections.singletonMap(t1, 101L)); - final Map updatedCheckpoint = stateManager.checkpointedOffsets(); + final Map updatedCheckpoint = stateManager.checkpointed(); assertThat(updatedCheckpoint.get(t2), equalTo(initialCheckpoint.get(t2))); assertThat(updatedCheckpoint.get(t1), equalTo(101L)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java index 6d30405a6ca60..66999bc8efbe1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateTaskTest.java @@ -22,7 +22,6 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.test.GlobalStateManagerStub; @@ -46,8 +45,6 @@ public class GlobalStateTaskTest { - private final MockTime time = new MockTime(0); - private final int checkpointInterval = 10; private Map offsets; private 
GlobalStateUpdateTask globalStateTask; private GlobalStateManagerStub stateMgr; @@ -142,7 +139,7 @@ public void shouldCloseStateManagerWithOffsets() throws Exception { globalStateTask.initialize(); globalStateTask.update(new ConsumerRecord<>("t1", 1, 51, "foo".getBytes(), "foo".getBytes())); globalStateTask.close(); - assertEquals(expectedOffsets, stateMgr.checkpointedOffsets()); + assertEquals(expectedOffsets, stateMgr.checkpointed()); assertTrue(stateMgr.closed); } @@ -153,9 +150,8 @@ public void shouldCheckpointOffsetsWhenStateIsFlushed() throws Exception { expectedOffsets.put(t2, 100L); globalStateTask.initialize(); globalStateTask.update(new ConsumerRecord<>("t1", 1, 101, "foo".getBytes(), "foo".getBytes())); - time.sleep(checkpointInterval); globalStateTask.flushState(); - assertThat(stateMgr.checkpointedOffsets(), equalTo(expectedOffsets)); + assertThat(stateMgr.checkpointed(), equalTo(expectedOffsets)); } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index fc29c594b8e32..f1d3090b2c026 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -414,7 +414,7 @@ public void testChangeLogOffsets() throws IOException { stateMgr.register(store2, true, store2.stateRestoreCallback); stateMgr.register(store3, true, store3.stateRestoreCallback); - Map changeLogOffsets = stateMgr.checkpointedOffsets(); + Map changeLogOffsets = stateMgr.checkpointed(); assertEquals(3, changeLogOffsets.size()); assertTrue(changeLogOffsets.containsKey(partition1)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index ed8d7e274ca38..ef4ebcc7c6a8d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -30,7 +30,6 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.MockTime; -import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStreamBuilder; @@ -119,7 +118,6 @@ private StreamsConfig createConfig(final File baseDir) throws Exception { setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName()); - setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1"); } }); } @@ -156,7 +154,7 @@ public void cleanup() { @Test public void testStorePartitions() throws Exception { StreamsConfig config = createConfig(baseDir); - StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, Time.SYSTEM); + StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory); assertEquals(Utils.mkSet(partition2), new HashSet<>(task.changeLogPartitions())); @@ -166,7 +164,7 @@ public void testStorePartitions() throws Exception { @Test(expected = Exception.class) public void testUpdateNonPersistentStore() throws Exception { StreamsConfig config = createConfig(baseDir); - StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, 
stateDirectory, Time.SYSTEM); + StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory); restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); @@ -180,7 +178,7 @@ public void testUpdateNonPersistentStore() throws Exception { @Test public void testUpdate() throws Exception { StreamsConfig config = createConfig(baseDir); - StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory, Time.SYSTEM); + StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology, consumer, restoreStateConsumer, config, null, stateDirectory); restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); @@ -238,7 +236,7 @@ public void testUpdateKTable() throws Exception { )); StreamsConfig config = createConfig(baseDir); - StandbyTask task = new StandbyTask(taskId, applicationId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null, stateDirectory, Time.SYSTEM); + StandbyTask task = new StandbyTask(taskId, applicationId, ktablePartitions, ktableTopology, consumer, restoreStateConsumer, config, null, stateDirectory); restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); @@ -332,7 +330,7 @@ public void shouldNotThrowUnsupportedOperationExceptionWhenInitializingStateStor final ProcessorTopology topology = builder.setApplicationId(applicationId).build(0); StreamsConfig config = createConfig(baseDir); new StandbyTask(taskId, applicationId, partitions, topology, consumer, restoreStateConsumer, config, - new MockStreamsMetrics(new Metrics()), stateDirectory, Time.SYSTEM); + new MockStreamsMetrics(new Metrics()), stateDirectory); } @@ -357,8 +355,8 @@ public void shouldCheckpointStoreOffsetsOnCommit() throws Exception { restoreStateConsumer, config, null, - stateDirectory, - time); + stateDirectory + ); 
restoreStateConsumer.assign(new ArrayList<>(task.changeLogPartitions())); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java index a928b6eb6320d..f4aca9fa04aaf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerStub.java @@ -57,7 +57,7 @@ public StateStore getStore(final String name) { } @Override - public Map checkpointedOffsets() { + public Map checkpointed() { return null; } diff --git a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java index bbec224169268..612a0da47e6d4 100644 --- a/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java +++ b/streams/src/test/java/org/apache/kafka/test/GlobalStateManagerStub.java @@ -82,7 +82,7 @@ public StateStore getStore(final String name) { } @Override - public Map checkpointedOffsets() { + public Map checkpointed() { return offsets; } }