Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,14 +52,14 @@ public abstract class AbstractTask {
/**
* @throws ProcessorStateException if the state manager cannot be created
*/
protected AbstractTask(TaskId id,
String applicationId,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
boolean isStandby,
StateDirectory stateDirectory,
protected AbstractTask(final TaskId id,
final String applicationId,
final Collection<TopicPartition> partitions,
final ProcessorTopology topology,
final Consumer<byte[], byte[]> consumer,
final Consumer<byte[], byte[]> restoreConsumer,
final boolean isStandby,
final StateDirectory stateDirectory,
final ThreadCache cache) {
this.id = id;
this.applicationId = applicationId;
Expand All @@ -70,7 +70,7 @@ protected AbstractTask(TaskId id,

// create the processor state manager
try {
this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
} catch (IOException e) {
throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.common.TopicPartition;

import java.util.Map;

/**
 * Indicates that an object has associated partition offsets that can be
 * checkpointed, i.e. persisted so they can be retrieved again later
 * (for example across restarts).
 */
interface Checkpointable {
    /**
     * Checkpoint the given partition offsets.
     *
     * @param offsets offsets per {@link TopicPartition} to checkpoint;
     *                implementations may merge these with previously
     *                checkpointed offsets — TODO confirm per implementation
     */
    void checkpoint(final Map<TopicPartition, Long> offsets);

    /**
     * @return the currently checkpointed offsets per {@link TopicPartition}
     */
    Map<TopicPartition, Long> checkpointed();
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,11 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
private final File baseDir;
private final OffsetCheckpoint checkpoint;
private final Set<String> globalStoreNames = new HashSet<>();
private HashMap<TopicPartition, Long> checkpointableOffsets;
private final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();

public GlobalStateManagerImpl(final ProcessorTopology topology,
final Consumer<byte[], byte[]> consumer,
final StateDirectory stateDirectory) {
final Consumer<byte[], byte[]> consumer,
final StateDirectory stateDirectory) {
this.topology = topology;
this.consumer = consumer;
this.stateDirectory = stateDirectory;
Expand All @@ -81,8 +81,7 @@ public Set<String> initialize(final InternalProcessorContext processorContext) {
}

try {
this.checkpointableOffsets = new HashMap<>(checkpoint.read());
checkpoint.delete();
this.checkpointableOffsets.putAll(checkpoint.read());
} catch (IOException e) {
try {
stateDirectory.unlockGlobalState();
Expand Down Expand Up @@ -220,13 +219,14 @@ public void close(final Map<TopicPartition, Long> offsets) throws IOException {
if (closeFailed.length() > 0) {
throw new ProcessorStateException("Exceptions caught during close of 1 or more global state stores\n" + closeFailed);
}
writeCheckpoints(offsets);
checkpoint(offsets);
} finally {
stateDirectory.unlockGlobalState();
}
}

private void writeCheckpoints(final Map<TopicPartition, Long> offsets) {
@Override
public void checkpoint(final Map<TopicPartition, Long> offsets) {
if (!offsets.isEmpty()) {
checkpointableOffsets.putAll(offsets);
try {
Expand All @@ -238,7 +238,7 @@ private void writeCheckpoints(final Map<TopicPartition, Long> offsets) {
}

@Override
public Map<TopicPartition, Long> checkpointedOffsets() {
public Map<TopicPartition, Long> checkpointed() {
return Collections.unmodifiableMap(checkpointableOffsets);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public Map<TopicPartition, Long> initialize() {
}
initTopology();
processorContext.initialized();
return stateMgr.checkpointedOffsets();
return stateMgr.checkpointed();
}


Expand All @@ -89,6 +89,7 @@ public void update(final ConsumerRecord<byte[], byte[]> record) {

public void flushState() {
stateMgr.flush(processorContext);
stateMgr.checkpoint(offsets);
}

public void close() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class ProcessorStateManager implements StateManager {
// TODO: this map does not work with customized grouper where multiple partitions
// of the same topic can be assigned to the same task.
private final Map<String, TopicPartition> partitionForTopic;
private final OffsetCheckpoint checkpoint;

/**
* @throws LockException if the state directory cannot be locked because another thread holds the lock
Expand Down Expand Up @@ -103,11 +104,8 @@ public ProcessorStateManager(final TaskId taskId,
}

// load the checkpoint information
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
this.checkpointedOffsets = new HashMap<>(checkpoint.read());

// delete the checkpoint file after finish loading its stored offsets
checkpoint.delete();
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the checkpoint file grow indefinitely?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no - it is overwritten each time. See OffsetCheckpoint#write


Expand Down Expand Up @@ -250,7 +248,7 @@ private void restoreActiveState(String topicName, StateRestoreCallback stateRest
}
}

public Map<TopicPartition, Long> checkpointedOffsets() {
public Map<TopicPartition, Long> checkpointed() {
Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();

for (Map.Entry<String, StateRestoreCallback> entry : restoreCallbacks.entrySet()) {
Expand Down Expand Up @@ -347,29 +345,7 @@ public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
}

if (ackedOffsets != null) {
Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
for (String storeName : stores.keySet()) {
// only checkpoint the offset to the offsets file if
// it is persistent AND changelog enabled
if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) {
String changelogTopic = storeToChangelogTopic.get(storeName);
TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
Long offset = ackedOffsets.get(topicPartition);

if (offset != null) {
// store the last offset + 1 (the log position after restoration)
checkpointOffsets.put(topicPartition, offset + 1);
} else {
// if no record was produced. we need to check the restored offset.
offset = restoredOffsets.get(topicPartition);
if (offset != null)
checkpointOffsets.put(topicPartition, offset);
}
}
}
// write the checkpoint file before closing, to indicate clean shutdown
OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
checkpoint.write(checkpointOffsets);
checkpoint(ackedOffsets);
}

}
Expand All @@ -379,6 +355,31 @@ public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
}
}

// write the checkpoint
@Override
public void checkpoint(final Map<TopicPartition, Long> ackedOffsets) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ackedOffsets indicates that those offsets are acked already, but those offsets are going to get acked after the checkpoint was written, right?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah, the offsets are acked first. They need to be otherwise we risk writing a checkpoint for data that has not been acked.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Got confused about the correct order...

for (String storeName : stores.keySet()) {
// only checkpoint the offset to the offsets file if
// it is persistent AND changelog enabled
if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) {
final String changelogTopic = storeToChangelogTopic.get(storeName);
final TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));
if (ackedOffsets.containsKey(topicPartition)) {
// store the last offset + 1 (the log position after restoration)
checkpointedOffsets.put(topicPartition, ackedOffsets.get(topicPartition) + 1);
} else if (restoredOffsets.containsKey(topicPartition)) {
checkpointedOffsets.put(topicPartition, restoredOffsets.get(topicPartition));
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about the case that both if's are false -- can this happen? If not, we should throw an exception

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah it can happen. The table may not have received any updates in the period, so both would be empty

}
}
// write the checkpoint file before closing, to indicate clean shutdown
try {
checkpoint.write(checkpointedOffsets);
} catch (IOException e) {
log.warn("Failed to write checkpoint file to {}", new File(baseDir, CHECKPOINT_FILE_NAME), e);
}
}

private int getPartition(String topic) {
TopicPartition partition = partitionForTopic.get(topic);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,15 @@ public class StandbyTask extends AbstractTask {
* @param metrics the {@link StreamsMetrics} created by the thread
* @param stateDirectory the {@link StateDirectory} created by the thread
*/
public StandbyTask(TaskId id,
String applicationId,
Collection<TopicPartition> partitions,
ProcessorTopology topology,
Consumer<byte[], byte[]> consumer,
Consumer<byte[], byte[]> restoreConsumer,
StreamsConfig config,
StreamsMetrics metrics, final StateDirectory stateDirectory) {
public StandbyTask(final TaskId id,
final String applicationId,
final Collection<TopicPartition> partitions,
final ProcessorTopology topology,
final Consumer<byte[], byte[]> consumer,
final Consumer<byte[], byte[]> restoreConsumer,
final StreamsConfig config,
final StreamsMetrics metrics,
final StateDirectory stateDirectory) {
super(id, applicationId, partitions, topology, consumer, restoreConsumer, true, stateDirectory, null);

// initialize the topology with its own context
Expand All @@ -67,9 +68,9 @@ public StandbyTask(TaskId id,
log.info("standby-task [{}] Initializing state stores", id());
initializeStateStores();

((StandbyContextImpl) this.processorContext).initialized();
this.processorContext.initialized();

this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointedOffsets());
this.checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
}

public Map<TopicPartition, Long> checkpointedOffsets() {
Expand All @@ -92,7 +93,7 @@ public List<ConsumerRecord<byte[], byte[]>> update(TopicPartition partition, Lis
public void commit() {
log.debug("standby-task [{}] Committing its state", id());
stateMgr.flush(processorContext);

stateMgr.checkpoint(Collections.<TopicPartition, Long>emptyMap());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this correct, to always write empty map (is it interpreted as offset 0)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes it is correct as for standby task there are no ackedOffsets. It uses the restoredOffsets in ProcessorStateManager

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, as there are no ackedOffsets for standby tasks. The stateMgr will use the restored offsets when checkpointing.

// reinitialize offset limits
initializeOffsetLimits();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.io.IOException;
import java.util.Map;

interface StateManager {
interface StateManager extends Checkpointable {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move function checkpointedOffsets() into Checkpointable as well, and maybe rename to checkpointed as well?

File baseDir();

void register(final StateStore store, final boolean loggingEnabled, final StateRestoreCallback stateRestoreCallback);
Expand All @@ -36,6 +36,4 @@ interface StateManager {
StateStore getGlobalStore(final String name);

StateStore getStore(final String name);

Map<TopicPartition, Long> checkpointedOffsets();
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,9 @@ public void run() {
log.trace("{} Start flushing its producer's sent records upon committing its state", logPrefix);
// 2) flush produced records in the downstream and change logs of local states
recordCollector.flush();

// 3) commit consumed offsets if it is dirty already
// 3) write checkpoints for any local state
stateMgr.checkpoint(recordCollectorOffsets());
// 4) commit consumed offsets if it is dirty already
commitOffsets();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class InMemoryKeyValueStore<K, V> implements KeyValueStore<K, V> {

private StateSerdes<K, V> serdes;

InMemoryKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) {
public InMemoryKeyValueStore(final String name, final Serde<K> keySerde, final Serde<V> valueSerde) {
this.name = name;
this.keySerde = keySerde;
this.valueSerde = valueSerde;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() t
}

private AbstractTask createTask(final Consumer consumer) {
final MockTime time = new MockTime();
return new AbstractTask(new TaskId(0, 0),
"app",
Collections.singletonList(new TopicPartition("t", 0)),
Expand All @@ -72,7 +73,7 @@ private AbstractTask createTask(final Consumer consumer) {
consumer,
consumer,
false,
new StateDirectory("app", TestUtils.tempDirectory().getPath(), new MockTime()),
new StateDirectory("app", TestUtils.tempDirectory().getPath(), time),
new ThreadCache("testCache", 0, new MockStreamsMetrics(new Metrics()))) {
@Override
public void commit() {
Expand Down
Loading