From 2bb436eab3423b3a86449aa59d5ab8672d0f50be Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Tue, 1 Mar 2022 16:04:02 +1300 Subject: [PATCH 01/18] Good ol println debugging for test that is flakey in CI context --- .../DynamicBrokerReconfigurationTest.scala | 31 ++++++++++++++++--- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 0cc58628103da..1b5fe910f386b 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -26,7 +26,6 @@ import java.time.Duration import java.util import java.util.{Collections, Properties} import java.util.concurrent._ - import javax.management.ObjectName import com.yammer.metrics.core.MetricName import kafka.admin.ConfigCommand @@ -36,7 +35,6 @@ import kafka.log.{CleanerConfig, LogConfig} import kafka.message.ProducerCompressionCodec import kafka.metrics.KafkaYammerMetrics import kafka.network.{Processor, RequestChannel} -import kafka.server.QuorumTestHarness import kafka.utils._ import kafka.utils.Implicits._ import kafka.zk.ConfigEntityChangeNotificationZNode @@ -99,11 +97,17 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup private val sslProperties2 = TestUtils.sslConfigs(Mode.SERVER, clientCert = false, Some(trustStoreFile2), "kafka") private val invalidSslProperties = invalidSslConfigs + private var originalLogLevels: Map[String, String] = null + def addExtraProps(props: Properties): Unit = { } @BeforeEach override def setUp(testInfo: TestInfo): Unit = { + // Eliminate log output while println debugging + originalLogLevels = Map.from(Log4jController.loggers) + Log4jController.loggers.foreach(ln => Log4jController.logLevel(ln._1, "FATAL")) + startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism))) super.setUp(testInfo) @@ -164,6 +168,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup TestUtils.shutdownServers(servers) super.tearDown() closeSasl() + + // Restore log levels for any other tests running in this process + originalLogLevels.foreach(ln => Log4jController.logLevel(ln._1, ln._2)) } @Test @@ -727,6 +734,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup @Test def testThreadPoolResize(): Unit = { + + val requestHandlerPrefix = "data-plane-kafka-request-handler-" val networkThreadPrefix = "data-plane-kafka-network-thread-" val fetcherThreadPrefix = "ReplicaFetcherThread-" @@ -770,9 +779,21 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } def resizeThreadPool(propName: String, newSize: Int, threadPrefix: String): Unit = { - val props = new Properties + val props = new Properties() props.put(propName, newSize.toString) - reconfigureServers(props, perBrokerConfig = false, (propName, newSize.toString)) + + // Very temporary copy and paste hack to observe behaviour in CI via the magic println for this test only + alterConfigs(servers, adminClients.head, props, perBrokerConfig = false).all().get() + + // Same maxWaitMs as used in waitForConfigOnServer + servers.foreach { server => TestUtils.retry(10000) { + val value = server.config.originals.get(propName) + println(s"Server: ${server.config.brokerId}, prop: $propName, expected: $newSize, serverConf: $value") + assertEquals(newSize.toString, value) + } + } + + // reconfigureServers(props, perBrokerConfig = false, (propName, newSize.toString)) maybeVerifyThreadPoolSize(propName, newSize, threadPrefix) } @@ -792,6 +813,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup maybeVerifyThreadPoolSize(propName, threadPoolSize, threadPrefix) } + + val config = servers.head.config verifyThreadPoolResize(KafkaConfig.NumIoThreadsProp, config.numIoThreads, requestHandlerPrefix, mayReceiveDuplicates = false) From f27dd07a23a797395402c06c24c472eca4ee562f Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Tue, 1 Mar 2022 18:06:56 +1300 Subject: [PATCH 02/18] Double timeout on flaky test, to see if it makes a difference --- .../kafka/server/DynamicBrokerReconfigurationTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 1b5fe910f386b..22dc8d60b2b14 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -785,8 +785,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup // Very temporary copy and paste hack to observe behaviour in CI via the magic println for this test only alterConfigs(servers, adminClients.head, props, perBrokerConfig = false).all().get() - // Same maxWaitMs as used in waitForConfigOnServer - servers.foreach { server => TestUtils.retry(10000) { + // Double the maxWaitMs as used in waitForConfigOnServer + servers.foreach { server => TestUtils.retry(20000) { val value = server.config.originals.get(propName) println(s"Server: ${server.config.brokerId}, prop: $propName, expected: $newSize, serverConf: $value") assertEquals(newSize.toString, value) From f20d7ea6a8c4ae2853c3f6cfba7c1b1d5972c5af Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Tue, 1 Mar 2022 22:56:39 +1300 Subject: [PATCH 03/18] Set timeout even higher to try to eliminate slow processes as cause of flakiness --- .../kafka/server/DynamicBrokerReconfigurationTest.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 22dc8d60b2b14..a70189bb15568 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -786,7 +786,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup alterConfigs(servers, adminClients.head, props, perBrokerConfig = false).all().get() // Double the maxWaitMs as used in waitForConfigOnServer - servers.foreach { server => TestUtils.retry(20000) { + val maxWaitMs = 200000 + println(s"Using maxWaitMs of $maxWaitMs") + servers.foreach { server => TestUtils.retry(maxWaitMs) { val value = server.config.originals.get(propName) println(s"Server: ${server.config.brokerId}, prop: $propName, expected: $newSize, serverConf: $value") assertEquals(newSize.toString, value) From 7ce6b8e2d38e7563c15751f6e788e2813835a992 Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Wed, 2 Mar 2022 09:16:44 +1300 Subject: [PATCH 04/18] Explore if querying config via adminClient produces a different result --- .../server/DynamicBrokerReconfigurationTest.scala | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index a70189bb15568..2a5796c6b4fa5 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -790,7 +790,17 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup println(s"Using maxWaitMs of $maxWaitMs") servers.foreach { server => TestUtils.retry(maxWaitMs) { val value = server.config.originals.get(propName) - println(s"Server: ${server.config.brokerId}, prop: $propName, expected: $newSize, serverConf: $value") + + // Check if using admin.describeConfigs returns expected value. + // Limiting this to num.network.threads as it is where the CI is breaking currently + val adminValue = if (propName == "num.network.threads") { + val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "") + val config = adminClients.head.describeConfigs(List(brokerResource).asJava).values().get(brokerResource).get() + if (config.get(propName) != null) config.get(propName).value() else "null" + } else { + "n/a" + } + println(s"Server: ${server.config.brokerId}, prop: $propName, expected: $newSize, serverConf: $value, adminConf: $adminValue") assertEquals(newSize.toString, value) } } From dded546550267ef3d2deb8040da72194b32d126c Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Wed, 2 Mar 2022 13:02:03 +1300 Subject: [PATCH 05/18] More println() --- .../kafka/server/DynamicBrokerReconfigurationTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 2a5796c6b4fa5..fd14e60e21229 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -800,7 +800,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } else { "n/a" } - println(s"Server: ${server.config.brokerId}, prop: $propName, expected: $newSize, serverConf: $value, adminConf: $adminValue") + println(s"Server: ${server.config.brokerId}, prop: $propName, expected: $newSize, serverConf: ${ server.config.get(propName) } serverOriginalsConf: $value, adminConf: $adminValue") assertEquals(newSize.toString, value) } } From a4bfda7ce958230249f72634de70bbb70c23af3c Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Wed, 2 Mar 2022 15:36:31 +1300 Subject: [PATCH 06/18] Extend println() debugging into SocketServer --- .../main/scala/kafka/network/SocketServer.scala | 13 ++++++++----- .../server/DynamicBrokerReconfigurationTest.scala | 15 +++++++-------- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 88dfa15b3f5c9..4907da3367410 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -581,16 +581,18 @@ class DataPlaneAcceptor(socketServer: SocketServer, * the configs have passed validation using {@link #validateReconfiguration ( Map )}. */ override def reconfigure(configs: util.Map[String, _]): Unit = { + val brokerId = configs.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int] val newNumNetworkThreads = configs.get(KafkaConfig.NumNetworkThreadsProp).asInstanceOf[Int] - + println(s"SocketServer on $brokerId - newThreads: $newNumNetworkThreads, currentProcessors: ${processors.length}") if (newNumNetworkThreads != processors.length) { - info(s"Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads") + println(s"SocketServer on $brokerId - Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads") if (newNumNetworkThreads > processors.length) { - addProcessors(newNumNetworkThreads - processors.length) + addProcessors(newNumNetworkThreads - processors.length, brokerId) } else if (newNumNetworkThreads < processors.length) { removeProcessors(processors.length - newNumNetworkThreads) } } + println(s"SocketServer on $brokerId reconfigure complete") } /** @@ -885,15 +887,16 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, @Override def wakeup(): Unit = nioSelector.wakeup() - def addProcessors(toCreate: Int): Unit = { + def addProcessors(toCreate: Int, brokerId: Int = -1): Unit = { val listenerName = endPoint.listenerName val securityProtocol = endPoint.securityProtocol val listenerProcessors = new ArrayBuffer[Processor]() - + println(s"SocketServer on $brokerId - addProcessors adding $toCreate") for (_ <- 0 until toCreate) { val processor = newProcessor(socketServer.nextProcessorId(), listenerName, securityProtocol) listenerProcessors += processor requestChannel.addProcessor(processor) + println(s"SocketServer on $brokerId - addProcessors added processor ${processor.id}") } processors ++= listenerProcessors diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index fd14e60e21229..2f999ff8ac7d2 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -786,21 +786,20 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup alterConfigs(servers, adminClients.head, props, perBrokerConfig = false).all().get() // Double the maxWaitMs as used in waitForConfigOnServer - val maxWaitMs = 200000 - println(s"Using maxWaitMs of $maxWaitMs") + val maxWaitMs = 20000 servers.foreach { server => TestUtils.retry(maxWaitMs) { val value = server.config.originals.get(propName) - + if (propName == "num.network.threads") { + println(s"Using maxWaitMs of $maxWaitMs") // Check if using admin.describeConfigs returns expected value. // Limiting this to num.network.threads as it is where the CI is breaking currently - val adminValue = if (propName == "num.network.threads") { + val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "") val config = adminClients.head.describeConfigs(List(brokerResource).asJava).values().get(brokerResource).get() - if (config.get(propName) != null) config.get(propName).value() else "null" - } else { - "n/a" + val adminValue = if (config.get(propName) != null) config.get(propName).value() else "null" + println(s"Server: ${server.config.brokerId}, prop: $propName, expected: $newSize, serverConf: ${ server.config.get(propName) } serverOriginalsConf: $value, adminConf: $adminValue") } - println(s"Server: ${server.config.brokerId}, prop: $propName, expected: $newSize, serverConf: ${ server.config.get(propName) } serverOriginalsConf: $value, adminConf: $adminValue") + assertEquals(newSize.toString, value) } } From eba6a73cd5196561b4864e824ff53f40e9686328 Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Thu, 3 Mar 2022 11:37:55 +1300 Subject: [PATCH 07/18] Capture listener name --- core/src/main/scala/kafka/network/SocketServer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 4907da3367410..aa7a747460c61 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -896,7 +896,7 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, val processor = newProcessor(socketServer.nextProcessorId(), listenerName, securityProtocol) listenerProcessors += processor requestChannel.addProcessor(processor) - println(s"SocketServer on $brokerId - addProcessors added processor ${processor.id}") + println(s"SocketServer on $brokerId - addProcessors added processor ${processor.id} for $listenerName") } processors ++= listenerProcessors From 0e64fc6ead222f5c263e78d9305099fe62f29ad8 Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Thu, 3 Mar 2022 15:56:44 +1300 Subject: [PATCH 08/18] Test failure appears to be because only Internal listener is reconfigured, so more println() to figure out why --- .../scala/kafka/network/SocketServer.scala | 41 +++++++++++-------- .../kafka/server/DynamicBrokerConfig.scala | 35 +++++++++++++++- 2 files changed, 57 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index aa7a747460c61..0da710f888a9c 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -268,7 +268,7 @@ class SocketServer(val config: KafkaConfig, config.addReconfigurable(dataPlaneAcceptor) dataPlaneAcceptor.configure(parsedConfigs) dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor) - info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}") + println(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}") } } @@ -556,21 +556,26 @@ class DataPlaneAcceptor(socketServer: SocketServer, * the AlterConfigs response. */ override def validateReconfiguration(configs: util.Map[String, _]): Unit = { - configs.forEach { (k, v) => - if (reconfigurableConfigs.contains(k)) { - val newValue = v.asInstanceOf[Int] - val oldValue = processors.length - if (newValue != oldValue) { - val errorMsg = s"Dynamic thread count update validation failed for $k=$v" - if (newValue <= 0) - throw new ConfigException(s"$errorMsg, value should be at least 1") - if (newValue < oldValue / 2) - throw new ConfigException(s"$errorMsg, value should be at least half the current value $oldValue") - if (newValue > oldValue * 2) - throw new ConfigException(s"$errorMsg, value should not be greater than double the current value $oldValue") + try { + configs.forEach { (k, v) => + if (reconfigurableConfigs.contains(k)) { + val newValue = v.asInstanceOf[Int] + val oldValue = processors.length + if (newValue != oldValue) { + val errorMsg = s"Dynamic thread count update validation failed for $k=$v" + if (newValue <= 0) + throw new ConfigException(s"$errorMsg, value should be at least 1") + if (newValue < oldValue / 2) + throw new ConfigException(s"$errorMsg, value should be at least half the current value $oldValue") + if (newValue > oldValue * 2) + throw new ConfigException(s"$errorMsg, value should not be greater than double the current value $oldValue") + } } } } + catch { + case e: ConfigException => println(s"DataAcceptor invalidConf: $e"); throw e + } } /** @@ -583,16 +588,16 @@ class DataPlaneAcceptor(socketServer: SocketServer, override def reconfigure(configs: util.Map[String, _]): Unit = { val brokerId = configs.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int] val newNumNetworkThreads = configs.get(KafkaConfig.NumNetworkThreadsProp).asInstanceOf[Int] - println(s"SocketServer on $brokerId - newThreads: $newNumNetworkThreads, currentProcessors: ${processors.length}") + println(s"DataAcceptor $listenerName on $brokerId - newThreads: $newNumNetworkThreads, currentProcessors: ${processors.length}") if (newNumNetworkThreads != processors.length) { - println(s"SocketServer on $brokerId - Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads") + println(s"DataAcceptor $listenerName on $brokerId - Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads") if (newNumNetworkThreads > processors.length) { addProcessors(newNumNetworkThreads - processors.length, brokerId) } else if (newNumNetworkThreads < processors.length) { removeProcessors(processors.length - newNumNetworkThreads) } } - println(s"SocketServer on $brokerId reconfigure complete") + println(s"DataAcceptor $listenerName on $brokerId reconfigure complete") } /** @@ -891,12 +896,12 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, val listenerName = endPoint.listenerName val securityProtocol = endPoint.securityProtocol val listenerProcessors = new ArrayBuffer[Processor]() - println(s"SocketServer on $brokerId - addProcessors adding $toCreate") + println(s"DataAcceptor $listenerName on $brokerId - addProcessors adding $toCreate") for (_ <- 0 until toCreate) { val processor = newProcessor(socketServer.nextProcessorId(), listenerName, securityProtocol) listenerProcessors += processor requestChannel.addProcessor(processor) - println(s"SocketServer on $brokerId - addProcessors added processor ${processor.id} for $listenerName") + println(s"DataAcceptor $listenerName on $brokerId - addProcessors added processor ${processor.id}") } processors ++= listenerProcessors diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index cb6cd84d3b630..0e931b47d66c5 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -37,6 +37,7 @@ import org.apache.kafka.common.utils.{ConfigUtils, Utils} import scala.annotation.nowarn import scala.collection._ +import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ /** @@ -197,11 +198,37 @@ object DynamicBrokerConfig { class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging { + class Foo extends ArrayBuffer[Reconfigurable] { + override def clear(): Unit = { + println("Reconfigurables: cleared") + super.clear() + } + + override def addOne(elem: Reconfigurable): this.type = { + elem match { + case x: DataPlaneAcceptor => println(s"Reconfigurables: Adding data plane acceptor foe ${x.listenerName()}") + case _ => + } + + super.addOne(elem) + } + + override def subtractOne(elem: Reconfigurable): Foo.this.type = { + elem match { + case x: DataPlaneAcceptor => println(s"Reconfigurables: Removing data plane acceptor foe ${x.listenerName()}") + case _ => + } + super.subtractOne(elem) + } + + + } + private[server] val staticBrokerConfigs = ConfigDef.convertToStringMapWithPasswordValues(kafkaConfig.originalsFromThisConfig).asScala private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala private val dynamicBrokerConfigs = mutable.Map[String, String]() private val dynamicDefaultConfigs = mutable.Map[String, String]() - private val reconfigurables = mutable.Buffer[Reconfigurable]() + private val reconfigurables: mutable.Buffer[Reconfigurable] = new Foo() private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]() private val lock = new ReentrantReadWriteLock private var currentConfig: KafkaConfig = null @@ -592,6 +619,12 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging allNewConfigs: util.Map[String, _], newCustomConfigs: util.Map[String, Object], validateOnly: Boolean): Unit = { + + reconfigurable match { + case reconfigurable1: ListenerReconfigurable => + println(s"Processing listener reconfigurable for ${reconfigurable1.listenerName()}") + case _ => + } val newConfigs = new util.HashMap[String, Object] allNewConfigs.forEach { (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) } newConfigs.putAll(newCustomConfigs) From 6dac9d01ef3f2fa023f28964657c1d44f3de781b Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Mon, 7 Mar 2022 14:05:19 +1300 Subject: [PATCH 09/18] Continue trying to ascertain why external listener not being reconfigured --- core/src/main/scala/kafka/network/SocketServer.scala | 3 ++- core/src/main/scala/kafka/server/DynamicBrokerConfig.scala | 6 +++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 0da710f888a9c..306117953d456 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -276,7 +276,8 @@ class SocketServer(val config: KafkaConfig, endpointOpt.foreach { endpoint => connectionQuotas.addListener(config, endpoint.listenerName) val controlPlaneAcceptor = createControlPlaneAcceptor(endpoint, controlPlaneRequestChannelOpt.get) - controlPlaneAcceptor.addProcessors(1) + val brokerId = controlPlaneAcceptor.config.brokerId + controlPlaneAcceptor.addProcessors(1, brokerId) controlPlaneAcceptorOpt = Some(controlPlaneAcceptor) info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}") } diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 0e931b47d66c5..d5068fcb42c42 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -604,11 +604,15 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging validateOnly: Boolean, reloadOnly: Boolean): Unit = { val listenerName = listenerReconfigurable.listenerName + val brokerId = newConfig.brokerId + + println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName") val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix) val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix) val (changeMap, deletedKeys) = updatedConfigs(newValues, oldValues) val updatedKeys = changeMap.keySet val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys, deletedKeys) + println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName, reloadOnly: $reloadOnly, configsChanged: $configsChanged") // if `reloadOnly`, reconfigure if configs haven't changed. Otherwise reconfigure if configs have changed if (reloadOnly != configsChanged) processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly) @@ -622,7 +626,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging reconfigurable match { case reconfigurable1: ListenerReconfigurable => - println(s"Processing listener reconfigurable for ${reconfigurable1.listenerName()}") + println(s"processReconfigurable listenerReconfigurable: ${reconfigurable1.listenerName()}") case _ => } val newConfigs = new util.HashMap[String, Object] From b782b6ee1bc64c5523f0763a71e6b2c3c9bc5f20 Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Mon, 7 Mar 2022 16:42:28 +1300 Subject: [PATCH 10/18] Try to be more selective... --- .../scala/kafka/network/SocketServer.scala | 17 ++++++++------ .../kafka/server/DynamicBrokerConfig.scala | 23 +++++++++++-------- .../DynamicBrokerReconfigurationTest.scala | 1 + 3 files changed, 25 insertions(+), 16 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 306117953d456..a7f950d834397 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -589,16 +589,18 @@ class DataPlaneAcceptor(socketServer: SocketServer, override def reconfigure(configs: util.Map[String, _]): Unit = { val brokerId = configs.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int] val newNumNetworkThreads = configs.get(KafkaConfig.NumNetworkThreadsProp).asInstanceOf[Int] - println(s"DataAcceptor $listenerName on $brokerId - newThreads: $newNumNetworkThreads, currentProcessors: ${processors.length}") + val printf = newNumNetworkThreads == 2 + + if (printf) println(s"DataAcceptor $listenerName on $brokerId - newThreads: $newNumNetworkThreads, currentProcessors: ${processors.length}, thread: ${Thread.currentThread()}") if (newNumNetworkThreads != processors.length) { - println(s"DataAcceptor $listenerName on $brokerId - Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads") + if (printf) println(s"DataAcceptor $listenerName on $brokerId - Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads") if (newNumNetworkThreads > processors.length) { - addProcessors(newNumNetworkThreads - processors.length, brokerId) + addProcessors(newNumNetworkThreads - processors.length, brokerId, printf) } else if (newNumNetworkThreads < processors.length) { removeProcessors(processors.length - newNumNetworkThreads) } } - println(s"DataAcceptor $listenerName on $brokerId reconfigure complete") + if (printf) println(s"DataAcceptor $listenerName on $brokerId reconfigure complete") } /** @@ -893,18 +895,19 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, @Override def wakeup(): Unit = nioSelector.wakeup() - def addProcessors(toCreate: Int, brokerId: Int = -1): Unit = { + def addProcessors(toCreate: Int, brokerId: Int = -1, printf: Boolean = false): Unit = { val listenerName = endPoint.listenerName val securityProtocol = endPoint.securityProtocol val listenerProcessors = new ArrayBuffer[Processor]() - println(s"DataAcceptor $listenerName on $brokerId - addProcessors adding $toCreate") + if (printf) println(s"DataAcceptor $listenerName on $brokerId - addProcessors adding $toCreate") for (_ <- 0 until toCreate) { val processor = newProcessor(socketServer.nextProcessorId(), listenerName, securityProtocol) listenerProcessors += processor requestChannel.addProcessor(processor) - println(s"DataAcceptor $listenerName on $brokerId - addProcessors added processor ${processor.id}") + if (printf) println(s"DataAcceptor $listenerName on $brokerId - addProcessors added processor ${processor.id}") } + if (printf) println(listenerProcessors) processors ++= listenerProcessors if (processorsStarted.get) startProcessors(listenerProcessors) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index d5068fcb42c42..b9e153bfc43d4 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -228,7 +228,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala private val dynamicBrokerConfigs = mutable.Map[String, String]() private val dynamicDefaultConfigs = mutable.Map[String, String]() - private val reconfigurables: mutable.Buffer[Reconfigurable] = new Foo() + private val reconfigurables: mutable.Buffer[Reconfigurable] = mutable.Buffer[Reconfigurable]() private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]() private val lock = new ReentrantReadWriteLock private var currentConfig: KafkaConfig = null @@ -606,13 +606,16 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging val listenerName = listenerReconfigurable.listenerName val brokerId = newConfig.brokerId - println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName") + val printf = newConfig.numNetworkThreads == 2 val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix) val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix) val (changeMap, deletedKeys) = updatedConfigs(newValues, oldValues) val updatedKeys = changeMap.keySet val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys, deletedKeys) - println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName, reloadOnly: $reloadOnly, configsChanged: $configsChanged") + if (printf) { + println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName, reloadOnly: $reloadOnly, configsChanged: $configsChanged, thread: ${Thread.currentThread()}") + println(s"Updated keys: $updatedKeys, ${changeMap.get("num.network.threads")}") + } // if `reloadOnly`, reconfigure if configs haven't changed. Otherwise reconfigure if configs have changed if (reloadOnly != configsChanged) processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly) @@ -624,10 +627,10 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging newCustomConfigs: util.Map[String, Object], validateOnly: Boolean): Unit = { - reconfigurable match { - case reconfigurable1: ListenerReconfigurable => - println(s"processReconfigurable listenerReconfigurable: ${reconfigurable1.listenerName()}") - case _ => + val printf = reconfigurable match { + case _: ListenerReconfigurable => + allNewConfigs.get("num.network.threads").asInstanceOf[Int] == 2 + case _ => false } val newConfigs = new util.HashMap[String, Object] allNewConfigs.forEach { (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) } @@ -641,8 +644,10 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } if (!validateOnly) { - info(s"Reconfiguring $reconfigurable, updated configs: $updatedConfigNames " + - s"custom configs: ${ConfigUtils.configMapToRedactedString(newCustomConfigs, KafkaConfig.configDef)}") + if (printf) { + println(s"Reconfiguring $reconfigurable, updated configs: $updatedConfigNames " + + s"custom configs: ${ConfigUtils.configMapToRedactedString(newCustomConfigs, KafkaConfig.configDef)}") + } reconfigurable.reconfigure(newConfigs) } } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 2f999ff8ac7d2..9dde4c9d09d7b 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -782,6 +782,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup val props = new Properties() props.put(propName, newSize.toString) + // Very temporary copy and paste hack to observe behaviour in CI via the magic println for this test only alterConfigs(servers, adminClients.head, props, perBrokerConfig = false).all().get() From 09ec4418c1b2634cb424f5dd411eb5b88922e0f5 Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Tue, 8 Mar 2022 13:05:00 +1300 Subject: [PATCH 11/18] Tweak --- .../scala/kafka/server/DynamicBrokerConfig.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index b9e153bfc43d4..fdc1fb65a1238 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -612,13 +612,16 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging val (changeMap, deletedKeys) = updatedConfigs(newValues, oldValues) val updatedKeys = changeMap.keySet val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys, deletedKeys) - if (printf) { - println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName, reloadOnly: $reloadOnly, configsChanged: $configsChanged, thread: ${Thread.currentThread()}") - println(s"Updated keys: $updatedKeys, ${changeMap.get("num.network.threads")}") - } + // if `reloadOnly`, reconfigure if configs haven't changed. Otherwise reconfigure if configs have changed - if (reloadOnly != configsChanged) + if (reloadOnly != configsChanged) { + if (printf) { + val stackTrace = util.Arrays.toString(Thread.currentThread().getStackTrace.asInstanceOf[Array[Object]]) + println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName, thread: $stackTrace") + println(s"Updated keys: $updatedKeys, ${changeMap.get("num.network.threads")}") + } processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly) + } } private def processReconfigurable(reconfigurable: Reconfigurable, From b54f4d566c5b721ed1415d9cc760b8948322ca6a Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Wed, 9 Mar 2022 14:14:08 +1300 Subject: [PATCH 12/18] Touch a file to try to get the PR build to run --- core/src/main/scala/kafka/server/DynamicBrokerConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index fdc1fb65a1238..ecdc58c42bcde 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -617,7 +617,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging if (reloadOnly != configsChanged) { if (printf) { val stackTrace = util.Arrays.toString(Thread.currentThread().getStackTrace.asInstanceOf[Array[Object]]) - println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName, thread: $stackTrace") + println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName, thread: ${Thread.currentThread()}, stack: $stackTrace") println(s"Updated keys: $updatedKeys, ${changeMap.get("num.network.threads")}") } processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly) From bb664cbecd818ffe4ec307f5aa0dff3610ada11e Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Wed, 9 Mar 2022 19:18:54 +1300 Subject: [PATCH 13/18] See if validation is the issue --- .../main/scala/kafka/server/DynamicBrokerConfig.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index ecdc58c42bcde..b81d15d710ba4 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -616,7 +616,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging // if `reloadOnly`, reconfigure if configs haven't changed. Otherwise reconfigure if configs have changed if (reloadOnly != configsChanged) { if (printf) { - val stackTrace = util.Arrays.toString(Thread.currentThread().getStackTrace.asInstanceOf[Array[Object]]) + val stackTrace = "nop" //util.Arrays.toString(Thread.currentThread().getStackTrace.asInstanceOf[Array[Object]]) println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName, thread: ${Thread.currentThread()}, stack: $stackTrace") println(s"Updated keys: $updatedKeys, ${changeMap.get("num.network.threads")}") } @@ -630,16 +630,23 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging newCustomConfigs: util.Map[String, Object], validateOnly: Boolean): Unit = { + var lr: ListenerReconfigurable = null val printf = reconfigurable match { case _: ListenerReconfigurable => allNewConfigs.get("num.network.threads").asInstanceOf[Int] == 2 case _ => false } + + if (printf) lr = reconfigurable.asInstanceOf[ListenerReconfigurable] + if (lr != null) println(s"processReconfigurable for ${lr.listenerName()}") + val newConfigs = new util.HashMap[String, Object] allNewConfigs.forEach { (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) } newConfigs.putAll(newCustomConfigs) try { + if (lr != null) println(s"validateReconfig for ${lr.listenerName()}") reconfigurable.validateReconfiguration(newConfigs) + if (lr != null) println(s"validateReconfig success for ${lr.listenerName()}") } catch { case e: ConfigException => throw e case _: Exception => @@ -648,7 +655,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging if (!validateOnly) { if (printf) { - println(s"Reconfiguring $reconfigurable, updated configs: $updatedConfigNames " + + println(s"Reconfiguring ${lr.listenerName()}, updated configs: $updatedConfigNames " + s"custom configs: ${ConfigUtils.configMapToRedactedString(newCustomConfigs, KafkaConfig.configDef)}") } reconfigurable.reconfigure(newConfigs) From fc10bfbdebb824f7e0d623e3d69c637d3fa3915c Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Tue, 15 Mar 2022 12:40:41 +1300 Subject: [PATCH 14/18] More println, more! --- .../src/main/scala/kafka/server/DynamicBrokerConfig.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index b81d15d710ba4..d8534fd0eed97 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -206,7 +206,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging override def addOne(elem: Reconfigurable): this.type = { elem match { - case x: DataPlaneAcceptor => println(s"Reconfigurables: Adding data plane acceptor foe ${x.listenerName()}") + case x: DataPlaneAcceptor => println(s"Reconfigurables: Broker ${x.config.brokerId}, Adding data plane acceptor for ${x.listenerName()}") case _ => } @@ -215,7 +215,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging override def subtractOne(elem: Reconfigurable): Foo.this.type = { elem match { - case x: DataPlaneAcceptor => println(s"Reconfigurables: Removing data plane acceptor foe ${x.listenerName()}") + case x: DataPlaneAcceptor => println(s"Reconfigurables: Broker ${x.config.brokerId}, Removing data plane acceptor for ${x.listenerName()}") case _ => } super.subtractOne(elem) @@ -644,7 +644,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging allNewConfigs.forEach { (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) } newConfigs.putAll(newCustomConfigs) try { - if (lr != null) println(s"validateReconfig for ${lr.listenerName()}") + if (lr != null) println(s"validateReconfig for ${lr.listenerName()}, validateOnly = $validateOnly") reconfigurable.validateReconfiguration(newConfigs) if (lr != null) println(s"validateReconfig success for ${lr.listenerName()}") } catch { @@ -659,6 +659,8 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging s"custom configs: ${ConfigUtils.configMapToRedactedString(newCustomConfigs, KafkaConfig.configDef)}") } reconfigurable.reconfigure(newConfigs) + } else { + println(s"${lr.listenerName()} was validatedOnly") } } } From 1aa40504f143487bacad3abd7907537bd154f593 Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Tue, 15 Mar 2022 15:22:25 +1300 Subject: [PATCH 15/18] Re-enable logging buffer of reconfigurables in DynamicBrokerConfig --- .../scala/kafka/server/DynamicBrokerConfig.scala | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index d8534fd0eed97..e652a5effcfd8 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -198,22 +198,22 @@ object DynamicBrokerConfig { class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging { - class Foo extends ArrayBuffer[Reconfigurable] { + class LoggingBuffer extends ArrayBuffer[Reconfigurable] { override def clear(): Unit = { println("Reconfigurables: cleared") super.clear() } override def addOne(elem: Reconfigurable): this.type = { - elem match { - case x: DataPlaneAcceptor => println(s"Reconfigurables: Broker ${x.config.brokerId}, Adding data plane acceptor for ${x.listenerName()}") - case _ => - } +// elem match { +// case x: DataPlaneAcceptor => println(s"Reconfigurables: Broker ${x.config.brokerId}, Adding data plane acceptor for ${x.listenerName()}") +// case _ => +// } super.addOne(elem) } - override def subtractOne(elem: Reconfigurable): Foo.this.type = { + override def subtractOne(elem: Reconfigurable): LoggingBuffer.this.type = { elem match { case x: DataPlaneAcceptor => println(s"Reconfigurables: Broker ${x.config.brokerId}, Removing data plane acceptor for ${x.listenerName()}") case _ => @@ -228,7 +228,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala private val dynamicBrokerConfigs = mutable.Map[String, String]() private val dynamicDefaultConfigs = mutable.Map[String, String]() - private val reconfigurables: mutable.Buffer[Reconfigurable] = mutable.Buffer[Reconfigurable]() + private val reconfigurables: mutable.Buffer[Reconfigurable] = new LoggingBuffer() private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]() private val lock = new ReentrantReadWriteLock private var currentConfig: KafkaConfig = null From 6fa2d247d7954cdaf30654e37dc3848ae98deb8b Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Wed, 16 Mar 2022 15:58:41 +1300 Subject: [PATCH 16/18] Fix NPE --- core/src/main/scala/kafka/server/DynamicBrokerConfig.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index e652a5effcfd8..fa2c9e360c929 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -654,13 +654,13 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } if (!validateOnly) { - if (printf) { + if (printf && lr != null) { println(s"Reconfiguring ${lr.listenerName()}, updated configs: $updatedConfigNames " + s"custom configs: ${ConfigUtils.configMapToRedactedString(newCustomConfigs, KafkaConfig.configDef)}") } reconfigurable.reconfigure(newConfigs) } else { - println(s"${lr.listenerName()} was validatedOnly") + if (lr != null) println(s"${lr.listenerName()} was validatedOnly") } } } From 55cd50fcbcb39b67b191a5e68363a4f9de326064 Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Thu, 17 Mar 2022 11:36:44 +1300 Subject: [PATCH 17/18] Re-enable the flaky test I'm working on (I guess that the CI pipeline merges in trunk before running? --- .../kafka/server/DynamicBrokerReconfigurationTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index fffe7527cb50b..499c0d07dbb6a 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -735,7 +735,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } @Test - @Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13672) + // @Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13672) def testThreadPoolResize(): Unit = { From 7df84c2dc7bafb8d49215130484152d89e3260de Mon Sep 17 00:00:00 2001 From: Liam Clarke-Hutchinson Date: Sat, 19 Mar 2022 16:00:12 +1300 Subject: [PATCH 18/18] Check for CME Luke flagged --- .../scala/kafka/network/SocketServer.scala | 12 +- .../kafka/server/DynamicBrokerConfig.scala | 129 +++++++++++------- .../DynamicBrokerReconfigurationTest.scala | 12 +- 3 files changed, 92 insertions(+), 61 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index a7f950d834397..d15cd9b234ecb 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -268,7 +268,7 @@ class SocketServer(val config: KafkaConfig, config.addReconfigurable(dataPlaneAcceptor) dataPlaneAcceptor.configure(parsedConfigs) dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor) - println(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}") +// println(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}") } } @@ -589,18 +589,18 @@ class DataPlaneAcceptor(socketServer: SocketServer, override def reconfigure(configs: util.Map[String, _]): Unit = { val brokerId = configs.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int] val newNumNetworkThreads = configs.get(KafkaConfig.NumNetworkThreadsProp).asInstanceOf[Int] - val printf = newNumNetworkThreads == 2 +// val printf = newNumNetworkThreads == 2 - if (printf) println(s"DataAcceptor $listenerName on $brokerId - newThreads: $newNumNetworkThreads, currentProcessors: ${processors.length}, thread: ${Thread.currentThread()}") +// if (printf) println(s"DataAcceptor $listenerName on $brokerId - newThreads: $newNumNetworkThreads, currentProcessors: ${processors.length}, thread: ${Thread.currentThread()}") if (newNumNetworkThreads != processors.length) { - if (printf) println(s"DataAcceptor $listenerName on $brokerId - Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads") +// if (printf) println(s"DataAcceptor $listenerName on $brokerId - Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads") if (newNumNetworkThreads > processors.length) { - addProcessors(newNumNetworkThreads - processors.length, brokerId, printf) + addProcessors(newNumNetworkThreads - processors.length, brokerId, false) } else if (newNumNetworkThreads < processors.length) { removeProcessors(processors.length - newNumNetworkThreads) } } - if (printf) println(s"DataAcceptor $listenerName on $brokerId reconfigure complete") +// if (printf) println(s"DataAcceptor $listenerName on $brokerId reconfigure complete") } /** diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index fa2c9e360c929..b7f89b8aecf85 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -35,6 +35,7 @@ import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable} import org.apache.kafka.common.security.authenticator.LoginManager import org.apache.kafka.common.utils.{ConfigUtils, Utils} +import java.util.concurrent.atomic.AtomicBoolean import scala.annotation.nowarn import scala.collection._ import scala.collection.mutable.ArrayBuffer @@ -198,38 +199,53 @@ object DynamicBrokerConfig { class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging { - class LoggingBuffer extends ArrayBuffer[Reconfigurable] { + class LoggingBuffer[T](val name: String) extends ArrayBuffer[T] { + + private val iteratorInUse = new AtomicBoolean(false) override def clear(): Unit = { - println("Reconfigurables: cleared") +// println(s"Buffer $name cleared") super.clear() } - override def addOne(elem: Reconfigurable): this.type = { -// elem match { -// case x: DataPlaneAcceptor => println(s"Reconfigurables: Broker ${x.config.brokerId}, Adding data plane acceptor for ${x.listenerName()}") -// case _ => -// } - + override def addOne(elem: T): this.type = { + checkForModificationWhileIterating(elem) super.addOne(elem) } - override def subtractOne(elem: Reconfigurable): LoggingBuffer.this.type = { - elem match { - case x: DataPlaneAcceptor => println(s"Reconfigurables: Broker ${x.config.brokerId}, Removing data plane acceptor for ${x.listenerName()}") - case _ => - } + override def subtractOne(elem: T): LoggingBuffer.this.type = { + checkForModificationWhileIterating(elem) super.subtractOne(elem) } + private def checkForModificationWhileIterating(elem: T): Unit = { + if (!iteratorInUse.get()) return + + val st = Thread.currentThread().getStackTrace.mkString("\n") + println(s"Buffer $name modfied during iteration, element: $elem, st: $st") + } + override def iterator: Iterator[T] = { + val iter = super.iterator +// iteratorInUse.set(true) +// new Iterator[T] { +// override def hasNext: Boolean = { +// val next = iter.hasNext +// if (!next) iteratorInUse.set(false) +// next +// } +// +// override def next(): T = iter.next() +// } + iter + } } private[server] val staticBrokerConfigs = ConfigDef.convertToStringMapWithPasswordValues(kafkaConfig.originalsFromThisConfig).asScala private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala private val dynamicBrokerConfigs = mutable.Map[String, String]() private val dynamicDefaultConfigs = mutable.Map[String, String]() - private val reconfigurables: mutable.Buffer[Reconfigurable] = new LoggingBuffer() - private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]() + private val reconfigurables = new LoggingBuffer[Reconfigurable]("reconfigurables") + private val brokerReconfigurables = new LoggingBuffer[BrokerReconfigurable]("brokerReconfigurables") private val lock = new ReentrantReadWriteLock private var currentConfig: KafkaConfig = null private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret) @@ -563,23 +579,38 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging try { val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_)) - reconfigurables.foreach { - case listenerReconfigurable: ListenerReconfigurable => - processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false) - case reconfigurable => - if (needsReconfiguration(reconfigurable.reconfigurableConfigs, changeMap.keySet, deletedKeySet)) - processReconfigurable(reconfigurable, changeMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly) + try { + reconfigurables.foreach { + case listenerReconfigurable: ListenerReconfigurable => + processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false) + case reconfigurable => + if (needsReconfiguration(reconfigurable.reconfigurableConfigs, changeMap.keySet, deletedKeySet)) + processReconfigurable(reconfigurable, changeMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly) + } + } catch { + case e: Exception => { + println(s"Exception on reconfigurables, $e") + throw e + } } // BrokerReconfigurable updates are processed after config is updated. Only do the validation here. val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]() - brokerReconfigurables.foreach { reconfigurable => - if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet, deletedKeySet)) { - reconfigurable.validateReconfiguration(newConfig) - if (!validateOnly) - brokerReconfigurablesToUpdate += reconfigurable + + try { + brokerReconfigurables.foreach { reconfigurable => + if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet, deletedKeySet)) { + reconfigurable.validateReconfiguration(newConfig) + if (!validateOnly) + brokerReconfigurablesToUpdate += reconfigurable + } } + } catch { + case e: Exception => { + println(s"Exception on brokerReconfigurables, $e") + throw e } + } (newConfig, brokerReconfigurablesToUpdate.toList) } catch { case e: Exception => @@ -604,9 +635,9 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging validateOnly: Boolean, reloadOnly: Boolean): Unit = { val listenerName = listenerReconfigurable.listenerName - val brokerId = newConfig.brokerId +// val brokerId = newConfig.brokerId - val printf = newConfig.numNetworkThreads == 2 +// val printf = newConfig.numNetworkThreads == 2 val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix) val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix) val (changeMap, deletedKeys) = updatedConfigs(newValues, oldValues) @@ -615,11 +646,11 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging // if `reloadOnly`, reconfigure if configs haven't changed. Otherwise reconfigure if configs have changed if (reloadOnly != configsChanged) { - if (printf) { - val stackTrace = "nop" //util.Arrays.toString(Thread.currentThread().getStackTrace.asInstanceOf[Array[Object]]) - println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName, thread: ${Thread.currentThread()}, stack: $stackTrace") - println(s"Updated keys: $updatedKeys, ${changeMap.get("num.network.threads")}") - } +// if (printf) { +// val stackTrace = "nop" //util.Arrays.toString(Thread.currentThread().getStackTrace.asInstanceOf[Array[Object]]) +// println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName, thread: ${Thread.currentThread()}, stack: $stackTrace") +// println(s"Updated keys: $updatedKeys, ${changeMap.get("num.network.threads")}") +// } processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly) } } @@ -630,23 +661,23 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging newCustomConfigs: util.Map[String, Object], validateOnly: Boolean): Unit = { - var lr: ListenerReconfigurable = null - val printf = reconfigurable match { - case _: ListenerReconfigurable => - allNewConfigs.get("num.network.threads").asInstanceOf[Int] == 2 - case _ => false - } - - if (printf) lr = reconfigurable.asInstanceOf[ListenerReconfigurable] - if (lr != null) println(s"processReconfigurable for ${lr.listenerName()}") +// var lr: ListenerReconfigurable = null +// val printf = reconfigurable match { +// case _: ListenerReconfigurable => +// allNewConfigs.get("num.network.threads").asInstanceOf[Int] == 2 +// case _ => false +// } +// +// if (printf) lr = reconfigurable.asInstanceOf[ListenerReconfigurable] +// if (lr != null) println(s"processReconfigurable for ${lr.listenerName()}") val newConfigs = new util.HashMap[String, Object] allNewConfigs.forEach { (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) } newConfigs.putAll(newCustomConfigs) try { - if (lr != null) println(s"validateReconfig for ${lr.listenerName()}, validateOnly = $validateOnly") +// if (lr != null) println(s"validateReconfig for ${lr.listenerName()}, validateOnly = $validateOnly") reconfigurable.validateReconfiguration(newConfigs) - if (lr != null) println(s"validateReconfig success for ${lr.listenerName()}") +// if (lr != null) println(s"validateReconfig success for ${lr.listenerName()}") } catch { case e: ConfigException => throw e case _: Exception => @@ -654,13 +685,13 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } if (!validateOnly) { - if (printf && lr != null) { - println(s"Reconfiguring ${lr.listenerName()}, updated configs: $updatedConfigNames " + - s"custom configs: ${ConfigUtils.configMapToRedactedString(newCustomConfigs, KafkaConfig.configDef)}") - } +// if (printf && lr != null) { +// println(s"Reconfiguring ${lr.listenerName()}, updated configs: $updatedConfigNames " + +// s"custom configs: ${ConfigUtils.configMapToRedactedString(newCustomConfigs, KafkaConfig.configDef)}") +// } reconfigurable.reconfigure(newConfigs) } else { - if (lr != null) println(s"${lr.listenerName()} was validatedOnly") +// if (lr != null) println(s"${lr.listenerName()} was validatedOnly") } } } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 499c0d07dbb6a..b1545339d11f5 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -794,21 +794,21 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup servers.foreach { server => TestUtils.retry(maxWaitMs) { val value = server.config.originals.get(propName) if (propName == "num.network.threads") { - println(s"Using maxWaitMs of $maxWaitMs") +// println(s"Using maxWaitMs of $maxWaitMs") // Check if using admin.describeConfigs returns expected value. // Limiting this to num.network.threads as it is where the CI is breaking currently - val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "") - val config = adminClients.head.describeConfigs(List(brokerResource).asJava).values().get(brokerResource).get() - val adminValue = if (config.get(propName) != null) config.get(propName).value() else "null" - println(s"Server: ${server.config.brokerId}, prop: $propName, expected: $newSize, serverConf: ${ server.config.get(propName) } serverOriginalsConf: $value, adminConf: $adminValue") +// val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "") +// val config = adminClients.head.describeConfigs(List(brokerResource).asJava).values().get(brokerResource).get() +// val adminValue = if (config.get(propName) != null) config.get(propName).value() else "null" +// println(s"Server: ${server.config.brokerId}, prop: $propName, expected: $newSize, serverConf: ${ server.config.get(propName) } serverOriginalsConf: $value, adminConf: $adminValue") } assertEquals(newSize.toString, value) } } - // reconfigureServers(props, perBrokerConfig = false, (propName, newSize.toString)) + reconfigureServers(props, perBrokerConfig = false, (propName, newSize.toString)) maybeVerifyThreadPoolSize(propName, newSize, threadPrefix) }