From e76144db025614cb0d9d0ed1c53031b9d4969a94 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 27 Dec 2024 03:59:39 +0000 Subject: [PATCH 01/10] remove control.Plane.listener --- .../controller/ControllerChannelManager.scala | 4 +- .../scala/kafka/network/SocketServer.scala | 23 +--- .../main/scala/kafka/server/KafkaConfig.scala | 40 ------ .../main/scala/kafka/server/KafkaServer.scala | 14 +-- .../NodeToControllerChannelManager.scala | 4 +- .../unit/kafka/network/SocketServerTest.scala | 115 ------------------ .../unit/kafka/server/KafkaConfigTest.scala | 42 ------- .../server/SaslApiVersionsRequestTest.scala | 2 +- .../kafka/network/SocketServerConfigs.java | 24 ---- 9 files changed, 7 insertions(+), 261 deletions(-) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index cea7368378dda..659d7e2f6ed06 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -116,8 +116,8 @@ class ControllerChannelManager(controllerEpoch: () => Int, private def addNewBroker(broker: Broker): Unit = { val messageQueue = new LinkedBlockingQueue[QueueItem] debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}") - val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) - val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) + val controllerToBrokerListenerName = config.interBrokerListenerName + val controllerToBrokerSecurityProtocol = config.interBrokerSecurityProtocol val brokerNode = broker.node(controllerToBrokerListenerName) val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ") val (networkClient, reconfigurableChannelBuilder) = { diff --git 
a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 093f5298da4f2..03c61f6417ceb 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -107,8 +107,6 @@ class SocketServer( val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics) // control-plane private[network] var controlPlaneAcceptorOpt: Option[ControlPlaneAcceptor] = None - val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => - new RequestChannel(20, ControlPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)) private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0) val connectionQuotas = new ConnectionQuotas(config, time, metrics) @@ -178,7 +176,6 @@ class SocketServer( if (apiVersionManager.listenerType.equals(ListenerType.CONTROLLER)) { config.controllerListeners.foreach(createDataPlaneAcceptorAndProcessors) } else { - config.controlPlaneListener.foreach(createControlPlaneAcceptorAndProcessor) config.dataPlaneListeners.foreach(createDataPlaneAcceptorAndProcessors) } @@ -251,8 +248,7 @@ class SocketServer( } val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix) connectionQuotas.addListener(config, endpoint.listenerName) - val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty && - config.interBrokerListenerName == endpoint.listenerName + val isPrivilegedListener = config.interBrokerListenerName == endpoint.listenerName val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, isPrivilegedListener, dataPlaneRequestChannel) config.addReconfigurable(dataPlaneAcceptor) dataPlaneAcceptor.configure(parsedConfigs) @@ -260,27 +256,12 @@ class SocketServer( info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}") } - private 
def createControlPlaneAcceptorAndProcessor(endpoint: EndPoint): Unit = synchronized { - if (stopped) { - throw new RuntimeException("Can't create new control plane acceptor and processor: SocketServer is stopped.") - } - connectionQuotas.addListener(config, endpoint.listenerName) - val controlPlaneAcceptor = createControlPlaneAcceptor(endpoint, controlPlaneRequestChannelOpt.get) - controlPlaneAcceptor.addProcessors(1) - controlPlaneAcceptorOpt = Some(controlPlaneAcceptor) - info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}") - } - private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap protected def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = { new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager) } - private def createControlPlaneAcceptor(endPoint: EndPoint, requestChannel: RequestChannel): ControlPlaneAcceptor = { - new ControlPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager) - } - /** * Stop processing requests and new connections. 
*/ @@ -293,7 +274,6 @@ class SocketServer( dataPlaneAcceptors.asScala.values.foreach(_.close()) controlPlaneAcceptorOpt.foreach(_.close()) dataPlaneRequestChannel.clear() - controlPlaneRequestChannelOpt.foreach(_.clear()) info("Stopped socket server request processors") } } @@ -309,7 +289,6 @@ class SocketServer( this.synchronized { stopProcessingRequests() dataPlaneRequestChannel.shutdown() - controlPlaneRequestChannelOpt.foreach(_.shutdown()) connectionQuotas.close() } info("Shutdown completed") diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 513c1273d164c..de3b2b04ee213 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -579,8 +579,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1 def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2 - def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, _) => listenerName } - def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (_, securityProtocol) => securityProtocol } def saslMechanismInterBrokerProtocol = getString(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG) val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion.isSaslInterBrokerHandshakeRequestEnabled @@ -665,16 +663,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) def saslMechanismControllerProtocol: String = getString(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG) - def controlPlaneListener: Option[EndPoint] = { - controlPlaneListenerName.map { listenerName => - listeners.filter(endpoint => endpoint.listenerName.value() == listenerName.value()).head - } - } - def dataPlaneListeners: Seq[EndPoint] = { listeners.filterNot { listener => 
val name = listener.listenerName.value() - name.equals(getString(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG)) || controllerListenerNames.contains(name) } } @@ -723,19 +714,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) } } - private def getControlPlaneListenerNameAndSecurityProtocol: Option[(ListenerName, SecurityProtocol)] = { - Option(getString(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG)) match { - case Some(name) => - val listenerName = ListenerName.normalised(name) - val securityProtocol = effectiveListenerSecurityProtocolMap.getOrElse(listenerName, - throw new ConfigException(s"Listener with ${listenerName.value} defined in " + - s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} not found in ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG}.")) - Some(listenerName, securityProtocol) - - case None => None - } - } - private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = { try SecurityProtocol.forName(protocolName) catch { @@ -819,10 +797,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) } } - def validateControlPlaneListenerEmptyForKRaft(): Unit = { - require(controlPlaneListenerName.isEmpty, - s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not supported in KRaft mode.") - } def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = { require(voterIds.isEmpty || voterIds.contains(nodeId), s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}") @@ -844,7 +818,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) if (processRoles == Set(ProcessRole.BrokerRole)) { // KRaft broker-only validateQuorumVotersAndQuorumBootstrapServerForKRaft() - validateControlPlaneListenerEmptyForKRaft() // nodeId must not appear in controller.quorum.voters 
require(!voterIds.contains(nodeId), s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}") @@ -869,7 +842,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) } else if (processRoles == Set(ProcessRole.ControllerRole)) { // KRaft controller-only validateQuorumVotersAndQuorumBootstrapServerForKRaft() - validateControlPlaneListenerEmptyForKRaft() // listeners should only contain listeners also enumerated in the controller listener require( effectiveAdvertisedControllerListeners.size == listeners.size, @@ -888,7 +860,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) } else if (isKRaftCombinedMode) { // KRaft combined broker and controller validateQuorumVotersAndQuorumBootstrapServerForKRaft() - validateControlPlaneListenerEmptyForKRaft() validateControllerQuorumVotersMustContainNodeIdForKRaftController() validateAdvertisedControllerListenersNonEmptyForKRaftController() validateControllerListenerNamesMustAppearInListenersForKRaftController() @@ -920,17 +891,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+ s"Use a routable IP address.") - // validate control.plane.listener.name config - if (controlPlaneListenerName.isDefined) { - require(advertisedBrokerListenerNames.contains(controlPlaneListenerName.get), - s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. 
" + - s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}") - // controlPlaneListenerName should be different from interBrokerListenerName - require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()), - s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG}, when defined, should have a different value from the inter broker listener name. " + - s"Currently they both have the value ${controlPlaneListenerName.get}") - } - if (groupCoordinatorConfig.offsetTopicCompressionType == CompressionType.ZSTD) require(interBrokerProtocolVersion.highestSupportedRecordVersion().value >= IBP_2_1_IV0.highestSupportedRecordVersion().value, "offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " + diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index afd8429e57d18..3de2d26e72dd7 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -25,7 +25,7 @@ import kafka.coordinator.transaction.{TransactionCoordinator, ZkProducerIdManage import kafka.log.LogManager import kafka.log.remote.RemoteLogManager import kafka.metrics.KafkaMetricsReporter -import kafka.network.{ControlPlaneAcceptor, DataPlaneAcceptor, RequestChannel, SocketServer} +import kafka.network.{DataPlaneAcceptor, RequestChannel, SocketServer} import kafka.server.metadata.{ZkConfigRepository, ZkMetadataCache} import kafka.utils._ import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient} @@ -125,12 +125,10 @@ class KafkaServer( var metrics: Metrics = _ @volatile var dataPlaneRequestProcessor: KafkaApis = _ - private var controlPlaneRequestProcessor: KafkaApis = _ var authorizer: Option[Authorizer] = None @volatile var socketServer: SocketServer = _ var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = _ - private var controlPlaneRequestHandlerPool: 
KafkaRequestHandlerPool = _ var logDirFailureChannel: LogDirFailureChannel = _ @volatile private var _logManager: LogManager = _ @@ -507,12 +505,6 @@ class KafkaServer( dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time, config.numIoThreads, s"${DataPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", DataPlaneAcceptor.ThreadPrefix) - socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel => - controlPlaneRequestProcessor = createKafkaApis(controlPlaneRequestChannel) - controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time, - 1, s"${ControlPlaneAcceptor.MetricPrefix}RequestHandlerAvgIdlePercent", ControlPlaneAcceptor.ThreadPrefix) - } - Mx4jLoader.maybeLoad() /* Add all reconfigurables for config change notification before starting config handlers */ @@ -868,8 +860,6 @@ class KafkaServer( CoreUtils.swallow(socketServer.stopProcessingRequests(), this) if (dataPlaneRequestHandlerPool != null) CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this) - if (controlPlaneRequestHandlerPool != null) - CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this) /** * We must shutdown the scheduler early because otherwise, the scheduler could touch other @@ -886,8 +876,6 @@ class KafkaServer( if (dataPlaneRequestProcessor != null) CoreUtils.swallow(dataPlaneRequestProcessor.close(), this) - if (controlPlaneRequestProcessor != null) - CoreUtils.swallow(controlPlaneRequestProcessor.close(), this) authorizer.foreach(Utils.closeQuietly(_, "authorizer")) if (adminManager != null) CoreUtils.swallow(adminManager.shutdown(), this) diff --git a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala index 846bbe58ff9e0..2809ed822dacb 100644 --- 
a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala +++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala @@ -58,8 +58,8 @@ class MetadataCacheControllerNodeProvider( val quorumControllerNodeProvider: () => Option[ControllerInformation] ) extends ControllerNodeProvider { - private val zkControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) - private val zkControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) + private val zkControllerListenerName = config.interBrokerListenerName + private val zkControllerSecurityProtocol = config.interBrokerSecurityProtocol private val zkControllerSaslMechanism = config.saslMechanismInterBrokerProtocol val emptyZkControllerInfo = ControllerInformation( diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala index 6d5529675fdf4..6d67cd3499212 100644 --- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala +++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala @@ -104,7 +104,6 @@ class SocketServerTest { logLevelToRestore = kafkaLogger.getLevel Configurator.setLevel(kafkaLogger.getName, Level.TRACE) - assertTrue(server.controlPlaneRequestChannelOpt.isEmpty) } @AfterEach @@ -314,65 +313,6 @@ class SocketServerTest { ) } - @Test - def testStagedListenerStartup(): Unit = { - shutdownServerAndMetrics(server) - val testProps = new Properties - testProps ++= props - testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROL_PLANE://localhost:0") - testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROL_PLANE:PLAINTEXT") - testProps.put("control.plane.listener.name", "CONTROL_PLANE") - testProps.put("inter.broker.listener.name", "INTERNAL") - val config = KafkaConfig.fromProps(testProps) - val testableServer = new 
TestableSocketServer(config) - - val updatedEndPoints = config.effectiveAdvertisedBrokerListeners.map { endpoint => - endpoint.copy(port = testableServer.boundPort(endpoint.listenerName)) - }.map(_.toJava) - - val externalReadyFuture = new CompletableFuture[Void]() - - def controlPlaneListenerStarted() = { - try { - val socket = connect(testableServer, config.controlPlaneListenerName.get, localAddr = InetAddress.getLocalHost) - sendAndReceiveControllerRequest(socket, testableServer) - true - } catch { - case _: Throwable => false - } - } - - def listenerStarted(listenerName: ListenerName) = { - try { - val socket = connect(testableServer, listenerName, localAddr = InetAddress.getLocalHost) - sendAndReceiveRequest(socket, testableServer) - true - } catch { - case _: Throwable => false - } - } - - try { - val externalListener = new ListenerName("EXTERNAL") - val externalEndpoint = updatedEndPoints.find(e => e.listenerName.get == externalListener.value).get - val controlPlaneListener = new ListenerName("CONTROL_PLANE") - val controlPlaneEndpoint = updatedEndPoints.find(e => e.listenerName.get == controlPlaneListener.value).get - val futures = Map( - externalEndpoint -> externalReadyFuture, - controlPlaneEndpoint -> CompletableFuture.completedFuture[Void](null)) - val requestProcessingFuture = testableServer.enableRequestProcessing(futures) - TestUtils.waitUntilTrue(() => controlPlaneListenerStarted(), "Control plane listener not started") - assertFalse(listenerStarted(config.interBrokerListenerName)) - assertFalse(listenerStarted(externalListener)) - externalReadyFuture.complete(null) - TestUtils.waitUntilTrue(() => listenerStarted(config.interBrokerListenerName), "Inter-broker listener not started") - TestUtils.waitUntilTrue(() => listenerStarted(externalListener), "External listener not started") - requestProcessingFuture.get(1, TimeUnit.MINUTES) - } finally { - shutdownServerAndMetrics(testableServer) - } - } - @Test def 
testStagedListenerShutdownWhenConnectionQueueIsFull(): Unit = { shutdownServerAndMetrics(server) @@ -1598,8 +1538,6 @@ class SocketServerTest { val testableServer = new TestableSocketServer(time = time) testableServer.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES) - assertTrue(testableServer.controlPlaneRequestChannelOpt.isEmpty) - val proxyServer = new ProxyServer(testableServer) try { val testableSelector = testableServer.testableSelector @@ -1831,27 +1769,6 @@ class SocketServerTest { } } - - @Test - def testControlPlaneAsPrivilegedListener(): Unit = { - val testProps = new Properties - testProps ++= props - testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:0") - testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT") - testProps.put("control.plane.listener.name", "CONTROLLER") - val config = KafkaConfig.fromProps(testProps) - withTestableServer(config, { testableServer => - val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get, - localAddr = InetAddress.getLocalHost) - val sentRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer) - assertTrue(sentRequest.context.fromPrivilegedListener) - - val plainSocket = connect(testableServer, localAddr = InetAddress.getLocalHost) - val plainRequest = sendAndReceiveRequest(plainSocket, testableServer) - assertFalse(plainRequest.context.fromPrivilegedListener) - }) - } - @Test def testInterBrokerListenerAsPrivilegedListener(): Unit = { val testProps = new Properties @@ -1873,33 +1790,6 @@ class SocketServerTest { }) } - @Test - def testControlPlaneTakePrecedenceOverInterBrokerListenerAsPrivilegedListener(): Unit = { - val testProps = new Properties - testProps ++= props - testProps.put("listeners", "EXTERNAL://localhost:0,INTERNAL://localhost:0,CONTROLLER://localhost:0") - testProps.put("listener.security.protocol.map", "EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT") - 
testProps.put("control.plane.listener.name", "CONTROLLER") - testProps.put("inter.broker.listener.name", "INTERNAL") - val config = KafkaConfig.fromProps(testProps) - withTestableServer(config, { testableServer => - val controlPlaneSocket = connect(testableServer, config.controlPlaneListenerName.get, - localAddr = InetAddress.getLocalHost) - val controlPlaneRequest = sendAndReceiveControllerRequest(controlPlaneSocket, testableServer) - assertTrue(controlPlaneRequest.context.fromPrivilegedListener) - - val interBrokerSocket = connect(testableServer, config.interBrokerListenerName, - localAddr = InetAddress.getLocalHost) - val interBrokerRequest = sendAndReceiveRequest(interBrokerSocket, testableServer) - assertFalse(interBrokerRequest.context.fromPrivilegedListener) - - val externalSocket = connect(testableServer, new ListenerName("EXTERNAL"), - localAddr = InetAddress.getLocalHost) - val externalRequest = sendAndReceiveRequest(externalSocket, testableServer) - assertFalse(externalRequest.context.fromPrivilegedListener) - }) - } - @Test def testListenBacklogSize(): Unit = { val backlogSize = 128 @@ -2053,11 +1943,6 @@ class SocketServerTest { } } - def sendAndReceiveControllerRequest(socket: Socket, server: SocketServer): RequestChannel.Request = { - sendRequest(socket, producerRequestBytes()) - receiveRequest(server.controlPlaneRequestChannelOpt.get) - } - private def assertProcessorHealthy(testableServer: TestableSocketServer, healthySockets: Seq[Socket] = Seq.empty): Unit = { val selector = testableServer.testableSelector selector.reset() diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 92a7d7a20038a..e66d0330cc052 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -257,31 +257,6 @@ class KafkaConfigTest { assertTrue(isValidKafkaConfig(props)) } - @Test - def 
testControlPlaneListenerName(): Unit = { - val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect) - props.setProperty("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000") - props.setProperty("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL") - props.setProperty("control.plane.listener.name", "CONTROLLER") - KafkaConfig.fromProps(props) - - val serverConfig = KafkaConfig.fromProps(props) - val controlEndpoint = serverConfig.controlPlaneListener.get - assertEquals("localhost", controlEndpoint.host) - assertEquals(5000, controlEndpoint.port) - assertEquals(SecurityProtocol.SSL, controlEndpoint.securityProtocol) - - //advertised listener should contain control-plane listener - val advertisedEndpoints = serverConfig.effectiveAdvertisedBrokerListeners - assertTrue(advertisedEndpoints.exists { endpoint => - endpoint.securityProtocol == controlEndpoint.securityProtocol && endpoint.listenerName.value().equals(controlEndpoint.listenerName.value()) - }) - - // interBrokerListener name should be different from control-plane listener name - val interBrokerListenerName = serverConfig.interBrokerListenerName - assertFalse(interBrokerListenerName.value().equals(controlEndpoint.listenerName.value())) - } - @Test def testControllerListenerNames(): Unit = { val props = new Properties() @@ -301,23 +276,6 @@ class KafkaConfigTest { assertEquals(SecurityProtocol.SASL_SSL, controllerEndpoint.securityProtocol) } - @Test - def testControlPlaneListenerNameNotAllowedWithKRaft(): Unit = { - val props = new Properties() - props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller") - props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092,SSL://localhost:9093") - props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL") - props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2") - props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093") - 
props.setProperty(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG, "SSL") - - assertFalse(isValidKafkaConfig(props)) - assertBadConfigContainingMessage(props, "control.plane.listener.name is not supported in KRaft mode.") - - props.remove(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG) - KafkaConfig.fromProps(props) - } - @Test def testControllerListenerDefinedForKRaftController(): Unit = { val props = new Properties() diff --git a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala index 1e336abdc1839..877c204976843 100644 --- a/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/SaslApiVersionsRequestTest.scala @@ -51,7 +51,7 @@ object SaslApiVersionsRequestTest { // Configure control plane listener to make sure we have separate listeners for testing. val serverProperties = new java.util.HashMap[String, String]() - serverProperties.put(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG, controlPlaneListenerName) +// serverProperties.put(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG, controlPlaneListenerName) serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol") serverProperties.put("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0") serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0") diff --git a/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java b/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java index ba17a250ef2e9..12c23327e5da8 100644 --- a/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java +++ b/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java @@ -21,7 +21,6 @@ 
import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.network.ListenerName; import org.apache.kafka.common.security.auth.SecurityProtocol; -import org.apache.kafka.server.config.ReplicationConfigs; import org.apache.kafka.server.util.Csv; import java.util.ArrayList; @@ -89,28 +88,6 @@ public class SocketServerConfigs { " so that one listener can be configured to advertise another listener's address." + " This can be useful in some cases where external load balancers are used.", LISTENERS_CONFIG); - - public static final String CONTROL_PLANE_LISTENER_NAME_CONFIG = "control.plane.listener.name"; - public static final String CONTROL_PLANE_LISTENER_NAME_DOC = String.format( - "Name of listener used for communication between controller and brokers. " + - "A broker will use the %s to locate the endpoint in %s list, to listen for connections from the controller. " + - "For example, if a broker's config is:%n" + - "listeners=INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093,CONTROLLER://192.1.1.8:9094%n" + - "listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL,CONTROLLER:SSL%n" + - "control.plane.listener.name = CONTROLLER%n" + - "On startup, the broker will start listening on \"192.1.1.8:9094\" with security protocol \"SSL\".%n" + - "On the controller side, when it discovers a broker's published endpoints through ZooKeeper, it will use the %1$1s " + - "to find the endpoint, which it will use to establish connection to the broker.%n" + - "For example, if the broker's published endpoints on ZooKeeper are:%n" + - " \"endpoints\":[\"INTERNAL://broker1.example.com:9092\",\"EXTERNAL://broker1.example.com:9093\",\"CONTROLLER://broker1.example.com:9094\"]%n" + - " and the controller's config is:%n" + - "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL%n" + - "control.plane.listener.name = CONTROLLER%n" + - "then the controller will use \"broker1.example.com:9094\" with security protocol \"SSL\" to connect to 
the broker.%n" + - "If not explicitly configured, the default value will be null and there will be no dedicated endpoints for controller connections.%n" + - "If explicitly configured, the value cannot be the same as the value of %s.", - CONTROL_PLANE_LISTENER_NAME_CONFIG, LISTENERS_CONFIG, ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG); - public static final String SOCKET_SEND_BUFFER_BYTES_CONFIG = "socket.send.buffer.bytes"; public static final int SOCKET_SEND_BUFFER_BYTES_DEFAULT = 100 * 1024; public static final String SOCKET_SEND_BUFFER_BYTES_DOC = "The SO_SNDBUF buffer of the socket server sockets. If the value is -1, the OS default will be used."; @@ -183,7 +160,6 @@ public class SocketServerConfigs { .define(LISTENERS_CONFIG, STRING, LISTENERS_DEFAULT, HIGH, LISTENERS_DOC) .define(ADVERTISED_LISTENERS_CONFIG, STRING, null, HIGH, ADVERTISED_LISTENERS_DOC) .define(LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, STRING, LISTENER_SECURITY_PROTOCOL_MAP_DEFAULT, LOW, LISTENER_SECURITY_PROTOCOL_MAP_DOC) - .define(CONTROL_PLANE_LISTENER_NAME_CONFIG, STRING, null, HIGH, CONTROL_PLANE_LISTENER_NAME_DOC) .define(SOCKET_SEND_BUFFER_BYTES_CONFIG, INT, SOCKET_SEND_BUFFER_BYTES_DEFAULT, HIGH, SOCKET_SEND_BUFFER_BYTES_DOC) .define(SOCKET_RECEIVE_BUFFER_BYTES_CONFIG, INT, SOCKET_RECEIVE_BUFFER_BYTES_DEFAULT, HIGH, SOCKET_RECEIVE_BUFFER_BYTES_DOC) .define(SOCKET_REQUEST_MAX_BYTES_CONFIG, INT, SOCKET_REQUEST_MAX_BYTES_DEFAULT, atLeast(1), HIGH, SOCKET_REQUEST_MAX_BYTES_DOC) From cf3121c11eb97cbafcbd96abb749f5cf56f4d333 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 27 Dec 2024 05:56:10 +0000 Subject: [PATCH 02/10] update upgrade.html --- docs/upgrade.html | 2 ++ 1 file changed, 2 insertions(+) diff --git a/docs/upgrade.html b/docs/upgrade.html index 774976cf2d02d..74119a6c995f2 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -65,6 +65,8 @@
Notable changes in 4
  • The function onNewBatch in org.apache.kafka.clients.producer.Partitioner class was removed.
  • +
  • The control.plane.listener.name configuration was removed; use controller.listener.names instead. +
  • Broker From 5f15b0a6c1bbf4a4c1196d2c455213aa6a40e91a Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 27 Dec 2024 11:32:20 +0000 Subject: [PATCH 03/10] remove controlPlaneAcceptor --- .../scala/kafka/network/SocketServer.scala | 76 +------------------ 1 file changed, 3 insertions(+), 73 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index 03c61f6417ceb..1f06670cfb8ba 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -69,13 +69,6 @@ import scala.util.control.ControlThrowable * It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig. * Acceptor has N Processor threads that each have their own selector and read requests from sockets * M Handler threads that handle requests and produce responses back to the processor threads for writing. - * - control-plane : - * - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name". - * If not configured, the controller requests are handled by the data-plane. - * - The threading model is - * 1 Acceptor thread that handles new connections - * Acceptor has 1 Processor thread that has its own selector and read requests from the socket. - * 1 Handler thread that handles requests and produces responses back to the processor thread for writing. 
*/ class SocketServer( val config: KafkaConfig, @@ -105,8 +98,6 @@ class SocketServer( // data-plane private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]() val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics) - // control-plane - private[network] var controlPlaneAcceptorOpt: Option[ControlPlaneAcceptor] = None private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0) val connectionQuotas = new ConnectionQuotas(config, time, metrics) @@ -135,17 +126,7 @@ class SocketServer( }.sum / dataPlaneProcessors.size } }) - if (config.requiresZookeeper) { - metricsGroup.newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized { - val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => a.processors(0)) - val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p => - metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags) - } - ioWaitRatioMetricName.map { metricName => - Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0)) - }.getOrElse(Double.NaN) - }) - } + metricsGroup.newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory) metricsGroup.newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory) metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized { @@ -157,17 +138,6 @@ class SocketServer( Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double]) }.sum }) - if (config.requiresZookeeper) { - metricsGroup.newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized { - val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => a.processors(0)) - val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map { p 
=> - metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags) - } - expiredConnectionsKilledCountMetricNames.map { metricName => - Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double]) - }.getOrElse(0.0) - }) - } // Create acceptors and processors for the statically configured endpoints when the // SocketServer is constructed. Note that this just opens the ports and creates the data @@ -229,16 +199,14 @@ class SocketServer( } info("Enabling request processing.") - controlPlaneAcceptorOpt.foreach(chainAcceptorFuture) dataPlaneAcceptors.values().forEach(chainAcceptorFuture) FutureUtils.chainFuture(CompletableFuture.allOf(authorizerFutures.values.toArray: _*), allAuthorizerFuturesComplete) // Construct a future that will be completed when all Acceptors have been successfully started. // Alternately, if any of them fail to start, this future will be completed exceptionally. - val allAcceptors = dataPlaneAcceptors.values().asScala.toSeq ++ controlPlaneAcceptorOpt val enableFuture = new CompletableFuture[Void] - FutureUtils.chainFuture(CompletableFuture.allOf(allAcceptors.map(_.startedFuture).toArray: _*), enableFuture) + FutureUtils.chainFuture(CompletableFuture.allOf(dataPlaneAcceptors.values().asScala.toSeq.map(_.startedFuture).toArray: _*), enableFuture) enableFuture } @@ -270,9 +238,7 @@ class SocketServer( stopped = true info("Stopping socket server request processors") dataPlaneAcceptors.asScala.values.foreach(_.beginShutdown()) - controlPlaneAcceptorOpt.foreach(_.beginShutdown()) dataPlaneAcceptors.asScala.values.foreach(_.close()) - controlPlaneAcceptorOpt.foreach(_.close()) dataPlaneRequestChannel.clear() info("Stopped socket server request processors") } @@ -300,7 +266,7 @@ class SocketServer( if (acceptor != null) { acceptor.localPort } else { - controlPlaneAcceptorOpt.map(_.localPort).getOrElse(throw new KafkaException("Could not find listenerName : " + listenerName + " in data-plane or 
control-plane")) + throw new KafkaException("Could not find listenerName : " + listenerName + " in data-plane or control-plane") } } catch { case e: Exception => @@ -507,42 +473,6 @@ class DataPlaneAcceptor(socketServer: SocketServer, } } -object ControlPlaneAcceptor { - val ThreadPrefix = "control-plane" - val MetricPrefix = "ControlPlane" -} - -class ControlPlaneAcceptor(socketServer: SocketServer, - endPoint: EndPoint, - config: KafkaConfig, - nodeId: Int, - connectionQuotas: ConnectionQuotas, - time: Time, - requestChannel: RequestChannel, - metrics: Metrics, - credentialProvider: CredentialProvider, - logContext: LogContext, - memoryPool: MemoryPool, - apiVersionManager: ApiVersionManager) - extends Acceptor(socketServer, - endPoint, - config, - nodeId, - connectionQuotas, - time, - true, - requestChannel, - metrics, - credentialProvider, - logContext, - memoryPool, - apiVersionManager) { - - override def metricPrefix(): String = ControlPlaneAcceptor.MetricPrefix - override def threadPrefix(): String = ControlPlaneAcceptor.ThreadPrefix - -} - /** * Thread that accepts and configures new connections. There is one of these per endpoint. */ From c6b682d52895e0e3f04d09f69be0f608cebd5b69 Mon Sep 17 00:00:00 2001 From: TaiJu Wu Date: Fri, 27 Dec 2024 11:48:56 +0000 Subject: [PATCH 04/10] move upgrade note to 4.1 --- docs/upgrade.html | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/docs/upgrade.html b/docs/upgrade.html index 74119a6c995f2..1d6a30636ab21 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -19,6 +19,13 @@