-
Notifications
You must be signed in to change notification settings - Fork 15.1k
KAFKA-12648: fix #add/removeNamedTopology blocking behavior when app is in CREATED #11813
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -99,8 +99,10 @@ public void start(final NamedTopology initialTopology) { | |
|
|
||
| /** | ||
| * Start up Streams with a collection of initial NamedTopologies (may be empty) | ||
| * | ||
| * Note: this is synchronized to ensure that the application state cannot change while we add topologies | ||
| */ | ||
| public void start(final Collection<NamedTopology> initialTopologies) { | ||
| public synchronized void start(final Collection<NamedTopology> initialTopologies) { | ||
| log.info("Starting Streams with topologies: {}", initialTopologies); | ||
| for (final NamedTopology topology : initialTopologies) { | ||
| final AddNamedTopologyResult addNamedTopologyResult = addNamedTopology(topology); | ||
|
|
@@ -145,7 +147,7 @@ public NamedTopologyBuilder newNamedTopologyBuilder(final String topologyName) { | |
| /** | ||
| * @return the NamedTopology for the specific name, or Optional.empty() if the application has no NamedTopology of that name | ||
| */ | ||
| public Optional<NamedTopology> getTopologyByName(final String name) { | ||
| public synchronized Optional<NamedTopology> getTopologyByName(final String name) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should make sure this is thread-safe, since it's how we check that a name isn't already in use when trying to add a new topology. |
||
| return Optional.ofNullable(topologyMetadata.lookupBuilderForNamedTopology(name)).map(InternalTopologyBuilder::namedTopology); | ||
| } | ||
|
|
||
|
|
@@ -180,7 +182,9 @@ public AddNamedTopologyResult addNamedTopology(final NamedTopology newTopology) | |
| ); | ||
| } else { | ||
| topologyMetadata.registerAndBuildNewTopology(future, newTopology.internalTopologyBuilder()); | ||
| maybeCompleteFutureIfStillInCREATED(future, "adding topology " + newTopology.name()); | ||
| } | ||
|
|
||
| return new AddNamedTopologyResult(future); | ||
| } | ||
|
|
||
|
|
@@ -205,7 +209,8 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo | |
|
|
||
| if (hasStartedOrFinishedShuttingDown()) { | ||
| log.error("Attempted to remove topology {} from while the Kafka Streams was in state {}, " | ||
| + "application must be started first.", topologyToRemove, state | ||
| + "topologies cannot be modified if the application has begun or completed shutting down.", | ||
| topologyToRemove, state | ||
| ); | ||
| removeTopologyFuture.completeExceptionally( | ||
| new IllegalStateException("Cannot remove a NamedTopology while the state is " + super.state) | ||
|
|
@@ -218,6 +223,7 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo | |
| new UnknownTopologyException("Unable to remove topology", topologyToRemove) | ||
| ); | ||
| } | ||
|
|
||
| final Set<TopicPartition> partitionsToReset = metadataForLocalThreads() | ||
| .stream() | ||
| .flatMap(t -> { | ||
|
|
@@ -230,53 +236,76 @@ public RemoveNamedTopologyResult removeNamedTopology(final String topologyToRemo | |
|
|
||
| topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove); | ||
|
|
||
| if (resetOffsets) { | ||
| final boolean skipResetForUnstartedApplication = | ||
| maybeCompleteFutureIfStillInCREATED(removeTopologyFuture, "removing topology " + topologyToRemove); | ||
|
|
||
| if (resetOffsets && !skipResetForUnstartedApplication) { | ||
| log.info("Resetting offsets for the following partitions of {} removed NamedTopology {}: {}", | ||
| removeTopologyFuture.isCompletedExceptionally() ? "unsuccessfully" : "successfully", | ||
| topologyToRemove, partitionsToReset | ||
| ); | ||
| if (!partitionsToReset.isEmpty()) { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The offset reset code is pretty long, so I pulled it out into its own method to clean things up a bit. |
||
| removeTopologyFuture.whenComplete((v, throwable) -> { | ||
| if (throwable != null) { | ||
| removeTopologyFuture.completeExceptionally(throwable); | ||
| } | ||
| DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null; | ||
| while (deleteOffsetsResult == null) { | ||
| try { | ||
| deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets( | ||
| applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset); | ||
| deleteOffsetsResult.all().get(); | ||
| } catch (final InterruptedException ex) { | ||
| return resetOffsets(removeTopologyFuture, partitionsToReset); | ||
| } else { | ||
| return new RemoveNamedTopologyResult(removeTopologyFuture); | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * @return true iff the application is still in CREATED and the future was completed | ||
| */ | ||
| private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl<Void> updateTopologyFuture, | ||
| final String operation) { | ||
| if (state == State.CREATED && !updateTopologyFuture.isDone()) { | ||
| updateTopologyFuture.complete(null); | ||
| log.info("Completed {} since application has not been started", operation); | ||
| return true; | ||
| } else { | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm assuming this is just extracting the inlined function, so I skipped it and did not compare line by line :)
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yep -- just a direct copy/paste. |
||
| final Set<TopicPartition> partitionsToReset) { | ||
| if (!partitionsToReset.isEmpty()) { | ||
| removeTopologyFuture.whenComplete((v, throwable) -> { | ||
| if (throwable != null) { | ||
| removeTopologyFuture.completeExceptionally(throwable); | ||
| } | ||
| DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null; | ||
| while (deleteOffsetsResult == null) { | ||
| try { | ||
| deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets( | ||
| applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset); | ||
| deleteOffsetsResult.all().get(); | ||
| } catch (final InterruptedException ex) { | ||
| ex.printStackTrace(); | ||
| break; | ||
| } catch (final ExecutionException ex) { | ||
| if (ex.getCause() != null && | ||
| ex.getCause() instanceof GroupSubscribedToTopicException && | ||
| ex.getCause() | ||
| .getMessage() | ||
| .equals("Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.")) { | ||
| ex.printStackTrace(); | ||
| } else if (ex.getCause() != null && | ||
| ex.getCause() instanceof GroupIdNotFoundException) { | ||
| log.debug("The offsets have been reset by another client or the group has been deleted, no need to retry further."); | ||
| break; | ||
| } catch (final ExecutionException ex) { | ||
| if (ex.getCause() != null && | ||
| ex.getCause() instanceof GroupSubscribedToTopicException && | ||
| ex.getCause() | ||
| .getMessage() | ||
| .equals("Deleting offsets of a topic is forbidden while the consumer group is actively subscribed to it.")) { | ||
| ex.printStackTrace(); | ||
| } else if (ex.getCause() != null && | ||
| ex.getCause() instanceof GroupIdNotFoundException) { | ||
| log.debug("The offsets have been reset by another client or the group has been deleted, no need to retry further."); | ||
| break; | ||
| } else { | ||
| removeTopologyFuture.completeExceptionally(ex); | ||
| } | ||
| deleteOffsetsResult = null; | ||
| } | ||
| try { | ||
| Thread.sleep(100); | ||
| } catch (final InterruptedException ex) { | ||
| ex.printStackTrace(); | ||
| } else { | ||
| removeTopologyFuture.completeExceptionally(ex); | ||
| } | ||
| deleteOffsetsResult = null; | ||
| } | ||
| removeTopologyFuture.complete(null); | ||
| }); | ||
| return new RemoveNamedTopologyResult(removeTopologyFuture, removeTopologyFuture); | ||
| } | ||
| try { | ||
| Thread.sleep(100); | ||
| } catch (final InterruptedException ex) { | ||
| ex.printStackTrace(); | ||
| } | ||
| } | ||
| removeTopologyFuture.complete(null); | ||
| }); | ||
| } | ||
| return new RemoveNamedTopologyResult(removeTopologyFuture); | ||
| return new RemoveNamedTopologyResult(removeTopologyFuture, removeTopologyFuture); | ||
| } | ||
|
|
||
| /** | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -124,6 +124,17 @@ public void shouldAllowSameStoreNameToBeUsedByMultipleNamedTopologies() { | |
| streams.start(asList(builder1.build(), builder2.build())); | ||
| } | ||
|
|
||
| @Test | ||
| public void shouldAllowAddingAndRemovingNamedTopologyAndReturnBeforeCallingStart() throws Exception { | ||
|
ableegoldman marked this conversation as resolved.
|
||
| builder1.stream("stream-1").selectKey((k, v) -> v).groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("store"))); | ||
| builder2.stream("stream-2").selectKey((k, v) -> v).groupByKey().count(Materialized.as(Stores.inMemoryKeyValueStore("store"))); | ||
|
|
||
| streams.addNamedTopology(builder1.build()).all().get(); | ||
| streams.addNamedTopology(builder2.build()).all().get(); | ||
|
|
||
| streams.removeNamedTopology("topology-2").all().get(); | ||
| } | ||
|
|
||
| @Test | ||
| public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamFromSameInputTopic() { | ||
| builder1.stream("stream"); | ||
|
|
@@ -138,7 +149,7 @@ public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateStreamF | |
| } | ||
|
|
||
| @Test | ||
| public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopic() { | ||
| public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopicAfterStart() { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nit: After start?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you elaborate? I did add an "AfterStart" suffix here -- is that what you meant? |
||
| builder1.stream("stream"); | ||
| builder2.stream("stream"); | ||
|
|
||
|
|
@@ -154,6 +165,21 @@ public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameIn | |
| assertThat(exception.getCause().getClass(), equalTo(TopologyException.class)); | ||
| } | ||
|
|
||
| @Test | ||
| public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopicBeforeStart() { | ||
| builder1.stream("stream"); | ||
| builder2.stream("stream"); | ||
|
|
||
| streams.addNamedTopology(builder1.build()); | ||
|
|
||
| final ExecutionException exception = assertThrows( | ||
| ExecutionException.class, | ||
| () -> streams.addNamedTopology(builder2.build()).all().get() | ||
| ); | ||
|
|
||
| assertThat(exception.getCause().getClass(), equalTo(TopologyException.class)); | ||
| } | ||
|
|
||
| @Test | ||
| public void shouldThrowTopologyExceptionWhenMultipleNamedTopologiesCreateTableFromSameInputTopic() { | ||
| builder1.table("table"); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`super.start()` is already synchronized, but we should just go ahead and synchronize at the first layer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I took some time to understand why we want to synchronize here, as at first sight it looked a bit unclear to me:
/* means caller -> callee */
inherited.start: synchronized, public ->
addNamedTopology: unsynchronized, public, register topology metadata ->
completedFutureForUnstartedApp: synchronized, private, check state
removeNamedTopology: unsynchronized, public, unregister metadata topology ->
completedFutureForUnstartedApp: synchronized, private, check state
Registering/unregistering topology metadata is synchronized, and parent.start would modify the state.
I think I understand now that it's because `addNamedTopology` is not synchronized, plus when we have multiple named topologies we want to keep the state unchanged while adding them one by one. Is that the case? If yes, maybe it's better to add such reasoning in the javadoc above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ack, will do