KAFKA-12454: Add ERROR logging on kafka-log-dirs when given brokerIds do not exist in current kafka cluster #10304
Conversation
@chia7712 Can you help review this PR? Thank you.
dajac
left a comment
Thanks for the PR. I left a minor suggestion. Could we also add a test case?
```scala
val nonExistBrokers: Array[Int] = brokerList.filterNot(brokerId => clusterBrokers.contains(brokerId))
if (!nonExistBrokers.isEmpty) {
  System.err.println(s"The given node(s) does not exist from broker-list ${nonExistBrokers.mkString(",")}")
  sys.exit(1)
```
Should we do this only when handling the brokers provided by the user? It does not make sense to validate the list of brokers otherwise. What do you think?
Your suggestion is very good. I have changed the logic, please review.
Thanks for your comment. Your suggestion is very good; we only need to validate the brokers entered by the user. I have added a unit test, please review it again. Thank you.
chia7712
left a comment
@wenbingshen thanks for this contribution. A couple of comments are left; most of them are related to code style. Otherwise, LGTM.
```scala
case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
case Some(brokerListStr) => {
  val inputBrokers: Array[Int] = brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
```
Could you replace !_.isEmpty with _.nonEmpty? It seems to me the latter is more readable.
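For illustration, a tiny standalone sketch of the equivalence (the input string is hypothetical, not taken from the PR):

```scala
// Hypothetical --broker-list value with an empty token.
val brokerListStr = "0,,1,2"

// The two predicates filter identically; _.nonEmpty just avoids the negation.
val viaIsEmpty  = brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
val viaNonEmpty = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt)

assert(viaIsEmpty.sameElements(viaNonEmpty))
```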
```scala
val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
  case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
  case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
  case Some(brokerListStr) => {
```
```scala
out.println("Querying brokers for log directories information")
val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
if (!nonExistBrokers.isEmpty) {
```
```scala
val opts = new LogDirsCommandOptions(args)
val adminClient = createAdminClient(opts)
val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(!_.isEmpty)
val clusterBrokers: Array[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
```
Is Set[Int] more suitable for this case?
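A minimal sketch of why a Set fits here, using made-up broker ids: membership checks are constant-time and duplicates collapse, neither of which an Array gives you.

```scala
// Hypothetical cluster ids; a real run would get these from describeCluster().
val clusterBrokers: Set[Int] = Set(0, 1, 2)

assert(clusterBrokers.contains(1))      // O(1) hash lookup, vs a linear scan on Array
assert(!clusterBrokers.contains(5))
assert(Seq(0, 0, 1).toSet == Set(0, 1)) // duplicates collapse for free
```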
```scala
case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
case Some(brokerListStr) => {
  val inputBrokers: Array[Int] = brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
  nonExistBrokers = inputBrokers.filterNot(brokerId => clusterBrokers.contains(brokerId))
```
How about using diff to get nonExistBrokers?
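To illustrate the suggestion with stand-in ids: filterNot and diff compute the same set, diff just states the intent directly.

```scala
// Stand-in ids; not taken from the PR's test data.
val clusterBrokers = Set(0, 1, 2)
val inputBrokers   = Set(1, 2, 5)

val viaFilterNot = inputBrokers.filterNot(clusterBrokers.contains)
val viaDiff      = inputBrokers.diff(clusterBrokers)

assert(viaFilterNot == viaDiff) // both are Set(5)
```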
```scala
out.println(s"Received log directory information from brokers ${brokerList.mkString(",")}")
out.println(formatAsJson(logDirInfosByBroker, topicList.toSet))
}
adminClient.close()
```
Could you use try-finally to release adminClient?
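A minimal sketch of the try/finally pattern being asked for; FakeAdminClient is a made-up stand-in so the sketch is self-contained, not the real Admin client.

```scala
// Made-up stand-in for the real Admin client.
class FakeAdminClient {
  var closed = false
  def close(): Unit = closed = true
}

val adminClient = new FakeAdminClient
try {
  // describeCluster / describeLogDirs work would go here
  println("querying...")
} finally {
  adminClient.close() // released even if the body throws
}
assert(adminClient.closed)
```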
```scala
val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
if (!nonExistBrokers.isEmpty) {
  out.println(s"ERROR: The given node(s) does not exist from broker-list ${nonExistBrokers.mkString(",")}")
```
Could we show existent brokers also?
Thank you very much for your patience and guidance. These comments are very important; I have made the changes and submitted them. Please review again, thank you.
dajac
left a comment
@wenbingshen Thanks for the updates. I have left a few more minor comments. Also, it seems that the build failed. Could you check it?
```scala
var nonExistBrokers: Set[Int] = Set.empty
try {
  val clusterBrokers: Set[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
  val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
    case Some(brokerListStr) =>
      val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
      nonExistBrokers = inputBrokers.diff(clusterBrokers)
      inputBrokers
    case None => clusterBrokers
  }
```
nit: We usually avoid using mutable variables unless it is really necessary. In this case, I would rather return the nonExistingBrokers when the argument is processed. Something like this:
```scala
val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
  case Some(brokerListStr) =>
    val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
    (inputBrokers, inputBrokers.diff(clusterBrokers))
  case None => (clusterBrokers, Set.empty)
}
```
```scala
val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
var nonExistBrokers: Set[Int] = Set.empty
try {
  val clusterBrokers: Set[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
```
nit: We can remove specifying Set[Int].
```scala
val clusterBrokers: Set[Int] = adminClient.describeCluster().nodes().get().asScala.map(_.id()).toSet
val brokerList = Option(opts.options.valueOf(opts.brokerListOpt)) match {
  case Some(brokerListStr) =>
    val inputBrokers: Set[Int] = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
```
```scala
out.println(s"ERROR: The given node(s) does not exist from broker-list: ${nonExistBrokers.mkString(",")}. Current cluster exist node(s): ${clusterBrokers.mkString(",")}")
} else {
  out.println("Querying brokers for log directories information")
  val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
```
nit: We can remove DescribeLogDirsResult.
Thank you for your review and suggestions. I have submitted the latest code; it has been tested and compiles successfully. Please help review it again, thank you.
dajac
left a comment
@wenbingshen Thanks for the updates. I left a few more minor comments.
```
@@ -0,0 +1,52 @@
package unit.kafka.admin
```
We must add the licence header here. You can copy it from another file.
Sorry, after checking the compilation report, I realized this problem and I have made the changes.
```scala
val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
if (nonExistingBrokers.nonEmpty) {
  out.println(s"ERROR: The given node(s) does not exist from broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist node(s): ${clusterBrokers.mkString(",")}")
```
nit: Should we say --broker-list instead of broker-list? Also, should we say broker(s) instead of node(s) to be consistent with the message below?
Good idea. I will act right away.
```scala
out.println(s"ERROR: The given node(s) does not exist from broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist node(s): ${clusterBrokers.mkString(",")}")
} else {
  out.println("Querying brokers for log directories information")
  val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(existingBrokers.map(Integer.valueOf).toSeq.asJava)
```
nit: DescribeLogDirsResult can be removed.
Sorry, I forgot this; I will change it right away.
Thank you for your comments. I submitted the latest code, please review it, thank you!
chia7712
left a comment
@wenbingshen thanks for your patch. overall LGTM. a couple of trivial comments are left. please take a look :)
```scala
val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
if (nonExistingBrokers.nonEmpty) {
  out.println(s"ERROR: The given brokers do not exist from --broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist brokers: ${clusterBrokers.mkString(",")}")
```
How about current existent brokers:
```scala
val describeLogDirsResult: DescribeLogDirsResult = adminClient.describeLogDirs(brokerList.map(Integer.valueOf).toSeq.asJava)
val logDirInfosByBroker = describeLogDirsResult.allDescriptions.get().asScala.map { case (k, v) => k -> v.asScala }
if (nonExistingBrokers.nonEmpty) {
  out.println(s"ERROR: The given brokers do not exist from --broker-list: ${nonExistingBrokers.mkString(",")}. Current cluster exist brokers: ${clusterBrokers.mkString(",")}")
```
Also, could you separate this line?
```scala
case Some(brokerListStr) => brokerListStr.split(',').filter(!_.isEmpty).map(_.toInt)
case None => adminClient.describeCluster().nodes().get().asScala.map(_.id()).toArray
}
val topicList = opts.options.valueOf(opts.topicListOpt).split(",").filter(_.nonEmpty)
```
the resource declaration should be followed by a try block:

```scala
val adminClient = createAdminClient(opts)
try {
  // ...
}
```
Good. It has been modified in the latest submission.
```scala
assertTrue(existBrokersLineIter.hasNext)
assertTrue(existBrokersLineIter.next().contains(s"Querying brokers for log directories information"))

//input nonExist brokerList
```
how about using nonexistent instead of nonExist?
It has been modified.
```scala
//input nonExist brokerList
byteArrayOutputStream.reset()
LogDirsCommand.describe(Array("--bootstrap-server", brokerList, "--broker-list", "0,1,2", "--describe"), printStream)
```
Could you input duplicate ids and then check that the output does not include duplicates?
@chia7712 I have added the duplicate ids test and it passed.
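The reason duplicates cannot leak through: parsing --broker-list ends in toSet, which collapses repeated ids. A standalone sketch (the input string is made up):

```scala
// Hypothetical --broker-list with repeated ids.
val brokerListStr = "0,0,1,2,0"
val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet

assert(inputBrokers == Set(0, 1, 2)) // duplicates collapsed by toSet
```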
Thanks for your comments. I have addressed them and submitted the latest code; please review it again :)
chia7712
left a comment
@wenbingshen thanks for your patch. a couple of comments. please take a look.
```scala
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package unit.kafka.admin
```
the package name should be kafka.admin rather than unit.kafka.admin
```scala
val (existingBrokers, nonExistingBrokers) = Option(opts.options.valueOf(opts.brokerListOpt)) match {
  case Some(brokerListStr) =>
    val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
    (inputBrokers, inputBrokers.diff(clusterBrokers))
```
As the variable is called existingBrokers, we should find out the "true" existent brokers. In short, it should return inputBrokers.intersect(clusterBrokers) rather than inputBrokers.
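With stand-in ids, the difference between the raw input and the true intersection can be seen directly:

```scala
// Stand-in ids, not from the PR.
val clusterBrokers = Set(0, 1, 2)
val inputBrokers   = Set(1, 2, 5)

val existingBrokers    = inputBrokers.intersect(clusterBrokers) // the "true" existent brokers
val nonExistingBrokers = inputBrokers.diff(clusterBrokers)

assert(existingBrokers == Set(1, 2))
assert(nonExistingBrokers == Set(5))
```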
Thanks for your comments. I submitted the latest code; please review it again. :)
Dear @chia7712 @dajac, if there are no other problems, can you help advance this PR? Thanks very much! :)
Sure, I will merge it after I take a final review :)
When non-existent brokerIds are given, the kafka-log-dirs tool fails with a timeout error:
```
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeLogDirs
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at kafka.admin.LogDirsCommand$.describe(LogDirsCommand.scala:50)
	at kafka.admin.LogDirsCommand$.main(LogDirsCommand.scala:36)
	at kafka.admin.LogDirsCommand.main(LogDirsCommand.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: describeLogDirs
```
When the brokerId entered by the user does not exist, an error message indicating that the node is not present should be printed.
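A condensed, self-contained sketch of the validation the PR converges on (names follow the reviewed diff; the Admin client call is replaced by a stub Set so it runs standalone):

```scala
val clusterBrokers = Set(0, 1)                    // stub for describeCluster()
val brokerListOpt: Option[String] = Some("0,1,2") // stub for the --broker-list option

val (existingBrokers, nonExistingBrokers) = brokerListOpt match {
  case Some(brokerListStr) =>
    val inputBrokers = brokerListStr.split(',').filter(_.nonEmpty).map(_.toInt).toSet
    (inputBrokers.intersect(clusterBrokers), inputBrokers.diff(clusterBrokers))
  case None => (clusterBrokers, Set.empty[Int])
}

if (nonExistingBrokers.nonEmpty)
  println(s"ERROR: The given brokers do not exist from --broker-list: " +
    s"${nonExistingBrokers.mkString(",")}. Current existent brokers: ${clusterBrokers.mkString(",")}")
```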