Skip to content

KAFKA-18311: Enforcing copartitioned topics (4/N)#18397

Merged
lucasbru merged 3 commits intoapache:trunkfrom
lucasbru:kip1071merge/topic_config4
Jan 10, 2025
Merged

KAFKA-18311: Enforcing copartitioned topics (4/N)#18397
lucasbru merged 3 commits intoapache:trunkfrom
lucasbru:kip1071merge/topic_config4

Conversation

@lucasbru
Copy link
Copy Markdown
Member

@lucasbru lucasbru commented Jan 6, 2025

A simplified port of "CopartitionedTopicsEnforcer" from the client-side to the group coordinator.

This class is responsible for enforcing the number of partitions in copartitioned topics. For each copartition group, it checks whether the number of partitions for all topics in the group is the same, and enforces the right number of partitions for repartition topics whose number of partitions is not enforced by the topology.

Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@lucasbru lucasbru requested a review from cadonna January 6, 2025 13:29
@lucasbru
Copy link
Copy Markdown
Member Author

lucasbru commented Jan 6, 2025

PTAL @aliehsaeedii

@lucasbru lucasbru requested review from bbejeck and mjsax January 6, 2025 13:30
@lucasbru lucasbru force-pushed the kip1071merge/topic_config4 branch from a3a1fc8 to 6d49d28 Compare January 6, 2025 13:36
Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR!

Here my comment!

Comment on lines +67 to +68
* @throws TopicConfigurationException If source topics are missing, or there are topics in copartitionTopics that are not copartitioned
* according to topicPartitionCountProvider are not co-partitioned.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something is wrong with this sentence. Should it be:

If source topics are missing, or there are topics in {@code copartitionTopics} that are not co-partitioned according to topicPartitionCountProvider.

?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

* Enforces the number of partitions for copartitioned topics.
*
* @param copartitionedTopics The set of copartitioned topics (external source topics and repartition topics).
* @param fixedRepartitionTopics The set of repartition topics whose partition count is fixed by the topology.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With by the topology you mean user-specifed, right? If yes, maybe you should mention that they are user-specified.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't want too restrictive - the number of partitions is fixed in the topology, and on the broker we don't necessarily know why it was fixed. But I think right now you are right, this only happens when the user explicitly specifies it in repartition, so I updated it this way:

The set of repartition topics whose partition count is fixed by the topology sent by the client (in particular, when the user uses `repartition` in the DSL).

if (topicPartitionCount.isEmpty()) {
final String str = String.format("Following topics are missing: [%s]", topic);
log.error(str);
throw TopicConfigurationException.missingSourceTopics(str);
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think, we are able to verify missing source topics in one location instead of spreading the verification between co-partition enforcer and repartition topic configuration?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we always call RepartitionTopics before calling this class in practice, this can never happen. But that it cannot happen, depends on how this class is used. I think it is still good style for this "unit" to have well-defined behavior if it is used in a context where a source topic is missing. Would you propose that this class just implicitly depends on the validation happening somewhere else?

assertEquals(Map.of(repartitionTopic, 2), result);
}


Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +155 to +156
Utils.mkMap(Utils.mkEntry(repartitionTopic1, 10),
Utils.mkEntry(repartitionTopic2, 5))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you also use Map.of() here?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

@Test
public void shouldNotThrowAnExceptionWhenTopicInfosWithEnforcedNumOfPartitionsAreValid() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
Could you try to formulate this in a positive way?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

@Test
public void shouldNotThrowAnExceptionWhenNumberOfPartitionsOfNonRepartitionTopicAndRepartitionTopicWithEnforcedNumOfPartitionsMatch() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also here could you please try to formulate this positively?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines +65 to +66
final String firstSourceTopic = "first";
final String secondSourceTopic = "second";
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit:
If you specify all this variables in all tests as constants with sensible names, it would reduce a bit the noise within the tests.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting that you think this is better. Done

@cadonna cadonna added streams core Kafka Broker KIP-1071 PRs related to KIP-1071 labels Jan 9, 2025
@lucasbru
Copy link
Copy Markdown
Member Author

lucasbru commented Jan 9, 2025

Ready for re-review @cadonna

@lucasbru
Copy link
Copy Markdown
Member Author

@cadonna As discussed offline, I can add a third validation for source topics in a different place and throw IllegalStateException here. I will do these changes as I rebase InternalTopicManager, as this is the only place where I can put it. Can we please merge this PR, so I can continue?

Copy link
Copy Markdown
Member

@cadonna cadonna left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@lucasbru lucasbru merged commit b9a952d into apache:trunk Jan 10, 2025
m1a2st pushed a commit to m1a2st/kafka that referenced this pull request Jan 10, 2025
A simplified port of "CopartitionedTopicsEnforcer" from the client-side to the group coordinator.

This class is responsible for enforcing the number of partitions in copartitioned topics. For each copartition group, it checks whether the number of partitions for all topics in the group is the same, and enforces the right number of partitions for repartition topics whose number of partitions is not enforced by the topology.

Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client.

Reviewers: Bruno Cadonna <cadonna@apache.org>
ijuma added a commit to ijuma/kafka that referenced this pull request Jan 10, 2025
…emove-metadata-version-methods-for-versions-older-than-3.0

* apache-github/trunk:
  KAFKA-18340: Change Dockerfile to use log4j2 yaml instead log4j properties (apache#18378)
  MINOR: fix flaky RemoteLogManagerTest#testStopPartitionsWithDeletion (apache#18474)
  KAFKA-18311: Enforcing copartitioned topics (4/N) (apache#18397)
  KAFKA-18308; Update CoordinatorSerde (apache#18455)
  KAFKA-18440: Convert AuthorizationException to fatal error in AdminClient (apache#18435)
  KAFKA-17671: Create better documentation for transactions (apache#17454)
  KAFKA-18304; Introduce json converter generator (apache#18458)
  MINOR: Clean up classic group tests (apache#18473)
  KAFKA-18399 Remove ZooKeeper from KafkaApis (2/N): CONTROLLED_SHUTDOWN and ENVELOPE (apache#18422)
  MINOR: improve StreamThread periodic processing log (apache#18430)
pranavt84 pushed a commit to pranavt84/kafka that referenced this pull request Jan 27, 2025
A simplified port of "CopartitionedTopicsEnforcer" from the client-side to the group coordinator.

This class is responsible for enforcing the number of partitions in copartitioned topics. For each copartition group, it checks whether the number of partitions for all topics in the group is the same, and enforces the right number of partitions for repartition topics whose number of partitions is not enforced by the topology.

Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client.

Reviewers: Bruno Cadonna <cadonna@apache.org>
manoj-mathivanan pushed a commit to manoj-mathivanan/kafka that referenced this pull request Feb 19, 2025
A simplified port of "CopartitionedTopicsEnforcer" from the client-side to the group coordinator.

This class is responsible for enforcing the number of partitions in copartitioned topics. For each copartition group, it checks whether the number of partitions for all topics in the group is the same, and enforces the right number of partitions for repartition topics whose number of partitions is not enforced by the topology.

Compared to the client-side version, the implementation uses immutable data structures, and returns the computed number of partitions instead of modifying mutable data structures and calling the admin client.

Reviewers: Bruno Cadonna <cadonna@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Kafka Broker KIP-1071 PRs related to KIP-1071 streams

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants