MINOR: further InternalTopologyBuilder cleanup #8046
Conversation
move log to initialization only cleanup #build weirdness explicitly initialize subscription
|
ready for review @guozhangwang @vvcephei |
…metadata (#7969) Also addresses KAFKA-8821 Note that we still have to fall back to using pattern subscription if the user has added any regex-based source nodes to the topology. Includes some minor cleanup on the side Reviewers: Bill Bejeck <bbejeck@gmail.com>
guozhangwang
left a comment
There was a problem hiding this comment.
Thanks for the PR. I think on the high-level it is right: we just need to make sure that if pattern subscription is used, then within PartitionAssignor we should update the internal topology builder's own bookkeeping of the topic -> source and topic -> store mappings. Just some minor comments about the call traces here.
| } | ||
| nodeGroup.removeAll(globalNodeGroups); | ||
| } | ||
| final Set<String> nodeGroup = nodeGroups().get(topicGroupId); |
There was a problem hiding this comment.
Should we check that topicGroupId is not null?
There was a problem hiding this comment.
I moved all calls where topicGroupId would be null to now call the parameterless build() instead. We can actually make topicGroupId just a regular int now
| } | ||
|
|
||
| return topicCollection; | ||
| return sourceTopicCollection; |
There was a problem hiding this comment.
Hmm.. inside StreamThread do we guarantee we always call initializeSubscription before calling these two functions?
There was a problem hiding this comment.
If not we should call initializeSubscription inside these two as well.
There was a problem hiding this comment.
I put this initialization inside build(), which gets called in the KafkaStreams constructor before any StreamThreads get created, so the InternalTopologyBuilder should always have an initialized subscription by the time it gets passed to a thread.
| subscriptionUpdates.addAll(topics); | ||
| final Collection<String> existingTopics = subscriptionUpdates(); | ||
|
|
||
| if (usesPatternSubscription() && !existingTopics.equals(topics)) { |
There was a problem hiding this comment.
Seems the callers are conditioned on usesPatternSubscription() already?
|
retest this please |
|
retest this please |
|
Ran the test suite locally on the latest commit, just one unrelated failure |
Conflicts: * build.gradle: moved avro plugin definition below newly added test retry plugin. * apache-github/trunk: MINOR: further InternalTopologyBuilder cleanup (apache#8046) MINOR: Add timer for update limit offsets (apache#8047) HOTFIX: Fix spotsbug failure in Kafka examples (apache#8051) KAFKA-9447: Add new customized EOS model example (apache#8031) KAFKA-8164: Add support for retrying failed (apache#8019) HOTFIX: checkstyle for newly added unit test KAFKA-9261; Client should handle unavailable leader metadata (apache#7770) MINOR: Fix typos introduced in KIP-559 (apache#8042) MINOR: Fixing null handilg in ValueAndTimestampSerializer (apache#7679) KAFKA-9113: Clean up task management and state management (apache#7997) MINOR: fix checkstyle issue in ConsumerConfig.java (apache#8038) KAFKA-9491; Increment high watermark after full log truncation (apache#8037) KAFKA-9477 Document RoundRobinAssignor as an option for partition.assignment.strategy (apache#8007) KAFKA-9074: Correct Connect’s `Values.parseString` to properly parse a time and timestamp literal (apache#7568) KAFKA-9492; Ignore record errors in ProduceResponse for older versions (apache#8030)
…t-for-generated-requests * apache-github/trunk: (410 commits) KAFKA-8843: KIP-515: Zookeeper TLS support MINOR: Add missing quote for malformed line content (apache#8070) MINOR: Simplify KafkaProducerTest (apache#8044) KAFKA-9507; AdminClient should check for missing committed offsets (apache#8057) KAFKA-9519: Deprecate the --zookeeper flag in ConfigCommand (apache#8056) KAFKA-9509; Fixing flakiness of MirrorConnectorsIntegrationTest.testReplication (apache#8048) HOTFIX: Fix two test failures in JDK11 (apache#8063) DOCS - clarify transactionalID and idempotent behavior (apache#7821) MINOR: further InternalTopologyBuilder cleanup (apache#8046) MINOR: Add timer for update limit offsets (apache#8047) HOTFIX: Fix spotsbug failure in Kafka examples (apache#8051) KAFKA-9447: Add new customized EOS model example (apache#8031) KAFKA-8164: Add support for retrying failed (apache#8019) HOTFIX: checkstyle for newly added unit test KAFKA-9261; Client should handle unavailable leader metadata (apache#7770) MINOR: Fix typos introduced in KIP-559 (apache#8042) MINOR: Fixing null handilg in ValueAndTimestampSerializer (apache#7679) KAFKA-9113: Clean up task management and state management (apache#7997) MINOR: fix checkstyle issue in ConsumerConfig.java (apache#8038) KAFKA-9491; Increment high watermark after full log truncation (apache#8037) ...
Followup to KAFKA-7317 and KAFKA-9113, there's some additional cleanup we can do in InternalTopologyBuilder. Mostly refactors the subscription code to make the initialization more explicit and reduce some duplicated code in the update logic.
Also some minor cleanup of the
buildmethod