diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 57b838aeecae3..6787fb0c33581 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -305,7 +305,7 @@ object ConfigCommand extends Config { entityType match { case ConfigType.Topic => - val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false) + val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false, describeAll = false) .map { entry => (entry.name, entry) }.toMap // fail the command if any of the configs to be deleted does not exist @@ -321,7 +321,7 @@ object ConfigCommand extends Config { adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) case ConfigType.Broker => - val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false) + val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false, describeAll = false) .map { entry => (entry.name, entry) }.toMap // fail the command if any of the configs to be deleted does not exist @@ -340,7 +340,7 @@ object ConfigCommand extends Config { adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) case BrokerLoggerConfigType => - val validLoggers = getConfig(adminClient, entityType, entityName, includeSynonyms = true).map(_.name) + val validLoggers = getConfig(adminClient, entityType, entityName, includeSynonyms = true, describeAll = false).map(_.name) // fail the command if any of the configured broker loggers do not exist val invalidBrokerLoggers = configsToBeDeleted.filterNot(validLoggers.contains) ++ configsToBeAdded.keys.filterNot(validLoggers.contains) if (invalidBrokerLoggers.nonEmpty) @@ -353,18 +353,19 @@ object ConfigCommand extends Config { ).asJavaCollection adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) - case _ => throw new IllegalArgumentException(s"Unsupported entity type: ${entityType}") + case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityType") } if (entityName.nonEmpty) - println(s"Completed updating config for ${entityType.dropRight(1)} ${entityName}.") + println(s"Completed updating config for ${entityType.dropRight(1)} $entityName.") else - println(s"Completed updating default config for ${entityType} in the cluster.") + println(s"Completed updating default config for $entityType in the cluster.") } private def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = { val entityType = opts.entityTypes.head val entityName = opts.entityNames.headOption + val describeAll = opts.options.has(opts.allOpt) val entities = entityName .map(name => List(name)) @@ -378,24 +379,25 @@ object ConfigCommand extends Config { entities.foreach { entity => entity match { case "" => - println(s"Default config for ${entityType} in the cluster are:") + println(s"Default configs for $entityType in the cluster are:") case _ => - println(s"Configs for ${entityType.dropRight(1)} ${entity} are:") + val configSourceStr = if (describeAll) "All" else "Dynamic" + println(s"$configSourceStr configs for ${entityType.dropRight(1)} $entity are:") } - getConfig(adminClient, entityType, entity, includeSynonyms = true).foreach { entry => + getConfig(adminClient, entityType, entity, includeSynonyms = true, describeAll).foreach { entry => val synonyms = entry.synonyms.asScala.map(synonym => s"${synonym.source}:${synonym.name}=${synonym.value}").mkString(", ") println(s" ${entry.name}=${entry.value} sensitive=${entry.isSensitive} synonyms={$synonyms}") } } } - private def getConfig(adminClient: Admin, entityType: String, entityName: String, includeSynonyms: Boolean): Seq[ConfigEntry] = { + private def getConfig(adminClient: Admin, entityType: String, entityName: String, includeSynonyms: Boolean, describeAll: Boolean) = { def validateBrokerId(): Unit = try entityName.toInt catch { case _: NumberFormatException => - throw new IllegalArgumentException(s"The entity name for ${entityType} must be a valid integer broker id, found: ${entityName}") + throw new IllegalArgumentException(s"The entity name for $entityType must be a valid integer broker id, found: $entityName") } - val (configResourceType, configSourceFilter) = entityType match { + val (configResourceType, dynamicConfigSource) = entityType match { case ConfigType.Topic => if (!entityName.isEmpty) Topic.validate(entityName) @@ -413,6 +415,11 @@ object ConfigCommand extends Config { (ConfigResource.Type.BROKER_LOGGER, None) } + val configSourceFilter = if (describeAll) + None + else + dynamicConfigSource + val configResource = new ConfigResource(configResourceType, entityName) val configs = adminClient.describeConfigs(Collections.singleton(configResource)).all.get(30, TimeUnit.SECONDS) configs.get(configResource).entries.asScala @@ -545,6 +552,8 @@ object ConfigCommand extends Config { .ofType(classOf[String]) val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.") val describeOpt = parser.accepts("describe", "List configs for the given entity.") + val allOpt = parser.accepts("all", "List all configs for the given entity (includes static configuration when the entity type is brokers)") + val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers/broker-loggers)") .withRequiredArg .ofType(classOf[String]) @@ -654,6 +663,10 @@ object ConfigCommand extends Config { else if (options.has(bootstrapServerOpt) && options.has(zkConnectOpt)) throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified") + if (options.has(allOpt) && options.has(zkConnectOpt)) { + throw new IllegalArgumentException(s"--bootstrap-server must be specified for --all") + } + if (hasEntityName && (entityTypeVals.contains(ConfigType.Broker) || entityTypeVals.contains(BrokerLoggerConfigType))) { Seq(entityName, broker, brokerLogger).filter(options.has(_)).map(options.valueOf(_)).foreach { brokerId => try brokerId.toInt catch { diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index b9527c9bf9afd..69a51f0d6f84f 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -445,6 +445,39 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { new ConfigCommandOptions(optsList.toArray).checkArgs() } + @Test + def testDescribeAllBrokerConfig(): Unit = { + val optsList = List("--bootstrap-server", "localhost:9092", + "--entity-type", ConfigType.Broker, + "--entity-name", "1", + "--describe", + "--all") + + new ConfigCommandOptions(optsList.toArray).checkArgs() + } + + @Test + def testDescribeAllTopicConfig(): Unit = { + val optsList = List("--bootstrap-server", "localhost:9092", + "--entity-type", ConfigType.Topic, + "--entity-name", "foo", + "--describe", + "--all") + + new ConfigCommandOptions(optsList.toArray).checkArgs() + } + + @Test(expected = classOf[IllegalArgumentException]) + def testDescribeAllBrokerConfigBootstrapServerRequired(): Unit = { + val optsList = List("--zookeeper", zkConnect, + "--entity-type", ConfigType.Broker, + "--entity-name", "1", + "--describe", + "--all") + + new ConfigCommandOptions(optsList.toArray).checkArgs() + } + @Test(expected = classOf[IllegalArgumentException]) def testEntityDefaultOptionWithDescribeBrokerLoggerIsNotAllowed(): Unit = { val optsList = List("--bootstrap-server", "localhost:9092",