diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 88dfa15b3f5c9..d15cd9b234ecb 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -268,7 +268,7 @@ class SocketServer(val config: KafkaConfig, config.addReconfigurable(dataPlaneAcceptor) dataPlaneAcceptor.configure(parsedConfigs) dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor) - info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}") +// println(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}") } } @@ -276,7 +276,8 @@ class SocketServer(val config: KafkaConfig, endpointOpt.foreach { endpoint => connectionQuotas.addListener(config, endpoint.listenerName) val controlPlaneAcceptor = createControlPlaneAcceptor(endpoint, controlPlaneRequestChannelOpt.get) - controlPlaneAcceptor.addProcessors(1) + val brokerId = controlPlaneAcceptor.config.brokerId + controlPlaneAcceptor.addProcessors(1, brokerId) controlPlaneAcceptorOpt = Some(controlPlaneAcceptor) info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}") } @@ -556,21 +557,26 @@ class DataPlaneAcceptor(socketServer: SocketServer, * the AlterConfigs response. */ override def validateReconfiguration(configs: util.Map[String, _]): Unit = { - configs.forEach { (k, v) => - if (reconfigurableConfigs.contains(k)) { - val newValue = v.asInstanceOf[Int] - val oldValue = processors.length - if (newValue != oldValue) { - val errorMsg = s"Dynamic thread count update validation failed for $k=$v" - if (newValue <= 0) - throw new ConfigException(s"$errorMsg, value should be at least 1") - if (newValue < oldValue / 2) - throw new ConfigException(s"$errorMsg, value should be at least half the current value $oldValue") - if (newValue > oldValue * 2) - throw new ConfigException(s"$errorMsg, value should not be greater than double the current value $oldValue") + try { + configs.forEach { (k, v) => + if (reconfigurableConfigs.contains(k)) { + val newValue = v.asInstanceOf[Int] + val oldValue = processors.length + if (newValue != oldValue) { + val errorMsg = s"Dynamic thread count update validation failed for $k=$v" + if (newValue <= 0) + throw new ConfigException(s"$errorMsg, value should be at least 1") + if (newValue < oldValue / 2) + throw new ConfigException(s"$errorMsg, value should be at least half the current value $oldValue") + if (newValue > oldValue * 2) + throw new ConfigException(s"$errorMsg, value should not be greater than double the current value $oldValue") + } } } } + catch { + case e: ConfigException => println(s"DataAcceptor invalidConf: $e"); throw e + } } /** @@ -581,16 +587,20 @@ class DataPlaneAcceptor(socketServer: SocketServer, * the configs have passed validation using {@link #validateReconfiguration ( Map )}. */ override def reconfigure(configs: util.Map[String, _]): Unit = { + val brokerId = configs.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int] val newNumNetworkThreads = configs.get(KafkaConfig.NumNetworkThreadsProp).asInstanceOf[Int] +// val printf = newNumNetworkThreads == 2 +// if (printf) println(s"DataAcceptor $listenerName on $brokerId - newThreads: $newNumNetworkThreads, currentProcessors: ${processors.length}, thread: ${Thread.currentThread()}") if (newNumNetworkThreads != processors.length) { - info(s"Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads") +// if (printf) println(s"DataAcceptor $listenerName on $brokerId - Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads") if (newNumNetworkThreads > processors.length) { - addProcessors(newNumNetworkThreads - processors.length) + addProcessors(newNumNetworkThreads - processors.length, brokerId, false) } else if (newNumNetworkThreads < processors.length) { removeProcessors(processors.length - newNumNetworkThreads) } } +// if (printf) println(s"DataAcceptor $listenerName on $brokerId reconfigure complete") } /** @@ -885,17 +895,19 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer, @Override def wakeup(): Unit = nioSelector.wakeup() - def addProcessors(toCreate: Int): Unit = { + def addProcessors(toCreate: Int, brokerId: Int = -1, printf: Boolean = false): Unit = { val listenerName = endPoint.listenerName val securityProtocol = endPoint.securityProtocol val listenerProcessors = new ArrayBuffer[Processor]() - + if (printf) println(s"DataAcceptor $listenerName on $brokerId - addProcessors adding $toCreate") for (_ <- 0 until toCreate) { val processor = newProcessor(socketServer.nextProcessorId(), listenerName, securityProtocol) listenerProcessors += processor requestChannel.addProcessor(processor) + if (printf) println(s"DataAcceptor $listenerName on $brokerId - addProcessors added processor ${processor.id}") } + if (printf) println(listenerProcessors) processors ++= listenerProcessors if (processorsStarted.get) startProcessors(listenerProcessors) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index cb6cd84d3b630..b7f89b8aecf85 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -35,8 +35,10 @@ import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable} import org.apache.kafka.common.security.authenticator.LoginManager import org.apache.kafka.common.utils.{ConfigUtils, Utils} +import java.util.concurrent.atomic.AtomicBoolean import scala.annotation.nowarn import scala.collection._ +import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ /** @@ -197,12 +199,53 @@ object DynamicBrokerConfig { class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging { + class LoggingBuffer[T](val name: String) extends ArrayBuffer[T] { + + private val iteratorInUse = new AtomicBoolean(false) + override def clear(): Unit = { +// println(s"Buffer $name cleared") + super.clear() + } + + override def addOne(elem: T): this.type = { + checkForModificationWhileIterating(elem) + super.addOne(elem) + } + + override def subtractOne(elem: T): LoggingBuffer.this.type = { + checkForModificationWhileIterating(elem) + super.subtractOne(elem) + } + + private def checkForModificationWhileIterating(elem: T): Unit = { + if (!iteratorInUse.get()) return + + val st = Thread.currentThread().getStackTrace.mkString("\n") + println(s"Buffer $name modfied during iteration, element: $elem, st: $st") + } + + override def iterator: Iterator[T] = { + val iter = super.iterator +// iteratorInUse.set(true) +// new Iterator[T] { +// override def hasNext: Boolean = { +// val next = iter.hasNext +// if (!next) iteratorInUse.set(false) +// next +// } +// +// override def next(): T = iter.next() +// } + iter + } + } + private[server] val staticBrokerConfigs = ConfigDef.convertToStringMapWithPasswordValues(kafkaConfig.originalsFromThisConfig).asScala private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala private val dynamicBrokerConfigs = mutable.Map[String, String]() private val dynamicDefaultConfigs = mutable.Map[String, String]() - private val reconfigurables = mutable.Buffer[Reconfigurable]() - private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]() + private val reconfigurables = new LoggingBuffer[Reconfigurable]("reconfigurables") + private val brokerReconfigurables = new LoggingBuffer[BrokerReconfigurable]("brokerReconfigurables") private val lock = new ReentrantReadWriteLock private var currentConfig: KafkaConfig = null private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret) @@ -536,23 +579,38 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging try { val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_)) - reconfigurables.foreach { - case listenerReconfigurable: ListenerReconfigurable => - processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false) - case reconfigurable => - if (needsReconfiguration(reconfigurable.reconfigurableConfigs, changeMap.keySet, deletedKeySet)) - processReconfigurable(reconfigurable, changeMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly) + try { + reconfigurables.foreach { + case listenerReconfigurable: ListenerReconfigurable => + processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false) + case reconfigurable => + if (needsReconfiguration(reconfigurable.reconfigurableConfigs, changeMap.keySet, deletedKeySet)) + processReconfigurable(reconfigurable, changeMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly) + } + } catch { + case e: Exception => { + println(s"Exception on reconfigurables, $e") + throw e + } } // BrokerReconfigurable updates are processed after config is updated. Only do the validation here. val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]() - brokerReconfigurables.foreach { reconfigurable => - if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet, deletedKeySet)) { - reconfigurable.validateReconfiguration(newConfig) - if (!validateOnly) - brokerReconfigurablesToUpdate += reconfigurable + + try { + brokerReconfigurables.foreach { reconfigurable => + if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet, deletedKeySet)) { + reconfigurable.validateReconfiguration(newConfig) + if (!validateOnly) + brokerReconfigurablesToUpdate += reconfigurable + } } + } catch { + case e: Exception => { + println(s"Exception on brokerReconfigurables, $e") + throw e } + } (newConfig, brokerReconfigurablesToUpdate.toList) } catch { case e: Exception => @@ -577,14 +635,24 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging validateOnly: Boolean, reloadOnly: Boolean): Unit = { val listenerName = listenerReconfigurable.listenerName +// val brokerId = newConfig.brokerId + +// val printf = newConfig.numNetworkThreads == 2 val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix) val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix) val (changeMap, deletedKeys) = updatedConfigs(newValues, oldValues) val updatedKeys = changeMap.keySet val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys, deletedKeys) + // if `reloadOnly`, reconfigure if configs haven't changed. Otherwise reconfigure if configs have changed - if (reloadOnly != configsChanged) + if (reloadOnly != configsChanged) { +// if (printf) { +// val stackTrace = "nop" //util.Arrays.toString(Thread.currentThread().getStackTrace.asInstanceOf[Array[Object]]) +// println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName, thread: ${Thread.currentThread()}, stack: $stackTrace") +// println(s"Updated keys: $updatedKeys, ${changeMap.get("num.network.threads")}") +// } processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly) + } } private def processReconfigurable(reconfigurable: Reconfigurable, @@ -592,11 +660,24 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging allNewConfigs: util.Map[String, _], newCustomConfigs: util.Map[String, Object], validateOnly: Boolean): Unit = { + +// var lr: ListenerReconfigurable = null +// val printf = reconfigurable match { +// case _: ListenerReconfigurable => +// allNewConfigs.get("num.network.threads").asInstanceOf[Int] == 2 +// case _ => false +// } +// +// if (printf) lr = reconfigurable.asInstanceOf[ListenerReconfigurable] +// if (lr != null) println(s"processReconfigurable for ${lr.listenerName()}") + val newConfigs = new util.HashMap[String, Object] allNewConfigs.forEach { (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) } newConfigs.putAll(newCustomConfigs) try { +// if (lr != null) println(s"validateReconfig for ${lr.listenerName()}, validateOnly = $validateOnly") reconfigurable.validateReconfiguration(newConfigs) +// if (lr != null) println(s"validateReconfig success for ${lr.listenerName()}") } catch { case e: ConfigException => throw e case _: Exception => @@ -604,9 +685,13 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging } if (!validateOnly) { - info(s"Reconfiguring $reconfigurable, updated configs: $updatedConfigNames " + - s"custom configs: ${ConfigUtils.configMapToRedactedString(newCustomConfigs, KafkaConfig.configDef)}") +// if (printf && lr != null) { +// println(s"Reconfiguring ${lr.listenerName()}, updated configs: $updatedConfigNames " + +// s"custom configs: ${ConfigUtils.configMapToRedactedString(newCustomConfigs, KafkaConfig.configDef)}") +// } reconfigurable.reconfigure(newConfigs) + } else { +// if (lr != null) println(s"${lr.listenerName()} was validatedOnly") } } } diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 1d64106f096fb..b1545339d11f5 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -26,7 +26,6 @@ import java.time.Duration import java.util import java.util.{Collections, Properties} import java.util.concurrent._ - import javax.management.ObjectName import com.yammer.metrics.core.MetricName import kafka.admin.ConfigCommand @@ -36,7 +35,6 @@ import kafka.log.{CleanerConfig, LogConfig} import kafka.message.ProducerCompressionCodec import kafka.metrics.KafkaYammerMetrics import kafka.network.{Processor, RequestChannel} -import kafka.server.QuorumTestHarness import kafka.utils._ import kafka.utils.Implicits._ import kafka.zk.ConfigEntityChangeNotificationZNode @@ -99,11 +97,17 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup private val sslProperties2 = TestUtils.sslConfigs(Mode.SERVER, clientCert = false, Some(trustStoreFile2), "kafka") private val invalidSslProperties = invalidSslConfigs + private var originalLogLevels: Map[String, String] = null + def addExtraProps(props: Properties): Unit = { } @BeforeEach override def setUp(testInfo: TestInfo): Unit = { + // Eliminate log output while println debugging + originalLogLevels = Map.from(Log4jController.loggers) + Log4jController.loggers.foreach(ln => Log4jController.logLevel(ln._1, "FATAL")) + startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism))) super.setUp(testInfo) @@ -164,6 +168,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup TestUtils.shutdownServers(servers) super.tearDown() closeSasl() + + // Restore log levels for any other tests running in this process + originalLogLevels.foreach(ln => Log4jController.logLevel(ln._1, ln._2)) } @Test @@ -728,8 +735,10 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } @Test - @Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13672) + // @Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13672) def testThreadPoolResize(): Unit = { + + val requestHandlerPrefix = "data-plane-kafka-request-handler-" val networkThreadPrefix = "data-plane-kafka-network-thread-" val fetcherThreadPrefix = "ReplicaFetcherThread-" @@ -773,8 +782,32 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup } def resizeThreadPool(propName: String, newSize: Int, threadPrefix: String): Unit = { - val props = new Properties + val props = new Properties() props.put(propName, newSize.toString) + + + // Very temporary copy and paste hack to observe behaviour in CI via the magic println for this test only + alterConfigs(servers, adminClients.head, props, perBrokerConfig = false).all().get() + + // Double the maxWaitMs as used in waitForConfigOnServer + val maxWaitMs = 20000 + servers.foreach { server => TestUtils.retry(maxWaitMs) { + val value = server.config.originals.get(propName) + if (propName == "num.network.threads") { +// println(s"Using maxWaitMs of $maxWaitMs") + // Check if using admin.describeConfigs returns expected value. + // Limiting this to num.network.threads as it is where the CI is breaking currently + +// val brokerResource = new ConfigResource(ConfigResource.Type.BROKER, "") +// val config = adminClients.head.describeConfigs(List(brokerResource).asJava).values().get(brokerResource).get() +// val adminValue = if (config.get(propName) != null) config.get(propName).value() else "null" +// println(s"Server: ${server.config.brokerId}, prop: $propName, expected: $newSize, serverConf: ${ server.config.get(propName) } serverOriginalsConf: $value, adminConf: $adminValue") + } + + assertEquals(newSize.toString, value) + } + } + reconfigureServers(props, perBrokerConfig = false, (propName, newSize.toString)) maybeVerifyThreadPoolSize(propName, newSize, threadPrefix) } @@ -795,6 +828,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup maybeVerifyThreadPoolSize(propName, threadPoolSize, threadPrefix) } + + val config = servers.head.config verifyThreadPoolResize(KafkaConfig.NumIoThreadsProp, config.numIoThreads, requestHandlerPrefix, mayReceiveDuplicates = false)