From 371cb03a8efdd694bd07e5ac12d1b0051051f583 Mon Sep 17 00:00:00 2001 From: Ron Dagostino Date: Mon, 13 Jan 2020 09:13:19 -0500 Subject: [PATCH 1/8] MINOR: MiniKdc JVM shutdown hook fix --- core/src/test/scala/kafka/security/minikdc/MiniKdc.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala index f3b8cda96f807..22d679fd14886 100644 --- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala +++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala @@ -390,9 +390,7 @@ object MiniKdc { | """.stripMargin println(infoMessage) - Runtime.getRuntime.addShutdownHook(new KafkaThread("minikdc-shutdown-hook", false) { - miniKdc.stop() - }) + Runtime.getRuntime.addShutdownHook(new KafkaThread("minikdc-shutdown-hook", () => miniKdc.stop(), false)) } val OrgName = "org.name" From a878154cf632e7e5b959244ccbbe4fecc584c0f1 Mon Sep 17 00:00:00 2001 From: Ron Dagostino Date: Thu, 16 Jan 2020 21:57:15 -0500 Subject: [PATCH 2/8] Scala changes for kafka.utils.Exit --- .../org/apache/kafka/common/utils/Exit.java | 30 +++++ .../apache/kafka/common/utils/ExitTest.java | 91 +++++++++++++ core/src/main/scala/kafka/Kafka.scala | 4 +- .../scala/kafka/tools/ConsoleConsumer.scala | 6 +- .../scala/kafka/tools/ConsoleProducer.scala | 6 +- .../main/scala/kafka/tools/MirrorMaker.scala | 24 +--- .../kafka/tools/ReplicaVerificationTool.scala | 4 +- core/src/main/scala/kafka/utils/Exit.scala | 14 ++ .../kafka/security/minikdc/MiniKdc.scala | 7 +- .../kafka/security/minikdc/MinikdcTest.scala | 49 +++++++ .../src/test/scala/kafka/utils/ExitTest.scala | 127 ++++++++++++++++++ .../scala/other/kafka/StressTestLog.scala | 4 +- 12 files changed, 328 insertions(+), 38 deletions(-) create mode 100644 clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java create mode 100644 core/src/test/scala/kafka/security/minikdc/MinikdcTest.scala create mode 100644 core/src/test/scala/kafka/utils/ExitTest.scala diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Exit.java b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java index 7c2a663166e27..bfad456bba670 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Exit.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java @@ -26,6 +26,10 @@ public interface Procedure { void execute(int statusCode, String message); } + public interface ShutdownHookAdder { + void addShutdownHook(Runnable runnable, String name); + } + private static final Procedure DEFAULT_HALT_PROCEDURE = new Procedure() { @Override public void execute(int statusCode, String message) { @@ -40,8 +44,19 @@ public void execute(int statusCode, String message) { } }; + private static final ShutdownHookAdder DEFAULT_SHUTDOWN_HOOK_ADDER = new ShutdownHookAdder() { + @Override + public void addShutdownHook(Runnable runnable, String name) { + if (name != null) + Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); + else + Runtime.getRuntime().addShutdownHook(new Thread(runnable)); + } + }; + private volatile static Procedure exitProcedure = DEFAULT_EXIT_PROCEDURE; private volatile static Procedure haltProcedure = DEFAULT_HALT_PROCEDURE; + private volatile static ShutdownHookAdder shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER; public static void exit(int statusCode) { exit(statusCode, null); @@ -59,6 +74,14 @@ public static void halt(int statusCode, String message) { haltProcedure.execute(statusCode, message); } + public static void addShutdownHook(Runnable runnable) { + addShutdownHook(runnable, null); + } + + public static void addShutdownHook(Runnable runnable, String name) { + shutdownHookAdder.addShutdownHook(runnable, name); + } + public static void setExitProcedure(Procedure procedure) { exitProcedure = procedure; } @@ -67,6 +90,10 @@ public static void setHaltProcedure(Procedure procedure) { haltProcedure = procedure; } + public static void setShutdownHookAdder(ShutdownHookAdder shutdownHookAdder) { + Exit.shutdownHookAdder = shutdownHookAdder; + } + public static void resetExitProcedure() { exitProcedure = DEFAULT_EXIT_PROCEDURE; } @@ -75,4 +102,7 @@ public static void resetHaltProcedure() { haltProcedure = DEFAULT_HALT_PROCEDURE; } + public static void resetShutdownHookAdder() { + shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER; + } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java new file mode 100644 index 0000000000000..dc3b307877a65 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class ExitTest { + @Test + public void shouldHaltImmediately() { + List list = new ArrayList<>(); + Exit.setHaltProcedure((statusCode, message) -> { + list.add(statusCode); + list.add(message); + }); + try { + int statusCode = 0; + String message = "mesaage"; + Exit.halt(statusCode); + Exit.halt(statusCode, message); + assertEquals(Arrays.asList(statusCode, null, statusCode, message), list); + } finally { + Exit.resetHaltProcedure(); + } + } + + @Test + public void shouldExitImmediately() { + List list = new ArrayList<>(); + Exit.setExitProcedure((statusCode, message) -> { + list.add(statusCode); + list.add(message); + }); + try { + int statusCode = 0; + String message = "mesaage"; + Exit.exit(statusCode); + Exit.exit(statusCode, message); + assertEquals(Arrays.asList(statusCode, null, statusCode, message), list); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldAddShutdownHookImmediately() { + List list = new ArrayList<>(); + Exit.setShutdownHookAdder((runnable, name) -> { + list.add(runnable); + list.add(name); + }); + try { + Runnable runnable = () -> { }; + String name = "name"; + Exit.addShutdownHook(runnable); + Exit.addShutdownHook(runnable, name); + assertEquals(Arrays.asList(runnable, null, runnable, name), list); + } finally { + Exit.resetShutdownHookAdder(); + } + } + + @Test + public void shouldNotInvokeShutdownHookImmediately() { + List list = new ArrayList<>(); + Runnable runnable = () -> list.add(this); + Exit.addShutdownHook(runnable); + assertEquals(0, list.size()); + Exit.addShutdownHook(runnable, "messge"); + assertEquals(0, list.size()); + } +} diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 5950b7145181b..bd5c3ccafd6a1 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -77,9 +77,7 @@ object Kafka extends Logging { } // attach shutdown handler to catch terminating signals as well as normal termination - Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") { - override def run(): Unit = kafkaServerStartable.shutdown() - }) + Exit.addShutdownHook(() => kafkaServerStartable.shutdown, Some("kafka-shutdown-hook")) kafkaServerStartable.startup() kafkaServerStartable.awaitShutdown() diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 2e3e4f73a38cb..412ff15b88780 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -28,7 +28,7 @@ import com.typesafe.scalalogging.LazyLogging import joptsimple._ import kafka.common.MessageFormatter import kafka.utils.Implicits._ -import kafka.utils._ +import kafka.utils.{Exit, _} import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException, WakeupException} @@ -85,8 +85,7 @@ object ConsoleConsumer extends Logging { } def addShutdownHook(consumer: ConsumerWrapper, conf: ConsumerConfig): Unit = { - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run(): Unit = { + Exit.addShutdownHook(() => { consumer.wakeup() shutdownLatch.await() @@ -94,7 +93,6 @@ object ConsoleConsumer extends Logging { if (conf.enableSystestEventsLogging) { System.out.println("shutdown_complete") } - } }) } diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 2043d3524fc36..fe012cf8abd7c 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -44,11 +44,7 @@ object ConsoleProducer { val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config)) - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run(): Unit = { - producer.close() - } - }) + Exit.addShutdownHook(() => producer.close) var record: ProducerRecord[Array[Byte], Array[Byte]] = null do { diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 9a6ecfd3b07b5..b991a9a8bd5c8 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -515,11 +515,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() val numStreams = options.valueOf(numStreamsOpt).intValue() - Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") { - override def run(): Unit = { - cleanShutdown() - } - }) + Exit.addShutdownHook(() => cleanShutdown(), Some("MirrorMakerShutdownHook")) // create producer val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) @@ -536,7 +532,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { producer = new MirrorMakerProducer(sync, producerProps) // Create consumers - val customRebalanceListener: Option[ConsumerRebalanceListener] = { + val customRebalanceListener = { val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt) if (customRebalanceListenerClass != null) { val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt) @@ -544,9 +540,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs)) else Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)) - } else { - None - } + } else None } val mirrorMakerConsumers = createConsumers( numStreams, @@ -562,14 +556,10 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val customMessageHandlerClass = options.valueOf(messageHandlerOpt) val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt) messageHandler = { - if (customMessageHandlerClass != null) { - if (messageHandlerArgs != null) - CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs) - else - CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass) - } else { - defaultMirrorMakerMessageHandler - } + if (customMessageHandlerClass != null) if (messageHandlerArgs != null) + CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs) + else + CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass) else defaultMirrorMakerMessageHandler } } } diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 8b8d681502fb5..0fc557882ea40 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -200,11 +200,9 @@ object ReplicaVerificationTool extends Logging { fetcherId = counter.incrementAndGet()) } - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run(): Unit = { + Exit.addShutdownHook(() => { info("Stopping all fetchers") fetcherThreads.foreach(_.shutdown()) - } }) fetcherThreads.foreach(_.start()) println(s"${ReplicaVerificationTool.getCurrentTimeString()}: verification process is started.") diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala index 5819e97076f0a..5cee2426424cb 100644 --- a/core/src/main/scala/kafka/utils/Exit.scala +++ b/core/src/main/scala/kafka/utils/Exit.scala @@ -34,20 +34,34 @@ object Exit { throw new AssertionError("halt should not return, but it did.") } + def addShutdownHook(runnable: Runnable, name: Option[String] = None): Unit = { + JExit.addShutdownHook(runnable, name.orNull) + } + def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit = JExit.setExitProcedure(functionToProcedure(exitProcedure)) def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit = JExit.setHaltProcedure(functionToProcedure(haltProcedure)) + def setShutdownHookAdder(shutdownHookAdder: (Runnable, Option[String]) => Unit): Unit = { + JExit.setShutdownHookAdder(functionToShutdownHookAdder(shutdownHookAdder)) + } + def resetExitProcedure(): Unit = JExit.resetExitProcedure() def resetHaltProcedure(): Unit = JExit.resetHaltProcedure() + def resetShutdownHookAdder(): Unit = + JExit.resetShutdownHookAdder() + private def functionToProcedure(procedure: (Int, Option[String]) => Nothing) = new JExit.Procedure { def execute(statusCode: Int, message: String): Unit = procedure(statusCode, Option(message)) } + private def functionToShutdownHookAdder(procedure: (Runnable, Option[String]) => Unit) = new JExit.ShutdownHookAdder { + def addShutdownHook(runnable: Runnable, name: String): Unit = procedure(runnable, Option(name)) + } } diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala index 22d679fd14886..26d6ba6e66f37 100644 --- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala +++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala @@ -49,7 +49,7 @@ import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry} import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport} import org.apache.directory.server.xdbm.Index import org.apache.directory.shared.kerberos.KerberosTime -import org.apache.kafka.common.utils.{Java, KafkaThread, Utils} +import org.apache.kafka.common.utils.{Java, Utils} /** * Mini KDC based on Apache Directory Server that can be embedded in tests or used from command line as a standalone @@ -370,7 +370,7 @@ object MiniKdc { } } - private def start(workDir: File, config: Properties, keytabFile: File, principals: Seq[String]): Unit = { + private[minikdc] def start(workDir: File, config: Properties, keytabFile: File, principals: Seq[String]): MiniKdc = { val miniKdc = new MiniKdc(config, workDir) miniKdc.start() miniKdc.createPrincipal(keytabFile, principals: _*) @@ -390,7 +390,8 @@ object MiniKdc { | """.stripMargin println(infoMessage) - Runtime.getRuntime.addShutdownHook(new KafkaThread("minikdc-shutdown-hook", () => miniKdc.stop(), false)) + Exit.addShutdownHook(() => miniKdc.stop, Some("minikdc-shutdown-hook")) + miniKdc } val OrgName = "org.name" diff --git a/core/src/test/scala/kafka/security/minikdc/MinikdcTest.scala b/core/src/test/scala/kafka/security/minikdc/MinikdcTest.scala new file mode 100644 index 0000000000000..521650ce4fff1 --- /dev/null +++ b/core/src/test/scala/kafka/security/minikdc/MinikdcTest.scala @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.security.minikdc + +import java.io.{File, FileWriter} +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.util.Properties + +import kafka.utils.TestUtils +import org.junit.Test +import org.junit.Assert._ + +class MinikdcTest { + @Test + def shouldNotStopImmediatelyWhenStarted(): Unit = { + val config = new Properties() + config.setProperty("kdc.bind.address", "0.0.0.0") + config.setProperty("transport", "TCP"); + config.setProperty("max.ticket.lifetime", "86400000") + config.setProperty("org.name", "Example") + config.setProperty("kdc.port", "0") + config.setProperty("org.domain", "COM") + config.setProperty("max.renewable.lifetime", "604800000") + config.setProperty("instance", "DefaultKrbServer") + val minikdc = MiniKdc.start(TestUtils.tempDir(), config, TestUtils.tempFile(), List("foo")) + val running = System.getProperty(MiniKdc.JavaSecurityKrb5Conf) != null + try { + assertTrue("MiniKdc stopped immediately; it should not have", running) + } finally { + if (running) minikdc.stop() + } + } +} \ No newline at end of file diff --git a/core/src/test/scala/kafka/utils/ExitTest.scala b/core/src/test/scala/kafka/utils/ExitTest.scala new file mode 100644 index 0000000000000..ab478185a330f --- /dev/null +++ b/core/src/test/scala/kafka/utils/ExitTest.scala @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import org.junit.Assert.assertEquals +import org.junit.Test + + +class ExitTest { + @Test + def shouldHaltImmediately(): Unit = { + val array:Array[Any] = Array("a", "b") + def haltProcedure(exitStatus: Int, message: Option[String]) : Nothing = { + array(0) = exitStatus + array(1) = message + throw new Exception() + } + Exit.setHaltProcedure(haltProcedure) + val statusCode = 0 + val message = Some("mesaage") + try { + try { + Exit.halt(statusCode) + } catch { + case e: Exception => { + assertEquals(statusCode, array(0)) + assertEquals(None, array(1)) + } + } + try { + Exit.halt(statusCode, message) + } catch { + case e: Exception => { + assertEquals(statusCode, array(0)) + assertEquals(message, array(1)) + } + } + } finally { + Exit.resetHaltProcedure() + } + } + + @Test + def shouldExitImmediately(): Unit = { + val array:Array[Any] = Array("a", "b") + def exitProcedure(exitStatus: Int, message: Option[String]) : Nothing = { + array(0) = exitStatus + array(1) = message + throw new Exception() + } + Exit.setExitProcedure(exitProcedure) + val statusCode = 0 + val message = Some("mesaage") + try { + try { + Exit.exit(statusCode) + } catch { + case e: Exception => { + assertEquals(statusCode, array(0)) + assertEquals(None, array(1)) + } + } + try { + Exit.exit(statusCode, message) + } catch { + case e: Exception => { + assertEquals(statusCode, array(0)) + assertEquals(message, array(1)) + } + } + } finally { + Exit.resetExitProcedure() + } + } + + @Test + def shouldAddShutdownHookImmediately(): Unit = { + val array:Array[Any] = Array("a", "b") + def shutdownHookAdder(runnable: Runnable, name: Option[String]) : Unit = { + array(0) = runnable + array(1) = name + } + Exit.setShutdownHookAdder(shutdownHookAdder) + val runnable: Runnable = () => {} + val message = Some("mesaage") + try { + Exit.addShutdownHook(runnable) + assertEquals(runnable, array(0)) + assertEquals(None, array(1)) + Exit.addShutdownHook(runnable, message) + assertEquals(runnable, array(0)) + assertEquals(message, array(1)) + } finally { + Exit.resetShutdownHookAdder() + } + } + + @Test + def shouldNotInvokeShutdownHookImmediately(): Unit = { + val value = "a" + val array:Array[Any] = Array(value) + val runnable: Runnable = () => {} + try { + Exit.addShutdownHook(runnable) + assertEquals(value, array(0)) + Exit.addShutdownHook(runnable, Some("mesaage")) + assertEquals(value, array(0)) + } finally { + Exit.resetShutdownHookAdder() + } + } +} diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index 8a5eefb859aad..cbb0fb75d8fcc 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -57,13 +57,11 @@ object StressTestLog { val reader = new ReaderThread(log) reader.start() - Runtime.getRuntime().addShutdownHook(new Thread() { - override def run() = { + Exit.addShutdownHook(() => { running.set(false) writer.join() reader.join() Utils.delete(dir) - } }) while(running.get) { From 114737b3323fce4d1d2d847757c86716677cfdcd Mon Sep 17 00:00:00 2001 From: Ron Dagostino Date: Thu, 16 Jan 2020 21:59:51 -0500 Subject: [PATCH 3/8] Java changes for o.a.k.common.utils.Exit --- .../java/org/apache/kafka/test/TestUtils.java | 14 ++++---- .../kafka/connect/mirror/MirrorMaker.java | 2 +- .../apache/kafka/connect/runtime/Connect.java | 3 +- .../kafka/streams/tests/EosTestClient.java | 33 +++++++++---------- .../kafka/streams/tests/EosTestDriver.java | 4 +-- .../streams/tests/ShutdownDeadlockTest.java | 7 +--- .../kafka/streams/tests/SmokeTestClient.java | 3 +- .../streams/tests/StaticMemberTestClient.java | 16 ++++----- .../StreamsBrokerDownResilienceTest.java | 6 ++-- .../tests/StreamsNamedRepartitionTest.java | 5 +-- .../streams/tests/StreamsOptimizedTest.java | 16 ++++----- .../tests/StreamsStandByReplicaTest.java | 5 +-- .../streams/tests/StreamsUpgradeTest.java | 5 +-- ...eamsUpgradeToCooperativeRebalanceTest.java | 5 +-- .../tools/TransactionalMessageCopier.java | 5 +-- .../kafka/tools/VerifiableConsumer.java | 2 +- .../kafka/tools/VerifiableLog4jAppender.java | 6 ++-- .../kafka/tools/VerifiableProducer.java | 4 +-- .../org/apache/kafka/trogdor/agent/Agent.java | 4 +-- .../trogdor/coordinator/Coordinator.java | 4 +-- 20 files changed, 70 insertions(+), 79 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index a53d1fcf2a52f..0425dc2a23fff 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -262,14 +263,11 @@ public static File tempDirectory(final Path parent, String prefix) { } file.deleteOnExit(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - Utils.delete(file); - } catch (IOException e) { - log.error("Error deleting {}", file.getAbsolutePath(), e); - } + Exit.addShutdownHook(() -> { + try { + Utils.delete(file); + } catch (IOException e) { + log.error("Error deleting {}", file.getAbsolutePath(), e); } }); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index d635b1c8c3e62..e2b486ced3d74 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -164,7 +164,7 @@ public void start() { } startLatch = new CountDownLatch(herders.size()); stopLatch = new CountDownLatch(herders.size()); - Runtime.getRuntime().addShutdownHook(shutdownHook); + Exit.addShutdownHook(shutdownHook); for (Herder herder : herders.values()) { try { herder.start(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java index 773869a5ba21a..833d3a2fb2625 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.connect.runtime.rest.RestServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +49,7 @@ public Connect(Herder herder, RestServer rest) { public void start() { try { log.info("Kafka Connect starting"); - Runtime.getRuntime().addShutdownHook(shutdownHook); + Exit.addShutdownHook(shutdownHook); herder.start(); rest.initializeResources(herder); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java index e292f16eda77a..0f4aeaab894f5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java @@ -19,6 +19,7 @@ import java.time.Duration; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -53,25 +54,21 @@ public class EosTestClient extends SmokeTestUtil { private volatile boolean isRunning = true; public void start() { - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - isRunning = false; - streams.close(Duration.ofSeconds(300)); - - // need to wait for callback to avoid race condition - // -> make sure the callback printout to stdout is there as it is expected test output - waitForStateTransitionCallback(); - - // do not remove these printouts since they are needed for health scripts - if (!uncaughtException) { - System.out.println(System.currentTimeMillis()); - System.out.println("EOS-TEST-CLIENT-CLOSED"); - System.out.flush(); - } - + Exit.addShutdownHook(() -> { + isRunning = false; + streams.close(Duration.ofSeconds(300)); + + // need to wait for callback to avoid race condition + // -> make sure the callback printout to stdout is there as it is expected test output + waitForStateTransitionCallback(); + + // do not remove these printouts since they are needed for health scripts + if (!uncaughtException) { + System.out.println(System.currentTimeMillis()); + System.out.println("EOS-TEST-CLIENT-CLOSED"); + System.out.flush(); } - })); + }); while (isRunning) { if (streams == null) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java index e98d1527e144b..3c84447b98da8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java @@ -69,11 +69,11 @@ private static synchronized void updateNumRecordsProduces(final int delta) { static void generate(final String kafka) { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook(() -> { System.out.println("Terminating"); System.out.flush(); isRunning = false; - })); + }); final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest"); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java index a6a2ebb33781f..77d36a6cc1348 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java @@ -62,12 +62,7 @@ public void uncaughtException(final Thread t, final Throwable e) { } }); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - streams.close(Duration.ofSeconds(5)); - } - })); + Exit.addShutdownHook(() -> streams.close(Duration.ofSeconds(5))); final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index 7196657f8ea00..e7fe7a77c6f45 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -76,7 +77,7 @@ public void start(final Properties streamsProperties) { e.printStackTrace(); }); - Runtime.getRuntime().addShutdownHook(new Thread(this::close)); + Exit.addShutdownHook(() -> close()); thread = new Thread(() -> streams.start()); thread.start(); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java index 96ccad44de72d..3e4ceed657325 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.tests; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; @@ -70,15 +71,12 @@ public static void main(final String[] args) throws Exception { streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - System.out.println("closing Kafka Streams instance"); - System.out.flush(); - streams.close(); - System.out.println("Static membership test closed"); - System.out.flush(); - } + Exit.addShutdownHook(() -> { + System.out.println("closing Kafka Streams instance"); + System.out.flush(); + streams.close(); + System.out.println("Static membership test closed"); + System.out.flush(); }); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java index ac4e1200a7739..1137fee78212b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.KafkaStreams; @@ -114,12 +115,11 @@ public void apply(final String key, final String value) { System.out.println("Start Kafka Streams"); streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook(() -> { streams.close(Duration.ofSeconds(30)); System.out.println("Complete shutdown of streams resilience test app now"); System.out.flush(); - } - )); + }); } private static boolean confirmCorrectConfigs(final Properties properties) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java index c408d9f3da592..88f0483440758 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; @@ -110,12 +111,12 @@ public static void main(final String[] args) throws Exception { streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook(() -> { System.out.println("closing Kafka Streams instance"); System.out.flush(); streams.close(Duration.ofMillis(5000)); System.out.println("NAMED_REPARTITION_TEST Streams Stopped"); System.out.flush(); - })); + }); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java index 62f0e7461fce5..7579b02f8dd9e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; @@ -131,15 +132,12 @@ public static void main(final String[] args) throws Exception { streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - System.out.println("closing Kafka Streams instance"); - System.out.flush(); - streams.close(Duration.ofMillis(5000)); - System.out.println("OPTIMIZE_TEST Streams Stopped"); - System.out.flush(); - } + Exit.addShutdownHook(() -> { + System.out.println("closing Kafka Streams instance"); + System.out.flush(); + streams.close(Duration.ofMillis(5000)); + System.out.println("OPTIMIZE_TEST Streams Stopped"); + System.out.flush(); }); } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java index 6fa225740a2c0..2ad2ce5057889 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; @@ -147,10 +148,10 @@ public static void main(final String[] args) throws IOException { System.out.println("Start Kafka Streams"); streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook(() -> { shutdown(streams); System.out.println("Shut down streams now"); - })); + }); } private static void shutdown(final KafkaStreams streams) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 360f11e93e6d2..b9ed41e9e2647 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.ByteBufferInputStream; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; @@ -81,13 +82,13 @@ public static void main(final String[] args) throws Exception { final KafkaStreams streams = buildStreams(streamsProperties); streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook(() -> { System.out.println("closing Kafka Streams instance"); System.out.flush(); streams.close(); System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); System.out.flush(); - })); + }); } public static KafkaStreams buildStreams(final Properties streamsProperties) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index df7359388dbcf..06375a67de6f4 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; @@ -108,11 +109,11 @@ public void apply(final String key, final String value) { streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook(() -> { streams.close(); System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase)); System.out.flush(); - })); + }); } private static void addTasksToBuilder(final List tasks, final StringBuilder builder) { diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index 746b8fe93e588..18f57efad01bb 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.utils.Exit; import java.io.IOException; import java.time.Duration; @@ -264,7 +265,7 @@ public static void main(String[] args) throws IOException { final AtomicBoolean isShuttingDown = new AtomicBoolean(false); final AtomicLong remainingMessages = new AtomicLong(maxMessages); final AtomicLong numMessagesProcessed = new AtomicLong(0); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook(() -> { isShuttingDown.set(true); // Flush any remaining messages producer.close(); @@ -272,7 +273,7 @@ public static void main(String[] args) throws IOException { consumer.close(); } System.out.println(shutDownString(numMessagesProcessed.get(), remainingMessages.get(), transactionalId)); - })); + }); try { Random random = new Random(); diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index 0f2715a052865..39eed9ba1883b 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -644,7 +644,7 @@ public static void main(String[] args) { try { final VerifiableConsumer consumer = createFromArgs(parser, args); - Runtime.getRuntime().addShutdownHook(new Thread(() -> consumer.close())); + Exit.addShutdownHook(() -> consumer.close()); consumer.run(); } catch (ArgumentParserException e) { diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java index 12aa4f45fde49..fbeb148e8ecbe 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java @@ -241,10 +241,8 @@ public static void main(String[] args) throws IOException { final VerifiableLog4jAppender appender = createFromArgs(args); boolean infinite = appender.maxMessages < 0; - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Trigger main thread to stop producing messages - appender.stopLogging = true; - })); + // Trigger main thread to stop producing messages when shutting down + Exit.addShutdownHook(() -> appender.stopLogging = true); long maxMessages = infinite ? Long.MAX_VALUE : appender.maxMessages; for (long i = 0; i < maxMessages; i++) { diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java index 3e6f3f14ce7a8..86926c4a53b0c 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java @@ -517,7 +517,7 @@ public static void main(String[] args) { final long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook(() -> { // Trigger main thread to stop producing messages producer.stopProducing = true; @@ -529,7 +529,7 @@ public static void main(String[] args) { double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs)); producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput)); - })); + }); producer.run(throttler); } catch (ArgumentParserException e) { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java index 12e26a713f434..4b6f8edc418c6 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java @@ -249,7 +249,7 @@ public static void main(String[] args) throws Exception { log.info("Starting agent process."); final Agent agent = new Agent(platform, Scheduler.SYSTEM, restServer, resource); restServer.start(resource); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook(() -> { log.warn("Running agent shutdown hook."); try { agent.beginShutdown(); @@ -257,7 +257,7 @@ public static void main(String[] args) throws Exception { } catch (Exception e) { log.error("Got exception while running agent shutdown hook.", e); } - })); + }); if (taskSpec != null) { TaskSpec spec = null; try { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java index 867de5563a206..33f28e2075193 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java @@ -171,7 +171,7 @@ public static void main(String[] args) throws Exception { final Coordinator coordinator = new Coordinator(platform, Scheduler.SYSTEM, restServer, resource, ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2)); restServer.start(resource); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook(() -> { log.warn("Running coordinator shutdown hook."); try { coordinator.beginShutdown(false); @@ -179,7 +179,7 @@ public static void main(String[] args) throws Exception { } catch (Exception e) { log.error("Got exception while running coordinator shutdown hook.", e); } - })); + }); coordinator.waitForShutdown(); } }; From 473b31b24bb6dcd89b8f4c6bd781f420f155c22b Mon Sep 17 00:00:00 2001 From: Ron Dagostino Date: Tue, 21 Jan 2020 17:07:59 -0500 Subject: [PATCH 4/8] Use by-name parameter for cleaner Scala code --- core/src/main/scala/kafka/Kafka.scala | 2 +- .../scala/kafka/tools/ConsoleConsumer.scala | 2 +- .../scala/kafka/tools/ConsoleProducer.scala | 2 +- .../main/scala/kafka/tools/MirrorMaker.scala | 2 +- .../kafka/tools/ReplicaVerificationTool.scala | 2 +- core/src/main/scala/kafka/utils/Exit.scala | 10 +-- .../kafka/security/minikdc/MiniKdc.scala | 2 +- .../kafka/security/minikdc/MinikdcTest.scala | 3 - .../src/test/scala/kafka/utils/ExitTest.scala | 63 ++++++++++++------- .../scala/other/kafka/StressTestLog.scala | 2 +- 10 files changed, 54 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index bd5c3ccafd6a1..40f11b44a6861 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -77,7 +77,7 @@ object Kafka extends Logging { } // attach shutdown handler to catch terminating signals as well as normal termination - Exit.addShutdownHook(() => kafkaServerStartable.shutdown, Some("kafka-shutdown-hook")) + Exit.addShutdownHook(kafkaServerStartable.shutdown, Some("kafka-shutdown-hook")) kafkaServerStartable.startup() kafkaServerStartable.awaitShutdown() diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 412ff15b88780..bfcd4148fd7eb 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -85,7 +85,7 @@ object ConsoleConsumer extends Logging { } def addShutdownHook(consumer: ConsumerWrapper, conf: ConsumerConfig): Unit = { - Exit.addShutdownHook(() => { + Exit.addShutdownHook({ consumer.wakeup() shutdownLatch.await() diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index fe012cf8abd7c..5d0f67599c5f4 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -44,7 +44,7 @@ object ConsoleProducer { val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config)) - Exit.addShutdownHook(() => producer.close) + Exit.addShutdownHook(producer.close) var record: ProducerRecord[Array[Byte], Array[Byte]] = null do { diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index b991a9a8bd5c8..9c5132ddcbc9e 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -515,7 +515,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() val numStreams = options.valueOf(numStreamsOpt).intValue() - Exit.addShutdownHook(() => cleanShutdown(), Some("MirrorMakerShutdownHook")) + Exit.addShutdownHook(cleanShutdown(), Some("MirrorMakerShutdownHook")) // create producer val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 0fc557882ea40..0ce5709dbce68 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -200,7 +200,7 @@ object ReplicaVerificationTool extends Logging { fetcherId = counter.incrementAndGet()) } - Exit.addShutdownHook(() => { + Exit.addShutdownHook({ info("Stopping all fetchers") fetcherThreads.foreach(_.shutdown()) }) diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala index 5cee2426424cb..44f0b95b60b00 100644 --- a/core/src/main/scala/kafka/utils/Exit.scala +++ b/core/src/main/scala/kafka/utils/Exit.scala @@ -34,8 +34,8 @@ object Exit { throw new AssertionError("halt should not return, but it did.") } - def addShutdownHook(runnable: Runnable, name: Option[String] = None): Unit = { - JExit.addShutdownHook(runnable, name.orNull) + def addShutdownHook(code: => Unit, name: Option[String] = None): Unit = { + JExit.addShutdownHook(() => code, name.orNull) } def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit = @@ -44,7 +44,7 @@ object Exit { def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit = JExit.setHaltProcedure(functionToProcedure(haltProcedure)) - def setShutdownHookAdder(shutdownHookAdder: (Runnable, Option[String]) => Unit): Unit = { + def setShutdownHookAdder(shutdownHookAdder: (=> Unit, Option[String]) => Unit): Unit = { JExit.setShutdownHookAdder(functionToShutdownHookAdder(shutdownHookAdder)) } @@ -61,7 +61,7 @@ object Exit { def execute(statusCode: Int, message: String): Unit = procedure(statusCode, Option(message)) } - private def functionToShutdownHookAdder(procedure: (Runnable, Option[String]) => Unit) = new JExit.ShutdownHookAdder { - def addShutdownHook(runnable: Runnable, name: String): Unit = procedure(runnable, Option(name)) + private def functionToShutdownHookAdder(procedure: (=> Unit, Option[String]) => Unit) = new JExit.ShutdownHookAdder { + def addShutdownHook(runnable: Runnable, name: String): Unit = procedure(runnable.run, Option(name)) } } diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala index 26d6ba6e66f37..ce3c13f912857 100644 --- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala +++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala @@ -390,7 +390,7 @@ object MiniKdc { | """.stripMargin println(infoMessage) - Exit.addShutdownHook(() => miniKdc.stop, Some("minikdc-shutdown-hook")) + Exit.addShutdownHook(miniKdc.stop, Some("minikdc-shutdown-hook")) miniKdc } diff --git a/core/src/test/scala/kafka/security/minikdc/MinikdcTest.scala b/core/src/test/scala/kafka/security/minikdc/MinikdcTest.scala index 521650ce4fff1..c1d918f577272 100644 --- a/core/src/test/scala/kafka/security/minikdc/MinikdcTest.scala +++ b/core/src/test/scala/kafka/security/minikdc/MinikdcTest.scala @@ -17,9 +17,6 @@ package kafka.security.minikdc -import java.io.{File, FileWriter} -import java.nio.charset.StandardCharsets -import java.nio.file.Files import java.util.Properties import kafka.utils.TestUtils diff --git a/core/src/test/scala/kafka/utils/ExitTest.scala b/core/src/test/scala/kafka/utils/ExitTest.scala index ab478185a330f..eeb1617d9f499 100644 --- a/core/src/test/scala/kafka/utils/ExitTest.scala +++ b/core/src/test/scala/kafka/utils/ExitTest.scala @@ -20,7 +20,6 @@ package kafka.utils import org.junit.Assert.assertEquals import org.junit.Test - class ExitTest { @Test def shouldHaltImmediately(): Unit = { @@ -32,7 +31,7 @@ class ExitTest { } Exit.setHaltProcedure(haltProcedure) val statusCode = 0 - val message = Some("mesaage") + val message = Some("message") try { try { Exit.halt(statusCode) @@ -65,7 +64,7 @@ class ExitTest { } Exit.setExitProcedure(exitProcedure) val statusCode = 0 - val message = Some("mesaage") + val message = Some("message") try { try { Exit.exit(statusCode) @@ -90,21 +89,36 @@ class ExitTest { @Test def shouldAddShutdownHookImmediately(): Unit = { - val array:Array[Any] = Array("a", "b") - def shutdownHookAdder(runnable: Runnable, name: Option[String]) : Unit = { - array(0) = runnable + val array:Array[Any] = Array(0, Some("other thing")) + // immediately invoke the code to mutate the data when a hook is added + def shutdownHookAdder(code: => Unit, name: Option[String]) : Unit = { + // invoke the code (see below, it mutates the first element) + code + // mutate the second element array(1) = name } Exit.setShutdownHookAdder(shutdownHookAdder) - val runnable: Runnable = () => {} - val message = Some("mesaage") + def sideEffect(): Unit = { + // mutate the first element + array(0) = array(0).asInstanceOf[Int] + 1 + } + val message = Some("message") try { - Exit.addShutdownHook(runnable) - assertEquals(runnable, array(0)) + Exit.addShutdownHook(sideEffect) + // first element should be mutated once + assertEquals(1, array(0)) + // second element should be mutated as well assertEquals(None, array(1)) - Exit.addShutdownHook(runnable, message) - assertEquals(runnable, array(0)) + Exit.addShutdownHook(sideEffect(), message) + // first element should be mutated again, once + assertEquals(2, array(0)) + // second element should be mutated again, too assertEquals(message, array(1)) + Exit.addShutdownHook(array(0) = array(0).asInstanceOf[Int] + 1) + // first element should be mutated again, once + assertEquals(3, array(0)) + // second element should be mutated again, too + assertEquals(None, array(1)) } finally { Exit.resetShutdownHookAdder() } @@ -112,16 +126,23 @@ class ExitTest { @Test def shouldNotInvokeShutdownHookImmediately(): Unit = { - val value = "a" + val value = "value" val array:Array[Any] = Array(value) - val runnable: Runnable = () => {} - try { - Exit.addShutdownHook(runnable) - assertEquals(value, array(0)) - Exit.addShutdownHook(runnable, Some("mesaage")) - assertEquals(value, array(0)) - } finally { - Exit.resetShutdownHookAdder() + + def sideEffect(): Unit = { + // mutate the first element + array(0) = array(0).toString + array(0).toString } + Exit.addShutdownHook(sideEffect) // by-name parameter, not invoked + // make sure the first element wasn't mutated + assertEquals(value, array(0)) + Exit.addShutdownHook(sideEffect()) // by-name parameter, not invoked + // again make sure the first element wasn't mutated + Exit.addShutdownHook(array(0) = array(0).toString + array(0).toString) // by-name parameter, not invoked + // again make sure the first element wasn't mutated + assertEquals(value, array(0)) + Exit.addShutdownHook(sideEffect, Some("message")) // by-name parameter, not invoked + // make sure the first element still isn't mutated + assertEquals(value, array(0)) } } diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index cbb0fb75d8fcc..88986c1c117d0 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -57,7 +57,7 @@ object StressTestLog { val reader = new ReaderThread(log) reader.start() - Exit.addShutdownHook(() => { + Exit.addShutdownHook({ running.set(false) writer.join() reader.join() From f14afa8292b98346929689a4563e6614c0a58b83 Mon Sep 17 00:00:00 2001 From: Ron Dagostino Date: Tue, 21 Jan 2020 17:20:24 -0500 Subject: [PATCH 5/8] minor fixes for clarity --- core/src/main/scala/kafka/utils/Exit.scala | 4 ++-- core/src/test/scala/kafka/utils/ExitTest.scala | 9 +++++---- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala index 44f0b95b60b00..a320e0f9f4238 100644 --- a/core/src/main/scala/kafka/utils/Exit.scala +++ b/core/src/main/scala/kafka/utils/Exit.scala @@ -34,8 +34,8 @@ object Exit { throw new AssertionError("halt should not return, but it did.") } - def addShutdownHook(code: => Unit, name: Option[String] = None): Unit = { - JExit.addShutdownHook(() => code, name.orNull) + def addShutdownHook(statementByName: => Unit, name: Option[String] = None): Unit = { + JExit.addShutdownHook(() => statementByName, name.orNull) } def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit = diff --git a/core/src/test/scala/kafka/utils/ExitTest.scala b/core/src/test/scala/kafka/utils/ExitTest.scala index eeb1617d9f499..547f7d6c77bef 100644 --- a/core/src/test/scala/kafka/utils/ExitTest.scala +++ b/core/src/test/scala/kafka/utils/ExitTest.scala @@ -90,10 +90,10 @@ class ExitTest { @Test def shouldAddShutdownHookImmediately(): Unit = { val array:Array[Any] = Array(0, Some("other thing")) - // immediately invoke the code to mutate the data when a hook is added - def shutdownHookAdder(code: => Unit, name: Option[String]) : Unit = { - // invoke the code (see below, it mutates the first element) - code + // immediately invoke the statement to mutate the data when a hook is added + def shutdownHookAdder(statementByName: => Unit, name: Option[String]) : Unit = { + // invoke the statement (see below, it mutates the first element) + statementByName // mutate the second element array(1) = name } @@ -138,6 +138,7 @@ class ExitTest { assertEquals(value, array(0)) Exit.addShutdownHook(sideEffect()) // by-name parameter, not invoked // again make sure the first element wasn't mutated + assertEquals(value, array(0)) Exit.addShutdownHook(array(0) = array(0).toString + array(0).toString) // by-name parameter, not invoked // again make sure the first element wasn't mutated assertEquals(value, array(0)) From 2dd54dfa6b3b7cd1685c6fb228205d46f335ed9b Mon Sep 17 00:00:00 2001 From: Ron Dagostino Date: Tue, 21 Jan 2020 21:42:59 -0500 Subject: [PATCH 6/8] name required as first parameter, then shutdownHook --- .../org/apache/kafka/common/utils/Exit.java | 10 +-- .../apache/kafka/common/utils/ExitTest.java | 10 +-- core/src/main/scala/kafka/Kafka.scala | 2 +- .../scala/kafka/tools/ConsoleConsumer.scala | 2 +- .../scala/kafka/tools/ConsoleProducer.scala | 2 +- .../main/scala/kafka/tools/MirrorMaker.scala | 2 +- .../kafka/tools/ReplicaVerificationTool.scala | 2 +- core/src/main/scala/kafka/utils/Exit.scala | 10 +-- .../kafka/security/minikdc/MiniKdc.scala | 2 +- .../src/test/scala/kafka/utils/ExitTest.scala | 62 ++++++++----------- .../scala/other/kafka/StressTestLog.scala | 2 +- 11 files changed, 47 insertions(+), 59 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Exit.java b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java index bfad456bba670..6c5ab61df9af9 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Exit.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java @@ -27,7 +27,7 @@ public interface Procedure { } public interface ShutdownHookAdder { - void addShutdownHook(Runnable runnable, String name); + void addShutdownHook(String name, Runnable runnable); } private static final Procedure DEFAULT_HALT_PROCEDURE = new Procedure() { @@ -46,7 +46,7 @@ public void execute(int statusCode, String message) { private static final ShutdownHookAdder DEFAULT_SHUTDOWN_HOOK_ADDER = new ShutdownHookAdder() { @Override - public void addShutdownHook(Runnable runnable, String name) { + public void addShutdownHook(String name, Runnable runnable) { if (name != null) Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); else @@ -75,11 +75,11 @@ public static void halt(int statusCode, String message) { } public static void addShutdownHook(Runnable runnable) { - addShutdownHook(runnable, null); + addShutdownHook(null, runnable); } - public static void addShutdownHook(Runnable runnable, String name) { - shutdownHookAdder.addShutdownHook(runnable, name); + public static void addShutdownHook(String name, Runnable runnable) { + shutdownHookAdder.addShutdownHook(name, runnable); } public static void setExitProcedure(Procedure procedure) { diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java index dc3b307877a65..1663765c5293d 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java @@ -64,16 +64,16 @@ public void shouldExitImmediately() { @Test public void shouldAddShutdownHookImmediately() { List list = new ArrayList<>(); - Exit.setShutdownHookAdder((runnable, name) -> { - list.add(runnable); + Exit.setShutdownHookAdder((name, runnable) -> { list.add(name); + list.add(runnable); }); try { Runnable runnable = () -> { }; String name = "name"; Exit.addShutdownHook(runnable); - Exit.addShutdownHook(runnable, name); - assertEquals(Arrays.asList(runnable, null, runnable, name), list); + Exit.addShutdownHook(name, runnable); + assertEquals(Arrays.asList(null, runnable, name, runnable), list); } finally { Exit.resetShutdownHookAdder(); } @@ -85,7 +85,7 @@ public void shouldNotInvokeShutdownHookImmediately() { Runnable runnable = () -> list.add(this); Exit.addShutdownHook(runnable); assertEquals(0, list.size()); - Exit.addShutdownHook(runnable, "messge"); + Exit.addShutdownHook("message", runnable); assertEquals(0, list.size()); } } diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 40f11b44a6861..6842cd72bc679 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -77,7 +77,7 @@ object Kafka extends Logging { } // attach shutdown handler to catch terminating signals as well as normal termination - Exit.addShutdownHook(kafkaServerStartable.shutdown, Some("kafka-shutdown-hook")) + Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown) kafkaServerStartable.startup() kafkaServerStartable.awaitShutdown() diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index bfcd4148fd7eb..691cde5697e7e 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -85,7 +85,7 @@ object ConsoleConsumer extends Logging { } def addShutdownHook(consumer: ConsumerWrapper, conf: ConsumerConfig): Unit = { - Exit.addShutdownHook({ + Exit.addShutdownHook("consumer-shutdown-hook", { consumer.wakeup() shutdownLatch.await() diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 5d0f67599c5f4..c857f700d0488 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -44,7 +44,7 @@ object ConsoleProducer { val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config)) - Exit.addShutdownHook(producer.close) + Exit.addShutdownHook("producer-shutdown-hook", producer.close) var record: ProducerRecord[Array[Byte], Array[Byte]] = null do { diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 9c5132ddcbc9e..87409f3fe9992 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -515,7 +515,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() val numStreams = options.valueOf(numStreamsOpt).intValue() - Exit.addShutdownHook(cleanShutdown(), Some("MirrorMakerShutdownHook")) + Exit.addShutdownHook("MirrorMakerShutdownHook", cleanShutdown()) // create producer val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 0ce5709dbce68..33d5256e5f6e2 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -200,7 +200,7 @@ object ReplicaVerificationTool extends Logging { fetcherId = counter.incrementAndGet()) } - Exit.addShutdownHook({ + Exit.addShutdownHook("ReplicaVerificationToolShutdownHook", { info("Stopping all fetchers") fetcherThreads.foreach(_.shutdown()) }) diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala index a320e0f9f4238..d6f0b8b77681a 100644 --- a/core/src/main/scala/kafka/utils/Exit.scala +++ b/core/src/main/scala/kafka/utils/Exit.scala @@ -34,8 +34,8 @@ object Exit { throw new AssertionError("halt should not return, but it did.") } - def addShutdownHook(statementByName: => Unit, name: Option[String] = None): Unit = { - JExit.addShutdownHook(() => statementByName, name.orNull) + def addShutdownHook(name: String, shutdownHook: => Unit): Unit = { + JExit.addShutdownHook(name, () => shutdownHook) } def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit = @@ -44,7 +44,7 @@ object Exit { def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit = JExit.setHaltProcedure(functionToProcedure(haltProcedure)) - def setShutdownHookAdder(shutdownHookAdder: (=> Unit, Option[String]) => Unit): Unit = { + def setShutdownHookAdder(shutdownHookAdder: (String, => Unit) => Unit): Unit = { JExit.setShutdownHookAdder(functionToShutdownHookAdder(shutdownHookAdder)) } @@ -61,7 +61,7 @@ object Exit { def execute(statusCode: Int, message: String): Unit = procedure(statusCode, Option(message)) } - private def functionToShutdownHookAdder(procedure: (=> Unit, Option[String]) => Unit) = new JExit.ShutdownHookAdder { - def addShutdownHook(runnable: Runnable, name: String): Unit = procedure(runnable.run, Option(name)) + private def functionToShutdownHookAdder(procedure: (String, => Unit) => Unit) = new JExit.ShutdownHookAdder { + def addShutdownHook(name: String, runnable: Runnable): Unit = procedure(name, runnable.run) } } diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala index ce3c13f912857..17d6cb84b6363 100644 --- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala +++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala @@ -390,7 +390,7 @@ object MiniKdc { | """.stripMargin println(infoMessage) - Exit.addShutdownHook(miniKdc.stop, Some("minikdc-shutdown-hook")) + Exit.addShutdownHook("minikdc-shutdown-hook", miniKdc.stop) miniKdc } diff --git a/core/src/test/scala/kafka/utils/ExitTest.scala b/core/src/test/scala/kafka/utils/ExitTest.scala index 547f7d6c77bef..50e46bb59cf60 100644 --- a/core/src/test/scala/kafka/utils/ExitTest.scala +++ b/core/src/test/scala/kafka/utils/ExitTest.scala @@ -89,36 +89,27 @@ class ExitTest { @Test def shouldAddShutdownHookImmediately(): Unit = { - val array:Array[Any] = Array(0, Some("other thing")) - // immediately invoke the statement to mutate the data when a hook is added - def shutdownHookAdder(statementByName: => Unit, name: Option[String]) : Unit = { - // invoke the statement (see below, it mutates the first element) - statementByName - // mutate the second element - array(1) = name + val name = "name" + val array:Array[Any] = Array("", 0) + // immediately invoke the shutdown hook to mutate the data when a hook is added + def shutdownHookAdder(name: String, shutdownHook: => Unit) : Unit = { + // mutate the first element + array(0) = array(0).toString + name + // invoke the shutdown hook (see below, it mutates the second element) + shutdownHook } Exit.setShutdownHookAdder(shutdownHookAdder) def sideEffect(): Unit = { - // mutate the first element - array(0) = array(0).asInstanceOf[Int] + 1 + // mutate the second element + array(1) = array(1).asInstanceOf[Int] + 1 } - val message = Some("message") try { - Exit.addShutdownHook(sideEffect) - // first element should be mutated once - assertEquals(1, array(0)) - // second element should be mutated as well - assertEquals(None, array(1)) - Exit.addShutdownHook(sideEffect(), message) - // first element should be mutated again, once - assertEquals(2, array(0)) - // second element should be mutated again, too - assertEquals(message, array(1)) - Exit.addShutdownHook(array(0) = array(0).asInstanceOf[Int] + 1) - // first element should be mutated again, once - assertEquals(3, array(0)) - // second element should be mutated again, too - assertEquals(None, array(1)) + Exit.addShutdownHook(name, sideEffect) // by-name parameter, only invoked due to above shutdownHookAdder + assertEquals(1, array(1)) + assertEquals(name * array(1).asInstanceOf[Int], array(0).toString) + Exit.addShutdownHook(name, array(1) = array(1).asInstanceOf[Int] + 1) // by-name parameter, only invoked due to above shutdownHookAdder + assertEquals(2, array(1)) + assertEquals(name * array(1).asInstanceOf[Int], array(0).toString) } finally { Exit.resetShutdownHookAdder() } @@ -126,24 +117,21 @@ class ExitTest { @Test def shouldNotInvokeShutdownHookImmediately(): Unit = { - val value = "value" - val array:Array[Any] = Array(value) + val name = "name" + val array:Array[String] = Array(name) def sideEffect(): Unit = { // mutate the first element - array(0) = array(0).toString + array(0).toString + array(0) = array(0) + name } - Exit.addShutdownHook(sideEffect) // by-name parameter, not invoked + Exit.addShutdownHook(name, sideEffect) // by-name parameter, not invoked // make sure the first element wasn't mutated - assertEquals(value, array(0)) - Exit.addShutdownHook(sideEffect()) // by-name parameter, not invoked + assertEquals(name, array(0)) + Exit.addShutdownHook(name, sideEffect()) // by-name parameter, not invoked // again make sure the first element wasn't mutated - assertEquals(value, array(0)) - Exit.addShutdownHook(array(0) = array(0).toString + array(0).toString) // by-name parameter, not invoked + assertEquals(name, array(0)) + Exit.addShutdownHook(name, array(0) = array(0) + name) // by-name parameter, not invoked // again make sure the first element wasn't mutated - assertEquals(value, array(0)) - Exit.addShutdownHook(sideEffect, Some("message")) // by-name parameter, not invoked - // make sure the first element still isn't mutated - assertEquals(value, array(0)) + assertEquals(name, array(0)) } } diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index 88986c1c117d0..b411519fd4513 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -57,7 +57,7 @@ object StressTestLog { val reader = new ReaderThread(log) reader.start() - Exit.addShutdownHook({ + Exit.addShutdownHook("stress-test-shutdown-hook", { running.set(false) writer.join() reader.join() From 63bd6b64dd60e151302fa4dcd9f6ad90177b4fda Mon Sep 17 00:00:00 2001 From: Ron Dagostino Date: Thu, 23 Jan 2020 17:29:59 -0500 Subject: [PATCH 7/8] Always provide a shutdown hook thread name --- .../src/main/java/org/apache/kafka/common/utils/Exit.java | 4 ---- .../test/java/org/apache/kafka/common/utils/ExitTest.java | 5 +---- clients/src/test/java/org/apache/kafka/test/TestUtils.java | 2 +- .../java/org/apache/kafka/connect/mirror/MirrorMaker.java | 2 +- .../main/java/org/apache/kafka/connect/runtime/Connect.java | 2 +- .../java/org/apache/kafka/streams/tests/EosTestClient.java | 2 +- .../java/org/apache/kafka/streams/tests/EosTestDriver.java | 2 +- .../org/apache/kafka/streams/tests/ShutdownDeadlockTest.java | 2 +- .../java/org/apache/kafka/streams/tests/SmokeTestClient.java | 2 +- .../apache/kafka/streams/tests/StaticMemberTestClient.java | 2 +- .../kafka/streams/tests/StreamsBrokerDownResilienceTest.java | 2 +- .../kafka/streams/tests/StreamsNamedRepartitionTest.java | 2 +- .../org/apache/kafka/streams/tests/StreamsOptimizedTest.java | 2 +- .../kafka/streams/tests/StreamsStandByReplicaTest.java | 2 +- .../org/apache/kafka/streams/tests/StreamsUpgradeTest.java | 2 +- .../tests/StreamsUpgradeToCooperativeRebalanceTest.java | 2 +- .../org/apache/kafka/tools/TransactionalMessageCopier.java | 2 +- .../main/java/org/apache/kafka/tools/VerifiableConsumer.java | 2 +- .../java/org/apache/kafka/tools/VerifiableLog4jAppender.java | 2 +- .../main/java/org/apache/kafka/tools/VerifiableProducer.java | 2 +- .../src/main/java/org/apache/kafka/trogdor/agent/Agent.java | 2 +- .../org/apache/kafka/trogdor/coordinator/Coordinator.java | 2 +- 22 files changed, 21 insertions(+), 28 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Exit.java b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java index 6c5ab61df9af9..45f92e41e46c6 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Exit.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java @@ -74,10 +74,6 @@ public static void halt(int statusCode, String message) { haltProcedure.execute(statusCode, message); } - public static void addShutdownHook(Runnable runnable) { - addShutdownHook(null, runnable); - } - public static void addShutdownHook(String name, Runnable runnable) { shutdownHookAdder.addShutdownHook(name, runnable); } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java index 1663765c5293d..1377c99b54178 100644 --- a/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java @@ -71,9 +71,8 @@ public void shouldAddShutdownHookImmediately() { try { Runnable runnable = () -> { }; String name = "name"; - Exit.addShutdownHook(runnable); Exit.addShutdownHook(name, runnable); - assertEquals(Arrays.asList(null, runnable, name, runnable), list); + assertEquals(Arrays.asList(name, runnable), list); } finally { Exit.resetShutdownHookAdder(); } @@ -83,8 +82,6 @@ public void shouldAddShutdownHookImmediately() { public void shouldNotInvokeShutdownHookImmediately() { List list = new ArrayList<>(); Runnable runnable = () -> list.add(this); - Exit.addShutdownHook(runnable); - assertEquals(0, list.size()); Exit.addShutdownHook("message", runnable); assertEquals(0, list.size()); } diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 0425dc2a23fff..5782b6d7cfc04 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -263,7 +263,7 @@ public static File tempDirectory(final Path parent, String prefix) { } file.deleteOnExit(); - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("delete-temp-file-shutdown-hook", () -> { try { Utils.delete(file); } catch (IOException e) { diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index e2b486ced3d74..07ad4fdae94da 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -164,7 +164,7 @@ public void start() { } startLatch = new CountDownLatch(herders.size()); stopLatch = new CountDownLatch(herders.size()); - Exit.addShutdownHook(shutdownHook); + Exit.addShutdownHook("mirror-maker-shutdown-hook", shutdownHook); for (Herder herder : herders.values()) { try { herder.start(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java index 833d3a2fb2625..f08586f3a027e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java @@ -49,7 +49,7 @@ public Connect(Herder herder, RestServer rest) { public void start() { try { log.info("Kafka Connect starting"); - Exit.addShutdownHook(shutdownHook); + Exit.addShutdownHook("connect-shutdown-hook", shutdownHook); herder.start(); rest.initializeResources(herder); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java index 0f4aeaab894f5..6e7afa8a6c2d9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java @@ -54,7 +54,7 @@ public class EosTestClient extends SmokeTestUtil { private volatile boolean isRunning = true; public void start() { - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("streams-shutdown-hook", () -> { isRunning = false; streams.close(Duration.ofSeconds(300)); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java index 3c84447b98da8..7dc006f092096 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java @@ -69,7 +69,7 @@ private static synchronized void updateNumRecordsProduces(final int delta) { static void generate(final String kafka) { - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("streams-eos-test-driver-shutdown-hook", () -> { System.out.println("Terminating"); System.out.flush(); isRunning = false; diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java index 77d36a6cc1348..bb923ba1caaa9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java @@ -62,7 +62,7 @@ public void uncaughtException(final Thread t, final Throwable e) { } }); - Exit.addShutdownHook(() -> streams.close(Duration.ofSeconds(5))); + Exit.addShutdownHook("streams-shutdown-hook", () -> streams.close(Duration.ofSeconds(5))); final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index e7fe7a77c6f45..db243fdd849c9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -77,7 +77,7 @@ public void start(final Properties streamsProperties) { e.printStackTrace(); }); - Exit.addShutdownHook(() -> close()); + Exit.addShutdownHook("streams-shutdown-hook", () -> close()); thread = new Thread(() -> streams.start()); thread.start(); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java index 3e4ceed657325..8c7c5cd56476d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java @@ -71,7 +71,7 @@ public static void main(final String[] args) throws Exception { streams.start(); - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("streams-shutdown-hook", () -> { System.out.println("closing Kafka Streams instance"); System.out.flush(); streams.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java index 1137fee78212b..b3316247a89a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java @@ -115,7 +115,7 @@ public void apply(final String key, final String value) { System.out.println("Start Kafka Streams"); streams.start(); - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("streams-shutdown-hook", () -> { streams.close(Duration.ofSeconds(30)); System.out.println("Complete shutdown of streams resilience test app now"); System.out.flush(); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java index 88f0483440758..b98f86141a037 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java @@ -111,7 +111,7 @@ public static void main(final String[] args) throws Exception { streams.start(); - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("streams-shutdown-hook", () -> { System.out.println("closing Kafka Streams instance"); System.out.flush(); streams.close(Duration.ofMillis(5000)); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java index 7579b02f8dd9e..afec99d03d76f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java @@ -132,7 +132,7 @@ public static void main(final String[] args) throws Exception { streams.start(); - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("streams-shutdown-hook", () -> { System.out.println("closing Kafka Streams instance"); System.out.flush(); streams.close(Duration.ofMillis(5000)); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java index 2ad2ce5057889..2ac275fcb9d16 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java @@ -148,7 +148,7 @@ public static void main(final String[] args) throws IOException { System.out.println("Start Kafka Streams"); streams.start(); - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("streams-shutdown-hook", () -> { shutdown(streams); System.out.println("Shut down streams now"); }); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index b9ed41e9e2647..73d7f076f4280 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -82,7 +82,7 @@ public static void main(final String[] args) throws Exception { final KafkaStreams streams = buildStreams(streamsProperties); streams.start(); - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("streams-shutdown-hook", () -> { System.out.println("closing Kafka Streams instance"); System.out.flush(); streams.close(); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index 06375a67de6f4..c5f6e6b3b0d9d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -109,7 +109,7 @@ public void apply(final String key, final String value) { streams.start(); - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("streams-shutdown-hook", () -> { streams.close(); System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase)); System.out.flush(); diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index 18f57efad01bb..1ea826d835caa 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -265,7 +265,7 @@ public static void main(String[] args) throws IOException { final AtomicBoolean isShuttingDown = new AtomicBoolean(false); final AtomicLong remainingMessages = new AtomicLong(maxMessages); final AtomicLong numMessagesProcessed = new AtomicLong(0); - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("transactional-message-copier-shutdown-hook", () -> { isShuttingDown.set(true); // Flush any remaining messages producer.close(); diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index 39eed9ba1883b..9cad90f39d034 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -644,7 +644,7 @@ public static void main(String[] args) { try { final VerifiableConsumer consumer = createFromArgs(parser, args); - Exit.addShutdownHook(() -> consumer.close()); + Exit.addShutdownHook("verifiable-consumer-shutdown-hook", () -> consumer.close()); consumer.run(); } catch (ArgumentParserException e) { diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java index fbeb148e8ecbe..534c21b682f90 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java @@ -242,7 +242,7 @@ public static void main(String[] args) throws IOException { boolean infinite = appender.maxMessages < 0; // Trigger main thread to stop producing messages when shutting down - Exit.addShutdownHook(() -> appender.stopLogging = true); + Exit.addShutdownHook("verifiable-log4j-appender-shutdown-hook", () -> appender.stopLogging = true); long maxMessages = infinite ? Long.MAX_VALUE : appender.maxMessages; for (long i = 0; i < maxMessages; i++) { diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java index 86926c4a53b0c..7dbd215161905 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java @@ -517,7 +517,7 @@ public static void main(String[] args) { final long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs); - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("verifiable-producer-shutdown-hook", () -> { // Trigger main thread to stop producing messages producer.stopProducing = true; diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java index 4b6f8edc418c6..1674b2d92aaa5 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java @@ -249,7 +249,7 @@ public static void main(String[] args) throws Exception { log.info("Starting agent process."); final Agent agent = new Agent(platform, Scheduler.SYSTEM, restServer, resource); restServer.start(resource); - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("agent-shutdown-hook", () -> { log.warn("Running agent shutdown hook."); try { agent.beginShutdown(); diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java index 33f28e2075193..47f80e5c7a15e 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java @@ -171,7 +171,7 @@ public static void main(String[] args) throws Exception { final Coordinator coordinator = new Coordinator(platform, Scheduler.SYSTEM, restServer, resource, ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2)); restServer.start(resource); - Exit.addShutdownHook(() -> { + Exit.addShutdownHook("coordinator-shutdown-hook", () -> { log.warn("Running coordinator shutdown hook."); try { coordinator.beginShutdown(false); From 78a0d9ed6ee0999008c3261bc9b356c4fdef80f2 Mon Sep 17 00:00:00 2001 From: Ron Dagostino Date: Fri, 24 Jan 2020 09:38:23 -0500 Subject: [PATCH 8/8] address review comments --- .../main/scala/kafka/tools/MirrorMaker.scala | 18 ++++--- core/src/main/scala/kafka/utils/Exit.scala | 6 +-- .../{MinikdcTest.scala => MiniKdcTest.scala} | 2 +- .../src/test/scala/kafka/utils/ExitTest.scala | 53 +++++++------------ 4 files changed, 33 insertions(+), 46 deletions(-) rename core/src/test/scala/kafka/security/minikdc/{MinikdcTest.scala => MiniKdcTest.scala} (98%) diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 87409f3fe9992..9540357918f71 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -532,7 +532,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { producer = new MirrorMakerProducer(sync, producerProps) // Create consumers - val customRebalanceListener = { + val customRebalanceListener: Option[ConsumerRebalanceListener] = { val customRebalanceListenerClass = options.valueOf(consumerRebalanceListenerOpt) if (customRebalanceListenerClass != null) { val rebalanceListenerArgs = options.valueOf(rebalanceListenerArgsOpt) @@ -540,7 +540,9 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass, rebalanceListenerArgs)) else Some(CoreUtils.createObject[ConsumerRebalanceListener](customRebalanceListenerClass)) - } else None + } else { + None + } } val mirrorMakerConsumers = createConsumers( numStreams, @@ -556,10 +558,14 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { val customMessageHandlerClass = options.valueOf(messageHandlerOpt) val messageHandlerArgs = options.valueOf(messageHandlerArgsOpt) messageHandler = { - if (customMessageHandlerClass != null) if (messageHandlerArgs != null) - CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs) - else - CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass) else defaultMirrorMakerMessageHandler + if (customMessageHandlerClass != null) { + if (messageHandlerArgs != null) + CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass, messageHandlerArgs) + else + CoreUtils.createObject[MirrorMakerMessageHandler](customMessageHandlerClass) + } else { + defaultMirrorMakerMessageHandler + } } } } diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala index d6f0b8b77681a..ad17237571e56 100644 --- a/core/src/main/scala/kafka/utils/Exit.scala +++ b/core/src/main/scala/kafka/utils/Exit.scala @@ -45,7 +45,7 @@ object Exit { JExit.setHaltProcedure(functionToProcedure(haltProcedure)) def setShutdownHookAdder(shutdownHookAdder: (String, => Unit) => Unit): Unit = { - JExit.setShutdownHookAdder(functionToShutdownHookAdder(shutdownHookAdder)) + JExit.setShutdownHookAdder((name, runnable) => shutdownHookAdder(name, runnable.run)) } def resetExitProcedure(): Unit = @@ -60,8 +60,4 @@ object Exit { private def functionToProcedure(procedure: (Int, Option[String]) => Nothing) = new JExit.Procedure { def execute(statusCode: Int, message: String): Unit = procedure(statusCode, Option(message)) } - - private def functionToShutdownHookAdder(procedure: (String, => Unit) => Unit) = new JExit.ShutdownHookAdder { - def addShutdownHook(name: String, runnable: Runnable): Unit = procedure(name, runnable.run) - } } diff --git a/core/src/test/scala/kafka/security/minikdc/MinikdcTest.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdcTest.scala similarity index 98% rename from core/src/test/scala/kafka/security/minikdc/MinikdcTest.scala rename to core/src/test/scala/kafka/security/minikdc/MiniKdcTest.scala index c1d918f577272..bfd973ddfbc84 100644 --- a/core/src/test/scala/kafka/security/minikdc/MinikdcTest.scala +++ b/core/src/test/scala/kafka/security/minikdc/MiniKdcTest.scala @@ -23,7 +23,7 @@ import kafka.utils.TestUtils import org.junit.Test import org.junit.Assert._ -class MinikdcTest { +class MiniKdcTest { @Test def shouldNotStopImmediatelyWhenStarted(): Unit = { val config = new Properties() diff --git a/core/src/test/scala/kafka/utils/ExitTest.scala b/core/src/test/scala/kafka/utils/ExitTest.scala index 50e46bb59cf60..902acaf5bd1e6 100644 --- a/core/src/test/scala/kafka/utils/ExitTest.scala +++ b/core/src/test/scala/kafka/utils/ExitTest.scala @@ -17,8 +17,11 @@ package kafka.utils +import java.io.IOException + import org.junit.Assert.assertEquals import org.junit.Test +import org.scalatest.Assertions.intercept class ExitTest { @Test @@ -27,28 +30,19 @@ class ExitTest { def haltProcedure(exitStatus: Int, message: Option[String]) : Nothing = { array(0) = exitStatus array(1) = message - throw new Exception() + throw new IOException() } Exit.setHaltProcedure(haltProcedure) val statusCode = 0 val message = Some("message") try { - try { - Exit.halt(statusCode) - } catch { - case e: Exception => { - assertEquals(statusCode, array(0)) - assertEquals(None, array(1)) - } - } - try { - Exit.halt(statusCode, message) - } catch { - case e: Exception => { - assertEquals(statusCode, array(0)) - assertEquals(message, array(1)) - } - } + intercept[IOException] {Exit.halt(statusCode)} + assertEquals(statusCode, array(0)) + assertEquals(None, array(1)) + + intercept[IOException] {Exit.halt(statusCode, message)} + assertEquals(statusCode, array(0)) + assertEquals(message, array(1)) } finally { Exit.resetHaltProcedure() } @@ -60,28 +54,19 @@ class ExitTest { def exitProcedure(exitStatus: Int, message: Option[String]) : Nothing = { array(0) = exitStatus array(1) = message - throw new Exception() + throw new IOException() } Exit.setExitProcedure(exitProcedure) val statusCode = 0 val message = Some("message") try { - try { - Exit.exit(statusCode) - } catch { - case e: Exception => { - assertEquals(statusCode, array(0)) - assertEquals(None, array(1)) - } - } - try { - Exit.exit(statusCode, message) - } catch { - case e: Exception => { - assertEquals(statusCode, array(0)) - assertEquals(message, array(1)) - } - } + intercept[IOException] {Exit.exit(statusCode)} + assertEquals(statusCode, array(0)) + assertEquals(None, array(1)) + + intercept[IOException] {Exit.exit(statusCode, message)} + assertEquals(statusCode, array(0)) + assertEquals(message, array(1)) } finally { Exit.resetExitProcedure() }