From 88291e8d389fd3d5546844cf04e5aaa1bda75bcd Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Wed, 4 Aug 2021 22:29:34 -0700 Subject: [PATCH 1/3] WIP --- .../internals/ActiveTaskCreator.java | 22 +++++----- .../processor/internals/StandbyTask.java | 8 ++-- .../internals/StandbyTaskCreator.java | 14 ++++--- .../processor/internals/StreamTask.java | 14 ++++--- .../processor/internals/TopologyMetadata.java | 37 ++++++++++++++++- .../KafkaStreamsNamedTopologyWrapper.java | 22 +++++++--- .../internals/namedtopology/TaskConfig.java | 41 +++++++++++++++++++ .../NamedTopologyIntegrationTest.java | 2 +- 8 files changed, 126 insertions(+), 34 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TaskConfig.java diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java index 06171d3850988..6d1f9f6c6ac9b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ActiveTaskCreator.java @@ -30,6 +30,7 @@ import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; +import org.apache.kafka.streams.processor.internals.namedtopology.TaskConfig; import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; @@ -51,7 +52,7 @@ class ActiveTaskCreator { private final TopologyMetadata topologyMetadata; - private final StreamsConfig config; + private final StreamsConfig applicationConfig; private final StreamsMetricsImpl streamsMetrics; private final StateDirectory stateDirectory; private final ChangelogReader storeChangelogReader; @@ -71,7 +72,7 @@ class ActiveTaskCreator { private 
final Map> unknownTasksToBeCreated = new HashMap<>(); ActiveTaskCreator(final TopologyMetadata topologyMetadata, - final StreamsConfig config, + final StreamsConfig applicationConfig, final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, @@ -82,7 +83,7 @@ class ActiveTaskCreator { final UUID processId, final Logger log) { this.topologyMetadata = topologyMetadata; - this.config = config; + this.applicationConfig = applicationConfig; this.streamsMetrics = streamsMetrics; this.stateDirectory = stateDirectory; this.storeChangelogReader = storeChangelogReader; @@ -93,7 +94,7 @@ class ActiveTaskCreator { this.log = log; createTaskSensor = ThreadMetrics.createTaskSensor(threadId, streamsMetrics); - processingMode = StreamThread.processingMode(config); + processingMode = StreamThread.processingMode(applicationConfig); if (processingMode == EXACTLY_ONCE_ALPHA) { threadProducer = null; @@ -105,7 +106,7 @@ class ActiveTaskCreator { final LogContext logContext = new LogContext(threadIdPrefix); threadProducer = new StreamsProducer( - config, + applicationConfig, threadId, clientSupplier, null, @@ -169,7 +170,7 @@ Collection createTasks(final Consumer consumer, final ProcessorStateManager stateManager = new ProcessorStateManager( taskId, Task.TaskType.ACTIVE, - StreamThread.eosEnabled(config), + StreamThread.eosEnabled(applicationConfig), logContext, stateDirectory, storeChangelogReader, @@ -179,7 +180,7 @@ Collection createTasks(final Consumer consumer, final InternalProcessorContext context = new ProcessorContextImpl( taskId, - config, + applicationConfig, stateManager, streamsMetrics, cache @@ -238,7 +239,7 @@ private StreamTask createActiveTask(final TaskId taskId, if (processingMode == StreamThread.ProcessingMode.EXACTLY_ONCE_ALPHA) { log.info("Creating producer client for task {}", taskId); streamsProducer = new StreamsProducer( - config, + applicationConfig, threadId, clientSupplier, taskId, @@ -253,7 
+254,7 @@ private StreamTask createActiveTask(final TaskId taskId, logContext, taskId, streamsProducer, - config.defaultProductionExceptionHandler(), + applicationConfig.defaultProductionExceptionHandler(), streamsMetrics ); @@ -262,7 +263,8 @@ private StreamTask createActiveTask(final TaskId taskId, inputPartitions, topology, consumer, - config, + new TaskConfig(applicationConfig, topologyMetadata.getConfigForTask(taskId)), + StreamThread.eosEnabled(applicationConfig), streamsMetrics, stateDirectory, cache, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java index 802bca1624827..c028e737d9efe 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; +import org.apache.kafka.streams.processor.internals.namedtopology.TaskConfig; import org.apache.kafka.streams.state.internals.ThreadCache; import java.util.Collections; @@ -55,7 +56,8 @@ public class StandbyTask extends AbstractTask implements Task { StandbyTask(final TaskId id, final Set inputPartitions, final ProcessorTopology topology, - final StreamsConfig config, + final TaskConfig config, + final boolean eosEnabled, final StreamsMetricsImpl streamsMetrics, final ProcessorStateManager stateMgr, final StateDirectory stateDirectory, @@ -67,7 +69,7 @@ public class StandbyTask extends AbstractTask implements Task { stateDirectory, stateMgr, inputPartitions, - config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG), + config.maxTaskIdleMs, "standby-task", StandbyTask.class ); @@ -76,7 +78,7 @@ public class StandbyTask extends AbstractTask 
implements Task { processorContext.transitionToStandby(cache); closeTaskSensor = ThreadMetrics.closeTaskSensor(Thread.currentThread().getName(), streamsMetrics); - eosEnabled = StreamThread.eosEnabled(config); + this.eosEnabled = eosEnabled; } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java index ee2a3e1938060..6251d97cd0366 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTaskCreator.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.processor.internals.Task.TaskType; import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; +import org.apache.kafka.streams.processor.internals.namedtopology.TaskConfig; import org.apache.kafka.streams.state.internals.ThreadCache; import org.slf4j.Logger; @@ -38,7 +39,7 @@ class StandbyTaskCreator { private final TopologyMetadata topologyMetadata; - private final StreamsConfig config; + private final StreamsConfig applicationConfig; private final StreamsMetricsImpl streamsMetrics; private final StateDirectory stateDirectory; private final ChangelogReader storeChangelogReader; @@ -50,14 +51,14 @@ class StandbyTaskCreator { private final Map> unknownTasksToBeCreated = new HashMap<>(); StandbyTaskCreator(final TopologyMetadata topologyMetadata, - final StreamsConfig config, + final StreamsConfig applicationConfig, final StreamsMetricsImpl streamsMetrics, final StateDirectory stateDirectory, final ChangelogReader storeChangelogReader, final String threadId, final Logger log) { this.topologyMetadata = topologyMetadata; - this.config = config; + this.applicationConfig = applicationConfig; this.streamsMetrics = streamsMetrics; this.stateDirectory = 
stateDirectory; this.storeChangelogReader = storeChangelogReader; @@ -101,7 +102,7 @@ Collection createTasks(final Map> tasksToBeCre final ProcessorStateManager stateManager = new ProcessorStateManager( taskId, Task.TaskType.STANDBY, - StreamThread.eosEnabled(config), + StreamThread.eosEnabled(applicationConfig), getLogContext(taskId), stateDirectory, storeChangelogReader, @@ -111,7 +112,7 @@ Collection createTasks(final Map> tasksToBeCre final InternalProcessorContext context = new ProcessorContextImpl( taskId, - config, + applicationConfig, stateManager, streamsMetrics, dummyCache @@ -160,7 +161,8 @@ StandbyTask createStandbyTask(final TaskId taskId, taskId, inputPartitions, topology, - config, + new TaskConfig(applicationConfig, topologyMetadata.getConfigForTask(taskId)), + StreamThread.eosEnabled(applicationConfig), streamsMetrics, stateManager, stateDirectory, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index d59ec66840665..2ee1cf882fbab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -44,6 +44,7 @@ import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.processor.internals.metrics.ThreadMetrics; +import org.apache.kafka.streams.processor.internals.namedtopology.TaskConfig; import org.apache.kafka.streams.state.internals.ThreadCache; import java.io.IOException; @@ -117,7 +118,8 @@ public StreamTask(final TaskId id, final Set inputPartitions, final ProcessorTopology topology, final Consumer mainConsumer, - final StreamsConfig config, + final TaskConfig config, + final boolean eosEnabled, final StreamsMetricsImpl streamsMetrics, final StateDirectory 
stateDirectory, final ThreadCache cache, @@ -132,7 +134,7 @@ public StreamTask(final TaskId id, stateDirectory, stateMgr, inputPartitions, - config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG), + config.taskTimeoutMs, "task", StreamTask.class ); @@ -143,7 +145,7 @@ public StreamTask(final TaskId id, this.time = time; this.recordCollector = recordCollector; - eosEnabled = StreamThread.eosEnabled(config); + this.eosEnabled = eosEnabled; final String threadId = Thread.currentThread().getName(); this.streamsMetrics = streamsMetrics; @@ -171,19 +173,19 @@ public StreamTask(final TaskId id, streamTimePunctuationQueue = new PunctuationQueue(); systemTimePunctuationQueue = new PunctuationQueue(); - maxBufferedSize = config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); + maxBufferedSize = config.maxBufferedSize; // initialize the consumed and committed offset cache consumedOffsets = new HashMap<>(); resetOffsetsForPartitions = new HashSet<>(); - recordQueueCreator = new RecordQueueCreator(this.logContext, config.defaultTimestampExtractor(), config.defaultDeserializationExceptionHandler()); + recordQueueCreator = new RecordQueueCreator(this.logContext, config.timestampExtractor, config.deserializationExceptionHandler); recordInfo = new PartitionGroup.RecordInfo(); final Sensor enforcedProcessingSensor; enforcedProcessingSensor = TaskMetrics.enforcedProcessingSensor(threadId, taskId, streamsMetrics); - final long maxTaskIdleMs = config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG); + final long maxTaskIdleMs = config.maxTaskIdleMs; partitionGroup = new PartitionGroup( logContext, createPartitionQueues(), diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java index 129ca09ad5c74..b05d263df33a9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -17,7 +17,10 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.internals.KafkaCompletableFuture; +import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.processor.StateStore; @@ -30,12 +33,14 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -70,6 +75,20 @@ public static class TopologyVersion { public AtomicLong topologyVersion = new AtomicLong(0L); // the local topology version public ReentrantLock topologyLock = new ReentrantLock(); public Condition topologyCV = topologyLock.newCondition(); + // TODO KAFKA-12648: Pt.4 + public List activeTopologyWaiters = new LinkedList<>(); + } + + public static class TopologyVersionWaiters { + final long topologyVersion; // the (minimum) version to wait for these threads to cross + final Semaphore threadSemaphore; // semaphore to count threads initially below this version that have updated + final KafkaCompletableFuture future; // the future waiting on all threads to be updated + + public TopologyVersionWaiters(final Semaphore threadSemaphore, final long topologyVersion, final KafkaCompletableFuture future) { + this.topologyVersion = topologyVersion; + this.threadSemaphore 
= threadSemaphore; + this.future = future; + } } public TopologyMetadata(final InternalTopologyBuilder builder, @@ -107,6 +126,10 @@ private void unlock() { version.topologyLock.unlock(); } + public StreamsConfig getConfigForTask(final TaskId taskId) { + return lookupBuilderForTask(taskId).getStreamsConfig(); + } + public void wakeupThreads() { try { lock(); @@ -134,7 +157,11 @@ public void maybeWaitForNonEmptyTopology(final Supplier threadState) { } } - public void registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) { + /** + * Adds the topology and registers a future that listens for all threads on the older version to see the update + */ + public KafkaFuture registerAndBuildNewTopology(final InternalTopologyBuilder newTopologyBuilder) { + final KafkaFuture future = new KafkaFutureImpl<>(); try { lock(); version.topologyVersion.incrementAndGet(); @@ -145,9 +172,14 @@ public void registerAndBuildNewTopology(final InternalTopologyBuilder newTopolog } finally { unlock(); } + return future; } - public void unregisterTopology(final String topologyName) { + /** + * Removes the topology and registers a future that listens for all threads on the older version to see the update + */ + public KafkaFuture unregisterTopology(final String topologyName) { + final KafkaFuture future = new KafkaFutureImpl<>(); try { lock(); version.topologyVersion.incrementAndGet(); @@ -159,6 +191,7 @@ public void unregisterTopology(final String topologyName) { } finally { unlock(); } + return future; } public void buildAndRewriteTopology() { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java index d4170d499c595..71ad45a98b1b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.streams.processor.internals.namedtopology; +import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.annotation.InterfaceStability.Unstable; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; @@ -121,40 +122,49 @@ public Optional getTopologyByName(final String name) { /** * Add a new NamedTopology to a running Kafka Streams app. If multiple instances of the application are running, - * you should inform all of them by calling {@link #addNamedTopology(NamedTopology)} on each client in order for + * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for * it to begin processing the new topology. * + * @return a {@link KafkaFuture} that completes successfully when all threads on this client have picked up the + * new {@link NamedTopology}. Note that the new topology is not guaranteed to begin processing on this client or + * any others until its addition has been completed by all instances of the application. 
+ * * @throws IllegalArgumentException if this topology name is already in use * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void addNamedTopology(final NamedTopology newTopology) { + public KafkaFuture addNamedTopology(final NamedTopology newTopology) { if (hasStartedOrFinishedShuttingDown()) { throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state); } else if (getTopologyByName(newTopology.name()).isPresent()) { throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() + " as another of the same name already exists"); } - topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()); + return topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()); } /** * Remove an existing NamedTopology from a running Kafka Streams app. If multiple instances of the application are - * running, you should inform all of them by calling {@link #removeNamedTopology(String)} on each client to ensure + * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure * it stops processing the old topology. * + * @return a {@link KafkaFuture} that completes successfully when all threads on this client have removed the + * corresponding {@link NamedTopology}. At this point no more of its tasks will be processed by the current client, + * but there may still be other clients which do. It is only guaranteed that this {@link NamedTopology} has fully + * stopped processing when all clients have successfully completed their corresponding {@link KafkaFuture}. 
+ * * @throws IllegalArgumentException if this topology name cannot be found * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public void removeNamedTopology(final String topologyToRemove) { + public KafkaFuture removeNamedTopology(final String topologyToRemove) { if (!isRunningOrRebalancing()) { throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state); } else if (!getTopologyByName(topologyToRemove).isPresent()) { throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove); } - topologyMetadata.unregisterTopology(topologyToRemove); + return topologyMetadata.unregisterTopology(topologyToRemove); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TaskConfig.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TaskConfig.java new file mode 100644 index 0000000000000..6c813a4e57cad --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/TaskConfig.java @@ -0,0 +1,41 @@ +package org.apache.kafka.streams.processor.internals.namedtopology; + +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.DeserializationExceptionHandler; +import org.apache.kafka.streams.processor.TimestampExtractor; + +import java.util.Properties; +import java.util.function.Function; + +/** + * Streams configs that apply at the task level. 
At the moment they can be specified either via the configs passed in + * to the {@link org.apache.kafka.streams.KafkaStreams} or {@link KafkaStreamsNamedTopologyWrapper} constructor, or + * else by passing them in to {@link org.apache.kafka.streams.StreamsBuilder#build(Properties)} or + * {@link NamedTopologyStreamsBuilder#buildNamedTopology(Properties)} + */ +public class TaskConfig { + public final long maxTaskIdleMs; + public final long taskTimeoutMs; + public final int maxBufferedSize; + public final TimestampExtractor timestampExtractor; + public final DeserializationExceptionHandler deserializationExceptionHandler; + + public TaskConfig(final StreamsConfig applicationConfig, final StreamsConfig topologyConfig) { + maxTaskIdleMs = getConfig(applicationConfig, topologyConfig, config -> config.getLong(StreamsConfig.MAX_TASK_IDLE_MS_CONFIG)); + taskTimeoutMs = getConfig(applicationConfig, topologyConfig, config -> config.getLong(StreamsConfig.TASK_TIMEOUT_MS_CONFIG)); + maxBufferedSize = getConfig(applicationConfig, topologyConfig, config -> config.getInt(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG)); + timestampExtractor = getConfig(applicationConfig, topologyConfig, StreamsConfig::defaultTimestampExtractor); + deserializationExceptionHandler = getConfig(applicationConfig, topologyConfig, StreamsConfig::defaultDeserializationExceptionHandler); + } + + /** + * @return the value of this config passed in to the topology if it exists, otherwise default application-wide config + */ + static V getConfig(final StreamsConfig applicationConfig, + final StreamsConfig topologyConfig, + final Function configGetter) { + final V topologyOverride = configGetter.apply(topologyConfig); + final V appValue = configGetter.apply(applicationConfig); + return topologyOverride == null ? 
appValue : topologyOverride; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java index 2b01feec691a1..ca2bfb150d548 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java @@ -73,7 +73,7 @@ public class NamedTopologyIntegrationTest { public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); - // TODO KAFKA-12648: + // TODO KAFKA-12648: Pt.4 // 1) full test coverage for add/removeNamedTopology, covering: // - the "last topology removed" case // - test using multiple clients, with standbys From 00429fd8160179309b7710a0ff0a2e58648f636e Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Sun, 19 Sep 2021 03:25:37 -0700 Subject: [PATCH 2/3] add test from Walker: PR #11258 --- .../NamedTopologyIntegrationTest.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java index ca2bfb150d548..6c4b4d2943ce0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java @@ -425,6 +425,34 @@ public void shouldAllowMixedCollectionAndPatternSubscriptionWithMultipleNamedTop assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, OUTPUT_STREAM_3, 3), equalTo(COUNT_OUTPUT_DATA)); } + @Test + public void shouldAddToEmptyInitialTopologyRemoveThenAddNewNamedTopology() throws Exception { + CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT); + // Build up named topology with two stateful subtopologies + final 
KStream inputStream1 = topology1Builder.stream(INPUT_STREAM_1); + inputStream1.groupByKey().count().toStream().to(COUNT_OUTPUT); + inputStream1.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT); + streams = new KafkaStreamsNamedTopologyWrapper(props, clientSupplier); + streams.start(); + streams.addNamedTopology(topology1Builder.buildNamedTopology(props)); + + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA)); + streams.removeNamedTopology("topology-1"); + streams.cleanUpNamedTopology("topology-1"); + + final KStream inputStream2 = topology1Builder2.stream(INPUT_STREAM_1); + inputStream2.groupByKey().count().toStream().to(COUNT_OUTPUT); + inputStream2.groupByKey().reduce(Long::sum).toStream().to(SUM_OUTPUT); + + produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA); + streams.addNamedTopology(topology1Builder2.buildNamedTopology(props)); + + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA)); + assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA)); + CLUSTER.deleteTopics(SUM_OUTPUT, COUNT_OUTPUT); + } + private static void produceToInputTopics(final String topic, final Collection> records) { IntegrationTestUtils.produceKeyValuesSynchronously( topic, From b10ca76a534ec0528ca5df58984ebc2bf3db312e Mon Sep 17 00:00:00 2001 From: "A. 
Sophie Blee-Goldman" Date: Wed, 20 Oct 2021 22:51:17 -0700 Subject: [PATCH 3/3] add 'resetOffsets' parameter --- .../apache/kafka/streams/KafkaStreams.java | 5 +- .../internals/InternalTopologyBuilder.java | 8 +++ .../processor/internals/TopologyMetadata.java | 4 ++ .../namedtopology/AddNamedTopologyResult.java | 37 ++++++++++ .../KafkaStreamsNamedTopologyWrapper.java | 35 +++++---- .../RemoveNamedTopologyResult.java | 71 +++++++++++++++++++ .../NamedTopologyIntegrationTest.java | 6 +- 7 files changed, 148 insertions(+), 18 deletions(-) create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java create mode 100644 streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/RemoveNamedTopologyResult.java diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index eee55fc091104..c724a80605ff3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -148,6 +148,7 @@ public class KafkaStreams implements AutoCloseable { private final Time time; private final Logger log; private final String clientId; + protected final String applicationId; private final Metrics metrics; private final StreamsConfig config; protected final List threads; @@ -155,7 +156,7 @@ public class KafkaStreams implements AutoCloseable { private final StreamsMetadataState streamsMetadataState; private final ScheduledExecutorService stateDirCleaner; private final ScheduledExecutorService rocksDBMetricsRecordingService; - private final Admin adminClient; + protected final Admin adminClient; private final StreamsMetricsImpl streamsMetrics; private final long totalCacheSize; private final StreamStateListener streamStateListener; @@ -854,7 +855,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata, // The application ID is a 
required config and hence should always have value final String userClientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG); - final String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); + this.applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); if (userClientId.length() <= 0) { clientId = applicationId + "-" + processId; } else { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index dd07c107e98a0..6708f7143c681 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -2107,6 +2107,14 @@ public synchronized List fullSourceTopicNames() { return new ArrayList<>(maybeDecorateInternalSourceTopics(sourceTopicNames)); } + /** + * @return a list of all topic partitions that have ever been consumed from, and possibly have committed offsets for + */ + public synchronized Set allSourceTopicPartitions() { + // TODO KAFKA-12648: Pt.4 + return new HashSet<>(); + } + /** * @return a copy of the string representation of any pattern subscribed source nodes */ diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java index b05d263df33a9..a6ae0cf4a37be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java @@ -398,6 +398,10 @@ public Collection sourceTopicsForStore(final String storeName) { return sourceTopics; } + public Set allSourceTopicPartitionsForTopology(final String topologyName) { + return 
lookupBuilderForNamedTopology(topologyName).allSourceTopicPartitions(); + } + public Map topicGroups() { final Map topicGroups = new HashMap<>(); applyToEachBuilder(b -> topicGroups.putAll(b.topicGroups())); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java new file mode 100644 index 0000000000000..09005145e6525 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/AddNamedTopologyResult.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals.namedtopology; + +import org.apache.kafka.common.KafkaFuture; + +public class AddNamedTopologyResult { + + private final KafkaFuture addTopologyFuture; + + public AddNamedTopologyResult(final KafkaFuture addTopologyFuture) { + this.addTopologyFuture = addTopologyFuture; + } + + /** + * @return a {@link KafkaFuture} that completes successfully when all threads on this client have picked up the + * new {@link NamedTopology}. 
Note that the new topology is not guaranteed to begin processing on this client or + * any others until its addition has been completed by all instances of the application. + */ + public KafkaFuture all() { + return addTopologyFuture; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java index 71ad45a98b1b4..8fd3718e5e3bb 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java @@ -16,7 +16,9 @@ */ package org.apache.kafka.streams.processor.internals.namedtopology; +import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult; import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.annotation.InterfaceStability.Unstable; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; @@ -32,6 +34,7 @@ import java.util.Collections; import java.util.Optional; import java.util.Properties; +import java.util.Set; import java.util.concurrent.ConcurrentSkipListMap; import java.util.stream.Collectors; @@ -125,22 +128,20 @@ public Optional getTopologyByName(final String name) { * you should inform all of them by calling {@code #addNamedTopology(NamedTopology)} on each client in order for * it to begin processing the new topology. * - * @return a {@link KafkaFuture} that completes successfully when all threads on this client have picked up the - * new {@link NamedTopology}. Note that the new topology is not guaranteed to begin processing on this client or - * any others until its addition has been completed by all instances of the application. 
- * * @throws IllegalArgumentException if this topology name is already in use * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public KafkaFuture addNamedTopology(final NamedTopology newTopology) { + public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) { if (hasStartedOrFinishedShuttingDown()) { throw new IllegalStateException("Cannot add a NamedTopology while the state is " + super.state); } else if (getTopologyByName(newTopology.name()).isPresent()) { throw new IllegalArgumentException("Unable to add the new NamedTopology " + newTopology.name() + " as another of the same name already exists"); } - return topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()); + return new AddNamedTopologyResult( + topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder()) + ); } /** @@ -148,23 +149,31 @@ public KafkaFuture addNamedTopology(final NamedTopology newTopology) { * running, you should inform all of them by calling {@code #removeNamedTopology(String)} on each client to ensure * it stops processing the old topology. * - * @return a {@link KafkaFuture} that completes successfully when all threads on this client have removed the - * corresponding {@link NamedTopology}. At this point no more of its tasks will be processed by the current client, - * but there may still be other clients which do. It is only guaranteed that this {@link NamedTopology} has fully - * stopped processing when all clients have successfully completed their corresponding {@link KafkaFuture}. 
+ * @param topologyToRemove name of the topology to be removed + * @param resetOffsets whether to reset the committed offsets for any source topics * * @throws IllegalArgumentException if this topology name cannot be found * @throws IllegalStateException if streams has not been started or has already shut down * @throws TopologyException if this topology subscribes to any input topics or pattern already in use */ - public KafkaFuture removeNamedTopology(final String topologyToRemove) { + public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemove, final boolean resetOffsets) { if (!isRunningOrRebalancing()) { throw new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state); } else if (!getTopologyByName(topologyToRemove).isPresent()) { throw new IllegalArgumentException("Unable to locate for removal a NamedTopology called " + topologyToRemove); } - return topologyMetadata.unregisterTopology(topologyToRemove); + final KafkaFuture removeTopologyFuture = topologyMetadata.unregisterTopology(topologyToRemove); + + if (resetOffsets) { + final Set partitionsToReset = topologyMetadata.allSourceTopicPartitionsForTopology(topologyToRemove); + if (!partitionsToReset.isEmpty()) { + final DeleteConsumerGroupOffsetsResult deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(applicationId, partitionsToReset); + return new RemoveNamedTopologyResult(removeTopologyFuture, deleteOffsetsResult); + } + } + + return new RemoveNamedTopologyResult(removeTopologyFuture); } /** @@ -172,7 +181,7 @@ public KafkaFuture removeNamedTopology(final String topologyToRemove) { * @link StreamsConfig#APPLICATION_ID_CONFIG application ID} in the ({@link StreamsConfig#STATE_DIR_CONFIG}) *

* May be called while Kafka Streams is in any state, but only on a {@link NamedTopology} that has already been - * removed via {@link #removeNamedTopology(String)}. + * removed via {@link #removeNamedTopology(String, boolean)}. *

* Calling this method triggers a restore of local {@link StateStore}s for this {@link NamedTopology} if it is * ever re-added via {@link #addNamedTopology(NamedTopology)}. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/RemoveNamedTopologyResult.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/RemoveNamedTopologyResult.java new file mode 100644 index 0000000000000..a708782ed875e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/RemoveNamedTopologyResult.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor.internals.namedtopology; + +import org.apache.kafka.clients.admin.DeleteConsumerGroupOffsetsResult; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.internals.KafkaFutureImpl; + +public class RemoveNamedTopologyResult { + private final KafkaFuture removeTopologyFuture; + private final DeleteConsumerGroupOffsetsResult deleteOffsetsResult; + + public RemoveNamedTopologyResult(final KafkaFuture removeTopologyFuture, final DeleteConsumerGroupOffsetsResult deleteOffsetsResult) { + this.removeTopologyFuture = removeTopologyFuture; + this.deleteOffsetsResult = deleteOffsetsResult; + } + + public RemoveNamedTopologyResult(final KafkaFuture removeTopologyFuture) { + this(removeTopologyFuture, null); + } + + public KafkaFuture removeTopologyFuture() { + return removeTopologyFuture; + } + + public DeleteConsumerGroupOffsetsResult deleteOffsetsResult() { + return deleteOffsetsResult; + } + + /** + * @return a {@link KafkaFuture} that completes successfully when all threads on this client have removed the + * corresponding {@link NamedTopology} and all source topic offsets have been deleted (if applicable). At this + * point no more of its tasks will be processed by the current client, but there may still be other clients which + * do. It is only guaranteed that this {@link NamedTopology} has fully stopped processing when all clients have + * successfully completed their corresponding {@link KafkaFuture}. 
+ */ + public final KafkaFuture all() { + final KafkaFutureImpl result = new KafkaFutureImpl<>(); + + if (deleteOffsetsResult != null) { + deleteOffsetsResult.all().whenComplete((ignore, throwable) -> { + if (throwable != null) { + result.completeExceptionally(throwable); + } + }); + } + + removeTopologyFuture.whenComplete((ignore, throwable) -> { + if (throwable != null) { + result.completeExceptionally(throwable); + } else { + result.complete(null); + } + }); + return result; + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java index 6c4b4d2943ce0..b755c4304b6fb 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java @@ -362,7 +362,7 @@ public void shouldRemoveOneNamedTopologyWhileAnotherContinuesProcessing() throws streams = new KafkaStreamsNamedTopologyWrapper(buildNamedTopologies(topology1Builder, topology2Builder), props, clientSupplier); IntegrationTestUtils.startApplicationAndWaitUntilRunning(singletonList(streams), Duration.ofSeconds(15)); - streams.removeNamedTopology("topology-2"); + streams.removeNamedTopology("topology-2", false); produceToInputTopics(DELAYED_INPUT_STREAM_1, STANDARD_INPUT_DATA); produceToInputTopics(DELAYED_INPUT_STREAM_2, STANDARD_INPUT_DATA); @@ -383,7 +383,7 @@ public void shouldRemoveAndReplaceTopologicallyIncompatibleNamedTopology() throw assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA)); - streams.removeNamedTopology("topology-1"); + streams.removeNamedTopology("topology-1", false); streams.cleanUpNamedTopology("topology-1"); // Prepare a new named topology with the 
same name but an incompatible topology (stateful subtopologies swap order) @@ -438,7 +438,7 @@ public void shouldAddToEmptyInitialTopologyRemoveThenAddNewNamedTopology() throw assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, COUNT_OUTPUT, 3), equalTo(COUNT_OUTPUT_DATA)); assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, SUM_OUTPUT, 3), equalTo(SUM_OUTPUT_DATA)); - streams.removeNamedTopology("topology-1"); + streams.removeNamedTopology("topology-1", false); streams.cleanUpNamedTopology("topology-1"); final KStream inputStream2 = topology1Builder2.stream(INPUT_STREAM_1);