KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used#8598
KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used#8598cmccabe wants to merge 1 commit intoapache:trunkfrom
Conversation
…ts when --bootstrap-server is used
There was a problem hiding this comment.
I think we should take out ++Set(bootstrapServerOpt) from the last parameter of checkInvalidArgs
CommandLineUtils.checkInvalidArgs(parser, options, ifExistsOpt, allTopicLevelOpts -- Set(alterOpt, deleteOpt, describeOpt) ++ Set(bootstrapServerOpt))
CommandLineUtils.checkInvalidArgs(parser, options, ifNotExistsOpt, allTopicLevelOpts -- Set(createOpt) ++ Set(bootstrapServerOpt))
dajac
left a comment
There was a problem hiding this comment.
Thanks for the PR. I have left few comments.
| } catch { | ||
| case e: ExecutionException => { | ||
| val cause = e.getCause | ||
| if (cause.isInstanceOf[TopicExistsException] || topic.ifTopicDoesntExist()) { |
There was a problem hiding this comment.
Shouldn't we re-throw all exceptions except TopicExistsException if topic.ifTopicDoesntExist?
|
|
||
| println(s"Created topic ${topic.name}.") | ||
| } else { | ||
| throw new IllegalArgumentException(s"Topic ${topic.name} already exists") |
There was a problem hiding this comment.
Apparently, we already verify the existence of the topic prior to creating it. Should we also handle --if-not-exists in this case?
| new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]] | ||
| if(opts.topicConfig.isDefined || opts.configsToDelete.isDefined) { | ||
| throw new RuntimeException("Using --config or --delete-config is not supported " + | ||
| "when altering a topic via the broker API. Use kafka-configs.sh instead.") |
There was a problem hiding this comment.
nit: Extra space after the first ..
| "logic or ordering of the messages will be affected") | ||
| val topicDescription = try { | ||
| adminClient.describeTopics(Collections.singleton(tp.name)). | ||
| all().get().get(tp.name) |
There was a problem hiding this comment.
nit: unnecessary spaces before all().
| val createResult = adminClient.createTopics(Collections.singleton(newTopic)) | ||
| createResult.all().get() | ||
| try { | ||
| createResult.all().get() |
There was a problem hiding this comment.
nit: the parenthesis are not necessary. there is other cases when they can be removed.
| topicName -> NewPartitions.increaseTo(topic.partitions.get, newAssignment) | ||
| } | ||
| if (topicDescription.partitions().size() == tp.partitions.get) { | ||
| println(s"Topic ${tp.name} already has ${tp.partitions.get} partitions. " + |
There was a problem hiding this comment.
nit: extra space after the .. same below.
| "if set when describing topics, only show topics that have overridden configs") | ||
| private val ifExistsOpt = parser.accepts("if-exists", | ||
| "if set when altering or deleting or describing topics, the action will only execute if the topic exists. Not supported with the --bootstrap-server option.") | ||
| "if set when altering or deleting or describing topics, the action will only execute if the topic exists.") |
There was a problem hiding this comment.
We need to update checkArgs to accept them with bootstrap-server. As the moment, the tool refuses them with the following error:
Option "[if-not-exists]" can't be used with option "[bootstrap-server]"
|
duplicate of #8737 |
No description provided.