Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,16 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
private def addNewBroker(broker: Broker) {
val messageQueue = new LinkedBlockingQueue[QueueItem]
debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
val brokerNode = broker.node(config.interBrokerListenerName)
val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
val brokerNode = broker.node(controllerToBrokerListenerName)
val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
val networkClient = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
config.interBrokerSecurityProtocol,
controllerToBrokerSecurityProtocol,
JaasContext.Type.SERVER,
config,
config.interBrokerListenerName,
controllerToBrokerListenerName,
config.saslMechanismInterBrokerProtocol,
time,
config.saslInterBrokerHandshakeRequestEnable
Expand Down
13 changes: 8 additions & 5 deletions core/src/main/scala/kafka/network/RequestChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ object RequestChannel extends Logging {

val RequestQueueSizeMetric = "RequestQueueSize"
val ResponseQueueSizeMetric = "ResponseQueueSize"
val ControlPlaneMetricPrefix = "ControlPlane"
val ProcessorMetricTag = "processor"

def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled
Expand Down Expand Up @@ -272,17 +273,19 @@ object RequestChannel extends Logging {
}
}

class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
class RequestChannel(val queueSize: Int, val metricNamePrefix : String = "") extends KafkaMetricsGroup {
import RequestChannel._
val metrics = new RequestChannel.Metrics
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]()
val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric)

