KAFKA-4317: Regularly checkpoint StateStore changelog offsets (#2471)

dguy wants to merge 13 commits into apache:trunk from dguy/kafka-4317
Conversation
Refer to this link for build results (access rights to CI server needed):
Tests fail because of a checkstyle error.
doh! I ran the build before commit, too. Must have accidentally changed something.
FYI - I need to do a KIP for the new config param.
Just want to say thanks for implementing this!
I just ran the simple benchmark with checkpointing off and with the checkpoint interval set to the same as the commit interval (10 seconds). Oddly (probably just other stuff going on), the three runs with checkpointing on had better throughput than the three without.

Throughput without checkpointing:

Throughput with checkpointing:

Based on this small sample my assumption is that the overhead of checkpointing is negligible. The overhead would increase as the number of stores increases, but these are tiny files.
    import java.util.Map;

    // Interface to indicate that an object can be Check pointed

    this.checkpointedOffsets = new HashMap<>(checkpoint.read());

    // delete the checkpoint file after finish loading its stored offsets
    checkpoint.delete();
Can the checkpoint file grow indefinitely?

no - it is overwritten each time. See OffsetCheckpoint#write
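The answer above points at OffsetCheckpoint#write. A minimal sketch of how an overwrite-style checkpoint file can work (class and method names here are hypothetical, not the actual Kafka implementation): every write replaces the whole file through a temp file and an atomic rename, so the file's size is bounded by the current number of partitions, never by the number of writes.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Map;

// Hypothetical sketch of an overwrite-style checkpoint file, in the spirit
// of OffsetCheckpoint#write: each call rewrites the entire file.
public class CheckpointFileSketch {
    private final Path file;

    public CheckpointFileSketch(final Path file) {
        this.file = file;
    }

    public void write(final Map<String, Long> offsets) throws IOException {
        // write to a sibling temp file first so a crash mid-write
        // never corrupts the existing checkpoint
        final Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try (BufferedWriter writer = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8)) {
            for (final Map.Entry<String, Long> entry : offsets.entrySet()) {
                writer.write(entry.getKey() + " " + entry.getValue());
                writer.newLine();
            }
        }
        // the atomic move replaces any previous checkpoint in one step
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
    }
}
```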
Nit: can you please update the PR or JIRA or KIP name (they should match)?
    private final long checkpointInterval;
    private long lastCheckpointMs;

    public Checkpointer(final Time time,

    // write the checkpoint
    @Override
    public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
ackedOffsets indicates that those offsets are acked already, but those offsets are going to get acked after the checkpoint was written, right?

Nah, the offsets are acked first. They need to be, otherwise we risk writing a checkpoint for data that has not been acked.

Yes. Got confused about the correct order...
        checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
    } else if (restoredOffsets.containsKey(topicPartition)) {
        checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
    }
what about the case that both ifs are false -- can this happen? If not, we should throw an exception

Yeah it can happen. The table may not have received any updates in the period, so both would be empty
            Consumer<byte[], byte[]> consumer,
            Consumer<byte[], byte[]> restoreConsumer,
            StreamsConfig config,
            StreamsMetrics metrics,
            final StateDirectory stateDirectory) {

    // 3) write checkpoints for any local state
    checkpointer.checkpoint(recordCollectorOffsets());
    // 3) commit consumed offsets if it is dirty already
I like 3) 3) better ;-)
    try {
        this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());

        this.checkpointer = new Checkpointer(time, stateMgr, checkpointInterval);
remove this (we should avoid this whenever possible)
                        final long checkpointInterval) {
        this.time = time;
        this.checkpointable = checkpointable;
        this.lastCheckpointMs = time.milliseconds();
nit: remove this and move one line down -- first initialize with parameters, then everything else
I've removed the checkpoint config. So this can be committed without the KIP as there are no public API changes.
                       Time time,
                       final RecordCollector recordCollector) {
        super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory, cache);
        super(id, applicationId, partitions, topology, consumer, restoreConsumer, false, stateDirectory, cache, time, config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
If we are always writing the checkpointing file upon committing, do we still need this parameter? Or we can just execute the logic of Checkpointer.checkpoint() without the timing conditional?

Correct. It, and the Checkpointer, aren't really needed anymore. I'll remove them.
    import java.util.Map;

    public class Checkpointer {
As discussed before, this class can be removed?
    import java.util.Map;

    // Interface to indicate that an object can be checkpointed
nit: an object -> an object has associated partition offsets that can be ...
    import java.util.Map;

    interface StateManager {
    interface StateManager extends Checkpointable {
Should we move function checkpointedOffsets() into Checkpointable as well, and maybe rename to checkpointed as well?
        streamsMetrics,
        cache),
    stateMgr),
    stateMgr
Is this intentional? Or did you just want to align line 163 with previous lines?
        final StreamsConfig config,
        final StreamsMetrics metrics,
        final StateDirectory stateDirectory,
        final Time time) {
Where is this param used?

Thought I removed that, too. Obviously I was sleep coding this morning.
    log.debug("standby-task [{}] Committing its state", id());
    stateMgr.flush(processorContext);

    stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
Is this correct, to always write empty map (is it interpreted as offset 0)?

Yes, as there are no ackedOffsets for standby tasks. The stateMgr will use the restored offsets when checkpointing.
    public class GlobalStateTaskTest {

        private final MockTime time = new MockTime(0);
        private final int checkpointInterval = 10;
Do we still need this?
    setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
    setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath());
    setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
    setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "1");
Since we are using MockTime, we do not need to enforce the interval to be very small but still can run it as fast as we want, right?
@guozhangwang thanks. I've tidied it up (for real this time!) I apologize for my terrible attempt this morning.
Set up a streams system test on https://jenkins.confluent.io/job/kafka-streams-system-test-pr/1/
Currently the checkpoint file is deleted at state store initialization and it is only ever written again during a clean shutdown. This can result in significant delays during restarts, as the entire store needs to be loaded from the changelog.

We can mitigate this by frequently checkpointing the offsets. The checkpointing happens only during the commit phase, i.e., after we have manually flushed the store and the producer, so we guarantee that the checkpointed offsets are never greater than what has been flushed. In the event of a hard failure we can recover by reading the checkpoints and consuming from the stored offsets.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Eno Thereska, Matthias J. Sax, Guozhang Wang

Closes apache#2471 from dguy/kafka-4317
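The recovery path described above boils down to one decision per changelog partition, sketched here with illustrative names (not the actual Streams API): if a checkpointed offset survives the crash, restoration resumes from it; otherwise the whole changelog is replayed from its earliest offset.

```java
import java.util.Map;
import java.util.Optional;

// Sketch of the post-failure restore decision: a surviving checkpoint lets
// restoration skip straight to the recorded offset instead of replaying
// the entire changelog.
public class RestoreStart {
    public static long startOffset(final Map<String, Long> checkpointedOffsets,
                                   final String changelogPartition,
                                   final long earliestOffset) {
        return Optional.ofNullable(checkpointedOffsets.get(changelogPartition))
                       .orElse(earliestOffset);
    }
}
```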