diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Exit.java b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java index 7c2a663166e27..45f92e41e46c6 100644 --- a/clients/src/main/java/org/apache/kafka/common/utils/Exit.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Exit.java @@ -26,6 +26,10 @@ public interface Procedure { void execute(int statusCode, String message); } + public interface ShutdownHookAdder { + void addShutdownHook(String name, Runnable runnable); + } + private static final Procedure DEFAULT_HALT_PROCEDURE = new Procedure() { @Override public void execute(int statusCode, String message) { @@ -40,8 +44,19 @@ public void execute(int statusCode, String message) { } }; + private static final ShutdownHookAdder DEFAULT_SHUTDOWN_HOOK_ADDER = new ShutdownHookAdder() { + @Override + public void addShutdownHook(String name, Runnable runnable) { + if (name != null) + Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable)); + else + Runtime.getRuntime().addShutdownHook(new Thread(runnable)); + } + }; + private volatile static Procedure exitProcedure = DEFAULT_EXIT_PROCEDURE; private volatile static Procedure haltProcedure = DEFAULT_HALT_PROCEDURE; + private volatile static ShutdownHookAdder shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER; public static void exit(int statusCode) { exit(statusCode, null); @@ -59,6 +74,10 @@ public static void halt(int statusCode, String message) { haltProcedure.execute(statusCode, message); } + public static void addShutdownHook(String name, Runnable runnable) { + shutdownHookAdder.addShutdownHook(name, runnable); + } + public static void setExitProcedure(Procedure procedure) { exitProcedure = procedure; } @@ -67,6 +86,10 @@ public static void setHaltProcedure(Procedure procedure) { haltProcedure = procedure; } + public static void setShutdownHookAdder(ShutdownHookAdder shutdownHookAdder) { + Exit.shutdownHookAdder = shutdownHookAdder; + } + public static void resetExitProcedure() { exitProcedure = DEFAULT_EXIT_PROCEDURE; } @@ -75,4 +98,7 @@ public static void resetHaltProcedure() { haltProcedure = DEFAULT_HALT_PROCEDURE; } + public static void resetShutdownHookAdder() { + shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER; + } } diff --git a/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java b/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java new file mode 100644 index 0000000000000..1377c99b54178 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.utils; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +public class ExitTest { + @Test + public void shouldHaltImmediately() { + List list = new ArrayList<>(); + Exit.setHaltProcedure((statusCode, message) -> { + list.add(statusCode); + list.add(message); + }); + try { + int statusCode = 0; + String message = "mesaage"; + Exit.halt(statusCode); + Exit.halt(statusCode, message); + assertEquals(Arrays.asList(statusCode, null, statusCode, message), list); + } finally { + Exit.resetHaltProcedure(); + } + } + + @Test + public void shouldExitImmediately() { + List list = new ArrayList<>(); + Exit.setExitProcedure((statusCode, message) -> { + list.add(statusCode); + list.add(message); + }); + try { + int statusCode = 0; + String message = "mesaage"; + Exit.exit(statusCode); + Exit.exit(statusCode, message); + assertEquals(Arrays.asList(statusCode, null, statusCode, message), list); + } finally { + Exit.resetExitProcedure(); + } + } + + @Test + public void shouldAddShutdownHookImmediately() { + List list = new ArrayList<>(); + Exit.setShutdownHookAdder((name, runnable) -> { + list.add(name); + list.add(runnable); + }); + try { + Runnable runnable = () -> { }; + String name = "name"; + Exit.addShutdownHook(name, runnable); + assertEquals(Arrays.asList(name, runnable), list); + } finally { + Exit.resetShutdownHookAdder(); + } + } + + @Test + public void shouldNotInvokeShutdownHookImmediately() { + List list = new ArrayList<>(); + Runnable runnable = () -> list.add(this); + Exit.addShutdownHook("message", runnable); + assertEquals(0, list.size()); + } +} diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index a53d1fcf2a52f..5782b6d7cfc04 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -29,6 +29,7 @@ import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.requests.MetadataResponse; import org.apache.kafka.common.requests.RequestHeader; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -262,14 +263,11 @@ public static File tempDirectory(final Path parent, String prefix) { } file.deleteOnExit(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - Utils.delete(file); - } catch (IOException e) { - log.error("Error deleting {}", file.getAbsolutePath(), e); - } + Exit.addShutdownHook("delete-temp-file-shutdown-hook", () -> { + try { + Utils.delete(file); + } catch (IOException e) { + log.error("Error deleting {}", file.getAbsolutePath(), e); } }); diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java index d635b1c8c3e62..07ad4fdae94da 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMaker.java @@ -164,7 +164,7 @@ public void start() { } startLatch = new CountDownLatch(herders.size()); stopLatch = new CountDownLatch(herders.size()); - Runtime.getRuntime().addShutdownHook(shutdownHook); + Exit.addShutdownHook("mirror-maker-shutdown-hook", shutdownHook); for (Herder herder : herders.values()) { try { herder.start(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java index 773869a5ba21a..f08586f3a027e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.connect.runtime.rest.RestServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +49,7 @@ public Connect(Herder herder, RestServer rest) { public void start() { try { log.info("Kafka Connect starting"); - Runtime.getRuntime().addShutdownHook(shutdownHook); + Exit.addShutdownHook("connect-shutdown-hook", shutdownHook); herder.start(); rest.initializeResources(herder); diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala index 5950b7145181b..6842cd72bc679 100755 --- a/core/src/main/scala/kafka/Kafka.scala +++ b/core/src/main/scala/kafka/Kafka.scala @@ -77,9 +77,7 @@ object Kafka extends Logging { } // attach shutdown handler to catch terminating signals as well as normal termination - Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") { - override def run(): Unit = kafkaServerStartable.shutdown() - }) + Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown) kafkaServerStartable.startup() kafkaServerStartable.awaitShutdown() diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 2e3e4f73a38cb..691cde5697e7e 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -28,7 +28,7 @@ import com.typesafe.scalalogging.LazyLogging import joptsimple._ import kafka.common.MessageFormatter import kafka.utils.Implicits._ -import kafka.utils._ +import kafka.utils.{Exit, _} import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException, WakeupException} @@ -85,8 +85,7 @@ object ConsoleConsumer extends Logging { } def addShutdownHook(consumer: ConsumerWrapper, conf: ConsumerConfig): Unit = { - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run(): Unit = { + Exit.addShutdownHook("consumer-shutdown-hook", { consumer.wakeup() shutdownLatch.await() @@ -94,7 +93,6 @@ object ConsoleConsumer extends Logging { if (conf.enableSystestEventsLogging) { System.out.println("shutdown_complete") } - } }) } diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 2043d3524fc36..c857f700d0488 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -44,11 +44,7 @@ object ConsoleProducer { val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config)) - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run(): Unit = { - producer.close() - } - }) + Exit.addShutdownHook("producer-shutdown-hook", producer.close) var record: ProducerRecord[Array[Byte], Array[Byte]] = null do { diff --git a/core/src/main/scala/kafka/tools/MirrorMaker.scala b/core/src/main/scala/kafka/tools/MirrorMaker.scala index 9a6ecfd3b07b5..9540357918f71 100755 --- a/core/src/main/scala/kafka/tools/MirrorMaker.scala +++ b/core/src/main/scala/kafka/tools/MirrorMaker.scala @@ -515,11 +515,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup { offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue() val numStreams = options.valueOf(numStreamsOpt).intValue() - Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") { - override def run(): Unit = { - cleanShutdown() - } - }) + Exit.addShutdownHook("MirrorMakerShutdownHook", cleanShutdown()) // create producer val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt)) diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 8b8d681502fb5..33d5256e5f6e2 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -200,11 +200,9 @@ object ReplicaVerificationTool extends Logging { fetcherId = counter.incrementAndGet()) } - Runtime.getRuntime.addShutdownHook(new Thread() { - override def run(): Unit = { + Exit.addShutdownHook("ReplicaVerificationToolShutdownHook", { info("Stopping all fetchers") fetcherThreads.foreach(_.shutdown()) - } }) fetcherThreads.foreach(_.start()) println(s"${ReplicaVerificationTool.getCurrentTimeString()}: verification process is started.") diff --git a/core/src/main/scala/kafka/utils/Exit.scala b/core/src/main/scala/kafka/utils/Exit.scala index 5819e97076f0a..ad17237571e56 100644 --- a/core/src/main/scala/kafka/utils/Exit.scala +++ b/core/src/main/scala/kafka/utils/Exit.scala @@ -34,20 +34,30 @@ object Exit { throw new AssertionError("halt should not return, but it did.") } + def addShutdownHook(name: String, shutdownHook: => Unit): Unit = { + JExit.addShutdownHook(name, () => shutdownHook) + } + def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit = JExit.setExitProcedure(functionToProcedure(exitProcedure)) def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit = JExit.setHaltProcedure(functionToProcedure(haltProcedure)) + def setShutdownHookAdder(shutdownHookAdder: (String, => Unit) => Unit): Unit = { + JExit.setShutdownHookAdder((name, runnable) => shutdownHookAdder(name, runnable.run)) + } + def resetExitProcedure(): Unit = JExit.resetExitProcedure() def resetHaltProcedure(): Unit = JExit.resetHaltProcedure() + def resetShutdownHookAdder(): Unit = + JExit.resetShutdownHookAdder() + private def functionToProcedure(procedure: (Int, Option[String]) => Nothing) = new JExit.Procedure { def execute(statusCode: Int, message: String): Unit = procedure(statusCode, Option(message)) } - } diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala index f3b8cda96f807..17d6cb84b6363 100644 --- a/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala +++ b/core/src/test/scala/kafka/security/minikdc/MiniKdc.scala @@ -49,7 +49,7 @@ import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry} import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport} import org.apache.directory.server.xdbm.Index import org.apache.directory.shared.kerberos.KerberosTime -import org.apache.kafka.common.utils.{Java, KafkaThread, Utils} +import org.apache.kafka.common.utils.{Java, Utils} /** * Mini KDC based on Apache Directory Server that can be embedded in tests or used from command line as a standalone @@ -370,7 +370,7 @@ object MiniKdc { } } - private def start(workDir: File, config: Properties, keytabFile: File, principals: Seq[String]): Unit = { + private[minikdc] def start(workDir: File, config: Properties, keytabFile: File, principals: Seq[String]): MiniKdc = { val miniKdc = new MiniKdc(config, workDir) miniKdc.start() miniKdc.createPrincipal(keytabFile, principals: _*) @@ -390,9 +390,8 @@ object MiniKdc { | """.stripMargin println(infoMessage) - Runtime.getRuntime.addShutdownHook(new KafkaThread("minikdc-shutdown-hook", false) { - miniKdc.stop() - }) + Exit.addShutdownHook("minikdc-shutdown-hook", miniKdc.stop) + miniKdc } val OrgName = "org.name" diff --git a/core/src/test/scala/kafka/security/minikdc/MiniKdcTest.scala b/core/src/test/scala/kafka/security/minikdc/MiniKdcTest.scala new file mode 100644 index 0000000000000..bfd973ddfbc84 --- /dev/null +++ b/core/src/test/scala/kafka/security/minikdc/MiniKdcTest.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.security.minikdc + +import java.util.Properties + +import kafka.utils.TestUtils +import org.junit.Test +import org.junit.Assert._ + +class MiniKdcTest { + @Test + def shouldNotStopImmediatelyWhenStarted(): Unit = { + val config = new Properties() + config.setProperty("kdc.bind.address", "0.0.0.0") + config.setProperty("transport", "TCP"); + config.setProperty("max.ticket.lifetime", "86400000") + config.setProperty("org.name", "Example") + config.setProperty("kdc.port", "0") + config.setProperty("org.domain", "COM") + config.setProperty("max.renewable.lifetime", "604800000") + config.setProperty("instance", "DefaultKrbServer") + val minikdc = MiniKdc.start(TestUtils.tempDir(), config, TestUtils.tempFile(), List("foo")) + val running = System.getProperty(MiniKdc.JavaSecurityKrb5Conf) != null + try { + assertTrue("MiniKdc stopped immediately; it should not have", running) + } finally { + if (running) minikdc.stop() + } + } +} \ No newline at end of file diff --git a/core/src/test/scala/kafka/utils/ExitTest.scala b/core/src/test/scala/kafka/utils/ExitTest.scala new file mode 100644 index 0000000000000..902acaf5bd1e6 --- /dev/null +++ b/core/src/test/scala/kafka/utils/ExitTest.scala @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.utils + +import java.io.IOException + +import org.junit.Assert.assertEquals +import org.junit.Test +import org.scalatest.Assertions.intercept + +class ExitTest { + @Test + def shouldHaltImmediately(): Unit = { + val array:Array[Any] = Array("a", "b") + def haltProcedure(exitStatus: Int, message: Option[String]) : Nothing = { + array(0) = exitStatus + array(1) = message + throw new IOException() + } + Exit.setHaltProcedure(haltProcedure) + val statusCode = 0 + val message = Some("message") + try { + intercept[IOException] {Exit.halt(statusCode)} + assertEquals(statusCode, array(0)) + assertEquals(None, array(1)) + + intercept[IOException] {Exit.halt(statusCode, message)} + assertEquals(statusCode, array(0)) + assertEquals(message, array(1)) + } finally { + Exit.resetHaltProcedure() + } + } + + @Test + def shouldExitImmediately(): Unit = { + val array:Array[Any] = Array("a", "b") + def exitProcedure(exitStatus: Int, message: Option[String]) : Nothing = { + array(0) = exitStatus + array(1) = message + throw new IOException() + } + Exit.setExitProcedure(exitProcedure) + val statusCode = 0 + val message = Some("message") + try { + intercept[IOException] {Exit.exit(statusCode)} + assertEquals(statusCode, array(0)) + assertEquals(None, array(1)) + + intercept[IOException] {Exit.exit(statusCode, message)} + assertEquals(statusCode, array(0)) + assertEquals(message, array(1)) + } finally { + Exit.resetExitProcedure() + } + } + + @Test + def shouldAddShutdownHookImmediately(): Unit = { + val name = "name" + val array:Array[Any] = Array("", 0) + // immediately invoke the shutdown hook to mutate the data when a hook is added + def shutdownHookAdder(name: String, shutdownHook: => Unit) : Unit = { + // mutate the first element + array(0) = array(0).toString + name + // invoke the shutdown hook (see below, it mutates the second element) + shutdownHook + } + Exit.setShutdownHookAdder(shutdownHookAdder) + def sideEffect(): Unit = { + // mutate the second element + array(1) = array(1).asInstanceOf[Int] + 1 + } + try { + Exit.addShutdownHook(name, sideEffect) // by-name parameter, only invoked due to above shutdownHookAdder + assertEquals(1, array(1)) + assertEquals(name * array(1).asInstanceOf[Int], array(0).toString) + Exit.addShutdownHook(name, array(1) = array(1).asInstanceOf[Int] + 1) // by-name parameter, only invoked due to above shutdownHookAdder + assertEquals(2, array(1)) + assertEquals(name * array(1).asInstanceOf[Int], array(0).toString) + } finally { + Exit.resetShutdownHookAdder() + } + } + + @Test + def shouldNotInvokeShutdownHookImmediately(): Unit = { + val name = "name" + val array:Array[String] = Array(name) + + def sideEffect(): Unit = { + // mutate the first element + array(0) = array(0) + name + } + Exit.addShutdownHook(name, sideEffect) // by-name parameter, not invoked + // make sure the first element wasn't mutated + assertEquals(name, array(0)) + Exit.addShutdownHook(name, sideEffect()) // by-name parameter, not invoked + // again make sure the first element wasn't mutated + assertEquals(name, array(0)) + Exit.addShutdownHook(name, array(0) = array(0) + name) // by-name parameter, not invoked + // again make sure the first element wasn't mutated + assertEquals(name, array(0)) + } +} diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala index 8a5eefb859aad..b411519fd4513 100755 --- a/core/src/test/scala/other/kafka/StressTestLog.scala +++ b/core/src/test/scala/other/kafka/StressTestLog.scala @@ -57,13 +57,11 @@ object StressTestLog { val reader = new ReaderThread(log) reader.start() - Runtime.getRuntime().addShutdownHook(new Thread() { - override def run() = { + Exit.addShutdownHook("stress-test-shutdown-hook", { running.set(false) writer.join() reader.join() Utils.delete(dir) - } }) while(running.get) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java index e292f16eda77a..6e7afa8a6c2d9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java @@ -19,6 +19,7 @@ import java.time.Duration; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; @@ -53,25 +54,21 @@ public class EosTestClient extends SmokeTestUtil { private volatile boolean isRunning = true; public void start() { - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - isRunning = false; - streams.close(Duration.ofSeconds(300)); - - // need to wait for callback to avoid race condition - // -> make sure the callback printout to stdout is there as it is expected test output - waitForStateTransitionCallback(); - - // do not remove these printouts since they are needed for health scripts - if (!uncaughtException) { - System.out.println(System.currentTimeMillis()); - System.out.println("EOS-TEST-CLIENT-CLOSED"); - System.out.flush(); - } - + Exit.addShutdownHook("streams-shutdown-hook", () -> { + isRunning = false; + streams.close(Duration.ofSeconds(300)); + + // need to wait for callback to avoid race condition + // -> make sure the callback printout to stdout is there as it is expected test output + waitForStateTransitionCallback(); + + // do not remove these printouts since they are needed for health scripts + if (!uncaughtException) { + System.out.println(System.currentTimeMillis()); + System.out.println("EOS-TEST-CLIENT-CLOSED"); + System.out.flush(); } - })); + }); while (isRunning) { if (streams == null) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java index e98d1527e144b..7dc006f092096 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java @@ -69,11 +69,11 @@ private static synchronized void updateNumRecordsProduces(final int delta) { static void generate(final String kafka) { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook("streams-eos-test-driver-shutdown-hook", () -> { System.out.println("Terminating"); System.out.flush(); isRunning = false; - })); + }); final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest"); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java index a6a2ebb33781f..bb923ba1caaa9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java @@ -62,12 +62,7 @@ public void uncaughtException(final Thread t, final Throwable e) { } }); - Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { - @Override - public void run() { - streams.close(Duration.ofSeconds(5)); - } - })); + Exit.addShutdownHook("streams-shutdown-hook", () -> streams.close(Duration.ofSeconds(5))); final Properties producerProps = new Properties(); producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest"); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java index 7196657f8ea00..db243fdd849c9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -76,7 +77,7 @@ public void start(final Properties streamsProperties) { e.printStackTrace(); }); - Runtime.getRuntime().addShutdownHook(new Thread(this::close)); + Exit.addShutdownHook("streams-shutdown-hook", () -> close()); thread = new Thread(() -> streams.start()); thread.start(); diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java index 96ccad44de72d..8c7c5cd56476d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams.tests; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; @@ -70,15 +71,12 @@ public static void main(final String[] args) throws Exception { streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - System.out.println("closing Kafka Streams instance"); - System.out.flush(); - streams.close(); - System.out.println("Static membership test closed"); - System.out.flush(); - } + Exit.addShutdownHook("streams-shutdown-hook", () -> { + System.out.println("closing Kafka Streams instance"); + System.out.flush(); + streams.close(); + System.out.println("Static membership test closed"); + System.out.flush(); }); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java index ac4e1200a7739..b3316247a89a6 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsBrokerDownResilienceTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.KafkaStreams; @@ -114,12 +115,11 @@ public void apply(final String key, final String value) { System.out.println("Start Kafka Streams"); streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook("streams-shutdown-hook", () -> { streams.close(Duration.ofSeconds(30)); System.out.println("Complete shutdown of streams resilience test app now"); System.out.flush(); - } - )); + }); } private static boolean confirmCorrectConfigs(final Properties properties) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java index c408d9f3da592..b98f86141a037 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; @@ -110,12 +111,12 @@ public static void main(final String[] args) throws Exception { streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook("streams-shutdown-hook", () -> { System.out.println("closing Kafka Streams instance"); System.out.flush(); streams.close(Duration.ofMillis(5000)); System.out.println("NAMED_REPARTITION_TEST Streams Stopped"); System.out.flush(); - })); + }); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java index 62f0e7461fce5..afec99d03d76f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsOptimizedTest.java @@ -19,6 +19,7 @@ import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; @@ -131,15 +132,12 @@ public static void main(final String[] args) throws Exception { streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - System.out.println("closing Kafka Streams instance"); - System.out.flush(); - streams.close(Duration.ofMillis(5000)); - System.out.println("OPTIMIZE_TEST Streams Stopped"); - System.out.flush(); - } + Exit.addShutdownHook("streams-shutdown-hook", () -> { + System.out.println("closing Kafka Streams instance"); + System.out.flush(); + streams.close(Duration.ofMillis(5000)); + System.out.println("OPTIMIZE_TEST Streams Stopped"); + System.out.flush(); }); } diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java index 6fa225740a2c0..2ac275fcb9d16 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsStandByReplicaTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; @@ -147,10 +148,10 @@ public static void main(final String[] args) throws IOException { System.out.println("Start Kafka Streams"); streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook("streams-shutdown-hook", () -> { shutdown(streams); System.out.println("Shut down streams now"); - })); + }); } private static void shutdown(final KafkaStreams streams) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java index 360f11e93e6d2..73d7f076f4280 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.utils.ByteBufferInputStream; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaClientSupplier; import org.apache.kafka.streams.KafkaStreams; @@ -81,13 +82,13 @@ public static void main(final String[] args) throws Exception { final KafkaStreams streams = buildStreams(streamsProperties); streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook("streams-shutdown-hook", () -> { System.out.println("closing Kafka Streams instance"); System.out.flush(); streams.close(); System.out.println("UPGRADE-TEST-CLIENT-CLOSED"); System.out.flush(); - })); + }); } public static KafkaStreams buildStreams(final Properties streamsProperties) { diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java index df7359388dbcf..c5f6e6b3b0d9d 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Exit; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KafkaStreams.State; @@ -108,11 +109,11 @@ public void apply(final String key, final String value) { streams.start(); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook("streams-shutdown-hook", () -> { streams.close(); System.out.println(String.format("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED", upgradePhase)); System.out.flush(); - })); + }); } private static void addTasksToBuilder(final List tasks, final StringBuilder builder) { diff --git a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java index 746b8fe93e588..1ea826d835caa 100644 --- a/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java +++ b/tools/src/main/java/org/apache/kafka/tools/TransactionalMessageCopier.java @@ -33,6 +33,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.ProducerFencedException; +import org.apache.kafka.common.utils.Exit; import java.io.IOException; import java.time.Duration; @@ -264,7 +265,7 @@ public static void main(String[] args) throws IOException { final AtomicBoolean isShuttingDown = new AtomicBoolean(false); final AtomicLong remainingMessages = new AtomicLong(maxMessages); final AtomicLong numMessagesProcessed = new AtomicLong(0); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook("transactional-message-copier-shutdown-hook", () -> { isShuttingDown.set(true); // Flush any remaining messages producer.close(); @@ -272,7 +273,7 @@ public static void main(String[] args) throws IOException { consumer.close(); } System.out.println(shutDownString(numMessagesProcessed.get(), remainingMessages.get(), transactionalId)); - })); + }); try { Random random = new Random(); diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java index 0f2715a052865..9cad90f39d034 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java @@ -644,7 +644,7 @@ public static void main(String[] args) { try { final VerifiableConsumer consumer = createFromArgs(parser, args); - Runtime.getRuntime().addShutdownHook(new Thread(() -> consumer.close())); + Exit.addShutdownHook("verifiable-consumer-shutdown-hook", () -> consumer.close()); consumer.run(); } catch (ArgumentParserException e) { diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java index 12aa4f45fde49..534c21b682f90 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableLog4jAppender.java @@ -241,10 +241,8 @@ public static void main(String[] args) throws IOException { final VerifiableLog4jAppender appender = createFromArgs(args); boolean infinite = appender.maxMessages < 0; - Runtime.getRuntime().addShutdownHook(new Thread(() -> { - // Trigger main thread to stop producing messages - appender.stopLogging = true; - })); + // Trigger main thread to stop producing messages when shutting down + Exit.addShutdownHook("verifiable-log4j-appender-shutdown-hook", () -> appender.stopLogging = true); long maxMessages = infinite ? Long.MAX_VALUE : appender.maxMessages; for (long i = 0; i < maxMessages; i++) { diff --git a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java index 3e6f3f14ce7a8..7dbd215161905 100644 --- a/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java +++ b/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java @@ -517,7 +517,7 @@ public static void main(String[] args) { final long startMs = System.currentTimeMillis(); ThroughputThrottler throttler = new ThroughputThrottler(producer.throughput, startMs); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook("verifiable-producer-shutdown-hook", () -> { // Trigger main thread to stop producing messages producer.stopProducing = true; @@ -529,7 +529,7 @@ public static void main(String[] args) { double avgThroughput = 1000 * ((producer.numAcked) / (double) (stopMs - startMs)); producer.printJson(new ToolData(producer.numSent, producer.numAcked, producer.throughput, avgThroughput)); - })); + }); producer.run(throttler); } catch (ArgumentParserException e) { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java index 12e26a713f434..1674b2d92aaa5 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java @@ -249,7 +249,7 @@ public static void main(String[] args) throws Exception { log.info("Starting agent process."); final Agent agent = new Agent(platform, Scheduler.SYSTEM, restServer, resource); restServer.start(resource); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook("agent-shutdown-hook", () -> { log.warn("Running agent shutdown hook."); try { agent.beginShutdown(); @@ -257,7 +257,7 @@ public static void main(String[] args) throws Exception { } catch (Exception e) { log.error("Got exception while running agent shutdown hook.", e); } - })); + }); if (taskSpec != null) { TaskSpec spec = null; try { diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java index 867de5563a206..47f80e5c7a15e 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java @@ -171,7 +171,7 @@ public static void main(String[] args) throws Exception { final Coordinator coordinator = new Coordinator(platform, Scheduler.SYSTEM, restServer, resource, ThreadLocalRandom.current().nextLong(0, Long.MAX_VALUE / 2)); restServer.start(resource); - Runtime.getRuntime().addShutdownHook(new Thread(() -> { + Exit.addShutdownHook("coordinator-shutdown-hook", () -> { log.warn("Running coordinator shutdown hook."); try { coordinator.beginShutdown(false); @@ -179,7 +179,7 @@ public static void main(String[] args) throws Exception { } catch (Exception e) { log.error("Got exception while running coordinator shutdown hook.", e); } - })); + }); coordinator.waitForShutdown(); } };