Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.errors.InvalidStateStorePartitionException;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.Processor;
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One super frustrating thing in this PR was that you can't suppress deprecation warnings on imports, so I have had to use the fully qualified class name of the deprecated APIs everywhere.

Aside from a lot of noise in the PR (sorry about that), this means that we have some pretty long lines. It's not always easy to get these lines under the limit without making them even harder to read due to weird line breaks. I've followed my best judgement about when it's better to just keep a line long.

import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.ThreadMetadata;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.internals.ClientUtils;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
/**
* Adds a state store to the underlying {@link Topology}.
* <p>
* It is required to connect state stores to {@link org.apache.kafka.streams.processor.Processor Processors}, {@link Transformer Transformers},
* It is required to connect state stores to {@link org.apache.kafka.streams.processor.api.Processor Processors},
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switching to the new interface where possible.

* {@link Transformer Transformers},
* or {@link ValueTransformer ValueTransformers} before they can be used.
*
* @param builder the builder used to obtain this state store {@link StateStore} instance
Expand Down Expand Up @@ -515,7 +516,8 @@ public synchronized StreamsBuilder addStateStore(final StoreBuilder<?> builder)
* This {@link ProcessorNode} should be used to keep the {@link StateStore} up-to-date.
* The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
* <p>
* It is not required to connect a global store to {@link org.apache.kafka.streams.processor.Processor Processors}, {@link Transformer Transformers},
* It is not required to connect a global store to {@link org.apache.kafka.streams.processor.api.Processor Processors},
* {@link Transformer Transformers},
* or {@link ValueTransformer ValueTransformer}; those have read-only access to all global stores by default.
* <p>
* The supplier should always generate a new instance each time {@link ProcessorSupplier#get()} gets called. Creating
Expand Down
4 changes: 2 additions & 2 deletions streams/src/main/java/org/apache/kafka/streams/Topology.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
Expand Down Expand Up @@ -655,7 +655,7 @@ public synchronized <K, V> Topology addSink(final String name,
* will be added to the topology and connected to this processor automatically.
*
* @param name the unique name of the processor node
* @param supplier the supplier used to obtain this node's {@link Processor} instance
* @param supplier the supplier used to obtain this node's {@link org.apache.kafka.streams.processor.Processor} instance
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Specifying old PAPI by fully qualified name where necessary.

* @param parentNames the name of one or more source or processor nodes whose output records this processor should receive
* and process
* @return itself
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
* This is a stateless record-by-record operation, i.e, {@link #apply(Object, Object)} is invoked individually for each
* record of a stream.
* If stateful processing is required, consider using
* {@link KStream#process(org.apache.kafka.streams.processor.ProcessorSupplier, String...) KStream#process(...)}.
* {@link KStream#process(org.apache.kafka.streams.processor.api.ProcessorSupplier, String...) KStream#process(...)}.
*
* @param <K> key type
* @param <V> value type
Expand Down
262 changes: 241 additions & 21 deletions streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;

class CogroupedStreamAggregateBuilder<K, VOut> {
Expand Down Expand Up @@ -255,10 +254,11 @@ <KR, VIn> KTable<KR, VOut> createTable(final Collection<GraphNode> processors,
builder);
}

@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suppressing the deprecation warnings on our internal processors, and their tests, makes up the lion's share of this PR. I've marked them all with this comment, and filed https://issues.apache.org/jira/browse/KAFKA-12939 to make sure we really migrate everything.

private StatefulProcessorNode<K, ?> getStatefulProcessorNode(final String processorName,
final boolean stateCreated,
final StoreBuilder<?> storeBuilder,
final ProcessorSupplier<K, ?> kStreamAggregate) {
final org.apache.kafka.streams.processor.ProcessorSupplier<K, ?> kStreamAggregate) {
final StatefulProcessorNode<K, ?> statefulProcessorNode;
if (!stateCreated) {
statefulProcessorNode =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Collections;
Expand Down Expand Up @@ -68,7 +67,8 @@ public class KGroupedTableImpl<K, V> extends AbstractStream<K, V> implements KGr
this.userProvidedRepartitionTopicName = groupedInternal.name();
}

private <T> KTable<K, T> doAggregate(final ProcessorSupplier<K, Change<V>> aggregateSupplier,
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
private <T> KTable<K, T> doAggregate(final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier,
final NamedInternal named,
final String functionName,
final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materialized) {
Expand Down Expand Up @@ -145,7 +145,8 @@ public KTable<K, V> reduce(final Reducer<V> adder,
if (materializedInternal.valueSerde() == null) {
materializedInternal.withValueSerde(valueSerde);
}
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableReduce<>(
materializedInternal.storeName(),
adder,
subtractor);
Expand Down Expand Up @@ -176,7 +177,8 @@ public KTable<K, Long> count(final Named named, final Materialized<K, Long, KeyV
materializedInternal.withValueSerde(Serdes.Long());
}

final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
materializedInternal.storeName(),
countInitializer,
countAdder,
Expand Down Expand Up @@ -221,7 +223,8 @@ public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
if (materializedInternal.keySerde() == null) {
materializedInternal.withKeySerde(keySerde);
}
final ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
final org.apache.kafka.streams.processor.ProcessorSupplier<K, Change<V>> aggregateSupplier = new KTableAggregate<>(
materializedInternal.storeName(),
initializer,
adder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
*/
package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.processor.ProcessorSupplier;

