Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
2bb436e
Good ol println debugging for test that is flaky in CI context
LiamClarkeNZ Mar 1, 2022
5f49892
Merge branch 'trunk' into KAFKA-13672-flaky-test
LiamClarkeNZ Mar 1, 2022
f27dd07
Double timeout on flaky test, to see if it makes a difference
LiamClarkeNZ Mar 1, 2022
f20d7ea
Set timeout even higher to try to eliminate slow processes as cause o…
LiamClarkeNZ Mar 1, 2022
7ce6b8e
Explore if querying config via adminClient produces a different result
LiamClarkeNZ Mar 1, 2022
dded546
More println()
LiamClarkeNZ Mar 2, 2022
a4bfda7
Extend println() debugging into SocketServer
LiamClarkeNZ Mar 2, 2022
eba6a73
Capture listener name
LiamClarkeNZ Mar 2, 2022
0e64fc6
Test failure appears to be because only Internal listener is reconfig…
LiamClarkeNZ Mar 3, 2022
6d2b44b
Merge branch 'trunk' into KAFKA-13672-flaky-test
LiamClarkeNZ Mar 6, 2022
6dac9d0
Continue trying to ascertain why external listener not being reconfig…
LiamClarkeNZ Mar 7, 2022
b782b6e
Try to be more selective...
LiamClarkeNZ Mar 7, 2022
09ec441
Tweak
LiamClarkeNZ Mar 8, 2022
b54f4d5
Touch a file to try to get the PR build to run
LiamClarkeNZ Mar 9, 2022
bb664cb
See if validation is the issue
LiamClarkeNZ Mar 9, 2022
fc10bfb
More println, more!
LiamClarkeNZ Mar 14, 2022
1aa4050
Re-enable logging buffer of reconfigurables in DynamicBrokerConfig
LiamClarkeNZ Mar 15, 2022
6fa2d24
Fix NPE
LiamClarkeNZ Mar 16, 2022
74f1bb6
Merge branch 'trunk' into KAFKA-13672-flaky-test
LiamClarkeNZ Mar 16, 2022
55cd50f
Re-enable the flaky test I'm working on (I guess that the CI pipeline…
LiamClarkeNZ Mar 16, 2022
7df84c2
Check for CME Luke flagged
LiamClarkeNZ Mar 19, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 30 additions & 18 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -268,15 +268,16 @@ class SocketServer(val config: KafkaConfig,
config.addReconfigurable(dataPlaneAcceptor)
dataPlaneAcceptor.configure(parsedConfigs)
dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
// println(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
}
}

private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = {
endpointOpt.foreach { endpoint =>
connectionQuotas.addListener(config, endpoint.listenerName)
val controlPlaneAcceptor = createControlPlaneAcceptor(endpoint, controlPlaneRequestChannelOpt.get)
controlPlaneAcceptor.addProcessors(1)
val brokerId = controlPlaneAcceptor.config.brokerId
controlPlaneAcceptor.addProcessors(1, brokerId)
controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}")
}
Expand Down Expand Up @@ -556,21 +557,26 @@ class DataPlaneAcceptor(socketServer: SocketServer,
* the AlterConfigs response.
*/
/**
 * Validates a proposed dynamic change to this listener's network thread count.
 * The new value must be positive and within [oldValue / 2, oldValue * 2] so the
 * pool is never resized by more than a factor of two in one step.
 *
 * NOTE(review): the diff capture interleaved the pre-change body with the new
 * try/catch body, leaving duplicated statements and unbalanced braces; this is
 * the intended post-change method emitted once.
 *
 * @param configs the proposed new config values
 * @throws ConfigException if a reconfigurable config fails validation
 */
override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
  try {
    configs.forEach { (k, v) =>
      if (reconfigurableConfigs.contains(k)) {
        // Proposed vs. current network-thread pool size.
        val newValue = v.asInstanceOf[Int]
        val oldValue = processors.length
        if (newValue != oldValue) {
          val errorMsg = s"Dynamic thread count update validation failed for $k=$v"
          if (newValue <= 0)
            throw new ConfigException(s"$errorMsg, value should be at least 1")
          // Bound the step size: no shrinking below half ...
          if (newValue < oldValue / 2)
            throw new ConfigException(s"$errorMsg, value should be at least half the current value $oldValue")
          // ... and no growing beyond double the current size.
          if (newValue > oldValue * 2)
            throw new ConfigException(s"$errorMsg, value should not be greater than double the current value $oldValue")
        }
      }
    }
  }
  catch {
    // Temporary KAFKA-13672 debugging: surface validation failures in CI output
    // before rethrowing for the AlterConfigs response.
    case e: ConfigException => println(s"DataAcceptor invalidConf: $e"); throw e
  }
}

