KAFKA-10357: Add setup method to internal topics#10317
KAFKA-10357: Add setup method to internal topics#10317guozhangwang merged 2 commits intoapache:trunkfrom
Conversation
For KIP-698, we need a way to setup internal topics without validating them. This PR adds a setup method to the InternalTopicManager for that purpose.
| @Test | ||
| public void shouldReportMisconfigurationsOfCleanupPolicyForRepartitionTopics() { | ||
| final long retentionMs = 1000; | ||
| mockAdminClient.addTopic( |
There was a problem hiding this comment.
Just refactorings from here to the end.
|
Call for review: @rodesai |
rodesai
left a comment
There was a problem hiding this comment.
Mostly makes sense to me. I'm a little concerned about the error handling. Basically, we may raise an exception back to the caller of KafkaStreams and leave things in a state where setup is not retriable. One way to make this more graceful would be to do a best-effort cleanup if setup fails. Alternatively, maybe we could add some interface to cleanup all the internal topics on a failure?
| final long now = time.milliseconds(); | ||
| final long deadline = now + retryTimeoutMs; | ||
|
|
||
| final Map<String, Map<String, String>> newTopicConfigs = topicConfigs.values().stream() |
There was a problem hiding this comment.
naming nit: topicConfigsWithRetention
There was a problem hiding this comment.
I would not rename this since the additional retention is only added in case of topicConfig instanceof WindowedChangelogTopicConfig, which is one of three. What about streamsSideTopicConfigs to emphasize that these are the topic configs that Streams sets by default possibly overidden by user code in the Streams app.
| maybeThrowTimeoutException( | ||
| Collections.singletonList(topicStillToCreate), | ||
| deadline, | ||
| String.format("Could not create internal topics within %d milliseconds. This can happen if the " + |
There was a problem hiding this comment.
In this case I think we should include some error details here. In particular, the last seen error for each topic. I'm worried about cases where we try to create but the create times out but is eventually successful. We'd return an error back, but the user would have no way to know that setup failed because an internal topic already exists.
There was a problem hiding this comment.
This sounds like a really useful idea.
| * | ||
| * @param topicConfigs internal topics to setup | ||
| */ | ||
| public void setup(final Map<String, InternalTopicConfig> topicConfigs) { |
There was a problem hiding this comment.
should we make an effort to clean up created topics on failure? currently this method is not retriable
There was a problem hiding this comment.
Yes, I agree that cleaning up would be a good idea.
Conflicts: * build.gradle: keep `dependencySubstitution` Confluent addition in `resolutionStrategy` and take upstream changes. Commits: * apache-github/trunk: KAFKA-12503: inform threads to resize their cache instead of doing so for them (apache#10356) KAFKA-10697: Remove ProduceResponse.responses (apache#10332) MINOR: Exclude KIP-500.md from rat check (apache#10354) MINOR: Move `configurations.all` to be a child of `allprojects` (apache#10349) MINOR: Remove use of `NoSuchElementException` in `KafkaMetadataLog` (apache#10344) MINOR: Start the broker-to-controller channel for request forwarding (apache#10340) KAFKA-12382: add a README for KIP-500 (apache#10227) MINOR: Fix BaseHashTable sizing (apache#10334) KAFKA-10357: Add setup method to internal topics (apache#10317) MINOR: remove redundant null check when testing specified type (apache#10314) KAFKA-12293: Remove JCenter from buildscript and delete buildscript.gradle KAFKA-12491: Make rocksdb an `api` dependency for `streams` (apache#10341) KAFKA-12454: Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster (apache#10304) KAFKA-12459; Use property testing library for raft event simulation tests (apache#10323) MINOR: fix failing ZooKeeper system tests (apache#10297) MINOR: fix client_compatibility_features_test.py (apache#10292)
For KIP-698, we need a way to setup internal topics without
validating them. This PR adds a setup method to the
InternalTopicManager for that purpose.
Committer Checklist (excluded from commit message)