KAFKA-12648: Pt. 3 - addNamedTopology API (#10788)
…ogyBuilders of named topologies (#10683) Pt. 1: #10609 Pt. 2: #10683 Pt. 3: #10788 The TopologyMetadata is next up after Pt. 1 #10609. This PR sets up the basic architecture for running an app with multiple NamedTopologies, though the APIs to add/remove them dynamically are not implemented until Pt. 3 Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
We changed the behavior to return an empty Optional rather than throw, as users may want to use this API to determine whether the given named topology is known or not
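That Optional-returning style can be sketched as follows. This is a minimal, hypothetical registry (class and method names are illustrative, not the actual Kafka Streams internals) showing why returning an empty Optional lets callers probe for an unknown topology without catching exceptions:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch: a lookup that returns Optional.empty() for an
// unknown topology name instead of throwing, so callers can use it to
// check whether a given named topology is known.
public class TopologyRegistry {
    private final Map<String, String> topologies = new ConcurrentHashMap<>();

    public void register(final String name, final String description) {
        topologies.put(name, description);
    }

    // Empty Optional rather than an exception for unknown names
    public Optional<String> getTopologyByName(final String name) {
        return Optional.ofNullable(topologies.get(name));
    }
}
```

A caller can then write `registry.getTopologyByName("foo").isPresent()` as a cheap existence check.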
guozhangwang left a comment:
Made a first pass on the PR.
    public class NamedTopologyIntegrationTest {
        public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
        ...
        // TODO KAFKA-12648:
This is a meta question: do we have coverage of scenarios where the leader's and the members' bookkept named-topology sets differ? I.e. 1) the leader would not try to create any tasks that its own topology metadata is not aware of, even if other subscriptions contain more topics, and 2) vice versa, the other members would not try to create tasks for an assignment that their topology metadata does not recognize, while later when the topologies get added the tasks then get created?
I'm still filling out the integration test suite, especially the multi-node testing, but I'll make sure this scenario has coverage. This will probably have to be in the followup Pt. 4 which expands add/removeNamedTopology to return a Future, since being able to block on this helps a lot with the testing.
guozhangwang left a comment:
I made a pass on the new commits, but I'm a bit confused about some of the logic around unknown task removal and task freezing... maybe we can chat again so I can get your thoughts?
@guozhangwang I think I've addressed all your feedback and significantly cleaned up the StreamThread event loop + topology locking, let me know if there's anything else.

Java 8 & 11 tests passed; the Java 16 build failure was unrelated.
wcarlson5 left a comment:
@ableegoldman I still think this looks good!
    -private final SortedMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
    +private final ConcurrentNavigableMap<String, InternalTopologyBuilder> builders; // Keep sorted by topology name for readability
     private final TopologyVersion version;
SGTM.
What about the other comment, i.e. moving the Map<String, InternalTopologyBuilder> builders into the TopologyVersion itself? Besides the constructors, the only modifiers to builders seem to be register/deregister, in which we would always try to getAndIncrement version. So what about consolidating the modification of builders along with version bump, and hence we would not need to use a ConcurrentNavigableMap?
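The consolidation being suggested might look roughly like this. It is a hedged sketch, not the real Kafka internals (the class name and String-valued builders are stand-ins): if every mutation of the builders map happens under the same lock as the version bump, a plain sorted map suffices and no ConcurrentNavigableMap is needed:

```java
import java.util.SortedMap;
import java.util.TreeMap;

// Sketch of the suggestion: keep the builders map inside the version
// object and mutate it only together with a version bump, so the map
// itself needs no concurrent implementation.
public class TopologyVersionSketch {
    private long version = 0L;
    // Keep sorted by topology name for readability
    private final SortedMap<String, String> builders = new TreeMap<>();

    public synchronized long register(final String name, final String builder) {
        builders.put(name, builder);
        return ++version;   // map update and version bump are atomic together
    }

    public synchronized long deregister(final String name) {
        builders.remove(name);
        return ++version;
    }

    public synchronized long version() {
        return version;
    }
}
```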
     final long pollLatency = pollPhase();
    +topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
How about moving this ahead of pollPhase()? We are likely to be kicked out of the group while blocked waiting here, so it's better to be aware of that and re-join the group immediately, rather than doing the restore/etc still which may be all wasted work.
Ack (although note that there's no wasted work on the restore phase since there's by definition nothing for the thread to do yet as it won't have been assigned any new tasks until it polls again).
I don't think it really matters much where we put this for that reason, except for the case in which we start up with no topology -- then it's a waste to join the group in the first place, so we may as well wait until we receive something to work on. So yes, I'll move it back ahead of poll
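The wait-until-non-empty-topology idea can be sketched with a simple monitor (class and method names are hypothetical; the real implementation lives in TopologyMetadata): the thread parks until some topology is registered, periodically re-checking a state supplier so that shutdown is never blocked indefinitely:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Sketch: block a processing thread while no topology exists, but keep
// honoring a running-state check from the caller so shutdown proceeds.
public class EmptyTopologyGate {
    private final Set<String> topologies = new HashSet<>();

    public synchronized void add(final String name) {
        topologies.add(name);
        notifyAll();   // wake any thread parked in maybeWaitForNonEmptyTopology
    }

    public synchronized void maybeWaitForNonEmptyTopology(final Supplier<Boolean> isRunning)
            throws InterruptedException {
        while (topologies.isEmpty() && isRunning.get()) {
            wait(100L);   // time-bounded wait so the running state is re-checked
        }
    }
}
```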
…n-empty topology before calling poll
     void shutdown(final boolean clean) {
         final AtomicReference<RuntimeException> firstException = new AtomicReference<>(null);
    +    final Set<Task> tasksToCloseDirty = new HashSet<>();
No actual changes here, just pulled the cleanup of tasks out into a separate new #closeAndCleanUpTasks method so we can call that on tasks from removed topologies
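The shape of that refactor is roughly the following (names and types are simplified stand-ins, not the actual TaskManager code): shutdown delegates to a reusable cleanup method, so the same path can later close tasks belonging to a removed topology:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Sketch: extract the shared task-cleanup logic into its own method so
// both full shutdown and removed-topology handling can call it.
public class TaskCloser {
    final List<String> closedClean = new ArrayList<>();
    final List<String> closedDirty = new ArrayList<>();

    void shutdown(final boolean clean, final Set<String> allTasks) {
        closeAndCleanUpTasks(allTasks, clean);
    }

    // Reusable for both shutdown and closing tasks of a removed topology
    void closeAndCleanUpTasks(final Set<String> tasks, final boolean clean) {
        for (final String task : tasks) {
            if (clean) {
                closedClean.add(task);
            } else {
                closedDirty.add(task);
            }
        }
    }
}
```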
     if (!remainingRevokedPartitions.isEmpty()) {
    -    log.warn("The following partitions {} are missing from the task partitions. It could potentially " +
    +    log.debug("The following revoked partitions {} are missing from the current task partitions. It could potentially " +
Making this debug since warn seems too intense, and I'm not sure it's even worthy of info -- also, with named topologies you would expect to see this almost every time a topology is removed since the thread will try to close those tasks as soon as it notices the topology's removal
LGTM! Please feel free to merge after green builds.

Just one unrelated test failure (reopened KAFKA-13128).
Merged to trunk -- thanks all for keeping up with the reviews so far 😄 |
Pt. 1: #10609
Pt. 2: #10683
Pt. 3: #10788
In Pt. 3 we implement the addNamedTopology API. This can be used to update the processing topology of a running Kafka Streams application without resetting the app, or even pausing/restarting the process. It's up to the user to ensure that this API is called on every instance of an application to ensure all clients are able to run the newly added NamedTopology. This should not be too much of a burden as it only requires that each client eventually be updated by the user -- under the covers, Streams will take care of keeping the internal state consistent while various clients wait to converge on the latest view of the full topology.

Internally, when a new NamedTopology is added a rebalance will be triggered to distribute the tasks that correspond to it. To minimize disruption and wasted work, the assignor just computes the desired eventual assignment of these new tasks to clients regardless of whether the target client has been issued the addNamedTopology request yet. If a client receives tasks for a NamedTopology it does not yet recognize, it stashes them away and continues to process its other topologies. Once it receives this new NamedTopology, those tasks will be created and begin processing without triggering a new rebalance. If the new NamedTopology does not match any unknown tasks it has received, then the client must trigger a fresh rebalance for this new NamedTopology.
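The stash-and-resume behavior described above can be sketched roughly like this (all names are hypothetical; the real logic lives in Streams internals): tasks for an unrecognized topology are stashed, and addNamedTopology either activates the stashed tasks or signals that a fresh rebalance is needed:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Sketch of the described protocol: assignments for an unknown topology
// are stashed; when the topology arrives via addNamedTopology, stashed
// tasks are created without a new rebalance, otherwise a rebalance is
// requested for the new topology.
public class NamedTopologyTaskStash {
    private final Set<String> knownTopologies = new HashSet<>();
    private final Map<String, Set<String>> stashedTasks = new HashMap<>();
    private final Set<String> activeTasks = new HashSet<>();

    public void handleAssignment(final String topologyName, final String taskId) {
        if (knownTopologies.contains(topologyName)) {
            activeTasks.add(taskId);
        } else {
            // Unknown topology: stash the task, keep processing the rest
            stashedTasks.computeIfAbsent(topologyName, n -> new HashSet<>()).add(taskId);
        }
    }

    // Returns true if a fresh rebalance is needed because no stashed
    // tasks matched the newly added topology.
    public boolean addNamedTopology(final String topologyName) {
        knownTopologies.add(topologyName);
        final Set<String> stashed = stashedTasks.remove(topologyName);
        if (stashed == null || stashed.isEmpty()) {
            return true;   // nothing stashed: trigger a rebalance
        }
        activeTasks.addAll(stashed);   // create stashed tasks, no rebalance
        return false;
    }

    public Set<String> activeTasks() {
        return activeTasks;
    }
}
```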