-
Notifications
You must be signed in to change notification settings - Fork 331
SAMZA-2431: Fix the checkpoint and changelog topic auto-creation. #1251
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
@prateekm Can you please take a look at this patch. |
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
Outdated
Show resolved
Hide resolved
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
Outdated
Show resolved
Hide resolved
shanthoosh
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review.
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
Outdated
Show resolved
Hide resolved
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
Outdated
Show resolved
Hide resolved
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
Show resolved
Hide resolved
prateekm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the fix.
| checkpointTopicProperties.putAll(spec.getConfig()); | ||
| kafkaSpec = KafkaStreamSpec.fromSpec(StreamSpec.createCheckpointStreamSpec(spec.getPhysicalName(), spec.getSystemName())) | ||
| .copyWithReplicationFactor(Integer.parseInt(new KafkaConfig(config).getCheckpointReplicationFactor().get())) | ||
| .copyWithProperties(checkpointTopicProperties); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can do it separately, can we add .copyWithConfig or .copyWithMap which takes Config or Map<String, String> ?
copyWithProperties is not user friendly, as we are converting a Map/Config to a properties, and this method basically convert it back.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, adding the copyWithConfig to KafkaStreamSpec might be good convenience API. We've additional cleanup planned with this control-flow in the near-future. If acceptable, then I can couple this change with that. Since it's a critical bug-fix, I would prefer to keep the change minimal by containing it to the fix alone and do the clean-up later.
samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
Outdated
Show resolved
Hide resolved
…ache#1251) * Fix the checkpoint and changelog topic creation configurations. * Address review comments. * Address review comments.
Symptom: Checkpoint and changelog kafka topics of a samza job may be created with cleanup.policy set to 'delete' instead of 'compact' for certain cases.
Cause:
Changes: Fix the topic-creation control-flow for the metadata topics and generate the correct topic-configurations.
Tests: Added unit tests to validate that the expected topic configuration bag was generated for both checkpoint and changelog topics.
API Changes: None
Upgrade Instructions: None
Usage Instructions: None