Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/utils/Exit.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ public interface Procedure {
void execute(int statusCode, String message);
}

public interface ShutdownHookAdder {
void addShutdownHook(String name, Runnable runnable);
}

private static final Procedure DEFAULT_HALT_PROCEDURE = new Procedure() {
@Override
public void execute(int statusCode, String message) {
Expand All @@ -40,8 +44,19 @@ public void execute(int statusCode, String message) {
}
};

private static final ShutdownHookAdder DEFAULT_SHUTDOWN_HOOK_ADDER = new ShutdownHookAdder() {
@Override
public void addShutdownHook(String name, Runnable runnable) {
if (name != null)
Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
else
Runtime.getRuntime().addShutdownHook(new Thread(runnable));
}
};

private volatile static Procedure exitProcedure = DEFAULT_EXIT_PROCEDURE;
private volatile static Procedure haltProcedure = DEFAULT_HALT_PROCEDURE;
private volatile static ShutdownHookAdder shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER;

public static void exit(int statusCode) {
exit(statusCode, null);
Expand All @@ -59,6 +74,10 @@ public static void halt(int statusCode, String message) {
haltProcedure.execute(statusCode, message);
}

public static void addShutdownHook(String name, Runnable runnable) {
shutdownHookAdder.addShutdownHook(name, runnable);
}

public static void setExitProcedure(Procedure procedure) {
exitProcedure = procedure;
}
Expand All @@ -67,6 +86,10 @@ public static void setHaltProcedure(Procedure procedure) {
haltProcedure = procedure;
}

public static void setShutdownHookAdder(ShutdownHookAdder shutdownHookAdder) {
Exit.shutdownHookAdder = shutdownHookAdder;
}

public static void resetExitProcedure() {
exitProcedure = DEFAULT_EXIT_PROCEDURE;
}
Expand All @@ -75,4 +98,7 @@ public static void resetHaltProcedure() {
haltProcedure = DEFAULT_HALT_PROCEDURE;
}

public static void resetShutdownHookAdder() {
shutdownHookAdder = DEFAULT_SHUTDOWN_HOOK_ADDER;
}
}
88 changes: 88 additions & 0 deletions clients/src/test/java/org/apache/kafka/common/utils/ExitTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.utils;

import org.junit.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class ExitTest {
@Test
public void shouldHaltImmediately() {
List<Object> list = new ArrayList<>();
Exit.setHaltProcedure((statusCode, message) -> {
list.add(statusCode);
list.add(message);
});
try {
int statusCode = 0;
String message = "mesaage";
Exit.halt(statusCode);
Exit.halt(statusCode, message);
assertEquals(Arrays.asList(statusCode, null, statusCode, message), list);
} finally {
Exit.resetHaltProcedure();
}
}

@Test
public void shouldExitImmediately() {
List<Object> list = new ArrayList<>();
Exit.setExitProcedure((statusCode, message) -> {
list.add(statusCode);
list.add(message);
});
try {
int statusCode = 0;
String message = "mesaage";
Exit.exit(statusCode);
Exit.exit(statusCode, message);
assertEquals(Arrays.asList(statusCode, null, statusCode, message), list);
} finally {
Exit.resetExitProcedure();
}
}

@Test
public void shouldAddShutdownHookImmediately() {
List<Object> list = new ArrayList<>();
Exit.setShutdownHookAdder((name, runnable) -> {
list.add(name);
list.add(runnable);
});
try {
Runnable runnable = () -> { };
String name = "name";
Exit.addShutdownHook(name, runnable);
assertEquals(Arrays.asList(name, runnable), list);
} finally {
Exit.resetShutdownHookAdder();
}
}

@Test
public void shouldNotInvokeShutdownHookImmediately() {
List<Object> list = new ArrayList<>();
Runnable runnable = () -> list.add(this);
Exit.addShutdownHook("message", runnable);
assertEquals(0, list.size());
}
}
14 changes: 6 additions & 8 deletions clients/src/test/java/org/apache/kafka/test/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -262,14 +263,11 @@ public static File tempDirectory(final Path parent, String prefix) {
}
file.deleteOnExit();

Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
try {
Utils.delete(file);
} catch (IOException e) {
log.error("Error deleting {}", file.getAbsolutePath(), e);
}
Exit.addShutdownHook("delete-temp-file-shutdown-hook", () -> {
try {
Utils.delete(file);
} catch (IOException e) {
log.error("Error deleting {}", file.getAbsolutePath(), e);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void start() {
}
startLatch = new CountDownLatch(herders.size());
stopLatch = new CountDownLatch(herders.size());
Runtime.getRuntime().addShutdownHook(shutdownHook);
Exit.addShutdownHook("mirror-maker-shutdown-hook", shutdownHook);
for (Herder herder : herders.values()) {
try {
herder.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime;

import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.connect.runtime.rest.RestServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -48,7 +49,7 @@ public Connect(Herder herder, RestServer rest) {
public void start() {
try {
log.info("Kafka Connect starting");
Runtime.getRuntime().addShutdownHook(shutdownHook);
Exit.addShutdownHook("connect-shutdown-hook", shutdownHook);

herder.start();
rest.initializeResources(herder);
Expand Down
4 changes: 1 addition & 3 deletions core/src/main/scala/kafka/Kafka.scala
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,7 @@ object Kafka extends Logging {
}

// attach shutdown handler to catch terminating signals as well as normal termination
Runtime.getRuntime().addShutdownHook(new Thread("kafka-shutdown-hook") {
override def run(): Unit = kafkaServerStartable.shutdown()
})
Exit.addShutdownHook("kafka-shutdown-hook", kafkaServerStartable.shutdown)

kafkaServerStartable.startup()
kafkaServerStartable.awaitShutdown()
Expand Down
6 changes: 2 additions & 4 deletions core/src/main/scala/kafka/tools/ConsoleConsumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import com.typesafe.scalalogging.LazyLogging
import joptsimple._
import kafka.common.MessageFormatter
import kafka.utils.Implicits._
import kafka.utils._
import kafka.utils.{Exit, _}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, ConsumerRecord, KafkaConsumer}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException, WakeupException}
Expand Down Expand Up @@ -85,16 +85,14 @@ object ConsoleConsumer extends Logging {
}

def addShutdownHook(consumer: ConsumerWrapper, conf: ConsumerConfig): Unit = {
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
Exit.addShutdownHook("consumer-shutdown-hook", {
consumer.wakeup()

shutdownLatch.await()

if (conf.enableSystestEventsLogging) {
System.out.println("shutdown_complete")
}
}
})
}

Expand Down
6 changes: 1 addition & 5 deletions core/src/main/scala/kafka/tools/ConsoleProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,7 @@ object ConsoleProducer {

val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps(config))

Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
producer.close()
}
})
Exit.addShutdownHook("producer-shutdown-hook", producer.close)

var record: ProducerRecord[Array[Byte], Array[Byte]] = null
do {
Expand Down
6 changes: 1 addition & 5 deletions core/src/main/scala/kafka/tools/MirrorMaker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -515,11 +515,7 @@ object MirrorMaker extends Logging with KafkaMetricsGroup {
offsetCommitIntervalMs = options.valueOf(offsetCommitIntervalMsOpt).intValue()
val numStreams = options.valueOf(numStreamsOpt).intValue()

Runtime.getRuntime.addShutdownHook(new Thread("MirrorMakerShutdownHook") {
override def run(): Unit = {
cleanShutdown()
}
})
Exit.addShutdownHook("MirrorMakerShutdownHook", cleanShutdown())

// create producer
val producerProps = Utils.loadProps(options.valueOf(producerConfigOpt))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,9 @@ object ReplicaVerificationTool extends Logging {
fetcherId = counter.incrementAndGet())
}

Runtime.getRuntime.addShutdownHook(new Thread() {
override def run(): Unit = {
Exit.addShutdownHook("ReplicaVerificationToolShutdownHook", {
info("Stopping all fetchers")
fetcherThreads.foreach(_.shutdown())
}
})
fetcherThreads.foreach(_.start())
println(s"${ReplicaVerificationTool.getCurrentTimeString()}: verification process is started.")
Expand Down
12 changes: 11 additions & 1 deletion core/src/main/scala/kafka/utils/Exit.scala
Original file line number Diff line number Diff line change
Expand Up @@ -34,20 +34,30 @@ object Exit {
throw new AssertionError("halt should not return, but it did.")
}

def addShutdownHook(name: String, shutdownHook: => Unit): Unit = {
JExit.addShutdownHook(name, () => shutdownHook)
}

def setExitProcedure(exitProcedure: (Int, Option[String]) => Nothing): Unit =
JExit.setExitProcedure(functionToProcedure(exitProcedure))

def setHaltProcedure(haltProcedure: (Int, Option[String]) => Nothing): Unit =
JExit.setHaltProcedure(functionToProcedure(haltProcedure))

def setShutdownHookAdder(shutdownHookAdder: (String, => Unit) => Unit): Unit = {
JExit.setShutdownHookAdder((name, runnable) => shutdownHookAdder(name, runnable.run))
}

def resetExitProcedure(): Unit =
JExit.resetExitProcedure()

def resetHaltProcedure(): Unit =
JExit.resetHaltProcedure()

def resetShutdownHookAdder(): Unit =
JExit.resetShutdownHookAdder()

private def functionToProcedure(procedure: (Int, Option[String]) => Nothing) = new JExit.Procedure {
def execute(statusCode: Int, message: String): Unit = procedure(statusCode, Option(message))
}

}
9 changes: 4 additions & 5 deletions core/src/test/scala/kafka/security/minikdc/MiniKdc.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import org.apache.directory.server.kerberos.shared.keytab.{Keytab, KeytabEntry}
import org.apache.directory.server.protocol.shared.transport.{TcpTransport, UdpTransport}
import org.apache.directory.server.xdbm.Index
import org.apache.directory.shared.kerberos.KerberosTime
import org.apache.kafka.common.utils.{Java, KafkaThread, Utils}
import org.apache.kafka.common.utils.{Java, Utils}

/**
* Mini KDC based on Apache Directory Server that can be embedded in tests or used from command line as a standalone
Expand Down Expand Up @@ -370,7 +370,7 @@ object MiniKdc {
}
}

private def start(workDir: File, config: Properties, keytabFile: File, principals: Seq[String]): Unit = {
private[minikdc] def start(workDir: File, config: Properties, keytabFile: File, principals: Seq[String]): MiniKdc = {
val miniKdc = new MiniKdc(config, workDir)
miniKdc.start()
miniKdc.createPrincipal(keytabFile, principals: _*)
Expand All @@ -390,9 +390,8 @@ object MiniKdc {
|
""".stripMargin
println(infoMessage)
Runtime.getRuntime.addShutdownHook(new KafkaThread("minikdc-shutdown-hook", false) {
miniKdc.stop()
})
Exit.addShutdownHook("minikdc-shutdown-hook", miniKdc.stop)
miniKdc
}

val OrgName = "org.name"
Expand Down
46 changes: 46 additions & 0 deletions core/src/test/scala/kafka/security/minikdc/MiniKdcTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.security.minikdc

import java.util.Properties

import kafka.utils.TestUtils
import org.junit.Test
import org.junit.Assert._

class MiniKdcTest {
@Test
def shouldNotStopImmediatelyWhenStarted(): Unit = {
val config = new Properties()
config.setProperty("kdc.bind.address", "0.0.0.0")
config.setProperty("transport", "TCP");
config.setProperty("max.ticket.lifetime", "86400000")
config.setProperty("org.name", "Example")
config.setProperty("kdc.port", "0")
config.setProperty("org.domain", "COM")
config.setProperty("max.renewable.lifetime", "604800000")
config.setProperty("instance", "DefaultKrbServer")
val minikdc = MiniKdc.start(TestUtils.tempDir(), config, TestUtils.tempFile(), List("foo"))
val running = System.getProperty(MiniKdc.JavaSecurityKrb5Conf) != null
try {
assertTrue("MiniKdc stopped immediately; it should not have", running)
} finally {
if (running) minikdc.stop()
}
}
}
Loading