From b9a786aa79377f8f63ecc11ffaad2375e2f21138 Mon Sep 17 00:00:00 2001 From: mgharat Date: Thu, 6 Dec 2018 08:36:40 -0800 Subject: [PATCH 1/3] Added support for separating the control-plane(controller requests) from the data-plane(client and other broker requests) --- .../controller/ControllerChannelManager.scala | 8 +- .../scala/kafka/network/RequestChannel.scala | 13 +- .../scala/kafka/network/SocketServer.scala | 235 ++++++++++++------ .../kafka/server/DynamicBrokerConfig.scala | 2 +- .../main/scala/kafka/server/KafkaConfig.scala | 61 ++++- .../kafka/server/KafkaRequestHandler.scala | 10 +- .../main/scala/kafka/server/KafkaServer.scala | 38 ++- .../api/AdminClientIntegrationTest.scala | 4 +- .../kafka/api/AuthorizerIntegrationTest.scala | 10 +- .../integration/kafka/api/BaseQuotaTest.scala | 2 +- .../kafka/api/EndToEndAuthorizationTest.scala | 46 ++-- .../SaslSslAdminClientIntegrationTest.scala | 4 +- .../DynamicBrokerReconfigurationTest.scala | 6 +- .../scala/unit/kafka/admin/AdminTest.scala | 4 +- .../ControllerIntegrationTest.scala | 41 ++- .../unit/kafka/network/SocketServerTest.scala | 186 ++++++++------ .../server/DynamicConfigChangeTest.scala | 4 +- .../unit/kafka/server/KafkaConfigTest.scala | 25 ++ .../kafka/server/MetadataRequestTest.scala | 8 +- .../unit/kafka/server/RequestQuotaTest.scala | 2 +- .../scala/unit/kafka/utils/TestUtils.scala | 4 +- .../unit/kafka/zk/AdminZkClientTest.scala | 4 +- .../utils/IntegrationTestUtils.java | 2 +- 23 files changed, 483 insertions(+), 236 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index b5c6a910b0af9..083e95258228f 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -107,14 +107,16 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf private def addNewBroker(broker: Broker) { val messageQueue = new LinkedBlockingQueue[QueueItem] debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}") - val brokerNode = broker.node(config.interBrokerListenerName) + val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) + val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) + val brokerNode = broker.node(controllerToBrokerListenerName) val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ") val networkClient = { val channelBuilder = ChannelBuilders.clientChannelBuilder( - config.interBrokerSecurityProtocol, + controllerToBrokerSecurityProtocol, JaasContext.Type.SERVER, config, - config.interBrokerListenerName, + controllerToBrokerListenerName, config.saslMechanismInterBrokerProtocol, time, config.saslInterBrokerHandshakeRequestEnable diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 00b09688c5b2c..988c14f263e75 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -41,6 +41,7 @@ object RequestChannel extends Logging { val RequestQueueSizeMetric = "RequestQueueSize" val ResponseQueueSizeMetric = "ResponseQueueSize" + val ControlPlaneMetricPrefix = "ControlPlane" val ProcessorMetricTag = "processor" def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled @@ -272,17 +273,19 @@ object RequestChannel extends Logging { } } -class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup { +class RequestChannel(val queueSize: Int, val metricNamePrefix : String = "") extends KafkaMetricsGroup { import RequestChannel._ val metrics = new RequestChannel.Metrics private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize) private val processors = new ConcurrentHashMap[Int, Processor]() + val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric) + val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric) - newGauge(RequestQueueSizeMetric, new Gauge[Int] { + newGauge(requestQueueSizeMetricName, new Gauge[Int] { def value = requestQueue.size }) - newGauge(ResponseQueueSizeMetric, new Gauge[Int]{ + newGauge(responseQueueSizeMetricName, new Gauge[Int]{ def value = processors.values.asScala.foldLeft(0) {(total, processor) => total + processor.responseQueueSize } @@ -292,7 +295,7 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup { if (processors.putIfAbsent(processor.id, processor) != null) warn(s"Unexpected processor with processorId ${processor.id}") - newGauge(ResponseQueueSizeMetric, + newGauge(responseQueueSizeMetricName, new Gauge[Int] { def value = processor.responseQueueSize }, @@ -302,7 +305,7 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup { def removeProcessor(processorId: Int): Unit = { processors.remove(processorId) - removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> processorId.toString)) + removeMetric(responseQueueSizeMetricName, Map(ProcessorMetricTag -> processorId.toString)) } /** Send a request to be handled, potentially blocking until there is room in the queue for the request */ diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index ae09a03b7dd8d..22940db38364c 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -51,13 +51,28 @@ import scala.collection.mutable.{ArrayBuffer, Buffer} import scala.util.control.ControlThrowable /** - * An NIO socket server. The threading model is - * 1 Acceptor thread that handles new connections - * Acceptor has N Processor threads that each have their own selector and read requests from sockets - * M Handler threads that handle requests and produce responses back to the processor threads for writing. + * Handles new connections, requests and responses to and from broker. + * Kafka supports two types of request planes : + * - data-plane : + * - Handles requests from clients and other brokers in the cluster. + * - The threading model is + * 1 Acceptor thread that handles new connections + * Acceptor has N Processor threads that each have their own selector and read requests from sockets + * M Handler threads that handle requests and produce responses back to the processor threads for writing. + * - control-plane : + * - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name". + * If not configured, the controller requests are handled by the data-plane. + * - The threading model is + * 1 Acceptor thread that handles new connections + * Acceptor has 1 Processor thread that has its own selector and read requests from the socket. + * 1 Handler thread that handles requests and produce responses back to the processor thread for writing. */ + class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup { + val DataPlanePrefix = "data-plane" + val ControlPlanePrefix = "control-plane" + private val maxQueuedRequests = config.queuedMaxRequests private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ") @@ -68,11 +83,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics") memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName)) private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE - val requestChannel = new RequestChannel(maxQueuedRequests) - private val processors = new ConcurrentHashMap[Int, Processor]() - private var nextProcessorId = 0 + // data plane + private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]() + private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]() + val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests) + // control plane + private var controlPlaneProcessorOpt : Option[Processor] = None + private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None + val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, RequestChannel.ControlPlaneMetricPrefix)) - private[network] val acceptors = new ConcurrentHashMap[EndPoint, Acceptor]() + private var nextProcessorId = 0 private var connectionQuotas: ConnectionQuotas = _ private var stoppedProcessingRequests = false @@ -91,22 +111,37 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time def startup(startupProcessors: Boolean = true) { this.synchronized { connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides) - createAcceptorAndProcessors(config.numNetworkThreads, config.listeners) + createControlPlaneAcceptorAndProcessor(config.controlPlaneListener) + createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners) if (startupProcessors) { - startProcessors() + startControlPlaneProcessor() + startDataPlaneProcessors() } } - newGauge("NetworkProcessorAvgIdlePercent", + newGauge("DataPlaneNetworkProcessorAvgIdlePercent", new Gauge[Double] { def value = SocketServer.this.synchronized { - val ioWaitRatioMetricNames = processors.values.asScala.map { p => + val ioWaitRatioMetricNames = dataPlaneProcessors.values.asScala.map { p => metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags) } ioWaitRatioMetricNames.map { metricName => Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0)) - }.sum / processors.size + }.sum / dataPlaneProcessors.size + } + } + ) + newGauge("ControlPlaneNetworkProcessorAvgIdlePercent", + new Gauge[Double] { + + def value = SocketServer.this.synchronized { + val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p => + metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags) + } + ioWaitRatioMetricName.map { metricName => + Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0)) + }.getOrElse(0.0) } } ) @@ -124,7 +159,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time new Gauge[Double] { def value = SocketServer.this.synchronized { - val expiredConnectionsKilledCountMetricNames = processors.values.asScala.map { p => + val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.values.asScala.map { p => metrics.metricName("expired-connections-killed-count", "socket-server-metrics", p.metricTags) } expiredConnectionsKilledCountMetricNames.map { metricName => @@ -133,96 +168,146 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time } } ) - info(s"Started ${acceptors.size} acceptor threads") + newGauge("ControlPlaneExpiredConnectionsKilledCount", + new Gauge[Double] { + + def value = SocketServer.this.synchronized { + val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map { p => + metrics.metricName("expired-connections-killed-count", "socket-server-metrics", p.metricTags) + } + expiredConnectionsKilledCountMetricNames.map { metricName => + Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double]) + }.getOrElse(0.0) + } + } + ) + info(s"Started ${dataPlaneAcceptors.size} acceptor threads for data-plane") + controlPlaneAcceptorOpt.foreach(_ => info("Started 1 acceptor thread for control-plane")) + } + + /** + * Starts processors of all the data plane acceptors of this server if they have not already been started. + * This method is used for delayed starting of data plane processors if [[kafka.network.SocketServer#startup]] + * was invoked with `startupProcessors=false`. + */ + def startDataPlaneProcessors(): Unit = synchronized { + dataPlaneAcceptors.values.asScala.foreach { _.startProcessors(DataPlanePrefix) } + info(s"Started data plane processors for ${dataPlaneAcceptors.size} acceptors") } /** - * Starts processors of all the acceptors of this server if they have not already been started. - * This method is used for delayed starting of processors if [[kafka.network.SocketServer#startup]] + * Start the processor of control plane acceptor of this server if it has not already been started. + * This method is used for delayed starting of control plane processor if [[kafka.network.SocketServer#startup]] * was invoked with `startupProcessors=false`. */ - def startProcessors(): Unit = synchronized { - acceptors.values.asScala.foreach { _.startProcessors() } - info(s"Started processors for ${acceptors.size} acceptors") + def startControlPlaneProcessor(): Unit = synchronized { + controlPlaneAcceptorOpt.foreach { acceptor => + acceptor.startProcessors(ControlPlanePrefix) + info(s"Started control plane processor for ${controlPlaneAcceptorOpt.size} acceptors") + } } private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap - private def createAcceptorAndProcessors(processorsPerListener: Int, - endpoints: Seq[EndPoint]): Unit = synchronized { + private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int, + endpoints: Seq[EndPoint]): Unit = synchronized { + endpoints.foreach { endpoint => + val dataPlaneAcceptor = createAcceptor(endpoint) + addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener) + KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start() + dataPlaneAcceptor.awaitStartup() + dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor) + info(s"Created data-plane acceptor and processors for endpoint : $endpoint") + } + } + private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = synchronized { + endpointOpt.foreach { endpoint => + controlPlaneAcceptorOpt = Some(createAcceptor(endpoint)) + controlPlaneProcessorOpt = Some(newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)) + val listenerProcessors = new ArrayBuffer[Processor]() + controlPlaneProcessorOpt.foreach { processor => + listenerProcessors += processor + controlPlaneRequestChannelOpt.foreach(_.addProcessor(processor)) + val controlPlaneAcceptor = controlPlaneAcceptorOpt.get + controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlanePrefix) + KafkaThread.nonDaemon(s"control-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", controlPlaneAcceptor).start() + controlPlaneAcceptor.awaitStartup() + } + info(s"Created control-plane acceptor and processor for endpoint : $endpoint") + } + } + + private def createAcceptor(endPoint: EndPoint) : Acceptor = synchronized { val sendBufferSize = config.socketSendBufferBytes val recvBufferSize = config.socketReceiveBufferBytes val brokerId = config.brokerId - - endpoints.foreach { endpoint => - val listenerName = endpoint.listenerName - val securityProtocol = endpoint.securityProtocol - - val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas) - addProcessors(acceptor, endpoint, processorsPerListener) - KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start() - acceptor.awaitStartup() - acceptors.put(endpoint, acceptor) - } + new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas) } - private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized { + private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized { val listenerName = endpoint.listenerName val securityProtocol = endpoint.securityProtocol val listenerProcessors = new ArrayBuffer[Processor]() - for (_ <- 0 until newProcessorsPerListener) { - val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool) + val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool) listenerProcessors += processor - requestChannel.addProcessor(processor) + dataPlaneRequestChannel.addProcessor(processor) nextProcessorId += 1 } - listenerProcessors.foreach(p => processors.put(p.id, p)) - acceptor.addProcessors(listenerProcessors) + listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p)) + acceptor.addProcessors(listenerProcessors, DataPlanePrefix) } /** - * Stop processing requests and new connections. - */ + * Stop processing requests and new connections. + */ def stopProcessingRequests() = { info("Stopping socket server request processors") this.synchronized { - acceptors.asScala.values.foreach(_.shutdown()) - processors.asScala.values.foreach(_.shutdown()) - requestChannel.clear() + dataPlaneAcceptors.asScala.values.foreach(_.shutdown()) + controlPlaneAcceptorOpt.foreach(_.shutdown()) + dataPlaneProcessors.asScala.values.foreach(_.shutdown()) + controlPlaneProcessorOpt.foreach(_.shutdown()) + dataPlaneRequestChannel.clear() + controlPlaneRequestChannelOpt.foreach(_.clear()) stoppedProcessingRequests = true } info("Stopped socket server request processors") } def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized { - info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads") + info(s"Resizing network thread pool size for each data plane listener from $oldNumNetworkThreads to $newNumNetworkThreads") if (newNumNetworkThreads > oldNumNetworkThreads) { - acceptors.asScala.foreach { case (endpoint, acceptor) => - addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads) + dataPlaneAcceptors.asScala.foreach { case (endpoint, acceptor) => + addDataPlaneProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads) } } else if (newNumNetworkThreads < oldNumNetworkThreads) - acceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel)) + dataPlaneAcceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, dataPlaneRequestChannel)) } /** - * Shutdown the socket server. If still processing requests, shutdown - * acceptors and processors first. - */ + * Shutdown the socket server. If still processing requests, shutdown + * acceptors and processors first. + */ def shutdown() = { info("Shutting down socket server") this.synchronized { if (!stoppedProcessingRequests) stopProcessingRequests() - requestChannel.shutdown() + dataPlaneRequestChannel.shutdown() + controlPlaneRequestChannelOpt.foreach(_.shutdown()) } info("Shutdown completed") } def boundPort(listenerName: ListenerName): Int = { try { - acceptors.get(endpoints(listenerName)).serverChannel.socket.getLocalPort + if (dataPlaneAcceptors.containsKey(endpoints(listenerName))) { + dataPlaneAcceptors.get(endpoints(listenerName)).serverChannel.socket.getLocalPort + } else { + controlPlaneAcceptorOpt.map (_.serverChannel.socket().getLocalPort).getOrElse(throw new KafkaException()) + } } catch { case e: Exception => throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e) @@ -230,15 +315,15 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time } def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized { - info(s"Adding listeners for endpoints $listenersAdded") - createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded) - startProcessors() + info(s"Adding data-plane listeners for endpoints $listenersAdded") + createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, listenersAdded) + startDataPlaneProcessors() } def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized { - info(s"Removing listeners for endpoints $listenersRemoved") + info(s"Removing data-plane listeners for endpoints $listenersRemoved") listenersRemoved.foreach { endpoint => - acceptors.asScala.remove(endpoint).foreach(_.shutdown()) + dataPlaneAcceptors.asScala.remove(endpoint).foreach(_.shutdown()) } } @@ -252,8 +337,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides) } - /* `protected` for test usage */ - protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + // `protected` for test usage + protected[network] def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { new Processor(id, time, @@ -272,12 +357,12 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time ) } - /* For test usage */ + // For test usage private[network] def connectionCount(address: InetAddress): Int = Option(connectionQuotas).fold(0)(_.get(address)) - /* For test usage */ - private[network] def processor(index: Int): Processor = processors.get(index) + // For test usage + private[network] def dataPlaneProcessor(index: Int): Processor = dataPlaneProcessors.get(index) } @@ -357,21 +442,21 @@ private[kafka] class Acceptor(val endPoint: EndPoint, private val processors = new ArrayBuffer[Processor]() private val processorsStarted = new AtomicBoolean - private[network] def addProcessors(newProcessors: Buffer[Processor]): Unit = synchronized { + private[network] def addProcessors(newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized { processors ++= newProcessors if (processorsStarted.get) - startProcessors(newProcessors) + startProcessors(newProcessors, processorThreadPrefix) } - private[network] def startProcessors(): Unit = synchronized { + private[network] def startProcessors(processorThreadPrefix: String): Unit = synchronized { if (!processorsStarted.getAndSet(true)) { - startProcessors(processors) + startProcessors(processors, processorThreadPrefix) } } - private def startProcessors(processors: Seq[Processor]): Unit = synchronized { + private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized { processors.foreach { processor => - KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", + KafkaThread.nonDaemon(processorThreadPrefix + s"-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}", processor).start() } } @@ -444,9 +529,9 @@ private[kafka] class Acceptor(val endPoint: EndPoint, } } - /* - * Create a server socket to listen for connections on. - */ + /** + * Create a server socket to listen for connections on. + */ private def openServerSocket(host: String, port: Int): ServerSocketChannel = { val socketAddress = if (host == null || host.trim.isEmpty) @@ -468,7 +553,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint, serverChannel } - /* + /** * Accept a new connection */ def accept(key: SelectionKey, processor: Processor) { @@ -564,7 +649,7 @@ private[kafka] class Processor(val id: Int, // also includes the listener name) Map(NetworkProcessorMetricTag -> id.toString) ) - + val expiredConnectionsKilledCount = new Total() private val expiredConnectionsKilledCountMetricName = metrics.metricName("expired-connections-killed-count", "socket-server-metrics", metricTags) metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount) @@ -688,7 +773,7 @@ private[kafka] class Processor(val id: Int, } } - /* `protected` for test usage */ + // `protected` for test usage protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) { val connectionId = response.request.context.connectionId trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response") diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index 2c0f6c1b52c2f..1c5657263c8d9 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -640,7 +640,7 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable { override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { if (newConfig.numIoThreads != oldConfig.numIoThreads) - server.requestHandlerPool.resizeThreadPool(newConfig.numIoThreads) + server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads) if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads) server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads) if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 13f555a20594c..7b13130d04ea3 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -286,6 +286,7 @@ object KafkaConfig { val AdvertisedPortProp = "advertised.port" val AdvertisedListenersProp = "advertised.listeners" val ListenerSecurityProtocolMapProp = "listener.security.protocol.map" + val ControlPlaneListenerNameProp = "control.plane.listener.name" val SocketSendBufferBytesProp = "socket.send.buffer.bytes" val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes" val SocketRequestMaxBytesProp = "socket.request.max.bytes" @@ -546,6 +547,22 @@ object KafkaConfig { "prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " + "INTERNAL listener, a config with name listener.name.internal.ssl.keystore.location would be set. " + "If the config for the listener name is not set, the config will fallback to the generic config (i.e. ssl.keystore.location). " + val controlPlaneListenerNameDoc = "Name of listener used for communication between controller and brokers. " + + s"Broker will use the $ControlPlaneListenerNameProp to locate the endpoint in $ListenersProp list, to listen for connections from the controller. " + + "For example, if a broker's config is :\n" + + "listeners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094\n" + + "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" + + "control.plane.listener.name = CONTROLLER\n" + + "On startup, the broker will start listening on \"192.1.1.8:9094\" with security protocol \"SSL\".\n" + + s"On controller side, when it discovers a broker's published endpoints through zookeeper, it will use the $ControlPlaneListenerNameProp " + + "to find the endpoint, which it will use to establish connection to the broker.\n" + + "For example, if the broker's published endpoints on zookeeper are :\n" + + "\"endpoints\" : [\"INTERNAL://broker1.example.com:9092\",\"EXTERNAL://broker1.example.com:9093\",\"CONTROLLER://broker1.example.com:9094\"]\n" + + " and the controller's config is :\n" + + "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" + + "control.plane.listener.name = CONTROLLER\n" + + "then controller will use \"broker1.example.com:9094\" with security protocol \"SSL\" to connect to the broker.\n" + + "If not explicitly configured, the default value will be null and there will be no dedicated endpoints for controller connections." val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used." val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used." @@ -846,6 +863,7 @@ object KafkaConfig { .define(AdvertisedPortProp, INT, null, HIGH, AdvertisedPortDoc) .define(AdvertisedListenersProp, STRING, null, HIGH, AdvertisedListenersDoc) .define(ListenerSecurityProtocolMapProp, STRING, Defaults.ListenerSecurityProtocolMap, LOW, ListenerSecurityProtocolMapDoc) + .define(ControlPlaneListenerNameProp, STRING, null, HIGH, controlPlaneListenerNameDoc) .define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc) .define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc) .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc) @@ -1265,6 +1283,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1 def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2 + def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map(_._1) + def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map(_._2) def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp) val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1 @@ -1337,6 +1357,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap)) } + def controlPlaneListener: Option[EndPoint] = { + controlPlaneListenerName.map { listenerName => + listeners.filter(endpoint => endpoint.listenerName.value() == listenerName.value()).head + } + } + + def dataPlaneListeners: Seq[EndPoint] = { + Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match { + case Some(controlPlaneListenerName) => listeners.filterNot(_.listenerName.value() == controlPlaneListenerName) + case None => listeners + } + } + // If the user defined advertised listeners, we use those // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults // If none of these are defined, we'll use the listeners @@ -1368,6 +1401,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO } } + private def getControlPlaneListenerNameAndSecurityProtocol: Option[(ListenerName, SecurityProtocol)] = { + Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match { + case Some(name) => + val listenerName = ListenerName.normalised(name) + val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName, + throw new ConfigException(s"Listener with ${listenerName.value} defined in " + + s"${KafkaConfig.ControlPlaneListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}.")) + Some(listenerName, securityProtocol) + + case None => None + } + } + private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = { try SecurityProtocol.forName(protocolName) catch { @@ -1419,6 +1465,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+ s"Use a routable IP address.") + // validate controller.listener.name config + if (controlPlaneListenerName.isDefined) { + require(advertisedListenerNames.contains(controlPlaneListenerName.get), + s"${KafkaConfig.ControlPlaneListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " + + s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}") + // controlPlaneListenerName should be different from interBrokerListenerName + require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()), + s"${KafkaConfig.ControlPlaneListenerNameProp}, when defined, should have a different value from the inter broker listener name. " + + s"Currently they both have the value ${controlPlaneListenerName.get}") + } + val recordVersion = logMessageFormatVersion.recordVersion require(interBrokerProtocolVersion.recordVersion.value >= recordVersion.value, s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " + @@ -1443,7 +1500,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO if (connectionsMaxIdleMs >= 0) require(failedAuthenticationDelayMs < connectionsMaxIdleMs, s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" + - s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" + - " authentication responses from timing out") + s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" + + s" authentication responses from timing out") } } diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala index d0d4121966311..e0ad1b60cacdf 100755 --- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala +++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala @@ -96,13 +96,15 @@ class KafkaRequestHandlerPool(val brokerId: Int, val requestChannel: RequestChannel, val apis: KafkaApis, time: Time, - numThreads: Int) extends Logging with KafkaMetricsGroup { + numThreads: Int, + requestHandlerAvgIdleMetricName: String, + logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup { private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads) /* a meter to track the average free capacity of the request handlers */ - private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS) + private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS) - this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], " + this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], " val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads) for (i <- 0 until numThreads) { createHandler(i) @@ -110,7 +112,7 @@ class KafkaRequestHandlerPool(val brokerId: Int, def createHandler(id: Int): Unit = synchronized { runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time) - KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start() + KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start() } def resizeThreadPool(newSize: Int): Unit = synchronized { diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 36bc0f1306414..26e1447fb0fb6 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -113,10 +113,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP val brokerState: BrokerState = new BrokerState - var apis: KafkaApis = null + var dataPlaneRequestProcessor: KafkaApis = null + var controlPlaneRequestProcessor: KafkaApis = null + var authorizer: Option[Authorizer] = None var socketServer: SocketServer = null - var requestHandlerPool: KafkaRequestHandlerPool = null + var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null + var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null var logDirFailureChannel: LogDirFailureChannel = null var logManager: LogManager = null @@ -291,12 +294,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS)) /* start processing requests */ - apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, + dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator, kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, fetchManager, brokerTopicStats, clusterId, time, tokenManager) - requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time, - config.numIoThreads) + dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, + config.numIoThreads, "RequestHandlerAvgIdlePercent", socketServer.DataPlanePrefix) + + config.controlPlaneListener.foreach { _ => + controlPlaneRequestProcessor = new KafkaApis(socketServer.controlPlaneRequestChannelOpt.get, replicaManager, adminManager, groupCoordinator, transactionCoordinator, + kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers, + fetchManager, brokerTopicStats, clusterId, time, tokenManager) + + controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time, + 1, "ControlPlaneRequestHandlerAvgIdlePercent", socketServer.ControlPlanePrefix) + } Mx4jLoader.maybeLoad() @@ -313,7 +325,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers) dynamicConfigManager.startup() - socketServer.startProcessors() + socketServer.startDataPlaneProcessors() + socketServer.startControlPlaneProcessor() brokerState.newState(RunningAsBroker) shutdownLatch = new CountDownLatch(1) startupComplete.set(true) @@ -579,14 +592,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP // Socket server will be shutdown towards the end of the sequence. if (socketServer != null) CoreUtils.swallow(socketServer.stopProcessingRequests(), this) - if (requestHandlerPool != null) - CoreUtils.swallow(requestHandlerPool.shutdown(), this) - + if (dataPlaneRequestHandlerPool != null) + CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this) + if (controlPlaneRequestHandlerPool != null) + CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this) if (kafkaScheduler != null) CoreUtils.swallow(kafkaScheduler.shutdown(), this) - if (apis != null) - CoreUtils.swallow(apis.close(), this) + if (dataPlaneRequestProcessor != null) + CoreUtils.swallow(dataPlaneRequestProcessor.close(), this) + if (controlPlaneRequestProcessor != null) + CoreUtils.swallow(controlPlaneRequestProcessor.close(), this) CoreUtils.swallow(authorizer.foreach(_.close()), this) if (adminManager != null) CoreUtils.swallow(adminManager.shutdown(), this) diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala index ee69ae42a0f53..92d1758b19460 100644 --- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala @@ -244,9 +244,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging { client = AdminClient.create(createConfig()) val nodes = client.describeCluster.nodes.get() val clusterId = client.describeCluster().clusterId().get() - assertEquals(servers.head.apis.clusterId, clusterId) + assertEquals(servers.head.dataPlaneRequestProcessor.clusterId, clusterId) val controller = client.describeCluster().controller().get() - assertEquals(servers.head.apis.metadataCache.getControllerId. + assertEquals(servers.head.dataPlaneRequestProcessor.metadataCache.getControllerId. getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id()) val brokers = brokerList.split(",") assertEquals(brokers.size, nodes.size) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 1b51101e76f4f..4c0459e72e0aa 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -1452,9 +1452,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } def removeAllAcls() = { - servers.head.apis.authorizer.get.getAcls().keys.foreach { resource => - servers.head.apis.authorizer.get.removeAcls(resource) - TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.apis.authorizer.get, resource) + servers.head.dataPlaneRequestProcessor.authorizer.get.getAcls().keys.foreach { resource => + servers.head.dataPlaneRequestProcessor.authorizer.get.removeAcls(resource) + TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.dataPlaneRequestProcessor.authorizer.get, resource) } } @@ -1507,8 +1507,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest { } private def addAndVerifyAcls(acls: Set[Acl], resource: Resource) = { - servers.head.apis.authorizer.get.addAcls(acls, resource) - TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) ++ acls, servers.head.apis.authorizer.get, resource) + servers.head.dataPlaneRequestProcessor.authorizer.get.addAcls(acls, resource) + TestUtils.waitAndVerifyAcls(servers.head.dataPlaneRequestProcessor.authorizer.get.getAcls(resource) ++ acls, servers.head.dataPlaneRequestProcessor.authorizer.get, resource) } private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]], diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala index 9b02d809b5500..b28a40f890e00 100644 --- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala @@ -285,7 +285,7 @@ abstract class QuotaTestClients(topic: String, def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double, server: KafkaServer = leaderNode) { TestUtils.retry(10000) { - val quotaManagers = server.apis.quotas + val quotaManagers = server.dataPlaneRequestProcessor.quotas val overrideProducerQuota = quota(quotaManagers.produce, userPrincipal, producerClientId) val overrideConsumerQuota = quota(quotaManagers.fetch, userPrincipal, consumerClientId) val overrideProducerRequestQuota = quota(quotaManagers.request, userPrincipal, producerClientId) diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala index 854e3381342b9..0587a6de16021 100644 --- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala +++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala @@ -5,7 +5,7 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software @@ -182,8 +182,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas override def setUp() { super.setUp() servers.foreach { s => - TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource) - TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, Resource(Topic, "*", LITERAL)) + TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.dataPlaneRequestProcessor.authorizer.get, Resource.ClusterResource) + TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, Resource(Topic, "*", LITERAL)) } // create the test topic with all the brokers as replicas createTopic(topic, 1, 3) @@ -221,7 +221,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas assertTrue("failed re-authentications not 0", TestUtils.totalMetricValue(s, "failed-reauthentication-total") == 0) } } - + private def getGauge(metricName: String) = { Metrics.defaultRegistry.allMetrics.asScala .filterKeys(k => k.getName == metricName) @@ -275,16 +275,16 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas private def setWildcardResourceAcls() { AclCommand.main(produceConsumeWildcardAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource) - TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource) + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardGroupResource) } } private def setPrefixedResourceAcls() { AclCommand.main(produceConsumePrefixedAclsArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, prefixedTopicResource) - TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, prefixedGroupResource) + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedTopicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedGroupResource) } } @@ -292,9 +292,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas AclCommand.main(produceAclArgs(tp.topic)) AclCommand.main(consumeAclArgs(tp.topic)) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, + TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, new Resource(Topic, tp.topic, PatternType.LITERAL)) - TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource) } val producer = createProducer() sendRecords(producer, numRecords, tp) @@ -315,7 +315,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas def testNoProduceWithDescribeAcl(): Unit = { AclCommand.main(describeAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource) } try{ val producer = createProducer() @@ -327,7 +327,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } confirmReauthenticationMetrics } - + /** * Tests that a consumer fails to consume messages without the appropriate * ACL set. @@ -341,7 +341,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas consumeRecords(consumer) confirmReauthenticationMetrics } - + @Test(expected = classOf[TopicAuthorizationException]) def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = { noConsumeWithoutDescribeAclSetup() @@ -350,13 +350,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas // this should timeout since the consumer will not be able to fetch any metadata for the topic consumeRecords(consumer, timeout = 3000) } - + private def noConsumeWithoutDescribeAclSetup(): Unit = { AclCommand.main(produceAclArgs(tp.topic)) AclCommand.main(groupAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) - TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource) } val producer = createProducer() @@ -365,10 +365,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas AclCommand.main(deleteDescribeAclArgs) AclCommand.main(deleteWriteAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource) } } - + @Test def testNoConsumeWithDescribeAclViaAssign(): Unit = { noConsumeWithDescribeAclSetup() @@ -384,7 +384,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } confirmReauthenticationMetrics } - + @Test def testNoConsumeWithDescribeAclViaSubscribe(): Unit = { noConsumeWithDescribeAclSetup() @@ -400,13 +400,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas } confirmReauthenticationMetrics } - + private def noConsumeWithDescribeAclSetup(): Unit = { AclCommand.main(produceAclArgs(tp.topic)) AclCommand.main(groupAclArgs) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) - TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource) } val producer = createProducer() sendRecords(producer, numRecords, tp) @@ -420,7 +420,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas def testNoGroupAcl(): Unit = { AclCommand.main(produceAclArgs(tp.topic)) servers.foreach { s => - TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource) + TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource) } val producer = createProducer() sendRecords(producer, numRecords, tp) diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala index 55e1529c96316..cb2186c42271b 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala @@ -71,7 +71,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with private def addClusterAcl(permissionType: PermissionType, operation: Operation): Unit = { val acls = Set(clusterAcl(permissionType, operation)) - val authorizer = servers.head.apis.authorizer.get + val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get val prevAcls = authorizer.getAcls(AuthResource.ClusterResource) authorizer.addAcls(acls, AuthResource.ClusterResource) TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, AuthResource.ClusterResource) @@ -79,7 +79,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with private def removeClusterAcl(permissionType: PermissionType, operation: Operation): Unit = { val acls = Set(clusterAcl(permissionType, operation)) - val authorizer = servers.head.apis.authorizer.get + val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get val prevAcls = authorizer.getAcls(AuthResource.ClusterResource) Assert.assertTrue(authorizer.removeAcls(acls, AuthResource.ClusterResource)) TestUtils.waitAndVerifyAcls(prevAcls -- acls, authorizer, AuthResource.ClusterResource) diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index 853f99944ff13..c13b0a384c861 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -501,8 +501,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet @Test def testThreadPoolResize(): Unit = { - val requestHandlerPrefix = "kafka-request-handler-" - val networkThreadPrefix = "kafka-network-thread-" + val requestHandlerPrefix = "data-plane-kafka-request-handler-" + val networkThreadPrefix = "data-plane-kafka-network-thread-" val fetcherThreadPrefix = "ReplicaFetcherThread-" // Executor threads and recovery threads are not verified since threads may not be running // For others, thread count should be configuredCount * threadMultiplier * numBrokers @@ -577,7 +577,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet "", mayReceiveDuplicates = false) verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads, networkThreadPrefix, mayReceiveDuplicates = true) - verifyThreads("kafka-socket-acceptor-", config.listeners.size) + verifyThreads("data-plane-kafka-socket-acceptor-", config.listeners.size) verifyProcessorMetrics() verifyMarkPartitionsForTruncation() diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala index 2306a921898ca..a64f6e7ab9217 100755 --- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala +++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala @@ -175,8 +175,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest { // Test that the existing clientId overrides are read val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) servers = Seq(server) - assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId)) - assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId)) + assertEquals(new Quota(1000, true), server.dataPlaneRequestProcessor.quotas.produce.quota("ANONYMOUS", clientId)) + assertEquals(new Quota(2000, true), server.dataPlaneRequestProcessor.quotas.fetch.quota("ANONYMOUS", clientId)) } @Test diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index 23f62252fcb07..b45c648950561 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -32,8 +32,10 @@ import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.{ControllerMovedException, StaleBrokerEpochException} import org.apache.log4j.Level import kafka.utils.LogCaptureAppender +import org.apache.kafka.common.metrics.KafkaMetric import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.util.Try class ControllerIntegrationTest extends ZooKeeperTestHarness { @@ -83,6 +85,27 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { waitUntilControllerEpoch(firstControllerEpoch + 1, "controller epoch was not incremented after controller move") } + @Test + def testMetadaPropogationOnControlPlane(): Unit = { + servers = makeServers(1, listeners = Some("PLAINTEXT://localhost:0,CONTROLLER://localhost:5000"), listenerSecurityProtocolMap = Some("PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"), + controlPlaneListenerName = Some("CONTROLLER")) + TestUtils.waitUntilBrokerMetadataIsPropagated(servers) + val controlPlaneMetricMap = mutable.Map[String, KafkaMetric]() + val dataPlaneMetricMap = mutable.Map[String, KafkaMetric]() + servers.head.metrics.metrics().values().asScala.foreach { kafkaMetric => + if (kafkaMetric.metricName().tags().values().contains("CONTROLLER")) { + controlPlaneMetricMap.put(kafkaMetric.metricName().name(), kafkaMetric) + } + if (kafkaMetric.metricName().tags().values().contains("PLAINTEXT")) { + dataPlaneMetricMap.put(kafkaMetric.metricName().name(), kafkaMetric) + } + } + assertTrue(controlPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double] == 1.0 && dataPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double] == 0.0) + assertTrue(controlPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double] == 1.0 && dataPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double] == 0.0) + assertTrue(controlPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double] > 1.0 && dataPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double] == 0.0) + assertTrue(controlPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double] == 2.0 && dataPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double] == 0.0) + } + // This test case is used to ensure that there will be no correctness issue after we avoid sending out full // UpdateMetadataRequest to all brokers in the cluster @Test @@ -376,10 +399,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { var activeServers = servers.filter(s => s.config.brokerId != 2) // wait for the update metadata request to trickle to the brokers TestUtils.waitUntilTrue(() => - activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3), + activeServers.forall(_.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3), "Topic test not created after timeout") assertEquals(0, partitionsRemaining.size) - var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get + var partitionStateInfo = activeServers.head.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get var leaderAfterShutdown = partitionStateInfo.basePartitionState.leader assertEquals(0, leaderAfterShutdown) assertEquals(2, partitionStateInfo.basePartitionState.isr.size) @@ -388,16 +411,16 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { partitionsRemaining = resultQueue.take().get assertEquals(0, partitionsRemaining.size) activeServers = servers.filter(s => s.config.brokerId == 0) - partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get + partitionStateInfo = activeServers.head.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get leaderAfterShutdown = partitionStateInfo.basePartitionState.leader assertEquals(0, leaderAfterShutdown) - assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0)) + assertTrue(servers.forall(_.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0)) controller.controlledShutdown(0, servers.find(_.config.brokerId == 0).get.kafkaController.brokerEpoch, controlledShutdownCallback) partitionsRemaining = resultQueue.take().get assertEquals(1, partitionsRemaining.size) // leader doesn't change since all the replicas are shut down - assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0)) + assertTrue(servers.forall(_.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0)) } @Test @@ -585,12 +608,18 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { private def makeServers(numConfigs: Int, autoLeaderRebalanceEnable: Boolean = false, uncleanLeaderElectionEnable: Boolean = false, - enableControlledShutdown: Boolean = true) = { + enableControlledShutdown: Boolean = true, + listeners : Option[String] = None, + listenerSecurityProtocolMap : Option[String] = None, + controlPlaneListenerName : Option[String] = None) = { val configs = TestUtils.createBrokerConfigs(numConfigs, zkConnect, enableControlledShutdown = enableControlledShutdown) configs.foreach { config => config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, autoLeaderRebalanceEnable.toString) config.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString) config.setProperty(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp, "1") + listeners.foreach(listener => config.setProperty(KafkaConfig.ListenersProp, listener)) + listenerSecurityProtocolMap.foreach(listenerMap => config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap)) + controlPlaneListenerName.foreach(controlPlaneListener => config.setProperty(KafkaConfig.ControlPlaneListenerNameProp, controlPlaneListener)) } configs.map(config => TestUtils.createServer(KafkaConfig.fromProps(config))) } diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index b5983377b7730..cb41638f2e723 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -21,7 +21,7 @@ import java.io._ import java.net._ import java.nio.ByteBuffer import java.nio.channels.SocketChannel -import java.util.{HashMap, Random} +import java.util.{HashMap, Properties, Random} import com.yammer.metrics.core.{Gauge, Meter} import com.yammer.metrics.{Metrics => YammerMetrics} @@ -134,8 +134,8 @@ class SocketServerTest extends JUnitSuite { channel.sendResponse(new RequestChannel.SendResponse(request, send, Some(request.header.toString), None)) } - def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = { - val socket = new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)), localAddr, 0) + def connect(s: SocketServer = server, listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), localAddr: InetAddress = null, port: Int = 0) = { + val socket = new Socket("localhost", s.boundPort(/*ListenerName.forSecurityProtocol(protocol)*/listenerName), localAddr, port) sockets += socket socket } @@ -144,13 +144,13 @@ class SocketServerTest extends JUnitSuite { def connectAndProcessRequest(s: SocketServer): (Socket, String) = { val socket = connect(s) val request = sendAndReceiveRequest(socket, s) - processRequest(s.requestChannel, request) + processRequest(s.dataPlaneRequestChannel, request) (socket, request.context.connectionId) } def sendAndReceiveRequest(socket: Socket, server: SocketServer): RequestChannel.Request = { sendRequest(socket, producerRequestBytes()) - receiveRequest(server.requestChannel) + receiveRequest(server.dataPlaneRequestChannel) } def shutdownServerAndMetrics(server: SocketServer): Unit = { @@ -176,15 +176,29 @@ class SocketServerTest extends JUnitSuite { @Test def simpleRequest() { - val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) + val plainSocket = connect() val serializedBytes = producerRequestBytes() // Test PLAINTEXT socket sendRequest(plainSocket, serializedBytes) - processRequest(server.requestChannel) + processRequest(server.dataPlaneRequestChannel) assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq) } + @Test + def testControlPlaneRequest(): Unit = { + val testProps = new Properties + testProps.putAll(props) + testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000") + testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT") + testProps.put("control.plane.listener.name", "CONTROLLER") + val config = KafkaConfig.fromProps(testProps) + withTestableControlPlaneServer (config, { testableServer => + val socket = connect(testableServer, config.controlPlaneListenerName.get, localAddr = InetAddress.getLocalHost, port = 5000) + sendAndReceiveControllerRequest(socket, testableServer) + }) + } + @Test def tooBigRequestIsRejected() { val tooManyBytes = new Array[Byte](server.config.socketRequestMaxBytes + 1) @@ -205,41 +219,41 @@ class SocketServerTest extends JUnitSuite { @Test def testGracefulClose() { - val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) + val plainSocket = connect() val serializedBytes = producerRequestBytes() for (_ <- 0 until 10) sendRequest(plainSocket, serializedBytes) plainSocket.close() for (_ <- 0 until 10) { - val request = receiveRequest(server.requestChannel) + val request = receiveRequest(server.dataPlaneRequestChannel) assertNotNull("receiveRequest timed out", request) - server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request)) + server.dataPlaneRequestChannel.sendResponse(new RequestChannel.NoOpResponse(request)) } } @Test def testNoOpAction(): Unit = { - val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) + val plainSocket = connect() val serializedBytes = producerRequestBytes() for (_ <- 0 until 3) sendRequest(plainSocket, serializedBytes) for (_ <- 0 until 3) { - val request = receiveRequest(server.requestChannel) + val request = receiveRequest(server.dataPlaneRequestChannel) assertNotNull("receiveRequest timed out", request) - server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request)) + server.dataPlaneRequestChannel.sendResponse(new RequestChannel.NoOpResponse(request)) } } @Test def testConnectionId() { - val sockets = (1 to 5).map(_ => connect(protocol = SecurityProtocol.PLAINTEXT)) + val sockets = (1 to 5).map(_ => connect()) val serializedBytes = producerRequestBytes() val requests = sockets.map{socket => sendRequest(socket, serializedBytes) - receiveRequest(server.requestChannel) + receiveRequest(server.dataPlaneRequestChannel) } requests.zipWithIndex.foreach { case (request, i) => val index = request.context.connectionId.split("-").last @@ -258,36 +272,36 @@ class SocketServerTest extends JUnitSuite { val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider) def openChannel(request: RequestChannel.Request): Option[KafkaChannel] = - overrideServer.processor(request.processor).channel(request.context.connectionId) + overrideServer.dataPlaneProcessor(request.processor).channel(request.context.connectionId) def openOrClosingChannel(request: RequestChannel.Request): Option[KafkaChannel] = - overrideServer.processor(request.processor).openOrClosingChannel(request.context.connectionId) + overrideServer.dataPlaneProcessor(request.processor).openOrClosingChannel(request.context.connectionId) try { overrideServer.startup() val serializedBytes = producerRequestBytes() // Connection with no staged receives - val socket1 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT) + val socket1 = connect(overrideServer) sendRequest(socket1, serializedBytes) - val request1 = receiveRequest(overrideServer.requestChannel) + val request1 = receiveRequest(overrideServer.dataPlaneRequestChannel) assertTrue("Channel not open", openChannel(request1).nonEmpty) assertEquals(openChannel(request1), openOrClosingChannel(request1)) time.sleep(idleTimeMs + 1) TestUtils.waitUntilTrue(() => openOrClosingChannel(request1).isEmpty, "Failed to close idle channel") assertTrue("Channel not removed", openChannel(request1).isEmpty) - processRequest(overrideServer.requestChannel, request1) + processRequest(overrideServer.dataPlaneRequestChannel, request1) // Connection with staged receives - val socket2 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT) + val socket2 = connect(overrideServer) val request2 = sendRequestsUntilStagedReceive(overrideServer, socket2, serializedBytes) time.sleep(idleTimeMs + 1) TestUtils.waitUntilTrue(() => openChannel(request2).isEmpty, "Failed to close idle channel") TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).nonEmpty, "Channel removed without processing staged receives") - processRequest(overrideServer.requestChannel, request2) // this triggers a failed send since channel has been closed + processRequest(overrideServer.dataPlaneRequestChannel, request2) // this triggers a failed send since channel has been closed TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).isEmpty, "Failed to remove channel with failed sends") - assertNull("Received request after failed send", overrideServer.requestChannel.receiveRequest(200)) + assertNull("Received request after failed send", overrideServer.dataPlaneRequestChannel.receiveRequest(200)) } finally { shutdownServerAndMetrics(overrideServer) @@ -304,9 +318,9 @@ class SocketServerTest extends JUnitSuite { @volatile var selector: TestableSelector = null val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0" val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider) { - override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { - new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, + new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool, new LogContext()) { override protected[network] def connectionId(socket: Socket): String = overrideConnectionId @@ -319,8 +333,8 @@ class SocketServerTest extends JUnitSuite { } } - def openChannel: Option[KafkaChannel] = overrideServer.processor(0).channel(overrideConnectionId) - def openOrClosingChannel: Option[KafkaChannel] = overrideServer.processor(0).openOrClosingChannel(overrideConnectionId) + def openChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).channel(overrideConnectionId) + def openOrClosingChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).openOrClosingChannel(overrideConnectionId) def connectionCount = overrideServer.connectionCount(InetAddress.getByName("127.0.0.1")) // Create a client connection and wait for server to register the connection with the selector. For @@ -361,7 +375,7 @@ class SocketServerTest extends JUnitSuite { assertSame(channel1, openOrClosingChannel.getOrElse(throw new RuntimeException("Channel not found"))) // Complete request with failed send so that `channel1` is removed from Selector.closingChannels - processRequest(overrideServer.requestChannel, request) + processRequest(overrideServer.dataPlaneRequestChannel, request) TestUtils.waitUntilTrue(() => connectionCount == 0 && openOrClosingChannel.isEmpty, "Failed to remove channel with failed send") // Check that new connections can be created with the same id since `channel1` is no longer in Selector @@ -380,14 +394,14 @@ class SocketServerTest extends JUnitSuite { def sendTwoRequestsReceiveOne(): RequestChannel.Request = { sendRequest(socket, requestBytes, flush = false) sendRequest(socket, requestBytes, flush = true) - receiveRequest(server.requestChannel) + receiveRequest(server.dataPlaneRequestChannel) } val (request, hasStagedReceives) = TestUtils.computeUntilTrue(sendTwoRequestsReceiveOne()) { req => val connectionId = req.context.connectionId - val hasStagedReceives = server.processor(0).numStagedReceives(connectionId) > 0 + val hasStagedReceives = server.dataPlaneProcessor(0).numStagedReceives(connectionId) > 0 if (!hasStagedReceives) { - processRequest(server.requestChannel, req) - processRequest(server.requestChannel) + processRequest(server.dataPlaneRequestChannel, req) + processRequest(server.dataPlaneRequestChannel) } hasStagedReceives } @@ -403,11 +417,11 @@ class SocketServerTest extends JUnitSuite { // Mimic a primitive request handler that fetches the request from RequestChannel and place a response with a // throttled channel. - val request = receiveRequest(server.requestChannel) + val request = receiveRequest(server.dataPlaneRequestChannel) val byteBuffer = request.body[AbstractRequest].serialize(request.header) val send = new NetworkSend(request.context.connectionId, byteBuffer) def channelThrottlingCallback(response: RequestChannel.Response): Unit = { - server.requestChannel.sendResponse(response) + server.dataPlaneRequestChannel.sendResponse(response) } val throttledChannel = new ThrottledChannel(request, new MockTime(), 100, channelThrottlingCallback) val response = @@ -415,7 +429,7 @@ class SocketServerTest extends JUnitSuite { new RequestChannel.SendResponse(request, send, Some(request.header.toString), None) else new RequestChannel.NoOpResponse(request) - server.requestChannel.sendResponse(response) + server.dataPlaneRequestChannel.sendResponse(response) // Quota manager would call notifyThrottlingDone() on throttling completion. Simulate it if throttleingInProgress is // false. @@ -426,11 +440,11 @@ class SocketServerTest extends JUnitSuite { } def openOrClosingChannel(request: RequestChannel.Request): Option[KafkaChannel] = - server.processor(0).openOrClosingChannel(request.context.connectionId) + server.dataPlaneProcessor(0).openOrClosingChannel(request.context.connectionId) @Test def testSendActionResponseWithThrottledChannelWhereThrottlingInProgress() { - val socket = connect(protocol = SecurityProtocol.PLAINTEXT) + val socket = connect() val serializedBytes = producerRequestBytes() // SendAction with throttling in progress val request = throttledChannelTestSetUp(socket, serializedBytes, false, true) @@ -444,7 +458,7 @@ class SocketServerTest extends JUnitSuite { @Test def testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone() { - val socket = connect(protocol = SecurityProtocol.PLAINTEXT) + val socket = connect() val serializedBytes = producerRequestBytes() // SendAction with throttling in progress val request = throttledChannelTestSetUp(socket, serializedBytes, false, false) @@ -459,7 +473,7 @@ class SocketServerTest extends JUnitSuite { @Test def testNoOpActionResponseWithThrottledChannelWhereThrottlingInProgress() { - val socket = connect(protocol = SecurityProtocol.PLAINTEXT) + val socket = connect() val serializedBytes = producerRequestBytes() // SendAction with throttling in progress val request = throttledChannelTestSetUp(socket, serializedBytes, true, true) @@ -471,7 +485,7 @@ class SocketServerTest extends JUnitSuite { @Test def testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone() { - val socket = connect(protocol = SecurityProtocol.PLAINTEXT) + val socket = connect() val serializedBytes = producerRequestBytes() // SendAction with throttling in progress val request = throttledChannelTestSetUp(socket, serializedBytes, true, false) @@ -485,16 +499,16 @@ class SocketServerTest extends JUnitSuite { @Test def testSocketsCloseOnShutdown() { // open a connection - val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT) + val plainSocket = connect() plainSocket.setTcpNoDelay(true) val bytes = new Array[Byte](40) // send a request first to make sure the connection has been picked up by the socket server sendRequest(plainSocket, bytes, Some(0)) - processRequest(server.requestChannel) + processRequest(server.dataPlaneRequestChannel) // the following sleep is necessary to reliably detect the connection close when we send data below Thread.sleep(200L) - // make sure the sockets are open - server.acceptors.asScala.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed)) + // make sure the sockets ar e open + server.dataPlaneAcceptors.asScala.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed)) // then shutdown the server shutdownServerAndMetrics(server) @@ -527,7 +541,7 @@ class SocketServerTest extends JUnitSuite { val conn2 = connect() val serializedBytes = producerRequestBytes() sendRequest(conn2, serializedBytes) - val request = server.requestChannel.receiveRequest(2000) + val request = server.dataPlaneRequestChannel.receiveRequest(2000) assertNotNull(request) } @@ -555,7 +569,7 @@ class SocketServerTest extends JUnitSuite { val conn2 = connect(server) val serializedBytes = producerRequestBytes() sendRequest(conn2, serializedBytes) - val request = server.requestChannel.receiveRequest(2000) + val request = server.dataPlaneRequestChannel.receiveRequest(2000) assertNotNull(request) // now try to connect from the external facing interface, which should fail @@ -583,7 +597,7 @@ class SocketServerTest extends JUnitSuite { // it should succeed val serializedBytes = producerRequestBytes() sendRequest(conns.last, serializedBytes) - val request = overrideServer.requestChannel.receiveRequest(2000) + val request = overrideServer.dataPlaneRequestChannel.receiveRequest(2000) assertNotNull(request) // now try one more (should fail) @@ -627,7 +641,7 @@ class SocketServerTest extends JUnitSuite { byteBuffer.get(serializedBytes) sendRequest(sslSocket, serializedBytes) - processRequest(overrideServer.requestChannel) + processRequest(overrideServer.dataPlaneRequestChannel) assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq) sslSocket.close() } finally { @@ -640,7 +654,7 @@ class SocketServerTest extends JUnitSuite { val socket = connect() val bytes = new Array[Byte](40) sendRequest(socket, bytes, Some(0)) - assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.requestChannel).session.principal) + assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.dataPlaneRequestChannel).session.principal) } /* Test that we update request metrics if the client closes the connection while the broker response is in flight. */ @@ -650,9 +664,9 @@ class SocketServerTest extends JUnitSuite { val serverMetrics = new Metrics var conn: Socket = null val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) { - override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { - new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, + new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider, MemoryPool.NONE, new LogContext()) { override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) { @@ -668,7 +682,7 @@ class SocketServerTest extends JUnitSuite { val serializedBytes = producerRequestBytes() sendRequest(conn, serializedBytes) - val channel = overrideServer.requestChannel + val channel = overrideServer.dataPlaneRequestChannel val request = receiveRequest(channel) val requestMetrics = channel.metrics(request.header.apiKey.name) @@ -696,9 +710,9 @@ class SocketServerTest extends JUnitSuite { @volatile var selector: TestableSelector = null val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0" val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) { - override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { - new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, + new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool, new LogContext()) { override protected[network] def connectionId(socket: Socket): String = overrideConnectionId @@ -711,8 +725,8 @@ class SocketServerTest extends JUnitSuite { } } - def openChannel: Option[KafkaChannel] = overrideServer.processor(0).channel(overrideConnectionId) - def openOrClosingChannel: Option[KafkaChannel] = overrideServer.processor(0).openOrClosingChannel(overrideConnectionId) + def openChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).channel(overrideConnectionId) + def openOrClosingChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).openOrClosingChannel(overrideConnectionId) try { overrideServer.startup() @@ -730,7 +744,7 @@ class SocketServerTest extends JUnitSuite { socket.close() // Complete request with socket exception so that the channel is removed from Selector.closingChannels - processRequest(overrideServer.requestChannel, request) + processRequest(overrideServer.dataPlaneRequestChannel, request) TestUtils.waitUntilTrue(() => openOrClosingChannel.isEmpty, "Channel not closed after failed send") assertTrue("Unexpected completed send", selector.completedSends.isEmpty) } finally { @@ -755,10 +769,10 @@ class SocketServerTest extends JUnitSuite { conn = connect(overrideServer) val serializedBytes = producerRequestBytes() sendRequest(conn, serializedBytes) - val channel = overrideServer.requestChannel + val channel = overrideServer.dataPlaneRequestChannel val request = receiveRequest(channel) - TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty, + TestUtils.waitUntilTrue(() => overrideServer.dataPlaneProcessor(request.processor).channel(request.context.connectionId).isEmpty, s"Idle connection `${request.context.connectionId}` was not closed by selector") val requestMetrics = channel.metrics(request.header.apiKey.name) @@ -780,10 +794,10 @@ class SocketServerTest extends JUnitSuite { server.stopProcessingRequests() val version = ApiKeys.PRODUCE.latestVersion val version2 = (version - 1).toShort - for (_ <- 0 to 1) server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark() - server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark() - assertEquals(2, server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count()) - server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1)) + for (_ <- 0 to 1) server.dataPlaneRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark() + server.dataPlaneRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark() + assertEquals(2, server.dataPlaneRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count()) + server.dataPlaneRequestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1)) val nonZeroMeters = Map(s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version" -> 2, s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version2" -> 1, "kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE" -> 1) @@ -818,7 +832,7 @@ class SocketServerTest extends JUnitSuite { val kafkaMetricNames = metrics.metrics.keySet.asScala.filter(_.tags.asScala.get("listener").nonEmpty) assertFalse(kafkaMetricNames.isEmpty) - val expectedListeners = Set("PLAINTEXT", "TRACE") + val expectedListeners = Set("PLAINTEXT") kafkaMetricNames.foreach { kafkaMetricName => assertTrue(expectedListeners.contains(kafkaMetricName.tags.get("listener"))) } @@ -879,7 +893,7 @@ class SocketServerTest extends JUnitSuite { sockets.foreach(sendRequest(_, producerRequestBytes())) testableServer.testableSelector.addFailure(SelectorOperation.Send) - sockets.foreach(_ => processRequest(testableServer.requestChannel)) + sockets.foreach(_ => processRequest(testableServer.dataPlaneRequestChannel)) testableSelector.waitForOperations(SelectorOperation.Send, 2) testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, locallyClosed = true) @@ -900,7 +914,7 @@ class SocketServerTest extends JUnitSuite { val sockets = (1 to 2).map(_ => connect(testableServer)) sockets.foreach(sendRequest(_, producerRequestBytes())) - val requestChannel = testableServer.requestChannel + val requestChannel = testableServer.dataPlaneRequestChannel val requests = sockets.map(_ => receiveRequest(requestChannel)) val failedConnectionId = requests(0).context.connectionId @@ -933,8 +947,8 @@ class SocketServerTest extends JUnitSuite { testableSelector.addFailure(SelectorOperation.Send) sockets(0).close() - processRequest(testableServer.requestChannel, request) - processRequest(testableServer.requestChannel) // Also process request from other channel + processRequest(testableServer.dataPlaneRequestChannel, request) + processRequest(testableServer.dataPlaneRequestChannel) // Also process request from other channel testableSelector.waitForOperations(SelectorOperation.Send, 2) testableServer.waitForChannelClose(request.context.connectionId, locallyClosed = true) @@ -957,7 +971,7 @@ class SocketServerTest extends JUnitSuite { withTestableServer { testableServer => val sockets = (1 to 2).map(_ => connect(testableServer)) val testableSelector = testableServer.testableSelector - val requestChannel = testableServer.requestChannel + val requestChannel = testableServer.dataPlaneRequestChannel testableSelector.cachedCompletedReceives.minPerPoll = 2 testableSelector.addFailure(SelectorOperation.Mute) @@ -989,7 +1003,7 @@ class SocketServerTest extends JUnitSuite { val requests = sockets.map(sendAndReceiveRequest(_, testableServer)) testableSelector.addFailure(SelectorOperation.Unmute) - requests.foreach(processRequest(testableServer.requestChannel, _)) + requests.foreach(processRequest(testableServer.dataPlaneRequestChannel, _)) testableSelector.waitForOperations(SelectorOperation.Unmute, 2) testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, locallyClosed = true) @@ -1070,10 +1084,25 @@ class SocketServerTest extends JUnitSuite { } } + private def withTestableControlPlaneServer(config : KafkaConfig, testWithServer: TestableSocketServer => Unit) = { + val testableServer = new TestableSocketServer(config) + testableServer.startup() + try { + testWithServer(testableServer) + } finally { + shutdownServerAndMetrics(testableServer) + } + } + + def sendAndReceiveControllerRequest(socket: Socket, server: SocketServer): RequestChannel.Request = { + sendRequest(socket, producerRequestBytes()) + receiveRequest(server.controlPlaneRequestChannelOpt.get) + } + private def assertProcessorHealthy(testableServer: TestableSocketServer, healthySockets: Seq[Socket] = Seq.empty): Unit = { val selector = testableServer.testableSelector selector.reset() - val requestChannel = testableServer.requestChannel + val requestChannel = testableServer.dataPlaneRequestChannel // Check that existing channels behave as expected healthySockets.foreach { socket => @@ -1096,18 +1125,17 @@ class SocketServerTest extends JUnitSuite { def isSocketConnectionId(connectionId: String, socket: Socket): Boolean = connectionId.contains(s":${socket.getLocalPort}-") - class TestableSocketServer extends SocketServer(KafkaConfig.fromProps(props), + class TestableSocketServer(config : KafkaConfig = config) extends SocketServer(config, new Metrics, Time.SYSTEM, credentialProvider) { @volatile var selector: Option[TestableSelector] = None - override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, + override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName, protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = { new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool, new LogContext()) { override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = { - val testableSelector = new TestableSelector(config, channelBuilder, time, metrics) - assertEquals(None, selector) + val testableSelector = new TestableSelector(config, channelBuilder, time, metrics, metricTags.asScala) selector = Some(testableSelector) testableSelector } @@ -1131,7 +1159,7 @@ class SocketServerTest extends JUnitSuite { val openCount = selector.allChannels.size - 1 // minus one for the channel just closed above TestUtils.waitUntilTrue(() => connectionCount(localAddress) == openCount, "Connection count not decremented") TestUtils.waitUntilTrue(() => - processor(0).inflightResponseCount == 0, "Inflight responses not cleared") + dataPlaneProcessor(0).inflightResponseCount == 0, "Inflight responses not cleared") assertNull("Channel not removed", selector.channel(connectionId)) assertNull("Closing channel not removed", selector.closingChannel(connectionId)) } @@ -1149,9 +1177,9 @@ class SocketServerTest extends JUnitSuite { case object CloseSelector extends SelectorOperation } - class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder, time: Time, metrics: Metrics) + class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder, time: Time, metrics: Metrics, metricTags: mutable.Map[String, String] = mutable.Map.empty) extends Selector(config.socketRequestMaxBytes, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, - metrics, time, "socket-server", new HashMap, false, true, channelBuilder, MemoryPool.NONE, new LogContext()) { + metrics, time, "socket-server", metricTags.asJava, false, true, channelBuilder, MemoryPool.NONE, new LogContext()) { val failures = mutable.Map[SelectorOperation, Exception]() val operationCounts = mutable.Map[SelectorOperation, Int]().withDefaultValue(0) @@ -1302,4 +1330,4 @@ class SocketServerTest extends JUnitSuite { sockets.filterNot(socket => isSocketConnectionId(failedConnectionId, socket)) } } -} +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala index 789dbaeb8f762..f5cc134297024 100644 --- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala @@ -93,7 +93,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "1000") props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "2000") - val quotaManagers = servers.head.apis.quotas + val quotaManagers = servers.head.dataPlaneRequestProcessor.quotas rootEntityType match { case ConfigType.Client => adminZkClient.changeClientIdConfig(configEntityName, props) case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, props) @@ -179,7 +179,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { // Remove config change znodes to force quota initialization only through loading of user/client quotas zkClient.getChildren(ConfigEntityChangeNotificationZNode.path).foreach { p => zkClient.deletePath(ConfigEntityChangeNotificationZNode.path + "/" + p) } server.startup() - val quotaManagers = server.apis.quotas + val quotaManagers = server.dataPlaneRequestProcessor.quotas assertEquals(Quota.upperBound(1000), quotaManagers.produce.quota("someuser", "overriddenClientId")) assertEquals(Quota.upperBound(2000), quotaManagers.fetch.quota("someuser", "overriddenClientId")) diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 10dba1a479394..44b321e5735d0 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -229,6 +229,31 @@ class KafkaConfigTest { assertFalse(isValidKafkaConfig(props)) } + @Test + def testControlPlaneListenerName() = { + val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) + props.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000") + props.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL") + props.put("control.plane.listener.name", "CONTROLLER") + assertTrue(isValidKafkaConfig(props)) + + val serverConfig = KafkaConfig.fromProps(props) + val controlEndpoint = serverConfig.controlPlaneListener.get + assertEquals("localhost", controlEndpoint.host) + assertEquals(5000, controlEndpoint.port) + assertEquals(SecurityProtocol.SSL, controlEndpoint.securityProtocol) + + //advertised listener should contain control plane listener + val advertisedEndpoints = serverConfig.advertisedListeners + assertFalse(advertisedEndpoints.filter { endpoint => + endpoint.securityProtocol == controlEndpoint.securityProtocol && endpoint.listenerName.value().equals(controlEndpoint.listenerName.value()) + }.isEmpty) + + // interBrokerListener name should be different from control plane listener name + val interBrokerListenerName = serverConfig.interBrokerListenerName + assertFalse(interBrokerListenerName.value().equals(controlEndpoint.listenerName.value())) + } + @Test def testBadListenerProtocol() { val props = new Properties() diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala index 4e60a8f602792..ef3dece306325 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala @@ -74,7 +74,7 @@ class MetadataRequestTest extends BaseRequestTest { assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2) TestUtils.waitUntilTrue(() => { val metadataResponse2 = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort)) - metadataResponse2.controller != null && controllerServer2.apis.brokerId == metadataResponse2.controller.id + metadataResponse2.controller != null && controllerServer2.dataPlaneRequestProcessor.brokerId == metadataResponse2.controller.id }, "Controller id should match the active controller after failover", 5000) } @@ -178,7 +178,7 @@ class MetadataRequestTest extends BaseRequestTest { assertEquals(topic1, topicMetadata1.topic) assertEquals(Errors.INVALID_TOPIC_EXCEPTION, topicMetadata2.error) assertEquals(topic2, topicMetadata2.topic) - + TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0) TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 0) @@ -250,7 +250,7 @@ class MetadataRequestTest extends BaseRequestTest { val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort)) val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head val downNode = servers.find { server => - val serverId = server.apis.brokerId + val serverId = server.dataPlaneRequestProcessor.brokerId val leaderId = partitionMetadata.leader.id val replicaIds = partitionMetadata.replicas.asScala.map(_.id) serverId != leaderId && replicaIds.contains(serverId) @@ -260,7 +260,7 @@ class MetadataRequestTest extends BaseRequestTest { TestUtils.waitUntilTrue(() => { val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort)) val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head - val replica = metadata.replicas.asScala.find(_.id == downNode.apis.brokerId).get + val replica = metadata.replicas.asScala.find(_.id == downNode.dataPlaneRequestProcessor.brokerId).get replica.host == "" & replica.port == -1 }, "Replica was not found down", 5000) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index ae8353657a693..c9e4e7890ebf6 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -99,7 +99,7 @@ class RequestQuotaTest extends BaseRequestTest { adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps) TestUtils.retry(10000) { - val quotaManager = servers.head.apis.quotas.request + val quotaManager = servers.head.dataPlaneRequestProcessor.quotas.request assertEquals(s"Default request quota not set", Quota.upperBound(0.01), quotaManager.quota("some-user", "some-client")) assertEquals(s"Request quota override not set", Quota.upperBound(2000), quotaManager.quota("some-user", unthrottledClientId)) } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index e5ea6a4baae37..ecdb8c8739cc5 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -835,7 +835,7 @@ object TestUtils extends Logging { timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = { val expectedBrokerIds = servers.map(_.config.brokerId).toSet TestUtils.waitUntilTrue(() => servers.forall(server => - expectedBrokerIds == server.apis.metadataCache.getAliveBrokers.map(_.id).toSet + expectedBrokerIds == server.dataPlaneRequestProcessor.metadataCache.getAliveBrokers.map(_.id).toSet ), "Timed out waiting for broker metadata to propagate to all servers", timeout) } @@ -855,7 +855,7 @@ object TestUtils extends Logging { TestUtils.waitUntilTrue(() => servers.foldLeft(true) { (result, server) => - val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition) + val partitionStateOpt = server.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic, partition) partitionStateOpt match { case None => false case Some(partitionState) => diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala index b15f8ac47b576..c120caa458142 100644 --- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala +++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala @@ -313,8 +313,8 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware // Test that the existing clientId overrides are read val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect))) servers = Seq(server) - assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId)) - assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId)) + assertEquals(new Quota(1000, true), server.dataPlaneRequestProcessor.quotas.produce.quota("ANONYMOUS", clientId)) + assertEquals(new Quota(2000, true), server.dataPlaneRequestProcessor.quotas.fetch.quota("ANONYMOUS", clientId)) } @Test diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java index 8a2122d8e37ad..efd0d49be44cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java @@ -517,7 +517,7 @@ public static void waitUntilMetadataIsPropagated(final List servers final long timeout) throws InterruptedException { TestUtils.waitForCondition(() -> { for (final KafkaServer server : servers) { - final MetadataCache metadataCache = server.apis().metadataCache(); + final MetadataCache metadataCache = server.dataPlaneRequestProcessor().metadataCache(); final Option partitionInfo = metadataCache.getPartitionInfo(topic, partition); if (partitionInfo.isEmpty()) { From 52f1e287af3cdf5146590700f760a685807d5f7e Mon Sep 17 00:00:00 2001 From: mgharat Date: Tue, 18 Dec 2018 09:28:17 -0800 Subject: [PATCH 2/3] Addressed comments on the PR. --- .../scala/kafka/network/SocketServer.scala | 28 +++++----- .../main/scala/kafka/server/KafkaConfig.scala | 6 +-- .../ControllerIntegrationTest.scala | 12 +++-- .../unit/kafka/network/SocketServerTest.scala | 54 ++++++++----------- 4 files changed, 47 insertions(+), 53 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 22940db38364c..a18eb2137224a 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -56,7 +56,8 @@ import scala.util.control.ControlThrowable * - data-plane : * - Handles requests from clients and other brokers in the cluster. * - The threading model is - * 1 Acceptor thread that handles new connections + * 1 Acceptor thread per listener, that handles new connections. + * It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig. * Acceptor has N Processor threads that each have their own selector and read requests from sockets * M Handler threads that handle requests and produce responses back to the processor threads for writing. * - control-plane : @@ -67,7 +68,6 @@ import scala.util.control.ControlThrowable * Acceptor has 1 Processor thread that has its own selector and read requests from the socket. * 1 Handler thread that handles requests and produce responses back to the processor thread for writing. */ - class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup { val DataPlanePrefix = "data-plane" @@ -119,7 +119,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time } } - newGauge("DataPlaneNetworkProcessorAvgIdlePercent", + newGauge("NetworkProcessorAvgIdlePercent", new Gauge[Double] { def value = SocketServer.this.synchronized { @@ -223,17 +223,17 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = synchronized { endpointOpt.foreach { endpoint => - controlPlaneAcceptorOpt = Some(createAcceptor(endpoint)) - controlPlaneProcessorOpt = Some(newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)) + val controlPlaneAcceptor = createAcceptor(endpoint) + val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool) + controlPlaneAcceptorOpt = Some(controlPlaneAcceptor) + controlPlaneProcessorOpt = Some(controlPlaneProcessor) val listenerProcessors = new ArrayBuffer[Processor]() - controlPlaneProcessorOpt.foreach { processor => - listenerProcessors += processor - controlPlaneRequestChannelOpt.foreach(_.addProcessor(processor)) - val controlPlaneAcceptor = controlPlaneAcceptorOpt.get - controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlanePrefix) - KafkaThread.nonDaemon(s"control-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", controlPlaneAcceptor).start() - controlPlaneAcceptor.awaitStartup() - } + listenerProcessors += controlPlaneProcessor + controlPlaneRequestChannelOpt.foreach(_.addProcessor(controlPlaneProcessor)) + nextProcessorId += 1 + controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlanePrefix) + KafkaThread.nonDaemon(s"control-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", controlPlaneAcceptor).start() + controlPlaneAcceptor.awaitStartup() info(s"Created control-plane acceptor and processor for endpoint : $endpoint") } } @@ -306,7 +306,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time if (dataPlaneAcceptors.containsKey(endpoints(listenerName))) { dataPlaneAcceptors.get(endpoints(listenerName)).serverChannel.socket.getLocalPort } else { - controlPlaneAcceptorOpt.map (_.serverChannel.socket().getLocalPort).getOrElse(throw new KafkaException()) + controlPlaneAcceptorOpt.map (_.serverChannel.socket().getLocalPort).getOrElse(throw new KafkaException("Could not find listenerName : " + listenerName + " in data-plane or control-plane")) } } catch { case e: Exception => diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 7b13130d04ea3..c3c3295835cfb 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -504,7 +504,7 @@ object KafkaConfig { val NumIoThreadsDoc = "The number of threads that the server uses for processing requests, which may include disk I/O" val NumReplicaAlterLogDirsThreadsDoc = "The number of threads that can move replicas between log directories, which may include disk I/O" val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks" - val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads" + val QueuedMaxRequestsDoc = "The number of queued requests allowed for data-plane, before blocking the network threads" val QueuedMaxRequestBytesDoc = "The number of queued bytes allowed before no more requests are read" val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC /************* Authorizer Configuration ***********/ @@ -1283,8 +1283,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1 def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2 - def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map(_._1) - def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map(_._2) + def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, securityProtocol) => listenerName } + def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, securityProtocol) => securityProtocol } def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp) val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1 diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index b45c648950561..d245ce2dba99e 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -100,10 +100,14 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { dataPlaneMetricMap.put(kafkaMetric.metricName().name(), kafkaMetric) } } - assertTrue(controlPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double] == 1.0 && dataPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double] == 0.0) - assertTrue(controlPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double] == 1.0 && dataPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double] == 0.0) - assertTrue(controlPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double] > 1.0 && dataPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double] == 0.0) - assertTrue(controlPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double] == 2.0 && dataPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double] == 0.0) + assertTrue(controlPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double] == 1.0) + assertTrue(dataPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double] == 0.0) + assertTrue(controlPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double] == 1.0) + assertTrue(dataPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double] == 0.0) + assertTrue(controlPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double] > 1.0) + assertTrue(dataPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double] == 0.0) + assertTrue(controlPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double] == 2.0) + assertTrue(dataPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double] == 0.0) } // This test case is used to ensure that there will be no correctness issue after we avoid sending out full diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index cb41638f2e723..546b8815e2e71 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -135,7 +135,7 @@ class SocketServerTest extends JUnitSuite { } def connect(s: SocketServer = server, listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), localAddr: InetAddress = null, port: Int = 0) = { - val socket = new Socket("localhost", s.boundPort(/*ListenerName.forSecurityProtocol(protocol)*/listenerName), localAddr, port) + val socket = new Socket("localhost", s.boundPort(listenerName), localAddr, port) sockets += socket socket } @@ -193,7 +193,7 @@ class SocketServerTest extends JUnitSuite { testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT") testProps.put("control.plane.listener.name", "CONTROLLER") val config = KafkaConfig.fromProps(testProps) - withTestableControlPlaneServer (config, { testableServer => + withTestableServer(config, { testableServer => val socket = connect(testableServer, config.controlPlaneListenerName.get, localAddr = InetAddress.getLocalHost, port = 5000) sendAndReceiveControllerRequest(socket, testableServer) }) @@ -860,7 +860,7 @@ class SocketServerTest extends JUnitSuite { */ @Test def configureNewConnectionException(): Unit = { - withTestableServer { testableServer => + withTestableServer (testWithServer = { testableServer => val testableSelector = testableServer.testableSelector testableSelector.updateMinWakeup(2) @@ -870,7 +870,7 @@ class SocketServerTest extends JUnitSuite { TestUtils.waitUntilTrue(() => testableServer.connectionCount(localAddress) == 1, "Failed channel not removed") assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets)) - } + }) } /** @@ -885,7 +885,7 @@ class SocketServerTest extends JUnitSuite { */ @Test def processNewResponseException(): Unit = { - withTestableServer { testableServer => + withTestableServer (testWithServer = { testableServer => val testableSelector = testableServer.testableSelector testableSelector.updateMinWakeup(2) @@ -898,7 +898,7 @@ class SocketServerTest extends JUnitSuite { testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, locallyClosed = true) assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets)) - } + }) } /** @@ -908,7 +908,7 @@ class SocketServerTest extends JUnitSuite { */ @Test def sendCancelledKeyException(): Unit = { - withTestableServer { testableServer => + withTestableServer (testWithServer = { testableServer => val testableSelector = testableServer.testableSelector testableSelector.updateMinWakeup(2) @@ -926,7 +926,7 @@ class SocketServerTest extends JUnitSuite { val successfulSocket = if (isSocketConnectionId(failedConnectionId, sockets(0))) sockets(1) else sockets(0) assertProcessorHealthy(testableServer, Seq(successfulSocket)) - } + }) } /** @@ -936,7 +936,7 @@ class SocketServerTest extends JUnitSuite { */ @Test def closingChannelException(): Unit = { - withTestableServer { testableServer => + withTestableServer (testWithServer = { testableServer => val testableSelector = testableServer.testableSelector testableSelector.updateMinWakeup(2) @@ -953,7 +953,7 @@ class SocketServerTest extends JUnitSuite { testableServer.waitForChannelClose(request.context.connectionId, locallyClosed = true) assertProcessorHealthy(testableServer, Seq(sockets(1))) - } + }) } /** @@ -968,7 +968,7 @@ class SocketServerTest extends JUnitSuite { */ @Test def processCompletedReceiveException(): Unit = { - withTestableServer { testableServer => + withTestableServer (testWithServer = { testableServer => val sockets = (1 to 2).map(_ => connect(testableServer)) val testableSelector = testableServer.testableSelector val requestChannel = testableServer.dataPlaneRequestChannel @@ -982,7 +982,7 @@ class SocketServerTest extends JUnitSuite { requests.foreach(processRequest(requestChannel, _)) assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets)) - } + }) } /** @@ -997,7 +997,7 @@ class SocketServerTest extends JUnitSuite { */ @Test def processCompletedSendException(): Unit = { - withTestableServer { testableServer => + withTestableServer (testWithServer = { testableServer => val testableSelector = testableServer.testableSelector val sockets = (1 to 2).map(_ => connect(testableServer)) val requests = sockets.map(sendAndReceiveRequest(_, testableServer)) @@ -1008,7 +1008,7 @@ class SocketServerTest extends JUnitSuite { testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, locallyClosed = true) assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets)) - } + }) } /** @@ -1021,7 +1021,7 @@ class SocketServerTest extends JUnitSuite { */ @Test def processDisconnectedException(): Unit = { - withTestableServer { testableServer => + withTestableServer (testWithServer = { testableServer => val (socket, connectionId) = connectAndProcessRequest(testableServer) val testableSelector = testableServer.testableSelector @@ -1035,7 +1035,7 @@ class SocketServerTest extends JUnitSuite { testableServer.waitForChannelClose(connectionId, locallyClosed = false) assertProcessorHealthy(testableServer) - } + }) } /** @@ -1043,7 +1043,7 @@ class SocketServerTest extends JUnitSuite { */ @Test def pollException(): Unit = { - withTestableServer { testableServer => + withTestableServer (testWithServer = { testableServer => val (socket, _) = connectAndProcessRequest(testableServer) val testableSelector = testableServer.testableSelector @@ -1052,7 +1052,7 @@ class SocketServerTest extends JUnitSuite { testableSelector.waitForOperations(SelectorOperation.Poll, 2) assertProcessorHealthy(testableServer, Seq(socket)) - } + }) } /** @@ -1060,7 +1060,7 @@ class SocketServerTest extends JUnitSuite { */ @Test def controlThrowable(): Unit = { - withTestableServer { testableServer => + withTestableServer (testWithServer = { testableServer => connectAndProcessRequest(testableServer) val testableSelector = testableServer.testableSelector @@ -1070,25 +1070,15 @@ class SocketServerTest extends JUnitSuite { testableSelector.waitForOperations(SelectorOperation.Poll, 1) testableSelector.waitForOperations(SelectorOperation.CloseSelector, 1) - } + }) } - private def withTestableServer(testWithServer: TestableSocketServer => Unit): Unit = { + private def withTestableServer(config : KafkaConfig = config, testWithServer: TestableSocketServer => Unit): Unit = { props.put("listeners", "PLAINTEXT://localhost:0") - val testableServer = new TestableSocketServer - testableServer.startup() - try { - testWithServer(testableServer) - } finally { - shutdownServerAndMetrics(testableServer) - } - } - - private def withTestableControlPlaneServer(config : KafkaConfig, testWithServer: TestableSocketServer => Unit) = { val testableServer = new TestableSocketServer(config) testableServer.startup() try { - testWithServer(testableServer) + testWithServer(testableServer) } finally { shutdownServerAndMetrics(testableServer) } From c40a575b7d984205ae4a8ac4b0584b9731566090 Mon Sep 17 00:00:00 2001 From: mgharat Date: Fri, 21 Dec 2018 13:24:11 -0800 Subject: [PATCH 3/3] Addressed comments on the PR. Changed the metric ControlPlaneNetworkProcessorAvgIdlePercent to return Double.NaN, if the control-plane listener is not configured. --- .../scala/kafka/network/SocketServer.scala | 32 ++++++++++--------- .../ControllerIntegrationTest.scala | 10 +++--- .../unit/kafka/network/SocketServerTest.scala | 2 +- .../unit/kafka/server/KafkaConfigTest.scala | 4 +-- 4 files changed, 25 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index a18eb2137224a..125efdc373d18 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -83,11 +83,11 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics") memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName)) private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE - // data plane + // data-plane private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]() private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]() val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests) - // control plane + // control-plane private var controlPlaneProcessorOpt : Option[Processor] = None private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, RequestChannel.ControlPlaneMetricPrefix)) @@ -141,7 +141,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time } ioWaitRatioMetricName.map { metricName => Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0)) - }.getOrElse(0.0) + }.getOrElse(Double.NaN) } } ) @@ -182,28 +182,29 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time } ) info(s"Started ${dataPlaneAcceptors.size} acceptor threads for data-plane") - controlPlaneAcceptorOpt.foreach(_ => info("Started 1 acceptor thread for control-plane")) + if (controlPlaneAcceptorOpt.isDefined) + info("Started control-plane acceptor thread") } /** - * Starts processors of all the data plane acceptors of this server if they have not already been started. - * This method is used for delayed starting of data plane processors if [[kafka.network.SocketServer#startup]] + * Starts processors of all the data-plane acceptors of this server if they have not already been started. + * This method is used for delayed starting of data-plane processors if [[kafka.network.SocketServer#startup]] * was invoked with `startupProcessors=false`. */ def startDataPlaneProcessors(): Unit = synchronized { dataPlaneAcceptors.values.asScala.foreach { _.startProcessors(DataPlanePrefix) } - info(s"Started data plane processors for ${dataPlaneAcceptors.size} acceptors") + info(s"Started data-plane processors for ${dataPlaneAcceptors.size} acceptors") } /** - * Start the processor of control plane acceptor of this server if it has not already been started. - * This method is used for delayed starting of control plane processor if [[kafka.network.SocketServer#startup]] + * Start the processor of control-plane acceptor of this server if it has not already been started. + * This method is used for delayed starting of control-plane processor if [[kafka.network.SocketServer#startup]] * was invoked with `startupProcessors=false`. */ def startControlPlaneProcessor(): Unit = synchronized { - controlPlaneAcceptorOpt.foreach { acceptor => - acceptor.startProcessors(ControlPlanePrefix) - info(s"Started control plane processor for ${controlPlaneAcceptorOpt.size} acceptors") + if (controlPlaneAcceptorOpt.isDefined) { + controlPlaneAcceptorOpt.get.startProcessors(ControlPlanePrefix) + info(s"Started control-plane processor for the control-plane acceptor") } } @@ -277,7 +278,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time } def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized { - info(s"Resizing network thread pool size for each data plane listener from $oldNumNetworkThreads to $newNumNetworkThreads") + info(s"Resizing network thread pool size for each data-plane listener from $oldNumNetworkThreads to $newNumNetworkThreads") if (newNumNetworkThreads > oldNumNetworkThreads) { dataPlaneAcceptors.asScala.foreach { case (endpoint, acceptor) => addDataPlaneProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads) @@ -303,8 +304,9 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time def boundPort(listenerName: ListenerName): Int = { try { - if (dataPlaneAcceptors.containsKey(endpoints(listenerName))) { - dataPlaneAcceptors.get(endpoints(listenerName)).serverChannel.socket.getLocalPort + val acceptor = dataPlaneAcceptors.get(endpoints(listenerName)) + if (acceptor != null) { + acceptor.serverChannel.socket.getLocalPort } else { controlPlaneAcceptorOpt.map (_.serverChannel.socket().getLocalPort).getOrElse(throw new KafkaException("Could not find listenerName : " + listenerName + " in data-plane or control-plane")) } diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala index d245ce2dba99e..08747a85a8e57 100644 --- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala @@ -86,7 +86,7 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { } @Test - def testMetadaPropogationOnControlPlane(): Unit = { + def testMetadataPropagationOnControlPlane(): Unit = { servers = makeServers(1, listeners = Some("PLAINTEXT://localhost:0,CONTROLLER://localhost:5000"), listenerSecurityProtocolMap = Some("PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"), controlPlaneListenerName = Some("CONTROLLER")) TestUtils.waitUntilBrokerMetadataIsPropagated(servers) @@ -100,10 +100,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness { dataPlaneMetricMap.put(kafkaMetric.metricName().name(), kafkaMetric) } } - assertTrue(controlPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double] == 1.0) - assertTrue(dataPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double] == 0.0) - assertTrue(controlPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double] == 1.0) - assertTrue(dataPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double] == 0.0) + assertEquals(1e-0, controlPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double], 0) + assertEquals(0e-0, dataPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double], 0) + assertEquals(1e-0, controlPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double], 0) + assertEquals(0e-0, dataPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double], 0) assertTrue(controlPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double] > 1.0) assertTrue(dataPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double] == 0.0) assertTrue(controlPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double] == 2.0) diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 546b8815e2e71..2609d9e0b7905 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -822,7 +822,7 @@ class SocketServerTest extends JUnitSuite { .allMetrics.asScala .filterKeys(k => k.getName.endsWith("IdlePercent") || k.getName.endsWith("NetworkProcessorAvgIdlePercent")) .collect { case (k, metric: Gauge[_]) => (k, metric.value().asInstanceOf[Double]) } - .filter { case (_, value) => value != 0.0 } + .filter { case (_, value) => value != 0.0 && !value.equals(Double.NaN) } assertEquals(Map.empty, nonZeroMetricNamesAndValues) } diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 44b321e5735d0..f21f38427b2d0 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -243,13 +243,13 @@ class KafkaConfigTest { assertEquals(5000, controlEndpoint.port) assertEquals(SecurityProtocol.SSL, controlEndpoint.securityProtocol) - //advertised listener should contain control plane listener + //advertised listener should contain control-plane listener val advertisedEndpoints = serverConfig.advertisedListeners assertFalse(advertisedEndpoints.filter { endpoint => endpoint.securityProtocol == controlEndpoint.securityProtocol && endpoint.listenerName.value().equals(controlEndpoint.listenerName.value()) }.isEmpty) - // interBrokerListener name should be different from control plane listener name + // interBrokerListener name should be different from control-plane listener name val interBrokerListenerName = serverConfig.interBrokerListenerName assertFalse(interBrokerListenerName.value().equals(controlEndpoint.listenerName.value())) }