diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java index 1ce1e7451a71d..a450f8ead1ae6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java @@ -26,6 +26,7 @@ import org.apache.kafka.streams.kstream.SlidingWindows; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windows; +import org.apache.kafka.streams.kstream.internals.graph.GracePeriodGraphNode; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; @@ -110,10 +111,11 @@ KTable build(final Map "-cogroup-agg-" + counter++, builder, CogroupedKStreamImpl.AGGREGATE_NAME); - final ProcessorGraphNode aggProcessorNode = - new ProcessorGraphNode<>( + final GracePeriodGraphNode aggProcessorNode = + new GracePeriodGraphNode<>( kStreamAggProcessorName, - new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName), + windows.gracePeriodMs() ); processors.add(aggProcessorNode); builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); @@ -149,10 +151,12 @@ KTable build(final Map, Aggregator aggProcessorNode = - new ProcessorGraphNode<>( + final long gracePeriod = sessionWindows.gracePeriodMs() + sessionWindows.inactivityGap(); + final GracePeriodGraphNode aggProcessorNode = + new GracePeriodGraphNode<>( kStreamAggProcessorName, - new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName), + gracePeriod ); processors.add(aggProcessorNode); builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); @@ -187,10 +191,11 @@ KTable build(final Map, Aggregator aggProcessorNode = - new ProcessorGraphNode<>( + final GracePeriodGraphNode aggProcessorNode = + new GracePeriodGraphNode<>( kStreamAggProcessorName, - new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName) + new ProcessorParameters<>(parentProcessor, kStreamAggProcessorName), + slidingWindows.gracePeriodMs() ); processors.add(aggProcessorNode); builder.addGraphNode(parentNodes.get(kGroupedStream.getKey()), aggProcessorNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java index bc84e69a1ff6d..b99034c5306b5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -20,10 +20,10 @@ import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.internals.graph.GracePeriodGraphNode; import org.apache.kafka.streams.kstream.internals.graph.GraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode; import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters; -import org.apache.kafka.streams.processor.internals.StoreFactory; import java.util.Collections; import java.util.Set; @@ -66,23 +66,67 @@ class GroupedStreamAggregateBuilder { this.userProvidedRepartitionTopicName = groupedInternal.name(); } - KTable build(final NamedInternal functionName, - final StoreFactory storeFactory, - final KStreamAggProcessorSupplier aggregateSupplier, - final String queryableStoreName, - final Serde keySerde, - final Serde valueSerde, - final boolean isOutputVersioned) { - assert queryableStoreName == null || queryableStoreName.equals(storeFactory.storeName()); + KTable buildNonWindowed(final NamedInternal functionName, + final String storeName, + final KStreamAggProcessorSupplier aggregateSupplier, + final String queryableStoreName, + final Serde keySerde, + final Serde valueSerde, + final boolean isOutputVersioned) { + final String aggFunctionName = functionName.name(); + + final ProcessorGraphNode aggProcessorNode = + new ProcessorGraphNode<>( + aggFunctionName, + new ProcessorParameters<>(aggregateSupplier, aggFunctionName) + ); + aggProcessorNode.setOutputVersioned(isOutputVersioned); + + return build(aggFunctionName, storeName, aggregateSupplier, aggProcessorNode, queryableStoreName, keySerde, valueSerde); + } + + KTable buildWindowed(final NamedInternal functionName, + final String storeName, + final long gracePeriod, + final KStreamAggProcessorSupplier aggregateSupplier, + final String queryableStoreName, + final Serde keySerde, + final Serde valueSerde, + final boolean isOutputVersioned) { final String aggFunctionName = functionName.name(); + final GracePeriodGraphNode gracePeriodAggProcessorNode = + new GracePeriodGraphNode<>( + aggFunctionName, + new ProcessorParameters<>(aggregateSupplier, aggFunctionName), + gracePeriod + ); + + gracePeriodAggProcessorNode.setOutputVersioned(isOutputVersioned); + + return build(aggFunctionName, storeName, aggregateSupplier, gracePeriodAggProcessorNode, queryableStoreName, keySerde, valueSerde); + } + + private KTable build(final String aggFunctionName, + final String storeName, + final KStreamAggProcessorSupplier aggregateSupplier, + final ProcessorGraphNode aggProcessorNode, + final String queryableStoreName, + final Serde keySerde, + final Serde valueSerde) { + if (!(queryableStoreName == null || queryableStoreName.equals(storeName))) { + throw new IllegalStateException(String.format("queryableStoreName should be null or equal to storeName" + + " but got storeName='%s' and queryableStoreName='%s'", + storeName, queryableStoreName)); + } + String sourceName = this.name; GraphNode parentNode = graphNode; if (repartitionRequired) { final OptimizableRepartitionNodeBuilder repartitionNodeBuilder = optimizableRepartitionNodeBuilder(); - final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeFactory.storeName(); + final String repartitionTopicPrefix = userProvidedRepartitionTopicName != null ? userProvidedRepartitionTopicName : storeName; sourceName = createRepartitionSource(repartitionTopicPrefix, repartitionNodeBuilder); // First time through we need to create a repartition node. @@ -97,14 +141,7 @@ KTable build(final NamedInternal functionName, parentNode = repartitionNode; } - final ProcessorGraphNode statefulProcessorNode = - new ProcessorGraphNode<>( - aggFunctionName, - new ProcessorParameters<>(aggregateSupplier, aggFunctionName) - ); - statefulProcessorNode.setOutputVersioned(isOutputVersioned); - - builder.addGraphNode(parentNode, statefulProcessorNode); + builder.addGraphNode(parentNode, aggProcessorNode); return new KTableImpl<>(aggFunctionName, keySerde, @@ -112,9 +149,8 @@ KTable build(final NamedInternal functionName, sourceName.equals(this.name) ? subTopologySourceNodes : Collections.singleton(sourceName), queryableStoreName, aggregateSupplier, - statefulProcessorNode, + aggProcessorNode, builder); - } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index cc335e1383de3..73c6174b27bdc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -244,9 +244,9 @@ private KTable doAggregate(final KStreamAggProcessorSupplier storeFactory) { - return aggregateBuilder.build( + return aggregateBuilder.buildNonWindowed( new NamedInternal(functionName), - storeFactory, + storeFactory.storeName(), aggregateSupplier, storeFactory.queryableStoreName(), storeFactory.keySerde(), diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 7deb9468c3fbc..5ea0d6db3f503 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -1225,8 +1225,7 @@ public KStream process( } final String name = new NamedInternal(named).name(); - final ProcessorToStateConnectorNode processNode = new - ProcessorToStateConnectorNode<>( + final ProcessorToStateConnectorNode processNode = new ProcessorToStateConnectorNode<>( name, new ProcessorParameters<>(processorSupplier, name), stateStoreNames); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java index 989984d42f8f6..0c0f557b5c9b5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SessionWindowedKStreamImpl.java @@ -110,10 +110,12 @@ private KTable, Long> doCount(final Named named, final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final StoreFactory storeFactory = new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy); + final long gracePeriod = windows.gracePeriodMs() + windows.inactivityGap(); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(aggregateName), - storeFactory, + storeFactory.storeName(), + gracePeriod, new KStreamSessionWindowAggregate<>( windows, storeFactory, @@ -162,10 +164,12 @@ public KTable, V> reduce(final Reducer reducer, final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); final StoreFactory storeFactory = new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy); + final long gracePeriod = windows.gracePeriodMs() + windows.inactivityGap(); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(reduceName), - storeFactory, + storeFactory.storeName(), + gracePeriod, new KStreamSessionWindowAggregate<>( windows, storeFactory, @@ -222,10 +226,12 @@ public KTable, VR> aggregate(final Initializer initializer, final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final StoreFactory storeFactory = new SessionStoreMaterializer<>(materializedInternal, windows, emitStrategy); + final long gracePeriod = windows.gracePeriodMs() + windows.inactivityGap(); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(aggregateName), - storeFactory, + storeFactory.storeName(), + gracePeriod, new KStreamSessionWindowAggregate<>( windows, storeFactory, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java index 16b2d0185aee0..c2af4652f8fbc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedKStreamImpl.java @@ -93,9 +93,10 @@ private KTable, Long> doCount(final Named named, final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(aggregateName), - storeFactory, + storeFactory.storeName(), + windows.gracePeriodMs(), new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, aggregateBuilder.countInitializer, aggregateBuilder.countAggregator), materializedInternal.queryableStoreName(), materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null, @@ -139,9 +140,10 @@ public KTable, VR> aggregate(final Initializer initializer, final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(aggregateName), - storeFactory, + storeFactory.storeName(), + windows.gracePeriodMs(), new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, initializer, aggregator), materializedInternal.queryableStoreName(), materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null, @@ -186,9 +188,10 @@ public KTable, V> reduce(final Reducer reducer, final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); final StoreFactory storeFactory = new SlidingWindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(reduceName), - storeFactory, + storeFactory.storeName(), + windows.gracePeriodMs(), new KStreamSlidingWindowAggregate<>(windows, storeFactory, emitStrategy, aggregateBuilder.reduceInitializer, aggregatorForReducer(reducer)), materializedInternal.queryableStoreName(), materializedInternal.keySerde() != null ? new FullTimeWindowedSerde<>(materializedInternal.keySerde(), windows.timeDifferenceMs()) : null, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java index 80a671abdc5eb..5240f6f0ef095 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/TimeWindowedKStreamImpl.java @@ -105,9 +105,10 @@ private KTable, Long> doCount(final Named named, final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(aggregateName), - storeFactory, + storeFactory.storeName(), + windows.gracePeriodMs(), new KStreamWindowAggregate<>( windows, storeFactory, @@ -158,9 +159,10 @@ public KTable, VR> aggregate(final Initializer initializer, final String aggregateName = new NamedInternal(named).orElseGenerateWithPrefix(builder, AGGREGATE_NAME); final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(aggregateName), - storeFactory, + storeFactory.storeName(), + windows.gracePeriodMs(), new KStreamWindowAggregate<>( windows, storeFactory, @@ -210,9 +212,10 @@ public KTable, V> reduce(final Reducer reducer, final String reduceName = new NamedInternal(named).orElseGenerateWithPrefix(builder, REDUCE_NAME); final StoreFactory storeFactory = new WindowStoreMaterializer<>(materializedInternal, windows, emitStrategy); - return aggregateBuilder.build( + return aggregateBuilder.buildWindowed( new NamedInternal(reduceName), - storeFactory, + storeFactory.storeName(), + windows.gracePeriodMs(), new KStreamWindowAggregate<>( windows, storeFactory, diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GracePeriodGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GracePeriodGraphNode.java new file mode 100644 index 0000000000000..c6ed537fd0bf9 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GracePeriodGraphNode.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals.graph; + +/** + * Represents a stateful {@link ProcessorGraphNode} where a semantic grace period is defined for the processor + * and its state. + */ +public class GracePeriodGraphNode extends ProcessorGraphNode { + + private final long gracePeriod; + + public GracePeriodGraphNode(final String nodeName, + final ProcessorParameters processorParameters, + final long gracePeriod) { + super(nodeName, processorParameters); + this.gracePeriod = gracePeriod; + } + + public long gracePeriod() { + return gracePeriod; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java index d58b3a3b1a955..09ed36284a811 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtil.java @@ -17,13 +17,6 @@ package org.apache.kafka.streams.kstream.internals.graph; import org.apache.kafka.streams.errors.TopologyException; -import org.apache.kafka.streams.kstream.SessionWindows; -import org.apache.kafka.streams.kstream.SlidingWindows; -import org.apache.kafka.streams.kstream.Windows; -import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate; -import org.apache.kafka.streams.kstream.internals.KStreamSlidingWindowAggregate; -import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate; -import org.apache.kafka.streams.processor.api.ProcessorSupplier; public final class GraphGraceSearchUtil { private GraphGraceSearchUtil() {} @@ -32,6 +25,7 @@ public static long findAndVerifyWindowGrace(final GraphNode graphNode) { return findAndVerifyWindowGrace(graphNode, ""); } + @SuppressWarnings("rawtypes") private static long findAndVerifyWindowGrace(final GraphNode graphNode, final String chain) { // error base case: we traversed off the end of the graph without finding a window definition if (graphNode == null) { @@ -40,11 +34,8 @@ private static long findAndVerifyWindowGrace(final GraphNode graphNode, final St ); } // base case: return if this node defines a grace period. - { - final Long gracePeriod = extractGracePeriod(graphNode); - if (gracePeriod != null) { - return gracePeriod; - } + if (graphNode instanceof GracePeriodGraphNode) { + return ((GracePeriodGraphNode) graphNode).gracePeriod(); } final String newChain = chain.equals("") ? graphNode.nodeName() : graphNode.nodeName() + "->" + chain; @@ -70,27 +61,4 @@ private static long findAndVerifyWindowGrace(final GraphNode graphNode, final St return inheritedGrace; } - @SuppressWarnings("rawtypes") - private static Long extractGracePeriod(final GraphNode node) { - if (node instanceof ProcessorGraphNode) { - final ProcessorSupplier processorSupplier = ((ProcessorGraphNode) node).processorParameters().processorSupplier(); - if (processorSupplier instanceof KStreamWindowAggregate) { - final KStreamWindowAggregate kStreamWindowAggregate = (KStreamWindowAggregate) processorSupplier; - final Windows windows = kStreamWindowAggregate.windows(); - return windows.gracePeriodMs(); - } else if (processorSupplier instanceof KStreamSessionWindowAggregate) { - final KStreamSessionWindowAggregate kStreamSessionWindowAggregate = (KStreamSessionWindowAggregate) processorSupplier; - final SessionWindows windows = kStreamSessionWindowAggregate.windows(); - return windows.gracePeriodMs() + windows.inactivityGap(); - } else if (processorSupplier instanceof KStreamSlidingWindowAggregate) { - final KStreamSlidingWindowAggregate kStreamSlidingWindowAggregate = (KStreamSlidingWindowAggregate) processorSupplier; - final SlidingWindows windows = kStreamSlidingWindowAggregate.windows(); - return windows.gracePeriodMs(); - } else { - return null; - } - } else { - return null; - } - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java index 4c37fa2f5cf1f..5c52187bab2e1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java @@ -811,7 +811,7 @@ public void shouldWorkBeforeJoinLeft() { } @Test - public void shouldWorkWithCogrouped() { + public void shouldWorkWithCogroupedTimeWindows() { final StreamsBuilder builder = new StreamsBuilder(); final KGroupedStream stream1 = builder.stream("one", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String())); @@ -823,6 +823,32 @@ public void shouldWorkWithCogrouped() { .toStream(); } + @Test + public void shouldWorkWithCogroupedSlidingWindows() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KGroupedStream stream1 = builder.stream("one", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String())); + final KGroupedStream stream2 = builder.stream("two", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String())); + final KStream, Object> cogrouped = stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value) + .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(15))) + .aggregate(() -> "", Named.as("test"), Materialized.as("store")) + .suppress(Suppressed.untilWindowCloses(unbounded())) + .toStream(); + } + + @Test + public void shouldWorkWithCogroupedSessionWindows() { + final StreamsBuilder builder = new StreamsBuilder(); + + final KGroupedStream stream1 = builder.stream("one", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String())); + final KGroupedStream stream2 = builder.stream("two", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String())); + final KStream, Object> cogrouped = stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value) + .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(15), Duration.ofMinutes(5))) + .aggregate(() -> "", (k, v1, v2) -> "", Named.as("test"), Materialized.as("store")) + .suppress(Suppressed.untilWindowCloses(unbounded())) + .toStream(); + } + private static void verify(final List> results, final List> expectedResults) { if (results.size() != expectedResults.size()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java index 5db1439835cb2..6ed7dea0fb3dd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/GraphGraceSearchUtilTest.java @@ -87,7 +87,7 @@ public void process(final Record record) {} @Test public void shouldExtractGraceFromKStreamWindowAggregateNode() { final TimeWindows windows = TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(1234L)); - final ProcessorGraphNode node = new ProcessorGraphNode<>( + final ProcessorGraphNode node = new GracePeriodGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamWindowAggregate( @@ -98,18 +98,19 @@ public void shouldExtractGraceFromKStreamWindowAggregateNode() { null ), "asdf" - ) + ), + windows.gracePeriodMs() ); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); - assertThat(extracted, is(windows.gracePeriodMs())); + assertThat(extracted, is(1234L)); } @Test public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() { final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); - final ProcessorGraphNode node = new ProcessorGraphNode<>( + final ProcessorGraphNode node = new GracePeriodGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamSessionWindowAggregate( @@ -121,21 +122,23 @@ public void shouldExtractGraceFromKStreamSessionWindowAggregateNode() { null ), "asdf" - ) + ), + windows.gracePeriodMs() + windows.inactivityGap() ); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); - assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap())); + assertThat(extracted, is(1244L)); } @Test public void shouldExtractGraceFromSessionAncestorThroughStatefulParent() { final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); - final ProcessorGraphNode graceGrandparent = new ProcessorGraphNode<>( + final ProcessorGraphNode graceGrandparent = new GracePeriodGraphNode<>( "asdf", new ProcessorParameters<>(new KStreamSessionWindowAggregate( windows, mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null, null - ), "asdf") + ), "asdf"), + windows.gracePeriodMs() + windows.inactivityGap() ); final ProcessorGraphNode statefulParent = new ProcessorGraphNode<>( @@ -167,13 +170,13 @@ public void process(final Record record) {} statefulParent.addChild(node); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); - assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap())); + assertThat(extracted, is(1244L)); } @Test public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() { final SessionWindows windows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); - final ProcessorGraphNode graceGrandparent = new ProcessorGraphNode<>( + final ProcessorGraphNode graceGrandparent = new GracePeriodGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamSessionWindowAggregate( @@ -185,7 +188,8 @@ public void shouldExtractGraceFromSessionAncestorThroughStatelessParent() { null ), "asdf" - ) + ), + windows.gracePeriodMs() + windows.inactivityGap() ); final ProcessorGraphNode statelessParent = new ProcessorGraphNode<>( @@ -217,16 +221,17 @@ public void process(final Record record) {} statelessParent.addChild(node); final long extracted = GraphGraceSearchUtil.findAndVerifyWindowGrace(node); - assertThat(extracted, is(windows.gracePeriodMs() + windows.inactivityGap())); + assertThat(extracted, is(1244L)); } @Test public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() { - final ProcessorGraphNode leftParent = new ProcessorGraphNode<>( + final SessionWindows leftWindows = SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)); + final ProcessorGraphNode leftParent = new GracePeriodGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamSessionWindowAggregate( - SessionWindows.ofInactivityGapAndGrace(ofMillis(10L), ofMillis(1234L)), + leftWindows, mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, @@ -234,21 +239,24 @@ public void shouldUseMaxIfMultiParentsDoNotAgreeOnGrace() { null ), "asdf" - ) + ), + leftWindows.gracePeriodMs() + leftWindows.inactivityGap() ); - final ProcessorGraphNode rightParent = new ProcessorGraphNode<>( + final TimeWindows rightWindows = TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)); + final ProcessorGraphNode rightParent = new GracePeriodGraphNode<>( "asdf", new ProcessorParameters<>( new KStreamWindowAggregate( - TimeWindows.ofSizeAndGrace(ofMillis(10L), ofMillis(4321L)), + rightWindows, mockStoreFactory("asdf"), EmitStrategy.onWindowUpdate(), null, null ), "asdf" - ) + ), + rightWindows.gracePeriodMs() ); final ProcessorGraphNode node = new ProcessorGraphNode<>(