Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 25 additions & 12 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,7 @@ object ConfigCommand extends Config {

entityType match {
case ConfigType.Topic =>
val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false)
val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false, describeAll = false)
.map { entry => (entry.name, entry) }.toMap

// fail the command if any of the configs to be deleted does not exist
Expand All @@ -321,7 +321,7 @@ object ConfigCommand extends Config {
adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

case ConfigType.Broker =>
val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false)
val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false, describeAll = false)
.map { entry => (entry.name, entry) }.toMap

// fail the command if any of the configs to be deleted does not exist
Expand All @@ -340,7 +340,7 @@ object ConfigCommand extends Config {
adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

case BrokerLoggerConfigType =>
val validLoggers = getConfig(adminClient, entityType, entityName, includeSynonyms = true).map(_.name)
val validLoggers = getConfig(adminClient, entityType, entityName, includeSynonyms = true, describeAll = false).map(_.name)
// fail the command if any of the configured broker loggers do not exist
val invalidBrokerLoggers = configsToBeDeleted.filterNot(validLoggers.contains) ++ configsToBeAdded.keys.filterNot(validLoggers.contains)
if (invalidBrokerLoggers.nonEmpty)
Expand All @@ -353,18 +353,19 @@ object ConfigCommand extends Config {
).asJavaCollection
adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS)

case _ => throw new IllegalArgumentException(s"Unsupported entity type: ${entityType}")
case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityType")
}

if (entityName.nonEmpty)
println(s"Completed updating config for ${entityType.dropRight(1)} ${entityName}.")
println(s"Completed updating config for ${entityType.dropRight(1)} $entityName.")
else
println(s"Completed updating default config for ${entityType} in the cluster.")
println(s"Completed updating default config for $entityType in the cluster.")
}

private def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = {
val entityType = opts.entityTypes.head
val entityName = opts.entityNames.headOption
val describeAll = opts.options.has(opts.allOpt)

val entities = entityName
.map(name => List(name))
Expand All @@ -378,24 +379,25 @@ object ConfigCommand extends Config {
entities.foreach { entity =>
entity match {
case "" =>
println(s"Default config for ${entityType} in the cluster are:")
println(s"Default configs for $entityType in the cluster are:")
case _ =>
println(s"Configs for ${entityType.dropRight(1)} ${entity} are:")
val configSourceStr = if (describeAll) "All" else "Dynamic"
println(s"$configSourceStr configs for ${entityType.dropRight(1)} $entity are:")
}
getConfig(adminClient, entityType, entity, includeSynonyms = true).foreach { entry =>
getConfig(adminClient, entityType, entity, includeSynonyms = true, describeAll).foreach { entry =>
val synonyms = entry.synonyms.asScala.map(synonym => s"${synonym.source}:${synonym.name}=${synonym.value}").mkString(", ")
println(s" ${entry.name}=${entry.value} sensitive=${entry.isSensitive} synonyms={$synonyms}")
}
}
}

private def getConfig(adminClient: Admin, entityType: String, entityName: String, includeSynonyms: Boolean): Seq[ConfigEntry] = {
private def getConfig(adminClient: Admin, entityType: String, entityName: String, includeSynonyms: Boolean, describeAll: Boolean) = {
Comment thread
raymondng marked this conversation as resolved.
def validateBrokerId(): Unit = try entityName.toInt catch {
case _: NumberFormatException =>
throw new IllegalArgumentException(s"The entity name for ${entityType} must be a valid integer broker id, found: ${entityName}")
throw new IllegalArgumentException(s"The entity name for $entityType must be a valid integer broker id, found: $entityName")
}

val (configResourceType, configSourceFilter) = entityType match {
val (configResourceType, dynamicConfigSource) = entityType match {
case ConfigType.Topic =>
if (!entityName.isEmpty)
Topic.validate(entityName)
Expand All @@ -413,6 +415,11 @@ object ConfigCommand extends Config {
(ConfigResource.Type.BROKER_LOGGER, None)
}

val configSourceFilter = if (describeAll)
None
else
dynamicConfigSource

val configResource = new ConfigResource(configResourceType, entityName)
val configs = adminClient.describeConfigs(Collections.singleton(configResource)).all.get(30, TimeUnit.SECONDS)
configs.get(configResource).entries.asScala
Expand Down Expand Up @@ -545,6 +552,8 @@ object ConfigCommand extends Config {
.ofType(classOf[String])
val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.")
val describeOpt = parser.accepts("describe", "List configs for the given entity.")
val allOpt = parser.accepts("all", "List all configs for the given entity (includes static configuration when the entity type is brokers)")

val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers/broker-loggers)")
.withRequiredArg
.ofType(classOf[String])
Expand Down Expand Up @@ -654,6 +663,10 @@ object ConfigCommand extends Config {
else if (options.has(bootstrapServerOpt) && options.has(zkConnectOpt))
throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified")

if (options.has(allOpt) && options.has(zkConnectOpt)) {
throw new IllegalArgumentException(s"--bootstrap-server must be specified for --all")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since the user may not have seen the 'Only one of --bootstrap-server or --zookeeper' error above, it would be clearer to mention that zkConnectOpt should be dropped while adding '--bootstrap-server'

}

if (hasEntityName && (entityTypeVals.contains(ConfigType.Broker) || entityTypeVals.contains(BrokerLoggerConfigType))) {
Seq(entityName, broker, brokerLogger).filter(options.has(_)).map(options.valueOf(_)).foreach { brokerId =>
try brokerId.toInt catch {
Expand Down
33 changes: 33 additions & 0 deletions core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,39 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
new ConfigCommandOptions(optsList.toArray).checkArgs()
}

@Test
def testDescribeAllBrokerConfig(): Unit = {
val optsList = List("--bootstrap-server", "localhost:9092",
"--entity-type", ConfigType.Broker,
"--entity-name", "1",
"--describe",
"--all")

new ConfigCommandOptions(optsList.toArray).checkArgs()
}

@Test
def testDescribeAllTopicConfig(): Unit = {
val optsList = List("--bootstrap-server", "localhost:9092",
"--entity-type", ConfigType.Topic,
"--entity-name", "foo",
"--describe",
"--all")

new ConfigCommandOptions(optsList.toArray).checkArgs()
}

@Test(expected = classOf[IllegalArgumentException])
def testDescribeAllBrokerConfigBootstrapServerRequired(): Unit = {
val optsList = List("--zookeeper", zkConnect,
"--entity-type", ConfigType.Broker,
"--entity-name", "1",
"--describe",
"--all")

new ConfigCommandOptions(optsList.toArray).checkArgs()
}

@Test(expected = classOf[IllegalArgumentException])
def testEntityDefaultOptionWithDescribeBrokerLoggerIsNotAllowed(): Unit = {
val optsList = List("--bootstrap-server", "localhost:9092",
Expand Down