KAFKA-12648: fix #add/removeNamedTopology blocking behavior when app is in CREATED #11813
| * Start up Streams with a collection of initial NamedTopologies (may be empty) | ||
| */ | ||
| public void start(final Collection<NamedTopology> initialTopologies) { | ||
| public synchronized void start(final Collection<NamedTopology> initialTopologies) { |
super.start() is already synchronized but we should just go ahead and synchronize at the first layer
It took me some time to understand why we want to synchronize here, as at first sight it was a bit unclear to me:
/* means caller -> callee */
inherited.start: synchronized, public ->
addNamedTopology: unsynchronized, public, register topology metadata ->
completedFutureForUnstartedApp: synchronized, private, check state
removeNamedTopology: unsynchronized, public, unregister topology metadata ->
completedFutureForUnstartedApp: synchronized, private, check state
Register/unregister topology metadata is synchronized, and parent.start would modify state.
I think I understand now that it's because addNamedTopology is not synchronized, plus when we have multiple named topologies we want to keep the state unchanged while adding them one by one. Is that the case? If yes, maybe it's better to add this reasoning to the javadoc above.
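The locking described in this comment can be sketched as follows. This is a minimal illustration and not the code from the PR: the class `StartLockingSketch`, the `State` enum, the String topology names, and the use of `CompletableFuture` in place of `KafkaFutureImpl` are all assumptions made for the example. It shows a synchronized `start` holding the monitor while the initial topologies are added, so the state cannot leave CREATED between two adds.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the wrapper class; all names are illustrative only.
final class StartLockingSketch {
    enum State { CREATED, RUNNING }

    private State state = State.CREATED; // guarded by "this"
    private final Map<String, Boolean> topologies = new ConcurrentHashMap<>();

    // Holding the monitor for the whole method keeps the state at CREATED
    // until every initial topology has been registered.
    public synchronized void start(final Collection<String> initialTopologies) {
        for (final String name : initialTopologies) {
            addTopology(name); // the helper re-enters the monitor; Java monitors are reentrant
        }
        state = State.RUNNING; // stands in for the inherited, synchronized start()
    }

    // Not synchronized itself: only the state check inside the helper takes the lock.
    public CompletableFuture<Void> addTopology(final String name) {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        topologies.put(name, Boolean.TRUE);
        completeIfUnstarted(future);
        return future;
    }

    // Returns true if the app is still in CREATED and the future was completed.
    private synchronized boolean completeIfUnstarted(final CompletableFuture<Void> future) {
        if (state == State.CREATED) {
            future.complete(null);
            return true;
        }
        return false;
    }

    public synchronized State state() {
        return state;
    }

    public int topologyCount() {
        return topologies.size();
    }

    public static void main(final String[] args) {
        final StartLockingSketch app = new StartLockingSketch();
        app.start(Arrays.asList("topology-a", "topology-b"));
        System.out.println(app.state() + " with " + app.topologyCount() + " topologies");
    }
}
```

Under these assumptions, a concurrent `addTopology` call from another thread blocks inside the helper until `start` releases the monitor, so it observes either CREATED (before start) or RUNNING (after), never a state change in the middle of the initial batch.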
| * @return the NamedTopology for the specific name, or Optional.empty() if the application has no NamedTopology of that name | ||
| */ | ||
| public Optional<NamedTopology> getTopologyByName(final String name) { | ||
| public synchronized Optional<NamedTopology> getTopologyByName(final String name) { |
Should make sure this is thread safe since it's how we check to make sure a name isn't already used when trying to add a new topology
| removeTopologyFuture.isCompletedExceptionally() ? "unsuccessfully" : "successfully", | ||
| topologyToRemove, partitionsToReset | ||
| ); | ||
| if (!partitionsToReset.isEmpty()) { |
The offset reset code is pretty long so I pulled it out into its own method to clean things up a bit
| /** | ||
| * @return true iff the application is still in CREATED and the future was completed | ||
| */ | ||
| private synchronized boolean completedFutureForUnstartedApp(final KafkaFutureImpl<Void> updateTopologyFuture, |
This is the main fix
wcarlson5 left a comment
I have some concerns about this behavior. I understand why we want to complete the future before the Streams application has started, but I am not sure that is the correct decision. If we take the future to mean that the topology is processing, then it actually makes sense for the future not to return until Streams has been started, and the user should not call get on it before then.
Maybe we can have a third part of the Add/Remove topology result: one future for "is done adding", one for "is processing", and, for removing a topology, one for resetting the offsets. I think this is a reasonable compromise, as it might not be possible to get all the checks in at registration that actually running the topology would perform. However, getting the future of a topology that was added before the Streams client was started, and waiting for it to be processed, is a reasonable path too IMO.
Anyway, I could be convinced, but I think it's something we should think about.
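To make the proposal above concrete, here is a rough sketch of what a three-part result could look like. Nothing here exists in Kafka Streams: the class name `ThreePartResultSketch`, its field names, and the use of `CompletableFuture` rather than a Kafka future type are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical result type; not part of the Kafka Streams API.
final class ThreePartResultSketch {
    // Completed once the metadata change has been validated and applied.
    final CompletableFuture<Void> registered = new CompletableFuture<>();
    // Completed once the stream threads have picked up the change.
    final CompletableFuture<Void> processing = new CompletableFuture<>();
    // Only meaningful for removals: completed once offsets have been reset.
    final CompletableFuture<Void> offsetsReset = new CompletableFuture<>();

    // Convenience view that completes only when all three parts are done.
    CompletableFuture<Void> all() {
        return CompletableFuture.allOf(registered, processing, offsetsReset);
    }
}
```

With a shape like this, an application still in CREATED could safely wait on `registered` to see validation results, while `processing` would stay pending until the client is started.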
| } | ||
| } | ||
| private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture, |
I'm assuming this is just extracting the inlined function and hence skipped and did not compare line by line :)
Yep -- just direct copy/paste
| topologyMetadata.unregisterTopology(removeTopologyFuture, topologyToRemove); | ||
| if (resetOffsets) { | ||
| if (!completedFutureForUnstartedApp(removeTopologyFuture, "removing topology") && resetOffsets) { |
nit: how about putting resetOffsets as the first condition, so that if it's false we skip the synchronized function (not sure if the JIT would really be able to optimize it this way)?
We kind of assume that an application will not be making heavy or frequent #removeNamedTopology calls. If it turns out that users want to add and remove many topologies at a high rate, we can come back and try to optimize this. It just doesn't seem to make much sense for an application to have such high turnover of topologies: this feature is generally targeted at giving a relatively stable application the ability to update its topology as needed, not at high volumes of transient topologies.
Oh mm actually this change is what's causing the tests to hang as it breaks the fix -- we actually need to ensure that we check the CREATED state and complete the future if so. But while I still stand by my comments above, ie that trying to avoid entering a synchronized block when adding or removing a topology is probably premature optimization, I actually did look over the class and believe we can make this work without synchronizing this particular method.
(However we do still need to synchronize on start, for several reasons)
Thanks! I overlooked its side effects.
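The side effect discussed in this thread comes from Java's short-circuit `&&`. The sketch below is illustrative only: `ShortCircuitSketch` and its methods are hypothetical, and `CompletableFuture` stands in for `KafkaFutureImpl`. It shows that putting the flag first skips the helper, and with it the completion of the future, whenever the flag is false.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo of why the operand order matters; not code from the PR.
final class ShortCircuitSketch {
    // Completes the future as a side effect when the app has not started,
    // and reports whether it did so.
    static boolean completeIfUnstarted(final CompletableFuture<Void> future,
                                       final boolean appStarted) {
        if (!appStarted) {
            future.complete(null);
            return true;
        }
        return false;
    }

    // Helper first: the left operand is always evaluated, so the future is
    // always completed for an unstarted app.
    static boolean helperFirst(final CompletableFuture<Void> future,
                               final boolean appStarted,
                               final boolean resetOffsets) {
        return !completeIfUnstarted(future, appStarted) && resetOffsets;
    }

    // Flag first: when resetOffsets is false the helper never runs, and the
    // future of an unstarted app is left pending.
    static boolean flagFirst(final CompletableFuture<Void> future,
                             final boolean appStarted,
                             final boolean resetOffsets) {
        return resetOffsets && !completeIfUnstarted(future, appStarted);
    }

    public static void main(final String[] args) {
        final CompletableFuture<Void> a = new CompletableFuture<>();
        final CompletableFuture<Void> b = new CompletableFuture<>();
        helperFirst(a, false, false);
        flagFirst(b, false, false);
        System.out.println("helper first done: " + a.isDone());
        System.out.println("flag first done: " + b.isDone());
    }
}
```

Both orderings return the same boolean, which is why the swap looks harmless; the difference is only in whether the future gets completed, and a caller blocking on the pending one would hang.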
| @Test | ||
| public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopic() { | ||
| public void shouldThrowTopologyExceptionWhenAddingNamedTopologyReadingFromSameInputTopicAfterStart() { |
Can you elaborate? I did add an "AfterStart" suffix here -- is that what you meant?
@ableegoldman I took another look at the latest commit, and LGTM. The Jenkins build timeout seems consistent though, and may be related; after you have investigated its causes, please feel free to merge.
Reviewed the latest commit, LGTM.
Re-triggering Jenkins.
Currently the #add/removeNamedTopology APIs behave a little wonky when the application is still in CREATED. Since adding and removing topologies runs some validation steps, there is a valid reason to want to add or remove a topology on a dummy app that you don't plan to start, or on a real app that you haven't started yet. But to actually check the results of the validation you need to call get() on the future, so we need to make sure that get() won't block forever when there is no failure -- as is currently the case.
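A minimal sketch of the behavior this description asks for, under stated assumptions: `CreatedStateFutureSketch` is a hypothetical class, the duplicate-name check stands in for the real validation steps, and `CompletableFuture` replaces `KafkaFutureImpl`. The point is only that a future handed out while the app is in CREATED is completed right away, either normally or exceptionally, so waiting on it cannot block forever.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration; the names and the validation rule are assumptions.
final class CreatedStateFutureSketch {
    private boolean started = false; // false means the app is still in CREATED
    private final Set<String> names = new HashSet<>();

    public synchronized void start() {
        started = true;
    }

    public synchronized CompletableFuture<Void> addTopology(final String name) {
        final CompletableFuture<Void> future = new CompletableFuture<>();
        // Stand-in validation: a failure is reported through the future.
        if (!names.add(name)) {
            future.completeExceptionally(
                new IllegalArgumentException("duplicate topology name: " + name));
            return future;
        }
        if (!started) {
            // No stream thread exists yet to complete the future, so do it here.
            future.complete(null);
        }
        // Once started, the future stays pending; real code would have the
        // stream threads complete it after picking up the new topology.
        return future;
    }

    public static void main(final String[] args) {
        final CreatedStateFutureSketch app = new CreatedStateFutureSketch();
        app.addTopology("topology-a").join(); // returns immediately in CREATED
        System.out.println("validation passed without starting the app");
    }
}
```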