From 6381e533ec72893814fb2ab125509763a6186d4a Mon Sep 17 00:00:00 2001 From: dengziming Date: Tue, 2 Aug 2022 12:33:52 +0800 Subject: [PATCH 1/9] KAFKA-13914: Add command line tool kafka-metadata-quorum.sh --- bin/kafka-metadata-quorum.sh | 17 +++ .../kafka/admin/MetadataQuorumCommand.scala | 109 ++++++++++++++++++ .../test/java/kafka/test/ClusterInstance.java | 3 + .../junit/RaftClusterInvocationContext.java | 5 + .../junit/ZkClusterInvocationContext.java | 6 + .../admin/MetadataQuorumCommandTest.scala | 85 ++++++++++++++ 6 files changed, 225 insertions(+) create mode 100755 bin/kafka-metadata-quorum.sh create mode 100644 core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala create mode 100644 core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala diff --git a/bin/kafka-metadata-quorum.sh b/bin/kafka-metadata-quorum.sh new file mode 100755 index 0000000000000..24bedbded1e7d --- /dev/null +++ b/bin/kafka-metadata-quorum.sh @@ -0,0 +1,17 @@ +#!/bin/bash +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +exec $(dirname $0)/kafka-run-class.sh kafka.admin.MetadataQuorumCommand "$@" diff --git a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala new file mode 100644 index 0000000000000..945bb09656365 --- /dev/null +++ b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import kafka.tools.TerseFailure +import kafka.utils.Exit +import net.sourceforge.argparse4j.ArgumentParsers +import net.sourceforge.argparse4j.impl.Arguments.fileType +import net.sourceforge.argparse4j.inf.Subparsers +import org.apache.kafka.clients._ +import org.apache.kafka.clients.admin.{Admin, QuorumInfo} +import org.apache.kafka.common.utils.Utils + +import java.io.File +import java.util.Properties +import scala.jdk.CollectionConverters._ + +/** + * A tool for describing quorum status + */ +object MetadataQuorumCommand { + + def main(args: Array[String]): Unit = { + val res = mainNoExit(args) + Exit.exit(res) + } + + def mainNoExit(args: Array[String]): Int = { + val parser = ArgumentParsers.newArgumentParser("kafka-metadata-quorum") + .defaultHelp(true) + .description("This tool describes kraft metadata quorum status.") + parser.addArgument("--bootstrap-server") + .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.") + .required(true) + + parser.addArgument("--command-config") + .`type`(fileType()) + .help("Property file containing configs to be passed to Admin Client.") + val subparsers = parser.addSubparsers().dest("command") + addDescribeParser(subparsers) + + try { + val namespace = parser.parseArgsOrFail(args) + val command = namespace.getString("command") + + val commandConfig = namespace.get[File]("command_config") + val props = if (commandConfig != null) { + if (!commandConfig.exists()) { + throw new TerseFailure(s"Properties file ${commandConfig.getPath} does not exists!") + } + Utils.loadProps(commandConfig.getPath) + } else { + new Properties() + } + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server")) + val admin = Admin.create(props) + + if (command == "describe") { + handleDescribe(admin) + } else { + // currently we only support describe + } + admin.close() + 0 + } catch { + case e: TerseFailure => + Console.err.println(e.getMessage) + 1 + } + } + + def addDescribeParser(subparsers: Subparsers): Unit = { + subparsers.addParser("describe") + .help("Describe the metadata quorum info") + } + + def handleDescribe(admin: Admin): Unit = { + val quorumInfo = admin.describeMetadataQuorum().quorumInfo().get() + val leader = quorumInfo.leaderId() + println(s"leaderId: $leader") + + def printQuorumInfo(infos: Seq[QuorumInfo.ReplicaState]): Unit = { + infos.foreach { voter => + println(s"replicaId: ${voter.replicaId()} \tlogEndOffset: ${voter.logEndOffset()}\t" + + s"lastFetchTimeMs: ${voter.lastFetchTimeMs().orElse(-1)}\tlastCaughtUpTimeMs: ${voter.lastCaughtUpTimeMs().orElse(-1)}") + } + } + println(s"voters info:") + printQuorumInfo(quorumInfo.voters().asScala.toSeq) + println() + println(s"observers info:") + printQuorumInfo(quorumInfo.observers().asScala.toSeq) + } +} diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java index 9058508fa94f7..1a947541e053d 100644 --- a/core/src/test/java/kafka/test/ClusterInstance.java +++ b/core/src/test/java/kafka/test/ClusterInstance.java @@ -19,6 +19,7 @@ import kafka.network.SocketServer; import kafka.server.BrokerFeatures; +import kafka.server.ControllerServer; import kafka.test.annotation.ClusterTest; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.network.ListenerName; @@ -115,6 +116,8 @@ default Optional controlPlaneListenerName() { */ Map brokerFeatures(); + Collection controllerServers(); + /** * The underlying object which is responsible for setting up and tearing down the cluster. */ diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 40669f3068bf1..9f96f47ffae6d 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -183,6 +183,11 @@ public Map brokerFeatures() { )); } + @Override + public Collection controllerServers() { + return controllers().collect(Collectors.toList()); + } + @Override public ClusterType clusterType() { return ClusterType.RAFT; diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java index 18a85e2d7bf66..019d143c45f6d 100644 --- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java @@ -20,6 +20,7 @@ import kafka.api.IntegrationTestHarness; import kafka.network.SocketServer; import kafka.server.BrokerFeatures; +import kafka.server.ControllerServer; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import kafka.test.ClusterConfig; @@ -248,6 +249,11 @@ public Map brokerFeatures() { )); } + @Override + public Collection controllerServers() { + throw new UnsupportedOperationException("Can't get controller server in zk mode"); + } + @Override public ClusterType clusterType() { return ClusterType.ZK; diff --git a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala new file mode 100644 index 0000000000000..ff8ee19b13fce --- /dev/null +++ b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.admin + +import kafka.test.ClusterInstance +import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, ClusterTests, Type} +import kafka.test.junit.ClusterTestExtensions +import kafka.utils.TestUtils +import org.apache.kafka.common.errors.UnsupportedVersionException +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} +import org.junit.jupiter.api.{Tag, Test} +import org.junit.jupiter.api.extension.ExtendWith + +import java.util.concurrent.ExecutionException +import scala.jdk.CollectionConverters._ + +@ExtendWith(value = Array(classOf[ClusterTestExtensions])) +@ClusterTestDefaults(clusterType = Type.KRAFT) +@Tag("integration") +class MetadataQuorumCommandTest(cluster: ClusterInstance) { + + @ClusterTests( + Array( + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), + new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3) + ) + ) + def testDescribeQuorumSuccessful(): Unit = { + val initialDescribeOutput = TestUtils.grabConsoleOutput( + MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe")) + ) + + val leaderInfo = initialDescribeOutput.substring(0, initialDescribeOutput.indexOf("voters info:")).trim + assertEquals( + 1, + cluster.controllerServers().asScala.map(_.config.nodeId).map(id => s"leaderId: $id").count(info => info == leaderInfo) + ) + + val votersInfo = initialDescribeOutput.substring(initialDescribeOutput.indexOf("voters info:") + 13, initialDescribeOutput.indexOf("observers info:")) + assertEquals(3, votersInfo.split("\n").length) + + val observersInfo = initialDescribeOutput.substring(initialDescribeOutput.indexOf("observers info:") + 16) + assertTrue(observersInfo.isEmpty, "this will be fixed by KAFKA-13986") + } + + @ClusterTest(clusterType = Type.ZK, brokers = 3, controllers = 1) + def testDescribeQuorumInZkMode(): Unit = { + assertTrue( + assertThrows( + classOf[ExecutionException], + () => MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe")) + ).getCause.isInstanceOf[UnsupportedVersionException] + ) + } +} + +class MetadataQuorumCommandErrorTest { + + @Test + def testPropertiesFileDoesNotExists(): Unit = { + assertEquals( + 1, + MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe")) + ) + assertEquals( + "Properties file admin.properties does not exists!", + TestUtils.grabConsoleError(MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe"))).trim + ) + } +} \ No newline at end of file From dc8725c838caaafe2aa284475d721982173bd9a0 Mon Sep 17 00:00:00 2001 From: dengziming Date: Thu, 11 Aug 2022 10:44:19 +0800 Subject: [PATCH 2/9] Make output consistent with KIP --- bin/windows/kafka-metatada-quorum.bat | 17 +++ .../kafka/admin/MetadataQuorumCommand.scala | 119 ++++++++++++++---- .../admin/MetadataQuorumCommandTest.scala | 107 ++++++++++++---- 3 files changed, 197 insertions(+), 46 deletions(-) create mode 100644 bin/windows/kafka-metatada-quorum.bat diff --git a/bin/windows/kafka-metatada-quorum.bat b/bin/windows/kafka-metatada-quorum.bat new file mode 100644 index 0000000000000..4ea8e3109f962 --- /dev/null +++ b/bin/windows/kafka-metatada-quorum.bat @@ -0,0 +1,17 @@ +@echo off +rem Licensed to the Apache Software Foundation (ASF) under one or more +rem contributor license agreements. See the NOTICE file distributed with +rem this work for additional information regarding copyright ownership. +rem The ASF licenses this file to You under the Apache License, Version 2.0 +rem (the "License"); you may not use this file except in compliance with +rem the License. You may obtain a copy of the License at +rem +rem http://www.apache.org/licenses/LICENSE-2.0 +rem +rem Unless required by applicable law or agreed to in writing, software +rem distributed under the License is distributed on an "AS IS" BASIS, +rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +rem See the License for the specific language governing permissions and +rem limitations under the License. + +"%~dp0kafka-run-class.bat" kafka.admin.MetadataQuorumCommand %* diff --git a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala index 945bb09656365..a87a170fc765c 100644 --- a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala +++ b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala @@ -14,13 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package kafka.admin import kafka.tools.TerseFailure import kafka.utils.Exit import net.sourceforge.argparse4j.ArgumentParsers -import net.sourceforge.argparse4j.impl.Arguments.fileType +import net.sourceforge.argparse4j.impl.Arguments.{fileType, storeTrue} import net.sourceforge.argparse4j.inf.Subparsers import org.apache.kafka.clients._ import org.apache.kafka.clients.admin.{Admin, QuorumInfo} @@ -41,19 +40,23 @@ object MetadataQuorumCommand { } def mainNoExit(args: Array[String]): Int = { - val parser = ArgumentParsers.newArgumentParser("kafka-metadata-quorum") + val parser = ArgumentParsers + .newArgumentParser("kafka-metadata-quorum") .defaultHelp(true) .description("This tool describes kraft metadata quorum status.") - parser.addArgument("--bootstrap-server") + parser + .addArgument("--bootstrap-server") .help("A comma-separated list of host:port pairs to use for establishing the connection to the Kafka cluster.") .required(true) - parser.addArgument("--command-config") + parser + .addArgument("--command-config") .`type`(fileType()) .help("Property file containing configs to be passed to Admin Client.") val subparsers = parser.addSubparsers().dest("command") addDescribeParser(subparsers) + var admin: Admin = null try { val namespace = parser.parseArgsOrFail(args) val command = namespace.getString("command") @@ -68,42 +71,116 @@ object MetadataQuorumCommand { new Properties() } props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, namespace.getString("bootstrap_server")) - val admin = Admin.create(props) + admin = Admin.create(props) if (command == "describe") { - handleDescribe(admin) + if (namespace.getBoolean("status") && namespace.getBoolean("replication")) { + throw new TerseFailure(s"Only one of --status or --replication should be specified with describe sub-command") + } else if (namespace.getBoolean("replication")) { + handleDescribeReplication(admin) + } else if (namespace.getBoolean("status")) { + handleDescribeStatus(admin) + } else { + throw new TerseFailure(s"One of --status or --replication must be specified with describe sub-command") + } } else { // currently we only support describe } - admin.close() 0 } catch { case e: TerseFailure => Console.err.println(e.getMessage) 1 + } finally { + if (admin != null) { + admin.close() + } } } def addDescribeParser(subparsers: Subparsers): Unit = { - subparsers.addParser("describe") + val describeParser = subparsers + .addParser("describe") .help("Describe the metadata quorum info") + + val statusArgs = describeParser.addArgumentGroup("Status") + statusArgs + .addArgument("--status") + .help( + "A short summary of the quorum status and the other provides detailed information about the status of replication.") + .action(storeTrue()) + val replicationArgs = describeParser.addArgumentGroup("Replication") + replicationArgs + .addArgument("--replication") + .help("Detailed information about the status of replication") + .action(storeTrue()) } - def handleDescribe(admin: Admin): Unit = { + private def handleDescribeReplication(admin: Admin): Unit = { val quorumInfo = admin.describeMetadataQuorum().quorumInfo().get() - val leader = quorumInfo.leaderId() - println(s"leaderId: $leader") + val leaderId = quorumInfo.leaderId() + val leader = quorumInfo.voters().asScala.filter(_.replicaId() == leaderId).head + // Find proper columns width + var (maxReplicaIdLen, maxLogEndOffsetLen, maxLagLen, maxLastFetchTimeMsLen, maxLastCaughtUpTimeMsLen) = + (15, 15, 15, 15, 18) + (quorumInfo.voters().asScala ++ quorumInfo.observers().asScala).foreach { voter => + maxReplicaIdLen = Math.max(maxReplicaIdLen, voter.replicaId().toString.length) + maxLogEndOffsetLen = Math.max(maxLogEndOffsetLen, voter.logEndOffset().toString.length) + maxLagLen = Math.max(maxLagLen, (leader.logEndOffset() - voter.logEndOffset()).toString.length) + maxLastFetchTimeMsLen = Math.max(maxLastFetchTimeMsLen, leader.lastFetchTimeMs().orElse(-1).toString.length) + maxLastCaughtUpTimeMsLen = + Math.max(maxLastCaughtUpTimeMsLen, leader.lastCaughtUpTimeMs().orElse(-1).toString.length) + } + println( + s"%${-maxReplicaIdLen}s %${-maxLogEndOffsetLen}s %${-maxLagLen}s %${-maxLastFetchTimeMsLen}s %${-maxLastCaughtUpTimeMsLen}s %-15s " + .format("ReplicaId", "LogEndOffset", "Lag", "LastFetchTimeMs", "LastCaughtUpTimeMs", "Status") + ) - def printQuorumInfo(infos: Seq[QuorumInfo.ReplicaState]): Unit = { + def printQuorumInfo(infos: Seq[QuorumInfo.ReplicaState], status: String): Unit = infos.foreach { voter => - println(s"replicaId: ${voter.replicaId()} \tlogEndOffset: ${voter.logEndOffset()}\t" + - s"lastFetchTimeMs: ${voter.lastFetchTimeMs().orElse(-1)}\tlastCaughtUpTimeMs: ${voter.lastCaughtUpTimeMs().orElse(-1)}") + println( + s"%${-maxReplicaIdLen}s %${-maxLogEndOffsetLen}s %${-maxLagLen}s %${-maxLastFetchTimeMsLen}s %${-maxLastCaughtUpTimeMsLen}s %-15s " + .format( + voter.replicaId(), + voter.logEndOffset(), + leader.logEndOffset() - voter.logEndOffset(), + voter.lastFetchTimeMs().orElse(-1), + voter.lastCaughtUpTimeMs().orElse(-1), + status + )) } - } - println(s"voters info:") - printQuorumInfo(quorumInfo.voters().asScala.toSeq) - println() - println(s"observers info:") - printQuorumInfo(quorumInfo.observers().asScala.toSeq) + printQuorumInfo(Seq(leader), "Leader") + printQuorumInfo(quorumInfo.voters().asScala.filter(_.replicaId() != leaderId).toSeq, "Follower") + printQuorumInfo(quorumInfo.observers().asScala.toSeq, "Observer") + } + + private def handleDescribeStatus(admin: Admin): Unit = { + val clusterId = admin.describeCluster().clusterId().get() + val quorumInfo = admin.describeMetadataQuorum().quorumInfo().get() + val leaderId = quorumInfo.leaderId() + val leader = quorumInfo.voters().asScala.filter(_.replicaId() == leaderId).head + val maxLagFollower = quorumInfo + .voters() + .asScala + .filter(_.replicaId() != leaderId) + .minBy(_.logEndOffset()) + val maxFollowerLag = leader.logEndOffset() - maxLagFollower.logEndOffset() + val maxFollowerLagTimeMs = + if (leader.lastCaughtUpTimeMs().isPresent && maxLagFollower.lastCaughtUpTimeMs().isPresent) { + leader.lastCaughtUpTimeMs().getAsLong - maxLagFollower.lastCaughtUpTimeMs().getAsLong + } else { + -1 + } + println( + s"""|ClusterId: $clusterId + |LeaderId: ${quorumInfo.leaderId()} + |LeaderEpoch: {{TODO}} + |HighWatermark: {{TODO}} + |MaxFollowerLag: $maxFollowerLag + |MaxFollowerLagTimeMs: $maxFollowerLagTimeMs + |CurrentVoters: ${quorumInfo.voters().asScala.map(_.replicaId()).mkString("[", ",", "]")} + |CurrentObservers: ${quorumInfo.observers().asScala.map(_.replicaId()).mkString("[", ",", "]")} + |""".stripMargin + ) } } diff --git a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala index ff8ee19b13fce..0a506e744cda6 100644 --- a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package kafka.admin import kafka.test.ClusterInstance @@ -27,7 +26,6 @@ import org.junit.jupiter.api.{Tag, Test} import org.junit.jupiter.api.extension.ExtendWith import java.util.concurrent.ExecutionException -import scala.jdk.CollectionConverters._ @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(clusterType = Type.KRAFT) @@ -35,27 +33,50 @@ import scala.jdk.CollectionConverters._ class MetadataQuorumCommandTest(cluster: ClusterInstance) { @ClusterTests( - Array( - new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), - new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3) - ) - ) - def testDescribeQuorumSuccessful(): Unit = { - val initialDescribeOutput = TestUtils.grabConsoleOutput( - MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe")) + Array(new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), + new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3))) + def testDescribeQuorumReplicationSuccessful(): Unit = { + val describeOutput = TestUtils.grabConsoleOutput( + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")) ) - val leaderInfo = initialDescribeOutput.substring(0, initialDescribeOutput.indexOf("voters info:")).trim - assertEquals( - 1, - cluster.controllerServers().asScala.map(_.config.nodeId).map(id => s"leaderId: $id").count(info => info == leaderInfo) - ) + val leaderPattern = """\d+\s+\d+\s+\d+\s+[-]?\d+\s+[-]?\d+\s+Leader\s+""".r + val followerPattern = """\d+\s+\d+\s+\d+\s+[-]?\d+\s+[-]?\d+\s+Follower\s+""".r + val observerPattern = """\d+\s+\d+\s+\d+\s+[-]?\d+\s+[-]?\d+\s+Observer\s+""".r + val outputs = describeOutput.split("\n").tail + if (cluster.config().clusterType() == Type.CO_KRAFT) { + assertEquals(cluster.config().numControllers(), outputs.length) + } else { + assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.length) + } + assertTrue(leaderPattern.matches(outputs.head)) + assertEquals(1, outputs.count(leaderPattern.matches(_))) + assertEquals(cluster.config().numControllers() - 1, outputs.count(followerPattern.matches(_))) + if (cluster.config().clusterType() == Type.CO_KRAFT) { + assertEquals(0, outputs.count(observerPattern.matches(_))) + } else { + assertEquals(cluster.config().numBrokers(), outputs.count(observerPattern.matches(_))) + } + } - val votersInfo = initialDescribeOutput.substring(initialDescribeOutput.indexOf("voters info:") + 13, initialDescribeOutput.indexOf("observers info:")) - assertEquals(3, votersInfo.split("\n").length) + @ClusterTests( + Array(new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), + new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3))) + def testDescribeQuorumStatusSuccessful(): Unit = { + val describeOutput = TestUtils.grabConsoleOutput( + MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")) + ) + val outputs = describeOutput.split("\n") - val observersInfo = initialDescribeOutput.substring(initialDescribeOutput.indexOf("observers info:") + 16) - assertTrue(observersInfo.isEmpty, "this will be fixed by KAFKA-13986") + assertTrue("""ClusterId:\s+\S{22}""".r.matches(outputs(0))) + assertTrue("""LeaderId:\s+\d+""".r.matches(outputs(1)), "[" + outputs(1) + "]") + assertTrue("""LeaderEpoch:\s+\d+""".r.matches(outputs(2))) + assertTrue("""HighWatermark:\s+\d+""".r.matches(outputs(3))) + assertTrue("""MaxFollowerLag:\s+\d+""".r.matches(outputs(4))) + assertTrue("""MaxFollowerLagTimeMs:\s+\d+""".r.matches(outputs(5))) + assertTrue("""CurrentVoters:\s+\[\d+(,\d+)*\]""".r.matches(outputs(6))) + assertTrue("""CurrentObservers:\s+\s+\[\d+(,\d+)*\]""".r.matches(outputs(7))) } @ClusterTest(clusterType = Type.ZK, brokers = 3, controllers = 1) @@ -63,7 +84,17 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { assertTrue( assertThrows( classOf[ExecutionException], - () => MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe")) + () => + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")) + ).getCause.isInstanceOf[UnsupportedVersionException] + ) + assertTrue( + assertThrows( + classOf[ExecutionException], + () => + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")) ).getCause.isInstanceOf[UnsupportedVersionException] ) } @@ -73,13 +104,39 @@ class MetadataQuorumCommandErrorTest { @Test def testPropertiesFileDoesNotExists(): Unit = { + assertEquals(1, + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe"))) + assertEquals( + "Properties file admin.properties does not exists!", + TestUtils + .grabConsoleError( + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe"))) + .trim + ) + } + + @Test + def testDescribeOptions(): Unit = { + assertEquals(1, MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe"))) assertEquals( - 1, - MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe")) + "One of --status or --replication must be specified with describe sub-command", + TestUtils + .grabConsoleError(MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "describe"))) + .trim ) + + assertEquals(1, + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication"))) assertEquals( - "Properties file admin.properties does not exists!", - TestUtils.grabConsoleError(MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", "localhost:9092", "--command-config", "admin.properties", "describe"))).trim + "Only one of --status or --replication should be specified with describe sub-command", + TestUtils + .grabConsoleError( + MetadataQuorumCommand.mainNoExit( + Array("--bootstrap-server", "localhost:9092", "describe", "--status", "--replication"))) + .trim ) } -} \ No newline at end of file +} From 8c8b8d554780421e01d21a9297629ea786259b1b Mon Sep 17 00:00:00 2001 From: dengziming Date: Mon, 15 Aug 2022 12:58:00 +0800 Subject: [PATCH 3/9] Add leaderEpoch and highWatermark in DescribeQuorumResponse --- .../kafka/clients/admin/KafkaAdminClient.java | 2 ++ .../org/apache/kafka/clients/admin/QuorumInfo.java | 14 +++++++++++++- .../kafka/clients/admin/KafkaAdminClientTest.java | 2 +- .../scala/kafka/admin/MetadataQuorumCommand.scala | 4 ++-- .../kafka/admin/MetadataQuorumCommandTest.scala | 10 +++++++--- .../kafka/server/DescribeQuorumRequestTest.scala | 2 ++ 6 files changed, 27 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 41eb27a1ddad8..e5df779b616ed 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -4359,6 +4359,8 @@ private QuorumInfo.ReplicaState translateReplicaState(DescribeQuorumResponseData private QuorumInfo createQuorumResult(final DescribeQuorumResponseData.PartitionData partition) { return new QuorumInfo( partition.leaderId(), + partition.leaderEpoch(), + partition.highWatermark(), partition.currentVoters().stream().map(v -> translateReplicaState(v)).collect(Collectors.toList()), partition.observers().stream().map(o -> translateReplicaState(o)).collect(Collectors.toList())); } diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java index 75476d77dcff1..3a0b6cf6f7492 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/QuorumInfo.java @@ -25,11 +25,15 @@ */ public class QuorumInfo { private final Integer leaderId; + private final Integer leaderEpoch; + private final Long highWatermark; private final List voters; private final List observers; - QuorumInfo(Integer leaderId, List voters, List observers) { + QuorumInfo(Integer leaderId, Integer leaderEpoch, Long highWatermark, List voters, List observers) { this.leaderId = leaderId; + this.leaderEpoch = leaderEpoch; + this.highWatermark = highWatermark; this.voters = voters; this.observers = observers; } @@ -38,6 +42,14 @@ public Integer leaderId() { return leaderId; } + public Integer leaderEpoch() { + return leaderEpoch; + } + + public Long highWatermark() { + return highWatermark; + } + public List voters() { return voters; } diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index 5faf53f0756fa..193457655a528 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -608,7 +608,7 @@ private static ApiVersionsResponse prepareApiVersionsResponseForDescribeFeatures } private static QuorumInfo defaultQuorumInfo(Boolean emptyOptionals) { - return new QuorumInfo(1, + return new QuorumInfo(1, 1, 1L, singletonList(new QuorumInfo.ReplicaState(1, 100, emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000), emptyOptionals ? OptionalLong.empty() : OptionalLong.of(1000))), diff --git a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala index a87a170fc765c..10f1e823ed152 100644 --- a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala +++ b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala @@ -174,8 +174,8 @@ object MetadataQuorumCommand { println( s"""|ClusterId: $clusterId |LeaderId: ${quorumInfo.leaderId()} - |LeaderEpoch: {{TODO}} - |HighWatermark: {{TODO}} + |LeaderEpoch: ${quorumInfo.leaderEpoch()} + |HighWatermark: ${quorumInfo.highWatermark()} |MaxFollowerLag: $maxFollowerLag |MaxFollowerLagTimeMs: $maxFollowerLagTimeMs |CurrentVoters: ${quorumInfo.voters().asScala.map(_.replicaId()).mkString("[", ",", "]")} diff --git a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala index 0a506e744cda6..c75b0ecca6ef2 100644 --- a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala @@ -70,13 +70,17 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { val outputs = describeOutput.split("\n") assertTrue("""ClusterId:\s+\S{22}""".r.matches(outputs(0))) - assertTrue("""LeaderId:\s+\d+""".r.matches(outputs(1)), "[" + outputs(1) + "]") + assertTrue("""LeaderId:\s+\d+""".r.matches(outputs(1))) assertTrue("""LeaderEpoch:\s+\d+""".r.matches(outputs(2))) assertTrue("""HighWatermark:\s+\d+""".r.matches(outputs(3))) assertTrue("""MaxFollowerLag:\s+\d+""".r.matches(outputs(4))) - assertTrue("""MaxFollowerLagTimeMs:\s+\d+""".r.matches(outputs(5))) + assertTrue("""MaxFollowerLagTimeMs:\s+[-]?\d+""".r.matches(outputs(5)), "[" + outputs(5) + "]") assertTrue("""CurrentVoters:\s+\[\d+(,\d+)*\]""".r.matches(outputs(6))) - assertTrue("""CurrentObservers:\s+\s+\[\d+(,\d+)*\]""".r.matches(outputs(7))) + if (cluster.config().clusterType() == Type.CO_KRAFT) { + assertTrue("""CurrentObservers:\s+\[\]""".r.matches(outputs(7))) + } else { + assertTrue("""CurrentObservers:\s+\[\d+(,\d+)*\]""".r.matches(outputs(7))) + } } @ClusterTest(clusterType = Type.ZK, brokers = 3, controllers = 1) diff --git a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala index eed58961e4f78..28a6f801235c8 100644 --- a/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/DescribeQuorumRequestTest.scala @@ -74,6 +74,8 @@ class DescribeQuorumRequestTest(cluster: ClusterInstance) { val leaderId = partitionData.leaderId assertTrue(leaderId > 0) + assertTrue(partitionData.leaderEpoch() > 0) + assertTrue(partitionData.highWatermark() > 0) val leaderState = partitionData.currentVoters.asScala.find(_.replicaId == leaderId) .getOrElse(throw new AssertionError("Failed to find leader among current voter states")) From c4666c94d3190e5a41d8eb93c6624bb30cd7fb87 Mon Sep 17 00:00:00 2001 From: dengziming Date: Tue, 16 Aug 2022 15:15:23 +0800 Subject: [PATCH 4/9] some feedback --- .../org/apache/kafka/common/utils/Utils.java | 46 ++++++++++ .../kafka/admin/MetadataQuorumCommand.scala | 84 ++++++++----------- .../admin/MetadataQuorumCommandTest.scala | 58 +++++++++++-- .../kafka/tools/TransactionsCommand.java | 46 +--------- 4 files changed, 132 insertions(+), 102 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 7d84167cf24fc..3eab4ddfb7cad 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.utils; +import java.io.PrintStream; import java.nio.BufferUnderflowException; import java.nio.file.StandardOpenOption; import java.util.AbstractMap; @@ -1451,4 +1452,49 @@ public static String[] enumOptions(Class> enumClass) { .toArray(String[]::new); } + private static void appendColumnValue( + StringBuilder rowBuilder, + String value, + int length + ) { + int padLength = length - value.length(); + rowBuilder.append(value); + for (int i = 0; i < padLength; i++) + rowBuilder.append(' '); + } + + private static void printRow( + List columnLengths, + String[] row, + PrintStream out + ) { + StringBuilder rowBuilder = new StringBuilder(); + for (int i = 0; i < row.length; i++) { + Integer columnLength = columnLengths.get(i); + String columnValue = row[i]; + appendColumnValue(rowBuilder, columnValue, columnLength); + rowBuilder.append('\t'); + } + out.println(rowBuilder); + } + + public static void prettyPrintTable( + String[] headers, + List rows, + PrintStream out + ) { + List columnLengths = Arrays.stream(headers) + .map(String::length) + .collect(Collectors.toList()); + + for (String[] row : rows) { + for (int i = 0; i < headers.length; i++) { + columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length())); + } + } + + printRow(columnLengths, headers, out); + rows.forEach(row -> printRow(columnLengths, row, out)); + } + } diff --git a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala index 10f1e823ed152..d27df8c28ed40 100644 --- a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala +++ b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala @@ -24,6 +24,7 @@ import net.sourceforge.argparse4j.inf.Subparsers import org.apache.kafka.clients._ import org.apache.kafka.clients.admin.{Admin, QuorumInfo} import org.apache.kafka.common.utils.Utils +import org.apache.kafka.common.utils.Utils.prettyPrintTable import java.io.File import java.util.Properties @@ -117,69 +118,54 @@ object MetadataQuorumCommand { } private def handleDescribeReplication(admin: Admin): Unit = { - val quorumInfo = admin.describeMetadataQuorum().quorumInfo().get() - val leaderId = quorumInfo.leaderId() - val leader = quorumInfo.voters().asScala.filter(_.replicaId() == leaderId).head - // Find proper columns width - var (maxReplicaIdLen, maxLogEndOffsetLen, maxLagLen, maxLastFetchTimeMsLen, maxLastCaughtUpTimeMsLen) = - (15, 15, 15, 15, 18) - (quorumInfo.voters().asScala ++ quorumInfo.observers().asScala).foreach { voter => - maxReplicaIdLen = Math.max(maxReplicaIdLen, voter.replicaId().toString.length) - maxLogEndOffsetLen = Math.max(maxLogEndOffsetLen, voter.logEndOffset().toString.length) - maxLagLen = Math.max(maxLagLen, (leader.logEndOffset() - voter.logEndOffset()).toString.length) - maxLastFetchTimeMsLen = Math.max(maxLastFetchTimeMsLen, leader.lastFetchTimeMs().orElse(-1).toString.length) - maxLastCaughtUpTimeMsLen = - Math.max(maxLastCaughtUpTimeMsLen, leader.lastCaughtUpTimeMs().orElse(-1).toString.length) - } - println( - s"%${-maxReplicaIdLen}s %${-maxLogEndOffsetLen}s %${-maxLagLen}s %${-maxLastFetchTimeMsLen}s %${-maxLastCaughtUpTimeMsLen}s %-15s " - .format("ReplicaId", "LogEndOffset", "Lag", "LastFetchTimeMs", "LastCaughtUpTimeMs", "Status") - ) + val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get + val leaderId = quorumInfo.leaderId + val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head - def printQuorumInfo(infos: Seq[QuorumInfo.ReplicaState], status: String): Unit = - infos.foreach { voter => - println( - s"%${-maxReplicaIdLen}s %${-maxLogEndOffsetLen}s %${-maxLagLen}s %${-maxLastFetchTimeMsLen}s %${-maxLastCaughtUpTimeMsLen}s %-15s " - .format( - voter.replicaId(), - voter.logEndOffset(), - leader.logEndOffset() - voter.logEndOffset(), - voter.lastFetchTimeMs().orElse(-1), - voter.lastCaughtUpTimeMs().orElse(-1), + def convertQuorumInfo(infos: Seq[QuorumInfo.ReplicaState], status: String): Seq[Array[String]] = + infos.map { voter => + Array(voter.replicaId, + voter.logEndOffset, + leader.logEndOffset - voter.logEndOffset, + voter.lastFetchTimeMs.orElse(-1), + voter.lastCaughtUpTimeMs.orElse(-1), status - )) + ).map(_.toString) } - printQuorumInfo(Seq(leader), "Leader") - printQuorumInfo(quorumInfo.voters().asScala.filter(_.replicaId() != leaderId).toSeq, "Follower") - printQuorumInfo(quorumInfo.observers().asScala.toSeq, "Observer") + prettyPrintTable( + Array("ReplicaId", "LogEndOffset", "Lag", "LastFetchTimeMs", "LastCaughtUpTimeMs", "Status"), + (convertQuorumInfo(Seq(leader), "Leader") + ++ convertQuorumInfo(quorumInfo.voters.asScala.filter(_.replicaId != leaderId).toSeq, "Follower") + ++ convertQuorumInfo(quorumInfo.observers.asScala.toSeq, "Observer")).asJava, + scala.Console.out + ) } private def handleDescribeStatus(admin: Admin): Unit = { - val clusterId = admin.describeCluster().clusterId().get() - val quorumInfo = admin.describeMetadataQuorum().quorumInfo().get() - val leaderId = quorumInfo.leaderId() - val leader = quorumInfo.voters().asScala.filter(_.replicaId() == leaderId).head - val maxLagFollower = quorumInfo - .voters() - .asScala - .filter(_.replicaId() != leaderId) - .minBy(_.logEndOffset()) - val maxFollowerLag = leader.logEndOffset() - maxLagFollower.logEndOffset() + val clusterId = admin.describeCluster.clusterId.get + val quorumInfo = admin.describeMetadataQuorum.quorumInfo.get + val leaderId = quorumInfo.leaderId + val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head + val maxLagFollower = quorumInfo.voters.asScala + .minBy(_.logEndOffset) + val maxFollowerLag = leader.logEndOffset - maxLagFollower.logEndOffset val maxFollowerLagTimeMs = - if (leader.lastCaughtUpTimeMs().isPresent && maxLagFollower.lastCaughtUpTimeMs().isPresent) { - leader.lastCaughtUpTimeMs().getAsLong - maxLagFollower.lastCaughtUpTimeMs().getAsLong + if (leader == maxLagFollower) { + 0 + } else if (leader.lastCaughtUpTimeMs.isPresent && maxLagFollower.lastCaughtUpTimeMs.isPresent) { + leader.lastCaughtUpTimeMs.getAsLong - maxLagFollower.lastCaughtUpTimeMs.getAsLong } else { -1 } println( s"""|ClusterId: $clusterId - |LeaderId: ${quorumInfo.leaderId()} - |LeaderEpoch: ${quorumInfo.leaderEpoch()} - |HighWatermark: ${quorumInfo.highWatermark()} + |LeaderId: ${quorumInfo.leaderId} + |LeaderEpoch: ${quorumInfo.leaderEpoch} + |HighWatermark: ${quorumInfo.highWatermark} |MaxFollowerLag: $maxFollowerLag |MaxFollowerLagTimeMs: $maxFollowerLagTimeMs - |CurrentVoters: ${quorumInfo.voters().asScala.map(_.replicaId()).mkString("[", ",", "]")} - |CurrentObservers: ${quorumInfo.observers().asScala.map(_.replicaId()).mkString("[", ",", "]")} + |CurrentVoters: ${quorumInfo.voters.asScala.map(_.replicaId).mkString("[", ",", "]")} + |CurrentObservers: ${quorumInfo.observers.asScala.map(_.replicaId).mkString("[", ",", "]")} |""".stripMargin ) } diff --git a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala index c75b0ecca6ef2..51fcb9fb65654 100644 --- a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala @@ -32,9 +32,20 @@ import java.util.concurrent.ExecutionException @Tag("integration") class MetadataQuorumCommandTest(cluster: ClusterInstance) { + /** + * 1. The same number of broker controllers + * 2. More brokers than controllers + * 3. Fewer brokers than controllers + */ @ClusterTests( - Array(new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), - new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3))) + Array( + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), + new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3), + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 4), + new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 4), + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 4, controllers = 3), + new ClusterTest(clusterType = Type.KRAFT, brokers = 4, controllers = 3) + )) def testDescribeQuorumReplicationSuccessful(): Unit = { val describeOutput = TestUtils.grabConsoleOutput( MetadataQuorumCommand.mainNoExit( @@ -46,23 +57,35 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { val observerPattern = """\d+\s+\d+\s+\d+\s+[-]?\d+\s+[-]?\d+\s+Observer\s+""".r val outputs = describeOutput.split("\n").tail if (cluster.config().clusterType() == Type.CO_KRAFT) { - assertEquals(cluster.config().numControllers(), outputs.length) + assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.length) } else { assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.length) } - assertTrue(leaderPattern.matches(outputs.head)) + assertTrue(leaderPattern.matches(outputs.head), "[" + outputs.head + "]") assertEquals(1, outputs.count(leaderPattern.matches(_))) assertEquals(cluster.config().numControllers() - 1, outputs.count(followerPattern.matches(_))) + if (cluster.config().clusterType() == Type.CO_KRAFT) { - assertEquals(0, outputs.count(observerPattern.matches(_))) + assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()), outputs.count(observerPattern.matches(_))) } else { assertEquals(cluster.config().numBrokers(), outputs.count(observerPattern.matches(_))) } } + /** + * 1. The same number of broker controllers + * 2. More brokers than controllers + * 3. Fewer brokers than controllers + */ @ClusterTests( - Array(new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), - new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3))) + Array( + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), + new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3), + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 4), + new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 4), + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 4, controllers = 3), + new ClusterTest(clusterType = Type.KRAFT, brokers = 4, controllers = 3) + )) def testDescribeQuorumStatusSuccessful(): Unit = { val describeOutput = TestUtils.grabConsoleOutput( MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")) @@ -76,13 +99,32 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { assertTrue("""MaxFollowerLag:\s+\d+""".r.matches(outputs(4))) assertTrue("""MaxFollowerLagTimeMs:\s+[-]?\d+""".r.matches(outputs(5)), "[" + outputs(5) + "]") assertTrue("""CurrentVoters:\s+\[\d+(,\d+)*\]""".r.matches(outputs(6))) - if (cluster.config().clusterType() == Type.CO_KRAFT) { + + // There are no observers if we have fewer brokers than controllers + if (cluster.config().clusterType() == Type.CO_KRAFT + && cluster.config().numBrokers() <= cluster.config().numControllers()) { assertTrue("""CurrentObservers:\s+\[\]""".r.matches(outputs(7))) } else { assertTrue("""CurrentObservers:\s+\[\d+(,\d+)*\]""".r.matches(outputs(7))) } } + @ClusterTests( + Array(new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 1, controllers = 1), + new ClusterTest(clusterType = Type.KRAFT, brokers = 1, controllers = 1))) + def testOnlyOneBrokerAndOneController(): Unit = { + val statusOutput = TestUtils.grabConsoleOutput( + MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")) + ) + assertEquals("MaxFollowerLag: 0", statusOutput.split("\n")(4)) + assertEquals("MaxFollowerLagTimeMs: 0", statusOutput.split("\n")(5)) + + val replicationOutput = TestUtils.grabConsoleOutput( + MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")) + ) + assertEquals("0", replicationOutput.split("\n").last.split("\\s+")(2)) + } + @ClusterTest(clusterType = Type.ZK, brokers = 3, controllers = 1) def testDescribeQuorumInZkMode(): Unit = { assertTrue( diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java index 92e713ac3b43b..fa644051be2ae 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java @@ -66,6 +66,7 @@ import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static net.sourceforge.argparse4j.impl.Arguments.store; +import static org.apache.kafka.common.utils.Utils.prettyPrintTable; public abstract class TransactionsCommand { private static final Logger log = LoggerFactory.getLogger(TransactionsCommand.class); @@ -903,51 +904,6 @@ private void consumeInBatches( } } - private static void appendColumnValue( - StringBuilder rowBuilder, - String value, - int length - ) { - int padLength = length - value.length(); - rowBuilder.append(value); - for (int i = 0; i < padLength; i++) - rowBuilder.append(' '); - } - - private static void printRow( - List columnLengths, - String[] row, - PrintStream out - ) { - StringBuilder rowBuilder = new StringBuilder(); - for (int i = 0; i < row.length; i++) { - Integer columnLength = columnLengths.get(i); - String columnValue = row[i]; - appendColumnValue(rowBuilder, columnValue, columnLength); - rowBuilder.append('\t'); - } - out.println(rowBuilder); - } - - private static void prettyPrintTable( - String[] headers, - List rows, - PrintStream out - ) { - List columnLengths = Arrays.stream(headers) - .map(String::length) - .collect(Collectors.toList()); - - for (String[] row : rows) { - for (int i = 0; i < headers.length; i++) { - columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length())); - } - } - - printRow(columnLengths, headers, out); - rows.forEach(row -> printRow(columnLengths, row, out)); - } - private static void printErrorAndExit(String message, Throwable t) { log.debug(message, t); From 1a1aef699825f9ff63fe1d380b385100ac3ee37b Mon Sep 17 00:00:00 2001 From: dengziming Date: Tue, 16 Aug 2022 18:20:49 +0800 Subject: [PATCH 5/9] remove methods only supported from 2.13 --- .../admin/MetadataQuorumCommandTest.scala | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala index 51fcb9fb65654..a034d9749d639 100644 --- a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala @@ -61,14 +61,15 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { } else { assertEquals(cluster.config().numBrokers() + cluster.config().numControllers(), outputs.length) } - assertTrue(leaderPattern.matches(outputs.head), "[" + outputs.head + "]") - assertEquals(1, outputs.count(leaderPattern.matches(_))) - assertEquals(cluster.config().numControllers() - 1, outputs.count(followerPattern.matches(_))) + // `matches` is not supported in scala 2.12, use `findFirstIn` instead. + assertTrue(leaderPattern.findFirstIn(outputs.head).nonEmpty) + assertEquals(1, outputs.count(leaderPattern.findFirstIn(_).nonEmpty)) + assertEquals(cluster.config().numControllers() - 1, outputs.count(followerPattern.findFirstIn(_).nonEmpty)) if (cluster.config().clusterType() == Type.CO_KRAFT) { - assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()), outputs.count(observerPattern.matches(_))) + assertEquals(Math.max(0, cluster.config().numBrokers() - cluster.config().numControllers()), outputs.count(observerPattern.findFirstIn(_).nonEmpty)) } else { - assertEquals(cluster.config().numBrokers(), outputs.count(observerPattern.matches(_))) + assertEquals(cluster.config().numBrokers(), outputs.count(observerPattern.findFirstIn(_).nonEmpty)) } } @@ -92,20 +93,20 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { ) val outputs = describeOutput.split("\n") - assertTrue("""ClusterId:\s+\S{22}""".r.matches(outputs(0))) - assertTrue("""LeaderId:\s+\d+""".r.matches(outputs(1))) - assertTrue("""LeaderEpoch:\s+\d+""".r.matches(outputs(2))) - assertTrue("""HighWatermark:\s+\d+""".r.matches(outputs(3))) - assertTrue("""MaxFollowerLag:\s+\d+""".r.matches(outputs(4))) - assertTrue("""MaxFollowerLagTimeMs:\s+[-]?\d+""".r.matches(outputs(5)), "[" + outputs(5) + "]") - assertTrue("""CurrentVoters:\s+\[\d+(,\d+)*\]""".r.matches(outputs(6))) + assertTrue("""ClusterId:\s+\S{22}""".r.findFirstIn(outputs(0)).nonEmpty) + assertTrue("""LeaderId:\s+\d+""".r.findFirstIn(outputs(1)).nonEmpty) + assertTrue("""LeaderEpoch:\s+\d+""".r.findFirstIn(outputs(2)).nonEmpty) + assertTrue("""HighWatermark:\s+\d+""".r.findFirstIn(outputs(3)).nonEmpty) + assertTrue("""MaxFollowerLag:\s+\d+""".r.findFirstIn(outputs(4)).nonEmpty) + assertTrue("""MaxFollowerLagTimeMs:\s+[-]?\d+""".r.findFirstIn(outputs(5)).nonEmpty) + assertTrue("""CurrentVoters:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(6)).nonEmpty) // There are no observers if we have fewer brokers than controllers if (cluster.config().clusterType() == Type.CO_KRAFT && cluster.config().numBrokers() <= cluster.config().numControllers()) { - assertTrue("""CurrentObservers:\s+\[\]""".r.matches(outputs(7))) + assertTrue("""CurrentObservers:\s+\[\]""".r.findFirstIn(outputs(7)).nonEmpty) } else { - assertTrue("""CurrentObservers:\s+\[\d+(,\d+)*\]""".r.matches(outputs(7))) + assertTrue("""CurrentObservers:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(7)).nonEmpty) } } From b9b1685e41e746cbe678959a71a03bc9327afedb Mon Sep 17 00:00:00 2001 From: dengziming Date: Wed, 17 Aug 2022 11:09:12 +0800 Subject: [PATCH 6/9] more feedbacks --- build.gradle | 1 + .../org/apache/kafka/common/utils/Utils.java | 46 ----------------- .../kafka/admin/MetadataQuorumCommand.scala | 18 +++---- .../test/java/kafka/test/ClusterInstance.java | 3 -- .../junit/RaftClusterInvocationContext.java | 1 - .../junit/ZkClusterInvocationContext.java | 6 --- .../admin/MetadataQuorumCommandTest.scala | 2 +- .../apache/kafka/server/util}/ToolsUtils.java | 51 ++++++++++++++++++- .../kafka/tools/ProducerPerformance.java | 1 + .../kafka/tools/TransactionsCommand.java | 2 +- 10 files changed, 63 insertions(+), 68 deletions(-) rename {tools/src/main/java/org/apache/kafka/tools => server-common/src/main/java/org/apache/kafka/server/util}/ToolsUtils.java (61%) diff --git a/build.gradle b/build.gradle index f17011ca4d205..7c38d899a6e16 100644 --- a/build.gradle +++ b/build.gradle @@ -1705,6 +1705,7 @@ project(':tools') { dependencies { implementation project(':clients') + implementation project(':server-common') implementation project(':log4j-appender') implementation libs.argparse4j implementation libs.jacksonDatabind diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 3eab4ddfb7cad..7d84167cf24fc 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.utils; -import java.io.PrintStream; import java.nio.BufferUnderflowException; import java.nio.file.StandardOpenOption; import java.util.AbstractMap; @@ -1452,49 +1451,4 @@ public static String[] enumOptions(Class> enumClass) { .toArray(String[]::new); } - private static void appendColumnValue( - StringBuilder rowBuilder, - String value, - int length - ) { - int padLength = length - value.length(); - rowBuilder.append(value); - for (int i = 0; i < padLength; i++) - rowBuilder.append(' '); - } - - private static void printRow( - List columnLengths, - String[] row, - PrintStream out - ) { - StringBuilder rowBuilder = new StringBuilder(); - for (int i = 0; i < row.length; i++) { - Integer columnLength = columnLengths.get(i); - String columnValue = row[i]; - appendColumnValue(rowBuilder, columnValue, columnLength); - rowBuilder.append('\t'); - } - out.println(rowBuilder); - } - - public static void prettyPrintTable( - String[] headers, - List rows, - PrintStream out - ) { - List columnLengths = Arrays.stream(headers) - .map(String::length) - .collect(Collectors.toList()); - - for (String[] row : rows) { - for (int i = 0; i < headers.length; i++) { - columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length())); - } - } - - printRow(columnLengths, headers, out); - rows.forEach(row -> printRow(columnLengths, row, out)); - } - } diff --git a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala index d27df8c28ed40..b6e4e1597b557 100644 --- a/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala +++ b/core/src/main/scala/kafka/admin/MetadataQuorumCommand.scala @@ -24,7 +24,7 @@ import net.sourceforge.argparse4j.inf.Subparsers import org.apache.kafka.clients._ import org.apache.kafka.clients.admin.{Admin, QuorumInfo} import org.apache.kafka.common.utils.Utils -import org.apache.kafka.common.utils.Utils.prettyPrintTable +import org.apache.kafka.server.util.ToolsUtils.prettyPrintTable import java.io.File import java.util.Properties @@ -85,7 +85,7 @@ object MetadataQuorumCommand { throw new TerseFailure(s"One of --status or --replication must be specified with describe sub-command") } } else { - // currently we only support describe + throw new IllegalStateException(s"Unknown command: $command, only 'describe' is supported") } 0 } catch { @@ -123,17 +123,17 @@ object MetadataQuorumCommand { val leader = quorumInfo.voters.asScala.filter(_.replicaId == leaderId).head def convertQuorumInfo(infos: Seq[QuorumInfo.ReplicaState], status: String): Seq[Array[String]] = - infos.map { voter => - Array(voter.replicaId, - voter.logEndOffset, - leader.logEndOffset - voter.logEndOffset, - voter.lastFetchTimeMs.orElse(-1), - voter.lastCaughtUpTimeMs.orElse(-1), + infos.map { info => + Array(info.replicaId, + info.logEndOffset, + leader.logEndOffset - info.logEndOffset, + info.lastFetchTimeMs.orElse(-1), + info.lastCaughtUpTimeMs.orElse(-1), status ).map(_.toString) } prettyPrintTable( - Array("ReplicaId", "LogEndOffset", "Lag", "LastFetchTimeMs", "LastCaughtUpTimeMs", "Status"), + Array("NodeId", "LogEndOffset", "Lag", "LastFetchTimeMs", "LastCaughtUpTimeMs", "Status"), (convertQuorumInfo(Seq(leader), "Leader") ++ convertQuorumInfo(quorumInfo.voters.asScala.filter(_.replicaId != leaderId).toSeq, "Follower") ++ convertQuorumInfo(quorumInfo.observers.asScala.toSeq, "Observer")).asJava, diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java index 1a947541e053d..9058508fa94f7 100644 --- a/core/src/test/java/kafka/test/ClusterInstance.java +++ b/core/src/test/java/kafka/test/ClusterInstance.java @@ -19,7 +19,6 @@ import kafka.network.SocketServer; import kafka.server.BrokerFeatures; -import kafka.server.ControllerServer; import kafka.test.annotation.ClusterTest; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.common.network.ListenerName; @@ -116,8 +115,6 @@ default Optional controlPlaneListenerName() { */ Map brokerFeatures(); - Collection controllerServers(); - /** * The underlying object which is responsible for setting up and tearing down the cluster. */ diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java index 9f96f47ffae6d..f5c281ff24967 100644 --- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java @@ -183,7 +183,6 @@ public Map brokerFeatures() { )); } - @Override public Collection controllerServers() { return controllers().collect(Collectors.toList()); } diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java index 019d143c45f6d..18a85e2d7bf66 100644 --- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java +++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java @@ -20,7 +20,6 @@ import kafka.api.IntegrationTestHarness; import kafka.network.SocketServer; import kafka.server.BrokerFeatures; -import kafka.server.ControllerServer; import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import kafka.test.ClusterConfig; @@ -249,11 +248,6 @@ public Map brokerFeatures() { )); } - @Override - public Collection controllerServers() { - throw new UnsupportedOperationException("Can't get controller server in zk mode"); - } - @Override public ClusterType clusterType() { return ClusterType.ZK; diff --git a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala index a034d9749d639..0535229943b4a 100644 --- a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala @@ -126,7 +126,7 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { assertEquals("0", replicationOutput.split("\n").last.split("\\s+")(2)) } - @ClusterTest(clusterType = Type.ZK, brokers = 3, controllers = 1) + @ClusterTest(clusterType = Type.ZK, brokers = 3) def testDescribeQuorumInZkMode(): Unit = { assertTrue( assertThrows( diff --git a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java b/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java similarity index 61% rename from tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java rename to server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java index 3a80b5811f3fd..0c923cd66c1ce 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ToolsUtils.java +++ b/server-common/src/main/java/org/apache/kafka/server/util/ToolsUtils.java @@ -14,13 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.tools; +package org.apache.kafka.server.util; import org.apache.kafka.common.Metric; import org.apache.kafka.common.MetricName; +import java.io.PrintStream; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.TreeMap; +import java.util.stream.Collectors; public class ToolsUtils { @@ -52,4 +56,49 @@ public static void printMetrics(Map metrics) { } } } + + private static void appendColumnValue( + StringBuilder rowBuilder, + String value, + int length + ) { + int padLength = length - value.length(); + rowBuilder.append(value); + for (int i = 0; i < padLength; i++) + rowBuilder.append(' '); + } + + private static void printRow( + List columnLengths, + String[] row, + PrintStream out + ) { + StringBuilder rowBuilder = new StringBuilder(); + for (int i = 0; i < row.length; i++) { + Integer columnLength = columnLengths.get(i); + String columnValue = row[i]; + appendColumnValue(rowBuilder, columnValue, columnLength); + rowBuilder.append('\t'); + } + out.println(rowBuilder); + } + + public static void prettyPrintTable( + String[] headers, + List rows, + PrintStream out + ) { + List columnLengths = Arrays.stream(headers) + .map(String::length) + .collect(Collectors.toList()); + + for (String[] row : rows) { + for (int i = 0; i < headers.length; i++) { + columnLengths.set(i, Math.max(columnLengths.get(i), row[i].length())); + } + } + + printRow(columnLengths, headers, out); + rows.forEach(row -> printRow(columnLengths, row, out)); + } } diff --git a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java index 6967a16fa6bd5..f2ee53cb3f0bf 100644 --- a/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java +++ b/tools/src/main/java/org/apache/kafka/tools/ProducerPerformance.java @@ -43,6 +43,7 @@ import net.sourceforge.argparse4j.inf.Namespace; import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.server.util.ToolsUtils; public class ProducerPerformance { diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java index fa644051be2ae..194524d2654a7 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionsCommand.java @@ -66,7 +66,7 @@ import static java.util.Collections.singleton; import static java.util.Collections.singletonList; import static net.sourceforge.argparse4j.impl.Arguments.store; -import static org.apache.kafka.common.utils.Utils.prettyPrintTable; +import static org.apache.kafka.server.util.ToolsUtils.prettyPrintTable; public abstract class TransactionsCommand { private static final Logger log = LoggerFactory.getLogger(TransactionsCommand.class); From ea9f6108ce9aa353d938ab051b15c8049455669f Mon Sep 17 00:00:00 2001 From: dengziming Date: Wed, 17 Aug 2022 17:38:11 +0800 Subject: [PATCH 7/9] fix checkstyle --- checkstyle/import-control.xml | 1 + 1 file changed, 1 insertion(+) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 4b07a26cba5c9..d24d1e7e5e93d 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -366,6 +366,7 @@ + From e05bfe59de9a5273f65eba1d5edde28421bdf887 Mon Sep 17 00:00:00 2001 From: dengziming Date: Thu, 18 Aug 2022 13:28:31 +0800 Subject: [PATCH 8/9] fix flaky test --- .../admin/MetadataQuorumCommandTest.scala | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala index 0535229943b4a..6864d5a46d30b 100644 --- a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala @@ -19,19 +19,34 @@ package kafka.admin import kafka.test.ClusterInstance import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions +import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance import kafka.utils.TestUtils import org.apache.kafka.common.errors.UnsupportedVersionException +import org.apache.kafka.controller.QuorumController import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.{Tag, Test} import org.junit.jupiter.api.extension.ExtendWith import java.util.concurrent.ExecutionException +import scala.jdk.CollectionConverters._ @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(clusterType = Type.KRAFT) @Tag("integration") class MetadataQuorumCommandTest(cluster: ClusterInstance) { + private def ensureConsistentKRaftMetadata(): Unit = { + cluster.waitForReadyBrokers() + TestUtils.waitUntilTrue( + () => cluster.asInstanceOf[RaftClusterInstance].controllers().filter(_.controller.asInstanceOf[QuorumController].isActive).count() == 1, + "Timeout waiting for leader election" + ) + TestUtils.ensureConsistentKRaftMetadata( + cluster.asInstanceOf[RaftClusterInstance].brokers().iterator().asScala.toSeq, + cluster.asInstanceOf[RaftClusterInstance].controllers().filter(_.controller.asInstanceOf[QuorumController].isActive).findFirst().get() + ) + } + /** * 1. The same number of broker controllers * 2. More brokers than controllers @@ -41,12 +56,13 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { Array( new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3), - new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 4), - new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 4), - new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 4, controllers = 3), - new ClusterTest(clusterType = Type.KRAFT, brokers = 4, controllers = 3) + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2), + new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2), + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3), + new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3) )) def testDescribeQuorumReplicationSuccessful(): Unit = { + ensureConsistentKRaftMetadata() val describeOutput = TestUtils.grabConsoleOutput( MetadataQuorumCommand.mainNoExit( Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")) @@ -82,12 +98,13 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { Array( new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 3), new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 3), - new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 4), - new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 4), - new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 4, controllers = 3), - new ClusterTest(clusterType = Type.KRAFT, brokers = 4, controllers = 3) + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 3, controllers = 2), + new ClusterTest(clusterType = Type.KRAFT, brokers = 3, controllers = 2), + new ClusterTest(clusterType = Type.CO_KRAFT, brokers = 2, controllers = 3), + new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3) )) def testDescribeQuorumStatusSuccessful(): Unit = { + ensureConsistentKRaftMetadata() val describeOutput = TestUtils.grabConsoleOutput( MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")) ) @@ -96,7 +113,7 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { assertTrue("""ClusterId:\s+\S{22}""".r.findFirstIn(outputs(0)).nonEmpty) assertTrue("""LeaderId:\s+\d+""".r.findFirstIn(outputs(1)).nonEmpty) assertTrue("""LeaderEpoch:\s+\d+""".r.findFirstIn(outputs(2)).nonEmpty) - assertTrue("""HighWatermark:\s+\d+""".r.findFirstIn(outputs(3)).nonEmpty) + assertTrue("""HighWatermark:\s+[-]?\d+""".r.findFirstIn(outputs(3)).nonEmpty) assertTrue("""MaxFollowerLag:\s+\d+""".r.findFirstIn(outputs(4)).nonEmpty) assertTrue("""MaxFollowerLagTimeMs:\s+[-]?\d+""".r.findFirstIn(outputs(5)).nonEmpty) assertTrue("""CurrentVoters:\s+\[\d+(,\d+)*\]""".r.findFirstIn(outputs(6)).nonEmpty) @@ -123,7 +140,7 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { val replicationOutput = TestUtils.grabConsoleOutput( MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")) ) - assertEquals("0", replicationOutput.split("\n").last.split("\\s+")(2)) + assertEquals("0", replicationOutput.split("\n")(1).split("\\s+")(2)) } @ClusterTest(clusterType = Type.ZK, brokers = 3) From 428b1d0bd880a759c84af78c1d9cf905daad707f Mon Sep 17 00:00:00 2001 From: dengziming Date: Sat, 20 Aug 2022 08:43:49 +0800 Subject: [PATCH 9/9] rebase trunk and improve code --- .../admin/MetadataQuorumCommandTest.scala | 26 +++++-------------- 1 file changed, 6 insertions(+), 20 deletions(-) diff --git a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala index 6864d5a46d30b..24b6616cb1edb 100644 --- a/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala +++ b/core/src/test/scala/unit/kafka/admin/MetadataQuorumCommandTest.scala @@ -19,34 +19,19 @@ package kafka.admin import kafka.test.ClusterInstance import kafka.test.annotation.{ClusterTest, ClusterTestDefaults, ClusterTests, Type} import kafka.test.junit.ClusterTestExtensions -import kafka.test.junit.RaftClusterInvocationContext.RaftClusterInstance import kafka.utils.TestUtils import org.apache.kafka.common.errors.UnsupportedVersionException -import org.apache.kafka.controller.QuorumController import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.{Tag, Test} import org.junit.jupiter.api.extension.ExtendWith import java.util.concurrent.ExecutionException -import scala.jdk.CollectionConverters._ @ExtendWith(value = Array(classOf[ClusterTestExtensions])) @ClusterTestDefaults(clusterType = Type.KRAFT) @Tag("integration") class MetadataQuorumCommandTest(cluster: ClusterInstance) { - private def ensureConsistentKRaftMetadata(): Unit = { - cluster.waitForReadyBrokers() - TestUtils.waitUntilTrue( - () => cluster.asInstanceOf[RaftClusterInstance].controllers().filter(_.controller.asInstanceOf[QuorumController].isActive).count() == 1, - "Timeout waiting for leader election" - ) - TestUtils.ensureConsistentKRaftMetadata( - cluster.asInstanceOf[RaftClusterInstance].brokers().iterator().asScala.toSeq, - cluster.asInstanceOf[RaftClusterInstance].controllers().filter(_.controller.asInstanceOf[QuorumController].isActive).findFirst().get() - ) - } - /** * 1. The same number of broker controllers * 2. More brokers than controllers @@ -62,15 +47,15 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3) )) def testDescribeQuorumReplicationSuccessful(): Unit = { - ensureConsistentKRaftMetadata() + cluster.waitForReadyBrokers() val describeOutput = TestUtils.grabConsoleOutput( MetadataQuorumCommand.mainNoExit( Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--replication")) ) - val leaderPattern = """\d+\s+\d+\s+\d+\s+[-]?\d+\s+[-]?\d+\s+Leader\s+""".r - val followerPattern = """\d+\s+\d+\s+\d+\s+[-]?\d+\s+[-]?\d+\s+Follower\s+""".r - val observerPattern = """\d+\s+\d+\s+\d+\s+[-]?\d+\s+[-]?\d+\s+Observer\s+""".r + val leaderPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Leader\s+""".r + val followerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Follower\s+""".r + val observerPattern = """\d+\s+\d+\s+\d+\s+\d+\s+[-]?\d+\s+Observer\s+""".r val outputs = describeOutput.split("\n").tail if (cluster.config().clusterType() == Type.CO_KRAFT) { assertEquals(Math.max(cluster.config().numControllers(), cluster.config().numBrokers()), outputs.length) @@ -104,7 +89,7 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { new ClusterTest(clusterType = Type.KRAFT, brokers = 2, controllers = 3) )) def testDescribeQuorumStatusSuccessful(): Unit = { - ensureConsistentKRaftMetadata() + cluster.waitForReadyBrokers() val describeOutput = TestUtils.grabConsoleOutput( MetadataQuorumCommand.mainNoExit(Array("--bootstrap-server", cluster.bootstrapServers(), "describe", "--status")) ) @@ -113,6 +98,7 @@ class MetadataQuorumCommandTest(cluster: ClusterInstance) { assertTrue("""ClusterId:\s+\S{22}""".r.findFirstIn(outputs(0)).nonEmpty) assertTrue("""LeaderId:\s+\d+""".r.findFirstIn(outputs(1)).nonEmpty) assertTrue("""LeaderEpoch:\s+\d+""".r.findFirstIn(outputs(2)).nonEmpty) + // HighWatermark may be -1 assertTrue("""HighWatermark:\s+[-]?\d+""".r.findFirstIn(outputs(3)).nonEmpty) assertTrue("""MaxFollowerLag:\s+\d+""".r.findFirstIn(outputs(4)).nonEmpty) assertTrue("""MaxFollowerLagTimeMs:\s+[-]?\d+""".r.findFirstIn(outputs(5)).nonEmpty)