Merged
3 changes: 1 addition & 2 deletions build.gradle
@@ -200,7 +200,6 @@ def determineCommitId() {
}


def excludedSpotlessModules = [':streams']


apply from: file('wrapper.gradle')
@@ -758,7 +757,7 @@ subprojects {
// the task `removeUnusedImports` is implemented by google-java-format,
// and unfortunately the google-java-format version used by spotless 6.14.0 can't work with JDK 21.
// Hence, we apply spotless tasks only if the env is either JDK11 or JDK17
if ((JavaVersion.current().isJava11() || (JavaVersion.current() == JavaVersion.VERSION_17)) && project.path !in excludedSpotlessModules) {
if ((JavaVersion.current().isJava11() || (JavaVersion.current() == JavaVersion.VERSION_17))) {
apply plugin: 'com.diffplug.spotless'
spotless {
java {
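The build.gradle hunk above is cut off after the opening of the java { block. For context, the change removes the :streams exclusion, so spotless now covers every module, and the guard keeps the JDK 11/17 restriction because the google-java-format version used by spotless 6.14.0 cannot run on JDK 21. The following is only a minimal sketch of what a spotless block of this shape typically contains; the step choices are assumptions, not this PR's exact configuration.

// Hypothetical sketch, not the PR's actual spotless configuration.
if (JavaVersion.current().isJava11() || JavaVersion.current() == JavaVersion.VERSION_17) {
    apply plugin: 'com.diffplug.spotless'
    spotless {
        java {
            importOrder()           // sort imports into one consistent order
            removeUnusedImports()   // implemented by google-java-format, hence the JDK guard
        }
    }
}

With the exclusion gone, ./gradlew spotlessApply (and the matching spotlessCheck task) also runs against the streams module, which is consistent with the import reordering in the file changes below.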
@@ -79,6 +79,7 @@
import org.apache.kafka.streams.state.internals.GlobalStateStoreProvider;
import org.apache.kafka.streams.state.internals.QueryableStoreProvider;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;

import org.slf4j.Logger;

import java.time.Duration;
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.streams;

import java.util.Arrays;
import java.util.HashSet;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -52,13 +50,16 @@
import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
import org.apache.kafka.streams.processor.internals.assignment.RackAwareTaskAssignor;
import org.apache.kafka.streams.state.BuiltInDslStoreSuppliers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -140,5 +140,3 @@ Sensor addSensor(final String name,
*/
void removeSensor(final Sensor sensor);
}


21 changes: 11 additions & 10 deletions streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java
@@ -26,11 +26,12 @@
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.TimestampExtractor;

import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.state.DslStoreSuppliers;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;
@@ -40,23 +41,23 @@
import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC;
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_DOC;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DEFAULT;
import static org.apache.kafka.streams.StreamsConfig.DSL_STORE_SUPPLIERS_CLASS_DOC;
import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC;
import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.STATESTORE_CACHE_MAX_BYTES_DOC;
import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG;
import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_DOC;
import static org.apache.kafka.streams.StreamsConfig.ROCKS_DB;
import static org.apache.kafka.streams.StreamsConfig.IN_MEMORY;
import static org.apache.kafka.streams.internals.StreamsConfigUtils.getTotalCacheSize;

/**
@@ -188,4 +188,3 @@ interface Sink extends Node {
Set<GlobalStore> globalStores();

}

@@ -16,9 +16,10 @@
*/
package org.apache.kafka.streams.errors;

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Map;

/**
* {@code ProductionExceptionHandler} that always instructs streams to fail when an exception
* happens while attempting to produce result records.
@@ -19,6 +19,7 @@

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -17,6 +17,7 @@
package org.apache.kafka.streams.errors;

import org.apache.kafka.streams.processor.api.Record;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -18,6 +18,7 @@

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ProcessorContext;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -17,6 +17,7 @@
package org.apache.kafka.streams.errors;

import org.apache.kafka.streams.processor.api.Record;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -17,6 +17,7 @@
package org.apache.kafka.streams.internals;

import org.apache.kafka.streams.StreamsConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -21,6 +21,7 @@
import org.apache.kafka.common.metrics.Sensor.RecordingLevel;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -19,6 +19,7 @@
import org.apache.kafka.streams.kstream.internals.UnlimitedWindow;
import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowCloseStrategy;
import org.apache.kafka.streams.kstream.internals.emitstrategy.WindowUpdateStrategy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -42,5 +42,3 @@ public interface ForeachAction<K, V> {
*/
void apply(final K key, final V value);
}


@@ -24,14 +24,14 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.ConnectedStoreProvider;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessor;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorContext;
import org.apache.kafka.streams.processor.api.FixedKeyProcessorSupplier;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TopicNameExtractor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;

@@ -21,11 +21,11 @@
import java.time.Duration;
import java.util.Objects;

import static java.time.Duration.ofMillis;
import static org.apache.kafka.streams.internals.ApiUtils.prepareMillisCheckFailMsgPrefix;
import static org.apache.kafka.streams.internals.ApiUtils.validateMillisecondDuration;
import static org.apache.kafka.streams.kstream.Windows.DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
import static org.apache.kafka.streams.kstream.Windows.NO_GRACE_PERIOD;
import static java.time.Duration.ofMillis;

/**
* A session based window specification used for aggregating events into sessions.
@@ -23,8 +23,8 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowStore;

import java.time.Duration;

@@ -23,8 +23,8 @@
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowStore;

import java.time.Duration;

@@ -16,14 +16,15 @@
*/
package org.apache.kafka.streams.kstream;

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.Punctuator;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.To;

import java.time.Duration;

/**
* The {@code Transformer} interface is for stateful mapping of an input record to zero, one, or multiple new output
* records (both key and value type can be altered arbitrarily).
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream;

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -25,6 +24,8 @@
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.To;

import java.time.Duration;

/**
* The {@code ValueTransformer} interface for stateful mapping of a value to a new value (with possible new type).
* This is a stateful record-by-record operation, i.e, {@link #transform(Object)} is invoked individually for each
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream;

import java.time.Duration;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.ProcessorContext;
@@ -25,6 +24,8 @@
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.To;

import java.time.Duration;

/**
* The {@code ValueTransformerWithKey} interface for stateful mapping of a value to a new value (with possible new type).
* This is a stateful record-by-record operation, i.e, {@link #transform(Object, Object)} is invoked individually for each
@@ -16,13 +16,14 @@
*/
package org.apache.kafka.streams.kstream.internals;

import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslStoreSuppliers;

import java.util.HashSet;
import java.util.Set;

public abstract class AbstractConfigurableStoreFactory implements StoreFactory {
private final Set<String> connectedProcessorNames = new HashSet<>();
private DslStoreSuppliers dslStoreSuppliers;
@@ -16,11 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;

import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION;
import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor;
import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;
@@ -40,8 +35,14 @@
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

import org.slf4j.Logger;

import static org.apache.kafka.streams.StreamsConfig.InternalConfig.EMIT_INTERVAL_MS_KSTREAMS_WINDOWED_AGGREGATION;
import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emitFinalLatencySensor;
import static org.apache.kafka.streams.processor.internals.metrics.ProcessorNodeMetrics.emittedRecordsSensor;
import static org.apache.kafka.streams.processor.internals.metrics.TaskMetrics.droppedRecordsSensor;

public abstract class AbstractKStreamTimeWindowAggregateProcessor<KIn, VIn, VAgg> extends ContextualProcessor<KIn, VIn, Windowed<KIn>, Change<VAgg>> {

private final Time time = Time.SYSTEM;
@@ -32,12 +32,12 @@
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;

public class CogroupedKStreamImpl<K, VOut> extends AbstractStream<K, VOut> implements CogroupedKStream<K, VOut> {

@@ -16,14 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;

import static org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.optimizableRepartitionNodeBuilder;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.EmitStrategy;
@@ -34,14 +26,23 @@
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorGraphNode;
import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
import org.apache.kafka.streams.kstream.internals.graph.StatefulProcessorNode;
import org.apache.kafka.streams.kstream.internals.graph.GraphNode;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.StoreFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Map.Entry;

import static org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode.optimizableRepartitionNodeBuilder;

class CogroupedStreamAggregateBuilder<K, VOut> {
private final InternalStreamsBuilder builder;
private final Map<KGroupedStreamImpl<K, ?>, GraphNode> parentNodes = new LinkedHashMap<>();
@@ -18,8 +18,8 @@

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class ConsumedInternal<K, V> extends Consumed<K, V> {