KAFKA-12648: Pt. 1 - Add NamedTopology to protocol and state directory structure#10609
Conversation
|
cc @wcarlson5 |
8378ef3 to
744df1f
Compare
be84837 to
0b3f176
Compare
d54b17b to
f11911b
Compare
ead4def to
dcd259e
Compare
wcarlson5
left a comment
There was a problem hiding this comment.
This makes sense. I don't see anything that worries me and won't be cleaned/finished in the follow ups, so I think we are good to merge!
There was a problem hiding this comment.
I am assuming that this will be done in a later PR
There was a problem hiding this comment.
Yep, that's covered by Pt. 2 -- the PR is available for review, but it's on top of this PR so it's probably going to be difficult to review until this one is merged
There was a problem hiding this comment.
I think I saw a comment about updating these with named topologies later. Is there are reason you are waiting?
There was a problem hiding this comment.
That comment was more about usages in integration/other unit tests, where we may want to tie in some NamedTopologyies once the basic feature is fully implemented. For the StateDirectoryTest everything should already be implemented so we should have the NamedTopology logic in that class covered by tests in this PR (actually I need to add a few more, I think)
dcd259e to
abd71ca
Compare
guozhangwang
left a comment
There was a problem hiding this comment.
Made a quick pass over the non-testing code, left some clarification questions.
| private List<TaskDirectory> listTaskDirectories(final FileFilter filter) { | ||
| final List<TaskDirectory> taskDirectories = new ArrayList<>(); | ||
| if (hasPersistentStores && stateDir.exists()) { | ||
| if (hasNamedTopologies) { |
There was a problem hiding this comment.
Is it possible that we can have named topology state dirs and unamed (original) state dirs co-exist here?
There was a problem hiding this comment.
No, that should not be allowed. We have checks to verify this in a few places where it matters, but it's an assumption we can make here. I'm not sure if your question was from an implementation point of view or a semantic one, but I can further clarify or justify why it should not be allowed if you want
There was a problem hiding this comment.
I was asking more for a semantic one -- as long as this is not expected then I'm happy for this piece as is :)
| } | ||
| } | ||
| } | ||
| maybeCleanEmptyNamedTopologyDirs(); |
There was a problem hiding this comment.
Should we move this into the try/catch IOException block as well (ditto below)?
| final SubscriptionInfoData.TaskOffsetSum taskOffsetSum = new SubscriptionInfoData.TaskOffsetSum(); | ||
| final TaskId task = t.getKey(); | ||
| taskOffsetSum.setTopicGroupId(task.topicGroupId); | ||
| taskOffsetSum.setPartition(task.partition); |
There was a problem hiding this comment.
Could you remind me why we want to include the partition id in the new version as well?
There was a problem hiding this comment.
Ah, yes. I tried to explain this with a comment on the SubscriptionInfoData.json schema but I'll call it out again in the SubscriptionInfo.java class. Previously we encoded the offset sums as a nested "map" of <topicGroupId, <partition, offsetSum>>, where the "map" is really an array and the array struct does not itself allow for struct types. It's just a gap in the API that no one has cared or had time to close, not a fundamental principle. Anyways this meant we had a TopicGroupId and a PartitionToOffsetSum struct, where in turn the PartitionToOffsetSum was composed of the partition and offset sum base types.
I guess this was reasonable enough when there were only 3 base fields, but if we wanted to maintain this nested array structure it would mean adding more and more nested structs each time we added a field. I felt this would get to be too complicated and annoying to deal with so I flattened the OffsetSum struct out to just include each base field directly
There was a problem hiding this comment.
Thanks for the explanation!
| package org.apache.kafka.streams.integration; | ||
|
|
||
| public class NamedTopologyIntegrationTest { | ||
| //TODO KAFKA-12648 |
There was a problem hiding this comment.
Just wanted to lay out my test plan somewhere, so it doesn't seem like I'm merging all this code with no intention to ever test it. Once the final pieces are in (should be by Pt. 2) these are the things I think are important to touch on with integration tests. Leave a comment if you have any more suggestions or feedback 🙂
| import static org.junit.Assert.assertTrue; | ||
| import static org.junit.Assert.fail; | ||
|
|
||
| //TODO KAFKA-12648: add tests for named topology specific stuff |
There was a problem hiding this comment.
I've added a few tests for the named topology stuff but I definitely want to add more and just haven't yet had time. Since I'm still on-call and therefore unlikely to have time until next week, if you both are able to do a quick pass and don't have any further feedback on the PR as-is, it may make sense to just merge this PR tomorrow (Friday) and do a quick followup PR for the tests next week.
That way I can rebase the next PR (Pt. 2) and you all can actually begin reviewing that. cc @guozhangwang @wcarlson5
|
Rebased after the TaskId changes in KIP-470, and responded to all comments. Not much has changed since the last review, just cleaning up here and there. |
112b768 to
c254a79
Compare
| } | ||
|
|
||
| private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { | ||
| for (final File taskDir : listNonEmptyTaskDirectories()) { |
There was a problem hiding this comment.
Just want to call this out since it's a change in behavior unrelated to this PR -- actually just something we could/should have cleaned up after removing the lock/file based locking. Previously we couldn't ever delete empty task dirs by cleaner thread (due to that Windows bug), now we can, so we should not exclude empty dirs here
wcarlson5
left a comment
There was a problem hiding this comment.
I don't have any problems with the recent changes so I am still +1
c31dacc to
4e49061
Compare
87e428b to
9c0542c
Compare
9c0542c to
7175e27
Compare
| } | ||
|
|
||
| @Test | ||
| public void shouldCleanupObsoleteTaskDirectoriesInNamedTopologiesAndDeleteTheParentDirectories() throws IOException { |
There was a problem hiding this comment.
Could we add a test case to verify that in case both named topology dir and non-named topology dir co-exist, we would at certain step check against and throw?
There was a problem hiding this comment.
Ack, will add this test in the next PR
guozhangwang
left a comment
There was a problem hiding this comment.
Just another minor comment about covering the not-allowed co-existence of both named and non-named topologies, otherwise LGTM.
|
Thanks @guozhangwang ! I'm going to go ahead and merge this one now since it's been out there for so long, I will follow up on that test case in the Pt. 2 PR |
…ogyBuilders of named topologies (#10683) Pt. 1: #10609 Pt. 2: #10683 Pt. 3: #10788 The TopologyMetadata is next up after Pt. 1 #10609. This PR sets up the basic architecture for running an app with multiple NamedTopologies, though the APIs to add/remove them dynamically are not implemented until Pt. 3 Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Pt. 1: #10609 Pt. 2: #10683 Pt. 3: #10788 In Pt. 3 we implement the addNamedTopology API. This can be used to update the processing topology of a running Kafka Streams application without resetting the app, or even pausing/restarting the process. It's up to the user to ensure that this API is called on every instance of an application to ensure all clients are able to run the newly added NamedTopology. Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
…ogyBuilders of named topologies (apache#10683) Pt. 1: apache#10609 Pt. 2: apache#10683 Pt. 3: apache#10788 The TopologyMetadata is next up after Pt. 1 apache#10609. This PR sets up the basic architecture for running an app with multiple NamedTopologies, though the APIs to add/remove them dynamically are not implemented until Pt. 3 Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Pt. 1: apache#10609 Pt. 2: apache#10683 Pt. 3: apache#10788 In Pt. 3 we implement the addNamedTopology API. This can be used to update the processing topology of a running Kafka Streams application without resetting the app, or even pausing/restarting the process. It's up to the user to ensure that this API is called on every instance of an application to ensure all clients are able to run the newly added NamedTopology. Reviewers: Guozhang Wang <guozhang@confluent.io>, Walker Carlson <wcarlson@confluent.io>
Pt. 1: #10609
Pt. 2: #10683
Pt. 3: #10788
This PR includes adding the NamedTopology to the Subscription/AssignmentInfo, and to the StateDirectory so it can place NamedTopology tasks within the hierarchical structure with task directories under the NamedTopology parent dir.