From 5ee73553d07ec84adb3a97f483e8cf9fc5426fad Mon Sep 17 00:00:00 2001 From: David Mao Date: Mon, 1 May 2023 18:39:10 -0700 Subject: [PATCH 1/5] Reduce number of threads created for integration test brokers --- .../test/java/org/apache/kafka/test/TestUtils.java | 11 ----------- .../kafka/network/DynamicNumNetworkThreadsTest.scala | 1 + .../scala/unit/kafka/network/SocketServerTest.scala | 1 + core/src/test/scala/unit/kafka/utils/TestUtils.scala | 4 ++++ 4 files changed, 6 insertions(+), 11 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index b63a4f2096976..b9b2a0465471c 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -150,17 +150,6 @@ public static String randomString(final int len) { public static File tempFile(final String prefix, final String suffix) throws IOException { final File file = Files.createTempFile(prefix, suffix).toFile(); file.deleteOnExit(); - - // Note that we don't use Exit.addShutdownHook here because it allows for the possibility of accidently - // overriding the behaviour of this hook leading to leaked files. - Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon("delete-temp-file-shutdown-hook", () -> { - try { - Utils.delete(file); - } catch (IOException e) { - log.error("Error deleting {}", file.getAbsolutePath(), e); - } - })); - return file; } diff --git a/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala b/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala index b9f05d5aa8738..a76e6cf1ad0c0 100644 --- a/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala +++ b/core/src/test/scala/integration/kafka/network/DynamicNumNetworkThreadsTest.scala @@ -38,6 +38,7 @@ class DynamicNumNetworkThreadsTest extends BaseRequestTest { properties.put(KafkaConfig.ListenersProp, s"$internal://localhost:0, $external://localhost:0") properties.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$internal:PLAINTEXT, $external:PLAINTEXT") properties.put(s"listener.name.${internal.toLowerCase}.${KafkaConfig.NumNetworkThreadsProp}", "2") + properties.put(KafkaConfig.NumNetworkThreadsProp, Defaults.NumNetworkThreads.toString) } @BeforeEach diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 5bab866de3e24..5e9e84fd465f2 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -1989,6 +1989,7 @@ class SocketServerTest { val sslProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, interBrokerSecurityProtocol = Some(SecurityProtocol.SSL), trustStoreFile = Some(trustStoreFile)) sslProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0") + sslProps.put(KafkaConfig.NumNetworkThreadsProp, "1") sslProps } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index a499ac38344b2..5adc6fbb5d738 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -353,6 +353,10 @@ object TestUtils extends Logging { if (!props.containsKey(KafkaConfig.GroupInitialRebalanceDelayMsProp)) props.put(KafkaConfig.GroupInitialRebalanceDelayMsProp, "0") rack.foreach(props.put(KafkaConfig.RackProp, _)) + // Reduce number of threads per broker + props.put(KafkaConfig.NumNetworkThreadsProp, "2") + props.put(KafkaConfig.NumIoThreadsProp, "2") + props.put(KafkaConfig.BackgroundThreadsProp, "2") if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) }) props ++= sslConfigs(Mode.SERVER, false, trustStoreFile, s"server$nodeId") From eca6484e382119f79bc1aeef6a0e8d171879fde1 Mon Sep 17 00:00:00 2001 From: David Mao Date: Mon, 1 May 2023 18:44:27 -0700 Subject: [PATCH 2/5] Fix dynamic broker config test --- .../scala/unit/kafka/server/DynamicBrokerConfigTest.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 76617426f9767..6bdc732c62a73 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -609,7 +609,10 @@ class DynamicBrokerConfigTest { val zkClient: KafkaZkClient = mock(classOf[KafkaZkClient]) when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new java.util.Properties()) - val oldConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)) + val initialProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) + initialProps.remove(KafkaConfig.NumIoThreadsProp) + initialProps.remove(KafkaConfig.BackgroundThreadsProp) + val oldConfig = KafkaConfig.fromProps(initialProps) val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig) dynamicBrokerConfig.initialize(Some(zkClient)) dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool) From 96ab97a87abd378b26c61787073492cbf8085752 Mon Sep 17 00:00:00 2001 From: David Mao Date: Mon, 1 May 2023 19:03:35 -0700 Subject: [PATCH 3/5] Checkstyle --- clients/src/test/java/org/apache/kafka/test/TestUtils.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index b9b2a0465471c..be13e8e7a62e9 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -29,7 +29,6 @@ import org.apache.kafka.common.requests.ByteBufferChannel; import org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.utils.Exit; -import org.apache.kafka.common.utils.KafkaThread; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; From 10ec406b05bcfcb4daefd4c2cadd6c2d3c3379d7 Mon Sep 17 00:00:00 2001 From: David Mao Date: Tue, 2 May 2023 12:59:39 -0700 Subject: [PATCH 4/5] re-test From 7d768720407cd894da1260a3bd457d6245fe5302 Mon Sep 17 00:00:00 2001 From: David Mao Date: Tue, 2 May 2023 17:27:14 -0700 Subject: [PATCH 5/5] Revert num IO threads change --- .../test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala | 1 - core/src/test/scala/unit/kafka/utils/TestUtils.scala | 1 - 2 files changed, 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 6bdc732c62a73..77715bc82300c 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -610,7 +610,6 @@ class DynamicBrokerConfigTest { when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new java.util.Properties()) val initialProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092) - initialProps.remove(KafkaConfig.NumIoThreadsProp) initialProps.remove(KafkaConfig.BackgroundThreadsProp) val oldConfig = KafkaConfig.fromProps(initialProps) val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig) diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index 5adc6fbb5d738..fcbd490721646 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -355,7 +355,6 @@ object TestUtils extends Logging { rack.foreach(props.put(KafkaConfig.RackProp, _)) // Reduce number of threads per broker props.put(KafkaConfig.NumNetworkThreadsProp, "2") - props.put(KafkaConfig.NumIoThreadsProp, "2") props.put(KafkaConfig.BackgroundThreadsProp, "2") if (protocolAndPorts.exists { case (protocol, _) => usesSslTransportLayer(protocol) })