From cef3fed3b2709b7e5c3aef803b08e39c06183f44 Mon Sep 17 00:00:00 2001 From: Raymond Ng Date: Mon, 28 Oct 2019 16:37:54 -0700 Subject: [PATCH 1/4] KAFKA-9040 Add --all that includes dynamic and static config Added unit test: kafka.admin.ConfigCommandTest.testDescribeAllBrokerConfig --- .../scala/kafka/admin/ConfigCommand.scala | 15 ++++++++----- .../unit/kafka/admin/ConfigCommandTest.scala | 21 +++++++++++++++++++ 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 57b838aeecae3..14ddb775d2de5 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -305,7 +305,7 @@ object ConfigCommand extends Config { entityType match { case ConfigType.Topic => - val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false) + val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false, describeAll = false) .map { entry => (entry.name, entry) }.toMap // fail the command if any of the configs to be deleted does not exist @@ -321,7 +321,7 @@ object ConfigCommand extends Config { adminClient.incrementalAlterConfigs(Map(configResource -> alterEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) case ConfigType.Broker => - val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false) + val oldConfig = getConfig(adminClient, entityType, entityName, includeSynonyms = false, describeAll = false) .map { entry => (entry.name, entry) }.toMap // fail the command if any of the configs to be deleted does not exist @@ -340,7 +340,7 @@ object ConfigCommand extends Config { adminClient.alterConfigs(Map(configResource -> newConfig).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) case BrokerLoggerConfigType => - val validLoggers = getConfig(adminClient, entityType, entityName, includeSynonyms = true).map(_.name) + val validLoggers = getConfig(adminClient, entityType, entityName, includeSynonyms = true, describeAll = false).map(_.name) // fail the command if any of the configured broker loggers do not exist val invalidBrokerLoggers = configsToBeDeleted.filterNot(validLoggers.contains) ++ configsToBeAdded.keys.filterNot(validLoggers.contains) if (invalidBrokerLoggers.nonEmpty) @@ -365,6 +365,7 @@ object ConfigCommand extends Config { private def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = { val entityType = opts.entityTypes.head val entityName = opts.entityNames.headOption + val describeAll = opts.options.has(opts.allOpt) val entities = entityName .map(name => List(name)) @@ -382,14 +383,14 @@ object ConfigCommand extends Config { case _ => println(s"Configs for ${entityType.dropRight(1)} ${entity} are:") } - getConfig(adminClient, entityType, entity, includeSynonyms = true).foreach { entry => + getConfig(adminClient, entityType, entity, includeSynonyms = true, describeAll).foreach { entry => val synonyms = entry.synonyms.asScala.map(synonym => s"${synonym.source}:${synonym.name}=${synonym.value}").mkString(", ") println(s" ${entry.name}=${entry.value} sensitive=${entry.isSensitive} synonyms={$synonyms}") } } } - private def getConfig(adminClient: Admin, entityType: String, entityName: String, includeSynonyms: Boolean): Seq[ConfigEntry] = { + private def getConfig(adminClient: Admin, entityType: String, entityName: String, includeSynonyms: Boolean, describeAll: Boolean) = { def validateBrokerId(): Unit = try entityName.toInt catch { case _: NumberFormatException => throw new IllegalArgumentException(s"The entity name for ${entityType} must be a valid integer broker id, found: ${entityName}") @@ -545,6 +546,8 @@ object ConfigCommand extends Config { .ofType(classOf[String]) val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.") val describeOpt = parser.accepts("describe", "List configs for the given entity.") + val allOpt = parser.accepts("all", "List all configs for the given entity") + val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers/broker-loggers)") .withRequiredArg .ofType(classOf[String]) @@ -653,6 +656,8 @@ object ConfigCommand extends Config { throw new IllegalArgumentException("One of the required --bootstrap-server or --zookeeper arguments must be specified") else if (options.has(bootstrapServerOpt) && options.has(zkConnectOpt)) throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified") + else if (options.has(zkConnectOpt) && options.has(allOpt)) + throw new IllegalArgumentException(s"--bootstrap-server must be specified for --all") if (hasEntityName && (entityTypeVals.contains(ConfigType.Broker) || entityTypeVals.contains(BrokerLoggerConfigType))) { Seq(entityName, broker, brokerLogger).filter(options.has(_)).map(options.valueOf(_)).foreach { brokerId => diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index b9527c9bf9afd..1b4c8970299ea 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -445,6 +445,27 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { new ConfigCommandOptions(optsList.toArray).checkArgs() } + def testDescribeAllBrokerConfig(): Unit = { + val optsList = List("--bootstrap-server", "localhost:9092", + "--entity-type", ConfigType.Broker, + "--entity-name", "1", + "--describe", + "--all") + + new ConfigCommandOptions(optsList.toArray).checkArgs() + } + + @Test(expected = classOf[IllegalArgumentException]) + def testDescribeAllBrokerConfigBootstrapServerRequired(): Unit = { + val optsList = List("--zookeeper", zkConnect, + "--entity-type", ConfigType.Broker, + "--entity-name", "1", + "--describe", + "--all") + + new ConfigCommandOptions(optsList.toArray).checkArgs() + } + @Test(expected = classOf[IllegalArgumentException]) def testEntityDefaultOptionWithDescribeBrokerLoggerIsNotAllowed(): Unit = { val optsList = List("--bootstrap-server", "localhost:9092", From c65ae76a6f8e0b0550146345a030c81fed1c712f Mon Sep 17 00:00:00 2001 From: Raymond Ng <> Date: Fri, 3 Jan 2020 16:45:16 -0800 Subject: [PATCH 2/4] Add back describeAll logic + additional validation + tests --- .../scala/kafka/admin/ConfigCommand.scala | 14 ++++++++++--- .../unit/kafka/admin/ConfigCommandTest.scala | 21 +++++++++++++++++++ 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 14ddb775d2de5..5aa68fc877698 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -406,7 +406,8 @@ object ConfigCommand extends Config { (ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)) case _ => validateBrokerId() - (ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG)) + val configSource = if (describeAll) None else Some(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG) + (ConfigResource.Type.BROKER, configSource) } case BrokerLoggerConfigType => if (!entityName.isEmpty) @@ -656,8 +657,15 @@ object ConfigCommand extends Config { throw new IllegalArgumentException("One of the required --bootstrap-server or --zookeeper arguments must be specified") else if (options.has(bootstrapServerOpt) && options.has(zkConnectOpt)) throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified") - else if (options.has(zkConnectOpt) && options.has(allOpt)) - throw new IllegalArgumentException(s"--bootstrap-server must be specified for --all") + + if (options.has(allOpt)) { + if (options.has(zkConnectOpt)) + throw new IllegalArgumentException(s"--bootstrap-server must be specified for --all") + else if (!hasEntityName) + throw new IllegalArgumentException(s"--entity-name must be specified for --all") + else if (!entityTypeVals.contains(ConfigType.Broker)) + throw new IllegalArgumentException(s"--all only applies to --entity-type=${ConfigType.Broker}") + } if (hasEntityName && (entityTypeVals.contains(ConfigType.Broker) || entityTypeVals.contains(BrokerLoggerConfigType))) { Seq(entityName, broker, brokerLogger).filter(options.has(_)).map(options.valueOf(_)).foreach { brokerId => diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index 1b4c8970299ea..d950f1c133981 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -445,6 +445,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { new ConfigCommandOptions(optsList.toArray).checkArgs() } + @Test def testDescribeAllBrokerConfig(): Unit = { val optsList = List("--bootstrap-server", "localhost:9092", "--entity-type", ConfigType.Broker, @@ -455,6 +456,26 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { new ConfigCommandOptions(optsList.toArray).checkArgs() } + @Test(expected = classOf[IllegalArgumentException]) + def testDescribeAllBrokerConfigBrokerRequired(): Unit = { + val optsList = List("--bootstrap-server", "localhost:9092", + "--entity-type", ConfigType.Topic, // not supported + "--describe", + "--all") + + new ConfigCommandOptions(optsList.toArray).checkArgs() + } + + @Test(expected = classOf[IllegalArgumentException]) + def testDescribeAllBrokerConfigEntityNameRequired(): Unit = { + val optsList = List("--bootstrap-server", "localhost:9092", + "--entity-type", ConfigType.Broker, + "--describe", + "--all") + + new ConfigCommandOptions(optsList.toArray).checkArgs() + } + @Test(expected = classOf[IllegalArgumentException]) def testDescribeAllBrokerConfigBootstrapServerRequired(): Unit = { val optsList = List("--zookeeper", zkConnect, From 5e8db153205999579b3145735f7d2d9797acdd57 Mon Sep 17 00:00:00 2001 From: Raymond Ng <> Date: Sun, 12 Jan 2020 16:32:05 -0800 Subject: [PATCH 3/4] improve doc for --all --- core/src/main/scala/kafka/admin/ConfigCommand.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 5aa68fc877698..4a888add470be 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -547,7 +547,7 @@ object ConfigCommand extends Config { .ofType(classOf[String]) val alterOpt = parser.accepts("alter", "Alter the configuration for the entity.") val describeOpt = parser.accepts("describe", "List configs for the given entity.") - val allOpt = parser.accepts("all", "List all configs for the given entity") + val allOpt = parser.accepts("all", "List all configs for the given entity (includes static configuration when the entity type is brokers)") val entityType = parser.accepts("entity-type", "Type of entity (topics/clients/users/brokers/broker-loggers)") .withRequiredArg From 0c2f36a16aec343ac95b476aee7c0f677131f133 Mon Sep 17 00:00:00 2001 From: Jason Gustafson Date: Tue, 28 Jan 2020 18:07:24 -0800 Subject: [PATCH 4/4] Allow --all when describing topics as well --- .../scala/kafka/admin/ConfigCommand.scala | 32 +++++++++---------- .../unit/kafka/admin/ConfigCommandTest.scala | 17 +++------- 2 files changed, 20 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConfigCommand.scala b/core/src/main/scala/kafka/admin/ConfigCommand.scala index 4a888add470be..6787fb0c33581 100644 --- a/core/src/main/scala/kafka/admin/ConfigCommand.scala +++ b/core/src/main/scala/kafka/admin/ConfigCommand.scala @@ -353,13 +353,13 @@ object ConfigCommand extends Config { ).asJavaCollection adminClient.incrementalAlterConfigs(Map(configResource -> alterLogLevelEntries).asJava, alterOptions).all().get(60, TimeUnit.SECONDS) - case _ => throw new IllegalArgumentException(s"Unsupported entity type: ${entityType}") + case _ => throw new IllegalArgumentException(s"Unsupported entity type: $entityType") } if (entityName.nonEmpty) - println(s"Completed updating config for ${entityType.dropRight(1)} ${entityName}.") + println(s"Completed updating config for ${entityType.dropRight(1)} $entityName.") else - println(s"Completed updating default config for ${entityType} in the cluster.") + println(s"Completed updating default config for $entityType in the cluster.") } private def describeConfig(adminClient: Admin, opts: ConfigCommandOptions): Unit = { @@ -379,9 +379,10 @@ object ConfigCommand extends Config { entities.foreach { entity => entity match { case "" => - println(s"Default config for ${entityType} in the cluster are:") + println(s"Default configs for $entityType in the cluster are:") case _ => - println(s"Configs for ${entityType.dropRight(1)} ${entity} are:") + val configSourceStr = if (describeAll) "All" else "Dynamic" + println(s"$configSourceStr configs for ${entityType.dropRight(1)} $entity are:") } getConfig(adminClient, entityType, entity, includeSynonyms = true, describeAll).foreach { entry => val synonyms = entry.synonyms.asScala.map(synonym => s"${synonym.source}:${synonym.name}=${synonym.value}").mkString(", ") @@ -393,10 +394,10 @@ object ConfigCommand extends Config { private def getConfig(adminClient: Admin, entityType: String, entityName: String, includeSynonyms: Boolean, describeAll: Boolean) = { def validateBrokerId(): Unit = try entityName.toInt catch { case _: NumberFormatException => - throw new IllegalArgumentException(s"The entity name for ${entityType} must be a valid integer broker id, found: ${entityName}") + throw new IllegalArgumentException(s"The entity name for $entityType must be a valid integer broker id, found: $entityName") } - val (configResourceType, configSourceFilter) = entityType match { + val (configResourceType, dynamicConfigSource) = entityType match { case ConfigType.Topic => if (!entityName.isEmpty) Topic.validate(entityName) @@ -406,8 +407,7 @@ object ConfigCommand extends Config { (ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG)) case _ => validateBrokerId() - val configSource = if (describeAll) None else Some(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG) - (ConfigResource.Type.BROKER, configSource) + (ConfigResource.Type.BROKER, Some(ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG)) } case BrokerLoggerConfigType => if (!entityName.isEmpty) @@ -415,6 +415,11 @@ object ConfigCommand extends Config { (ConfigResource.Type.BROKER_LOGGER, None) } + val configSourceFilter = if (describeAll) + None + else + dynamicConfigSource + val configResource = new ConfigResource(configResourceType, entityName) val configs = adminClient.describeConfigs(Collections.singleton(configResource)).all.get(30, TimeUnit.SECONDS) configs.get(configResource).entries.asScala @@ -658,13 +663,8 @@ object ConfigCommand extends Config { else if (options.has(bootstrapServerOpt) && options.has(zkConnectOpt)) throw new IllegalArgumentException("Only one of --bootstrap-server or --zookeeper must be specified") - if (options.has(allOpt)) { - if (options.has(zkConnectOpt)) - throw new IllegalArgumentException(s"--bootstrap-server must be specified for --all") - else if (!hasEntityName) - throw new IllegalArgumentException(s"--entity-name must be specified for --all") - else if (!entityTypeVals.contains(ConfigType.Broker)) - throw new IllegalArgumentException(s"--all only applies to --entity-type=${ConfigType.Broker}") + if (options.has(allOpt) && options.has(zkConnectOpt)) { + throw new IllegalArgumentException(s"--bootstrap-server must be specified for --all") } if (hasEntityName && (entityTypeVals.contains(ConfigType.Broker) || entityTypeVals.contains(BrokerLoggerConfigType))) { diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala index d950f1c133981..69a51f0d6f84f 100644 --- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala @@ -456,20 +456,11 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging { new ConfigCommandOptions(optsList.toArray).checkArgs() } - @Test(expected = classOf[IllegalArgumentException]) - def testDescribeAllBrokerConfigBrokerRequired(): Unit = { - val optsList = List("--bootstrap-server", "localhost:9092", - "--entity-type", ConfigType.Topic, // not supported - "--describe", - "--all") - - new ConfigCommandOptions(optsList.toArray).checkArgs() - } - - @Test(expected = classOf[IllegalArgumentException]) - def testDescribeAllBrokerConfigEntityNameRequired(): Unit = { + @Test + def testDescribeAllTopicConfig(): Unit = { val optsList = List("--bootstrap-server", "localhost:9092", - "--entity-type", ConfigType.Broker, + "--entity-type", ConfigType.Topic, + "--entity-name", "foo", "--describe", "--all")