newGauge(RequestQueueSizeMetric, new Gauge[Int] {
newGauge(requestQueueSizeMetricName, new Gauge[Int] {
def value = requestQueue.size
})

newGauge(ResponseQueueSizeMetric, new Gauge[Int]{
newGauge(responseQueueSizeMetricName, new Gauge[Int]{
def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
total + processor.responseQueueSize
}
Expand All @@ -292,7 +295,7 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
if (processors.putIfAbsent(processor.id, processor) != null)
warn(s"Unexpected processor with processorId ${processor.id}")

newGauge(ResponseQueueSizeMetric,
newGauge(responseQueueSizeMetricName,
new Gauge[Int] {
def value = processor.responseQueueSize
},
Expand All @@ -302,7 +305,7 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {

def removeProcessor(processorId: Int): Unit = {
processors.remove(processorId)
removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> processorId.toString))
removeMetric(responseQueueSizeMetricName, Map(ProcessorMetricTag -> processorId.toString))
}

/** Send a request to be handled, potentially blocking until there is room in the queue for the request */
Expand Down
235 changes: 161 additions & 74 deletions core/src/main/scala/kafka/network/SocketServer.scala

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable {

override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
if (newConfig.numIoThreads != oldConfig.numIoThreads)
server.requestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
Expand Down
63 changes: 60 additions & 3 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,7 @@ object KafkaConfig {
val AdvertisedPortProp = "advertised.port"
val AdvertisedListenersProp = "advertised.listeners"
val ListenerSecurityProtocolMapProp = "listener.security.protocol.map"
val ControlPlaneListenerNameProp = "control.plane.listener.name"
val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
val SocketRequestMaxBytesProp = "socket.request.max.bytes"
Expand Down Expand Up @@ -503,7 +504,7 @@ object KafkaConfig {
val NumIoThreadsDoc = "The number of threads that the server uses for processing requests, which may include disk I/O"
val NumReplicaAlterLogDirsThreadsDoc = "The number of threads that can move replicas between log directories, which may include disk I/O"
val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads"
val QueuedMaxRequestsDoc = "The number of queued requests allowed for data-plane, before blocking the network threads"
val QueuedMaxRequestBytesDoc = "The number of queued bytes allowed before no more requests are read"
val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
/************* Authorizer Configuration ***********/
Expand Down Expand Up @@ -546,6 +547,22 @@ object KafkaConfig {
"prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " +
"INTERNAL listener, a config with name <code>listener.name.internal.ssl.keystore.location</code> would be set. " +
"If the config for the listener name is not set, the config will fallback to the generic config (i.e. <code>ssl.keystore.location</code>). "
val controlPlaneListenerNameDoc = "Name of listener used for communication between controller and brokers. " +
Comment thread
MayureshGharat marked this conversation as resolved.
Outdated
s"Broker will use the $ControlPlaneListenerNameProp to locate the endpoint in $ListenersProp list, to listen for connections from the controller. " +
"For example, if a broker's config is :\n" +
"listeners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094\n" +
"listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" +
"control.plane.listener.name = CONTROLLER\n" +
"On startup, the broker will start listening on \"192.1.1.8:9094\" with security protocol \"SSL\".\n" +
s"On controller side, when it discovers a broker's published endpoints through zookeeper, it will use the $ControlPlaneListenerNameProp " +
"to find the endpoint, which it will use to establish connection to the broker.\n" +
"For example, if the broker's published endpoints on zookeeper are :\n" +
"\"endpoints\" : [\"INTERNAL://broker1.example.com:9092\",\"EXTERNAL://broker1.example.com:9093\",\"CONTROLLER://broker1.example.com:9094\"]\n" +
" and the controller's config is :\n" +
"listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" +
"control.plane.listener.name = CONTROLLER\n" +
"then controller will use \"broker1.example.com:9094\" with security protocol \"SSL\" to connect to the broker.\n" +
"If not explicitly configured, the default value will be null and there will be no dedicated endpoints for controller connections."

val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
Expand Down Expand Up @@ -846,6 +863,7 @@ object KafkaConfig {
.define(AdvertisedPortProp, INT, null, HIGH, AdvertisedPortDoc)
.define(AdvertisedListenersProp, STRING, null, HIGH, AdvertisedListenersDoc)
.define(ListenerSecurityProtocolMapProp, STRING, Defaults.ListenerSecurityProtocolMap, LOW, ListenerSecurityProtocolMapDoc)
.define(ControlPlaneListenerNameProp, STRING, null, HIGH, controlPlaneListenerNameDoc)
.define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc)
.define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc)
.define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc)
Expand Down Expand Up @@ -1265,6 +1283,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO

def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1
def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
// Listener name to use for controller-to-broker traffic, when a dedicated
// control plane listener is configured via control.plane.listener.name; None otherwise.
def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, _) => listenerName }
// Security protocol of the dedicated control plane listener, when one is
// configured via control.plane.listener.name; None otherwise.
def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (_, securityProtocol) => securityProtocol }
def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1

Expand Down Expand Up @@ -1337,6 +1357,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
}.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))
}

// Resolves the endpoint in `listeners` whose listener name matches
// control.plane.listener.name, when that property is set; None otherwise.
// NOTE(review): `.head` assumes the named listener always exists in `listeners`
// (a NoSuchElementException is thrown otherwise) — confirm startup validation
// guarantees this.
def controlPlaneListener: Option[EndPoint] = {
  for (name <- controlPlaneListenerName)
    yield listeners.filter(endpoint => endpoint.listenerName.value() == name.value()).head
}

// Listeners serving data plane (client and inter-broker) traffic: all configured
// listeners except the dedicated control plane listener, when one is defined.
def dataPlaneListeners: Seq[EndPoint] = {
  // Use the normalised control plane listener name (via the accessor) rather than
  // the raw config string, so the comparison is consistent with
  // `controlPlaneListener`/`getControlPlaneListenerNameAndSecurityProtocol`
  // regardless of the letter case used in the config value.
  controlPlaneListenerName match {
    case Some(listenerName) => listeners.filterNot(_.listenerName.value() == listenerName.value())
    case None => listeners
  }
}

// If the user defined advertised listeners, we use those
// If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults
// If none of these are defined, we'll use the listeners
Expand Down Expand Up @@ -1368,6 +1401,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
}
}