public interface KStreamAggProcessorSupplier<K, RK, V, T> extends ProcessorSupplier<K, V> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public interface KStreamAggProcessorSupplier<K, RK, V, T> extends org.apache.kafka.streams.processor.ProcessorSupplier<K, V> {

KTableValueGetterSupplier<RK, T> view();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
Expand All @@ -31,6 +28,7 @@
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;
import static org.apache.kafka.streams.state.ValueAndTimestamp.getValueOrNull;

@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K, K, V, T> {
private static final Logger LOG = LoggerFactory.getLogger(KStreamAggregate.class);
private final String storeName;
Expand All @@ -48,7 +46,7 @@ public class KStreamAggregate<K, V, T> implements KStreamAggProcessorSupplier<K,
}

@Override
public Processor<K, V> get() {
public org.apache.kafka.streams.processor.Processor<K, V> get() {
return new KStreamAggregateProcessor();
}

Expand All @@ -58,13 +56,13 @@ public void enableSendingOldValues() {
}


private class KStreamAggregateProcessor extends AbstractProcessor<K, V> {
private class KStreamAggregateProcessor extends org.apache.kafka.streams.processor.AbstractProcessor<K, V> {
private TimestampedKeyValueStore<K, T> store;
private Sensor droppedRecordsSensor;
private TimestampedTupleForwarder<K, T> tupleForwarder;

@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
droppedRecordsSensor = droppedRecordsSensor(
Thread.currentThread().getName(),
Expand Down Expand Up @@ -130,7 +128,7 @@ private class KStreamAggregateValueGetter implements KTableValueGetter<K, T> {
private TimestampedKeyValueStore<K, T> store;

@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
store = context.getStateStore(storeName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,12 @@
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.state.StoreBuilder;

import java.util.Set;

public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements ProcessorSupplier<KIn, VIn> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamFlatTransform<KIn, VIn, KOut, VOut> implements org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> {

private final TransformerSupplier<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformerSupplier;

Expand All @@ -36,7 +33,7 @@ public KStreamFlatTransform(final TransformerSupplier<? super KIn, ? super VIn,
}

@Override
public Processor<KIn, VIn> get() {
public org.apache.kafka.streams.processor.Processor<KIn, VIn> get() {
return new KStreamFlatTransformProcessor<>(transformerSupplier.get());
}

Expand All @@ -45,7 +42,7 @@ public Set<StoreBuilder<?>> stores() {
return transformerSupplier.stores();
}

public static class KStreamFlatTransformProcessor<KIn, VIn, KOut, VOut> extends AbstractProcessor<KIn, VIn> {
public static class KStreamFlatTransformProcessor<KIn, VIn, KOut, VOut> extends org.apache.kafka.streams.processor.AbstractProcessor<KIn, VIn> {

private final Transformer<? super KIn, ? super VIn, Iterable<KeyValue<KOut, VOut>>> transformer;

Expand All @@ -54,7 +51,7 @@ public KStreamFlatTransformProcessor(final Transformer<? super KIn, ? super VIn,
}

@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
transformer.init(context);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,13 @@

import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.ForwardingDisabledProcessorContext;
import org.apache.kafka.streams.state.StoreBuilder;

import java.util.Set;

public class KStreamFlatTransformValues<KIn, VIn, VOut> implements ProcessorSupplier<KIn, VIn> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
public class KStreamFlatTransformValues<KIn, VIn, VOut> implements org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> {

private final ValueTransformerWithKeySupplier<KIn, VIn, Iterable<VOut>> valueTransformerSupplier;

Expand All @@ -36,7 +33,7 @@ public KStreamFlatTransformValues(final ValueTransformerWithKeySupplier<KIn, VIn
}

@Override
public Processor<KIn, VIn> get() {
public org.apache.kafka.streams.processor.Processor<KIn, VIn> get() {
return new KStreamFlatTransformValuesProcessor<>(valueTransformerSupplier.get());
}

Expand All @@ -45,7 +42,7 @@ public Set<StoreBuilder<?>> stores() {
return valueTransformerSupplier.stores();
}

public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> extends AbstractProcessor<KIn, VIn> {
public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> extends org.apache.kafka.streams.processor.AbstractProcessor<KIn, VIn> {

private final ValueTransformerWithKey<KIn, VIn, Iterable<VOut>> valueTransformer;

Expand All @@ -54,7 +51,7 @@ public static class KStreamFlatTransformValuesProcessor<KIn, VIn, VOut> extends
}

@Override
public void init(final ProcessorContext context) {
public void init(final org.apache.kafka.streams.processor.ProcessorContext context) {
super.init(context);
valueTransformer.init(new ForwardingDisabledProcessorContext(context));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@

import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

class KStreamGlobalKTableJoin<K1, K2, R, V1, V2> implements ProcessorSupplier<K1, V1> {
@SuppressWarnings("deprecation") // Old PAPI. Needs to be migrated.
class KStreamGlobalKTableJoin<K1, K2, R, V1, V2> implements org.apache.kafka.streams.processor.ProcessorSupplier<K1, V1> {

private final KTableValueGetterSupplier<K2, V2> valueGetterSupplier;
private final ValueJoinerWithKey<? super K1, ? super V1, ? super V2, ? extends R> joiner;
Expand All @@ -39,7 +38,7 @@ class KStreamGlobalKTableJoin<K1, K2, R, V1, V2> implements ProcessorSupplier<K1
}

@Override
public Processor<K1, V1> get() {
public org.apache.kafka.streams.processor.Processor<K1, V1> get() {
return new KStreamKTableJoinProcessor<>(valueGetterSupplier.get(), mapper, joiner, leftJoin);
}
}
Loading