KAFKA-10357: Extract setup of repartition topics from Streams partition assignor #9848
To implement the explicit user initialization of Kafka Streams as described in KIP-698, we first need to extract the code for the setup of the repartition topics from the Streams partition assignor so that it can also be called outside of a rebalance.
Call for review: @ableegoldman @guozhangwang @lct45 @wcarlson5
| return Collections.unmodifiableMap(topicPartitionInfos); | ||
| } | ||
|
|
||
| private Map<String, InternalTopicConfig> computeRepartitionTopicConfig(final Map<Integer, TopicsInfo> topicGroups, |
Is there a reason you renamed this from computeRepartitionTopicMetadata to computeRepartitionTopicConfig?
I thought that, since the returned value is a bunch of configs, computeRepartitionTopicConfig() would be more precise.
| if (numPartitions == null) { | ||
| partitionCountNeeded = true; | ||
| log.trace("Unable to determine number of partitions for {}, another iteration is needed", |
Maybe I'm missing something, but it looks like we want to go through the do loop again if we have null partitions. In that case, wouldn't we hit the if statement below and throw an exception, since at this point partitionCountNeeded == true and progressMadeThisIteration == false? Or does that just log an exception and continue through the do loop?
@lct45 sort of; the point of progressMadeThisIteration is to break out of the loop entirely if we detect that we're infinitely looping, as was the case in a recent-ish bug. We want to continue looping only as long as we're actually gaining new information/filling in the partition count of some topic group (subtopology).
So yes, if progressMadeThisIteration = false after the full loop, but we still don't have all the partition counts (partitionCountNeeded = true), then oh well. Something went wrong 🙂 We should throw
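For readers following this thread, the termination logic can be sketched roughly as below. This is a simplified, hypothetical stand-in and not the code in the PR: the class `PartitionCountLoopSketch`, the `resolve` method, and both map parameters are made up for illustration, and the sketch assumes a repartition topic takes the maximum partition count of its upstream topics once all of them are known. Only the flags `partitionCountNeeded` and `progressMadeThisIteration` come from the discussion.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch; not the actual Kafka Streams implementation.
final class PartitionCountLoopSketch {

    // upstreamOf: repartition topic -> topics its partition count is derived from (assumed shape).
    // knownCounts: partition counts that are already known, e.g. for external source topics.
    static Map<String, Integer> resolve(final Map<String, List<String>> upstreamOf,
                                        final Map<String, Integer> knownCounts) {
        final Map<String, Integer> counts = new HashMap<>(knownCounts);
        boolean partitionCountNeeded;
        do {
            partitionCountNeeded = false;
            boolean progressMadeThisIteration = false;

            for (final Map.Entry<String, List<String>> entry : upstreamOf.entrySet()) {
                if (counts.containsKey(entry.getKey())) {
                    continue; // already resolved in an earlier pass
                }
                final Integer numPartitions = maxIfAllKnown(entry.getValue(), counts);
                if (numPartitions == null) {
                    // Still unknown: another pass is needed.
                    partitionCountNeeded = true;
                } else {
                    counts.put(entry.getKey(), numPartitions);
                    progressMadeThisIteration = true;
                }
            }

            // A full pass that learned nothing new while counts are still missing
            // would loop forever, so bail out instead.
            if (partitionCountNeeded && !progressMadeThisIteration) {
                throw new IllegalStateException("No progress made; cannot determine all partition counts");
            }
        } while (partitionCountNeeded);
        return counts;
    }

    // Returns the largest upstream count, or null if any upstream count is still unknown.
    private static Integer maxIfAllKnown(final List<String> upstreamTopics,
                                         final Map<String, Integer> counts) {
        Integer max = null;
        for (final String upstream : upstreamTopics) {
            final Integer count = counts.get(upstream);
            if (count == null) {
                return null;
            }
            max = (max == null) ? count : Math.max(max, count);
        }
        return max;
    }
}
```

In this sketch a chain such as source -> repartition-b -> repartition-a may need two passes depending on iteration order, while two topics that depend only on each other never make progress and trigger the exception.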
| private final Node[] inSyncReplicas; | ||
| private final Node[] offlineReplicas; | ||
|
|
||
| // Used only by tests |
The usual lying comment. This constructor is not only used in tests.
ableegoldman
left a comment
Nice work. A few of the variable names were kind of throwing me off, but you can apply my suggestions (or not :P) in a followup PR if you'd prefer. This one pretty much LGTM, looking forward to the next PR
| private void checkIfExternalSourceTopicsExist(final TopicsInfo topicsInfo, | ||
| final Cluster clusterMetadata) { | ||
| final Set<String> externalSourceTopics = new HashSet<>(topicsInfo.sourceTopics); |
prop: rename to missingExternalSourceTopics or similar
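The excerpt does not show the rest of the method, so the following is only a guess at why the proposed name fits: if the set starts as a copy of all source topics and is then narrowed down, what remains at the end are the missing ones. The class name, the three set parameters, and the exception type are assumptions made for this sketch.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch; parameter names and the exception type are assumptions.
final class SourceTopicCheckSketch {

    static void checkIfExternalSourceTopicsExist(final Set<String> sourceTopics,
                                                 final Set<String> repartitionSourceTopics,
                                                 final Set<String> topicsInCluster) {
        // Start from all source topics and strip everything that is accounted for,
        // so the set ends up holding only the missing external source topics.
        final Set<String> missingExternalSourceTopics = new HashSet<>(sourceTopics);
        missingExternalSourceTopics.removeAll(repartitionSourceTopics);
        missingExternalSourceTopics.removeAll(topicsInCluster);
        if (!missingExternalSourceTopics.isEmpty()) {
            throw new IllegalStateException("Missing source topics: " + missingExternalSourceTopics);
        }
    }
}
```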
| final String repartitionSourceTopic) { | ||
| Integer partitionCount = null; | ||
| // try set the number of partitions for this repartition topic if it is not set yet | ||
| for (final TopicsInfo upstreamTopicsInfo : topicGroups.values()) { |
It's a bit confusing to call this upstreamTopicsInfo since that will only be true of one (or some) of these topic groups. Most of them are not upstream of the current
Yeah, I was also not sure. I renamed some of the variables. Let me know if it is better now.
| final Cluster clusterMetadata = niceMock(Cluster.class); | ||
|
|
||
| @Before | ||
| public void setUp() { |
Did you mean to leave in an empty method?
| metadata, | ||
| logPrefix | ||
| ); | ||
| repartitionTopics.setup(); |
Why not just do the setup inside the constructor?
That is a valid question. When we introduce the explicit initialization and the manual and automatic configs, we need to distinguish between setting up the internal topics and verifying them. Hence, there will be two methods on the RepartitionTopics class. We will create the object and then, according to the configs, either just verify or verify and set up the repartition topics. We could accomplish the same with a flag in the constructor, but I thought the code might be easier to understand with two methods.
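A minimal sketch of the two-method design described in this reply, under stated assumptions: the class `RepartitionTopicsSketch`, the `verify()` and `initialize(...)` methods, the boolean flag, and the use of plain string sets in place of the broker are all hypothetical. Only the `setup()` call and the idea of keeping verification separate from creation come from the thread.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the two-method design; not the actual class from the PR.
final class RepartitionTopicsSketch {

    private final Set<String> requiredTopics;
    private final Set<String> existingTopics; // stand-in for the topics known to the brokers

    RepartitionTopicsSketch(final Set<String> requiredTopics, final Set<String> existingTopics) {
        // The constructor only stores state; it does not touch any topics.
        this.requiredTopics = new HashSet<>(requiredTopics);
        this.existingTopics = existingTopics;
    }

    // Verification only: reports what is missing and creates nothing.
    Set<String> verify() {
        final Set<String> missing = new HashSet<>(requiredTopics);
        missing.removeAll(existingTopics);
        return missing;
    }

    // Verification plus creation of whatever is missing.
    void setup() {
        existingTopics.addAll(verify());
    }

    // The caller picks the behavior based on configuration (the flag is an assumption).
    void initialize(final boolean createMissingTopics) {
        if (createMissingTopics) {
            setup();
        } else {
            final Set<String> missing = verify();
            if (!missing.isEmpty()) {
                throw new IllegalStateException("Repartition topics not initialized: " + missing);
            }
        }
    }
}
```

With a constructor flag the object would behave differently depending on how it was built; with two methods the choice is visible at the call site, which seems to be the readability argument made above.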
@cadonna looks like |
@ableegoldman Thank you for the info!
guozhangwang
left a comment
Made a pass on the PR. Overall LGTM.
I think once the current comments are addressed it can be merged. cc @ableegoldman
| final Map<Integer, TopicsInfo> topicGroups = taskManager.builder().topicGroups(); | ||
|
|
||
| final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(topicGroups, metadata); | ||
| final Map<TopicPartition, PartitionInfo> allRepartitionTopicPartitions = prepareRepartitionTopics(metadata); |
nit: line 365 above can be moved down now since it is only needed before line 379.
| // make sure the repartition source topics exist with the right number of partitions, | ||
| // create these topics if necessary | ||
| internalTopicManager.makeReady(repartitionTopicMetadata); |
Not related to this PR: when reading the makeReady code again, I feel that the way we get the retries config by first constructing a dummy admin client can be simplified. Also, it is debatable whether we really need to retry when we only verify, since the admin client itself already retries internally.
I think we should keep this in mind. For this ticket, we need to touch makeReady() anyways, because we need to separate verification from creation.
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
| package org.apache.kafka.streams.processor.internals; |
As always, thanks for the great test coverage!
@ableegoldman @guozhangwang, could one of you merge this PR, since there seem to be no further open questions?
One unrelated test failure in |
Merged to trunk |