From af2a57c2694d47180b5f3a963b622556bf6ee2e4 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Fri, 25 Feb 2022 21:17:24 -0800 Subject: [PATCH 1/5] fix blocking behavior, add tests --- .../KafkaStreamsNamedTopologyWrapper.java | 102 +++++++++++------- .../internals/NamedTopologyTest.java | 11 ++ 2 files changed, 73 insertions(+), 40 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java index fb005d1bde57d..7c25341c50915 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java @@ -100,7 +100,7 @@ public void start(final NamedTopology initialTopology) { /** * Start up Streams with a collection of initial NamedTopologies (may be empty) */ - public void start(final Collection initialTopologies) { + public synchronized void start(final Collection initialTopologies) { log.info("Starting Streams with topologies: {}", initialTopologies); for (final NamedTopology topology : initialTopologies) { final AddNamedTopologyResult addNamedTopologyResult = addNamedTopology(topology); @@ -145,7 +145,7 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) { /** * @return the NamedTopology for the specific name, or Optional.empty() if the application has no NamedTopology of that name */ - public Optional getTopologyByName(final String name) { + public synchronized Optional getTopologyByName(final String name) { return Optional.ofNullable(topologyMetadata.lookupBuilderForNamedTopology(name)).map(InternalTopologyBuilder::namedTopology); } @@ -180,7 +180,9 @@ public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) ); } else { topologyMetadata.registerAndBuildNewTopology(future, newTopology.internalTopologyBuilder()); + completedFutureForUnstartedApp(future, "adding topology"); } + return new AddNamedTopologyResult(future); } @@ -218,6 +220,7 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo new UnknownTopologyException("Unable to remove topology", topologyToRemove) ); } + final Set partitionsToReset = metadataForLocalThreads() .stream() .flatMap(t -> { @@ -230,53 +233,72 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove); - if (resetOffsets) { + if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing topology") && resetOffsets) { log.info("Resetting offsets for the following partitions of {} removed NamedTopology {}: {}", removeTopologyFuture.isCompletedExceptionally() ? "unsuccessfully" : "successfully", topologyToRemove, partitionsToReset ); - if (!partitionsToReset.isEmpty()) { - removeTopologyFuture.whenComplete((v, throwable) -> { - if (throwable != null) { - removeTopologyFuture.completeExceptionally(throwable); - } - DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null; - while (deleteOffsetsResult == null) { - try { - deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets( - applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset); - deleteOffsetsResult.all().get(); - } catch (final InterruptedException ex) { + resetOffsets(removeTopologyFuture, partitionsToReset); + } + return new RemoveNamedTopologyResult(removeTopologyFuture); + } + + /** + * @return true iff the application is still in CREATED and the future was completed + */ + private synchronized boolean completedFutureForUnstartedApp(final KafkaFutureImpl updateTopologyFuture, + final String operation) { + if (state == State.CREATED && !updateTopologyFuture.isDone()) { + updateTopologyFuture.complete(null); + log.info("Completed {} since application has not been started", operation); + return true; + } else { + return false; + } + } + + private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl removeTopologyFuture, + Set partitionsToReset) { + if (!partitionsToReset.isEmpty()) { + removeTopologyFuture.whenComplete((v, throwable) -> { + if (throwable != null) { + removeTopologyFuture.completeExceptionally(throwable); + } + DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null; + while (deleteOffsetsResult == null) { + try { + deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets( + applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset); + deleteOffsetsResult.all().get(); + } catch (final InterruptedException ex) { + ex.printStackTrace(); + break; + } catch (final ExecutionException ex) { + if (ex.getCause() != null && + ex.getCause() instanceof GroupSubscribedToTopicException && + ex.getCause() + .getMessage() + .equals("Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.")) { ex.printStackTrace(); + } else if (ex.getCause() != null && + ex.getCause() instanceof GroupIdNotFoundException) { + log.debug("The offsets have been reset by another client or the group has been deleted, no need to retry further."); break; - } catch (final ExecutionException ex) { - if (ex.getCause() != null && - ex.getCause() instanceof GroupSubscribedToTopicException && - ex.getCause() - .getMessage() - .equals("Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.")) { - ex.printStackTrace(); - } else if (ex.getCause() != null && - ex.getCause() instanceof GroupIdNotFoundException) { - log.debug("The offsets have been reset by another client or the group has been deleted, no need to retry further."); - break; - } else { - removeTopologyFuture.completeExceptionally(ex); - } - deleteOffsetsResult = null; - } - try { - Thread.sleep(100); - } catch (final InterruptedException ex) { - ex.printStackTrace(); + } else { + removeTopologyFuture.completeExceptionally(ex); } + deleteOffsetsResult = null; } - removeTopologyFuture.complete(null); - }); - return new RemoveNamedTopologyResult(removeTopologyFuture, removeTopologyFuture); - } + try { + Thread.sleep(100); + } catch (final InterruptedException ex) { + ex.printStackTrace(); + } + } + removeTopologyFuture.complete(null); + }); } - return new RemoveNamedTopologyResult(removeTopologyFuture); + return new RemoveNamedTopologyResult(removeTopologyFuture, removeTopologyFuture); } /** diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java index 29e7e6a7a6633..790972019a1d8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java @@ -124,6 +124,17 @@ public void shouldAllowSameStoreNameToBeUsedByMultipleNamedTopologies() { streams.start(asList(builder1.build(), builder2.build())); } + @Test + public void shouldAllowAddingAndRemovingNamedTopologyAndReturnBeforeCallingStart() throws Exception { + builder1.stream("stream-1").selectKey((k, v) -> v).groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("store"))); + builder2.stream("stream-2").selectKey((k, v) -> v).groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("store"))); + + streams.addNamedTopology(builder1.build()).all().get(); + streams.addNamedTopology(builder2.build()).all().get(); + + streams.removeNamedTopology("topology-2").all().get(); + } + @Test public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamFromSameInputTopic() { builder1.stream("stream"); From 2a6127fb2eca45f0856566113816de479d29b80a Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Fri, 25 Feb 2022 21:19:04 -0800 Subject: [PATCH 2/5] checkstyle --- .../namedtopology/KafkaStreamsNamedTopologyWrapper.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java index 7c25341c50915..8242a20d3aaa4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java @@ -258,7 +258,7 @@ private synchronized boolean completedFutureForUnstartedApp(final KafkaFutureImp } private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl removeTopologyFuture, - Set partitionsToReset) { + final Set partitionsToReset) { if (!partitionsToReset.isEmpty()) { removeTopologyFuture.whenComplete((v, throwable) -> { if (throwable != null) { From 36805cf6d72af995bfd18e9478ef19c494f226c3 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Mon, 28 Feb 2022 11:39:43 -0800 Subject: [PATCH 3/5] add unit test --- .../processor/internals/NamedTopologyTest.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java index 790972019a1d8..65f5a9d82771b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/NamedTopologyTest.java @@ -149,7 +149,7 @@ public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamF } @Test - public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopic() { + public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopicAfterStart() { builder1.stream("stream"); builder2.stream("stream"); @@ -165,6 +165,21 @@ public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameIn assertThat(exception.getCause().getClass(), equalTo(TopologyException.class)); } + @Test + public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopicBeforeStart() { + builder1.stream("stream"); + builder2.stream("stream"); + + streams.addNamedTopology(builder1.build()); + + final ExecutionException exception = assertThrows( + ExecutionException.class, + () -> streams.addNamedTopology(builder2.build()).all().get() + ); + + assertThat(exception.getCause().getClass(), equalTo(TopologyException.class)); + } + @Test public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateTableFromSameInputTopic() { builder1.table("table"); From c91ad0319b7c2ee7444ac26ad6c5344a1df750a8 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Mon, 28 Feb 2022 15:52:18 -0800 Subject: [PATCH 4/5] review feedback --- .../namedtopology/KafkaStreamsNamedTopologyWrapper.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java index 8242a20d3aaa4..c15ae4eebb692 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java @@ -99,6 +99,8 @@ public void start(final NamedTopology initialTopology) { /** * Start up Streams with a collection of initial NamedTopologies (may be empty) + * + * Note: this is synchronized to ensure that the application state cannot change while we add topologies */ public synchronized void start(final Collection initialTopologies) { log.info("Starting Streams with topologies: {}", initialTopologies); @@ -233,14 +235,15 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove); - if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing topology") && resetOffsets) { + if (resetOffsets && !completedFutureForUnstartedApp(removeTopologyFuture, "removing topology")) { log.info("Resetting offsets for the following partitions of {} removed NamedTopology {}: {}", removeTopologyFuture.isCompletedExceptionally() ? "unsuccessfully" : "successfully", topologyToRemove, partitionsToReset ); - resetOffsets(removeTopologyFuture, partitionsToReset); + return resetOffsets(removeTopologyFuture, partitionsToReset); + } else { + return new RemoveNamedTopologyResult(removeTopologyFuture); } - return new RemoveNamedTopologyResult(removeTopologyFuture); } /** From d7d671631b5088d0ffb89966f71963182643e187 Mon Sep 17 00:00:00 2001 From: "A. Sophie Blee-Goldman" Date: Wed, 2 Mar 2022 00:41:07 -0800 Subject: [PATCH 5/5] remove synchronization and extract CREATED check --- .../KafkaStreamsNamedTopologyWrapper.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java index c15ae4eebb692..6355cae2bc6bd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java @@ -182,7 +182,7 @@ public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) ); } else { topologyMetadata.registerAndBuildNewTopology(future, newTopology.internalTopologyBuilder()); - completedFutureForUnstartedApp(future, "adding topology"); + maybeCompleteFutureIfStillInCREATED(future, "adding topology " + newTopology.name()); } return new AddNamedTopologyResult(future); @@ -209,7 +209,8 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo if (hasStartedOrFinishedShuttingDown()) { log.error("Attempted to remove topology {} from while the Kafka Streams was in state {}, " - + "application must be started first.", topologyToRemove, state + + "topologies cannot be modified if the application has begun or completed shutting down.", + topologyToRemove, state ); removeTopologyFuture.completeExceptionally( new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state) @@ -235,7 +236,10 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove); - if (resetOffsets && !completedFutureForUnstartedApp(removeTopologyFuture, "removing topology")) { + final boolean skipResetForUnstartedApplication = + maybeCompleteFutureIfStillInCREATED(removeTopologyFuture, "removing topology " + topologyToRemove); + + if (resetOffsets && !skipResetForUnstartedApplication) { log.info("Resetting offsets for the following partitions of {} removed NamedTopology {}: {}", removeTopologyFuture.isCompletedExceptionally() ? "unsuccessfully" : "successfully", topologyToRemove, partitionsToReset @@ -249,8 +253,8 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo /** * @return true iff the application is still in CREATED and the future was completed */ - private synchronized boolean completedFutureForUnstartedApp(final KafkaFutureImpl updateTopologyFuture, - final String operation) { + private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl updateTopologyFuture, + final String operation) { if (state == State.CREATED && !updateTopologyFuture.isDone()) { updateTopologyFuture.complete(null); log.info("Completed {} since application has not been started", operation); @@ -301,7 +305,7 @@ private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl remov removeTopologyFuture.complete(null); }); } - return new RemoveNamedTopologyResult(removeTopologyFuture, removeTopologyFuture); + return new RemoveNamedTopologyResult(removeTopologyFuture, removeTopologyFuture); } /**