KAFKA-10357: Extract setup of changelog from Streams partition assignor #10163
public Set<TopicPartition> preExistingNonSourceTopicBasedPartitions() {
    return Collections.unmodifiableSet(preExistingNonSourceTopicBasedChangelogPartitions);
}

public Set<TopicPartition> preExistingPartitionsFor(final TaskId taskId) {
    if (preExistingChangelogPartitionsForTask.containsKey(taskId)) {
        return Collections.unmodifiableSet(preExistingChangelogPartitionsForTask.get(taskId));
    }
    return Collections.emptySet();
}

public Set<TopicPartition> sourceTopicBasedPartitions() {
    return Collections.unmodifiableSet(sourceTopicBasedChangelogTopicPartitions);
}

public Set<TaskId> taskIds() {
    return Collections.unmodifiableSet(changelogPartitionsForTask.keySet());
}
I added some methods to simplify some code in the assignor.
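For illustration, here is a minimal, self-contained sketch of the read-only getter pattern these methods follow. It is not the PR's code: the class name `ChangelogViewSketch` and the helper `addPreExisting` are hypothetical, and plain `String`s stand in for `TaskId` and `TopicPartition` so the example runs without Kafka on the classpath.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the class under review; Strings replace TaskId and TopicPartition.
final class ChangelogViewSketch {
    private final Map<String, Set<String>> preExistingChangelogPartitionsForTask = new HashMap<>();

    // Hypothetical helper used only to populate the sketch.
    void addPreExisting(final String taskId, final String partition) {
        preExistingChangelogPartitionsForTask
            .computeIfAbsent(taskId, id -> new HashSet<>())
            .add(partition);
    }

    // Same contract as preExistingPartitionsFor above: unknown tasks yield an empty set, never null,
    // and callers cannot modify the internal state through the returned view.
    Set<String> preExistingPartitionsFor(final String taskId) {
        final Set<String> partitions = preExistingChangelogPartitionsForTask.get(taskId);
        return partitions == null
            ? Collections.emptySet()
            : Collections.unmodifiableSet(partitions);
    }
}
```

Returning unmodifiable views keeps the assignor from accidentally mutating the changelog bookkeeping while still avoiding a defensive copy on every call.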
final Map<TopicPartition, ListOffsetsResultInfo> endOffsets,
private Map<TaskId, Long> computeEndOffsetSumsByTask(final Map<TopicPartition, ListOffsetsResultInfo> endOffsets,
final Map<TopicPartition, Long> sourceChangelogEndOffsets,
final Collection<TopicPartition> newlyCreatedChangelogPartitions) {
I removed the newly created changelog partitions because they do not actually contribute to the task end offset sums.
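As a rough sketch of why dropping the newly created partitions is safe: a freshly created changelog is empty, so its end offset is 0 and it adds nothing to a task's sum. The example below assumes simplified types (`String` for `TaskId` and `TopicPartition`, `Long` for the fetched end offset) and a hypothetical class name; it is not the method from the PR, which may treat a missing end offset differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified version of the end-offset summation; not the PR's implementation.
final class EndOffsetSumSketch {

    // Sums the known end offsets of each task's changelog partitions.
    // A partition without a fetched end offset (for example, a newly created one)
    // is treated as contributing 0, which is an assumption of this sketch.
    static Map<String, Long> computeEndOffsetSumsByTask(final Map<String, Long> endOffsets,
                                                        final Map<String, Set<String>> changelogPartitionsForTask) {
        final Map<String, Long> sums = new HashMap<>();
        for (final Map.Entry<String, Set<String>> entry : changelogPartitionsForTask.entrySet()) {
            long sum = 0L;
            for (final String partition : entry.getValue()) {
                sum += endOffsets.getOrDefault(partition, 0L);
            }
            sums.put(entry.getKey(), sum);
        }
        return sums;
    }
}
```

With this shape, passing or omitting the newly created partitions produces the same sums, so the extra parameter carries no information.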
Call for review: @ableegoldman @guozhangwang @stevenpyzhang
guozhangwang
left a comment
Made a quick pass on the PR and it lgtm overall. Leaving to @ableegoldman and @mjsax to merge.
ableegoldman
left a comment
Thanks Bruno! Some naming suggestions since this code is pretty dense, but overall LGTM
    log = logContext.logger(getClass());
}

public void setup() {
Is there a specific reason we need to make an explicit call to setup() rather than just doing this in the constructor? I'm always worried we'll end up forgetting to call setup again after some refactoring and someone will waste a day debugging their code because they tried to use a Changelogs object before/without first calling setup()
I agree with you and, if I remember correctly, you also brought this up on my change that introduces RepartitionTopics. Once we have the explicit user initialisation described in KIP-698, we need to distinguish between setting up the broker-side state for Streams and verifying the broker-side change. That requires changes in the InternalTopicManager. Currently, I could not separate these two concerns and left both in the setup method. Moving forward, I will put everything that can be initialized in the constructor to avoid the mistakes you refer to. Consider the state of this class a temporary situation that will soon be fixed.
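To make the concern concrete, the sketch below shows one possible way to fail fast while a separate setup() call is still needed: the getters refuse to run until setup() has completed. This is not what the PR does. The class name `GuardedSetupSketch`, the `isSetUp` flag, and the placeholder task ID are all hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of guarding against use before setup(); not code from the PR.
final class GuardedSetupSketch {
    private final Set<String> taskIds = new HashSet<>();
    private boolean isSetUp = false;

    void setup() {
        // Placeholder for the real work (creating and validating changelog topics).
        taskIds.add("0_0");
        isSetUp = true;
    }

    Set<String> taskIds() {
        if (!isSetUp) {
            // Fail immediately instead of silently returning an empty result.
            throw new IllegalStateException("setup() must be called before taskIds()");
        }
        return Collections.unmodifiableSet(taskIds);
    }
}
```

A guard like this turns a forgotten setup() call into an immediate, descriptive exception rather than a silently empty result, though moving the initialization into the constructor removes the failure mode entirely.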
private final InternalTopicManager internalTopicManager;
private final Map<Integer, TopicsInfo> topicGroups;
private final Map<Integer, Set<TaskId>> tasksForTopicGroup;
private final Map<TaskId, Set<TopicPartition>> changelogPartitionsForTask = new HashMap<>();
nit: clarify that this contains only stateful tasks (e.g. changelogPartitionsForStatefulTask), both for this and for any other data structure which only contains stateful partitions/tasks
I also thought about this and came to the conclusion that it would be clear that the task is stateful since it has at least one changelog. But I am fine with renaming it as you proposed. Better clear!
if (!sourceTopicBasedChangelogTopics.contains(topicPartition.topic())) {
    preExistingNonSourceTopicBasedChangelogPartitions.add(topicPartition);
} else {
    sourceTopicBasedChangelogTopicPartitions.add(topicPartition);
So technically this only contains pre-existing source topic based changelogs -- can we specify so in the set name? It's already pretty long but maybe we can drop the Topic part to help somewhat
Also here, if a changelog is source based it has to be pre-existing, because the InternalTopicManager will not set up source topics. However, making this clearer in the name does no harm.
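The classification being discussed can be sketched in isolation as follows. The set names use the "preExisting" prefix suggested in this thread; the class name `ChangelogClassificationSketch`, the `classify` method, and the use of `String`s in place of `TopicPartition` are assumptions made so the example is self-contained.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of splitting pre-existing changelog partitions by kind; not the PR's code.
final class ChangelogClassificationSketch {
    final Set<String> preExistingNonSourceTopicBasedChangelogPartitions = new HashSet<>();
    final Set<String> preExistingSourceTopicBasedChangelogPartitions = new HashSet<>();

    // A partition of a topic that doubles as a source topic is always pre-existing,
    // because Streams does not create source topics itself.
    void classify(final String topic,
                  final int partition,
                  final Set<String> sourceTopicBasedChangelogTopics) {
        final String topicPartition = topic + "-" + partition;
        if (!sourceTopicBasedChangelogTopics.contains(topic)) {
            preExistingNonSourceTopicBasedChangelogPartitions.add(topicPartition);
        } else {
            preExistingSourceTopicBasedChangelogPartitions.add(topicPartition);
        }
    }
}
```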
    changelogEndOffset = sourceChangelogEndOffsets.get(changelog);
} else if (endOffsets.containsKey(changelog)) {
    changelogEndOffset = endOffsets.get(changelog).offset();
for (final TaskId taskId : changelogTopics.taskIds()) {
Just to clarify, changelogTopics.taskIds() will only return stateful tasks -- again I think we should clarify that in the getter method name. I had to trace back through the code to be sure
ableegoldman
left a comment
Looks good, merging to trunk
Java11 failed with unrelated flaky
To implement the explicit user initialization of Kafka Streams as
described in KIP-698, we first need to extract the code for the
setup of the changelog topics from the Streams partition assignor
so that it can also be called outside of a rebalance.