diff --git a/build.gradle b/build.gradle index 568838605734e..7761a96a7819d 100644 --- a/build.gradle +++ b/build.gradle @@ -1763,6 +1763,7 @@ project(':tools') { implementation libs.jacksonJDK8Datatypes implementation libs.slf4jApi implementation libs.log4j + implementation libs.joptSimple implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation implementation libs.jacksonJaxrsJsonProvider diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 0e767baee5d28..303fdab97d2f8 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -408,6 +408,7 @@ + diff --git a/core/src/main/scala/kafka/tools/JmxTool.scala b/core/src/main/scala/kafka/tools/JmxTool.scala deleted file mode 100644 index 223c459bcc270..0000000000000 --- a/core/src/main/scala/kafka/tools/JmxTool.scala +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package kafka.tools - -import java.util.{Date, Objects} -import java.text.SimpleDateFormat -import javax.management._ -import javax.management.remote._ -import javax.rmi.ssl.SslRMIClientSocketFactory -import joptsimple.OptionParser - -import scala.jdk.CollectionConverters._ -import scala.collection.mutable -import scala.math._ -import kafka.utils.{Exit, Logging} -import org.apache.kafka.server.util.CommandLineUtils - - -/** - * A program for reading JMX metrics from a given endpoint. - * - * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool. See KAFKA-4620 for - * details. - */ -object JmxTool extends Logging { - - def main(args: Array[String]): Unit = { - // Parse command line - val parser = new OptionParser(false) - val objectNameOpt = - parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " + - "can be given multiple times to specify more than one query. If no objects are specified " + - "all objects will be queried.") - .withRequiredArg - .describedAs("name") - .ofType(classOf[String]) - val attributesOpt = - parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " + - "attributes are specified all objects will be queried.") - .withRequiredArg - .describedAs("name") - .ofType(classOf[String]) - val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " + - "Value of -1 equivalent to setting one-time to true") - .withRequiredArg - .describedAs("ms") - .ofType(classOf[java.lang.Integer]) - .defaultsTo(2000) - val oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.") - .withRequiredArg - .describedAs("one-time") - .ofType(classOf[java.lang.Boolean]) - .defaultsTo(false) - val dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " + - "See java.text.SimpleDateFormat for options.") - .withRequiredArg - .describedAs("format") - .ofType(classOf[String]) - val jmxServiceUrlOpt = - parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.") - .withRequiredArg - .describedAs("service-url") - .ofType(classOf[String]) - .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi") - val reportFormatOpt = parser.accepts("report-format", "output format name: either 'original', 'properties', 'csv', 'tsv' ") - .withRequiredArg - .describedAs("report-format") - .ofType(classOf[java.lang.String]) - .defaultsTo("original") - val jmxAuthPropOpt = parser.accepts("jmx-auth-prop", "A mechanism to pass property in the form 'username=password' " + - "when enabling remote JMX with password authentication.") - .withRequiredArg - .describedAs("jmx-auth-prop") - .ofType(classOf[String]) - val jmxSslEnableOpt = parser.accepts("jmx-ssl-enable", "Flag to enable remote JMX with SSL.") - .withRequiredArg - .describedAs("ssl-enable") - .ofType(classOf[java.lang.Boolean]) - .defaultsTo(false) - val waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " + - "Only supported when the list of objects is non-empty and contains no object name patterns.") - val helpOpt = parser.accepts("help", "Print usage information.") - - - if(args.isEmpty) - CommandLineUtils.printUsageAndExit(parser, "Dump JMX values to standard output.") - - val options = parser.parse(args : _*) - - if(options.has(helpOpt)) { - parser.printHelpOn(System.out) - Exit.exit(0) - } - - val url = new JMXServiceURL(options.valueOf(jmxServiceUrlOpt)) - val interval = options.valueOf(reportingIntervalOpt).intValue - val oneTime = interval < 0 || options.has(oneTimeOpt) - val attributesIncludeExists = options.has(attributesOpt) - val attributesInclude = if(attributesIncludeExists) Some(options.valueOf(attributesOpt).split(",").filterNot(_.equals(""))) else None - val dateFormatExists = options.has(dateFormatOpt) - val dateFormat = if(dateFormatExists) Some(new SimpleDateFormat(options.valueOf(dateFormatOpt))) else None - val wait = options.has(waitOpt) - - val reportFormat = parseFormat(options.valueOf(reportFormatOpt).toLowerCase) - val reportFormatOriginal = reportFormat.equals("original") - - val enablePasswordAuth = options.has(jmxAuthPropOpt) - val enableSsl = options.has(jmxSslEnableOpt) - - var jmxc: JMXConnector = null - var mbsc: MBeanServerConnection = null - var connected = false - val connectTimeoutMs = 10000 - val connectTestStarted = System.currentTimeMillis - do { - try { - System.err.println(s"Trying to connect to JMX url: $url.") - val env = new java.util.HashMap[String, AnyRef] - // ssl enable - if (enableSsl) { - val csf = new SslRMIClientSocketFactory - env.put("com.sun.jndi.rmi.factory.socket", csf) - } - // password authentication enable - if (enablePasswordAuth) { - val credentials = options.valueOf(jmxAuthPropOpt).split("=", 2) - env.put(JMXConnector.CREDENTIALS, credentials) - } - jmxc = JMXConnectorFactory.connect(url, env) - mbsc = jmxc.getMBeanServerConnection - connected = true - } catch { - case e : Exception => - System.err.println(s"Could not connect to JMX url: $url. Exception ${e.getMessage}.") - e.printStackTrace() - Thread.sleep(100) - } - } while (System.currentTimeMillis - connectTestStarted < connectTimeoutMs && !connected) - - if (!connected) { - System.err.println(s"Could not connect to JMX url $url after $connectTimeoutMs ms.") - System.err.println("Exiting.") - sys.exit(1) - } - - val queries: Iterable[ObjectName] = - if(options.has(objectNameOpt)) - options.valuesOf(objectNameOpt).asScala.map(new ObjectName(_)) - else - List(null) - - val hasPatternQueries = queries.filterNot(Objects.isNull).exists((name: ObjectName) => name.isPattern) - - var names: Iterable[ObjectName] = null - def namesSet = Option(names).toSet.flatten - def foundAllObjects = queries.toSet == namesSet - val waitTimeoutMs = 10000 - if (!hasPatternQueries) { - val start = System.currentTimeMillis - do { - if (names != null) { - System.err.println("Could not find all object names, retrying") - Thread.sleep(100) - } - names = queries.flatMap((name: ObjectName) => mbsc.queryNames(name, null).asScala) - } while (wait && System.currentTimeMillis - start < waitTimeoutMs && !foundAllObjects) - } - - if (wait && !foundAllObjects) { - val missing = (queries.toSet - namesSet).mkString(", ") - System.err.println(s"Could not find all requested object names after $waitTimeoutMs ms. Missing $missing") - System.err.println("Exiting.") - sys.exit(1) - } - - val numExpectedAttributes: Map[ObjectName, Int] = - if (!attributesIncludeExists) - names.map{name: ObjectName => - val mbean = mbsc.getMBeanInfo(name) - (name, mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).size)}.toMap - else { - if (!hasPatternQueries) - names.map{name: ObjectName => - val mbean = mbsc.getMBeanInfo(name) - val attributes = mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)) - val expectedAttributes = attributes.asScala.asInstanceOf[mutable.Buffer[Attribute]] - .filter(attr => attributesInclude.get.contains(attr.getName)) - (name, expectedAttributes.size)}.toMap.filter(_._2 > 0) - else - queries.map((_, attributesInclude.get.length)).toMap - } - - if(numExpectedAttributes.isEmpty) { - CommandLineUtils.printUsageAndExit(parser, s"No matched attributes for the queried objects $queries.") - } - - // print csv header - val keys = List("time") ++ queryAttributes(mbsc, names, attributesInclude).keys.toArray.sorted - if(reportFormatOriginal && keys.size == numExpectedAttributes.values.sum + 1) { - println(keys.map("\"" + _ + "\"").mkString(",")) - } - - var keepGoing = true - while (keepGoing) { - val start = System.currentTimeMillis - val attributes = queryAttributes(mbsc, names, attributesInclude) - attributes("time") = dateFormat match { - case Some(dFormat) => dFormat.format(new Date) - case None => System.currentTimeMillis().toString - } - if(attributes.keySet.size == numExpectedAttributes.values.sum + 1) { - if(reportFormatOriginal) { - println(keys.map(attributes(_)).mkString(",")) - } - else if(reportFormat.equals("properties")) { - keys.foreach( k => { println(k + "=" + attributes(k) ) } ) - } - else if(reportFormat.equals("csv")) { - keys.foreach( k => { println(k + ",\"" + attributes(k) + "\"" ) } ) - } - else { // tsv - keys.foreach( k => { println(k + "\t" + attributes(k) ) } ) - } - } - - if (oneTime) { - keepGoing = false - } - else { - val sleep = max(0, interval - (System.currentTimeMillis - start)) - Thread.sleep(sleep) - } - } - } - - def queryAttributes(mbsc: MBeanServerConnection, names: Iterable[ObjectName], attributesInclude: Option[Array[String]]): mutable.Map[String, Any] = { - val attributes = new mutable.HashMap[String, Any]() - for (name <- names) { - val mbean = mbsc.getMBeanInfo(name) - for (attrObj <- mbsc.getAttributes(name, mbean.getAttributes.map(_.getName)).asScala) { - val attr = attrObj.asInstanceOf[Attribute] - attributesInclude match { - case Some(allowedAttributes) => - if (allowedAttributes.contains(attr.getName)) - attributes(name.toString + ":" + attr.getName) = attr.getValue - case None => attributes(name.toString + ":" + attr.getName) = attr.getValue - } - } - } - attributes - } - - def parseFormat(reportFormatOpt : String): String = reportFormatOpt match { - case "properties" => "properties" - case "csv" => "csv" - case "tsv" => "tsv" - case _ => "original" - } -} diff --git a/tests/kafkatest/services/monitor/jmx.py b/tests/kafkatest/services/monitor/jmx.py index bff1878fada62..03eec51f7173a 100644 --- a/tests/kafkatest/services/monitor/jmx.py +++ b/tests/kafkatest/services/monitor/jmx.py @@ -141,7 +141,7 @@ def read_jmx_output_all_nodes(self): self.read_jmx_output(self.idx(node), node) def jmx_class_name(self): - return "kafka.tools.JmxTool" + return "org.apache.kafka.tools.JmxTool" class JmxTool(JmxMixin, KafkaPathResolverMixin): """ diff --git a/tools/src/main/java/org/apache/kafka/tools/JmxTool.java b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java new file mode 100644 index 0000000000000..ea75748f68cf7 --- /dev/null +++ b/tools/src/main/java/org/apache/kafka/tools/JmxTool.java @@ -0,0 +1,430 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import joptsimple.OptionSpec; +import org.apache.kafka.common.utils.Exit; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.CommandDefaultOptions; +import org.apache.kafka.server.util.CommandLineUtils; + +import javax.management.Attribute; +import javax.management.AttributeList; +import javax.management.MBeanFeatureInfo; +import javax.management.MBeanInfo; +import javax.management.MBeanServerConnection; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; +import javax.rmi.ssl.SslRMIClientSocketFactory; +import java.io.IOException; +import java.net.MalformedURLException; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.TimeUnit; +import java.util.function.BiPredicate; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A program for reading JMX metrics from a given endpoint. + *

+ * This tool only works reliably if the JmxServer is fully initialized prior to invoking the tool. + * See KAFKA-4620 for details. + */ +public class JmxTool { + public static void main(String[] args) { + try { + JmxToolOptions options = new JmxToolOptions(args); + if (CommandLineUtils.isPrintHelpNeeded(options)) { + CommandLineUtils.printUsageAndExit(options.parser, "Dump JMX values to standard output."); + return; + } + if (CommandLineUtils.isPrintVersionNeeded(options)) { + CommandLineUtils.printVersionAndExit(); + return; + } + + Optional attributesInclude = options.attributesInclude(); + Optional dateFormat = options.dateFormat(); + String reportFormat = options.parseFormat(); + boolean keepGoing = true; + + MBeanServerConnection conn = connectToBeanServer(options); + List queries = options.queries(); + boolean hasPatternQueries = queries.stream().filter(Objects::nonNull).anyMatch(ObjectName::isPattern); + + Set found = findObjectsIfNoPattern(options, conn, queries, hasPatternQueries); + Map numExpectedAttributes = + findNumExpectedAttributes(conn, attributesInclude, hasPatternQueries, queries, found); + + List keys = new ArrayList<>(); + keys.add("time"); + keys.addAll(new TreeSet<>(queryAttributes(conn, found, attributesInclude).keySet())); + maybePrintCsvHeader(reportFormat, keys, numExpectedAttributes); + + while (keepGoing) { + long start = System.currentTimeMillis(); + Map attributes = queryAttributes(conn, found, attributesInclude); + attributes.put("time", dateFormat.isPresent() ? dateFormat.get().format(new Date()) : String.valueOf(System.currentTimeMillis())); + maybePrintDataRows(reportFormat, numExpectedAttributes, keys, attributes); + if (options.isOneTime()) { + keepGoing = false; + } else { + TimeUnit.MILLISECONDS.sleep(Math.max(0, options.interval() - (System.currentTimeMillis() - start))); + } + } + Exit.exit(0); + } catch (TerseException e) { + System.err.println(e.getMessage()); + Exit.exit(1); + } catch (Throwable e) { + System.err.println(e.getMessage()); + System.err.println(Utils.stackTrace(e)); + Exit.exit(1); + } + } + + private static String mkString(Stream stream, String delimeter) { + return stream.filter(Objects::nonNull).map(Object::toString).collect(Collectors.joining(delimeter)); + } + + private static int sumValues(Map numExpectedAttributes) { + return numExpectedAttributes.values().stream().mapToInt(Integer::intValue).sum(); + } + + private static String[] attributesNames(MBeanInfo mBeanInfo) { + return Arrays.stream(mBeanInfo.getAttributes()).map(MBeanFeatureInfo::getName).toArray(String[]::new); + } + + private static MBeanServerConnection connectToBeanServer(JmxToolOptions options) throws Exception { + JMXConnector connector; + MBeanServerConnection serverConn = null; + boolean connected = false; + long connectTimeoutMs = 10_000; + long connectTestStarted = System.currentTimeMillis(); + do { + try { + // printing to stderr because system tests parse the output + System.err.printf("Trying to connect to JMX url: %s%n", options.jmxServiceURL()); + Map env = new HashMap<>(); + // ssl enable + if (options.hasJmxSslEnableOpt()) { + env.put("com.sun.jndi.rmi.factory.socket", new SslRMIClientSocketFactory()); + } + // password authentication enable + if (options.hasJmxAuthPropOpt()) { + env.put(JMXConnector.CREDENTIALS, options.credentials()); + } + connector = JMXConnectorFactory.connect(options.jmxServiceURL(), env); + serverConn = connector.getMBeanServerConnection(); + connected = true; + } catch (Exception e) { + System.err.printf("Could not connect to JMX url: %s. Exception: %s.%n", + options.jmxServiceURL(), e.getMessage()); + e.printStackTrace(); + TimeUnit.MILLISECONDS.sleep(100); + } + } while (System.currentTimeMillis() - connectTestStarted < connectTimeoutMs && !connected); + + if (!connected) { + throw new TerseException(String.format("Could not connect to JMX url %s after %d ms.", + options.jmxServiceURL(), connectTimeoutMs)); + } + return serverConn; + } + + private static Set findObjectsIfNoPattern(JmxToolOptions options, + MBeanServerConnection conn, + List queries, + boolean hasPatternQueries) throws Exception { + long waitTimeoutMs = 10_000; + Set result = new HashSet<>(); + Set querySet = new HashSet<>(queries); + BiPredicate, Set> foundAllObjects = (s1, s2) -> s1.containsAll(s2); + if (!hasPatternQueries) { + long start = System.currentTimeMillis(); + do { + if (!result.isEmpty()) { + System.err.println("Could not find all object names, retrying"); + TimeUnit.MILLISECONDS.sleep(100); + } + result.addAll(queryObjects(conn, queries)); + } while (options.hasWait() && System.currentTimeMillis() - start < waitTimeoutMs && !foundAllObjects.test(querySet, result)); + } + + if (options.hasWait() && !foundAllObjects.test(querySet, result)) { + querySet.removeAll(result); + String missing = mkString(querySet.stream().map(Object::toString), ","); + throw new TerseException(String.format("Could not find all requested object names after %d ms. Missing %s", waitTimeoutMs, missing)); + } + return result; + } + + private static Set queryObjects(MBeanServerConnection conn, + List queries) { + Set result = new HashSet<>(); + queries.forEach(name -> { + try { + result.addAll(conn.queryNames(name, null)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + return result; + } + + private static Map findNumExpectedAttributes(MBeanServerConnection conn, + Optional attributesInclude, + boolean hasPatternQueries, + List queries, + Set found) throws Exception { + Map result = new HashMap<>(); + if (!attributesInclude.isPresent()) { + found.forEach(objectName -> { + try { + MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName); + result.put(objectName, conn.getAttributes(objectName, attributesNames(mBeanInfo)).size()); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } else { + if (!hasPatternQueries) { + found.forEach(objectName -> { + try { + MBeanInfo mBeanInfo = conn.getMBeanInfo(objectName); + AttributeList attributes = conn.getAttributes(objectName, attributesNames(mBeanInfo)); + List expectedAttributes = new ArrayList<>(); + attributes.asList().forEach(attribute -> { + if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) { + expectedAttributes.add(objectName); + } + }); + if (expectedAttributes.size() > 0) { + result.put(objectName, expectedAttributes.size()); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + } else { + queries.forEach(objectName -> result.put(objectName, attributesInclude.get().length)); + } + } + + if (result.isEmpty()) { + throw new TerseException(String.format("No matched attributes for the queried objects %s.", queries)); + } + return result; + } + + private static Map queryAttributes(MBeanServerConnection conn, + Set objectNames, + Optional attributesInclude) throws Exception { + Map result = new HashMap<>(); + for (ObjectName objectName : objectNames) { + MBeanInfo beanInfo = conn.getMBeanInfo(objectName); + AttributeList attributes = conn.getAttributes(objectName, + Arrays.stream(beanInfo.getAttributes()).map(a -> a.getName()).toArray(String[]::new)); + for (Attribute attribute : attributes.asList()) { + if (attributesInclude.isPresent()) { + if (Arrays.asList(attributesInclude.get()).contains(attribute.getName())) { + result.put(String.format("%s:%s", objectName.toString(), attribute.getName()), + attribute.getValue()); + } + } else { + result.put(String.format("%s:%s", objectName.toString(), attribute.getName()), + attribute.getValue()); + } + } + } + return result; + } + + private static void maybePrintCsvHeader(String reportFormat, List keys, Map numExpectedAttributes) { + if (reportFormat.equals("original") && keys.size() == sumValues(numExpectedAttributes) + 1) { + System.out.println(mkString(keys.stream().map(key -> String.format("\"%s\"", key)), ",")); + } + } + + private static void maybePrintDataRows(String reportFormat, Map numExpectedAttributes, List keys, Map attributes) { + if (attributes.size() == sumValues(numExpectedAttributes) + 1) { + switch (reportFormat) { + case "properties": + keys.forEach(key -> System.out.printf("%s=%s%n", key, attributes.get(key))); + break; + case "csv": + keys.forEach(key -> System.out.printf("%s,\"%s\"%n", key, attributes.get(key))); + break; + case "tsv": + keys.forEach(key -> System.out.printf("%s\t%s%n", key, attributes.get(key))); + break; + default: + System.out.println(mkString(keys.stream().map(attributes::get), ",")); + break; + } + } + } + + private static class JmxToolOptions extends CommandDefaultOptions { + private final OptionSpec objectNameOpt; + private final OptionSpec attributesOpt; + private final OptionSpec reportingIntervalOpt; + private final OptionSpec oneTimeOpt; + private final OptionSpec dateFormatOpt; + private final OptionSpec jmxServiceUrlOpt; + private final OptionSpec reportFormatOpt; + private final OptionSpec jmxAuthPropOpt; + private final OptionSpec jmxSslEnableOpt; + private final OptionSpec waitOpt; + + public JmxToolOptions(String[] args) { + super(args); + objectNameOpt = parser.accepts("object-name", "A JMX object name to use as a query. This can contain wild cards, and this option " + + "can be given multiple times to specify more than one query. If no objects are specified " + + "all objects will be queried.") + .withRequiredArg() + .describedAs("name") + .ofType(String.class); + attributesOpt = parser.accepts("attributes", "The list of attributes to include in the query. This is a comma-separated list. If no " + + "attributes are specified all objects will be queried.") + .withRequiredArg() + .describedAs("name") + .ofType(String.class); + reportingIntervalOpt = parser.accepts("reporting-interval", "Interval in MS with which to poll jmx stats; default value is 2 seconds. " + + "Value of -1 equivalent to setting one-time to true") + .withRequiredArg() + .describedAs("ms") + .ofType(Integer.class) + .defaultsTo(2000); + oneTimeOpt = parser.accepts("one-time", "Flag to indicate run once only.") + .withOptionalArg() + .describedAs("one-time") + .ofType(Boolean.class) + .defaultsTo(false); + dateFormatOpt = parser.accepts("date-format", "The date format to use for formatting the time field. " + + "See java.text.SimpleDateFormat for options.") + .withRequiredArg() + .describedAs("format") + .ofType(String.class); + jmxServiceUrlOpt = parser.accepts("jmx-url", "The url to connect to poll JMX data. See Oracle javadoc for JMXServiceURL for details.") + .withRequiredArg() + .describedAs("service-url") + .ofType(String.class) + .defaultsTo("service:jmx:rmi:///jndi/rmi://:9999/jmxrmi"); + reportFormatOpt = parser.accepts("report-format", "output format name: either 'original', 'properties', 'csv', 'tsv' ") + .withRequiredArg() + .describedAs("report-format") + .ofType(String.class) + .defaultsTo("original"); + jmxAuthPropOpt = parser.accepts("jmx-auth-prop", "A mechanism to pass property in the form 'username=password' " + + "when enabling remote JMX with password authentication.") + .withRequiredArg() + .describedAs("jmx-auth-prop") + .ofType(String.class); + jmxSslEnableOpt = parser.accepts("jmx-ssl-enable", "Flag to enable remote JMX with SSL.") + .withRequiredArg() + .describedAs("ssl-enable") + .ofType(Boolean.class) + .defaultsTo(false); + waitOpt = parser.accepts("wait", "Wait for requested JMX objects to become available before starting output. " + + "Only supported when the list of objects is non-empty and contains no object name patterns."); + options = parser.parse(args); + } + + public JMXServiceURL jmxServiceURL() throws MalformedURLException { + return new JMXServiceURL(options.valueOf(jmxServiceUrlOpt)); + } + + public int interval() { + return options.valueOf(reportingIntervalOpt); + } + + public boolean isOneTime() { + return interval() < 0 || options.has(oneTimeOpt); + } + + public Optional attributesInclude() { + if (options.has(attributesOpt)) { + String[] attributes = Arrays.stream(options.valueOf(attributesOpt).split(",")) + .sequential().filter(s -> !s.isEmpty()).toArray(String[]::new); + return Optional.of(attributes); + } else { + return Optional.empty(); + } + } + + public Optional dateFormat() { + return options.has(dateFormatOpt) + ? Optional.of(new SimpleDateFormat(options.valueOf(dateFormatOpt))) + : Optional.empty(); + } + + public boolean hasWait() { + return options.has(waitOpt); + } + + private String parseFormat() { + String reportFormat = options.valueOf(reportFormatOpt).toLowerCase(Locale.ROOT); + return Arrays.asList("properties", "csv", "tsv").contains(reportFormat) ? reportFormat : "original"; + } + + public boolean hasJmxAuthPropOpt() { + return options.has(jmxAuthPropOpt); + } + + public boolean hasJmxSslEnableOpt() { + return options.has(jmxSslEnableOpt); + } + + public String[] credentials() { + return options.valueOf(jmxAuthPropOpt).split("=", 2); + } + + public List queries() { + if (options.has(objectNameOpt)) { + return options.valuesOf(objectNameOpt).stream() + .map(s -> { + try { + return new ObjectName(s); + } catch (MalformedObjectNameException e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); + } else { + List listWithNull = new ArrayList<>(); + listWithNull.add(null); + return listWithNull; + } + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java b/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java new file mode 100644 index 0000000000000..8314d16405806 --- /dev/null +++ b/tools/src/test/java/org/apache/kafka/tools/JmxToolTest.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.tools; + +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.common.utils.Exit; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import javax.management.MBeanServer; +import javax.management.ObjectName; +import javax.management.remote.JMXConnectorServer; +import javax.management.remote.JMXConnectorServerFactory; +import javax.management.remote.JMXServiceURL; + +import java.lang.management.ManagementFactory; +import java.net.ServerSocket; +import java.rmi.registry.LocateRegistry; +import java.text.DateFormat; +import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class JmxToolTest { + private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure(); + + private static JMXConnectorServer jmxAgent; + private static String jmxUrl; + + @BeforeAll + public static void beforeAll() throws Exception { + int port = findRandomOpenPortOnAllLocalInterfaces(); + jmxAgent = startJmxAgent(port); + jmxUrl = String.format("service:jmx:rmi:///jndi/rmi://:%d/jmxrmi", port); + } + + @AfterAll + public static void afterAll() throws Exception { + jmxAgent.stop(); + } + + @BeforeEach + public void beforeEach() { + Exit.setExitProcedure(exitProcedure); + } + + @AfterEach + public void afterEach() { + Exit.resetExitProcedure(); + } + + @Test + public void kafkaVersion() { + String out = executeAndGetOut("--version"); + assertNormalExit(); + assertTrue(out.contains(AppInfoParser.getVersion())); + } + + @Test + public void unrecognizedOption() { + String err = executeAndGetErr("--foo"); + assertCommandFailure(); + assertTrue(err.contains("UnrecognizedOptionException")); + assertTrue(err.contains("foo")); + } + + @Test + public void missingRequired() { + String err = executeAndGetErr("--reporting-interval"); + assertCommandFailure(); + assertTrue(err.contains("OptionMissingRequiredArgumentException")); + assertTrue(err.contains("reporting-interval")); + } + + @Test + public void malformedURL() { + String err = executeAndGetErr("--jmx-url", "localhost:9999"); + assertCommandFailure(); + assertTrue(err.contains("MalformedURLException")); + } + + @Test + public void helpOptions() { + String[] expectedOptions = new String[]{ + "--attributes", "--date-format", "--help", "--jmx-auth-prop", + "--jmx-ssl-enable", "--jmx-url", "--object-name", "--one-time", + "--report-format", "--reporting-interval", "--version", "--wait" + }; + String err = executeAndGetErr("--help"); + assertCommandFailure(); + for (String option : expectedOptions) { + assertTrue(err.contains(option), option); + } + } + + @Test + public void csvFormat() { + String[] args = new String[]{ + "--jmx-url", jmxUrl, + "--object-name", "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec", + "--report-format", "csv", + "--one-time" + }; + String out = executeAndGetOut(args); + Arrays.stream(out.split("\\r?\\n")).forEach(line -> { + assertTrue(line.matches("([a-zA-Z0-9=:,.]+),\"([ -~]+)\""), line); + }); + } + + @Test + public void tsvFormat() { + String[] args = new String[]{ + "--jmx-url", jmxUrl, + "--object-name", "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec", + "--report-format", "tsv", + "--one-time" + }; + String out = executeAndGetOut(args); + Arrays.stream(out.split("\\r?\\n")).forEach(line -> { + assertTrue(line.matches("([a-zA-Z0-9=:,.]+)\\t([ -~]+)"), line); + }); + } + + @Test + public void allMetrics() { + String[] args = new String[]{ + "--jmx-url", jmxUrl, + "--report-format", "csv", + "--reporting-interval", "-1" + }; + String out = executeAndGetOut(args); + assertNormalExit(); + + Map csv = parseCsv(out); + assertTrue(csv.size() > 0); + } + + @Test + public void filteredMetrics() { + String[] args = new String[]{ + "--jmx-url", jmxUrl, + "--object-name", "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec", + "--attributes", "FifteenMinuteRate,FiveMinuteRate", + "--report-format", "csv", + "--one-time" + }; + String out = executeAndGetOut(args); + assertNormalExit(); + + Map csv = parseCsv(out); + assertEquals("1.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FifteenMinuteRate")); + assertEquals("3.0", csv.get("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec:FiveMinuteRate")); + } + + @Test + public void dateFormat() { + String dateFormat = "yyyyMMdd-hh:mm:ss"; + String[] args = new String[]{ + "--jmx-url", jmxUrl, + "--date-format", dateFormat, + "--report-format", "csv", + "--one-time" + }; + String out = executeAndGetOut(args); + assertNormalExit(); + + Map csv = parseCsv(out); + assertTrue(validDateFormat(dateFormat, csv.get("time"))); + } + + private static JMXConnectorServer startJmxAgent(int port) throws Exception { + LocateRegistry.createRegistry(port); + Map env = new HashMap<>(); + env.put("com.sun.management.jmxremote.authenticate", "false"); + env.put("com.sun.management.jmxremote.ssl", "false"); + JMXServiceURL url = new JMXServiceURL(String.format("service:jmx:rmi:///jndi/rmi://:%d/jmxrmi", port)); + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + server.registerMBean(new Metrics(), + new ObjectName("kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec")); + JMXConnectorServer agent = JMXConnectorServerFactory.newJMXConnectorServer(url, env, server); + agent.start(); + return agent; + } + + private static int findRandomOpenPortOnAllLocalInterfaces() throws Exception { + try (ServerSocket socket = new ServerSocket(0)) { + return socket.getLocalPort(); + } + } + + private String executeAndGetOut(String... args) { + return execute(args, false); + } + + private String executeAndGetErr(String... args) { + return execute(args, true); + } + + private String execute(String[] args, boolean err) { + Runnable runnable = () -> { + try { + JmxTool.main(args); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + return err ? ToolsTestUtils.captureStandardErr(runnable) + : ToolsTestUtils.captureStandardOut(runnable); + } + + private void assertNormalExit() { + assertTrue(exitProcedure.hasExited()); + assertEquals(0, exitProcedure.statusCode()); + } + + private void assertCommandFailure() { + assertTrue(exitProcedure.hasExited()); + assertEquals(1, exitProcedure.statusCode()); + } + + private Map parseCsv(String value) { + Map result = new HashMap<>(); + Arrays.stream(value.split("\\r?\\n")).forEach(line -> { + String[] cells = line.split(",\""); + if (cells.length == 2) { + result.put(cells[0], cells[1].replaceAll("\"", "")); + } + }); + return result; + } + + private boolean validDateFormat(String format, String value) { + DateFormat formatter = new SimpleDateFormat(format); + formatter.setLenient(false); + try { + formatter.parse(value); + return true; + } catch (ParseException e) { + return false; + } + } + + public interface MetricsMBean { + double getFifteenMinuteRate(); + double getFiveMinuteRate(); + } + + public static class Metrics implements MetricsMBean { + @Override + public double getFifteenMinuteRate() { + return 1.0; + } + + @Override + public double getFiveMinuteRate() { + return 3.0; + } + } +} diff --git a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java index 4664288ae4fee..709629eef6a29 100644 --- a/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java +++ b/tools/src/test/java/org/apache/kafka/tools/ToolsTestUtils.java @@ -16,11 +16,12 @@ */ package org.apache.kafka.tools; +import org.apache.kafka.common.utils.Exit; + import java.io.ByteArrayOutputStream; import java.io.PrintStream; public class ToolsTestUtils { - public static String captureStandardOut(Runnable runnable) { return captureStandardStream(false, runnable); } @@ -48,4 +49,24 @@ private static String captureStandardStream(boolean isErr, Runnable runnable) { } } + public static class MockExitProcedure implements Exit.Procedure { + private boolean hasExited = false; + private int statusCode; + + @Override + public void execute(int statusCode, String message) { + if (!this.hasExited) { + this.hasExited = true; + this.statusCode = statusCode; + } + } + + public boolean hasExited() { + return hasExited; + } + + public int statusCode() { + return statusCode; + } + } } diff --git a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java index 3dc77a6d0f66e..ef7b5b2ab9930 100644 --- a/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/TransactionsCommandTest.java @@ -79,7 +79,7 @@ public class TransactionsCommandTest { - private final MockExitProcedure exitProcedure = new MockExitProcedure(); + private final ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure(); private final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); private final PrintStream out = new PrintStream(outputStream); private final MockTime time = new MockTime(); @@ -1048,27 +1048,14 @@ private List readRow(BufferedReader reader) throws IOException { } private void assertNormalExit() { - assertTrue(exitProcedure.hasExited); - assertEquals(0, exitProcedure.statusCode); + assertTrue(exitProcedure.hasExited()); + assertEquals(0, exitProcedure.statusCode()); } private void assertCommandFailure(String[] args) throws Exception { execute(args); - assertTrue(exitProcedure.hasExited); - assertEquals(1, exitProcedure.statusCode); - } - - private static class MockExitProcedure implements Exit.Procedure { - private boolean hasExited = false; - private int statusCode; - - @Override - public void execute(int statusCode, String message) { - if (!this.hasExited) { - this.hasExited = true; - this.statusCode = statusCode; - } - } + assertTrue(exitProcedure.hasExited()); + assertEquals(1, exitProcedure.statusCode()); } }