diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java index fb005d1bde57d..6355cae2bc6bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java @@ -99,8 +99,10 @@ public void start(final NamedTopology initialTopology) { /** * Start up Streams with a collection of initial NamedTopologies (may be empty) + * + * Note: this is synchronized to ensure that the application state cannot change while we add topologies */ - public void start(final Collection initialTopologies) { + public synchronized void start(final Collection initialTopologies) { log.info("Starting Streams with topologies: {}", initialTopologies); for (final NamedTopology topology : initialTopologies) { final AddNamedTopologyResult addNamedTopologyResult = addNamedTopology(topology); @@ -145,7 +147,7 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) { /** * @return the NamedTopology for the specific name, or Optional.empty() if the application has no NamedTopology of that name */ - public Optional getTopologyByName(final String name) { + public synchronized Optional getTopologyByName(final String name) { return Optional.ofNullable(topologyMetadata.lookupBuilderForNamedTopology(name)).map(InternalTopologyBuilder::namedTopology); } @@ -180,7 +182,9 @@ public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) ); } else { topologyMetadata.registerAndBuildNewTopology(future, newTopology.internalTopologyBuilder()); + maybeCompleteFutureIfStillInCREATED(future, "adding topology " + newTopology.name()); } + return new AddNamedTopologyResult(future); } @@ -205,7 +209,8 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo if (hasStartedOrFinishedShuttingDown()) { log.error("Attempted to remove topology {} from while the Kafka Streams was in state {}, " - + "application must be started first.", topologyToRemove, state + + "topologies cannot be modified if the application has begun or completed shutting down.", + topologyToRemove, state ); removeTopologyFuture.completeExceptionally( new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state) @@ -218,6 +223,7 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo new UnknownTopologyException("Unable to remove topology", topologyToRemove) ); } + final Set partitionsToReset = metadataForLocalThreads() .stream() .flatMap(t -> { @@ -230,53 +236,76 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove); - if (resetOffsets) { + final boolean skipResetForUnstartedApplication = + maybeCompleteFutureIfStillInCREATED(removeTopologyFuture, "removing topology " + topologyToRemove); + + if (resetOffsets && !skipResetForUnstartedApplication) { log.info("Resetting offsets for the following partitions of {} removed NamedTopology {}: {}", removeTopologyFuture.isCompletedExceptionally() ? "unsuccessfully" : "successfully", topologyToRemove, partitionsToReset ); - if (!partitionsToReset.isEmpty()) { - removeTopologyFuture.whenComplete((v, throwable) -> { - if (throwable != null) { - removeTopologyFuture.completeExceptionally(throwable); - } - DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null; - while (deleteOffsetsResult == null) { - try { - deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets( - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset); - deleteOffsetsResult.all().get(); - } catch (final InterruptedException ex) { + return resetOffsets(removeTopologyFuture, partitionsToReset); + } else { + return new RemoveNamedTopologyResult(removeTopologyFuture); + } + } + + /** + * @return true iff the application is still in CREATED and the future was completed + */ + private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl updateTopologyFuture, + final String operation) { + if (state == State.CREATED && !updateTopologyFuture.isDone()) { + updateTopologyFuture.complete(null); + log.info("Completed {} since application has not been started", operation); + return true; + } else { + return false; + } + } + + private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl removeTopologyFuture, + final Set partitionsToReset) { + if (!partitionsToReset.isEmpty()) { + removeTopologyFuture.whenComplete((v, throwable) -> { + if (throwable != null) { + removeTopologyFuture.completeExceptionally(throwable); + } + DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null; + while (deleteOffsetsResult == null) { + try { + deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets( + applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset); + deleteOffsetsResult.all().get(); + } catch (final InterruptedException ex) { + ex.printStackTrace(); + break; + } catch (final ExecutionException ex) { + if (ex.getCause() != null && + ex.getCause() instanceof GroupSubscribedToTopicException && + ex.getCause() + .getMessage() + .equals("Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.")) { ex.printStackTrace(); + } else if (ex.getCause() != null && + ex.getCause() instanceof GroupIdNotFoundException) { + log.debug("The offsets have been reset by another client or the group has been deleted, no need to retry further."); break; - } catch (final ExecutionException ex) { - if (ex.getCause() != null && - ex.getCause() instanceof GroupSubscribedToTopicException && - ex.getCause() - .getMessage() - .equals("Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.")) { - ex.printStackTrace(); - } else if (ex.getCause() != null && - ex.getCause() instanceof GroupIdNotFoundException) { - log.debug("The offsets have been reset by another client or the group has been deleted, no need to retry further."); - break; - } else { - removeTopologyFuture.completeExceptionally(ex); - } - deleteOffsetsResult = null; - } - try { - Thread.sleep(100); - } catch (final InterruptedException ex) { - ex.printStackTrace(); + } else { + removeTopologyFuture.completeExceptionally(ex); } + deleteOffsetsResult = null; } - removeTopologyFuture.complete(null); - }); - return new RemoveNamedTopologyResult(removeTopologyFuture, removeTopologyFuture); - } + try { + Thread.sleep(100); + } catch (final InterruptedException ex) { + ex.printStackTrace(); + } + } + removeTopologyFuture.complete(null); + }); } - return new RemoveNamedTopologyResult(removeTopologyFuture); + return new RemoveNamedTopologyResult(removeTopologyFuture, removeTopologyFuture); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java index 29e7e6a7a6633..65f5a9d82771b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java @@ -124,6 +124,17 @@ public void shouldAllowSameStoreNameToBeUsedByMultipleNamedTopologies() { streams.start(asList(builder1.build(), builder2.build())); } + @Test + public void shouldAllowAddingAndRemovingNamedTopologyAndReturnBeforeCallingStart() throws Exception { + builder1.stream("stream-1").selectKey((k, v) -> v).groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("store"))); + builder2.stream("stream-2").selectKey((k, v) -> v).groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("store"))); + + streams.addNamedTopology(builder1.build()).all().get(); + streams.addNamedTopology(builder2.build()).all().get(); + + streams.removeNamedTopology("topology-2").all().get(); + } + @Test public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamFromSameInputTopic() { builder1.stream("stream"); @@ -138,7 +149,7 @@ public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamF } @Test - public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopic() { + public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopicAfterStart() { builder1.stream("stream"); builder2.stream("stream"); @@ -154,6 +165,21 @@ public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameIn assertThat(exception.getCause().getClass(), equalTo(TopologyException.class)); } + @Test + public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopicBeforeStart() { + builder1.stream("stream"); + builder2.stream("stream"); + + streams.addNamedTopology(builder1.build()); + + final ExecutionException exception = assertThrows( + ExecutionException.class, + () -> streams.addNamedTopology(builder2.build()).all().get() + ); + + assertThat(exception.getCause().getClass(), equalTo(TopologyException.class)); + } + @Test public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateTableFromSameInputTopic() { builder1.table("table");