From b648e194657290d9b00864f9e93587f92cb2fc01 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Wed, 18 Jan 2023 09:41:36 +0100 Subject: [PATCH 1/9] KAFKA-14628: Move CommandLineUtils and CommandDefaultOptions shared classes These classes are required by most commands, so they must be migrated first. Signed-off-by: Federico Valeri --- core/src/main/scala/kafka/Kafka.scala | 1 - core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala | 4 ++-- core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 2 ++ 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index fa0137e9597a1..089bcf8c6d4f5 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -18,7 +18,6 @@ package kafka import java.util.Properties - import joptsimple.OptionParser import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server} import kafka.utils.Implicits._ diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 9c0452b781c7e..eec598ef4cdf5 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -1033,7 +1033,7 @@ object ConsumerGroupCommand extends Logging { val describeOpt = parser.accepts("describe", DescribeDoc) val allGroupsOpt = parser.accepts("all-groups", AllGroupsDoc) val deleteOpt = parser.accepts("delete", DeleteDoc) - val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc) + val timeoutMsOpt: OptionSpec[Long] = parser.accepts("timeout", TimeoutMsDoc) .withRequiredArg .describedAs("timeout (ms)") .ofType(classOf[Long]) @@ -1047,7 +1047,7 @@ object ConsumerGroupCommand extends Logging { val dryRunOpt = parser.accepts("dry-run", DryRunDoc) val executeOpt = parser.accepts("execute", ExecuteDoc) val exportOpt = parser.accepts("export", ExportDoc) - val resetToOffsetOpt = parser.accepts("to-offset", ResetToOffsetDoc) + val resetToOffsetOpt: OptionSpec[Long] = parser.accepts("to-offset", ResetToOffsetDoc) .withRequiredArg() .describedAs("offset") .ofType(classOf[Long]) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 2774f13397a62..dd1a2babdc489 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -356,6 +356,8 @@ object ConsoleConsumer extends Logging { def invalidOffset(offset: String): Nothing = ToolsUtils.printUsageAndExit(parser, s"The provided offset value '$offset' is incorrect. Valid values are " + "'earliest', 'latest', or a non-negative long.") + Exit.exit(1) + } val offsetArg = if (options.has(offsetOpt)) { From e40010d31094cdf66517e10cd4423635f5528c24 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Thu, 19 Jan 2023 17:54:34 +0100 Subject: [PATCH 2/9] KAFKA-14582: Move JmxTool to tools Signed-off-by: Federico Valeri --- build.gradle | 1 + checkstyle/import-control.xml | 5 +- .../scala/kafka/tools/ConsoleConsumer.scala | 2 - core/src/main/scala/kafka/tools/JmxTool.scala | 275 ----------- tests/kafkatest/services/monitor/jmx.py | 2 +- .../org/apache/kafka/tools/JmxCommand.java | 441 ++++++++++++++++++ 6 files changed, 446 insertions(+), 280 deletions(-) delete mode 100644 core/src/main/scala/kafka/tools/JmxTool.scala create mode 100644 tools/src/main/java/org/apache/kafka/tools/JmxCommand.java diff --git a/build.gradle b/build.gradle index 568838605734e..7761a96a7819d 100644 --- a/build.gradle +++ b/build.gradle @@ -1763,6 +1763,7 @@ project(':tools') { implementation libs.jacksonJDK8Datatypes implementation libs.slf4jApi implementation libs.log4j + implementation libs.joptSimple implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation implementation libs.jacksonJaxrsJsonProvider diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 0e767baee5d28..44f0bb3842252 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -347,7 +347,7 @@ - + @@ -407,7 +407,8 @@ - + + diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index dd1a2babdc489..2774f13397a62 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -356,8 +356,6 @@ object ConsoleConsumer extends Logging { def invalidOffset(offset: String): Nothing = ToolsUtils.printUsageAndExit(parser, s"The provided offset value '$offset' is incorrect. Valid values are " + "'earliest', 'latest', or a non-negative long.") - Exit.exit(1) - } val offsetArg = if (options.has(offsetOpt)) { diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala deleted file mode 100644 index 223c459bcc270..0000000000000 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package kafka.tools - -import java.util.{Date, Objects} -import java.text.SimpleDateFormat -import javax.management._ -import javax.management.remote._ -import javax.rmi.ssl.SslRMIClientSocketFactory -import joptsimple.OptionParser - -import scala.jdk.CollectionConverters._ -import scala.collection.mutable -import scala.math._ -import kafka.utils.{Exit, Logging} -import org.apache.kafka.server.util.CommandLineUtils - - -/** - * A program for reading JMX metrics from a given endpoint. - * - * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool. See KAFKA-4620 for - * details. - */ -object JmxTool extends Logging { - - def main(args: Array[String]): Unit = { - // Parse command line - val parser = new OptionParser(false) - val objectNameOpt = - parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " + - "can be given multiple times to specify more than one query. If no objects are specified " + - "all objects will be queried.") - .withRequiredArg - .describedAs("name") - .ofType(classOf[String]) - val attributesOpt = - parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " + - "attributes are specified all objects will be queried.") - .withRequiredArg - .describedAs("name") - .ofType(classOf[String]) - val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " + - "Value of -1 equivalent to setting one-time to true") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(2000) - val oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.") - .withRequiredArg - .describedAs("one-time") - .ofType(classOf[java.lang.Boolean]) - .defaultsTo(false) - val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " + - "See java.text.SimpleDateFormat for options.") - .withRequiredArg - .describedAs("format") - .ofType(classOf[String]) - val jmxServiceUrlOpt = - parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.") - .withRequiredArg - .describedAs("service-url") - .ofType(classOf[String]) - .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi") - val reportFormatOpt = parser.accepts("report-format", "output format name: either 'original', 'properties', 'csv', 'tsv' ") - .withRequiredArg - .describedAs("report-format") - .ofType(classOf[java.lang.String]) - .defaultsTo("original") - val jmxAuthPropOpt = parser.accepts("jmx-auth-prop", "A mechanism to pass property in the form 'username=password' " + - "when enabling remote JMX with password authentication.") - .withRequiredArg - .describedAs("jmx-auth-prop") - .ofType(classOf[String]) - val jmxSslEnableOpt = parser.accepts("jmx-ssl-enable", "Flag to enable remote JMX with SSL.") - .withRequiredArg - .describedAs("ssl-enable") - .ofType(classOf[java.lang.Boolean]) - .defaultsTo(false) - val waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " + - "Only supported when the list of objects is non-empty and contains no object name patterns.") - val helpOpt = parser.accepts("help", "Print usage information.") - - - if(args.isEmpty) - CommandLineUtils.printUsageAndExit(parser, "Dump JMX values to standard output.") - - val options = parser.parse(args : _*) - - if(options.has(helpOpt)) { - parser.printHelpOn(System.out) - Exit.exit(0) - } - - val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt)) - val interval = options.valueOf(reportingIntervalOpt).intValue - val oneTime = interval < 0 || options.has(oneTimeOpt) - val attributesIncludeExists = options.has(attributesOpt) - val attributesInclude = if(attributesIncludeExists) Some(options.valueOf(attributesOpt).split(",").filterNot(_.equals(""))) else None - val dateFormatExists = options.has(dateFormatOpt) - val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None - val wait = options.has(waitOpt) - - val reportFormat = parseFormat(options.valueOf(reportFormatOpt).toLowerCase) - val reportFormatOriginal = reportFormat.equals("original") - - val enablePasswordAuth = options.has(jmxAuthPropOpt) - val enableSsl = options.has(jmxSslEnableOpt) - - var jmxc: JMXConnector = null - var mbsc: MBeanServerConnection = null - var connected = false - val connectTimeoutMs = 10000 - val connectTestStarted = System.currentTimeMillis - do { - try { - System.err.println(s"Trying to connect to JMX url: $url.") - val env = new java.util.HashMap[String, AnyRef] - // ssl enable - if (enableSsl) { - val csf = new SslRMIClientSocketFactory - env.put("com.sun.jndi.rmi.factory.socket", csf) - } - // password authentication enable - if (enablePasswordAuth) { - val credentials = options.valueOf(jmxAuthPropOpt).split("=", 2) - env.put(JMXConnector.CREDENTIALS, credentials) - } - jmxc = JMXConnectorFactory.connect(url, env) - mbsc = jmxc.getMBeanServerConnection - connected = true - } catch { - case e : Exception => - System.err.println(s"Could not connect to JMX url: $url. Exception ${e.getMessage}.") - e.printStackTrace() - Thread.sleep(100) - } - } while (System.currentTimeMillis - connectTestStarted < connectTimeoutMs && !connected) - - if (!connected) { - System.err.println(s"Could not connect to JMX url $url after $connectTimeoutMs ms.") - System.err.println("Exiting.") - sys.exit(1) - } - - val queries: Iterable[ObjectName] = - if(options.has(objectNameOpt)) - options.valuesOf(objectNameOpt).asScala.map(new ObjectName(_)) - else - List(null) - - val hasPatternQueries = queries.filterNot(Objects.isNull).exists((name: ObjectName) => name.isPattern) - - var names: Iterable[ObjectName] = null - def namesSet = Option(names).toSet.flatten - def foundAllObjects = queries.toSet == namesSet - val waitTimeoutMs = 10000 - if (!hasPatternQueries) { - val start = System.currentTimeMillis - do { - if (names != null) { - System.err.println("Could not find all object names, retrying") - Thread.sleep(100) - } - names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala) - } while (wait && System.currentTimeMillis - start < waitTimeoutMs && !foundAllObjects) - } - - if (wait && !foundAllObjects) { - val missing = (queries.toSet - namesSet).mkString(", ") - System.err.println(s"Could not find all requested object names after $waitTimeoutMs ms. Missing $missing") - System.err.println("Exiting.") - sys.exit(1) - } - - val numExpectedAttributes: Map[ObjectName, Int] = - if (!attributesIncludeExists) - names.map{name: ObjectName => - val mbean = mbsc.getMBeanInfo(name) - (name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)}.toMap - else { - if (!hasPatternQueries) - names.map{name: ObjectName => - val mbean = mbsc.getMBeanInfo(name) - val attributes = mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)) - val expectedAttributes = attributes.asScala.asInstanceOf[mutable.Buffer[Attribute]] - .filter(attr => attributesInclude.get.contains(attr.getName)) - (name, expectedAttributes.size)}.toMap.filter(_._2 > 0) - else - queries.map((_, attributesInclude.get.length)).toMap - } - - if(numExpectedAttributes.isEmpty) { - CommandLineUtils.printUsageAndExit(parser, s"No matched attributes for the queried objects $queries.") - } - - // print csv header - val keys = List("time") ++ queryAttributes(mbsc, names, attributesInclude).keys.toArray.sorted - if(reportFormatOriginal && keys.size == numExpectedAttributes.values.sum + 1) { - println(keys.map("\"" + _ + "\"").mkString(",")) - } - - var keepGoing = true - while (keepGoing) { - val start = System.currentTimeMillis - val attributes = queryAttributes(mbsc, names, attributesInclude) - attributes("time") = dateFormat match { - case Some(dFormat) => dFormat.format(new Date) - case None => System.currentTimeMillis().toString - } - if(attributes.keySet.size == numExpectedAttributes.values.sum + 1) { - if(reportFormatOriginal) { - println(keys.map(attributes(_)).mkString(",")) - } - else if(reportFormat.equals("properties")) { - keys.foreach( k => { println(k + "=" + attributes(k) ) } ) - } - else if(reportFormat.equals("csv")) { - keys.foreach( k => { println(k + ",\"" + attributes(k) + "\"" ) } ) - } - else { // tsv - keys.foreach( k => { println(k + "\t" + attributes(k) ) } ) - } - } - - if (oneTime) { - keepGoing = false - } - else { - val sleep = max(0, interval - (System.currentTimeMillis - start)) - Thread.sleep(sleep) - } - } - } - - def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesInclude: Option[Array[String]]): mutable.Map[String, Any] = { - val attributes = new mutable.HashMap[String, Any]() - for (name <- names) { - val mbean = mbsc.getMBeanInfo(name) - for (attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) { - val attr = attrObj.asInstanceOf[Attribute] - attributesInclude match { - case Some(allowedAttributes) => - if (allowedAttributes.contains(attr.getName)) - attributes(name.toString + ":" + attr.getName) = attr.getValue - case None => attributes(name.toString + ":" + attr.getName) = attr.getValue - } - } - } - attributes - } - - def parseFormat(reportFormatOpt : String): String = reportFormatOpt match { - case "properties" => "properties" - case "csv" => "csv" - case "tsv" => "tsv" - case _ => "original" - } -} diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index bff1878fada62..cd33c80d454be 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -141,7 +141,7 @@ def read_jmx_output_all_nodes(self): self.read_jmx_output(self.idx(node), node) def jmx_class_name(self): - return "kafka.tools.JmxTool" + return "org.apache.kafka.tools.JmxCommand" class JmxTool(JmxMixin, KafkaPathResolverMixin): """ diff --git a/tools/src/main/java/org/apache/kafka/tools/JmxCommand.java b/tools/src/main/java/org/apache/kafka/tools/JmxCommand.java new file mode 100644 index 0000000000000..ffa75ce0f5ee0 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/JmxCommand.java @@ -0,0 +1,441 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; + +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.MBeanFeatureInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanServerConnection; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import javax.rmi.ssl.SslRMIClientSocketFactory; +import java.io.IOException; +import java.net.MalformedURLException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.function.BiPredicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A program for reading JMX metrics from a given endpoint. + *

+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool. + * See KAFKA-4620 for details. + */ +public class JmxCommand { + public static void main(String[] args) { + Exit.exit(mainNoExit(args)); + } + + static int mainNoExit(String... args) { + try { + execute(args); + return 0; + } catch (TerseException e) { + System.err.println(e.getMessage()); + return 1; + } catch (Throwable e) { + System.err.println(e.getMessage()); + System.err.println(Utils.stackTrace(e)); + return 1; + } + } + + static void execute(String... args) throws Exception { + JmxCommandOptions options = new JmxCommandOptions(args); + CommandLineUtils.maybePrintHelpOrVersion(options, "Dump JMX values to standard output."); + + Optional attributesInclude = options.attributesInclude(); + Optional dateFormat = options.dateFormat(); + String reportFormat = options.parseFormat(); + boolean keepGoing = true; + + MBeanServerConnection conn = connectToBeanServer(options); + List queries = options.queries(); + boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern); + + Set found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries); + Map numExpectedAttributes = + findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found); + + List keys = new ArrayList<>(); + keys.add("time"); + keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet())); + maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes); + + while (keepGoing) { + long start = System.currentTimeMillis(); + Map attributes = queryAttributes(conn, found, attributesInclude); + attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis())); + maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes); + if (options.isOneTime()) { + keepGoing = false; + } else { + TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start))); + } + } + } + + private static String mkString(Stream stream, String delimeter) { + return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimeter)); + } + + private static int sumValues(Map numExpectedAttributes) { + return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum(); + } + + private static String[] attributesNames(MBeanInfo mBeanInfo) { + return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new); + } + + private static MBeanServerConnection connectToBeanServer(JmxCommandOptions options) throws Exception { + JMXConnector connector; + MBeanServerConnection serverConn = null; + boolean connected = false; + long connectTimeoutMs = 10_000; + long connectTestStarted = System.currentTimeMillis(); + do { + try { + // printing to stderr because system tests parse the output + System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL()); + Map env = new HashMap<>(); + // ssl enable + if (options.hasJmxSslEnableOpt()) { + env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory()); + } + // password authentication enable + if (options.hasJmxAuthPropOpt()) { + env.put(JMXConnector.CREDENTIALS, options.credentials()); + } + connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env); + serverConn = connector.getMBeanServerConnection(); + connected = true; + } catch (Exception e) { + System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n", + options.jmxServiceURL(), e.getMessage()); + e.printStackTrace(); + TimeUnit.MILLISECONDS.sleep(100); + } + } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected); + + if (!connected) { + throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.", + options.jmxServiceURL(), connectTimeoutMs)); + } + return serverConn; + } + + private static Set findObjectsIfNoPattern(JmxCommandOptions options, + MBeanServerConnection conn, + List queries, + boolean hasPatternQueries) throws Exception { + long waitTimeoutMs = 10_000; + Set result = new HashSet<>(); + Set querySet = new HashSet<>(queries); + BiPredicate, Set> foundAllObjects = (s1, s2) -> s1.containsAll(s2); + if (!hasPatternQueries) { + long start = System.currentTimeMillis(); + do { + if (!result.isEmpty()) { + System.err.println("Could not find all object names, retrying"); + TimeUnit.MILLISECONDS.sleep(100); + } + result.addAll(queryObjects(conn, queries)); + } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result)); + } + + if (options.hasWait() && !foundAllObjects.test(querySet, result)) { + querySet.removeAll(result); + String missing = mkString(querySet.stream().map(Object::toString), ","); + throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing)); + } + return result; + } + + private static Set queryObjects(MBeanServerConnection conn, + List queries) { + Set result = new HashSet<>(); + queries.forEach(name -> { + try { + result.addAll(conn.queryNames(name, null)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + return result; + } + + private static Map findNumExpectedAttributes(MBeanServerConnection conn, + Optional attributesInclude, + boolean hasPatternQueries, + List queries, + Set found) throws Exception { + Map result = new HashMap<>(); + if (!attributesInclude.isPresent()) { + found.forEach(objectName -> { + try { + MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName); + result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } else { + if (!hasPatternQueries) { + found.forEach(objectName -> { + try { + MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName); + AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo)); + List expectedAttributes = new ArrayList<>(); + attributes.asList().forEach(attribute -> { + if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) { + expectedAttributes.add(objectName); + } + }); + if (expectedAttributes.size() > 0) { + result.put(objectName, expectedAttributes.size()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } else { + queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length)); + } + } + + if (result.isEmpty()) { + throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries)); + } + return result; + } + + private static Map queryAttributes(MBeanServerConnection conn, + Set objectNames, + Optional attributesInclude) throws Exception { + Map result = new HashMap<>(); + for (ObjectName objectName : objectNames) { + MBeanInfo beanInfo = conn.getMBeanInfo(objectName); + AttributeList attributes = conn.getAttributes(objectName, + Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new)); + for (Attribute attribute : attributes.asList()) { + if (attributesInclude.isPresent()) { + if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) { + result.put(String.format("%s:%s", objectName.toString(), attribute.getName()), + attribute.getValue()); + } + } else { + result.put(String.format("%s:%s", objectName.toString(), attribute.getName()), + attribute.getValue()); + } + } + } + return result; + } + + private static void maybePrintCsvHeader(String reportFormat, List keys, Map numExpectedAttributes) { + if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) { + System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ",")); + } + } + + private static void maybePrintDataRows(String reportFormat, Map numExpectedAttributes, List keys, Map attributes) { + if (attributes.keySet().size() == sumValues(numExpectedAttributes) + 1) { + switch (reportFormat) { + case "properties": + keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key)))); + break; + case "csv": + keys.forEach(key -> System.out.println(String.format("%s,\"%s\"", key, attributes.get(key)))); + break; + case "tsv": + keys.forEach(key -> System.out.println(String.format("%s\t%s", key, attributes.get(key)))); + break; + default: + System.out.println(mkString(keys.stream().map(attributes::get), ",")); + break; + } + } + } + + private static class JmxCommandOptions extends CommandDefaultOptions { + private final OptionSpec objectNameOpt; + private final OptionSpec attributesOpt; + private final OptionSpec reportingIntervalOpt; + private final OptionSpec oneTimeOpt; + private final OptionSpec dateFormatOpt; + private final OptionSpec jmxServiceUrlOpt; + private final OptionSpec reportFormatOpt; + private final OptionSpec jmxAuthPropOpt; + private final OptionSpec jmxSslEnableOpt; + private final OptionSpec waitOpt; + + public JmxCommandOptions(String[] args) { + super(args); + objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " + + "can be given multiple times to specify more than one query. If no objects are specified " + + "all objects will be queried.") + .withOptionalArg() + .describedAs("name") + .ofType(String.class); + attributesOpt = parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " + + "attributes are specified all objects will be queried.") + .withRequiredArg() + .describedAs("name") + .ofType(String.class); + reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " + + "Value of -1 equivalent to setting one-time to true") + .withOptionalArg() + .describedAs("ms") + .ofType(Integer.class) + .defaultsTo(2000); + oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.") + .withOptionalArg() + .describedAs("one-time") + .ofType(Boolean.class) + .defaultsTo(false); + dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " + + "See java.text.SimpleDateFormat for options.") + .withOptionalArg() + .describedAs("format") + .ofType(String.class); + jmxServiceUrlOpt = parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.") + .withOptionalArg() + .describedAs("service-url") + .ofType(String.class) + .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi"); + reportFormatOpt = parser.accepts("report-format", "output format name: either 'original', 'properties', 'csv', 'tsv' ") + .withOptionalArg() + .describedAs("report-format") + .ofType(String.class) + .defaultsTo("original"); + jmxAuthPropOpt = parser.accepts("jmx-auth-prop", "A mechanism to pass property in the form 'username=password' " + + "when enabling remote JMX with password authentication.") + .withOptionalArg() + .describedAs("jmx-auth-prop") + .ofType(String.class); + jmxSslEnableOpt = parser.accepts("jmx-ssl-enable", "Flag to enable remote JMX with SSL.") + .withOptionalArg() + .describedAs("ssl-enable") + .ofType(Boolean.class) + .defaultsTo(false); + waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " + + "Only supported when the list of objects is non-empty and contains no object name patterns."); + options = parser.parse(args); + } + + public JMXServiceURL jmxServiceURL() { + try { + return new JMXServiceURL(options.valueOf(jmxServiceUrlOpt)); + } catch (MalformedURLException e) { + throw new RuntimeException(e); + } + } + + public int interval() { + return options.valueOf(reportingIntervalOpt); + } + + public boolean isOneTime() { + return interval() < 0 || options.has(oneTimeOpt); + } + + public Optional attributesInclude() { + if (options.has(attributesOpt)) { + String[] attributes = Arrays.stream(options.valueOf(attributesOpt).split(",")) + .sequential().filter(s -> !s.isEmpty()).toArray(String[]::new); + return Optional.of(attributes); + } else { + return Optional.empty(); + } + } + + public Optional dateFormat() { + if (options.has(dateFormatOpt)) { + return Optional.of(new SimpleDateFormat(options.valueOf(dateFormatOpt))); + } else { + return Optional.empty(); + } + } + + public boolean hasWait() { + return options.has(waitOpt); + } + + private String parseFormat() { + String reportFormat = options.valueOf(reportFormatOpt).toLowerCase(Locale.ROOT); + if (Arrays.asList("properties", "csv", "tsv").contains(reportFormat)) { + return reportFormat; + } else { + return "original"; + } + } + + public boolean hasJmxAuthPropOpt() { + return options.has(jmxAuthPropOpt); + } + + public boolean hasJmxSslEnableOpt() { + return options.has(jmxSslEnableOpt); + } + + public String[] credentials() { + return options.valueOf(jmxAuthPropOpt).split("=", 2); + } + + public List queries() { + if (options.has(objectNameOpt)) { + return options.valuesOf(objectNameOpt).stream() + .map(s -> { + try { + return new ObjectName(s); + } catch (MalformedObjectNameException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + } else { + List listWithNull = new ArrayList<>(); + listWithNull.add(null); + return listWithNull; + } + } + } +} From a167801e33c98537ae6764d6270c153ea4606c21 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Wed, 25 Jan 2023 15:30:33 +0100 Subject: [PATCH 3/9] Add MM feedback Signed-off-by: Federico Valeri --- checkstyle/import-control.xml | 2 +- .../org/apache/kafka/tools/JmxCommand.java | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 44f0bb3842252..d90f60fa693c4 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -347,7 +347,7 @@ - + diff --git a/tools/src/main/java/org/apache/kafka/tools/JmxCommand.java b/tools/src/main/java/org/apache/kafka/tools/JmxCommand.java index ffa75ce0f5ee0..4a7e1aa13959e 100644 --- a/tools/src/main/java/org/apache/kafka/tools/JmxCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/JmxCommand.java @@ -278,7 +278,7 @@ private static void maybePrintCsvHeader(String reportFormat, List keys, } private static void maybePrintDataRows(String reportFormat, Map numExpectedAttributes, List keys, Map attributes) { - if (attributes.keySet().size() == sumValues(numExpectedAttributes) + 1) { + if (attributes.size() == sumValues(numExpectedAttributes) + 1) { switch (reportFormat) { case "properties": keys.forEach(key -> System.out.println(String.format("%s=%s", key, attributes.get(key)))); @@ -313,7 +313,7 @@ public JmxCommandOptions(String[] args) { objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " + "can be given multiple times to specify more than one query. If no objects are specified " + "all objects will be queried.") - .withOptionalArg() + .withRequiredArg() .describedAs("name") .ofType(String.class); attributesOpt = parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " + @@ -323,37 +323,37 @@ public JmxCommandOptions(String[] args) { .ofType(String.class); reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " + "Value of -1 equivalent to setting one-time to true") - .withOptionalArg() + .withRequiredArg() .describedAs("ms") .ofType(Integer.class) .defaultsTo(2000); oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.") - .withOptionalArg() + .withRequiredArg() .describedAs("one-time") .ofType(Boolean.class) .defaultsTo(false); dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " + "See java.text.SimpleDateFormat for options.") - .withOptionalArg() + .withRequiredArg() .describedAs("format") .ofType(String.class); jmxServiceUrlOpt = parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.") - .withOptionalArg() + .withRequiredArg() .describedAs("service-url") .ofType(String.class) .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi"); reportFormatOpt = parser.accepts("report-format", "output format name: either 'original', 'properties', 'csv', 'tsv' ") - .withOptionalArg() + .withRequiredArg() .describedAs("report-format") .ofType(String.class) .defaultsTo("original"); jmxAuthPropOpt = parser.accepts("jmx-auth-prop", "A mechanism to pass property in the form 'username=password' " + "when enabling remote JMX with password authentication.") - .withOptionalArg() + .withRequiredArg() .describedAs("jmx-auth-prop") .ofType(String.class); jmxSslEnableOpt = parser.accepts("jmx-ssl-enable", "Flag to enable remote JMX with SSL.") - .withOptionalArg() + .withRequiredArg() .describedAs("ssl-enable") .ofType(Boolean.class) .defaultsTo(false); From 37b998a3fb5704baf7825f7e862b15c08ada9061 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Thu, 26 Jan 2023 09:22:04 +0100 Subject: [PATCH 4/9] Add integration test Signed-off-by: Federico Valeri --- .../org/apache/kafka/tools/JmxCommand.java | 87 +++--- .../apache/kafka/tools/JmxCommandTest.java | 284 ++++++++++++++++++ .../apache/kafka/tools/ToolsTestUtils.java | 23 +- .../kafka/tools/TransactionsCommandTest.java | 23 +- 4 files changed, 354 insertions(+), 63 deletions(-) create mode 100644 tools/src/test/java/org/apache/kafka/tools/JmxCommandTest.java diff --git a/tools/src/main/java/org/apache/kafka/tools/JmxCommand.java b/tools/src/main/java/org/apache/kafka/tools/JmxCommand.java index 4a7e1aa13959e..11562229469ff 100644 --- a/tools/src/main/java/org/apache/kafka/tools/JmxCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/JmxCommand.java @@ -62,55 +62,54 @@ */ public class JmxCommand { public static void main(String[] args) { - Exit.exit(mainNoExit(args)); - } - - static int mainNoExit(String... args) { try { - execute(args); - return 0; + JmxCommandOptions options = new JmxCommandOptions(args); + if (CommandLineUtils.isPrintHelpNeeded(options)) { + CommandLineUtils.printUsageAndExit(options.parser, "Dump JMX values to standard output."); + return; + } + if (CommandLineUtils.isPrintVersionNeeded(options)) { + CommandLineUtils.printVersionAndExit(); + return; + } + + Optional attributesInclude = options.attributesInclude(); + Optional dateFormat = options.dateFormat(); + String reportFormat = options.parseFormat(); + boolean keepGoing = true; + + MBeanServerConnection conn = connectToBeanServer(options); + List queries = options.queries(); + boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern); + + Set found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries); + Map numExpectedAttributes = + findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found); + + List keys = new ArrayList<>(); + keys.add("time"); + keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet())); + maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes); + + while (keepGoing) { + long start = System.currentTimeMillis(); + Map attributes = queryAttributes(conn, found, attributesInclude); + attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis())); + maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes); + if (options.isOneTime()) { + keepGoing = false; + } else { + TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start))); + } + } + Exit.exit(0); } catch (TerseException e) { System.err.println(e.getMessage()); - return 1; + Exit.exit(1); } catch (Throwable e) { System.err.println(e.getMessage()); System.err.println(Utils.stackTrace(e)); - return 1; - } - } - - static void execute(String... args) throws Exception { - JmxCommandOptions options = new JmxCommandOptions(args); - CommandLineUtils.maybePrintHelpOrVersion(options, "Dump JMX values to standard output."); - - Optional attributesInclude = options.attributesInclude(); - Optional dateFormat = options.dateFormat(); - String reportFormat = options.parseFormat(); - boolean keepGoing = true; - - MBeanServerConnection conn = connectToBeanServer(options); - List queries = options.queries(); - boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern); - - Set found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries); - Map numExpectedAttributes = - findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found); - - List keys = new ArrayList<>(); - keys.add("time"); - keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet())); - maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes); - - while (keepGoing) { - long start = System.currentTimeMillis(); - Map attributes = queryAttributes(conn, found, attributesInclude); - attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis())); - maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes); - if (options.isOneTime()) { - keepGoing = false; - } else { - TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start))); - } + Exit.exit(1); } } @@ -328,7 +327,7 @@ public JmxCommandOptions(String[] args) { .ofType(Integer.class) .defaultsTo(2000); oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.") - .withRequiredArg() + .withOptionalArg() .describedAs("one-time") .ofType(Boolean.class) .defaultsTo(false); diff --git a/tools/src/test/java/org/apache/kafka/tools/JmxCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/JmxCommandTest.java new file mode 100644 index 0000000000000..1be3a999c34e3 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/JmxCommandTest.java @@ -0,0 +1,284 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXConnectorServerFactory; +import javax.management.remote.JMXServiceURL; + +import java.lang.management.ManagementFactory; +import java.net.ServerSocket; +import java.rmi.registry.LocateRegistry; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Tag("integration") +public class JmxCommandTest { + private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure(); + + private static JMXConnectorServer jmxAgent; + private static String jmxUrl; + + @BeforeAll + public static void beforeAll() throws Exception { + int port = findRandomOpenPortOnAllLocalInterfaces(); + jmxAgent = startJmxAgent(port); + jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://:%d/jmxrmi", port); + } + + @AfterAll + public static void afterAll() throws Exception { + jmxAgent.stop(); + } + + @BeforeEach + public void beforeEach() { + Exit.setExitProcedure(exitProcedure); + } + + @AfterEach + public void afterEach() { + Exit.resetExitProcedure(); + } + + @Test + public void kafkaVersion() { + String out = executeAndGetOut("--version"); + assertNormalExit(); + assertEquals(AppInfoParser.getVersion(), out); + } + + @Test + public void unrecognizedOption() { + String err = executeAndGetErr("--foo"); + assertCommandFailure(); + assertTrue(err.contains("UnrecognizedOptionException")); + assertTrue(err.contains("foo")); + } + + @Test + public void missingRequired() { + String err = executeAndGetErr("--reporting-interval"); + assertCommandFailure(); + assertTrue(err.contains("OptionMissingRequiredArgumentException")); + assertTrue(err.contains("reporting-interval")); + } + + @Test + public void invalidJmxUrl() { + String err = executeAndGetErr("--jmx-url", String.format("localhost:9999")); + assertCommandFailure(); + assertTrue(err.contains("MalformedURLException")); + } + + @Test + public void helpOptions() { + String[] expectedOptions = new String[]{ + "--attributes", "--date-format", "--help", "--jmx-auth-prop", + "--jmx-ssl-enable", "--jmx-url", "--object-name", "--one-time", + "--report-format", "--reporting-interval", "--version", "--wait" + }; + String err = executeAndGetErr("--help"); + assertCommandFailure(); + for (String option : expectedOptions) { + assertTrue(err.contains(option), option); + } + } + + @Test + public void csvFormat() { + String[] args = new String[]{ + "--jmx-url", jmxUrl, + "--object-name", "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec", + "--report-format", "csv", + "--one-time" + }; + String out = executeAndGetOut(args); + Arrays.stream(out.split("\\r?\\n")).forEach(line -> { + assertTrue(line.matches("([a-zA-Z0-9=:,.]+),\"([ -~]+)\""), line); + }); + } + + @Test + public void tsvFormat() { + String[] args = new String[]{ + "--jmx-url", jmxUrl, + "--object-name", "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec", + "--report-format", "tsv", + "--one-time" + }; + String out = executeAndGetOut(args); + Arrays.stream(out.split("\\r?\\n")).forEach(line -> { + assertTrue(line.matches("([a-zA-Z0-9=:,.]+)\\t([ -~]+)"), line); + }); + } + + @Test + public void allMetrics() { + String[] args = new String[]{ + "--jmx-url", jmxUrl, + "--report-format", "csv", + "--reporting-interval", "-1" + }; + String out = executeAndGetOut(args); + assertNormalExit(); + + Map csv = parseCsv(out); + assertTrue(csv.size() > 0); + } + + @Test + public void filteredMetrics() { + String[] args = new String[]{ + "--jmx-url", jmxUrl, + "--object-name", "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec", + "--attributes", "FifteenMinuteRate,FiveMinuteRate", + "--report-format", "csv", + "--one-time" + }; + String out = executeAndGetOut(args); + assertNormalExit(); + + Map csv = parseCsv(out); + assertEquals("1.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate")); + assertEquals("3.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate")); + } + + @Test + public void dateFormat() { + String dateFormat = "yyyyMMdd-hh:mm:ss"; + String[] args = new String[]{ + "--jmx-url", jmxUrl, + "--date-format", dateFormat, + "--report-format", "csv", + "--one-time" + }; + String out = executeAndGetOut(args); + assertNormalExit(); + + Map csv = parseCsv(out); + assertTrue(validDateFormat(dateFormat, csv.get("time"))); + } + + private static JMXConnectorServer startJmxAgent(int port) throws Exception { + LocateRegistry.createRegistry(port); + Map env = new HashMap<>(); + env.put("com.sun.management.jmxremote.authenticate", "false"); + env.put("com.sun.management.jmxremote.ssl", "false"); + JMXServiceURL url = new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://:%d/jmxrmi", port)); + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + server.registerMBean(new Metrics(), + new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec")); + JMXConnectorServer agent = JMXConnectorServerFactory.newJMXConnectorServer(url, env, server); + agent.start(); + return agent; + } + + private static int findRandomOpenPortOnAllLocalInterfaces() throws Exception { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } + + private String executeAndGetOut(String... args) { + return execute(args, false); + } + + private String executeAndGetErr(String... args) { + return execute(args, true); + } + + private String execute(String[] args, boolean err) { + Runnable runnable = () -> { + try { + JmxCommand.main(args); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + return err ? ToolsTestUtils.captureStandardErr(runnable) + : ToolsTestUtils.captureStandardOut(runnable); + } + + private void assertNormalExit() { + assertTrue(exitProcedure.hasExited()); + assertEquals(0, exitProcedure.statusCode()); + } + + private void assertCommandFailure() { + assertTrue(exitProcedure.hasExited()); + assertEquals(1, exitProcedure.statusCode()); + } + + private Map parseCsv(String value) { + Map result = new HashMap<>(); + Arrays.stream(value.split("\\r?\\n")).forEach(line -> { + String[] cells = line.split(",\""); + if (cells.length == 2) { + result.put(cells[0], cells[1].replaceAll("\"", "")); + } + }); + return result; + } + + private boolean validDateFormat(String format, String value) { + DateFormat formatter = new SimpleDateFormat(format); + formatter.setLenient(false); + try { + formatter.parse(value); + return true; + } catch (ParseException e) { + e.printStackTrace(); + return false; + } + } + + public interface MetricsMBean { + double getFifteenMinuteRate(); + double getFiveMinuteRate(); + } + + public static class Metrics implements MetricsMBean { + @Override + public double getFifteenMinuteRate() { + return 1.0; + } + + @Override + public double getFiveMinuteRate() { + return 3.0; + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java index 4664288ae4fee..709629eef6a29 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java +++ b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java @@ -16,11 +16,12 @@ */ package org.apache.kafka.tools; +import org.apache.kafka.common.utils.Exit; + import java.io.ByteArrayOutputStream; import java.io.PrintStream; public class ToolsTestUtils { - public static String captureStandardOut(Runnable runnable) { return captureStandardStream(false, runnable); } @@ -48,4 +49,24 @@ private static String captureStandardStream(boolean isErr, Runnable runnable) { } } + public static class MockExitProcedure implements Exit.Procedure { + private boolean hasExited = false; + private int statusCode; + + @Override + public void execute(int statusCode, String message) { + if (!this.hasExited) { + this.hasExited = true; + this.statusCode = statusCode; + } + } + + public boolean hasExited() { + return hasExited; + } + + public int statusCode() { + return statusCode; + } + } } diff --git a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java index 3dc77a6d0f66e..ef7b5b2ab9930 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java @@ -79,7 +79,7 @@ public class TransactionsCommandTest { - private final MockExitProcedure exitProcedure = new MockExitProcedure(); + private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure(); private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); private final PrintStream out = new PrintStream(outputStream); private final MockTime time = new MockTime(); @@ -1048,27 +1048,14 @@ private List readRow(BufferedReader reader) throws IOException { } private void assertNormalExit() { - assertTrue(exitProcedure.hasExited); - assertEquals(0, exitProcedure.statusCode); + assertTrue(exitProcedure.hasExited()); + assertEquals(0, exitProcedure.statusCode()); } private void assertCommandFailure(String[] args) throws Exception { execute(args); - assertTrue(exitProcedure.hasExited); - assertEquals(1, exitProcedure.statusCode); - } - - private static class MockExitProcedure implements Exit.Procedure { - private boolean hasExited = false; - private int statusCode; - - @Override - public void execute(int statusCode, String message) { - if (!this.hasExited) { - this.hasExited = true; - this.statusCode = statusCode; - } - } + assertTrue(exitProcedure.hasExited()); + assertEquals(1, exitProcedure.statusCode()); } } From 42134497484247a5e53cb3c4e95f2d985a124549 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Fri, 27 Jan 2023 16:39:35 +0100 Subject: [PATCH 5/9] Switch back to the original class name Signed-off-by: Federico Valeri --- tests/kafkatest/services/monitor/jmx.py | 2 +- .../kafka/tools/{JmxCommand.java => JmxTool.java} | 12 ++++++------ .../tools/{JmxCommandTest.java => JmxToolTest.java} | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) rename tools/src/main/java/org/apache/kafka/tools/{JmxCommand.java => JmxTool.java} (98%) rename tools/src/test/java/org/apache/kafka/tools/{JmxCommandTest.java => JmxToolTest.java} (99%) diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index cd33c80d454be..03eec51f7173a 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -141,7 +141,7 @@ def read_jmx_output_all_nodes(self): self.read_jmx_output(self.idx(node), node) def jmx_class_name(self): - return "org.apache.kafka.tools.JmxCommand" + return "org.apache.kafka.tools.JmxTool" class JmxTool(JmxMixin, KafkaPathResolverMixin): """ diff --git a/tools/src/main/java/org/apache/kafka/tools/JmxCommand.java b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java similarity index 98% rename from tools/src/main/java/org/apache/kafka/tools/JmxCommand.java rename to tools/src/main/java/org/apache/kafka/tools/JmxTool.java index 11562229469ff..da305ee12855f 100644 --- a/tools/src/main/java/org/apache/kafka/tools/JmxCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java @@ -60,10 +60,10 @@ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool. * See KAFKA-4620 for details. */ -public class JmxCommand { +public class JmxTool { public static void main(String[] args) { try { - JmxCommandOptions options = new JmxCommandOptions(args); + JmxToolOptions options = new JmxToolOptions(args); if (CommandLineUtils.isPrintHelpNeeded(options)) { CommandLineUtils.printUsageAndExit(options.parser, "Dump JMX values to standard output."); return; @@ -125,7 +125,7 @@ private static String[] attributesNames(MBeanInfo mBeanInfo) { return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new); } - private static MBeanServerConnection connectToBeanServer(JmxCommandOptions options) throws Exception { + private static MBeanServerConnection connectToBeanServer(JmxToolOptions options) throws Exception { JMXConnector connector; MBeanServerConnection serverConn = null; boolean connected = false; @@ -162,7 +162,7 @@ private static MBeanServerConnection connectToBeanServer(JmxCommandOptions optio return serverConn; } - private static Set findObjectsIfNoPattern(JmxCommandOptions options, + private static Set findObjectsIfNoPattern(JmxToolOptions options, MBeanServerConnection conn, List queries, boolean hasPatternQueries) throws Exception { @@ -295,7 +295,7 @@ private static void maybePrintDataRows(String reportFormat, Map objectNameOpt; private final OptionSpec attributesOpt; private final OptionSpec reportingIntervalOpt; @@ -307,7 +307,7 @@ private static class JmxCommandOptions extends CommandDefaultOptions { private final OptionSpec jmxSslEnableOpt; private final OptionSpec waitOpt; - public JmxCommandOptions(String[] args) { + public JmxToolOptions(String[] args) { super(args); objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " + "can be given multiple times to specify more than one query. If no objects are specified " + diff --git a/tools/src/test/java/org/apache/kafka/tools/JmxCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java similarity index 99% rename from tools/src/test/java/org/apache/kafka/tools/JmxCommandTest.java rename to tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java index 1be3a999c34e3..e2b422505e914 100644 --- a/tools/src/test/java/org/apache/kafka/tools/JmxCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java @@ -45,7 +45,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; @Tag("integration") -public class JmxCommandTest { +public class JmxToolTest { private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure(); private static JMXConnectorServer jmxAgent; @@ -223,7 +223,7 @@ private String executeAndGetErr(String... args) { private String execute(String[] args, boolean err) { Runnable runnable = () -> { try { - JmxCommand.main(args); + JmxTool.main(args); } catch (Exception e) { throw new RuntimeException(e); } From 607ff2434d5cad93b944975c2a62e39ec0ded3d3 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Mon, 30 Jan 2023 18:11:18 +0100 Subject: [PATCH 6/9] Minor improvements Signed-off-by: Federico Valeri --- .../java/org/apache/kafka/tools/JmxTool.java | 92 +++++++++---------- 1 file changed, 43 insertions(+), 49 deletions(-) diff --git a/tools/src/main/java/org/apache/kafka/tools/JmxTool.java b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java index da305ee12855f..ae60ca3a99738 100644 --- a/tools/src/main/java/org/apache/kafka/tools/JmxTool.java +++ b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java @@ -310,54 +310,54 @@ private static class JmxToolOptions extends CommandDefaultOptions { public JmxToolOptions(String[] args) { super(args); objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " + - "can be given multiple times to specify more than one query. If no objects are specified " + - "all objects will be queried.") - .withRequiredArg() - .describedAs("name") - .ofType(String.class); + "can be given multiple times to specify more than one query. If no objects are specified " + + "all objects will be queried.") + .withRequiredArg() + .describedAs("name") + .ofType(String.class); attributesOpt = parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " + - "attributes are specified all objects will be queried.") - .withRequiredArg() - .describedAs("name") - .ofType(String.class); + "attributes are specified all objects will be queried.") + .withRequiredArg() + .describedAs("name") + .ofType(String.class); reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " + - "Value of -1 equivalent to setting one-time to true") - .withRequiredArg() - .describedAs("ms") - .ofType(Integer.class) - .defaultsTo(2000); + "Value of -1 equivalent to setting one-time to true") + .withRequiredArg() + .describedAs("ms") + .ofType(Integer.class) + .defaultsTo(2000); oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.") - .withOptionalArg() - .describedAs("one-time") - .ofType(Boolean.class) - .defaultsTo(false); + .withOptionalArg() + .describedAs("one-time") + .ofType(Boolean.class) + .defaultsTo(false); dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " + - "See java.text.SimpleDateFormat for options.") - .withRequiredArg() - .describedAs("format") - .ofType(String.class); + "See java.text.SimpleDateFormat for options.") + .withRequiredArg() + .describedAs("format") + .ofType(String.class); jmxServiceUrlOpt = parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.") - .withRequiredArg() - .describedAs("service-url") - .ofType(String.class) - .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi"); + .withRequiredArg() + .describedAs("service-url") + .ofType(String.class) + .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi"); reportFormatOpt = parser.accepts("report-format", "output format name: either 'original', 'properties', 'csv', 'tsv' ") - .withRequiredArg() - .describedAs("report-format") - .ofType(String.class) - .defaultsTo("original"); + .withRequiredArg() + .describedAs("report-format") + .ofType(String.class) + .defaultsTo("original"); jmxAuthPropOpt = parser.accepts("jmx-auth-prop", "A mechanism to pass property in the form 'username=password' " + - "when enabling remote JMX with password authentication.") - .withRequiredArg() - .describedAs("jmx-auth-prop") - .ofType(String.class); + "when enabling remote JMX with password authentication.") + .withRequiredArg() + .describedAs("jmx-auth-prop") + .ofType(String.class); jmxSslEnableOpt = parser.accepts("jmx-ssl-enable", "Flag to enable remote JMX with SSL.") - .withRequiredArg() - .describedAs("ssl-enable") - .ofType(Boolean.class) - .defaultsTo(false); + .withRequiredArg() + .describedAs("ssl-enable") + .ofType(Boolean.class) + .defaultsTo(false); waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " + - "Only supported when the list of objects is non-empty and contains no object name patterns."); + "Only supported when the list of objects is non-empty and contains no object name patterns."); options = parser.parse(args); } @@ -388,11 +388,9 @@ public Optional attributesInclude() { } public Optional dateFormat() { - if (options.has(dateFormatOpt)) { - return Optional.of(new SimpleDateFormat(options.valueOf(dateFormatOpt))); - } else { - return Optional.empty(); - } + return options.has(dateFormatOpt) + ? Optional.of(new SimpleDateFormat(options.valueOf(dateFormatOpt))) + : Optional.empty(); } public boolean hasWait() { @@ -401,11 +399,7 @@ public boolean hasWait() { private String parseFormat() { String reportFormat = options.valueOf(reportFormatOpt).toLowerCase(Locale.ROOT); - if (Arrays.asList("properties", "csv", "tsv").contains(reportFormat)) { - return reportFormat; - } else { - return "original"; - } + return Arrays.asList("properties", "csv", "tsv").contains(reportFormat) ? reportFormat : "original"; } public boolean hasJmxAuthPropOpt() { From f825f876719a477a04878185a857485d5ffbff2e Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Wed, 1 Feb 2023 09:52:22 +0100 Subject: [PATCH 7/9] Add changes from Mickael feedback Signed-off-by: Federico Valeri --- checkstyle/import-control.xml | 2 +- core/src/main/scala/kafka/Kafka.scala | 1 + .../scala/kafka/admin/ConsumerGroupCommand.scala | 4 ++-- .../main/java/org/apache/kafka/tools/JmxTool.java | 14 +++++--------- .../java/org/apache/kafka/tools/JmxToolTest.java | 6 ++---- 5 files changed, 11 insertions(+), 16 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index d90f60fa693c4..303fdab97d2f8 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -407,7 +407,7 @@ - + diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 089bcf8c6d4f5..fa0137e9597a1 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -18,6 +18,7 @@ package kafka import java.util.Properties + import joptsimple.OptionParser import kafka.server.{KafkaConfig, KafkaRaftServer, KafkaServer, Server} import kafka.utils.Implicits._ diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index eec598ef4cdf5..9c0452b781c7e 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -1033,7 +1033,7 @@ object ConsumerGroupCommand extends Logging { val describeOpt = parser.accepts("describe", DescribeDoc) val allGroupsOpt = parser.accepts("all-groups", AllGroupsDoc) val deleteOpt = parser.accepts("delete", DeleteDoc) - val timeoutMsOpt: OptionSpec[Long] = parser.accepts("timeout", TimeoutMsDoc) + val timeoutMsOpt = parser.accepts("timeout", TimeoutMsDoc) .withRequiredArg .describedAs("timeout (ms)") .ofType(classOf[Long]) @@ -1047,7 +1047,7 @@ object ConsumerGroupCommand extends Logging { val dryRunOpt = parser.accepts("dry-run", DryRunDoc) val executeOpt = parser.accepts("execute", ExecuteDoc) val exportOpt = parser.accepts("export", ExportDoc) - val resetToOffsetOpt: OptionSpec[Long] = parser.accepts("to-offset", ResetToOffsetDoc) + val resetToOffsetOpt = parser.accepts("to-offset", ResetToOffsetDoc) .withRequiredArg() .describedAs("offset") .ofType(classOf[Long]) diff --git a/tools/src/main/java/org/apache/kafka/tools/JmxTool.java b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java index ae60ca3a99738..ea75748f68cf7 100644 --- a/tools/src/main/java/org/apache/kafka/tools/JmxTool.java +++ b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java @@ -280,13 +280,13 @@ private static void maybePrintDataRows(String reportFormat, Map System.out.println(String.format("%s=%s", key, attributes.get(key)))); + keys.forEach(key -> System.out.printf("%s=%s%n", key, attributes.get(key))); break; case "csv": - keys.forEach(key -> System.out.println(String.format("%s,\"%s\"", key, attributes.get(key)))); + keys.forEach(key -> System.out.printf("%s,\"%s\"%n", key, attributes.get(key))); break; case "tsv": - keys.forEach(key -> System.out.println(String.format("%s\t%s", key, attributes.get(key)))); + keys.forEach(key -> System.out.printf("%s\t%s%n", key, attributes.get(key))); break; default: System.out.println(mkString(keys.stream().map(attributes::get), ",")); @@ -361,12 +361,8 @@ public JmxToolOptions(String[] args) { options = parser.parse(args); } - public JMXServiceURL jmxServiceURL() { - try { - return new JMXServiceURL(options.valueOf(jmxServiceUrlOpt)); - } catch (MalformedURLException e) { - throw new RuntimeException(e); - } + public JMXServiceURL jmxServiceURL() throws MalformedURLException { + return new JMXServiceURL(options.valueOf(jmxServiceUrlOpt)); } public int interval() { diff --git a/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java b/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java index e2b422505e914..34644f34430e1 100644 --- a/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java @@ -22,7 +22,6 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import javax.management.MBeanServer; @@ -44,7 +43,6 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -@Tag("integration") public class JmxToolTest { private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure(); @@ -97,8 +95,8 @@ public void missingRequired() { } @Test - public void invalidJmxUrl() { - String err = executeAndGetErr("--jmx-url", String.format("localhost:9999")); + public void malformedURL() { + String err = executeAndGetErr("--jmx-url", "localhost:9999"); assertCommandFailure(); assertTrue(err.contains("MalformedURLException")); } From d86dc35a37d4118d775ecc20439cce6c96597d39 Mon Sep 17 00:00:00 2001 From: Federico Valeri Date: Wed, 1 Feb 2023 18:12:17 +0100 Subject: [PATCH 8/9] Make Idea JUnit pluging happy Signed-off-by: Federico Valeri --- tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java b/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java index 34644f34430e1..cbd9547d58054 100644 --- a/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java @@ -75,7 +75,7 @@ public void afterEach() { public void kafkaVersion() { String out = executeAndGetOut("--version"); assertNormalExit(); - assertEquals(AppInfoParser.getVersion(), out); + assertTrue(out.contains(AppInfoParser.getVersion())); } @Test From aa997521c6881cd1148fc8859434a8284b27e757 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Thu, 2 Feb 2023 11:19:21 +0100 Subject: [PATCH 9/9] Remove printStackTrace() --- tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java b/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java index cbd9547d58054..8314d16405806 100644 --- a/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java @@ -258,7 +258,6 @@ private boolean validDateFormat(String format, String value) { formatter.parse(value); return true; } catch (ParseException e) { - e.printStackTrace(); return false; } }