diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b5c6a910b0af9..083e95258228f 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -107,14 +107,16 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
private def addNewBroker(broker: Broker) {
val messageQueue = new LinkedBlockingQueue[QueueItem]
debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
- val brokerNode = broker.node(config.interBrokerListenerName)
+ val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+ val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+ val brokerNode = broker.node(controllerToBrokerListenerName)
val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
val networkClient = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
- config.interBrokerSecurityProtocol,
+ controllerToBrokerSecurityProtocol,
JaasContext.Type.SERVER,
config,
- config.interBrokerListenerName,
+ controllerToBrokerListenerName,
config.saslMechanismInterBrokerProtocol,
time,
config.saslInterBrokerHandshakeRequestEnable
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 00b09688c5b2c..988c14f263e75 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -41,6 +41,7 @@ object RequestChannel extends Logging {
val RequestQueueSizeMetric = "RequestQueueSize"
val ResponseQueueSizeMetric = "ResponseQueueSize"
+ val ControlPlaneMetricPrefix = "ControlPlane"
val ProcessorMetricTag = "processor"
def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled
@@ -272,17 +273,19 @@ object RequestChannel extends Logging {
}
}
-class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
+class RequestChannel(val queueSize: Int, val metricNamePrefix : String = "") extends KafkaMetricsGroup {
import RequestChannel._
val metrics = new RequestChannel.Metrics
private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
private val processors = new ConcurrentHashMap[Int, Processor]()
+ val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
+ val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric)
- newGauge(RequestQueueSizeMetric, new Gauge[Int] {
+ newGauge(requestQueueSizeMetricName, new Gauge[Int] {
def value = requestQueue.size
})
- newGauge(ResponseQueueSizeMetric, new Gauge[Int]{
+ newGauge(responseQueueSizeMetricName, new Gauge[Int]{
def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
total + processor.responseQueueSize
}
@@ -292,7 +295,7 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
if (processors.putIfAbsent(processor.id, processor) != null)
warn(s"Unexpected processor with processorId ${processor.id}")
- newGauge(ResponseQueueSizeMetric,
+ newGauge(responseQueueSizeMetricName,
new Gauge[Int] {
def value = processor.responseQueueSize
},
@@ -302,7 +305,7 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
def removeProcessor(processorId: Int): Unit = {
processors.remove(processorId)
- removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> processorId.toString))
+ removeMetric(responseQueueSizeMetricName, Map(ProcessorMetricTag -> processorId.toString))
}
/** Send a request to be handled, potentially blocking until there is room in the queue for the request */
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index ae09a03b7dd8d..125efdc373d18 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -51,13 +51,28 @@ import scala.collection.mutable.{ArrayBuffer, Buffer}
import scala.util.control.ControlThrowable
/**
- * An NIO socket server. The threading model is
- * 1 Acceptor thread that handles new connections
- * Acceptor has N Processor threads that each have their own selector and read requests from sockets
- * M Handler threads that handle requests and produce responses back to the processor threads for writing.
+ * Handles new connections, requests and responses to and from broker.
+ * Kafka supports two types of request planes :
+ * - data-plane :
+ * - Handles requests from clients and other brokers in the cluster.
+ * - The threading model is
+ * 1 Acceptor thread per listener, that handles new connections.
+ * It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig.
+ * Acceptor has N Processor threads that each have their own selector and read requests from sockets
+ * M Handler threads that handle requests and produce responses back to the processor threads for writing.
+ * - control-plane :
+ * - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name".
+ * If not configured, the controller requests are handled by the data-plane.
+ * - The threading model is
+ * 1 Acceptor thread that handles new connections
+ * Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
+ * 1 Handler thread that handles requests and produce responses back to the processor thread for writing.
*/
class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
+ val DataPlanePrefix = "data-plane"
+ val ControlPlanePrefix = "control-plane"
+
private val maxQueuedRequests = config.queuedMaxRequests
private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
@@ -68,11 +83,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics")
memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
- val requestChannel = new RequestChannel(maxQueuedRequests)
- private val processors = new ConcurrentHashMap[Int, Processor]()
- private var nextProcessorId = 0
+ // data-plane
+ private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
+ private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
+ val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests)
+ // control-plane
+ private var controlPlaneProcessorOpt : Option[Processor] = None
+ private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
+ val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, RequestChannel.ControlPlaneMetricPrefix))
- private[network] val acceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
+ private var nextProcessorId = 0
private var connectionQuotas: ConnectionQuotas = _
private var stoppedProcessingRequests = false
@@ -91,9 +111,11 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
def startup(startupProcessors: Boolean = true) {
this.synchronized {
connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
- createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
+ createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
+ createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
if (startupProcessors) {
- startProcessors()
+ startControlPlaneProcessor()
+ startDataPlaneProcessors()
}
}
@@ -101,12 +123,25 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
new Gauge[Double] {
def value = SocketServer.this.synchronized {
- val ioWaitRatioMetricNames = processors.values.asScala.map { p =>
+ val ioWaitRatioMetricNames = dataPlaneProcessors.values.asScala.map { p =>
metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
}
ioWaitRatioMetricNames.map { metricName =>
Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
- }.sum / processors.size
+ }.sum / dataPlaneProcessors.size
+ }
+ }
+ )
+ newGauge("ControlPlaneNetworkProcessorAvgIdlePercent",
+ new Gauge[Double] {
+
+ def value = SocketServer.this.synchronized {
+ val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
+ metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
+ }
+ ioWaitRatioMetricName.map { metricName =>
+ Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
+ }.getOrElse(Double.NaN)
}
}
)
@@ -124,7 +159,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
new Gauge[Double] {
def value = SocketServer.this.synchronized {
- val expiredConnectionsKilledCountMetricNames = processors.values.asScala.map { p =>
+ val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.values.asScala.map { p =>
metrics.metricName("expired-connections-killed-count", "socket-server-metrics", p.metricTags)
}
expiredConnectionsKilledCountMetricNames.map { metricName =>
@@ -133,96 +168,148 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
}
}
)
- info(s"Started ${acceptors.size} acceptor threads")
+ newGauge("ControlPlaneExpiredConnectionsKilledCount",
+ new Gauge[Double] {
+
+ def value = SocketServer.this.synchronized {
+ val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map { p =>
+ metrics.metricName("expired-connections-killed-count", "socket-server-metrics", p.metricTags)
+ }
+ expiredConnectionsKilledCountMetricNames.map { metricName =>
+ Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
+ }.getOrElse(0.0)
+ }
+ }
+ )
+ info(s"Started ${dataPlaneAcceptors.size} acceptor threads for data-plane")
+ if (controlPlaneAcceptorOpt.isDefined)
+ info("Started control-plane acceptor thread")
}
/**
- * Starts processors of all the acceptors of this server if they have not already been started.
- * This method is used for delayed starting of processors if [[kafka.network.SocketServer#startup]]
+ * Starts processors of all the data-plane acceptors of this server if they have not already been started.
+ * This method is used for delayed starting of data-plane processors if [[kafka.network.SocketServer#startup]]
* was invoked with `startupProcessors=false`.
*/
- def startProcessors(): Unit = synchronized {
- acceptors.values.asScala.foreach { _.startProcessors() }
- info(s"Started processors for ${acceptors.size} acceptors")
+ def startDataPlaneProcessors(): Unit = synchronized {
+ dataPlaneAcceptors.values.asScala.foreach { _.startProcessors(DataPlanePrefix) }
+ info(s"Started data-plane processors for ${dataPlaneAcceptors.size} acceptors")
+ }
+
+ /**
+ * Start the processor of control-plane acceptor of this server if it has not already been started.
+ * This method is used for delayed starting of control-plane processor if [[kafka.network.SocketServer#startup]]
+ * was invoked with `startupProcessors=false`.
+ */
+ def startControlPlaneProcessor(): Unit = synchronized {
+ if (controlPlaneAcceptorOpt.isDefined) {
+ controlPlaneAcceptorOpt.get.startProcessors(ControlPlanePrefix)
+ info(s"Started control-plane processor for the control-plane acceptor")
+ }
}
private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
- private def createAcceptorAndProcessors(processorsPerListener: Int,
- endpoints: Seq[EndPoint]): Unit = synchronized {
+ private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
+ endpoints: Seq[EndPoint]): Unit = synchronized {
+ endpoints.foreach { endpoint =>
+ val dataPlaneAcceptor = createAcceptor(endpoint)
+ addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
+ KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
+ dataPlaneAcceptor.awaitStartup()
+ dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
+ info(s"Created data-plane acceptor and processors for endpoint : $endpoint")
+ }
+ }
+
+ private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = synchronized {
+ endpointOpt.foreach { endpoint =>
+ val controlPlaneAcceptor = createAcceptor(endpoint)
+ val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
+ controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
+ controlPlaneProcessorOpt = Some(controlPlaneProcessor)
+ val listenerProcessors = new ArrayBuffer[Processor]()
+ listenerProcessors += controlPlaneProcessor
+ controlPlaneRequestChannelOpt.foreach(_.addProcessor(controlPlaneProcessor))
+ nextProcessorId += 1
+ controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlanePrefix)
+ KafkaThread.nonDaemon(s"control-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", controlPlaneAcceptor).start()
+ controlPlaneAcceptor.awaitStartup()
+ info(s"Created control-plane acceptor and processor for endpoint : $endpoint")
+ }
+ }
+ private def createAcceptor(endPoint: EndPoint) : Acceptor = synchronized {
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
-
- endpoints.foreach { endpoint =>
- val listenerName = endpoint.listenerName
- val securityProtocol = endpoint.securityProtocol
-
- val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
- addProcessors(acceptor, endpoint, processorsPerListener)
- KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
- acceptor.awaitStartup()
- acceptors.put(endpoint, acceptor)
- }
+ new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
}
- private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
+ private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()
-
for (_ <- 0 until newProcessorsPerListener) {
- val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
+ val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool)
listenerProcessors += processor
- requestChannel.addProcessor(processor)
+ dataPlaneRequestChannel.addProcessor(processor)
nextProcessorId += 1
}
- listenerProcessors.foreach(p => processors.put(p.id, p))
- acceptor.addProcessors(listenerProcessors)
+ listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
+ acceptor.addProcessors(listenerProcessors, DataPlanePrefix)
}
/**
- * Stop processing requests and new connections.
- */
+ * Stop processing requests and new connections.
+ */
def stopProcessingRequests() = {
info("Stopping socket server request processors")
this.synchronized {
- acceptors.asScala.values.foreach(_.shutdown())
- processors.asScala.values.foreach(_.shutdown())
- requestChannel.clear()
+ dataPlaneAcceptors.asScala.values.foreach(_.shutdown())
+ controlPlaneAcceptorOpt.foreach(_.shutdown())
+ dataPlaneProcessors.asScala.values.foreach(_.shutdown())
+ controlPlaneProcessorOpt.foreach(_.shutdown())
+ dataPlaneRequestChannel.clear()
+ controlPlaneRequestChannelOpt.foreach(_.clear())
stoppedProcessingRequests = true
}
info("Stopped socket server request processors")
}
def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized {
- info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads")
+ info(s"Resizing network thread pool size for each data-plane listener from $oldNumNetworkThreads to $newNumNetworkThreads")
if (newNumNetworkThreads > oldNumNetworkThreads) {
- acceptors.asScala.foreach { case (endpoint, acceptor) =>
- addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads)
+ dataPlaneAcceptors.asScala.foreach { case (endpoint, acceptor) =>
+ addDataPlaneProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads)
}
} else if (newNumNetworkThreads < oldNumNetworkThreads)
- acceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel))
+ dataPlaneAcceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, dataPlaneRequestChannel))
}
/**
- * Shutdown the socket server. If still processing requests, shutdown
- * acceptors and processors first.
- */
+ * Shutdown the socket server. If still processing requests, shutdown
+ * acceptors and processors first.
+ */
def shutdown() = {
info("Shutting down socket server")
this.synchronized {
if (!stoppedProcessingRequests)
stopProcessingRequests()
- requestChannel.shutdown()
+ dataPlaneRequestChannel.shutdown()
+ controlPlaneRequestChannelOpt.foreach(_.shutdown())
}
info("Shutdown completed")
}
def boundPort(listenerName: ListenerName): Int = {
try {
- acceptors.get(endpoints(listenerName)).serverChannel.socket.getLocalPort
+ val acceptor = dataPlaneAcceptors.get(endpoints(listenerName))
+ if (acceptor != null) {
+ acceptor.serverChannel.socket.getLocalPort
+ } else {
+ controlPlaneAcceptorOpt.map (_.serverChannel.socket().getLocalPort).getOrElse(throw new KafkaException("Could not find listenerName : " + listenerName + " in data-plane or control-plane"))
+ }
} catch {
case e: Exception =>
throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
@@ -230,15 +317,15 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
}
def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
- info(s"Adding listeners for endpoints $listenersAdded")
- createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded)
- startProcessors()
+ info(s"Adding data-plane listeners for endpoints $listenersAdded")
+ createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, listenersAdded)
+ startDataPlaneProcessors()
}
def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
- info(s"Removing listeners for endpoints $listenersRemoved")
+ info(s"Removing data-plane listeners for endpoints $listenersRemoved")
listenersRemoved.foreach { endpoint =>
- acceptors.asScala.remove(endpoint).foreach(_.shutdown())
+ dataPlaneAcceptors.asScala.remove(endpoint).foreach(_.shutdown())
}
}
@@ -252,8 +339,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
}
- /* `protected` for test usage */
- protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+ // `protected` for test usage
+ protected[network] def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
new Processor(id,
time,
@@ -272,12 +359,12 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
)
}
- /* For test usage */
+ // For test usage
private[network] def connectionCount(address: InetAddress): Int =
Option(connectionQuotas).fold(0)(_.get(address))
- /* For test usage */
- private[network] def processor(index: Int): Processor = processors.get(index)
+ // For test usage
+ private[network] def dataPlaneProcessor(index: Int): Processor = dataPlaneProcessors.get(index)
}
@@ -357,21 +444,21 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
private val processors = new ArrayBuffer[Processor]()
private val processorsStarted = new AtomicBoolean
- private[network] def addProcessors(newProcessors: Buffer[Processor]): Unit = synchronized {
+ private[network] def addProcessors(newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized {
processors ++= newProcessors
if (processorsStarted.get)
- startProcessors(newProcessors)
+ startProcessors(newProcessors, processorThreadPrefix)
}
- private[network] def startProcessors(): Unit = synchronized {
+ private[network] def startProcessors(processorThreadPrefix: String): Unit = synchronized {
if (!processorsStarted.getAndSet(true)) {
- startProcessors(processors)
+ startProcessors(processors, processorThreadPrefix)
}
}
- private def startProcessors(processors: Seq[Processor]): Unit = synchronized {
+ private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
processors.foreach { processor =>
- KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
+ KafkaThread.nonDaemon(processorThreadPrefix + s"-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
processor).start()
}
}
@@ -444,9 +531,9 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
}
}
- /*
- * Create a server socket to listen for connections on.
- */
+ /**
+ * Create a server socket to listen for connections on.
+ */
private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
val socketAddress =
if (host == null || host.trim.isEmpty)
@@ -468,7 +555,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
serverChannel
}
- /*
+ /**
* Accept a new connection
*/
def accept(key: SelectionKey, processor: Processor) {
@@ -564,7 +651,7 @@ private[kafka] class Processor(val id: Int,
// also includes the listener name)
Map(NetworkProcessorMetricTag -> id.toString)
)
-
+
val expiredConnectionsKilledCount = new Total()
private val expiredConnectionsKilledCountMetricName = metrics.metricName("expired-connections-killed-count", "socket-server-metrics", metricTags)
metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount)
@@ -688,7 +775,7 @@ private[kafka] class Processor(val id: Int,
}
}
- /* `protected` for test usage */
+ // `protected` for test usage
protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
val connectionId = response.request.context.connectionId
trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 2c0f6c1b52c2f..1c5657263c8d9 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -640,7 +640,7 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable {
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
if (newConfig.numIoThreads != oldConfig.numIoThreads)
- server.requestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
+ server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 13f555a20594c..c3c3295835cfb 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -286,6 +286,7 @@ object KafkaConfig {
val AdvertisedPortProp = "advertised.port"
val AdvertisedListenersProp = "advertised.listeners"
val ListenerSecurityProtocolMapProp = "listener.security.protocol.map"
+ val ControlPlaneListenerNameProp = "control.plane.listener.name"
val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
val SocketRequestMaxBytesProp = "socket.request.max.bytes"
@@ -503,7 +504,7 @@ object KafkaConfig {
val NumIoThreadsDoc = "The number of threads that the server uses for processing requests, which may include disk I/O"
val NumReplicaAlterLogDirsThreadsDoc = "The number of threads that can move replicas between log directories, which may include disk I/O"
val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
- val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads"
+ val QueuedMaxRequestsDoc = "The number of queued requests allowed for data-plane, before blocking the network threads"
val QueuedMaxRequestBytesDoc = "The number of queued bytes allowed before no more requests are read"
val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
/************* Authorizer Configuration ***********/
@@ -546,6 +547,22 @@ object KafkaConfig {
"prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " +
"INTERNAL listener, a config with name listener.name.internal.ssl.keystore.location would be set. " +
"If the config for the listener name is not set, the config will fallback to the generic config (i.e. ssl.keystore.location). "
+ val controlPlaneListenerNameDoc = "Name of listener used for communication between controller and brokers. " +
+ s"Broker will use the $ControlPlaneListenerNameProp to locate the endpoint in $ListenersProp list, to listen for connections from the controller. " +
+ "For example, if a broker's config is :\n" +
+ "listeners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094\n" +
+ "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" +
+ "control.plane.listener.name = CONTROLLER\n" +
+ "On startup, the broker will start listening on \"192.1.1.8:9094\" with security protocol \"SSL\".\n" +
+ s"On controller side, when it discovers a broker's published endpoints through zookeeper, it will use the $ControlPlaneListenerNameProp " +
+ "to find the endpoint, which it will use to establish connection to the broker.\n" +
+ "For example, if the broker's published endpoints on zookeeper are :\n" +
+ "\"endpoints\" : [\"INTERNAL://broker1.example.com:9092\",\"EXTERNAL://broker1.example.com:9093\",\"CONTROLLER://broker1.example.com:9094\"]\n" +
+ " and the controller's config is :\n" +
+ "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" +
+ "control.plane.listener.name = CONTROLLER\n" +
+ "then controller will use \"broker1.example.com:9094\" with security protocol \"SSL\" to connect to the broker.\n" +
+ "If not explicitly configured, the default value will be null and there will be no dedicated endpoints for controller connections."
val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
@@ -846,6 +863,7 @@ object KafkaConfig {
.define(AdvertisedPortProp, INT, null, HIGH, AdvertisedPortDoc)
.define(AdvertisedListenersProp, STRING, null, HIGH, AdvertisedListenersDoc)
.define(ListenerSecurityProtocolMapProp, STRING, Defaults.ListenerSecurityProtocolMap, LOW, ListenerSecurityProtocolMapDoc)
+ .define(ControlPlaneListenerNameProp, STRING, null, HIGH, controlPlaneListenerNameDoc)
.define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc)
.define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc)
.define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc)
@@ -1265,6 +1283,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1
def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
+ def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, securityProtocol) => listenerName }
+ def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, securityProtocol) => securityProtocol }
def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1
@@ -1337,6 +1357,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
}.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))
}
+ def controlPlaneListener: Option[EndPoint] = {
+ controlPlaneListenerName.map { listenerName =>
+ listeners.filter(endpoint => endpoint.listenerName.value() == listenerName.value()).head
+ }
+ }
+
+ def dataPlaneListeners: Seq[EndPoint] = {
+ Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
+ case Some(controlPlaneListenerName) => listeners.filterNot(_.listenerName.value() == controlPlaneListenerName)
+ case None => listeners
+ }
+ }
+
// If the user defined advertised listeners, we use those
// If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults
// If none of these are defined, we'll use the listeners
@@ -1368,6 +1401,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
}
}
+ private def getControlPlaneListenerNameAndSecurityProtocol: Option[(ListenerName, SecurityProtocol)] = {
+ Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
+ case Some(name) =>
+ val listenerName = ListenerName.normalised(name)
+ val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
+ throw new ConfigException(s"Listener with ${listenerName.value} defined in " +
+ s"${KafkaConfig.ControlPlaneListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
+ Some(listenerName, securityProtocol)
+
+ case None => None
+ }
+ }
+
private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = {
try SecurityProtocol.forName(protocolName)
catch {
@@ -1419,6 +1465,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
s"Use a routable IP address.")
+ // validate controller.listener.name config
+ if (controlPlaneListenerName.isDefined) {
+ require(advertisedListenerNames.contains(controlPlaneListenerName.get),
+ s"${KafkaConfig.ControlPlaneListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
+ s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
+ // controlPlaneListenerName should be different from interBrokerListenerName
+ require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()),
+ s"${KafkaConfig.ControlPlaneListenerNameProp}, when defined, should have a different value from the inter broker listener name. " +
+ s"Currently they both have the value ${controlPlaneListenerName.get}")
+ }
+
val recordVersion = logMessageFormatVersion.recordVersion
require(interBrokerProtocolVersion.recordVersion.value >= recordVersion.value,
s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
@@ -1443,7 +1500,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
if (connectionsMaxIdleMs >= 0)
require(failedAuthenticationDelayMs < connectionsMaxIdleMs,
s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" +
- s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
- " authentication responses from timing out")
+ s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
+ s" authentication responses from timing out")
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index d0d4121966311..e0ad1b60cacdf 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -96,13 +96,15 @@ class KafkaRequestHandlerPool(val brokerId: Int,
val requestChannel: RequestChannel,
val apis: KafkaApis,
time: Time,
- numThreads: Int) extends Logging with KafkaMetricsGroup {
+ numThreads: Int,
+ requestHandlerAvgIdleMetricName: String,
+ logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
/* a meter to track the average free capacity of the request handlers */
- private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
+ private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)
- this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
+ this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
for (i <- 0 until numThreads) {
createHandler(i)
@@ -110,7 +112,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
def createHandler(id: Int): Unit = synchronized {
runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
- KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
+ KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
}
def resizeThreadPool(newSize: Int): Unit = synchronized {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 36bc0f1306414..26e1447fb0fb6 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -113,10 +113,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
val brokerState: BrokerState = new BrokerState
- var apis: KafkaApis = null
+ var dataPlaneRequestProcessor: KafkaApis = null
+ var controlPlaneRequestProcessor: KafkaApis = null
+
var authorizer: Option[Authorizer] = None
var socketServer: SocketServer = null
- var requestHandlerPool: KafkaRequestHandlerPool = null
+ var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
+ var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
var logDirFailureChannel: LogDirFailureChannel = null
var logManager: LogManager = null
@@ -291,12 +294,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
/* start processing requests */
- apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+ dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
fetchManager, brokerTopicStats, clusterId, time, tokenManager)
- requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
- config.numIoThreads)
+ dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
+ config.numIoThreads, "RequestHandlerAvgIdlePercent", socketServer.DataPlanePrefix)
+
+ config.controlPlaneListener.foreach { _ =>
+ controlPlaneRequestProcessor = new KafkaApis(socketServer.controlPlaneRequestChannelOpt.get, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+ kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+ fetchManager, brokerTopicStats, clusterId, time, tokenManager)
+
+ controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
+ 1, "ControlPlaneRequestHandlerAvgIdlePercent", socketServer.ControlPlanePrefix)
+ }
Mx4jLoader.maybeLoad()
@@ -313,7 +325,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
- socketServer.startProcessors()
+ socketServer.startDataPlaneProcessors()
+ socketServer.startControlPlaneProcessor()
brokerState.newState(RunningAsBroker)
shutdownLatch = new CountDownLatch(1)
startupComplete.set(true)
@@ -579,14 +592,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
// Socket server will be shutdown towards the end of the sequence.
if (socketServer != null)
CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
- if (requestHandlerPool != null)
- CoreUtils.swallow(requestHandlerPool.shutdown(), this)
-
+ if (dataPlaneRequestHandlerPool != null)
+ CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
+ if (controlPlaneRequestHandlerPool != null)
+ CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this)
if (kafkaScheduler != null)
CoreUtils.swallow(kafkaScheduler.shutdown(), this)
- if (apis != null)
- CoreUtils.swallow(apis.close(), this)
+ if (dataPlaneRequestProcessor != null)
+ CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
+ if (controlPlaneRequestProcessor != null)
+ CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
CoreUtils.swallow(authorizer.foreach(_.close()), this)
if (adminManager != null)
CoreUtils.swallow(adminManager.shutdown(), this)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index ee69ae42a0f53..92d1758b19460 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -244,9 +244,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
client = AdminClient.create(createConfig())
val nodes = client.describeCluster.nodes.get()
val clusterId = client.describeCluster().clusterId().get()
- assertEquals(servers.head.apis.clusterId, clusterId)
+ assertEquals(servers.head.dataPlaneRequestProcessor.clusterId, clusterId)
val controller = client.describeCluster().controller().get()
- assertEquals(servers.head.apis.metadataCache.getControllerId.
+ assertEquals(servers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
val brokers = brokerList.split(",")
assertEquals(brokers.size, nodes.size)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1b51101e76f4f..4c0459e72e0aa 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1452,9 +1452,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
def removeAllAcls() = {
- servers.head.apis.authorizer.get.getAcls().keys.foreach { resource =>
- servers.head.apis.authorizer.get.removeAcls(resource)
- TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.apis.authorizer.get, resource)
+ servers.head.dataPlaneRequestProcessor.authorizer.get.getAcls().keys.foreach { resource =>
+ servers.head.dataPlaneRequestProcessor.authorizer.get.removeAcls(resource)
+ TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.dataPlaneRequestProcessor.authorizer.get, resource)
}
}
@@ -1507,8 +1507,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
private def addAndVerifyAcls(acls: Set[Acl], resource: Resource) = {
- servers.head.apis.authorizer.get.addAcls(acls, resource)
- TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) ++ acls, servers.head.apis.authorizer.get, resource)
+ servers.head.dataPlaneRequestProcessor.authorizer.get.addAcls(acls, resource)
+ TestUtils.waitAndVerifyAcls(servers.head.dataPlaneRequestProcessor.authorizer.get.getAcls(resource) ++ acls, servers.head.dataPlaneRequestProcessor.authorizer.get, resource)
}
private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 9b02d809b5500..b28a40f890e00 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -285,7 +285,7 @@ abstract class QuotaTestClients(topic: String,
def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double, server: KafkaServer = leaderNode) {
TestUtils.retry(10000) {
- val quotaManagers = server.apis.quotas
+ val quotaManagers = server.dataPlaneRequestProcessor.quotas
val overrideProducerQuota = quota(quotaManagers.produce, userPrincipal, producerClientId)
val overrideConsumerQuota = quota(quotaManagers.fetch, userPrincipal, consumerClientId)
val overrideProducerRequestQuota = quota(quotaManagers.request, userPrincipal, producerClientId)
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 854e3381342b9..0587a6de16021 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -182,8 +182,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
override def setUp() {
super.setUp()
servers.foreach { s =>
- TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource)
- TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, Resource(Topic, "*", LITERAL))
+ TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.dataPlaneRequestProcessor.authorizer.get, Resource.ClusterResource)
+ TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, Resource(Topic, "*", LITERAL))
}
// create the test topic with all the brokers as replicas
createTopic(topic, 1, 3)
@@ -221,7 +221,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
assertTrue("failed re-authentications not 0", TestUtils.totalMetricValue(s, "failed-reauthentication-total") == 0)
}
}
-
+
private def getGauge(metricName: String) = {
Metrics.defaultRegistry.allMetrics.asScala
.filterKeys(k => k.getName == metricName)
@@ -275,16 +275,16 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
private def setWildcardResourceAcls() {
AclCommand.main(produceConsumeWildcardAclArgs)
servers.foreach { s =>
- TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource)
- TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource)
+ TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource)
+ TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardGroupResource)
}
}
private def setPrefixedResourceAcls() {
AclCommand.main(produceConsumePrefixedAclsArgs)
servers.foreach { s =>
- TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, prefixedTopicResource)
- TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, prefixedGroupResource)
+ TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedTopicResource)
+ TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedGroupResource)
}
}
@@ -292,9 +292,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
AclCommand.main(produceAclArgs(tp.topic))
AclCommand.main(consumeAclArgs(tp.topic))
servers.foreach { s =>
- TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get,
+ TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get,
new Resource(Topic, tp.topic, PatternType.LITERAL))
- TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+ TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
}
val producer = createProducer()
sendRecords(producer, numRecords, tp)
@@ -315,7 +315,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
def testNoProduceWithDescribeAcl(): Unit = {
AclCommand.main(describeAclArgs)
servers.foreach { s =>
- TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+ TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
}
try{
val producer = createProducer()
@@ -327,7 +327,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}
confirmReauthenticationMetrics
}
-
+
/**
* Tests that a consumer fails to consume messages without the appropriate
* ACL set.
@@ -341,7 +341,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
consumeRecords(consumer)
confirmReauthenticationMetrics
}
-
+
@Test(expected = classOf[TopicAuthorizationException])
def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
noConsumeWithoutDescribeAclSetup()
@@ -350,13 +350,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
// this should timeout since the consumer will not be able to fetch any metadata for the topic
consumeRecords(consumer, timeout = 3000)
}
-
+
private def noConsumeWithoutDescribeAclSetup(): Unit = {
AclCommand.main(produceAclArgs(tp.topic))
AclCommand.main(groupAclArgs)
servers.foreach { s =>
- TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
- TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+ TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
+ TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
}
val producer = createProducer()
@@ -365,10 +365,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
AclCommand.main(deleteDescribeAclArgs)
AclCommand.main(deleteWriteAclArgs)
servers.foreach { s =>
- TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+ TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
}
}
-
+
@Test
def testNoConsumeWithDescribeAclViaAssign(): Unit = {
noConsumeWithDescribeAclSetup()
@@ -384,7 +384,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}
confirmReauthenticationMetrics
}
-
+
@Test
def testNoConsumeWithDescribeAclViaSubscribe(): Unit = {
noConsumeWithDescribeAclSetup()
@@ -400,13 +400,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
}
confirmReauthenticationMetrics
}
-
+
private def noConsumeWithDescribeAclSetup(): Unit = {
AclCommand.main(produceAclArgs(tp.topic))
AclCommand.main(groupAclArgs)
servers.foreach { s =>
- TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
- TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+ TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
+ TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
}
val producer = createProducer()
sendRecords(producer, numRecords, tp)
@@ -420,7 +420,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
def testNoGroupAcl(): Unit = {
AclCommand.main(produceAclArgs(tp.topic))
servers.foreach { s =>
- TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
+ TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
}
val producer = createProducer()
sendRecords(producer, numRecords, tp)
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 55e1529c96316..cb2186c42271b 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -71,7 +71,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
private def addClusterAcl(permissionType: PermissionType, operation: Operation): Unit = {
val acls = Set(clusterAcl(permissionType, operation))
- val authorizer = servers.head.apis.authorizer.get
+ val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
val prevAcls = authorizer.getAcls(AuthResource.ClusterResource)
authorizer.addAcls(acls, AuthResource.ClusterResource)
TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, AuthResource.ClusterResource)
@@ -79,7 +79,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
private def removeClusterAcl(permissionType: PermissionType, operation: Operation): Unit = {
val acls = Set(clusterAcl(permissionType, operation))
- val authorizer = servers.head.apis.authorizer.get
+ val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
val prevAcls = authorizer.getAcls(AuthResource.ClusterResource)
Assert.assertTrue(authorizer.removeAcls(acls, AuthResource.ClusterResource))
TestUtils.waitAndVerifyAcls(prevAcls -- acls, authorizer, AuthResource.ClusterResource)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 853f99944ff13..c13b0a384c861 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -501,8 +501,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
@Test
def testThreadPoolResize(): Unit = {
- val requestHandlerPrefix = "kafka-request-handler-"
- val networkThreadPrefix = "kafka-network-thread-"
+ val requestHandlerPrefix = "data-plane-kafka-request-handler-"
+ val networkThreadPrefix = "data-plane-kafka-network-thread-"
val fetcherThreadPrefix = "ReplicaFetcherThread-"
// Executor threads and recovery threads are not verified since threads may not be running
// For others, thread count should be configuredCount * threadMultiplier * numBrokers
@@ -577,7 +577,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
"", mayReceiveDuplicates = false)
verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
networkThreadPrefix, mayReceiveDuplicates = true)
- verifyThreads("kafka-socket-acceptor-", config.listeners.size)
+ verifyThreads("data-plane-kafka-socket-acceptor-", config.listeners.size)
verifyProcessorMetrics()
verifyMarkPartitionsForTruncation()
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 2306a921898ca..a64f6e7ab9217 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -175,8 +175,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
// Test that the existing clientId overrides are read
val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
servers = Seq(server)
- assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId))
- assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
+ assertEquals(new Quota(1000, true), server.dataPlaneRequestProcessor.quotas.produce.quota("ANONYMOUS", clientId))
+ assertEquals(new Quota(2000, true), server.dataPlaneRequestProcessor.quotas.fetch.quota("ANONYMOUS", clientId))
}
@Test
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 23f62252fcb07..08747a85a8e57 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -32,8 +32,10 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{ControllerMovedException, StaleBrokerEpochException}
import org.apache.log4j.Level
import kafka.utils.LogCaptureAppender
+import org.apache.kafka.common.metrics.KafkaMetric
import scala.collection.JavaConverters._
+import scala.collection.mutable
import scala.util.Try
class ControllerIntegrationTest extends ZooKeeperTestHarness {
@@ -83,6 +85,31 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
waitUntilControllerEpoch(firstControllerEpoch + 1, "controller epoch was not incremented after controller move")
}
+ @Test
+ def testMetadataPropagationOnControlPlane(): Unit = {
+ servers = makeServers(1, listeners = Some("PLAINTEXT://localhost:0,CONTROLLER://localhost:5000"), listenerSecurityProtocolMap = Some("PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"),
+ controlPlaneListenerName = Some("CONTROLLER"))
+ TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+ val controlPlaneMetricMap = mutable.Map[String, KafkaMetric]()
+ val dataPlaneMetricMap = mutable.Map[String, KafkaMetric]()
+ servers.head.metrics.metrics().values().asScala.foreach { kafkaMetric =>
+ if (kafkaMetric.metricName().tags().values().contains("CONTROLLER")) {
+ controlPlaneMetricMap.put(kafkaMetric.metricName().name(), kafkaMetric)
+ }
+ if (kafkaMetric.metricName().tags().values().contains("PLAINTEXT")) {
+ dataPlaneMetricMap.put(kafkaMetric.metricName().name(), kafkaMetric)
+ }
+ }
+ assertEquals(1e-0, controlPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double], 0)
+ assertEquals(0e-0, dataPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double], 0)
+ assertEquals(1e-0, controlPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double], 0)
+ assertEquals(0e-0, dataPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double], 0)
+ assertTrue(controlPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double] > 1.0)
+ assertTrue(dataPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double] == 0.0)
+ assertTrue(controlPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double] == 2.0)
+ assertTrue(dataPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double] == 0.0)
+ }
+
// This test case is used to ensure that there will be no correctness issue after we avoid sending out full
// UpdateMetadataRequest to all brokers in the cluster
@Test
@@ -376,10 +403,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
var activeServers = servers.filter(s => s.config.brokerId != 2)
// wait for the update metadata request to trickle to the brokers
TestUtils.waitUntilTrue(() =>
- activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3),
+ activeServers.forall(_.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3),
"Topic test not created after timeout")
assertEquals(0, partitionsRemaining.size)
- var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+ var partitionStateInfo = activeServers.head.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get
var leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
assertEquals(0, leaderAfterShutdown)
assertEquals(2, partitionStateInfo.basePartitionState.isr.size)
@@ -388,16 +415,16 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
partitionsRemaining = resultQueue.take().get
assertEquals(0, partitionsRemaining.size)
activeServers = servers.filter(s => s.config.brokerId == 0)
- partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+ partitionStateInfo = activeServers.head.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get
leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
assertEquals(0, leaderAfterShutdown)
- assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
+ assertTrue(servers.forall(_.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
controller.controlledShutdown(0, servers.find(_.config.brokerId == 0).get.kafkaController.brokerEpoch, controlledShutdownCallback)
partitionsRemaining = resultQueue.take().get
assertEquals(1, partitionsRemaining.size)
// leader doesn't change since all the replicas are shut down
- assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
+ assertTrue(servers.forall(_.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
}
@Test
@@ -585,12 +612,18 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
private def makeServers(numConfigs: Int,
autoLeaderRebalanceEnable: Boolean = false,
uncleanLeaderElectionEnable: Boolean = false,
- enableControlledShutdown: Boolean = true) = {
+ enableControlledShutdown: Boolean = true,
+ listeners : Option[String] = None,
+ listenerSecurityProtocolMap : Option[String] = None,
+ controlPlaneListenerName : Option[String] = None) = {
val configs = TestUtils.createBrokerConfigs(numConfigs, zkConnect, enableControlledShutdown = enableControlledShutdown)
configs.foreach { config =>
config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, autoLeaderRebalanceEnable.toString)
config.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
config.setProperty(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp, "1")
+ listeners.foreach(listener => config.setProperty(KafkaConfig.ListenersProp, listener))
+ listenerSecurityProtocolMap.foreach(listenerMap => config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap))
+ controlPlaneListenerName.foreach(controlPlaneListener => config.setProperty(KafkaConfig.ControlPlaneListenerNameProp, controlPlaneListener))
}
configs.map(config => TestUtils.createServer(KafkaConfig.fromProps(config)))
}
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index b5983377b7730..2609d9e0b7905 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -21,7 +21,7 @@ import java.io._
import java.net._
import java.nio.ByteBuffer
import java.nio.channels.SocketChannel
-import java.util.{HashMap, Random}
+import java.util.{HashMap, Properties, Random}
import com.yammer.metrics.core.{Gauge, Meter}
import com.yammer.metrics.{Metrics => YammerMetrics}
@@ -134,8 +134,8 @@ class SocketServerTest extends JUnitSuite {
channel.sendResponse(new RequestChannel.SendResponse(request, send, Some(request.header.toString), None))
}
- def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = {
- val socket = new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)), localAddr, 0)
+ def connect(s: SocketServer = server, listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), localAddr: InetAddress = null, port: Int = 0) = {
+ val socket = new Socket("localhost", s.boundPort(listenerName), localAddr, port)
sockets += socket
socket
}
@@ -144,13 +144,13 @@ class SocketServerTest extends JUnitSuite {
def connectAndProcessRequest(s: SocketServer): (Socket, String) = {
val socket = connect(s)
val request = sendAndReceiveRequest(socket, s)
- processRequest(s.requestChannel, request)
+ processRequest(s.dataPlaneRequestChannel, request)
(socket, request.context.connectionId)
}
def sendAndReceiveRequest(socket: Socket, server: SocketServer): RequestChannel.Request = {
sendRequest(socket, producerRequestBytes())
- receiveRequest(server.requestChannel)
+ receiveRequest(server.dataPlaneRequestChannel)
}
def shutdownServerAndMetrics(server: SocketServer): Unit = {
@@ -176,15 +176,29 @@ class SocketServerTest extends JUnitSuite {
@Test
def simpleRequest() {
- val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+ val plainSocket = connect()
val serializedBytes = producerRequestBytes()
// Test PLAINTEXT socket
sendRequest(plainSocket, serializedBytes)
- processRequest(server.requestChannel)
+ processRequest(server.dataPlaneRequestChannel)
assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq)
}
+ @Test
+ def testControlPlaneRequest(): Unit = {
+ val testProps = new Properties
+ testProps.putAll(props)
+ testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+ testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
+ testProps.put("control.plane.listener.name", "CONTROLLER")
+ val config = KafkaConfig.fromProps(testProps)
+ withTestableServer(config, { testableServer =>
+ val socket = connect(testableServer, config.controlPlaneListenerName.get, localAddr = InetAddress.getLocalHost, port = 5000)
+ sendAndReceiveControllerRequest(socket, testableServer)
+ })
+ }
+
@Test
def tooBigRequestIsRejected() {
val tooManyBytes = new Array[Byte](server.config.socketRequestMaxBytes + 1)
@@ -205,41 +219,41 @@ class SocketServerTest extends JUnitSuite {
@Test
def testGracefulClose() {
- val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+ val plainSocket = connect()
val serializedBytes = producerRequestBytes()
for (_ <- 0 until 10)
sendRequest(plainSocket, serializedBytes)
plainSocket.close()
for (_ <- 0 until 10) {
- val request = receiveRequest(server.requestChannel)
+ val request = receiveRequest(server.dataPlaneRequestChannel)
assertNotNull("receiveRequest timed out", request)
- server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
+ server.dataPlaneRequestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
}
}
@Test
def testNoOpAction(): Unit = {
- val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+ val plainSocket = connect()
val serializedBytes = producerRequestBytes()
for (_ <- 0 until 3)
sendRequest(plainSocket, serializedBytes)
for (_ <- 0 until 3) {
- val request = receiveRequest(server.requestChannel)
+ val request = receiveRequest(server.dataPlaneRequestChannel)
assertNotNull("receiveRequest timed out", request)
- server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
+ server.dataPlaneRequestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
}
}
@Test
def testConnectionId() {
- val sockets = (1 to 5).map(_ => connect(protocol = SecurityProtocol.PLAINTEXT))
+ val sockets = (1 to 5).map(_ => connect())
val serializedBytes = producerRequestBytes()
val requests = sockets.map{socket =>
sendRequest(socket, serializedBytes)
- receiveRequest(server.requestChannel)
+ receiveRequest(server.dataPlaneRequestChannel)
}
requests.zipWithIndex.foreach { case (request, i) =>
val index = request.context.connectionId.split("-").last
@@ -258,36 +272,36 @@ class SocketServerTest extends JUnitSuite {
val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider)
def openChannel(request: RequestChannel.Request): Option[KafkaChannel] =
- overrideServer.processor(request.processor).channel(request.context.connectionId)
+ overrideServer.dataPlaneProcessor(request.processor).channel(request.context.connectionId)
def openOrClosingChannel(request: RequestChannel.Request): Option[KafkaChannel] =
- overrideServer.processor(request.processor).openOrClosingChannel(request.context.connectionId)
+ overrideServer.dataPlaneProcessor(request.processor).openOrClosingChannel(request.context.connectionId)
try {
overrideServer.startup()
val serializedBytes = producerRequestBytes()
// Connection with no staged receives
- val socket1 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT)
+ val socket1 = connect(overrideServer)
sendRequest(socket1, serializedBytes)
- val request1 = receiveRequest(overrideServer.requestChannel)
+ val request1 = receiveRequest(overrideServer.dataPlaneRequestChannel)
assertTrue("Channel not open", openChannel(request1).nonEmpty)
assertEquals(openChannel(request1), openOrClosingChannel(request1))
time.sleep(idleTimeMs + 1)
TestUtils.waitUntilTrue(() => openOrClosingChannel(request1).isEmpty, "Failed to close idle channel")
assertTrue("Channel not removed", openChannel(request1).isEmpty)
- processRequest(overrideServer.requestChannel, request1)
+ processRequest(overrideServer.dataPlaneRequestChannel, request1)
// Connection with staged receives
- val socket2 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT)
+ val socket2 = connect(overrideServer)
val request2 = sendRequestsUntilStagedReceive(overrideServer, socket2, serializedBytes)
time.sleep(idleTimeMs + 1)
TestUtils.waitUntilTrue(() => openChannel(request2).isEmpty, "Failed to close idle channel")
TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).nonEmpty, "Channel removed without processing staged receives")
- processRequest(overrideServer.requestChannel, request2) // this triggers a failed send since channel has been closed
+ processRequest(overrideServer.dataPlaneRequestChannel, request2) // this triggers a failed send since channel has been closed
TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).isEmpty, "Failed to remove channel with failed sends")
- assertNull("Received request after failed send", overrideServer.requestChannel.receiveRequest(200))
+ assertNull("Received request after failed send", overrideServer.dataPlaneRequestChannel.receiveRequest(200))
} finally {
shutdownServerAndMetrics(overrideServer)
@@ -304,9 +318,9 @@ class SocketServerTest extends JUnitSuite {
@volatile var selector: TestableSelector = null
val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0"
val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider) {
- override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+ override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
- new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+ new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
credentialProvider, memoryPool, new LogContext()) {
override protected[network] def connectionId(socket: Socket): String = overrideConnectionId
@@ -319,8 +333,8 @@ class SocketServerTest extends JUnitSuite {
}
}
- def openChannel: Option[KafkaChannel] = overrideServer.processor(0).channel(overrideConnectionId)
- def openOrClosingChannel: Option[KafkaChannel] = overrideServer.processor(0).openOrClosingChannel(overrideConnectionId)
+ def openChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).channel(overrideConnectionId)
+ def openOrClosingChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).openOrClosingChannel(overrideConnectionId)
def connectionCount = overrideServer.connectionCount(InetAddress.getByName("127.0.0.1"))
// Create a client connection and wait for server to register the connection with the selector. For
@@ -361,7 +375,7 @@ class SocketServerTest extends JUnitSuite {
assertSame(channel1, openOrClosingChannel.getOrElse(throw new RuntimeException("Channel not found")))
// Complete request with failed send so that `channel1` is removed from Selector.closingChannels
- processRequest(overrideServer.requestChannel, request)
+ processRequest(overrideServer.dataPlaneRequestChannel, request)
TestUtils.waitUntilTrue(() => connectionCount == 0 && openOrClosingChannel.isEmpty, "Failed to remove channel with failed send")
// Check that new connections can be created with the same id since `channel1` is no longer in Selector
@@ -380,14 +394,14 @@ class SocketServerTest extends JUnitSuite {
def sendTwoRequestsReceiveOne(): RequestChannel.Request = {
sendRequest(socket, requestBytes, flush = false)
sendRequest(socket, requestBytes, flush = true)
- receiveRequest(server.requestChannel)
+ receiveRequest(server.dataPlaneRequestChannel)
}
val (request, hasStagedReceives) = TestUtils.computeUntilTrue(sendTwoRequestsReceiveOne()) { req =>
val connectionId = req.context.connectionId
- val hasStagedReceives = server.processor(0).numStagedReceives(connectionId) > 0
+ val hasStagedReceives = server.dataPlaneProcessor(0).numStagedReceives(connectionId) > 0
if (!hasStagedReceives) {
- processRequest(server.requestChannel, req)
- processRequest(server.requestChannel)
+ processRequest(server.dataPlaneRequestChannel, req)
+ processRequest(server.dataPlaneRequestChannel)
}
hasStagedReceives
}
@@ -403,11 +417,11 @@ class SocketServerTest extends JUnitSuite {
// Mimic a primitive request handler that fetches the request from RequestChannel and place a response with a
// throttled channel.
- val request = receiveRequest(server.requestChannel)
+ val request = receiveRequest(server.dataPlaneRequestChannel)
val byteBuffer = request.body[AbstractRequest].serialize(request.header)
val send = new NetworkSend(request.context.connectionId, byteBuffer)
def channelThrottlingCallback(response: RequestChannel.Response): Unit = {
- server.requestChannel.sendResponse(response)
+ server.dataPlaneRequestChannel.sendResponse(response)
}
val throttledChannel = new ThrottledChannel(request, new MockTime(), 100, channelThrottlingCallback)
val response =
@@ -415,7 +429,7 @@ class SocketServerTest extends JUnitSuite {
new RequestChannel.SendResponse(request, send, Some(request.header.toString), None)
else
new RequestChannel.NoOpResponse(request)
- server.requestChannel.sendResponse(response)
+ server.dataPlaneRequestChannel.sendResponse(response)
// Quota manager would call notifyThrottlingDone() on throttling completion. Simulate it if throttleingInProgress is
// false.
@@ -426,11 +440,11 @@ class SocketServerTest extends JUnitSuite {
}
def openOrClosingChannel(request: RequestChannel.Request): Option[KafkaChannel] =
- server.processor(0).openOrClosingChannel(request.context.connectionId)
+ server.dataPlaneProcessor(0).openOrClosingChannel(request.context.connectionId)
@Test
def testSendActionResponseWithThrottledChannelWhereThrottlingInProgress() {
- val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+ val socket = connect()
val serializedBytes = producerRequestBytes()
// SendAction with throttling in progress
val request = throttledChannelTestSetUp(socket, serializedBytes, false, true)
@@ -444,7 +458,7 @@ class SocketServerTest extends JUnitSuite {
@Test
def testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone() {
- val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+ val socket = connect()
val serializedBytes = producerRequestBytes()
// SendAction with throttling in progress
val request = throttledChannelTestSetUp(socket, serializedBytes, false, false)
@@ -459,7 +473,7 @@ class SocketServerTest extends JUnitSuite {
@Test
def testNoOpActionResponseWithThrottledChannelWhereThrottlingInProgress() {
- val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+ val socket = connect()
val serializedBytes = producerRequestBytes()
// SendAction with throttling in progress
val request = throttledChannelTestSetUp(socket, serializedBytes, true, true)
@@ -471,7 +485,7 @@ class SocketServerTest extends JUnitSuite {
@Test
def testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone() {
- val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+ val socket = connect()
val serializedBytes = producerRequestBytes()
// SendAction with throttling in progress
val request = throttledChannelTestSetUp(socket, serializedBytes, true, false)
@@ -485,16 +499,16 @@ class SocketServerTest extends JUnitSuite {
@Test
def testSocketsCloseOnShutdown() {
// open a connection
- val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+ val plainSocket = connect()
plainSocket.setTcpNoDelay(true)
val bytes = new Array[Byte](40)
// send a request first to make sure the connection has been picked up by the socket server
sendRequest(plainSocket, bytes, Some(0))
- processRequest(server.requestChannel)
+ processRequest(server.dataPlaneRequestChannel)
// the following sleep is necessary to reliably detect the connection close when we send data below
Thread.sleep(200L)
- // make sure the sockets are open
- server.acceptors.asScala.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
+ // make sure the sockets ar e open
+ server.dataPlaneAcceptors.asScala.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
// then shutdown the server
shutdownServerAndMetrics(server)
@@ -527,7 +541,7 @@ class SocketServerTest extends JUnitSuite {
val conn2 = connect()
val serializedBytes = producerRequestBytes()
sendRequest(conn2, serializedBytes)
- val request = server.requestChannel.receiveRequest(2000)
+ val request = server.dataPlaneRequestChannel.receiveRequest(2000)
assertNotNull(request)
}
@@ -555,7 +569,7 @@ class SocketServerTest extends JUnitSuite {
val conn2 = connect(server)
val serializedBytes = producerRequestBytes()
sendRequest(conn2, serializedBytes)
- val request = server.requestChannel.receiveRequest(2000)
+ val request = server.dataPlaneRequestChannel.receiveRequest(2000)
assertNotNull(request)
// now try to connect from the external facing interface, which should fail
@@ -583,7 +597,7 @@ class SocketServerTest extends JUnitSuite {
// it should succeed
val serializedBytes = producerRequestBytes()
sendRequest(conns.last, serializedBytes)
- val request = overrideServer.requestChannel.receiveRequest(2000)
+ val request = overrideServer.dataPlaneRequestChannel.receiveRequest(2000)
assertNotNull(request)
// now try one more (should fail)
@@ -627,7 +641,7 @@ class SocketServerTest extends JUnitSuite {
byteBuffer.get(serializedBytes)
sendRequest(sslSocket, serializedBytes)
- processRequest(overrideServer.requestChannel)
+ processRequest(overrideServer.dataPlaneRequestChannel)
assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
sslSocket.close()
} finally {
@@ -640,7 +654,7 @@ class SocketServerTest extends JUnitSuite {
val socket = connect()
val bytes = new Array[Byte](40)
sendRequest(socket, bytes, Some(0))
- assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.requestChannel).session.principal)
+ assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.dataPlaneRequestChannel).session.principal)
}
/* Test that we update request metrics if the client closes the connection while the broker response is in flight. */
@@ -650,9 +664,9 @@ class SocketServerTest extends JUnitSuite {
val serverMetrics = new Metrics
var conn: Socket = null
val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
- override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+ override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
- new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+ new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
credentialProvider, MemoryPool.NONE, new LogContext()) {
override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
@@ -668,7 +682,7 @@ class SocketServerTest extends JUnitSuite {
val serializedBytes = producerRequestBytes()
sendRequest(conn, serializedBytes)
- val channel = overrideServer.requestChannel
+ val channel = overrideServer.dataPlaneRequestChannel
val request = receiveRequest(channel)
val requestMetrics = channel.metrics(request.header.apiKey.name)
@@ -696,9 +710,9 @@ class SocketServerTest extends JUnitSuite {
@volatile var selector: TestableSelector = null
val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0"
val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
- override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+ override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
- new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+ new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
credentialProvider, memoryPool, new LogContext()) {
override protected[network] def connectionId(socket: Socket): String = overrideConnectionId
@@ -711,8 +725,8 @@ class SocketServerTest extends JUnitSuite {
}
}
- def openChannel: Option[KafkaChannel] = overrideServer.processor(0).channel(overrideConnectionId)
- def openOrClosingChannel: Option[KafkaChannel] = overrideServer.processor(0).openOrClosingChannel(overrideConnectionId)
+ def openChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).channel(overrideConnectionId)
+ def openOrClosingChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).openOrClosingChannel(overrideConnectionId)
try {
overrideServer.startup()
@@ -730,7 +744,7 @@ class SocketServerTest extends JUnitSuite {
socket.close()
// Complete request with socket exception so that the channel is removed from Selector.closingChannels
- processRequest(overrideServer.requestChannel, request)
+ processRequest(overrideServer.dataPlaneRequestChannel, request)
TestUtils.waitUntilTrue(() => openOrClosingChannel.isEmpty, "Channel not closed after failed send")
assertTrue("Unexpected completed send", selector.completedSends.isEmpty)
} finally {
@@ -755,10 +769,10 @@ class SocketServerTest extends JUnitSuite {
conn = connect(overrideServer)
val serializedBytes = producerRequestBytes()
sendRequest(conn, serializedBytes)
- val channel = overrideServer.requestChannel
+ val channel = overrideServer.dataPlaneRequestChannel
val request = receiveRequest(channel)
- TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty,
+ TestUtils.waitUntilTrue(() => overrideServer.dataPlaneProcessor(request.processor).channel(request.context.connectionId).isEmpty,
s"Idle connection `${request.context.connectionId}` was not closed by selector")
val requestMetrics = channel.metrics(request.header.apiKey.name)
@@ -780,10 +794,10 @@ class SocketServerTest extends JUnitSuite {
server.stopProcessingRequests()
val version = ApiKeys.PRODUCE.latestVersion
val version2 = (version - 1).toShort
- for (_ <- 0 to 1) server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark()
- server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark()
- assertEquals(2, server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count())
- server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1))
+ for (_ <- 0 to 1) server.dataPlaneRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark()
+ server.dataPlaneRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark()
+ assertEquals(2, server.dataPlaneRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count())
+ server.dataPlaneRequestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1))
val nonZeroMeters = Map(s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version" -> 2,
s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version2" -> 1,
"kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE" -> 1)
@@ -808,7 +822,7 @@ class SocketServerTest extends JUnitSuite {
.allMetrics.asScala
.filterKeys(k => k.getName.endsWith("IdlePercent") || k.getName.endsWith("NetworkProcessorAvgIdlePercent"))
.collect { case (k, metric: Gauge[_]) => (k, metric.value().asInstanceOf[Double]) }
- .filter { case (_, value) => value != 0.0 }
+ .filter { case (_, value) => value != 0.0 && !value.equals(Double.NaN) }
assertEquals(Map.empty, nonZeroMetricNamesAndValues)
}
@@ -818,7 +832,7 @@ class SocketServerTest extends JUnitSuite {
val kafkaMetricNames = metrics.metrics.keySet.asScala.filter(_.tags.asScala.get("listener").nonEmpty)
assertFalse(kafkaMetricNames.isEmpty)
- val expectedListeners = Set("PLAINTEXT", "TRACE")
+ val expectedListeners = Set("PLAINTEXT")
kafkaMetricNames.foreach { kafkaMetricName =>
assertTrue(expectedListeners.contains(kafkaMetricName.tags.get("listener")))
}
@@ -846,7 +860,7 @@ class SocketServerTest extends JUnitSuite {
*/
@Test
def configureNewConnectionException(): Unit = {
- withTestableServer { testableServer =>
+ withTestableServer (testWithServer = { testableServer =>
val testableSelector = testableServer.testableSelector
testableSelector.updateMinWakeup(2)
@@ -856,7 +870,7 @@ class SocketServerTest extends JUnitSuite {
TestUtils.waitUntilTrue(() => testableServer.connectionCount(localAddress) == 1, "Failed channel not removed")
assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets))
- }
+ })
}
/**
@@ -871,7 +885,7 @@ class SocketServerTest extends JUnitSuite {
*/
@Test
def processNewResponseException(): Unit = {
- withTestableServer { testableServer =>
+ withTestableServer (testWithServer = { testableServer =>
val testableSelector = testableServer.testableSelector
testableSelector.updateMinWakeup(2)
@@ -879,12 +893,12 @@ class SocketServerTest extends JUnitSuite {
sockets.foreach(sendRequest(_, producerRequestBytes()))
testableServer.testableSelector.addFailure(SelectorOperation.Send)
- sockets.foreach(_ => processRequest(testableServer.requestChannel))
+ sockets.foreach(_ => processRequest(testableServer.dataPlaneRequestChannel))
testableSelector.waitForOperations(SelectorOperation.Send, 2)
testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, locallyClosed = true)
assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets))
- }
+ })
}
/**
@@ -894,13 +908,13 @@ class SocketServerTest extends JUnitSuite {
*/
@Test
def sendCancelledKeyException(): Unit = {
- withTestableServer { testableServer =>
+ withTestableServer (testWithServer = { testableServer =>
val testableSelector = testableServer.testableSelector
testableSelector.updateMinWakeup(2)
val sockets = (1 to 2).map(_ => connect(testableServer))
sockets.foreach(sendRequest(_, producerRequestBytes()))
- val requestChannel = testableServer.requestChannel
+ val requestChannel = testableServer.dataPlaneRequestChannel
val requests = sockets.map(_ => receiveRequest(requestChannel))
val failedConnectionId = requests(0).context.connectionId
@@ -912,7 +926,7 @@ class SocketServerTest extends JUnitSuite {
val successfulSocket = if (isSocketConnectionId(failedConnectionId, sockets(0))) sockets(1) else sockets(0)
assertProcessorHealthy(testableServer, Seq(successfulSocket))
- }
+ })
}
/**
@@ -922,7 +936,7 @@ class SocketServerTest extends JUnitSuite {
*/
@Test
def closingChannelException(): Unit = {
- withTestableServer { testableServer =>
+ withTestableServer (testWithServer = { testableServer =>
val testableSelector = testableServer.testableSelector
testableSelector.updateMinWakeup(2)
@@ -933,13 +947,13 @@ class SocketServerTest extends JUnitSuite {
testableSelector.addFailure(SelectorOperation.Send)
sockets(0).close()
- processRequest(testableServer.requestChannel, request)
- processRequest(testableServer.requestChannel) // Also process request from other channel
+ processRequest(testableServer.dataPlaneRequestChannel, request)
+ processRequest(testableServer.dataPlaneRequestChannel) // Also process request from other channel
testableSelector.waitForOperations(SelectorOperation.Send, 2)
testableServer.waitForChannelClose(request.context.connectionId, locallyClosed = true)
assertProcessorHealthy(testableServer, Seq(sockets(1)))
- }
+ })
}
/**
@@ -954,10 +968,10 @@ class SocketServerTest extends JUnitSuite {
*/
@Test
def processCompletedReceiveException(): Unit = {
- withTestableServer { testableServer =>
+ withTestableServer (testWithServer = { testableServer =>
val sockets = (1 to 2).map(_ => connect(testableServer))
val testableSelector = testableServer.testableSelector
- val requestChannel = testableServer.requestChannel
+ val requestChannel = testableServer.dataPlaneRequestChannel
testableSelector.cachedCompletedReceives.minPerPoll = 2
testableSelector.addFailure(SelectorOperation.Mute)
@@ -968,7 +982,7 @@ class SocketServerTest extends JUnitSuite {
requests.foreach(processRequest(requestChannel, _))
assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets))
- }
+ })
}
/**
@@ -983,18 +997,18 @@ class SocketServerTest extends JUnitSuite {
*/
@Test
def processCompletedSendException(): Unit = {
- withTestableServer { testableServer =>
+ withTestableServer (testWithServer = { testableServer =>
val testableSelector = testableServer.testableSelector
val sockets = (1 to 2).map(_ => connect(testableServer))
val requests = sockets.map(sendAndReceiveRequest(_, testableServer))
testableSelector.addFailure(SelectorOperation.Unmute)
- requests.foreach(processRequest(testableServer.requestChannel, _))
+ requests.foreach(processRequest(testableServer.dataPlaneRequestChannel, _))
testableSelector.waitForOperations(SelectorOperation.Unmute, 2)
testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, locallyClosed = true)
assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets))
- }
+ })
}
/**
@@ -1007,7 +1021,7 @@ class SocketServerTest extends JUnitSuite {
*/
@Test
def processDisconnectedException(): Unit = {
- withTestableServer { testableServer =>
+ withTestableServer (testWithServer = { testableServer =>
val (socket, connectionId) = connectAndProcessRequest(testableServer)
val testableSelector = testableServer.testableSelector
@@ -1021,7 +1035,7 @@ class SocketServerTest extends JUnitSuite {
testableServer.waitForChannelClose(connectionId, locallyClosed = false)
assertProcessorHealthy(testableServer)
- }
+ })
}
/**
@@ -1029,7 +1043,7 @@ class SocketServerTest extends JUnitSuite {
*/
@Test
def pollException(): Unit = {
- withTestableServer { testableServer =>
+ withTestableServer (testWithServer = { testableServer =>
val (socket, _) = connectAndProcessRequest(testableServer)
val testableSelector = testableServer.testableSelector
@@ -1038,7 +1052,7 @@ class SocketServerTest extends JUnitSuite {
testableSelector.waitForOperations(SelectorOperation.Poll, 2)
assertProcessorHealthy(testableServer, Seq(socket))
- }
+ })
}
/**
@@ -1046,7 +1060,7 @@ class SocketServerTest extends JUnitSuite {
*/
@Test
def controlThrowable(): Unit = {
- withTestableServer { testableServer =>
+ withTestableServer (testWithServer = { testableServer =>
connectAndProcessRequest(testableServer)
val testableSelector = testableServer.testableSelector
@@ -1056,12 +1070,12 @@ class SocketServerTest extends JUnitSuite {
testableSelector.waitForOperations(SelectorOperation.Poll, 1)
testableSelector.waitForOperations(SelectorOperation.CloseSelector, 1)
- }
+ })
}
- private def withTestableServer(testWithServer: TestableSocketServer => Unit): Unit = {
+ private def withTestableServer(config : KafkaConfig = config, testWithServer: TestableSocketServer => Unit): Unit = {
props.put("listeners", "PLAINTEXT://localhost:0")
- val testableServer = new TestableSocketServer
+ val testableServer = new TestableSocketServer(config)
testableServer.startup()
try {
testWithServer(testableServer)
@@ -1070,10 +1084,15 @@ class SocketServerTest extends JUnitSuite {
}
}
+ def sendAndReceiveControllerRequest(socket: Socket, server: SocketServer): RequestChannel.Request = {
+ sendRequest(socket, producerRequestBytes())
+ receiveRequest(server.controlPlaneRequestChannelOpt.get)
+ }
+
private def assertProcessorHealthy(testableServer: TestableSocketServer, healthySockets: Seq[Socket] = Seq.empty): Unit = {
val selector = testableServer.testableSelector
selector.reset()
- val requestChannel = testableServer.requestChannel
+ val requestChannel = testableServer.dataPlaneRequestChannel
// Check that existing channels behave as expected
healthySockets.foreach { socket =>
@@ -1096,18 +1115,17 @@ class SocketServerTest extends JUnitSuite {
def isSocketConnectionId(connectionId: String, socket: Socket): Boolean =
connectionId.contains(s":${socket.getLocalPort}-")
- class TestableSocketServer extends SocketServer(KafkaConfig.fromProps(props),
+ class TestableSocketServer(config : KafkaConfig = config) extends SocketServer(config,
new Metrics, Time.SYSTEM, credentialProvider) {
@volatile var selector: Option[TestableSelector] = None
- override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+ override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs,
config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool, new LogContext()) {
override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
- val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
- assertEquals(None, selector)
+ val testableSelector = new TestableSelector(config, channelBuilder, time, metrics, metricTags.asScala)
selector = Some(testableSelector)
testableSelector
}
@@ -1131,7 +1149,7 @@ class SocketServerTest extends JUnitSuite {
val openCount = selector.allChannels.size - 1 // minus one for the channel just closed above
TestUtils.waitUntilTrue(() => connectionCount(localAddress) == openCount, "Connection count not decremented")
TestUtils.waitUntilTrue(() =>
- processor(0).inflightResponseCount == 0, "Inflight responses not cleared")
+ dataPlaneProcessor(0).inflightResponseCount == 0, "Inflight responses not cleared")
assertNull("Channel not removed", selector.channel(connectionId))
assertNull("Closing channel not removed", selector.closingChannel(connectionId))
}
@@ -1149,9 +1167,9 @@ class SocketServerTest extends JUnitSuite {
case object CloseSelector extends SelectorOperation
}
- class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder, time: Time, metrics: Metrics)
+ class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder, time: Time, metrics: Metrics, metricTags: mutable.Map[String, String] = mutable.Map.empty)
extends Selector(config.socketRequestMaxBytes, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
- metrics, time, "socket-server", new HashMap, false, true, channelBuilder, MemoryPool.NONE, new LogContext()) {
+ metrics, time, "socket-server", metricTags.asJava, false, true, channelBuilder, MemoryPool.NONE, new LogContext()) {
val failures = mutable.Map[SelectorOperation, Exception]()
val operationCounts = mutable.Map[SelectorOperation, Int]().withDefaultValue(0)
@@ -1302,4 +1320,4 @@ class SocketServerTest extends JUnitSuite {
sockets.filterNot(socket => isSocketConnectionId(failedConnectionId, socket))
}
}
-}
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 789dbaeb8f762..f5cc134297024 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -93,7 +93,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "1000")
props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "2000")
- val quotaManagers = servers.head.apis.quotas
+ val quotaManagers = servers.head.dataPlaneRequestProcessor.quotas
rootEntityType match {
case ConfigType.Client => adminZkClient.changeClientIdConfig(configEntityName, props)
case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, props)
@@ -179,7 +179,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
// Remove config change znodes to force quota initialization only through loading of user/client quotas
zkClient.getChildren(ConfigEntityChangeNotificationZNode.path).foreach { p => zkClient.deletePath(ConfigEntityChangeNotificationZNode.path + "/" + p) }
server.startup()
- val quotaManagers = server.apis.quotas
+ val quotaManagers = server.dataPlaneRequestProcessor.quotas
assertEquals(Quota.upperBound(1000), quotaManagers.produce.quota("someuser", "overriddenClientId"))
assertEquals(Quota.upperBound(2000), quotaManagers.fetch.quota("someuser", "overriddenClientId"))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 10dba1a479394..f21f38427b2d0 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -229,6 +229,31 @@ class KafkaConfigTest {
assertFalse(isValidKafkaConfig(props))
}
+ @Test
+ def testControlPlaneListenerName() = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+ props.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+ props.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+ props.put("control.plane.listener.name", "CONTROLLER")
+ assertTrue(isValidKafkaConfig(props))
+
+ val serverConfig = KafkaConfig.fromProps(props)
+ val controlEndpoint = serverConfig.controlPlaneListener.get
+ assertEquals("localhost", controlEndpoint.host)
+ assertEquals(5000, controlEndpoint.port)
+ assertEquals(SecurityProtocol.SSL, controlEndpoint.securityProtocol)
+
+ //advertised listener should contain control-plane listener
+ val advertisedEndpoints = serverConfig.advertisedListeners
+ assertFalse(advertisedEndpoints.filter { endpoint =>
+ endpoint.securityProtocol == controlEndpoint.securityProtocol && endpoint.listenerName.value().equals(controlEndpoint.listenerName.value())
+ }.isEmpty)
+
+ // interBrokerListener name should be different from control-plane listener name
+ val interBrokerListenerName = serverConfig.interBrokerListenerName
+ assertFalse(interBrokerListenerName.value().equals(controlEndpoint.listenerName.value()))
+ }
+
@Test
def testBadListenerProtocol() {
val props = new Properties()
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 4e60a8f602792..ef3dece306325 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -74,7 +74,7 @@ class MetadataRequestTest extends BaseRequestTest {
assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2)
TestUtils.waitUntilTrue(() => {
val metadataResponse2 = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
- metadataResponse2.controller != null && controllerServer2.apis.brokerId == metadataResponse2.controller.id
+ metadataResponse2.controller != null && controllerServer2.dataPlaneRequestProcessor.brokerId == metadataResponse2.controller.id
}, "Controller id should match the active controller after failover", 5000)
}
@@ -178,7 +178,7 @@ class MetadataRequestTest extends BaseRequestTest {
assertEquals(topic1, topicMetadata1.topic)
assertEquals(Errors.INVALID_TOPIC_EXCEPTION, topicMetadata2.error)
assertEquals(topic2, topicMetadata2.topic)
-
+
TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0)
TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 0)
@@ -250,7 +250,7 @@ class MetadataRequestTest extends BaseRequestTest {
val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
val downNode = servers.find { server =>
- val serverId = server.apis.brokerId
+ val serverId = server.dataPlaneRequestProcessor.brokerId
val leaderId = partitionMetadata.leader.id
val replicaIds = partitionMetadata.replicas.asScala.map(_.id)
serverId != leaderId && replicaIds.contains(serverId)
@@ -260,7 +260,7 @@ class MetadataRequestTest extends BaseRequestTest {
TestUtils.waitUntilTrue(() => {
val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head
- val replica = metadata.replicas.asScala.find(_.id == downNode.apis.brokerId).get
+ val replica = metadata.replicas.asScala.find(_.id == downNode.dataPlaneRequestProcessor.brokerId).get
replica.host == "" & replica.port == -1
}, "Replica was not found down", 5000)
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ae8353657a693..c9e4e7890ebf6 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -99,7 +99,7 @@ class RequestQuotaTest extends BaseRequestTest {
adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps)
TestUtils.retry(10000) {
- val quotaManager = servers.head.apis.quotas.request
+ val quotaManager = servers.head.dataPlaneRequestProcessor.quotas.request
assertEquals(s"Default request quota not set", Quota.upperBound(0.01), quotaManager.quota("some-user", "some-client"))
assertEquals(s"Request quota override not set", Quota.upperBound(2000), quotaManager.quota("some-user", unthrottledClientId))
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e5ea6a4baae37..ecdb8c8739cc5 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -835,7 +835,7 @@ object TestUtils extends Logging {
timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
val expectedBrokerIds = servers.map(_.config.brokerId).toSet
TestUtils.waitUntilTrue(() => servers.forall(server =>
- expectedBrokerIds == server.apis.metadataCache.getAliveBrokers.map(_.id).toSet
+ expectedBrokerIds == server.dataPlaneRequestProcessor.metadataCache.getAliveBrokers.map(_.id).toSet
), "Timed out waiting for broker metadata to propagate to all servers", timeout)
}
@@ -855,7 +855,7 @@ object TestUtils extends Logging {
TestUtils.waitUntilTrue(() =>
servers.foldLeft(true) {
(result, server) =>
- val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition)
+ val partitionStateOpt = server.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic, partition)
partitionStateOpt match {
case None => false
case Some(partitionState) =>
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index b15f8ac47b576..c120caa458142 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -313,8 +313,8 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
// Test that the existing clientId overrides are read
val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
servers = Seq(server)
- assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId))
- assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
+ assertEquals(new Quota(1000, true), server.dataPlaneRequestProcessor.quotas.produce.quota("ANONYMOUS", clientId))
+ assertEquals(new Quota(2000, true), server.dataPlaneRequestProcessor.quotas.fetch.quota("ANONYMOUS", clientId))
}
@Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 8a2122d8e37ad..efd0d49be44cd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -517,7 +517,7 @@ public static void waitUntilMetadataIsPropagated(final List servers
final long timeout) throws InterruptedException {
TestUtils.waitForCondition(() -> {
for (final KafkaServer server : servers) {
- final MetadataCache metadataCache = server.apis().metadataCache();
+ final MetadataCache metadataCache = server.dataPlaneRequestProcessor().metadataCache();
final Option partitionInfo =
metadataCache.getPartitionInfo(topic, partition);
if (partitionInfo.isEmpty()) {