/**
Expand All @@ -581,16 +587,20 @@ class DataPlaneAcceptor(socketServer: SocketServer,
* the configs have passed validation using {@link #validateReconfiguration ( Map )}.
*/
/**
 * Applies a validated dynamic config update, growing or shrinking this
 * listener's processor pool to match the new `num.network.threads` value.
 *
 * NOTE(review): the diff capture contained BOTH the pre-change call
 * `addProcessors(delta)` and the new `addProcessors(delta, brokerId, false)`
 * on consecutive lines, which would add the delta twice; only the intended
 * post-change call is kept here.
 *
 * @param configs the new config values (already validated)
 */
override def reconfigure(configs: util.Map[String, _]): Unit = {
  // Broker id is threaded through solely for the KAFKA-13672 debug output
  // inside addProcessors.
  val brokerId = configs.get(KafkaConfig.BrokerIdProp).asInstanceOf[Int]
  val newNumNetworkThreads = configs.get(KafkaConfig.NumNetworkThreadsProp).asInstanceOf[Int]

  if (newNumNetworkThreads != processors.length) {
    info(s"Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads")
    if (newNumNetworkThreads > processors.length) {
      addProcessors(newNumNetworkThreads - processors.length, brokerId, false)
    } else if (newNumNetworkThreads < processors.length) {
      removeProcessors(processors.length - newNumNetworkThreads)
    }
  }
}

/**
Expand Down Expand Up @@ -885,17 +895,19 @@ private[kafka] abstract class Acceptor(val socketServer: SocketServer,
@Override
def wakeup(): Unit = nioSelector.wakeup()

def addProcessors(toCreate: Int): Unit = {
def addProcessors(toCreate: Int, brokerId: Int = -1, printf: Boolean = false): Unit = {
val listenerName = endPoint.listenerName
val securityProtocol = endPoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()

if (printf) println(s"DataAcceptor $listenerName on $brokerId - addProcessors adding $toCreate")
for (_ <- 0 until toCreate) {
val processor = newProcessor(socketServer.nextProcessorId(), listenerName, securityProtocol)
listenerProcessors += processor
requestChannel.addProcessor(processor)
if (printf) println(s"DataAcceptor $listenerName on $brokerId - addProcessors added processor ${processor.id}")
}

if (printf) println(listenerProcessors)
processors ++= listenerProcessors
if (processorsStarted.get)
startProcessors(listenerProcessors)
Expand Down
117 changes: 101 additions & 16 deletions core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.{ConfigUtils, Utils}

import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.nowarn
import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

/**
Expand Down Expand Up @@ -197,12 +199,53 @@ object DynamicBrokerConfig {

class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging {

/**
 * Debugging aid for KAFKA-13672: an ArrayBuffer that can report when it is
 * mutated while an iterator over it is outstanding (the Scala analogue of a
 * ConcurrentModificationException check).
 *
 * NOTE(review): the iterator-tracking wrapper is currently disabled --
 * `iteratorInUse` is never set to true -- so `checkForModificationWhileIterating`
 * never fires and this class presently behaves exactly like ArrayBuffer.
 *
 * @param name label included in diagnostic output to identify which buffer
 *             was mutated
 */
class LoggingBuffer[T](val name: String) extends ArrayBuffer[T] {

  // Set while an iterator returned by `iterator` is being consumed.
  // Currently never flipped to true; see the disabled wrapper in `iterator`.
  private val iteratorInUse = new AtomicBoolean(false)

  override def clear(): Unit = {
    // println(s"Buffer $name cleared")
    super.clear()
  }

  override def addOne(elem: T): this.type = {
    checkForModificationWhileIterating(elem)
    super.addOne(elem)
  }

  override def subtractOne(elem: T): LoggingBuffer.this.type = {
    checkForModificationWhileIterating(elem)
    super.subtractOne(elem)
  }

  // Prints the mutating thread's full stack trace if the buffer is modified
  // while an iterator is live. Guard clause replaces the previous early
  // `return`, and the "modfied" typo in the diagnostic message is fixed.
  private def checkForModificationWhileIterating(elem: T): Unit = {
    if (iteratorInUse.get()) {
      val st = Thread.currentThread().getStackTrace.mkString("\n")
      println(s"Buffer $name modified during iteration, element: $elem, st: $st")
    }
  }