// Parses control.plane.listener.name into its (ListenerName, SecurityProtocol) pair.
// Returns None when the property is not set.
// Throws ConfigException when the named listener has no entry in
// listener.security.protocol.map.
private def getControlPlaneListenerNameAndSecurityProtocol: Option[(ListenerName, SecurityProtocol)] = {
  Option(getString(KafkaConfig.ControlPlaneListenerNameProp)).map { name =>
    val listenerName = ListenerName.normalised(name)
    val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
      throw new ConfigException(s"Listener with ${listenerName.value} defined in " +
        s"${KafkaConfig.ControlPlaneListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
    // Explicit tuple construction: `Some(a, b)` relied on deprecated auto-tupling.
    (listenerName, securityProtocol)
  }
}

private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = {
try SecurityProtocol.forName(protocolName)
catch {
Expand Down Expand Up @@ -1419,6 +1465,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
s"Use a routable IP address.")

// validate control.plane.listener.name config
if (controlPlaneListenerName.isDefined) {
require(advertisedListenerNames.contains(controlPlaneListenerName.get),
s"${KafkaConfig.ControlPlaneListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
// controlPlaneListenerName should be different from interBrokerListenerName
require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()),
s"${KafkaConfig.ControlPlaneListenerNameProp}, when defined, should have a different value from the inter broker listener name. " +
s"Currently they both have the value ${controlPlaneListenerName.get}")
}

val recordVersion = logMessageFormatVersion.recordVersion
require(interBrokerProtocolVersion.recordVersion.value >= recordVersion.value,
s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
Expand All @@ -1443,7 +1500,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
if (connectionsMaxIdleMs >= 0)
require(failedAuthenticationDelayMs < connectionsMaxIdleMs,
s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" +
s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
" authentication responses from timing out")
s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
s" authentication responses from timing out")
}
}
10 changes: 6 additions & 4 deletions core/src/main/scala/kafka/server/KafkaRequestHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,21 +96,23 @@ class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
numThreads: Int) extends Logging with KafkaMetricsGroup {
numThreads: Int,
requestHandlerAvgIdleMetricName: String,
logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {

private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
/* a meter to track the average free capacity of the request handlers */
private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)

this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
createHandler(i)
}

def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
}

def resizeThreadPool(newSize: Int): Unit = synchronized {
Expand Down
38 changes: 27 additions & 11 deletions core/src/main/scala/kafka/server/KafkaServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP

val brokerState: BrokerState = new BrokerState

var apis: KafkaApis = null
var dataPlaneRequestProcessor: KafkaApis = null
var controlPlaneRequestProcessor: KafkaApis = null

var authorizer: Option[Authorizer] = None
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null

var logDirFailureChannel: LogDirFailureChannel = null
var logManager: LogManager = null
Expand Down Expand Up @@ -291,12 +294,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))

/* start processing requests */
apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)

requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
config.numIoThreads)
dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
config.numIoThreads, "RequestHandlerAvgIdlePercent", socketServer.DataPlanePrefix)

config.controlPlaneListener.foreach { _ =>
controlPlaneRequestProcessor = new KafkaApis(socketServer.controlPlaneRequestChannelOpt.get, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)

controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
1, "ControlPlaneRequestHandlerAvgIdlePercent", socketServer.ControlPlanePrefix)
}

Mx4jLoader.maybeLoad()

Expand All @@ -313,7 +325,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()

socketServer.startProcessors()
socketServer.startDataPlaneProcessors()
socketServer.startControlPlaneProcessor()
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
Expand Down Expand Up @@ -579,14 +592,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
// Socket server will be shutdown towards the end of the sequence.
if (socketServer != null)
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
if (requestHandlerPool != null)
CoreUtils.swallow(requestHandlerPool.shutdown(), this)

if (dataPlaneRequestHandlerPool != null)
CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
if (controlPlaneRequestHandlerPool != null)
CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this)
if (kafkaScheduler != null)
CoreUtils.swallow(kafkaScheduler.shutdown(), this)

if (apis != null)
CoreUtils.swallow(apis.close(), this)
if (dataPlaneRequestProcessor != null)
CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
if (controlPlaneRequestProcessor != null)
CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)
if (adminManager != null)
CoreUtils.swallow(adminManager.shutdown(), this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
client = AdminClient.create(createConfig())
val nodes = client.describeCluster.nodes.get()
val clusterId = client.describeCluster().clusterId().get()
assertEquals(servers.head.apis.clusterId, clusterId)
assertEquals(servers.head.dataPlaneRequestProcessor.clusterId, clusterId)
val controller = client.describeCluster().controller().get()
assertEquals(servers.head.apis.metadataCache.getControllerId.
assertEquals(servers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
val brokers = brokerList.split(",")
assertEquals(brokers.size, nodes.size)
Expand Down
Loading