diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 5067da6902480..bb0c40a83dc05 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -152,7 +152,7 @@ public class KafkaStreams implements AutoCloseable { private final Logger log; private final String clientId; private final Metrics metrics; - private final StreamsConfig config; + protected final StreamsConfig applicationConfigs; protected final List threads; protected final StateDirectory stateDirectory; private final StreamsMetadataState streamsMetadataState; @@ -795,12 +795,12 @@ public KafkaStreams(final Topology topology, * you still must {@link #close()} it to avoid resource leaks. * * @param topology the topology specifying the computational logic - * @param config configs for Kafka Streams + * @param applicationConfigs configs for Kafka Streams * @throws StreamsException if any fatal error occurs */ public KafkaStreams(final Topology topology, - final StreamsConfig config) { - this(topology, config, new DefaultKafkaClientSupplier()); + final StreamsConfig applicationConfigs) { + this(topology, applicationConfigs, new DefaultKafkaClientSupplier()); } /** @@ -810,15 +810,15 @@ public KafkaStreams(final Topology topology, * you still must {@link #close()} it to avoid resource leaks. 
* * @param topology the topology specifying the computational logic - * @param config configs for Kafka Streams + * @param applicationConfigs configs for Kafka Streams * @param clientSupplier the Kafka clients supplier which provides underlying producer and consumer clients * for the new {@code KafkaStreams} instance * @throws StreamsException if any fatal error occurs */ public KafkaStreams(final Topology topology, - final StreamsConfig config, + final StreamsConfig applicationConfigs, final KafkaClientSupplier clientSupplier) { - this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, clientSupplier); + this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, clientSupplier); } /** @@ -828,34 +828,34 @@ public KafkaStreams(final Topology topology, * you still must {@link #close()} it to avoid resource leaks. * * @param topology the topology specifying the computational logic - * @param config configs for Kafka Streams + * @param applicationConfigs configs for Kafka Streams * @param time {@code Time} implementation; cannot be null * @throws StreamsException if any fatal error occurs */ public KafkaStreams(final Topology topology, - final StreamsConfig config, + final StreamsConfig applicationConfigs, final Time time) { - this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, new DefaultKafkaClientSupplier(), time); + this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, new DefaultKafkaClientSupplier(), time); } private KafkaStreams(final Topology topology, - final StreamsConfig config, + final StreamsConfig applicationConfigs, final KafkaClientSupplier clientSupplier, final Time time) throws StreamsException { - this(new TopologyMetadata(topology.internalTopologyBuilder, config), config, clientSupplier, time); + this(new TopologyMetadata(topology.internalTopologyBuilder, applicationConfigs), applicationConfigs, clientSupplier, 
time); } protected KafkaStreams(final TopologyMetadata topologyMetadata, - final StreamsConfig config, + final StreamsConfig applicationConfigs, final KafkaClientSupplier clientSupplier) throws StreamsException { - this(topologyMetadata, config, clientSupplier, Time.SYSTEM); + this(topologyMetadata, applicationConfigs, clientSupplier, Time.SYSTEM); } private KafkaStreams(final TopologyMetadata topologyMetadata, - final StreamsConfig config, + final StreamsConfig applicationConfigs, final KafkaClientSupplier clientSupplier, final Time time) throws StreamsException { - this.config = config; + this.applicationConfigs = applicationConfigs; this.time = time; this.topologyMetadata = topologyMetadata; @@ -864,15 +864,15 @@ private KafkaStreams(final TopologyMetadata topologyMetadata, final boolean hasGlobalTopology = topologyMetadata.hasGlobalTopology(); try { - stateDirectory = new StateDirectory(config, time, topologyMetadata.hasPersistentStores(), topologyMetadata.hasNamedTopologies()); + stateDirectory = new StateDirectory(applicationConfigs, time, topologyMetadata.hasPersistentStores(), topologyMetadata.hasNamedTopologies()); processId = stateDirectory.initializeProcessId(); } catch (final ProcessorStateException fatal) { throw new StreamsException(fatal); } // The application ID is a required config and hence should always have value - final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG); - final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); + final String userClientId = applicationConfigs.getString(StreamsConfig.CLIENT_ID_CONFIG); + final String applicationId = applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG); if (userClientId.length() <= 0) { clientId = applicationId + "-" + processId; } else { @@ -883,22 +883,22 @@ private KafkaStreams(final TopologyMetadata topologyMetadata, // use client id instead of thread client id since this admin client may be shared among threads this.clientSupplier 
= clientSupplier; - adminClient = clientSupplier.getAdmin(config.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId))); + adminClient = clientSupplier.getAdmin(applicationConfigs.getAdminConfigs(ClientUtils.getSharedAdminClientId(clientId))); log.info("Kafka Streams version: {}", ClientMetrics.version()); log.info("Kafka Streams commit ID: {}", ClientMetrics.commitId()); - metrics = getMetrics(config, time, clientId); + metrics = getMetrics(applicationConfigs, time, clientId); streamsMetrics = new StreamsMetricsImpl( metrics, clientId, - config.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), + applicationConfigs.getString(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG), time ); ClientMetrics.addVersionMetric(streamsMetrics); ClientMetrics.addCommitIdMetric(streamsMetrics); - ClientMetrics.addApplicationIdMetric(streamsMetrics, config.getString(StreamsConfig.APPLICATION_ID_CONFIG)); + ClientMetrics.addApplicationIdMetric(streamsMetrics, applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)); ClientMetrics.addTopologyDescriptionMetric(streamsMetrics, (metricsConfig, now) -> this.topologyMetadata.topologyDescriptionString()); ClientMetrics.addStateMetric(streamsMetrics, (metricsConfig, now) -> state); threads = Collections.synchronizedList(new LinkedList<>()); @@ -906,14 +906,14 @@ private KafkaStreams(final TopologyMetadata topologyMetadata, streamsMetadataState = new StreamsMetadataState( this.topologyMetadata, - parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); + parseHostInfo(applicationConfigs.getString(StreamsConfig.APPLICATION_SERVER_CONFIG))); oldHandler = false; streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler; delegatingStateRestoreListener = new DelegatingStateRestoreListener(); - totalCacheSize = config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG); - final int numStreamThreads = topologyMetadata.getNumStreamThreads(config); + totalCacheSize = 
applicationConfigs.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG); + final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs); final long cacheSizePerThread = getCacheSizePerThread(numStreamThreads); GlobalStreamThread.State globalThreadState = null; @@ -921,8 +921,8 @@ private KafkaStreams(final TopologyMetadata topologyMetadata, final String globalThreadId = clientId + "-GlobalStreamThread"; globalStreamThread = new GlobalStreamThread( topologyMetadata.globalTaskTopology(), - config, - clientSupplier.getGlobalConsumer(config.getGlobalConsumerConfigs(clientId)), + applicationConfigs, + clientSupplier.getGlobalConsumer(applicationConfigs.getGlobalConsumerConfigs(clientId)), stateDirectory, cacheSizePerThread, streamsMetrics, @@ -949,13 +949,13 @@ private KafkaStreams(final TopologyMetadata topologyMetadata, } stateDirCleaner = setupStateDirCleaner(); - rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, config); + rocksDBMetricsRecordingService = maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs); } private StreamThread createAndAddStreamThread(final long cacheSizePerThread, final int threadIdx) { final StreamThread streamThread = StreamThread.create( topologyMetadata, - config, + applicationConfigs, clientSupplier, adminClient, processId, @@ -964,6 +964,7 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin time, streamsMetadataState, cacheSizePerThread, + stateDirectory, delegatingStateRestoreListener, threadIdx, @@ -1117,7 +1118,7 @@ private Optional removeStreamThread(final long timeoutMs) throws Timeout final Collection membersToRemove = Collections.singletonList(memberToRemove); final RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroupResult = adminClient.removeMembersFromConsumerGroup( - config.getString(StreamsConfig.APPLICATION_ID_CONFIG), + applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), new 
RemoveMembersFromConsumerGroupOptions(membersToRemove) ); try { @@ -1125,16 +1126,16 @@ private Optional removeStreamThread(final long timeoutMs) throws Timeout removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs, TimeUnit.MILLISECONDS); } catch (final java.util.concurrent.TimeoutException e) { log.error("Could not remove static member {} from consumer group {} due to a timeout: {}", - groupInstanceID.get(), config.getString(StreamsConfig.APPLICATION_ID_CONFIG), e); + groupInstanceID.get(), applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), e); throw new TimeoutException(e.getMessage(), e); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); } catch (final ExecutionException e) { log.error("Could not remove static member {} from consumer group {} due to: {}", - groupInstanceID.get(), config.getString(StreamsConfig.APPLICATION_ID_CONFIG), e); + groupInstanceID.get(), applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), e); throw new StreamsException( "Could not remove static member " + groupInstanceID.get() - + " from consumer group " + config.getString(StreamsConfig.APPLICATION_ID_CONFIG) + + " from consumer group " + applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG) + " for the following reason: ", e.getCause() ); @@ -1286,7 +1287,7 @@ public synchronized void start() throws IllegalStateException, StreamsException processStreamThread(StreamThread::start); - final Long cleanupDelay = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); + final Long cleanupDelay = applicationConfigs.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); stateDirCleaner.scheduleAtFixedRate(() -> { // we do not use lock here since we only read on the value and act on it if (state == State.RUNNING) { diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java index 
f10dc9356f783..5f5d0b7853aac 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java @@ -38,6 +38,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorAdapter; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.SourceNode; +import org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; @@ -75,12 +76,18 @@ public class StreamsBuilder { protected final InternalStreamsBuilder internalStreamsBuilder; public StreamsBuilder() { - topology = getNewTopology(); + topology = new Topology(); internalTopologyBuilder = topology.internalTopologyBuilder; internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); } - protected Topology getNewTopology() { + protected StreamsBuilder(final TopologyConfig topologyConfigs) { + topology = getNewTopology(topologyConfigs); + internalTopologyBuilder = topology.internalTopologyBuilder; + internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder); + } + + protected Topology getNewTopology(final TopologyConfig topologyConfigs) { return new Topology(); } @@ -610,7 +617,11 @@ public synchronized Topology build() { * @return the {@link Topology} that represents the specified processing logic */ public synchronized Topology build(final Properties props) { - internalStreamsBuilder.buildAndOptimizeTopology(props); + final boolean optimizeTopology = + props != null && + StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG)); + + internalStreamsBuilder.buildAndOptimizeTopology(optimizeTopology); return topology; } } diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 098539a46eb6d..2ebd5543ad0bc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -349,7 +349,7 @@ public class StreamsConfig extends AbstractConfig { /** {@code buffered.records.per.partition} */ @SuppressWarnings("WeakerAccess") public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition"; - private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum number of records to buffer per partition."; + public static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "Maximum number of records to buffer per partition."; /** {@code built.in.metrics.version} */ public static final String BUILT_IN_METRICS_VERSION_CONFIG = "built.in.metrics.version"; @@ -358,7 +358,7 @@ public class StreamsConfig extends AbstractConfig { /** {@code cache.max.bytes.buffering} */ @SuppressWarnings("WeakerAccess") public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering"; - private static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads"; + public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads"; /** {@code client.id} */ @SuppressWarnings("WeakerAccess") @@ -380,7 +380,7 @@ public class StreamsConfig extends AbstractConfig { /** {@code default.deserialization.exception.handler} */ @SuppressWarnings("WeakerAccess") public static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG = "default.deserialization.exception.handler"; - private static final String DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.DeserializationExceptionHandler interface."; + public static final String 
DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.DeserializationExceptionHandler interface."; /** {@code default.production.exception.handler} */ @SuppressWarnings("WeakerAccess") @@ -423,11 +423,11 @@ public class StreamsConfig extends AbstractConfig { /** {@code default.timestamp.extractor} */ @SuppressWarnings("WeakerAccess") public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "default.timestamp.extractor"; - private static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractor interface."; + public static final String DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC = "Default timestamp extractor class that implements the org.apache.kafka.streams.processor.TimestampExtractor interface."; /** {@code max.task.idle.ms} */ public static final String MAX_TASK_IDLE_MS_CONFIG = "max.task.idle.ms"; - private static final String MAX_TASK_IDLE_MS_DOC = "This config controls whether joins and merges" + public static final String MAX_TASK_IDLE_MS_DOC = "This config controls whether joins and merges" + " may produce out-of-order results." 
+ " The config value is the maximum amount of time in milliseconds a stream task will stay idle" + " when it is fully caught up on some (but not all) input partitions" diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java b/streams/src/main/java/org/apache/kafka/streams/Topology.java index 7c45de7f47c9a..0eb5e2b3861b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/Topology.java +++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java @@ -54,7 +54,15 @@ */ public class Topology { - protected final InternalTopologyBuilder internalTopologyBuilder = new InternalTopologyBuilder(); + protected final InternalTopologyBuilder internalTopologyBuilder; + + public Topology() { + this(new InternalTopologyBuilder()); + } + + protected Topology(final InternalTopologyBuilder internalTopologyBuilder) { + this.internalTopologyBuilder = internalTopologyBuilder; + } /** * Sets the {@code auto.offset.reset} configuration when diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index 7f75f72a6584d..f5046b352f02c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -19,7 +19,6 @@ import java.util.TreeMap; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.GlobalKTable; @@ -51,7 +50,6 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.PriorityQueue; -import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import 
java.util.function.Predicate; @@ -270,13 +268,17 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) { // use this method for testing only public void buildAndOptimizeTopology() { - buildAndOptimizeTopology(null); + buildAndOptimizeTopology(false); } - public void buildAndOptimizeTopology(final Properties props) { + public void buildAndOptimizeTopology(final boolean optimizeTopology) { mergeDuplicateSourceNodes(); - maybePerformOptimizations(props); + if (optimizeTopology) { + LOG.debug("Optimizing the Kafka Streams graph for repartition nodes"); + optimizeKTableSourceTopics(); + maybeOptimizeRepartitionOperations(); + } final PriorityQueue graphNodePriorityQueue = new PriorityQueue<>(5, Comparator.comparing(GraphNode::buildPriority)); @@ -346,15 +348,6 @@ private void mergeDuplicateSourceNodes() { } } - private void maybePerformOptimizations(final Properties props) { - - if (props != null && StreamsConfig.OPTIMIZE.equals(props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG))) { - LOG.debug("Optimizing the Kafka Streams graph for repartition nodes"); - optimizeKTableSourceTopics(); - maybeOptimizeRepartitionOperations(); - } - } - private void optimizeKTableSourceTopics() { LOG.debug("Marking KTable source nodes to optimize using source topic for changelogs "); tableSourceNodes.forEach(node -> ((TableSourceNode) node).reuseSourceTopicForChangeLog(true)); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 75ec24c686aad..3f6c7f9b821ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -51,7 +51,7 @@ class ActiveTaskCreator { private final TopologyMetadata topologyMetadata; - private final StreamsConfig config; + private final StreamsConfig 
applicationConfig; private final StreamsMetricsImpl streamsMetrics; private final StateDirectory stateDirectory; private final ChangelogReader storeChangelogReader; @@ -71,7 +71,7 @@ class ActiveTaskCreator { private final Map> unknownTasksToBeCreated = new HashMap<>(); ActiveTaskCreator(final TopologyMetadata topologyMetadata, - final StreamsConfig config, + final StreamsConfig applicationConfig, final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, @@ -82,7 +82,7 @@ class ActiveTaskCreator { final UUID processId, final Logger log) { this.topologyMetadata = topologyMetadata; - this.config = config; + this.applicationConfig = applicationConfig; this.streamsMetrics = streamsMetrics; this.stateDirectory = stateDirectory; this.storeChangelogReader = storeChangelogReader; @@ -93,7 +93,7 @@ class ActiveTaskCreator { this.log = log; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); - processingMode = StreamThread.processingMode(config); + processingMode = StreamThread.processingMode(applicationConfig); if (processingMode == EXACTLY_ONCE_ALPHA) { threadProducer = null; @@ -105,7 +105,7 @@ class ActiveTaskCreator { final LogContext logContext = new LogContext(threadIdPrefix); threadProducer = new StreamsProducer( - config, + applicationConfig, threadId, clientSupplier, null, @@ -170,7 +170,7 @@ Collection createTasks(final Consumer consumer, final ProcessorStateManager stateManager = new ProcessorStateManager( taskId, Task.TaskType.ACTIVE, - StreamThread.eosEnabled(config), + StreamThread.eosEnabled(applicationConfig), logContext, stateDirectory, storeChangelogReader, @@ -180,7 +180,7 @@ Collection createTasks(final Consumer consumer, final InternalProcessorContext context = new ProcessorContextImpl( taskId, - config, + applicationConfig, stateManager, streamsMetrics, cache @@ -239,7 +239,7 @@ private StreamTask createActiveTask(final TaskId taskId, if (processingMode == 
StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA) { log.info("Creating producer client for task {}", taskId); streamsProducer = new StreamsProducer( - config, + applicationConfig, threadId, clientSupplier, taskId, @@ -255,7 +255,7 @@ private StreamTask createActiveTask(final TaskId taskId, logContext, taskId, streamsProducer, - config.defaultProductionExceptionHandler(), + applicationConfig.defaultProductionExceptionHandler(), streamsMetrics ); @@ -264,7 +264,7 @@ private StreamTask createActiveTask(final TaskId taskId, inputPartitions, topology, consumer, - config, + topologyMetadata.getTaskConfigFor(taskId), streamsMetrics, stateDirectory, cache, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index dd07c107e98a0..99d3bdf59b0c0 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -31,6 +31,7 @@ import org.apache.kafka.streams.processor.api.ProcessorSupplier; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology; +import org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.internals.SessionStoreBuilder; import org.apache.kafka.streams.state.internals.TimestampedWindowStoreBuilder; @@ -64,6 +65,15 @@ public class InternalTopologyBuilder { + public InternalTopologyBuilder() { + this.topologyName = null; + } + + public InternalTopologyBuilder(final TopologyConfig topologyConfigs) { + this.topologyConfigs = topologyConfigs; + this.topologyName = topologyConfigs.topologyName; + } + private static final Logger log = 
LoggerFactory.getLogger(InternalTopologyBuilder.class); private static final String[] NO_PREDECESSORS = {}; @@ -133,12 +143,15 @@ public class InternalTopologyBuilder { private Map> nodeGroups = null; - private StreamsConfig config = null; - - // The name of the topology this builder belongs to, or null if none - private String topologyName; + // The name of the topology this builder belongs to, or null if this is not a NamedTopology + private final String topologyName; + // TODO KAFKA-13336: we can remove this reference once we make the Topology/NamedTopology class into an interface and implement it private NamedTopology namedTopology; + // TODO KAFKA-13283: once we enforce all configs to be passed in when constructing the topology builder then we can set + // this up front and make it final, but for now we have to wait for the global app configs passed in to rewriteTopology + private TopologyConfig topologyConfigs; // the configs for this topology, including overrides and global defaults + private boolean hasPersistentStores = false; public static class StateStoreFactory { @@ -338,19 +351,6 @@ Sink describe() { } } - - public void setNamedTopology(final NamedTopology topology) { - final String topologyName = topology.name(); - Objects.requireNonNull(topologyName, "topology name can't be null"); - Objects.requireNonNull(topology, "named topology can't be null"); - if (this.topologyName != null) { - log.error("Tried to reset the topologyName to {} but it was already set to {}", topologyName, this.topologyName); - throw new IllegalStateException("The topologyName has already been set to " + this.topologyName); - } - this.namedTopology = topology; - this.topologyName = topologyName; - } - // public for testing only public final InternalTopologyBuilder setApplicationId(final String applicationId) { Objects.requireNonNull(applicationId, "applicationId can't be null"); @@ -359,15 +359,17 @@ public final InternalTopologyBuilder setApplicationId(final String applicationId
return this; } - public synchronized final InternalTopologyBuilder setStreamsConfig(final StreamsConfig config) { - Objects.requireNonNull(config, "config can't be null"); - this.config = config; + public synchronized final void setStreamsConfig(final StreamsConfig applicationConfig) { + Objects.requireNonNull(applicationConfig, "config can't be null"); + topologyConfigs = new TopologyConfig(applicationConfig); + } - return this; + public synchronized final void setNamedTopology(final NamedTopology namedTopology) { + this.namedTopology = namedTopology; } - public synchronized final StreamsConfig getStreamsConfig() { - return config; + public synchronized TopologyConfig topologyConfigs() { + return topologyConfigs; } public String topologyName() { @@ -381,11 +383,11 @@ public NamedTopology namedTopology() { public synchronized final InternalTopologyBuilder rewriteTopology(final StreamsConfig config) { Objects.requireNonNull(config, "config can't be null"); - // set application id setApplicationId(config.getString(StreamsConfig.APPLICATION_ID_CONFIG)); + setStreamsConfig(config); // maybe strip out caching layers - if (config.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG) == 0L) { + if (topologyConfigs.cacheSize == 0L) { for (final StateStoreFactory storeFactory : stateFactories.values()) { storeFactory.builder.withCachingDisabled(); } @@ -400,9 +402,6 @@ public synchronized final InternalTopologyBuilder rewriteTopology(final StreamsC globalStateStores.put(storeBuilder.name(), storeBuilder.build()); } - // set streams config - setStreamsConfig(config); - return this; } @@ -1987,7 +1986,7 @@ public String toString() { if (namedTopology == null) { sb.append("Topologies:\n "); } else { - sb.append("Topology - ").append(namedTopology).append(":\n "); + sb.append("Topology: ").append(namedTopology).append(":\n "); } final TopologyDescription.Subtopology[] sortedSubtopologies = subtopologies.descendingSet().toArray(new TopologyDescription.Subtopology[0]); diff 
--git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 802bca1624827..6b3d6794e18d1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; +import org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig.TaskConfig; import org.apache.kafka.streams.state.internals.ThreadCache; import java.util.Collections; @@ -55,7 +56,7 @@ public class StandbyTask extends AbstractTask implements Task { StandbyTask(final TaskId id, final Set inputPartitions, final ProcessorTopology topology, - final StreamsConfig config, + final TaskConfig config, final StreamsMetricsImpl streamsMetrics, final ProcessorStateManager stateMgr, final StateDirectory stateDirectory, @@ -67,7 +68,7 @@ public class StandbyTask extends AbstractTask implements Task { stateDirectory, stateMgr, inputPartitions, - config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG), + config.taskTimeoutMs, "standby-task", StandbyTask.class ); @@ -76,7 +77,7 @@ public class StandbyTask extends AbstractTask implements Task { processorContext.transitionToStandby(cache); closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics); - eosEnabled = StreamThread.eosEnabled(config); + this.eosEnabled = config.eosEnabled; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index ee2a3e1938060..a2854d4f1b5bb 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -38,7 +38,7 @@ class StandbyTaskCreator { private final TopologyMetadata topologyMetadata; - private final StreamsConfig config; + private final StreamsConfig applicationConfig; private final StreamsMetricsImpl streamsMetrics; private final StateDirectory stateDirectory; private final ChangelogReader storeChangelogReader; @@ -50,14 +50,14 @@ class StandbyTaskCreator { private final Map> unknownTasksToBeCreated = new HashMap<>(); StandbyTaskCreator(final TopologyMetadata topologyMetadata, - final StreamsConfig config, + final StreamsConfig applicationConfig, final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, final String threadId, final Logger log) { this.topologyMetadata = topologyMetadata; - this.config = config; + this.applicationConfig = applicationConfig; this.streamsMetrics = streamsMetrics; this.stateDirectory = stateDirectory; this.storeChangelogReader = storeChangelogReader; @@ -101,7 +101,7 @@ Collection createTasks(final Map> tasksToBeCre final ProcessorStateManager stateManager = new ProcessorStateManager( taskId, Task.TaskType.STANDBY, - StreamThread.eosEnabled(config), + StreamThread.eosEnabled(applicationConfig), getLogContext(taskId), stateDirectory, storeChangelogReader, @@ -111,7 +111,7 @@ Collection createTasks(final Map> tasksToBeCre final InternalProcessorContext context = new ProcessorContextImpl( taskId, - config, + applicationConfig, stateManager, streamsMetrics, dummyCache @@ -160,7 +160,7 @@ StandbyTask createStandbyTask(final TaskId taskId, taskId, inputPartitions, topology, - config, + topologyMetadata.getTaskConfigFor(taskId), streamsMetrics, stateManager, stateDirectory, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 7c858696975fe..32369c983f19b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -27,7 +27,6 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; -import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.DeserializationExceptionHandler; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.StreamsException; @@ -44,6 +43,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; +import org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig.TaskConfig; import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.IOException; @@ -117,7 +117,7 @@ public StreamTask(final TaskId id, final Set inputPartitions, final ProcessorTopology topology, final Consumer mainConsumer, - final StreamsConfig config, + final TaskConfig config, final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ThreadCache cache, @@ -132,7 +132,7 @@ public StreamTask(final TaskId id, stateDirectory, stateMgr, inputPartitions, - config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG), + config.taskTimeoutMs, "task", StreamTask.class ); @@ -143,7 +143,7 @@ public StreamTask(final TaskId id, this.time = time; this.recordCollector = recordCollector; - eosEnabled = StreamThread.eosEnabled(config); + this.eosEnabled = config.eosEnabled; final String threadId = Thread.currentThread().getName(); this.streamsMetrics = streamsMetrics; @@ -171,19 +171,19 @@ public StreamTask(final TaskId id, 
streamTimePunctuationQueue = new PunctuationQueue(); systemTimePunctuationQueue = new PunctuationQueue(); - maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); + maxBufferedSize = config.maxBufferedSize; // initialize the consumed and committed offset cache consumedOffsets = new HashMap<>(); resetOffsetsForPartitions = new HashSet<>(); - recordQueueCreator = new RecordQueueCreator(this.logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler()); + recordQueueCreator = new RecordQueueCreator(this.logContext, config.timestampExtractor, config.deserializationExceptionHandler); recordInfo = new PartitionGroup.RecordInfo(); final Sensor enforcedProcessingSensor; enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics); - final long maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG); + final long maxTaskIdleMs = config.maxTaskIdleMs; partitionGroup = new PartitionGroup( logContext, createPartitionQueues(), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 88814b0c93475..1390cdf2ed667 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -596,6 +596,14 @@ boolean runLoop() { // until the rebalance is completed before we close and commit the tasks while (isRunning() || taskManager.isRebalanceInProgress()) { try { + checkForTopologyUpdates(); + // If we received the shutdown signal while waiting for a topology to be added, we can + // stop polling regardless of the rebalance status since we know there are no tasks left + if (!isRunning() && topologyMetadata.isEmpty()) { + log.info("Shutting down thread with empty topology."); + break; + } + maybeSendShutdown(); final long 
size = cacheResizeSize.getAndSet(-1L); if (size != -1L) { @@ -914,8 +922,6 @@ private void checkForTopologyUpdates() { } private long pollPhase() { - checkForTopologyUpdates(); - final ConsumerRecords records; log.debug("Invoking poll on main Consumer"); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index c5ba0c183a2f7..d861ef16db8e2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -1250,7 +1250,7 @@ public void updateTaskEndMetadata(final TopicPartition topicPartition, final Lon /** * Handle any added or removed NamedTopologies. Check if any uncreated assigned tasks belong to a newly - * added NamedTopology and create them if so, then freeze any tasks whose named topology no longer exists + * added NamedTopology and create them if so, then close any tasks whose named topology no longer exists */ void handleTopologyUpdates() { tasks.maybeCreateTasksFromNewTopologies(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java index 129ca09ad5c74..7714d3bba1140 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo; import org.apache.kafka.streams.processor.internals.StreamThread.State; +import org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig.TaskConfig; import java.util.ArrayList; import java.util.Collection; @@ -161,6 +162,11 @@ public void 
unregisterTopology(final String topologyName) { } } + public TaskConfig getTaskConfigFor(final TaskId taskId) { + final InternalTopologyBuilder builder = lookupBuilderForTask(taskId); + return builder.topologyConfigs().getTaskConfig(); + } + public void buildAndRewriteTopology() { applyToEachBuilder(this::buildAndVerifyTopology); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java index d4170d499c595..661e8aa846d17 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java @@ -23,6 +23,7 @@ import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.TopologyMetadata; @@ -32,7 +33,6 @@ import java.util.Optional; import java.util.Properties; import java.util.concurrent.ConcurrentSkipListMap; -import java.util.stream.Collectors; /** * This is currently an internal and experimental feature for enabling certain kinds of topology upgrades. 
Use at @@ -49,67 +49,64 @@ public class KafkaStreamsNamedTopologyWrapper extends KafkaStreams { /** - * A Kafka Streams application with a single initial NamedTopology + * An empty Kafka Streams application that allows NamedTopologies to be added at a later point */ - public KafkaStreamsNamedTopologyWrapper(final NamedTopology topology, final Properties props) { - this(Collections.singleton(topology), new StreamsConfig(props), new DefaultKafkaClientSupplier()); + public KafkaStreamsNamedTopologyWrapper(final Properties props) { + this(new StreamsConfig(props), new DefaultKafkaClientSupplier()); } - /** - * A Kafka Streams application with a single initial NamedTopology - */ - public KafkaStreamsNamedTopologyWrapper(final NamedTopology topology, final Properties props, final KafkaClientSupplier clientSupplier) { - this(Collections.singleton(topology), new StreamsConfig(props), clientSupplier); + public KafkaStreamsNamedTopologyWrapper(final Properties props, final KafkaClientSupplier clientSupplier) { + this(new StreamsConfig(props), clientSupplier); + } + + private KafkaStreamsNamedTopologyWrapper(final StreamsConfig config, final KafkaClientSupplier clientSupplier) { + super(new TopologyMetadata(new ConcurrentSkipListMap<>(), config), config, clientSupplier); } /** - * An empty Kafka Streams application that allows NamedTopologies to be added at a later point + * Start up Streams with a single initial NamedTopology */ - public KafkaStreamsNamedTopologyWrapper(final Properties props) { - this(Collections.emptyList(), new StreamsConfig(props), new DefaultKafkaClientSupplier()); + public void start(final NamedTopology initialTopology) { + start(Collections.singleton(initialTopology)); } /** - * An empty Kafka Streams application that allows NamedTopologies to be added at a later point + * Start up Streams with a collection of initial NamedTopologies */ - public KafkaStreamsNamedTopologyWrapper(final Properties props, final KafkaClientSupplier clientSupplier) { - 
this(Collections.emptyList(), new StreamsConfig(props), clientSupplier); + public void start(final Collection initialTopologies) { + for (final NamedTopology topology : initialTopologies) { + addNamedTopology(topology); + } + super.start(); } /** - * A Kafka Streams application with a multiple initial NamedTopologies + * Provides a high-level DSL for specifying the processing logic of your application and building it into an + * independent topology that can be executed by this {@link KafkaStreams}. * - * @throws IllegalArgumentException if any of the named topologies have the same name - * @throws TopologyException if multiple NamedTopologies subscribe to the same input topics or pattern + * @param topologyName The name for this topology + * @param topologyOverrides The properties and any config overrides for this topology + * + * @throws IllegalArgumentException if the name contains the character sequence "__" */ - public KafkaStreamsNamedTopologyWrapper(final Collection topologies, final Properties props) { - this(topologies, new StreamsConfig(props), new DefaultKafkaClientSupplier()); + public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName, final Properties topologyOverrides) { + if (topologyName.contains(TaskId.NAMED_TOPOLOGY_DELIMITER)) { + throw new IllegalArgumentException("The character sequence '__' is not allowed in a NamedTopology, please select a new name"); + } + return new NamedTopologyBuilder(topologyName, applicationConfigs, topologyOverrides); } /** - * A Kafka Streams application with a multiple initial NamedTopologies + * Provides a high-level DSL for specifying the processing logic of your application and building it into an + * independent topology that can be executed by this {@link KafkaStreams}. This method will use the global + * application {@link StreamsConfig} passed in to the constructor for all topology-level configs. 
To override + * any of these for this specific Topology, use {@link #newNamedTopologyBuilder(String, Properties)}. + * @param topologyName The name for this topology * - * @throws IllegalArgumentException if any of the named topologies have the same name - * @throws TopologyException if multiple NamedTopologies subscribe to the same input topics or pattern + * @throws IllegalArgumentException if the name contains the character sequence "__" */ - public KafkaStreamsNamedTopologyWrapper(final Collection topologies, final Properties props, final KafkaClientSupplier clientSupplier) { - this(topologies, new StreamsConfig(props), clientSupplier); - } - - private KafkaStreamsNamedTopologyWrapper(final Collection topologies, final StreamsConfig config, final KafkaClientSupplier clientSupplier) { - super( - new TopologyMetadata( - topologies.stream().collect(Collectors.toMap( - NamedTopology::name, - NamedTopology::internalTopologyBuilder, - (v1, v2) -> { - throw new IllegalArgumentException("Topology names must be unique"); - }, - () -> new ConcurrentSkipListMap<>())), - config), - config, - clientSupplier - ); + public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) { + return newNamedTopologyBuilder(topologyName, new Properties()); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopology.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopology.java index 0cdfd5b61c5cc..a1debbe4b5620 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopology.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopology.java @@ -19,29 +19,24 @@ import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import java.util.List; public class NamedTopology 
extends Topology { - private final Logger log = LoggerFactory.getLogger(NamedTopology.class); - private String name; - - void setTopologyName(final String newTopologyName) { - if (name != null) { - log.error("Unable to set topologyName = {} because the name is already set to {}", newTopologyName, name); - throw new IllegalStateException("Tried to set topologyName but the name was already set"); - } - name = newTopologyName; - internalTopologyBuilder.setNamedTopology(this); + NamedTopology(final InternalTopologyBuilder internalTopologyBuilder) { + super(internalTopologyBuilder); } + /** + * @return the name of this topology + */ public String name() { - return name; + return internalTopologyBuilder.topologyName(); } + /** + * @return the list of all source topics this topology is subscribed to + */ public List sourceTopics() { return super.internalTopologyBuilder.fullSourceTopicNames(); } @@ -49,4 +44,8 @@ public List sourceTopics() { InternalTopologyBuilder internalTopologyBuilder() { return internalTopologyBuilder; } + + TopologyConfig topologyConfigs() { + return internalTopologyBuilder.topologyConfigs(); + } } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyBuilder.java new file mode 100644 index 0000000000000..ddd9192d53e55 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyBuilder.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.namedtopology; + +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; + +import java.util.Properties; + +public class NamedTopologyBuilder extends StreamsBuilder { + + NamedTopologyBuilder(final String topologyName, final StreamsConfig applicationConfigs, final Properties topologyOverrides) { + super(new TopologyConfig(topologyName, applicationConfigs, topologyOverrides)); + internalTopologyBuilder.setNamedTopology((NamedTopology) topology); + } + + @Override + public synchronized NamedTopology build() { + super.build(internalTopologyBuilder.topologyConfigs().topologyOverrides); + return (NamedTopology) topology; + } + + @Override + protected NamedTopology getNewTopology(final TopologyConfig topologyConfigs) { + return new NamedTopology(new InternalTopologyBuilder(topologyConfigs)); + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyStreamsBuilder.java deleted file mode 100644 index 5d3fad83732fd..0000000000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/NamedTopologyStreamsBuilder.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor.internals.namedtopology; - -import org.apache.kafka.streams.StreamsBuilder; -import org.apache.kafka.streams.Topology; -import org.apache.kafka.streams.processor.TaskId; - -import java.util.Properties; - -public class NamedTopologyStreamsBuilder extends StreamsBuilder { - final String topologyName; - - /** - * @param topologyName any string representing your NamedTopology, all characters allowed except for "__" - * @throws IllegalArgumentException if the name contains the character sequence "__" - */ - public NamedTopologyStreamsBuilder(final String topologyName) { - super(); - this.topologyName = topologyName; - if (topologyName.contains(TaskId.NAMED_TOPOLOGY_DELIMITER)) { - throw new IllegalArgumentException("The character sequence '__' is not allowed in a NamedTopology, please select a new name"); - } - } - - public synchronized NamedTopology buildNamedTopology(final Properties props) { - super.build(props); - final NamedTopology namedTopology = (NamedTopology) super.topology; - namedTopology.setTopologyName(topologyName); - return namedTopology; - } - - @Override - public Topology getNewTopology() { - return new NamedTopology(); - } -} diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java new file mode 100644 index 0000000000000..1fe2b852ab1c6 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TopologyConfig.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.namedtopology; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.internals.StreamThread; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Properties; +import java.util.function.Supplier; + +import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_DOC; +import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.CACHE_MAX_BYTES_BUFFERING_DOC; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC; +import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.MAX_TASK_IDLE_MS_DOC; +import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_CONFIG; +import static org.apache.kafka.streams.StreamsConfig.TASK_TIMEOUT_MS_DOC; + +/** + * Streams configs that apply at the topology level. 
The values in the {@link StreamsConfig} parameter of the + * {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructors will + * determine the defaults, which can then be overridden for specific topologies by passing them in when creating the + * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder()} method. + */ +public class TopologyConfig extends AbstractConfig { + private static final ConfigDef CONFIG; + static { + CONFIG = new ConfigDef() + .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, + Type.INT, + null, + Importance.LOW, + BUFFERED_RECORDS_PER_PARTITION_DOC) + .define(CACHE_MAX_BYTES_BUFFERING_CONFIG, + Type.LONG, + null, + Importance.MEDIUM, + CACHE_MAX_BYTES_BUFFERING_DOC) + .define(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, + Type.CLASS, + null, + Importance.MEDIUM, + DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_DOC) + .define(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + Type.CLASS, + null, + Importance.MEDIUM, + DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_DOC) + .define(MAX_TASK_IDLE_MS_CONFIG, + Type.LONG, + null, + Importance.MEDIUM, + MAX_TASK_IDLE_MS_DOC) + .define(TASK_TIMEOUT_MS_CONFIG, + Type.LONG, + null, + Importance.MEDIUM, + TASK_TIMEOUT_MS_DOC); + } + private final Logger log = LoggerFactory.getLogger(TopologyConfig.class); + + public final String topologyName; + public final boolean eosEnabled; + + public final StreamsConfig applicationConfigs; + public final Properties topologyOverrides; + + public final int maxBufferedSize; + public final long cacheSize; + public final long maxTaskIdleMs; + public final long taskTimeoutMs; + public final Supplier timestampExtractorSupplier; + public final Supplier deserializationExceptionHandlerSupplier; + + public TopologyConfig(final StreamsConfig globalAppConfigs) { + this(null, globalAppConfigs, new Properties()); + } + + public TopologyConfig(final String topologyName, final StreamsConfig globalAppConfigs, final Properties 
topologyOverrides) { + super(CONFIG, topologyOverrides, false); + + this.topologyName = topologyName; + this.eosEnabled = StreamThread.eosEnabled(globalAppConfigs); + + this.applicationConfigs = globalAppConfigs; + this.topologyOverrides = topologyOverrides; + + if (isTopologyOverride(BUFFERED_RECORDS_PER_PARTITION_CONFIG, topologyOverrides)) { + maxBufferedSize = getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG); + log.info("Topology {} is overriding {} to {}", topologyName, BUFFERED_RECORDS_PER_PARTITION_CONFIG, maxBufferedSize); + } else { + maxBufferedSize = globalAppConfigs.getInt(BUFFERED_RECORDS_PER_PARTITION_CONFIG); + } + + if (isTopologyOverride(CACHE_MAX_BYTES_BUFFERING_CONFIG, topologyOverrides)) { + cacheSize = getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); + log.info("Topology {} is overriding {} to {}", topologyName, CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSize); + } else { + cacheSize = globalAppConfigs.getLong(CACHE_MAX_BYTES_BUFFERING_CONFIG); + } + + if (isTopologyOverride(MAX_TASK_IDLE_MS_CONFIG, topologyOverrides)) { + maxTaskIdleMs = getLong(MAX_TASK_IDLE_MS_CONFIG); + log.info("Topology {} is overriding {} to {}", topologyName, MAX_TASK_IDLE_MS_CONFIG, maxTaskIdleMs); + } else { + maxTaskIdleMs = globalAppConfigs.getLong(MAX_TASK_IDLE_MS_CONFIG); + } + + if (isTopologyOverride(TASK_TIMEOUT_MS_CONFIG, topologyOverrides)) { + taskTimeoutMs = getLong(TASK_TIMEOUT_MS_CONFIG); + log.info("Topology {} is overriding {} to {}", topologyName, TASK_TIMEOUT_MS_CONFIG, taskTimeoutMs); + } else { + taskTimeoutMs = globalAppConfigs.getLong(TASK_TIMEOUT_MS_CONFIG); + } + + if (isTopologyOverride(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, topologyOverrides)) { + timestampExtractorSupplier = () -> getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); + log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, getClass(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG)); + } else { + 
timestampExtractorSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class); + } + + if (isTopologyOverride(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, topologyOverrides)) { + deserializationExceptionHandlerSupplier = () -> getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); + log.info("Topology {} is overriding {} to {}", topologyName, DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, getClass(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG)); + } else { + deserializationExceptionHandlerSupplier = () -> globalAppConfigs.getConfiguredInstance(DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeserializationExceptionHandler.class); + } + } + + public boolean isNamedTopology() { + return topologyName != null; + } + + /** + * @return true if there is an override for this config in the properties of this NamedTopology. Applications that + * don't use named topologies will just refer to the global defaults regardless of the topology properties + */ + private boolean isTopologyOverride(final String config, final Properties topologyOverrides) { + // TODO KAFKA-13283: eventually we should always have the topology props override the global ones regardless + // of whether it's a named topology or not + return topologyName != null && topologyOverrides.containsKey(config); + } + + public TaskConfig getTaskConfig() { + return new TaskConfig( + maxTaskIdleMs, + taskTimeoutMs, + maxBufferedSize, + timestampExtractorSupplier.get(), + deserializationExceptionHandlerSupplier.get(), + eosEnabled + ); + } + + public static class TaskConfig { + public final long maxTaskIdleMs; + public final long taskTimeoutMs; + public final int maxBufferedSize; + public final TimestampExtractor timestampExtractor; + public final DeserializationExceptionHandler deserializationExceptionHandler; + public final boolean eosEnabled; + + private 
TaskConfig(final long maxTaskIdleMs, + final long taskTimeoutMs, + final int maxBufferedSize, + final TimestampExtractor timestampExtractor, + final DeserializationExceptionHandler deserializationExceptionHandler, + final boolean eosEnabled) { + this.maxTaskIdleMs = maxTaskIdleMs; + this.taskTimeoutMs = taskTimeoutMs; + this.maxBufferedSize = maxBufferedSize; + this.timestampExtractor = timestampExtractor; + this.deserializationExceptionHandler = deserializationExceptionHandler; + this.eosEnabled = eosEnabled; + } + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java index 2b01feec691a1..ff3f39b8b22e6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java @@ -35,8 +35,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; -import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology; -import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.utils.UniqueTopicSerdeScope; @@ -51,7 +50,6 @@ import org.junit.Test; import org.junit.rules.TestName; import java.time.Duration; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Properties; @@ -62,6 +60,7 @@ import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.streams.KeyValue.pair; import 
static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived; import static org.hamcrest.CoreMatchers.equalTo; @@ -73,6 +72,10 @@ public class NamedTopologyIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + private static final String TOPOLOGY_1 = "topology-1"; + private static final String TOPOLOGY_2 = "topology-2"; + private static final String TOPOLOGY_3 = "topology-3"; + // TODO KAFKA-12648: // 1) full test coverage for add/removeNamedTopology, covering: // - the "last topology removed" case @@ -90,7 +93,6 @@ public class NamedTopologyIntegrationTest { private final static String SUM_OUTPUT = "sum"; private final static String COUNT_OUTPUT = "count"; - // "delayed" input topics which are empty at start to allow control over when input data appears private final static String DELAYED_INPUT_STREAM_1 = "delayed-input-stream-1"; private final static String DELAYED_INPUT_STREAM_2 = "delayed-input-stream-2"; @@ -141,23 +143,23 @@ public static void closeCluster() { private final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier(); - // builders for the 1st Streams instance (default) - private final NamedTopologyStreamsBuilder topology1Builder = new NamedTopologyStreamsBuilder("topology-1"); - private final NamedTopologyStreamsBuilder topology2Builder = new NamedTopologyStreamsBuilder("topology-2"); - private final NamedTopologyStreamsBuilder topology3Builder = new NamedTopologyStreamsBuilder("topology-3"); - - // builders for the 2nd Streams instance - private final NamedTopologyStreamsBuilder topology1Builder2 = new NamedTopologyStreamsBuilder("topology-1"); - private final NamedTopologyStreamsBuilder topology2Builder2 = new 
NamedTopologyStreamsBuilder("topology-2"); - private final NamedTopologyStreamsBuilder topology3Builder2 = new NamedTopologyStreamsBuilder("topology-3"); - private Properties props; private Properties props2; private KafkaStreamsNamedTopologyWrapper streams; private KafkaStreamsNamedTopologyWrapper streams2; - private Properties configProps() { + // builders for the 1st Streams instance (default) + private NamedTopologyBuilder topology1Builder; + private NamedTopologyBuilder topology2Builder; + private NamedTopologyBuilder topology3Builder; + + // builders for the 2nd Streams instance + private NamedTopologyBuilder topology1Builder2; + private NamedTopologyBuilder topology2Builder2; + private NamedTopologyBuilder topology3Builder2; + + private Properties configProps(final String appId) { final Properties streamsConfiguration = new Properties(); streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, appId); streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); @@ -174,11 +176,16 @@ private Properties configProps() { @Before public void setup() throws Exception { appId = safeUniqueTestName(NamedTopologyIntegrationTest.class, testName); - changelog1 = appId + "-topology-1-store-changelog"; - changelog2 = appId + "-topology-2-store-changelog"; - changelog3 = appId + "-topology-3-store-changelog"; - props = configProps(); - props2 = configProps(); + changelog1 = appId + "-" + TOPOLOGY_1 + "-store-changelog"; + changelog2 = appId + "-" + TOPOLOGY_2 + "-store-changelog"; + changelog3 = appId + "-" + TOPOLOGY_3 + "-store-changelog"; + props = configProps(appId); + + streams = new KafkaStreamsNamedTopologyWrapper(props, clientSupplier); + + topology1Builder = streams.newNamedTopologyBuilder(TOPOLOGY_1); + topology2Builder = streams.newNamedTopologyBuilder(TOPOLOGY_2); + topology3Builder = streams.newNamedTopologyBuilder(TOPOLOGY_3); // TODO KAFKA-12648: refactor to avoid deleting & (re)creating outputs topics for each test 
CLUSTER.createTopic(OUTPUT_STREAM_1, 2, 1); @@ -186,6 +193,14 @@ public void setup() throws Exception { CLUSTER.createTopic(OUTPUT_STREAM_3, 2, 1); } + private void setupSecondKafkaStreams() { + props2 = configProps(appId); + streams2 = new KafkaStreamsNamedTopologyWrapper(props2, clientSupplier); + topology1Builder2 = streams2.newNamedTopologyBuilder(TOPOLOGY_1); + topology2Builder2 = streams2.newNamedTopologyBuilder(TOPOLOGY_2); + topology3Builder2 = streams2.newNamedTopologyBuilder(TOPOLOGY_3); + } + @After public void shutdown() throws Exception { if (streams != null) { @@ -203,10 +218,10 @@ public void shouldPrefixAllInternalTopicNamesWithNamedTopology() throws Exceptio final String countTopologyName = "count-topology"; final String fkjTopologyName = "FKJ-topology"; - final NamedTopologyStreamsBuilder countBuilder = new NamedTopologyStreamsBuilder(countTopologyName); + final NamedTopologyBuilder countBuilder = streams.newNamedTopologyBuilder(countTopologyName); countBuilder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(); - final NamedTopologyStreamsBuilder fkjBuilder = new NamedTopologyStreamsBuilder(fkjTopologyName); + final NamedTopologyBuilder fkjBuilder = streams.newNamedTopologyBuilder(fkjTopologyName); final UniqueTopicSerdeScope serdeScope = new UniqueTopicSerdeScope(); final KTable left = fkjBuilder.table( @@ -225,8 +240,8 @@ public void shouldPrefixAllInternalTopicNamesWithNamedTopology() throws Exceptio (value1, value2) -> String.valueOf(value1 + value2), Materialized.with(null, serdeScope.decorateSerde(Serdes.String(), props, false))); - streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(fkjBuilder, countBuilder), props, clientSupplier); - IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15)); + streams.start(asList(fkjBuilder.build(), countBuilder.build())); + waitForApplicationState(singletonList(streams), State.RUNNING, Duration.ofSeconds(60)); final String countTopicPrefix 
= appId + "-" + countTopologyName; final String fkjTopicPrefix = appId + "-" + fkjTopologyName; @@ -253,8 +268,8 @@ public void shouldProcessSingleNamedTopologyAndPrefixInternalTopics() throws Exc .groupByKey() .count(ROCKSDB_STORE) .toStream().to(OUTPUT_STREAM_1); - streams = new KafkaStreamsNamedTopologyWrapper(topology1Builder.buildNamedTopology(props), props, clientSupplier); - IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15)); + streams.start(topology1Builder.build()); + waitForApplicationState(singletonList(streams), State.RUNNING, Duration.ofSeconds(30)); final List> results = waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3); assertThat(results, equalTo(COUNT_OUTPUT_DATA)); @@ -268,8 +283,8 @@ public void shouldProcessMultipleIdenticalNamedTopologiesWithInMemoryAndPersiste topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1); topology2Builder.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2); topology3Builder.stream(INPUT_STREAM_3).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_3); - streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(topology1Builder, topology2Builder, topology3Builder), props, clientSupplier); - IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15)); + streams.start(asList(topology1Builder.build(), topology2Builder.build(), topology3Builder.build())); + waitForApplicationState(singletonList(streams), State.RUNNING, Duration.ofSeconds(30)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA)); @@ -281,8 +296,7 @@ public void shouldProcessMultipleIdenticalNamedTopologiesWithInMemoryAndPersiste @Test 
public void shouldAddNamedTopologyToUnstartedApplicationWithEmptyInitialTopology() throws Exception { topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1); - streams = new KafkaStreamsNamedTopologyWrapper(props, clientSupplier); - streams.addNamedTopology(topology1Builder.buildNamedTopology(props)); + streams.addNamedTopology(topology1Builder.build()); IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA)); @@ -291,10 +305,9 @@ public void shouldAddNamedTopologyToUnstartedApplicationWithEmptyInitialTopology @Test public void shouldAddNamedTopologyToRunningApplicationWithEmptyInitialTopology() throws Exception { topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1); - streams = new KafkaStreamsNamedTopologyWrapper(props, clientSupplier); streams.start(); - streams.addNamedTopology(topology1Builder.buildNamedTopology(props)); + streams.addNamedTopology(topology1Builder.build()); IntegrationTestUtils.waitForApplicationState(singletonList(streams), State.RUNNING, Duration.ofSeconds(15)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA)); @@ -304,10 +317,9 @@ public void shouldAddNamedTopologyToRunningApplicationWithEmptyInitialTopology() public void shouldAddNamedTopologyToRunningApplicationWithSingleInitialNamedTopology() throws Exception { topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1); topology2Builder.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2); - streams = new KafkaStreamsNamedTopologyWrapper(topology1Builder.buildNamedTopology(props), props, clientSupplier); - 
IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15)); - - streams.addNamedTopology(topology2Builder.buildNamedTopology(props)); + streams.start(topology1Builder.build()); + waitForApplicationState(singletonList(streams), State.RUNNING, Duration.ofSeconds(30)); + streams.addNamedTopology(topology2Builder.build()); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA)); @@ -318,10 +330,10 @@ public void shouldAddNamedTopologyToRunningApplicationWithMultipleInitialNamedTo topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_1); topology2Builder.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_2); topology3Builder.stream(INPUT_STREAM_3).groupBy((k, v) -> k).count(ROCKSDB_STORE).toStream().to(OUTPUT_STREAM_3); - streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(topology1Builder, topology2Builder), props, clientSupplier); - IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15)); + streams.start(asList(topology1Builder.build(), topology2Builder.build())); + waitForApplicationState(singletonList(streams), State.RUNNING, Duration.ofSeconds(30)); - streams.addNamedTopology(topology3Builder.buildNamedTopology(props)); + streams.addNamedTopology(topology3Builder.build()); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA)); @@ -330,18 +342,19 @@ public void shouldAddNamedTopologyToRunningApplicationWithMultipleInitialNamedTo @Test public void shouldAddNamedTopologyToRunningApplicationWithMultipleNodes() throws Exception { + 
setupSecondKafkaStreams(); topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1); topology1Builder2.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1); topology2Builder.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2); topology2Builder2.stream(INPUT_STREAM_2).groupBy((k, v) -> k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2); - streams = new KafkaStreamsNamedTopologyWrapper(topology1Builder.buildNamedTopology(props), props, clientSupplier); - streams2 = new KafkaStreamsNamedTopologyWrapper(topology1Builder2.buildNamedTopology(props2), props2, clientSupplier); - IntegrationTestUtils.startApplicationAndWaitUntilRunning(asList(streams, streams2), Duration.ofSeconds(15)); + streams.start(topology1Builder.build()); + streams2.start(topology1Builder2.build()); + waitForApplicationState(asList(streams, streams2), State.RUNNING, Duration.ofSeconds(30)); - streams.addNamedTopology(topology2Builder.buildNamedTopology(props)); - streams2.addNamedTopology(topology2Builder2.buildNamedTopology(props2)); + streams.addNamedTopology(topology2Builder.build()); + streams2.addNamedTopology(topology2Builder2.build()); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA)); @@ -359,8 +372,8 @@ public void shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() throws topology2Builder.stream(DELAYED_INPUT_STREAM_2).map((k, v) -> { throw new IllegalStateException("Should not process any records for removed topology-2"); }); - streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(topology1Builder, topology2Builder), props, clientSupplier); - IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), 
Duration.ofSeconds(15)); + streams.start(asList(topology1Builder.build(), topology2Builder.build())); + waitForApplicationState(singletonList(streams), State.RUNNING, Duration.ofSeconds(30)); streams.removeNamedTopology("topology-2"); @@ -378,8 +391,9 @@ public void shouldRemoveAndReplaceTopologicallyIncompatibleNamedTopology() throw final KStream inputStream1 = topology1Builder.stream(INPUT_STREAM_1); inputStream1.groupByKey().count().toStream().to(COUNT_OUTPUT); inputStream1.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT); - streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(topology1Builder), props, clientSupplier); - IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15)); + streams = new KafkaStreamsNamedTopologyWrapper(props, clientSupplier); + streams.start(singletonList(topology1Builder.build())); + waitForApplicationState(singletonList(streams), State.RUNNING, Duration.ofSeconds(30)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA)); @@ -387,12 +401,13 @@ public void shouldRemoveAndReplaceTopologicallyIncompatibleNamedTopology() throw streams.cleanUpNamedTopology("topology-1"); // Prepare a new named topology with the same name but an incompatible topology (stateful subtopologies swap order) + final NamedTopologyBuilder topology1Builder2 = streams.newNamedTopologyBuilder(TOPOLOGY_1); final KStream inputStream2 = topology1Builder2.stream(DELAYED_INPUT_STREAM_1); inputStream2.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT); inputStream2.groupByKey().count().toStream().to(COUNT_OUTPUT); produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA); - streams.addNamedTopology(topology1Builder2.buildNamedTopology(props)); + streams.addNamedTopology(topology1Builder2.build()); 
assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA)); @@ -404,8 +419,8 @@ public void shouldAllowPatternSubscriptionWithMultipleNamedTopologies() throws E topology1Builder.stream(Pattern.compile(INPUT_STREAM_1)).groupBy((k, v) -> k).count().toStream().to(OUTPUT_STREAM_1); topology2Builder.stream(Pattern.compile(INPUT_STREAM_2)).groupBy((k, v) -> k).count().toStream().to(OUTPUT_STREAM_2); topology3Builder.stream(Pattern.compile(INPUT_STREAM_3)).groupBy((k, v) -> k).count().toStream().to(OUTPUT_STREAM_3); - streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(topology1Builder, topology2Builder, topology3Builder), props, clientSupplier); - IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15)); + streams.start(asList(topology1Builder.build(), topology2Builder.build(), topology3Builder.build())); + waitForApplicationState(singletonList(streams), State.RUNNING, Duration.ofSeconds(30)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA)); @@ -417,8 +432,8 @@ public void shouldAllowMixedCollectionAndPatternSubscriptionWithMultipleNamedTop topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> k).count().toStream().to(OUTPUT_STREAM_1); topology2Builder.stream(Pattern.compile(INPUT_STREAM_2)).groupBy((k, v) -> k).count().toStream().to(OUTPUT_STREAM_2); topology3Builder.stream(Pattern.compile(INPUT_STREAM_3)).groupBy((k, v) -> k).count().toStream().to(OUTPUT_STREAM_3); - streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(topology1Builder, topology2Builder, topology3Builder), props, clientSupplier); - 
IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15)); + streams.start(asList(topology1Builder.build(), topology2Builder.build(), topology3Builder.build())); + waitForApplicationState(singletonList(streams), State.RUNNING, Duration.ofSeconds(30)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA)); @@ -433,12 +448,4 @@ private static void produceToInputTopics(final String topic, final Collection buildNamedTopologies(final NamedTopologyStreamsBuilder... builders) { - final List topologies = new ArrayList<>(); - for (final NamedTopologyStreamsBuilder builder : builders) { - topologies.add(builder.buildNamedTopology(props)); - } - return topologies; - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java index fb2d0a181768e..f9d2708e98b63 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/StoreQueryIntegrationTest.java @@ -23,6 +23,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KafkaStreams.State; import org.apache.kafka.streams.KeyQueryMetadata; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StoreQueryParameters; @@ -34,7 +35,7 @@ import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; -import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder; +import 
org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.QueryableStoreType; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -66,6 +67,7 @@ import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.getStore; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName; import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning; +import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState; import static org.apache.kafka.streams.state.QueryableStoreTypes.keyValueStore; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; @@ -397,23 +399,25 @@ public void shouldQuerySpecificStalePartitionStoresMultiStreamThreadsNamedTopolo final Semaphore semaphore = new Semaphore(0); final int numStreamThreads = 2; - final NamedTopologyStreamsBuilder builder1A = new NamedTopologyStreamsBuilder("topology-A"); - getStreamsBuilderWithTopology(builder1A, semaphore); - - final NamedTopologyStreamsBuilder builder2A = new NamedTopologyStreamsBuilder("topology-A"); - getStreamsBuilderWithTopology(builder2A, semaphore); - final Properties streamsConfiguration1 = streamsConfiguration(); streamsConfiguration1.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads); final Properties streamsConfiguration2 = streamsConfiguration(); streamsConfiguration2.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numStreamThreads); - final KafkaStreamsNamedTopologyWrapper kafkaStreams1 = createNamedTopologyKafkaStreams(builder1A, streamsConfiguration1); - final KafkaStreamsNamedTopologyWrapper kafkaStreams2 = createNamedTopologyKafkaStreams(builder2A, streamsConfiguration2); + final KafkaStreamsNamedTopologyWrapper kafkaStreams1 = 
createNamedTopologyKafkaStreams(streamsConfiguration1); + final KafkaStreamsNamedTopologyWrapper kafkaStreams2 = createNamedTopologyKafkaStreams(streamsConfiguration2); final List kafkaStreamsList = Arrays.asList(kafkaStreams1, kafkaStreams2); - startApplicationAndWaitUntilRunning(kafkaStreamsList, Duration.ofSeconds(60)); + final NamedTopologyBuilder builder1A = kafkaStreams1.newNamedTopologyBuilder("topology-A", streamsConfiguration1); + getStreamsBuilderWithTopology(builder1A, semaphore); + + final NamedTopologyBuilder builder2A = kafkaStreams2.newNamedTopologyBuilder("topology-A", streamsConfiguration2); + getStreamsBuilderWithTopology(builder2A, semaphore); + + kafkaStreams1.start(builder1A.build()); + kafkaStreams2.start(builder2A.build()); + waitForApplicationState(kafkaStreamsList, State.RUNNING, Duration.ofSeconds(60)); assertTrue(kafkaStreams1.metadataForLocalThreads().size() > 1); assertTrue(kafkaStreams2.metadataForLocalThreads().size() > 1); @@ -567,8 +571,8 @@ private KafkaStreams createKafkaStreams(final StreamsBuilder builder, final Prop return streams; } - private KafkaStreamsNamedTopologyWrapper createNamedTopologyKafkaStreams(final NamedTopologyStreamsBuilder builder, final Properties config) { - final KafkaStreamsNamedTopologyWrapper streams = new KafkaStreamsNamedTopologyWrapper(builder.buildNamedTopology(config), config); + private KafkaStreamsNamedTopologyWrapper createNamedTopologyKafkaStreams(final Properties config) { + final KafkaStreamsNamedTopologyWrapper streams = new KafkaStreamsNamedTopologyWrapper(config); streamsToCleanup.add(streams); return streams; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java index 74d81bd7f3a37..4b4ff571673f6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreatorTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.MockClientSupplier; import org.easymock.EasyMockRunner; @@ -80,8 +81,6 @@ public class ActiveTaskCreatorTest { private ActiveTaskCreator activeTaskCreator; - - // non-EOS test // functional test @@ -477,6 +476,7 @@ private void createTasks() { final SourceNode sourceNode = mock(SourceNode.class); reset(builder, stateDirectory); + expect(builder.topologyConfigs()).andStubReturn(new TopologyConfig(new StreamsConfig(properties))); expect(builder.buildSubtopology(0)).andReturn(topology).anyTimes(); expect(stateDirectory.getOrCreateDirectoryForTask(task00)).andReturn(mock(File.class)); expect(stateDirectory.checkpointFileFor(task00)).andReturn(mock(File.class)); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index 74f40599e9e0c..8d4cb46b8864d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -23,12 +23,14 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.TopologyDescription; +import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.processor.StateStore; import 
org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.SubtopologyDescription; import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology; +import org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; @@ -47,6 +49,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.regex.Pattern; @@ -927,10 +930,64 @@ public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() { } @Test - public void shouldSetStreamsConfigOnRewriteTopology() { + public void shouldSetTopologyConfigOnRewriteTopology() { + final Properties globalProps = StreamsTestUtils.getStreamsConfig(); + globalProps.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 100L); + final StreamsConfig globalStreamsConfig = new StreamsConfig(globalProps); + final InternalTopologyBuilder topologyBuilder = builder.rewriteTopology(globalStreamsConfig); + assertThat(topologyBuilder.topologyConfigs(), equalTo(new TopologyConfig(null, globalStreamsConfig, new Properties()))); + assertThat(topologyBuilder.topologyConfigs().getTaskConfig().maxTaskIdleMs, equalTo(100L)); + } + + @Test + public void shouldOverrideGlobalStreamsConfigWhenGivenNamedTopologyProps() { + final Properties topologyOverrides = new Properties(); + topologyOverrides.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 12345L); + topologyOverrides.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L); + topologyOverrides.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L); + topologyOverrides.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15); + topologyOverrides.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, 
MockTimestampExtractor.class); + topologyOverrides.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + final StreamsConfig config = new StreamsConfig(StreamsTestUtils.getStreamsConfig()); - final InternalTopologyBuilder topologyBuilder = builder.rewriteTopology(config); - assertThat(topologyBuilder.getStreamsConfig(), equalTo(config)); + final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder( + new TopologyConfig( + "my-topology", + config, + topologyOverrides) + ); + + assertThat(topologyBuilder.topologyConfigs().cacheSize, is(12345L)); + assertThat(topologyBuilder.topologyConfigs().getTaskConfig().maxTaskIdleMs, equalTo(500L)); + assertThat(topologyBuilder.topologyConfigs().getTaskConfig().taskTimeoutMs, equalTo(1000L)); + assertThat(topologyBuilder.topologyConfigs().getTaskConfig().maxBufferedSize, equalTo(15)); + assertThat(topologyBuilder.topologyConfigs().getTaskConfig().timestampExtractor.getClass(), equalTo(MockTimestampExtractor.class)); + assertThat(topologyBuilder.topologyConfigs().getTaskConfig().deserializationExceptionHandler.getClass(), equalTo(LogAndContinueExceptionHandler.class)); + } + + @Test + public void shouldNotOverrideGlobalStreamsConfigWhenGivenUnnamedTopologyProps() { + final Properties streamsProps = StreamsTestUtils.getStreamsConfig(); + streamsProps.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 12345L); + streamsProps.put(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG, 500L); + streamsProps.put(StreamsConfig.TASK_TIMEOUT_MS_CONFIG, 1000L); + streamsProps.put(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, 15); + streamsProps.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); + streamsProps.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class); + + final StreamsConfig config = new StreamsConfig(streamsProps); + final InternalTopologyBuilder topologyBuilder 
= new InternalTopologyBuilder( + new TopologyConfig( + "my-topology", + config, + new Properties()) + ); + assertThat(topologyBuilder.topologyConfigs().cacheSize, is(12345L)); + assertThat(topologyBuilder.topologyConfigs().getTaskConfig().maxTaskIdleMs, is(500L)); + assertThat(topologyBuilder.topologyConfigs().getTaskConfig().taskTimeoutMs, is(1000L)); + assertThat(topologyBuilder.topologyConfigs().getTaskConfig().maxBufferedSize, is(15)); + assertThat(topologyBuilder.topologyConfigs().getTaskConfig().timestampExtractor.getClass(), is(MockTimestampExtractor.class)); + assertThat(topologyBuilder.topologyConfigs().getTaskConfig().deserializationExceptionHandler.getClass(), is(LogAndContinueExceptionHandler.class)); } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java index 59b3eda727d0f..761fa07101931 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java @@ -22,7 +22,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper; import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopology; -import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyStreamsBuilder; +import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.TestUtils; @@ -41,11 +41,11 @@ public class NamedTopologyTest { final KafkaClientSupplier clientSupplier = new DefaultKafkaClientSupplier(); final Properties props = configProps(); - final NamedTopologyStreamsBuilder builder1 = new NamedTopologyStreamsBuilder("topology-1"); - final NamedTopologyStreamsBuilder builder2 = new 
NamedTopologyStreamsBuilder("topology-2"); - final NamedTopologyStreamsBuilder builder3 = new NamedTopologyStreamsBuilder("topology-3"); + private final KafkaStreamsNamedTopologyWrapper streams = new KafkaStreamsNamedTopologyWrapper(props); - KafkaStreamsNamedTopologyWrapper streams; + private final NamedTopologyBuilder builder1 = streams.newNamedTopologyBuilder("topology-1"); + private final NamedTopologyBuilder builder2 = streams.newNamedTopologyBuilder("topology-2"); + private final NamedTopologyBuilder builder3 = streams.newNamedTopologyBuilder("topology-3"); @Before public void setup() { @@ -56,9 +56,7 @@ public void setup() { @After public void cleanup() { - if (streams != null) { - streams.close(); - } + streams.close(); } private static Properties configProps() { @@ -71,14 +69,13 @@ private static Properties configProps() { @Test public void shouldThrowIllegalArgumentOnIllegalName() { - assertThrows(IllegalArgumentException.class, () -> new NamedTopologyStreamsBuilder("__not-allowed__")); + assertThrows(IllegalArgumentException.class, () -> streams.newNamedTopologyBuilder("__not-allowed__")); } @Test public void shouldBuildSingleNamedTopology() { builder1.stream("stream-1").filter((k, v) -> !k.equals(v)).to("output-1"); - - streams = new KafkaStreamsNamedTopologyWrapper(builder1.buildNamedTopology(props), props, clientSupplier); + streams.start(builder1.build()); } @Test @@ -87,22 +84,20 @@ public void shouldBuildMultipleIdenticalNamedTopologyWithRepartition() { builder2.stream("stream-2").selectKey((k, v) -> v).groupByKey().count().toStream().to("output-2"); builder3.stream("stream-3").selectKey((k, v) -> v).groupByKey().count().toStream().to("output-3"); - streams = new KafkaStreamsNamedTopologyWrapper( + streams.start( asList( - builder1.buildNamedTopology(props), - builder2.buildNamedTopology(props), - builder3.buildNamedTopology(props)), - props, - clientSupplier + builder1.build(), + builder2.build(), + builder3.build()) ); } @Test public void 
shouldReturnTopologyByName() { - final NamedTopology topology1 = builder1.buildNamedTopology(props); - final NamedTopology topology2 = builder2.buildNamedTopology(props); - final NamedTopology topology3 = builder3.buildNamedTopology(props); - streams = new KafkaStreamsNamedTopologyWrapper(asList(topology1, topology2, topology3), props, clientSupplier); + final NamedTopology topology1 = builder1.build(); + final NamedTopology topology2 = builder2.build(); + final NamedTopology topology3 = builder3.build(); + streams.start(asList(topology1, topology2, topology3)); assertThat(streams.getTopologyByName("topology-1").get(), equalTo(topology1)); assertThat(streams.getTopologyByName("topology-2").get(), equalTo(topology2)); assertThat(streams.getTopologyByName("topology-3").get(), equalTo(topology3)); @@ -110,7 +105,7 @@ public void shouldReturnTopologyByName() { @Test public void shouldReturnEmptyWhenLookingUpNonExistentTopologyByName() { - streams = new KafkaStreamsNamedTopologyWrapper(builder1.buildNamedTopology(props), props, clientSupplier); + streams.start(builder1.build()); assertThat(streams.getTopologyByName("non-existent-topology").isPresent(), equalTo(false)); } @@ -119,12 +114,7 @@ public void shouldAllowSameStoreNameToBeUsedByMultipleNamedTopologies() { builder1.stream("stream-1").selectKey((k, v) -> v).groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("store"))); builder2.stream("stream-2").selectKey((k, v) -> v).groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("store"))); - streams = new KafkaStreamsNamedTopologyWrapper(asList( - builder1.buildNamedTopology(props), - builder2.buildNamedTopology(props)), - props, - clientSupplier - ); + streams.start(asList(builder1.build(), builder2.build())); } @Test @@ -134,12 +124,9 @@ public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamF assertThrows( TopologyException.class, - () -> streams = new KafkaStreamsNamedTopologyWrapper( - asList( - 
builder1.buildNamedTopology(props), - builder2.buildNamedTopology(props)), - props, - clientSupplier) + () -> streams.start(asList( + builder1.build(), + builder2.build())) ); } @@ -150,12 +137,9 @@ public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateTableFr assertThrows( TopologyException.class, - () -> streams = new KafkaStreamsNamedTopologyWrapper( - asList( - builder1.buildNamedTopology(props), - builder2.buildNamedTopology(props)), - props, - clientSupplier) + () -> streams.start(asList( + builder1.build(), + builder2.build())) ); } @@ -166,12 +150,9 @@ public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamA assertThrows( TopologyException.class, - () -> streams = new KafkaStreamsNamedTopologyWrapper( - asList( - builder1.buildNamedTopology(props), - builder2.buildNamedTopology(props)), - props, - clientSupplier) + () -> streams.start(asList( + builder1.build(), + builder2.build())) ); } @@ -182,12 +163,9 @@ public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamF assertThrows( TopologyException.class, - () -> streams = new KafkaStreamsNamedTopologyWrapper( - asList( - builder1.buildNamedTopology(props), - builder2.buildNamedTopology(props)), - props, - clientSupplier) + () -> streams.start(asList( + builder1.build(), + builder2.build())) ); } @@ -198,24 +176,21 @@ public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamF assertThrows( TopologyException.class, - () -> streams = new KafkaStreamsNamedTopologyWrapper( - asList( - builder1.buildNamedTopology(props), - builder2.buildNamedTopology(props)), - props, - clientSupplier) + () -> streams.start(asList( + builder1.build(), + builder2.build())) ); } @Test public void shouldDescribeWithSingleNamedTopology() { builder1.stream("input").filter((k, v) -> !k.equals(v)).to("output"); - streams = new KafkaStreamsNamedTopologyWrapper(builder1.buildNamedTopology(props), props, clientSupplier); + 
streams.start(builder1.build()); assertThat( streams.getFullTopologyDescription(), equalTo( - "Topology - topology-1:\n" + "Topology: topology-1:\n" + " Sub-topology: 0\n" + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-1])\n" + " --> none\n" @@ -237,19 +212,17 @@ public void shouldDescribeWithMultipleNamedTopologies() { builder2.stream("stream-2").filter((k, v) -> !k.equals(v)).to("output-2"); builder3.stream("stream-3").filter((k, v) -> !k.equals(v)).to("output-3"); - streams = new KafkaStreamsNamedTopologyWrapper( + streams.start( asList( - builder1.buildNamedTopology(props), - builder2.buildNamedTopology(props), - builder3.buildNamedTopology(props)), - props, - clientSupplier + builder1.build(), + builder2.build(), + builder3.build()) ); assertThat( streams.getFullTopologyDescription(), equalTo( - "Topology - topology-1:\n" + "Topology: topology-1:\n" + " Sub-topology: 0\n" + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-1])\n" + " --> none\n" @@ -263,7 +236,7 @@ public void shouldDescribeWithMultipleNamedTopologies() { + " Sink: KSTREAM-SINK-0000000003 (topic: output-1)\n" + " <-- KSTREAM-FILTER-0000000002\n" + "\n" - + "Topology - topology-2:\n" + + "Topology: topology-2:\n" + " Sub-topology: 0\n" + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-2])\n" + " --> none\n" @@ -277,7 +250,7 @@ public void shouldDescribeWithMultipleNamedTopologies() { + " Sink: KSTREAM-SINK-0000000003 (topic: output-2)\n" + " <-- KSTREAM-FILTER-0000000002\n" + "\n" - + "Topology - topology-3:\n" + + "Topology: topology-3:\n" + " Sub-topology: 0\n" + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-3])\n" + " --> none\n" @@ -295,8 +268,6 @@ public void shouldDescribeWithMultipleNamedTopologies() { @Test public void shouldDescribeWithEmptyNamedTopology() { - streams = new KafkaStreamsNamedTopologyWrapper(props, clientSupplier); - assertThat(streams.getFullTopologyDescription(), equalTo("")); } } diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index 50f4c33db7925..00daaa6e9d82b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -36,6 +36,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.MockKeyValueStoreBuilder; @@ -581,7 +582,7 @@ public void shouldInitTaskTimeoutAndEventuallyThrow() { } @Test - public void shouldCLearTaskTimeout() { + public void shouldClearTaskTimeout() { EasyMock.replay(stateManager); task = createStandbyTask(); @@ -611,7 +612,7 @@ private StandbyTask createStandbyTask() { taskId, Collections.singleton(partition), topology, - config, + new TopologyConfig(config).getTaskConfig(), streamsMetrics, stateManager, stateDirectory, diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index c372234ad3551..1b3c5001d17a0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -57,6 +57,7 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import 
org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig; import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.test.MockKeyValueStore; import org.apache.kafka.test.MockProcessorNode; @@ -83,6 +84,7 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Properties; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -1593,7 +1595,7 @@ public void shouldReturnOffsetsForRepartitionTopicsForPurging() { mkSet(partition1, repartition), topology, consumer, - config, + new TopologyConfig(null, config, new Properties()).getTaskConfig(), streamsMetrics, stateDirectory, cache, @@ -2179,7 +2181,7 @@ public void shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic() { partitions, topology, consumer, - createConfig("100"), + new TopologyConfig(null, createConfig("100"), new Properties()).getTaskConfig(), metrics, stateDirectory, cache, @@ -2246,7 +2248,7 @@ private StreamTask createOptimizedStatefulTask(final StreamsConfig config, final mkSet(partition1), topology, consumer, - config, + new TopologyConfig(null, config, new Properties()).getTaskConfig(), streamsMetrics, stateDirectory, cache, @@ -2287,7 +2289,7 @@ public Map committed(final Set tasksToCommit) { } }; + final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); + topologyMetadata.buildAndRewriteTopology(); final StreamThread thread = new StreamThread( mockTime, config, @@ -772,7 +780,7 @@ int commit(final Collection tasksToCommit) { null, taskManager, streamsMetrics, - new TopologyMetadata(internalTopologyBuilder, config), + topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), @@ -827,6 +835,8 @@ public void shouldRecordCommitLatency() { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config); + topologyMetadata.buildAndRewriteTopology(); final TaskManager taskManager = new TaskManager( null, @@ -836,7 +846,7 @@ public void shouldRecordCommitLatency() { null, activeTaskCreator, standbyTaskCreator, - new TopologyMetadata(internalTopologyBuilder, config), + topologyMetadata, null, null, null @@ -859,7 +869,7 @@ int commit(final Collection tasksToCommit) { null, taskManager, streamsMetrics, - new TopologyMetadata(internalTopologyBuilder, config), + topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), @@ -1168,7 +1178,6 @@ public void shouldNotReturnDataAfterTaskMigrated() { final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet()); final InternalTopologyBuilder internalTopologyBuilder = EasyMock.createNiceMock(InternalTopologyBuilder.class); - expect(internalTopologyBuilder.sourceTopicCollection()).andReturn(Collections.singletonList(topic1)).times(2); final MockConsumer consumer = new MockConsumer<>(OffsetResetStrategy.LATEST); @@ -1190,7 +1199,6 @@ public void restore(final Map tasks) { }; taskManager.handleLostAll(); - EasyMock.replay(taskManager, internalTopologyBuilder); final StreamsMetricsImpl streamsMetrics = @@ -1241,6 +1249,8 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart() { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); + topologyMetadata.buildAndRewriteTopology(); final StreamThread thread = new StreamThread( mockTime, config, @@ -1251,7 +1261,7 @@ public void shouldShutdownTaskManagerOnCloseWithoutStart() { null, taskManager, streamsMetrics, - new TopologyMetadata(internalTopologyBuilder, config), + topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), @@ -1279,6 +1289,8 @@ 
public void shouldOnlyShutdownOnce() { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); + topologyMetadata.buildAndRewriteTopology(); final StreamThread thread = new StreamThread( mockTime, config, @@ -1289,7 +1301,7 @@ public void shouldOnlyShutdownOnce() { null, taskManager, streamsMetrics, - new TopologyMetadata(internalTopologyBuilder, config), + topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), @@ -1615,10 +1627,11 @@ public void shouldReturnActiveTaskMetadataWhileRunningState() { mockTime ); - internalTopologyBuilder.buildTopology(); + final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); + topologyMetadata.buildAndRewriteTopology(); final StreamThread thread = StreamThread.create( - new TopologyMetadata(internalTopologyBuilder, config), + topologyMetadata, config, clientSupplier, clientSupplier.getAdmin(config.getAdminConfigs(CLIENT_ID)), @@ -2186,6 +2199,8 @@ public void shouldThrowTaskMigratedExceptionHandlingTaskLost() { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); + topologyMetadata.buildAndRewriteTopology(); final StreamThread thread = new StreamThread( mockTime, config, @@ -2196,7 +2211,7 @@ public void shouldThrowTaskMigratedExceptionHandlingTaskLost() { null, taskManager, streamsMetrics, - new TopologyMetadata(internalTopologyBuilder, config), + topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), @@ -2234,6 +2249,8 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation() { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + final 
TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); + topologyMetadata.buildAndRewriteTopology(); final StreamThread thread = new StreamThread( mockTime, config, @@ -2244,7 +2261,7 @@ public void shouldThrowTaskMigratedExceptionHandlingRevocation() { null, taskManager, streamsMetrics, - new TopologyMetadata(internalTopologyBuilder, config), + topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), @@ -2640,6 +2657,8 @@ public void shouldNotCommitNonRunningNonRestoringTasks() { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); + topologyMetadata.buildAndRewriteTopology(); final StreamThread thread = new StreamThread( mockTime, config, @@ -2650,7 +2669,7 @@ public void shouldNotCommitNonRunningNonRestoringTasks() { null, taskManager, streamsMetrics, - new TopologyMetadata(internalTopologyBuilder, config), + topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), @@ -2782,6 +2801,8 @@ public void shouldTransmitTaskManagerMetrics() { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); + topologyMetadata.buildAndRewriteTopology(); final StreamThread thread = new StreamThread( mockTime, new StreamsConfig(configProps(true)), @@ -2792,7 +2813,7 @@ public void shouldTransmitTaskManagerMetrics() { null, taskManager, streamsMetrics, - new TopologyMetadata(internalTopologyBuilder, config), + topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), @@ -2823,6 +2844,8 @@ public void shouldConstructAdminMetrics() { final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(metrics, CLIENT_ID, StreamsConfig.METRICS_LATEST, mockTime); + final 
TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config); + topologyMetadata.buildAndRewriteTopology(); final StreamThread thread = new StreamThread( mockTime, config, @@ -2833,7 +2856,7 @@ public void shouldConstructAdminMetrics() { null, taskManager, streamsMetrics, - new TopologyMetadata(internalTopologyBuilder, config), + topologyMetadata, CLIENT_ID, new LogContext(""), new AtomicInteger(), @@ -2938,6 +2961,7 @@ private void setupInternalTopologyWithoutState() { (ProcessorSupplier) MockApiProcessor::new, "source1" ); + internalTopologyBuilder.setStreamsConfig(config); } // TODO: change return type to `StandbyTask` diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index c56f7bce9af17..72f92e2959190 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.streams.processor.internals.StreamsProducer; import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; +import org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig; import org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.streams.state.ReadOnlySessionStore; @@ -442,7 +443,7 @@ private StreamTask createStreamsTask(final StreamsConfig streamsConfig, partitions, topology, clientSupplier.consumer, - streamsConfig, + new TopologyConfig(null, streamsConfig, new Properties()).getTaskConfig(), streamsMetrics, stateDirectory, EasyMock.createNiceMock(ThreadCache.class), diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 05f10e98f2ef9..2dc4026f40b52 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -70,6 +70,7 @@ import org.apache.kafka.streams.processor.internals.Task; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; +import org.apache.kafka.streams.processor.internals.namedtopology.TopologyConfig.TaskConfig; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; @@ -367,7 +368,7 @@ public Consumer getGlobalConsumer(final Map conf ); setupGlobalTask(mockWallClockTime, streamsConfig, streamsMetrics, cache); - setupTask(streamsConfig, streamsMetrics, cache); + setupTask(streamsConfig, streamsMetrics, cache, internalTopologyBuilder.topologyConfigs().getTaskConfig()); } private static void logIfTaskIdleEnabled(final StreamsConfig streamsConfig) { @@ -469,7 +470,8 @@ private void setupGlobalTask(final Time mockWallClockTime, @SuppressWarnings("deprecation") private void setupTask(final StreamsConfig streamsConfig, final StreamsMetricsImpl streamsMetrics, - final ThreadCache cache) { + final ThreadCache cache, + final TaskConfig taskConfig) { if (!partitionsByInputTopic.isEmpty()) { consumer.assign(partitionsByInputTopic.values()); final Map startOffsets = new HashMap<>(); @@ -509,7 +511,7 @@ private void setupTask(final StreamsConfig streamsConfig, new HashSet<>(partitionsByInputTopic.values()), processorTopology, consumer, - streamsConfig, + taskConfig, streamsMetrics, stateDirectory, cache,