  override def iterator: Iterator[T] = {
    // Detection wrapper intentionally disabled: to re-enable, set
    // `iteratorInUse` here and clear it once the wrapped iterator reports
    // `hasNext == false`.
    super.iterator
  }
}

private[server] val staticBrokerConfigs = ConfigDef.convertToStringMapWithPasswordValues(kafkaConfig.originalsFromThisConfig).asScala
private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
private val dynamicBrokerConfigs = mutable.Map[String, String]()
private val dynamicDefaultConfigs = mutable.Map[String, String]()
private val reconfigurables = mutable.Buffer[Reconfigurable]()
private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
private val reconfigurables = new LoggingBuffer[Reconfigurable]("reconfigurables")
private val brokerReconfigurables = new LoggingBuffer[BrokerReconfigurable]("brokerReconfigurables")
private val lock = new ReentrantReadWriteLock
private var currentConfig: KafkaConfig = null
private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
Expand Down Expand Up @@ -536,23 +579,38 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
try {
val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs
newConfig.valuesFromThisConfig.keySet.forEach(customConfigs.remove(_))
reconfigurables.foreach {
case listenerReconfigurable: ListenerReconfigurable =>
processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false)
case reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs, changeMap.keySet, deletedKeySet))
processReconfigurable(reconfigurable, changeMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
try {
reconfigurables.foreach {
case listenerReconfigurable: ListenerReconfigurable =>
processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false)
case reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs, changeMap.keySet, deletedKeySet))
processReconfigurable(reconfigurable, changeMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
}
} catch {
case e: Exception => {
println(s"Exception on reconfigurables, $e")
throw e
}
}

// BrokerReconfigurable updates are processed after config is updated. Only do the validation here.
val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]()
brokerReconfigurables.foreach { reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet, deletedKeySet)) {
reconfigurable.validateReconfiguration(newConfig)
if (!validateOnly)
brokerReconfigurablesToUpdate += reconfigurable

try {
brokerReconfigurables.foreach { reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet, deletedKeySet)) {
reconfigurable.validateReconfiguration(newConfig)
if (!validateOnly)
brokerReconfigurablesToUpdate += reconfigurable
}
}
} catch {
case e: Exception => {
println(s"Exception on brokerReconfigurables, $e")
throw e
}
}
(newConfig, brokerReconfigurablesToUpdate.toList)
} catch {
case e: Exception =>
Expand All @@ -577,36 +635,63 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging
validateOnly: Boolean,
reloadOnly: Boolean): Unit = {
val listenerName = listenerReconfigurable.listenerName
// val brokerId = newConfig.brokerId

// val printf = newConfig.numNetworkThreads == 2
val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
val (changeMap, deletedKeys) = updatedConfigs(newValues, oldValues)
val updatedKeys = changeMap.keySet
val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys, deletedKeys)

// if `reloadOnly`, reconfigure if configs haven't changed. Otherwise reconfigure if configs have changed
if (reloadOnly != configsChanged)
if (reloadOnly != configsChanged) {
// if (printf) {
// val stackTrace = "nop" //util.Arrays.toString(Thread.currentThread().getStackTrace.asInstanceOf[Array[Object]])
// println(s"processListenerReconfigurable brokerId, $brokerId, listenerName: $listenerName, thread: ${Thread.currentThread()}, stack: $stackTrace")
// println(s"Updated keys: $updatedKeys, ${changeMap.get("num.network.threads")}")
// }
processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
}
}

/**
 * Validates and (unless validateOnly) applies a dynamic config update to a
 * single Reconfigurable.
 *
 * Dead commented-out KAFKA-13672 println scaffolding removed; the generic
 * exception wrapper now includes the underlying failure's toString, since
 * ConfigException has no Throwable-cause constructor and the original message
 * discarded the reason entirely.
 *
 * @param reconfigurable     the component to validate/reconfigure
 * @param updatedConfigNames the config keys that changed, used in log/error text
 * @param allNewConfigs      the full new Kafka config values
 * @param newCustomConfigs   non-Kafka (custom) configs, merged in last so they win
 * @param validateOnly       when true, only validation is performed
 */
