Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -261,16 +261,20 @@ public <K, V> KTable<K, V> table(AutoOffsetReset offsetReset, Serde<K> keySerde,
addSource(offsetReset, source, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topic);
addProcessor(name, processorSupplier, source);

final KTableImpl kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), storeName);
StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName,
keySerde,
valSerde,
false,
Collections.<String, String>emptyMap(),
true);

addStateStore(storeSupplier, name);
connectSourceStoreAndTopic(storeName, topic);
final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier, Collections.singleton(source), storeName);

// only materialize the KTable into a state store if the storeName is not null
if (storeName != null) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see a new test for this.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My bad, this is not committed.

StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(storeName,
keySerde,
valSerde,
false,
Collections.<String, String>emptyMap(),
true);

addStateStore(storeSupplier, name);
connectSourceStoreAndTopic(storeName, topic);
}

return kTable;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ public class TopologyBuilder {
// are connected to these state stores
private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>();

// map from state store names that are directly associated with source processors to their subscribed topics,
// map from state store names to this state store's corresponding changelog topic if possible,
// this is used in the extended KStreamBuilder.
private final HashMap<String, String> sourceStoreToSourceTopic = new HashMap<>();
private final Map<String, String> storeToChangelogTopic = new HashMap<>();

// all global topics
private final Set<String> globalTopics = new HashSet<>();
Expand Down Expand Up @@ -784,11 +784,15 @@ public synchronized final TopologyBuilder connectProcessorAndStateStores(String
return this;
}

/**
* This is used only for KStreamBuilder: when adding a KTable from a source topic,
* we need to add the topic as the KTable's materialized state store's changelog.
*/
protected synchronized final TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName, String topic) {
if (sourceStoreToSourceTopic.containsKey(sourceStoreName)) {
if (storeToChangelogTopic.containsKey(sourceStoreName)) {
throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added.");
}
sourceStoreToSourceTopic.put(sourceStoreName, topic);
storeToChangelogTopic.put(sourceStoreName, topic);
return this;
}

Expand Down Expand Up @@ -1026,7 +1030,6 @@ private ProcessorTopology build(Set<String> nodeGroup) {
Map<String, SourceNode> topicSourceMap = new HashMap<>();
Map<String, SinkNode> topicSinkMap = new HashMap<>();
Map<String, StateStore> stateStoreMap = new LinkedHashMap<>();
Map<StateStore, ProcessorNode> storeToProcessorNodeMap = new HashMap<>();

// create processor nodes in a topological order ("nodeFactories" is already topologically sorted)
for (NodeFactory factory : nodeFactories.values()) {
Expand All @@ -1041,9 +1044,22 @@ private ProcessorTopology build(Set<String> nodeGroup) {
}
for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) {
if (!stateStoreMap.containsKey(stateStoreName)) {
final StateStore stateStore = getStateStore(stateStoreName);
StateStore stateStore;

if (stateFactories.containsKey(stateStoreName)) {
final StateStoreSupplier supplier = stateFactories.get(stateStoreName).supplier;
stateStore = supplier.get();

// remember the changelog topic if this state store is change-logging enabled
if (supplier.loggingEnabled() && !storeToChangelogTopic.containsKey(stateStoreName)) {
final String changelogTopic = ProcessorStateManager.storeChangelogTopic(this.applicationId, stateStoreName);
storeToChangelogTopic.put(stateStoreName, changelogTopic);
}
} else {
stateStore = globalStateStores.get(stateStoreName);
}

stateStoreMap.put(stateStoreName, stateStore);
storeToProcessorNodeMap.put(stateStore, node);
}
}
} else if (factory instanceof SourceNodeFactory) {
Expand Down Expand Up @@ -1077,13 +1093,7 @@ private ProcessorTopology build(Set<String> nodeGroup) {
}
}

return new ProcessorTopology(processorNodes,
topicSourceMap,
topicSinkMap,
new ArrayList<>(stateStoreMap.values()),
sourceStoreToSourceTopic,
storeToProcessorNodeMap,
new ArrayList<>(globalStateStores.values()));
return new ProcessorTopology(processorNodes, topicSourceMap, topicSinkMap, new ArrayList<>(stateStoreMap.values()), storeToChangelogTopic, new ArrayList<>(globalStateStores.values()));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ protected AbstractTask(TaskId id,

// create the processor state manager
try {
this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.sourceStoreToSourceTopic(), topology.storeToProcessorNodeMap());
this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());

} catch (IOException e) {
throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public void register(final StateStore store,
}

private List<TopicPartition> topicPartitionsForStore(final StateStore store) {
final String sourceTopic = topology.sourceStoreToSourceTopic().get(store.name());
final String sourceTopic = topology.storeToChangelogTopic().get(store.name());
final List<PartitionInfo> partitionInfos = consumer.partitionsFor(sourceTopic);
if (partitionInfos == null || partitionInfos.isEmpty()) {
throw new StreamsException(String.format("There are no partitions available for topic %s when initializing global store %s", sourceTopic, store.name()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public GlobalStateUpdateTask(final ProcessorTopology topology,
@SuppressWarnings("unchecked")
public Map<TopicPartition, Long> initialize() {
final Set<String> storeNames = stateMgr.initialize(processorContext);
final Map<String, String> storeNameToTopic = topology.sourceStoreToSourceTopic();
final Map<String, String> storeNameToTopic = topology.storeToChangelogTopic();
for (final String storeName : storeNames) {
final String sourceTopic = storeNameToTopic.get(storeName);
final SourceNode source = topology.source(sourceTopic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,9 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import static java.util.Collections.singleton;

Expand All @@ -53,52 +51,51 @@ public class ProcessorStateManager implements StateManager {
public static final String STATE_CHANGELOG_TOPIC_SUFFIX = "-changelog";
public static final String CHECKPOINT_FILE_NAME = ".checkpoint";

private final String logPrefix;
private final String applicationId;
private final int defaultPartition;
private final Map<String, TopicPartition> partitionForTopic;
private final File baseDir;
private final TaskId taskId;
private final String logPrefix;
private final boolean isStandby;
private final StateDirectory stateDirectory;
private final Map<String, StateStore> stores;
private final Map<String, StateStore> globalStores;
private final Set<String> loggingEnabled;
private final Consumer<byte[], byte[]> restoreConsumer;
private final Map<TopicPartition, Long> offsetLimits;
private final Map<TopicPartition, Long> restoredOffsets;
private final Map<TopicPartition, Long> checkpointedOffsets;
private final Map<TopicPartition, Long> offsetLimits;
private final boolean isStandby;
private final Map<String, StateRestoreCallback> restoreCallbacks; // used for standby tasks, keyed by state topic name
private final Map<String, String> sourceStoreToSourceTopic;
private final TaskId taskId;
private final StateDirectory stateDirectory;
private final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap;
private final Map<String, String> storeToChangelogTopic;

// TODO: this map does not work with customized grouper where multiple partitions
// of the same topic can be assigned to the same task.
private final Map<String, TopicPartition> partitionForTopic;

/**
* @throws LockException if the state directory cannot be locked because another thread holds the lock
* (this might be recoverable by retrying)
* @throws IOException if any severe error happens while creating or locking the state directory
*/
public ProcessorStateManager(String applicationId, TaskId taskId, Collection<TopicPartition> sources, Consumer<byte[], byte[]> restoreConsumer, boolean isStandby,
StateDirectory stateDirectory, final Map<String, String> sourceStoreToSourceTopic,
final Map<StateStore, ProcessorNode> stateStoreProcessorNodeMap) throws LockException, IOException {
this.applicationId = applicationId;
this.defaultPartition = taskId.partition;
public ProcessorStateManager(final String applicationId,
final TaskId taskId,
final Collection<TopicPartition> sources,
final Consumer<byte[], byte[]> restoreConsumer,
final boolean isStandby,
final StateDirectory stateDirectory,
final Map<String, String> storeToChangelogTopic) throws LockException, IOException {
this.taskId = taskId;
this.stateDirectory = stateDirectory;
this.stateStoreProcessorNodeMap = stateStoreProcessorNodeMap;
this.baseDir = stateDirectory.directoryForTask(taskId);
this.partitionForTopic = new HashMap<>();
for (TopicPartition source : sources) {
this.partitionForTopic.put(source.topic(), source);
}
this.stores = new LinkedHashMap<>();
this.globalStores = new HashMap<>();
this.loggingEnabled = new HashSet<>();
this.restoreConsumer = restoreConsumer;
this.offsetLimits = new HashMap<>();
this.restoredOffsets = new HashMap<>();
this.isStandby = isStandby;
this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null;
this.offsetLimits = new HashMap<>();
this.baseDir = stateDirectory.directoryForTask(taskId);
this.sourceStoreToSourceTopic = sourceStoreToSourceTopic;
this.storeToChangelogTopic = storeToChangelogTopic;

this.logPrefix = String.format("task [%s]", taskId);

Expand Down Expand Up @@ -126,6 +123,9 @@ public File baseDir() {
/**
* @throws IllegalArgumentException if the store name has already been registered or if it is not a valid name
* (e.g., when it conflicts with the names of internal topics, like the checkpoint file name)
*
* // TODO: parameter loggingEnabled can be removed now
*
* @throws StreamsException if the store's change log does not contain the partition
*/
public void register(StateStore store, boolean loggingEnabled, StateRestoreCallback stateRestoreCallback) {
Expand All @@ -139,17 +139,8 @@ public void register(StateStore store, boolean loggingEnabled, StateRestoreCallb
throw new IllegalArgumentException(String.format("%s Store %s has already been registered.", logPrefix, store.name()));
}

if (loggingEnabled) {
this.loggingEnabled.add(store.name());
}

// check that the underlying change log topic exist or not
String topic = null;
if (loggingEnabled) {
topic = storeChangelogTopic(this.applicationId, store.name());
} else if (sourceStoreToSourceTopic != null && sourceStoreToSourceTopic.containsKey(store.name())) {
topic = sourceStoreToSourceTopic.get(store.name());
}
String topic = storeToChangelogTopic.get(store.name());

if (topic == null) {
this.stores.put(store.name(), store);
Expand Down Expand Up @@ -283,7 +274,6 @@ public List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition s
List<ConsumerRecord<byte[], byte[]>> remainingRecords = null;

// restore states from changelog records

StateRestoreCallback restoreCallback = restoreCallbacks.get(storePartition.topic());

long lastOffset = -1L;
Expand All @@ -304,6 +294,7 @@ public List<ConsumerRecord<byte[], byte[]>> updateStandbyStates(TopicPartition s
}
count++;
}

// record the restored offset for its change log partition
restoredOffsets.put(storePartition, lastOffset + 1);

Expand All @@ -328,10 +319,6 @@ public void flush(final InternalProcessorContext context) {
if (!this.stores.isEmpty()) {
log.debug("{} Flushing all stores registered in the state manager", logPrefix);
for (StateStore store : this.stores.values()) {
final ProcessorNode processorNode = stateStoreProcessorNodeMap.get(store);
if (processorNode != null) {
context.setCurrentNode(processorNode);
}
try {
log.trace("{} Flushing store={}", logPrefix, store.name());
store.flush();
Expand Down Expand Up @@ -364,24 +351,22 @@ public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
if (ackedOffsets != null) {
Map<TopicPartition, Long> checkpointOffsets = new HashMap<>();
for (String storeName : stores.keySet()) {
TopicPartition part;
if (loggingEnabled.contains(storeName))
part = new TopicPartition(storeChangelogTopic(applicationId, storeName), getPartition(storeName));
else
part = new TopicPartition(storeName, getPartition(storeName));
// only checkpoint the offset to the offsets file if
// it is persistent AND changelog enabled
if (stores.get(storeName).persistent() && storeToChangelogTopic.containsKey(storeName)) {
String changelogTopic = storeToChangelogTopic.get(storeName);
TopicPartition topicPartition = new TopicPartition(changelogTopic, getPartition(storeName));

// only checkpoint the offset to the offsets file if it is persistent;
if (stores.get(storeName).persistent()) {
Long offset = ackedOffsets.get(part);
Long offset = ackedOffsets.get(topicPartition);

if (offset != null) {
// store the last offset + 1 (the log position after restoration)
checkpointOffsets.put(part, offset + 1);
checkpointOffsets.put(topicPartition, offset + 1);
} else {
// if no record was produced. we need to check the restored offset.
offset = restoredOffsets.get(part);
offset = restoredOffsets.get(topicPartition);
if (offset != null)
checkpointOffsets.put(part, offset);
checkpointOffsets.put(topicPartition, offset);
}
}
}
Expand All @@ -400,7 +385,7 @@ public void close(Map<TopicPartition, Long> ackedOffsets) throws IOException {
private int getPartition(String topic) {
TopicPartition partition = partitionForTopic.get(topic);

return partition == null ? defaultPartition : partition.partition();
return partition == null ? taskId.partition : partition.partition();
}

void registerGlobalStateStores(final List<StateStore> stateStores) {
Expand Down
Loading