Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class InternalStreamsBuilder implements InternalNameProvider {
private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
private final LinkedHashMap<StreamsGraphNode, LinkedHashSet<OptimizableRepartitionNode>> keyChangingOperationsToOptimizableRepartitionNodes = new LinkedHashMap<>();
private final LinkedHashSet<StreamsGraphNode> mergeNodes = new LinkedHashSet<>();
private final LinkedHashSet<StreamsGraphNode> tableSourceNodes = new LinkedHashSet<>();

private static final String TOPOLOGY_ROOT = "root";
private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);
Expand Down Expand Up @@ -254,6 +255,8 @@ private void maybeAddNodeForOptimizationMetadata(final StreamsGraphNode node) {
}
} else if (node.isMergeNode()) {
mergeNodes.add(node);
} else if (node instanceof TableSourceNode) {
tableSourceNodes.add(node);
}
}

Expand Down Expand Up @@ -292,10 +295,16 @@ private void maybePerformOptimizations(final Properties props) {

if (props != null && StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION))) {
LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
optimizeKTableSourceTopics();
maybeOptimizeRepartitionOperations();
}
}

// Flags every collected KTable source node so its input topic is reused as the
// changelog topic for the table's state store (applied only under OPTIMIZE).
private void optimizeKTableSourceTopics() {
    LOG.debug("Marking KTable source nodes to optimize using source topic for changelogs ");
    for (final StreamsGraphNode tableSourceNode : tableSourceNodes) {
        ((TableSourceNode) tableSourceNode).reuseSourceTopicForChangeLog(true);
    }
}

@SuppressWarnings("unchecked")
private void maybeOptimizeRepartitionOperations() {
maybeUpdateKeyChangingRepartitionNodeMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public class TableSourceNode<K, V> extends StreamSourceNode<K, V> {
private final ProcessorParameters<K, V> processorParameters;
private final String sourceName;
private final boolean isGlobalKTable;
private boolean shouldReuseSourceTopicForChangelog = false;

private TableSourceNode(final String nodeName,
final String sourceName,
Expand All @@ -57,6 +58,11 @@ private TableSourceNode(final String nodeName,
this.materializedInternal = materializedInternal;
}


/**
 * Marks whether this table source should reuse its input topic as the changelog
 * for its materialized state store. When enabled, {@code writeToTopology} disables
 * store logging and connects the store directly to the source topic instead of
 * creating a separate changelog topic. Set by the topology optimizer.
 *
 * @param shouldReuseSourceTopicForChangelog {@code true} to reuse the source topic as the changelog
 */
public void reuseSourceTopicForChangeLog(final boolean shouldReuseSourceTopicForChangelog) {
    this.shouldReuseSourceTopicForChangelog = shouldReuseSourceTopicForChangelog;
}

@Override
public String toString() {
return "TableSourceNode{" +
Expand Down Expand Up @@ -104,7 +110,11 @@ public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
final KTableSource<K, V> ktableSource = (KTableSource<K, V>) processorParameters.processorSupplier();
if (ktableSource.queryableName() != null) {
topologyBuilder.addStateStore(storeBuilder, nodeName());
topologyBuilder.markSourceStoreAndTopic(storeBuilder, topicName);

if (shouldReuseSourceTopicForChangelog) {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the optimization and we only apply it for materialized KTable objects.

storeBuilder.withLoggingDisabled();
topologyBuilder.connectSourceStoreAndTopic(storeBuilder.name(), topicName);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,6 @@ public class InternalTopologyBuilder {

private Map<Integer, Set<String>> nodeGroups = null;

// TODO: this is only temporary for 2.0 and should be removed
private final Map<StoreBuilder, String> storeToSourceChangelogTopic = new HashMap<>();

public static class StateStoreFactory {
private final StoreBuilder builder;
private final Set<String> users = new HashSet<>();
Expand Down Expand Up @@ -359,10 +356,6 @@ public synchronized final InternalTopologyBuilder rewriteTopology(final StreamsC
globalStateStores.put(storeBuilder.name(), storeBuilder.build());
}

// adjust the topology if optimization is turned on.
// TODO: to be removed post 2.0
adjust(config);

return this;
}

Expand Down Expand Up @@ -606,22 +599,14 @@ public final void connectProcessorAndStateStores(final String processorName,
nodeGroups = null;
}

private void connectSourceStoreAndTopic(final String sourceStoreName,
public void connectSourceStoreAndTopic(final String sourceStoreName,
final String topic) {
if (storeToChangelogTopic.containsKey(sourceStoreName)) {
throw new TopologyException("Source store " + sourceStoreName + " is already added.");
}
storeToChangelogTopic.put(sourceStoreName, topic);
}

/**
 * Records that the given store's changelog should be backed by the given source topic,
 * deferring the actual topology rewrite to a later adjustment pass.
 *
 * @param storeBuilder builder of the store whose changelog is the source topic
 * @param topic        the source topic to reuse as the changelog
 * @throws TopologyException if the store builder was already registered
 */
public final void markSourceStoreAndTopic(final StoreBuilder storeBuilder,
                                          final String topic) {
    if (storeToSourceChangelogTopic.containsKey(storeBuilder)) {
        throw new TopologyException("Source store " + storeBuilder.name() + " is already used.");
    }
    storeToSourceChangelogTopic.put(storeBuilder, topic);
}

public final void addInternalTopic(final String topicName) {
Objects.requireNonNull(topicName, "topicName can't be null");
internalTopicNames.add(topicName);
Expand Down Expand Up @@ -1071,25 +1056,6 @@ public synchronized Map<Integer, TopicsInfo> topicGroups() {
return Collections.unmodifiableMap(topicGroups);
}

// Adjust the generated topology based on the configs: when topology optimization
// is enabled, rewrite each store marked via markSourceStoreAndTopic() so its
// source topic doubles as its changelog (logging disabled, store re-registered,
// store connected to the source topic).
// Not exposed as public API and should be removed post 2.0.
private void adjust(final StreamsConfig config) {
    // true iff the user set StreamsConfig.TOPOLOGY_OPTIMIZATION to OPTIMIZE
    final boolean enableOptimization20 =
        config.getString(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE);

    if (enableOptimization20) {
        for (final Map.Entry<StoreBuilder, String> entry : storeToSourceChangelogTopic.entrySet()) {
            final StoreBuilder storeBuilder = entry.getKey();
            final String topicName = entry.getValue();

            // update store map to disable logging for this store
            storeBuilder.withLoggingDisabled();
            addStateStore(storeBuilder, true);
            connectSourceStoreAndTopic(storeBuilder.name(), topicName);
        }
    }
}

private void setRegexMatchedTopicsToSourceNodes() {
if (subscriptionUpdates.hasUpdates()) {
for (final Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,9 +349,9 @@ public void shouldNotMaterializeStoresIfNotRequired() {
public void shouldReuseSourceTopicAsChangelogsWithOptimization20() {
final String topic = "topic";
builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
final Topology topology = builder.build();
final Properties props = StreamsTestUtils.getStreamsConfig();
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
final Topology topology = builder.build(props);

final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(topology);
internalTopologyBuilder.rewriteTopology(new StreamsConfig(props));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public void shouldRestoreStateFromSourceTopic() throws Exception {
}
});

kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams = new KafkaStreams(builder.build(props), props);
kafkaStreams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
startupLatch.countDown();
Expand Down