Skip to content

KAFKA-12254: Ensure MM2 creates topics with source topic configs#10217

Merged
rajinisivaram merged 7 commits intoapache:trunkfrom
dhruvilshah3:mm2-fix
Mar 1, 2021
Merged

KAFKA-12254: Ensure MM2 creates topics with source topic configs#10217
rajinisivaram merged 7 commits intoapache:trunkfrom
dhruvilshah3:mm2-fix

Conversation

@dhruvilshah3
Copy link
Copy Markdown
Contributor

@dhruvilshah3 dhruvilshah3 commented Feb 26, 2021

MM2 creates new topics on the destination cluster with default configurations. It has an async periodic task to refresh topic configurations from the source to destination. However, this opens up a window where the destination cluster has data produced to it with default configurations. In the worst case, this could cause data loss if the destination topic is created without the right cleanup.policy.

This patch fixes the above issue by ensuring that the right configurations are supplied to AdminClient#createTopics when MM2 creates topics on the destination cluster.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

@dhruvilshah3
Copy link
Copy Markdown
Contributor Author

cc @rajinisivaram @junrao for review

Copy link
Copy Markdown
Contributor

@rajinisivaram rajinisivaram left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dhruvilshah3 Thanks for the PR, looks good. There is a failure in the PR build in MirrorConnectorsIntegrationSSLTest. I am guessing it is not related, but can you take a look to confirm?

Map<String, NewPartitions> newPartitions) {
void createNewTopics(List<NewTopic> newTopics) {
Map<String, NewTopic> newTopicMap = newTopics.stream()
.collect(Collectors.toMap(NewTopic::name, Function.identity()));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this void createNewTopics(Map<String, NewTopic> newTopics) and just create the map in the caller since we seem to be creating a list and transforming?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense, I made the change.

@dhruvilshah3
Copy link
Copy Markdown
Contributor Author

Thanks for the review @rajinisivaram. I ran MirrorConnectorsIntegrationSSLTest a few times locally and it passed. It passed in the latest jenkins run as well. Couple of failures in the latest run seem unrelated to the changes.

Copy link
Copy Markdown
Contributor

@rajinisivaram rajinisivaram left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dhruvilshah3 Thanks for the update, LGTM

@rajinisivaram rajinisivaram merged commit cc088c5 into apache:trunk Mar 1, 2021
@dhruvilshah3 dhruvilshah3 deleted the mm2-fix branch March 1, 2021 15:58
ijuma added a commit to ijuma/kafka that referenced this pull request Mar 2, 2021
* apache-github/trunk: (37 commits)
  KAFKA-10357: Extract setup of changelog from Streams partition assignor (apache#10163)
  KAFKA-10251: increase timeout for consuming records (apache#10228)
  KAFKA-12394; Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response if no describe permission (apache#10223)
  MINOR: Disable transactional/idempotent system tests for Raft quorums (apache#10224)
  KAFKA-10766: Unit test cases for RocksDBRangeIterator (apache#9717)
  KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore (apache#10052)
  KAFKA-12268: Implement task idling semantics via currentLag API (apache#10137)
  MINOR: Time and log producer state recovery phases (apache#10241)
  MINOR: correct the error message of validating uint32 (apache#10193)
  MINOR: Format the revoking active log output in `StreamsPartitionAssignor` (apache#10242)
  KAFKA-12323 Follow-up: Refactor the unit test a bit (apache#10205)
  MINOR: Remove stack trace of the lock exception in a debug log4j (apache#10231)
  MINOR: Word count should account for extra whitespaces between words (apache#10229)
  MINOR; Small refactor in `GroupMetadata` (apache#10236)
  KAFKA-10340: Proactively close producer when cancelling source tasks (apache#10016)
  KAFKA-12329; kafka-reassign-partitions command should give a better error message when a topic does not exist (apache#10141)
  KAFKA-12254: Ensure MM2 creates topics with source topic configs (apache#10217)
  MINOR: fix kafka-metadata-shell.sh (apache#10226)
  KAFKA-12374: Add missing config sasl.mechanism.controller.protocol (apache#10199)
  KAFKA-10101: Fix edge cases in Log.recoverLog and LogManager.loadLogs (apache#8812)
  ...
rajinisivaram pushed a commit that referenced this pull request Mar 2, 2021
)

MM2 creates new topics on the destination cluster with default configurations. It has an async periodic task to refresh topic configurations from the source to destination. However, this opens up a window where the destination cluster has data produced to it with default configurations. In the worst case, this could cause data loss if the destination topic is created without the right cleanup.policy. This commit fixes the above issue by ensuring that the right configurations are supplied to AdminClient#createTopics when MM2 creates topics on the destination cluster.

Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants