Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
f7aaa63
add new abstract processor
jeqo Mar 19, 2021
bed7c96
migrate kstream flat map values to new processor
jeqo Mar 23, 2021
9657601
apply suggestions
jeqo Mar 29, 2021
9983c9c
rollback change in test class
jeqo Mar 31, 2021
a619816
add new processor operator to kstream
jeqo Apr 1, 2021
19c8a8f
draft ktable mapvalues, filter and others
jeqo Apr 1, 2021
990ba66
ktablesource to new processor api
jeqo Apr 1, 2021
fdaf408
migrate ktable supress, reduce, and aggregate
jeqo Apr 8, 2021
ae995d2
moar migration
jeqo Apr 8, 2021
a94eb30
moar migration
jeqo Apr 8, 2021
ea67f68
migrate table joins
jeqo Apr 8, 2021
c7be1c2
migrate global ktable
jeqo Apr 8, 2021
0c3cbc9
migrate foreign key
jeqo Apr 8, 2021
8baf291
migrate ktable repartition map
jeqo Apr 8, 2021
662a10c
compile source
jeqo Apr 9, 2021
75a8f60
migrate test, first draft
jeqo Apr 9, 2021
8bd1018
migrate recordqueue
jeqo Apr 9, 2021
31f51b0
migrate testing
jeqo Apr 9, 2021
c41ecc7
tests compile!
jeqo Apr 9, 2021
d1f29ad
adjust kstream process
jeqo Apr 9, 2021
c5fb7eb
clean
jeqo Apr 9, 2021
fa52447
renames
jeqo Apr 9, 2021
7e83419
fix types order
jeqo Apr 9, 2021
b25fe00
add todos;
jeqo Apr 9, 2021
71fa2fd
adjust transformers with adapters
jeqo Apr 12, 2021
32f05a1
checkstail main
jeqo Apr 12, 2021
7c18abd
checkstyle test
jeqo Apr 12, 2021
34336fe
passing tests
jeqo Apr 12, 2021
b557fe0
fix duplicated store bug
jeqo Apr 12, 2021
dda022c
fix missing context
jeqo Apr 12, 2021
1aaadf2
add mock for old processor api
jeqo Apr 12, 2021
2c9285e
align types ordering
jeqo Apr 15, 2021
846f756
rollback change
jeqo Apr 15, 2021
5057f88
rm todo comment
jeqo Apr 15, 2021
0f9f492
align type names
jeqo Apr 15, 2021
5850705
align type names
jeqo Apr 15, 2021
46ca917
set types
jeqo Apr 15, 2021
ec89277
merged with trunk
jeqo May 10, 2021
d168526
Add support for KTable transforms with new ProcessorContext
jeqo May 13, 2021
442f351
compose forwarding disabled processor context based on internal
jeqo May 13, 2021
786c339
Merge remote-tracking branch 'upstream/trunk' into new-processor-kstr…
jeqo May 13, 2021
486657a
fix internal processor contexts on tests
jeqo May 13, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;

/**
* Interface that specifies how an exception from source node deserialization
Expand All @@ -31,14 +32,14 @@ public interface DeserializationExceptionHandler extends Configurable {
* Inspect a record and the exception received.
* <p>
* Note, that the passed in {@link ProcessorContext} only allows to access metadata like the task ID.
* However, it cannot be used to emit records via {@link ProcessorContext#forward(Object, Object)};
* However, it cannot be used to emit records via {@link ProcessorContext#forward(Record)};
* calling {@code forward()} (and some other methods) would result in a runtime exception.
*
* @param context processor context
* @param record record that failed deserialization
* @param exception the actual exception
*/
DeserializationHandlerResponse handle(final ProcessorContext context,
DeserializationHandlerResponse handle(final ProcessorContext<?, ?> context,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, man. I overlooked this in the KIP, and we can't just change this in-place, as it will break any subclasses.

What we need to do is deprecate this method and introduce a new one with a default implementation that calls back here. We can update the KIP with this change, since it's a simple oversight and follows established patterns for migrating interfaces.

final ConsumerRecord<byte[], byte[]> record,
final Exception exception);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,7 +32,7 @@ public class LogAndContinueExceptionHandler implements DeserializationExceptionH
private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);

