Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.StoreFactory;

import java.util.ArrayList;
Expand Down Expand Up @@ -61,24 +59,25 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;

int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
new KStreamAggregate<>(storeFactory, initializer, kGroupedStream.getValue());
parentProcessors.add(parentProcessor);
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeFactory,
parentProcessor);
statefulProcessorNode.setOutputVersioned(isOutputVersioned);
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);

final String kStreamAggProcessorName = named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME);
final ProcessorGraphNode<K, ?> aggProcessorNode =
new ProcessorGraphNode<>(
kStreamAggProcessorName,
new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName)
);
aggProcessorNode.setOutputVersioned(isOutputVersioned);
processors.add(aggProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName());
}
Expand All @@ -96,7 +95,6 @@ <KR, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>

final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
Expand All @@ -107,17 +105,18 @@ <KR, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>
initializer,
kGroupedStream.getValue());
parentProcessors.add(parentProcessor);
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeFactory,
parentProcessor);
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);

final String kStreamAggProcessorName = named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME);
final ProcessorGraphNode<K, ?> aggProcessorNode =
new ProcessorGraphNode<>(
kStreamAggProcessorName,
new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName)
);
processors.add(aggProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName());
}
Expand All @@ -135,7 +134,6 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
Expand All @@ -147,17 +145,17 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
kGroupedStream.getValue(),
sessionMerger);
parentProcessors.add(parentProcessor);
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeFactory,
parentProcessor);
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
final String kStreamAggProcessorName = named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME);
final ProcessorGraphNode<K, ?> aggProcessorNode =
new ProcessorGraphNode<>(
kStreamAggProcessorName,
new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName)
);
processors.add(aggProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName());
}
Expand All @@ -174,7 +172,6 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
processRepartitions(groupPatterns, storeFactory.storeName());
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
final Collection<GraphNode> processors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, ?, K, ?> parentProcessor =
Expand All @@ -186,17 +183,17 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
initializer,
kGroupedStream.getValue());
parentProcessors.add(parentProcessor);
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME),
stateCreated,
storeFactory,
parentProcessor);
stateCreated = true;
processors.add(statefulProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), statefulProcessorNode);
final String kStreamAggProcessorName = named.suffixWithOrElseGet(
"-cogroup-agg-" + counter++,
builder,
CogroupedKStreamImpl.AGGREGATE_NAME);
final ProcessorGraphNode<K, ?> aggProcessorNode =
new ProcessorGraphNode<>(
kStreamAggProcessorName,
new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName)
);
processors.add(aggProcessorNode);
builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode);
}
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeFactory.storeName());
}
Expand Down Expand Up @@ -262,30 +259,6 @@ <KR, VIn> KTable<KR, VOut> createTable(final Collection<GraphNode> processors,
builder);
}

private StatefulProcessorNode<K, ?> getStatefulProcessorNode(final String processorName,
final boolean stateCreated,
final StoreFactory storeFactory,
final ProcessorSupplier<K, ?, K, ?> kStreamAggregate) {
final StatefulProcessorNode<K, ?> statefulProcessorNode;
if (!stateCreated) {
statefulProcessorNode =
new StatefulProcessorNode<>(
processorName,
new ProcessorParameters<>(kStreamAggregate, processorName),
storeFactory
);
} else {
statefulProcessorNode =
new StatefulProcessorNode<>(
processorName,
new ProcessorParameters<>(kStreamAggregate, processorName),
new String[]{storeFactory.storeName()}
);
}

return statefulProcessorNode;
}

@SuppressWarnings("unchecked")
private <VIn> void createRepartitionSource(final String repartitionTopicNamePrefix,
final OptimizableRepartitionNodeBuilder<K, ?> optimizableRepartitionNodeBuilder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.processor.internals.StoreFactory;

import java.util.Collections;
Expand Down Expand Up @@ -97,11 +97,10 @@ <KR, VR> KTable<KR, VR> build(final NamedInternal functionName,
parentNode = repartitionNode;
}

final StatefulProcessorNode<K, V> statefulProcessorNode =
new StatefulProcessorNode<>(
final ProcessorGraphNode<K, V> statefulProcessorNode =
new ProcessorGraphNode<>(
aggFunctionName,
new ProcessorParameters<>(aggregateSupplier, aggFunctionName),
new String[] {storeFactory.storeName()}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for line 57 above and independent from the PR: just a thought, could we pass in storeFactory.storeName() in the future?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new ProcessorParameters<>(aggregateSupplier, aggFunctionName)
);
statefulProcessorNode.setOutputVersioned(isOutputVersioned);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.GroupedTableOperationRepartitionNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
Expand Down Expand Up @@ -88,10 +88,9 @@ private <VAgg> KTable<K, VAgg> doAggregate(final ProcessorSupplier<K, Change<V>,
// the passed in StreamsGraphNode must be the parent of the repartition node
builder.addGraphNode(this.graphNode, repartitionGraphNode);

final StatefulProcessorNode statefulProcessorNode = new StatefulProcessorNode<>(
final ProcessorGraphNode statefulProcessorNode = new ProcessorGraphNode<>(
funcName,
new ProcessorParameters<>(aggregateSupplier, funcName),
new String[]{materialized.storeName()}
new ProcessorParameters<>(aggregateSupplier, funcName)
);
statefulProcessorNode.setOutputVersioned(materialized.storeSupplier() instanceof VersionedBytesStoreSupplier);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorToStateConnectorNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamSinkNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamTableJoinNode;
import org.apache.kafka.streams.kstream.internals.graph.StreamToTableNode;
Expand Down Expand Up @@ -1225,7 +1225,8 @@ public <KOut, VOut> KStream<KOut, VOut> process(
}

final String name = new NamedInternal(named).name();
final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new
ProcessorToStateConnectorNode<>(
Comment thread
ableegoldman marked this conversation as resolved.
name,
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames);
Expand Down Expand Up @@ -1270,7 +1271,7 @@ public <VOut> KStream<K, VOut> processValues(
}

final String name = new NamedInternal(named).name();
final StatefulProcessorNode<? super K, ? super V> processNode = new StatefulProcessorNode<>(
final ProcessorToStateConnectorNode<? super K, ? super V> processNode = new ProcessorToStateConnectorNode<>(
name,
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames);
Expand Down
Loading