private def processReconfigurable(reconfigurable: Reconfigurable,
                                  updatedConfigNames: Set[String],
                                  allNewConfigs: util.Map[String, _],
                                  newCustomConfigs: util.Map[String, Object],
                                  validateOnly: Boolean): Unit = {
  // Build the map handed to the Reconfigurable: all Kafka configs first, with
  // custom configs layered on top.
  val newConfigs = new util.HashMap[String, Object]
  allNewConfigs.forEach { (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) }
  newConfigs.putAll(newCustomConfigs)
  try {
    reconfigurable.validateReconfiguration(newConfigs)
  } catch {
    // ConfigExceptions already carry a meaningful message; rethrow untouched.
    case e: ConfigException => throw e
    // Wrap anything else, preserving the underlying failure in the message.
    case e: Exception =>
      throw new ConfigException(s"Validation of dynamic config update of $updatedConfigNames failed with class ${reconfigurable.getClass}: $e")
  }

  if (!validateOnly) {
    info(s"Reconfiguring $reconfigurable, updated configs: $updatedConfigNames " +
      s"custom configs: ${ConfigUtils.configMapToRedactedString(newCustomConfigs, KafkaConfig.configDef)}")
    reconfigurable.reconfigure(newConfigs)
  }
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import java.time.Duration
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent._

import javax.management.ObjectName
import com.yammer.metrics.core.MetricName
import kafka.admin.ConfigCommand
Expand All @@ -36,7 +35,6 @@ import kafka.log.{CleanerConfig, LogConfig}
import kafka.message.ProducerCompressionCodec
import kafka.metrics.KafkaYammerMetrics
import kafka.network.{Processor, RequestChannel}
import kafka.server.QuorumTestHarness
import kafka.utils._
import kafka.utils.Implicits._
import kafka.zk.ConfigEntityChangeNotificationZNode
Expand Down Expand Up @@ -99,11 +97,17 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
private val sslProperties2 = TestUtils.sslConfigs(Mode.SERVER, clientCert = false, Some(trustStoreFile2), "kafka")
private val invalidSslProperties = invalidSslConfigs

private var originalLogLevels: Map[String, String] = null

def addExtraProps(props: Properties): Unit = {
}

@BeforeEach
override def setUp(testInfo: TestInfo): Unit = {
// Eliminate log output while println debugging
originalLogLevels = Map.from(Log4jController.loggers)
Log4jController.loggers.foreach(ln => Log4jController.logLevel(ln._1, "FATAL"))

startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)))
super.setUp(testInfo)

Expand Down Expand Up @@ -164,6 +168,9 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
TestUtils.shutdownServers(servers)
super.tearDown()
closeSasl()

// Restore log levels for any other tests running in this process
originalLogLevels.foreach(ln => Log4jController.logLevel(ln._1, ln._2))
}

@Test
Expand Down Expand Up @@ -728,8 +735,10 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}

@Test
@Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13672)
// @Disabled // TODO: To be re-enabled once we can make it less flaky (KAFKA-13672)
def testThreadPoolResize(): Unit = {


val requestHandlerPrefix = "data-plane-kafka-request-handler-"
val networkThreadPrefix = "data-plane-kafka-network-thread-"
val fetcherThreadPrefix = "ReplicaFetcherThread-"
Expand Down Expand Up @@ -773,8 +782,32 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
}

/**
 * Dynamically resizes one broker thread pool via AlterConfigs, waits until
 * every broker's config reflects the new value, then verifies the live thread
 * count.
 *
 * NOTE(review): the diff capture left `val props` declared twice (the
 * pre-change and post-change lines), which is a redefinition compile error in
 * Scala; only one declaration is kept. An `if` block whose entire body was
 * commented out is removed as a no-op.
 *
 * @param propName     broker config key controlling the pool size
 * @param newSize      target pool size
 * @param threadPrefix name prefix of the pool's threads, used for verification
 */
def resizeThreadPool(propName: String, newSize: Int, threadPrefix: String): Unit = {
  val props = new Properties()
  props.put(propName, newSize.toString)

  // Temporary KAFKA-13672 CI debugging: apply the change cluster-wide, then
  // poll each broker until its originals map shows the new value.
  alterConfigs(servers, adminClients.head, props, perBrokerConfig = false).all().get()

  // Double the maxWaitMs as used in waitForConfigOnServer.
  val maxWaitMs = 20000
  servers.foreach { server =>
    TestUtils.retry(maxWaitMs) {
      val value = server.config.originals.get(propName)
      assertEquals(newSize.toString, value)
    }
  }

  reconfigureServers(props, perBrokerConfig = false, (propName, newSize.toString))
  maybeVerifyThreadPoolSize(propName, newSize, threadPrefix)
}
Expand All @@ -795,6 +828,8 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
maybeVerifyThreadPoolSize(propName, threadPoolSize, threadPrefix)
}



val config = servers.head.config
verifyThreadPoolResize(KafkaConfig.NumIoThreadsProp, config.numIoThreads,
requestHandlerPrefix, mayReceiveDuplicates = false)
Expand Down