@Override
public DeserializationHandlerResponse handle(final ProcessorContext context,
public DeserializationHandlerResponse handle(final ProcessorContext<?, ?> context,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need to deprecate this method and add a new one? Technically, it is a class of the public API that can be extended.

final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package org.apache.kafka.streams.errors;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,7 +32,7 @@ public class LogAndFailExceptionHandler implements DeserializationExceptionHandl
private static final Logger log = LoggerFactory.getLogger(LogAndFailExceptionHandler.class);

@Override
public DeserializationHandlerResponse handle(final ProcessorContext context,
public DeserializationHandlerResponse handle(final ProcessorContext<?, ?> context,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we also need to deprecate this method and add a new one? Technically, it is a class of the public API that can be extended.

final ConsumerRecord<byte[], byte[]> record,
final Exception exception) {

Expand Down
4,134 changes: 2,196 additions & 1,938 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java

Large diffs are not rendered by default.

441 changes: 237 additions & 204 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
* record-pair of a {@link KStream}-{@link KStream}, {@link KStream}-{@link KTable}, or {@link KTable}-{@link KTable}
* join.
*
* @param <V1> first value type
* @param <V2> second value type
* @param <VR> joined value type
Comment on lines -26 to -28
Copy link
Copy Markdown
Contributor

@vvcephei vvcephei May 21, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here: it doesn't seem strictly necessary to rename the generic parameters as part of this PR.

Specifically, funny story: these params used to be called V and V1, and we renamed them to V1 and V2 because we thought it made more sense :)

* @param <V> first value type
* @param <V1> second value type
* @param <VOut> joined value type
* @see KStream#join(KStream, ValueJoiner, JoinWindows)
* @see KStream#join(KStream, ValueJoiner, JoinWindows, StreamJoined)
* @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows)
Expand All @@ -40,7 +40,7 @@
* @see KTable#leftJoin(KTable, ValueJoiner)
* @see KTable#outerJoin(KTable, ValueJoiner)
*/
public interface ValueJoiner<V1, V2, VR> {
public interface ValueJoiner<V, V1, VOut> {

/**
* Return a joined value consisting of {@code value1} and {@code value2}.
Expand All @@ -49,5 +49,5 @@ public interface ValueJoiner<V1, V2, VR> {
* @param value2 the second value for joining
* @return the joined value
*/
VR apply(final V1 value1, final V2 value2);
VOut apply(final V value1, final V1 value2);
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
* record-pair of a {@link KStream}-{@link KStream}, {@link KStream}-{@link KTable}, or {@link KTable}-{@link KTable}
* join.
*
* @param <K1> key value type
* @param <V1> first value type
* @param <V2> second value type
* @param <VR> joined value type
* @param <K> key value type
* @param <V> first value type
* @param <V1> second value type
* @param <VOut> joined value type
* @see KStream#join(KStream, ValueJoinerWithKey, JoinWindows)
* @see KStream#join(KStream, ValueJoinerWithKey, JoinWindows, StreamJoined)
* @see KStream#leftJoin(KStream, ValueJoinerWithKey, JoinWindows)
Expand All @@ -44,7 +44,7 @@
* @see KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey)
* @see KStream#leftJoin(GlobalKTable, KeyValueMapper, ValueJoinerWithKey, Named)
*/
public interface ValueJoinerWithKey<K1, V1, V2, VR> {
public interface ValueJoinerWithKey<K, V, V1, VOut> {

/**
* Return a joined value consisting of {@code readOnlyKey}, {@code value1} and {@code value2}.
Expand All @@ -54,5 +54,5 @@ public interface ValueJoinerWithKey<K1, V1, V2, VR> {
* @param value2 the second value for joining
* @return the joined value
*/
VR apply(final K1 readOnlyKey, final V1 value1, final V2 value2);
VOut apply(final K readOnlyKey, final V value1, final V1 value2);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@
*/
package org.apache.kafka.streams.kstream.internals;

import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
Comment on lines +19 to +22
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In KAFKA-10787 we agreed on the following import order: kafka, org.apache.kafka, com, net, org, java, javax, and static imports. Additionally, there should be an empty line between import blocks.

Note that PR #10428 introduces a check and a formatter for this.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeqo Yes, but don't feel so burdened - I am ready to expand the formatter to the streams module as soon as the PR (which currently formats the core module only) is merged. 😉 @cadonna

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cadonna The sooner you merge the PR, the sooner I can start applying the formatter to the streams module. 😃

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.internals.ApiUtils;
import org.apache.kafka.streams.kstream.ValueJoiner;
Expand All @@ -31,11 +35,6 @@
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.state.StoreBuilder;

import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/*
* Any classes (KTable, KStream, etc) extending this class should follow the serde specification precedence ordering as:
*
Expand Down Expand Up @@ -144,7 +143,7 @@ public Set<StoreBuilder<?>> stores() {
};
}

static <K, V1, V2, VR> ValueJoinerWithKey<K, V1, V2, VR> toValueJoinerWithKey(final ValueJoiner<V1, V2, VR> valueJoiner) {
static <K, V, V1, VOut> ValueJoinerWithKey<K, V, V1, VOut> toValueJoinerWithKey(final ValueJoiner<V, V1, VOut> valueJoiner) {
Objects.requireNonNull(valueJoiner, "joiner can't be null");
return (readOnlyKey, value1, value2) -> valueJoiner.apply(value1, value2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;

class CogroupedStreamAggregateBuilder<K, VOut> {
Expand All @@ -48,21 +48,20 @@ class CogroupedStreamAggregateBuilder<K, VOut> {
CogroupedStreamAggregateBuilder(final InternalStreamsBuilder builder) {
this.builder = builder;
}
@SuppressWarnings("unchecked")
<KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName) {
<KOut> KTable<KOut, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KOut> keySerde,
final Serde<VOut> valueSerde,
final String queryableName) {
processRepartitions(groupPatterns, storeBuilder);
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
final Collection<KStreamAggregateProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor =
final KStreamAggregateProcessorSupplier<K, K, ?, ?> parentProcessor =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be KStreamAggregateProcessorSupplier<K, ?, K, ?>? The positions of the parameters KOut and VIn on KStreamAggregateProcessorSupplier changed with respect to KStreamAggProcessorSupplier.

new KStreamAggregate<>(storeBuilder.name(), initializer, kGroupedStream.getValue());
parentProcessors.add(parentProcessor);
final StatefulProcessorNode<K, ?> statefulProcessorNode = getStatefulProcessorNode(
Expand All @@ -80,24 +79,23 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeBuilder.name());
}

@SuppressWarnings("unchecked")
<KR, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final Windows<W> windows) {
<KOut, W extends Window> KTable<KOut, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KOut> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final Windows<W> windows) {
processRepartitions(groupPatterns, storeBuilder);

final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
final Collection<KStreamAggregateProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor =
(KStreamAggProcessorSupplier<K, K, ?, ?>) new KStreamWindowAggregate<K, K, VOut, W>(
final KStreamWindowAggregate<K, K, VOut, W> parentProcessor =
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be KStreamWindowAggregate<K, VOut, K, W>? I am not sure whether I am missing something here, since the type parameter positions did not change. Why is the type argument for V in KStreamWindowAggregate `K` and not the wildcard `?`?

new KStreamWindowAggregate<>(
windows,
storeBuilder.name(),
initializer,
Expand All @@ -118,24 +116,23 @@ <KR, W extends Window> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>
return createTable(processors, parentProcessors, named, keySerde, valueSerde, queryableName, storeBuilder.name());
}

@SuppressWarnings("unchecked")
<KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {
<KOut> KTable<KOut, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KOut> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final SessionWindows sessionWindows,
final Merger<? super K, VOut> sessionMerger) {
processRepartitions(groupPatterns, storeBuilder);
final Collection<GraphNode> processors = new ArrayList<>();
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
final Collection<KStreamAggregateProcessorSupplier> parentProcessors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor =
(KStreamAggProcessorSupplier<K, K, ?, ?>) new KStreamSessionWindowAggregate<K, K, VOut>(
final KStreamSessionWindowAggregate<K, K, VOut> parentProcessor =
new KStreamSessionWindowAggregate<>(
sessionWindows,
storeBuilder.name(),
initializer,
Expand All @@ -158,22 +155,22 @@ <KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? sup
}

@SuppressWarnings("unchecked")
<KR> KTable<KR, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final SlidingWindows slidingWindows) {
<KOut> KTable<KOut, VOut> build(final Map<KGroupedStreamImpl<K, ?>, Aggregator<? super K, ? super Object, VOut>> groupPatterns,
final Initializer<VOut> initializer,
final NamedInternal named,
final StoreBuilder<?> storeBuilder,
final Serde<KOut> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final SlidingWindows slidingWindows) {
processRepartitions(groupPatterns, storeBuilder);
final Collection<KStreamAggProcessorSupplier> parentProcessors = new ArrayList<>();
final Collection<KStreamAggregateProcessorSupplier> parentProcessors = new ArrayList<>();
final Collection<GraphNode> processors = new ArrayList<>();
boolean stateCreated = false;
int counter = 0;
for (final Entry<KGroupedStreamImpl<K, ?>, Aggregator<? super K, Object, VOut>> kGroupedStream : groupPatterns.entrySet()) {
final KStreamAggProcessorSupplier<K, K, ?, ?> parentProcessor =
(KStreamAggProcessorSupplier<K, K, ?, ?>) new KStreamSlidingWindowAggregate<K, K, VOut>(
final KStreamSlidingWindowAggregate<K, K, VOut> parentProcessor =
new KStreamSlidingWindowAggregate<>(
slidingWindows,
storeBuilder.name(),
initializer,
Expand Down Expand Up @@ -225,26 +222,26 @@ private void processRepartitions(final Map<KGroupedStreamImpl<K, ?>, Aggregator<
}

@SuppressWarnings("unchecked")
<KR, VIn> KTable<KR, VOut> createTable(final Collection<GraphNode> processors,
final Collection<KStreamAggProcessorSupplier> parentProcessors,
final NamedInternal named,
final Serde<KR> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final String storeName) {
<KOut, VIn> KTable<KOut, VOut> createTable(final Collection<GraphNode> processors,
final Collection<KStreamAggregateProcessorSupplier> parentProcessors,
final NamedInternal named,
final Serde<KOut> keySerde,
final Serde<VOut> valueSerde,
final String queryableName,
final String storeName) {

final String mergeProcessorName = named.suffixWithOrElseGet(
"-cogroup-merge",
builder,
CogroupedKStreamImpl.MERGE_NAME);
final KTableProcessorSupplier<K, VOut, VOut> passThrough = new KTablePassThrough<>(parentProcessors, storeName);
final KTableChangeProcessorSupplier<K, VOut, VOut, K, VOut> passThrough = new KTablePassThrough<>(parentProcessors, storeName);
final ProcessorParameters<K, VOut, ?, ?> processorParameters = new ProcessorParameters(passThrough, mergeProcessorName);
final ProcessorGraphNode<K, VOut> mergeNode =
new ProcessorGraphNode<>(mergeProcessorName, processorParameters);

builder.addGraphNode(processors, mergeNode);

return new KTableImpl<KR, VIn, VOut>(
return new KTableImpl<KOut, VIn, VOut>(
mergeProcessorName,
keySerde,
valueSerde,
Expand All @@ -258,7 +255,7 @@ <KR, VIn> KTable<KR, VOut> createTable(final Collection<GraphNode> processors,
private StatefulProcessorNode<K, ?> getStatefulProcessorNode(final String processorName,
final boolean stateCreated,
final StoreBuilder<?> storeBuilder,
final ProcessorSupplier<K, ?> kStreamAggregate) {
final ProcessorSupplier<K, ?, ?, ?> kStreamAggregate) {
final StatefulProcessorNode<K, ?> statefulProcessorNode;
if (!stateCreated) {
statefulProcessorNode =
Expand Down
Loading