Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,8 @@ class ControllerChannelManager(controllerEpoch: () => Int,
private def addNewBroker(broker: Broker): Unit = {
val messageQueue = new LinkedBlockingQueue[QueueItem]
debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
val controllerToBrokerListenerName = config.interBrokerListenerName
val controllerToBrokerSecurityProtocol = config.interBrokerSecurityProtocol
val brokerNode = broker.node(controllerToBrokerListenerName)
val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
val (networkClient, reconfigurableChannelBuilder) = {
Expand Down
99 changes: 4 additions & 95 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,6 @@ import scala.util.control.ControlThrowable
* It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig.
* Acceptor has N Processor threads that each have their own selector and read requests from sockets
* M Handler threads that handle requests and produce responses back to the processor threads for writing.
* - control-plane :
* - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name".
* If not configured, the controller requests are handled by the data-plane.
* - The threading model is
* 1 Acceptor thread that handles new connections
* Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
* 1 Handler thread that handles requests and produces responses back to the processor thread for writing.
*/
class SocketServer(
val config: KafkaConfig,
Expand Down Expand Up @@ -105,10 +98,6 @@ class SocketServer(
// data-plane
private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
// control-plane
private[network] var controlPlaneAcceptorOpt: Option[ControlPlaneAcceptor] = None
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
new RequestChannel(20, ControlPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics))

private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
val connectionQuotas = new ConnectionQuotas(config, time, metrics)
Expand Down Expand Up @@ -137,17 +126,7 @@ class SocketServer(
}.sum / dataPlaneProcessors.size
}
})
if (config.requiresZookeeper) {
metricsGroup.newGauge(s"${ControlPlaneAcceptor.MetricPrefix}NetworkProcessorAvgIdlePercent", () => SocketServer.this.synchronized {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m1a2st please include this in #18365

val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => a.processors(0))
val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
metrics.metricName("io-wait-ratio", MetricsGroup, p.metricTags)
}
ioWaitRatioMetricName.map { metricName =>
Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
}.getOrElse(Double.NaN)
})
}

metricsGroup.newGauge("MemoryPoolAvailable", () => memoryPool.availableMemory)
metricsGroup.newGauge("MemoryPoolUsed", () => memoryPool.size() - memoryPool.availableMemory)
metricsGroup.newGauge(s"${DataPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
Expand All @@ -159,17 +138,6 @@ class SocketServer(
Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
}.sum
})
if (config.requiresZookeeper) {
metricsGroup.newGauge(s"${ControlPlaneAcceptor.MetricPrefix}ExpiredConnectionsKilledCount", () => SocketServer.this.synchronized {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@m1a2st please include this in #18365

val controlPlaneProcessorOpt = controlPlaneAcceptorOpt.map(a => a.processors(0))
val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map { p =>
metrics.metricName("expired-connections-killed-count", MetricsGroup, p.metricTags)
}
expiredConnectionsKilledCountMetricNames.map { metricName =>
Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
}.getOrElse(0.0)
})
}

// Create acceptors and processors for the statically configured endpoints when the
// SocketServer is constructed. Note that this just opens the ports and creates the data
Expand All @@ -178,7 +146,6 @@ class SocketServer(
if (apiVersionManager.listenerType.equals(ListenerType.CONTROLLER)) {
config.controllerListeners.foreach(createDataPlaneAcceptorAndProcessors)
} else {
config.controlPlaneListener.foreach(createControlPlaneAcceptorAndProcessor)
config.dataPlaneListeners.foreach(createDataPlaneAcceptorAndProcessors)
}

Expand Down Expand Up @@ -232,16 +199,14 @@ class SocketServer(
}

info("Enabling request processing.")
controlPlaneAcceptorOpt.foreach(chainAcceptorFuture)
dataPlaneAcceptors.values().forEach(chainAcceptorFuture)
FutureUtils.chainFuture(CompletableFuture.allOf(authorizerFutures.values.toArray: _*),
allAuthorizerFuturesComplete)

// Construct a future that will be completed when all Acceptors have been successfully started.
// Alternately, if any of them fail to start, this future will be completed exceptionally.
val allAcceptors = dataPlaneAcceptors.values().asScala.toSeq ++ controlPlaneAcceptorOpt
val enableFuture = new CompletableFuture[Void]
FutureUtils.chainFuture(CompletableFuture.allOf(allAcceptors.map(_.startedFuture).toArray: _*), enableFuture)
FutureUtils.chainFuture(CompletableFuture.allOf(dataPlaneAcceptors.values().asScala.toArray.map(_.startedFuture): _*), enableFuture)
enableFuture
}

Expand All @@ -251,36 +216,20 @@ class SocketServer(
}
val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix)
connectionQuotas.addListener(config, endpoint.listenerName)
val isPrivilegedListener = controlPlaneRequestChannelOpt.isEmpty &&
config.interBrokerListenerName == endpoint.listenerName
val isPrivilegedListener = config.interBrokerListenerName == endpoint.listenerName
val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, isPrivilegedListener, dataPlaneRequestChannel)
config.addReconfigurable(dataPlaneAcceptor)
dataPlaneAcceptor.configure(parsedConfigs)
dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
}

private def createControlPlaneAcceptorAndProcessor(endpoint: EndPoint): Unit = synchronized {
if (stopped) {
throw new RuntimeException("Can't create new control plane acceptor and processor: SocketServer is stopped.")
}
connectionQuotas.addListener(config, endpoint.listenerName)
val controlPlaneAcceptor = createControlPlaneAcceptor(endpoint, controlPlaneRequestChannelOpt.get)
controlPlaneAcceptor.addProcessors(1)
controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}")
}

private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap

protected def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
}

private def createControlPlaneAcceptor(endPoint: EndPoint, requestChannel: RequestChannel): ControlPlaneAcceptor = {
new ControlPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
}

/**
* Stop processing requests and new connections.
*/
Expand All @@ -289,11 +238,8 @@ class SocketServer(
stopped = true
info("Stopping socket server request processors")
dataPlaneAcceptors.asScala.values.foreach(_.beginShutdown())
controlPlaneAcceptorOpt.foreach(_.beginShutdown())
dataPlaneAcceptors.asScala.values.foreach(_.close())
controlPlaneAcceptorOpt.foreach(_.close())
dataPlaneRequestChannel.clear()
controlPlaneRequestChannelOpt.foreach(_.clear())
info("Stopped socket server request processors")
}
}
Expand All @@ -309,7 +255,6 @@ class SocketServer(
this.synchronized {
stopProcessingRequests()
dataPlaneRequestChannel.shutdown()
controlPlaneRequestChannelOpt.foreach(_.shutdown())
connectionQuotas.close()
}
info("Shutdown completed")
Expand All @@ -321,7 +266,7 @@ class SocketServer(
if (acceptor != null) {
acceptor.localPort
} else {
controlPlaneAcceptorOpt.map(_.localPort).getOrElse(throw new KafkaException("Could not find listenerName : " + listenerName + " in data-plane or control-plane"))
throw new KafkaException("Could not find listenerName : " + listenerName + " in data-plane.")
}
} catch {
case e: Exception =>
Expand Down Expand Up @@ -528,42 +473,6 @@ class DataPlaneAcceptor(socketServer: SocketServer,
}
}

object ControlPlaneAcceptor {
val ThreadPrefix = "control-plane"
val MetricPrefix = "ControlPlane"
}

class ControlPlaneAcceptor(socketServer: SocketServer,
endPoint: EndPoint,
config: KafkaConfig,
nodeId: Int,
connectionQuotas: ConnectionQuotas,
time: Time,
requestChannel: RequestChannel,
metrics: Metrics,
credentialProvider: CredentialProvider,
logContext: LogContext,
memoryPool: MemoryPool,
apiVersionManager: ApiVersionManager)
extends Acceptor(socketServer,
endPoint,
config,
nodeId,
connectionQuotas,
time,
true,
requestChannel,
metrics,
credentialProvider,
logContext,
memoryPool,
apiVersionManager) {

override def metricPrefix(): String = ControlPlaneAcceptor.MetricPrefix
override def threadPrefix(): String = ControlPlaneAcceptor.ThreadPrefix

}

/**
* Thread that accepts and configures new connections. There is one of these per endpoint.
*/
Expand Down
40 changes: 0 additions & 40 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -487,8 +487,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])

def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1
def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, _) => listenerName }
def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (_, securityProtocol) => securityProtocol }
def saslMechanismInterBrokerProtocol = getString(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG)
val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion.isSaslInterBrokerHandshakeRequestEnabled

Expand Down Expand Up @@ -565,16 +563,9 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])

def saslMechanismControllerProtocol: String = getString(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG)

def controlPlaneListener: Option[EndPoint] = {
controlPlaneListenerName.map { listenerName =>
listeners.filter(endpoint => endpoint.listenerName.value() == listenerName.value()).head
}
}

def dataPlaneListeners: Seq[EndPoint] = {
Comment thread
TaiJuWu marked this conversation as resolved.
listeners.filterNot { listener =>
val name = listener.listenerName.value()
name.equals(getString(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG)) ||
controllerListenerNames.contains(name)
}
}
Expand Down Expand Up @@ -623,19 +614,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}
}

private def getControlPlaneListenerNameAndSecurityProtocol: Option[(ListenerName, SecurityProtocol)] = {
Option(getString(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG)) match {
case Some(name) =>
val listenerName = ListenerName.normalised(name)
val securityProtocol = effectiveListenerSecurityProtocolMap.getOrElse(listenerName,
throw new ConfigException(s"Listener with ${listenerName.value} defined in " +
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} not found in ${SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG}."))
Some(listenerName, securityProtocol)

case None => None
}
}

private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = {
try SecurityProtocol.forName(protocolName)
catch {
Expand Down Expand Up @@ -719,10 +697,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
}
}

def validateControlPlaneListenerEmptyForKRaft(): Unit = {
require(controlPlaneListenerName.isEmpty,
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} is not supported in KRaft mode.")
}
def validateControllerQuorumVotersMustContainNodeIdForKRaftController(): Unit = {
require(voterIds.isEmpty || voterIds.contains(nodeId),
s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains the 'controller' role, the node id $nodeId must be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
Expand All @@ -744,7 +718,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
if (processRoles == Set(ProcessRole.BrokerRole)) {
// KRaft broker-only
validateQuorumVotersAndQuorumBootstrapServerForKRaft()
validateControlPlaneListenerEmptyForKRaft()
// nodeId must not appear in controller.quorum.voters
require(!voterIds.contains(nodeId),
s"If ${KRaftConfigs.PROCESS_ROLES_CONFIG} contains just the 'broker' role, the node id $nodeId must not be included in the set of voters ${QuorumConfig.QUORUM_VOTERS_CONFIG}=${voterIds.asScala.toSet}")
Expand All @@ -769,7 +742,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
} else if (processRoles == Set(ProcessRole.ControllerRole)) {
// KRaft controller-only
validateQuorumVotersAndQuorumBootstrapServerForKRaft()
validateControlPlaneListenerEmptyForKRaft()
// listeners should only contain listeners also enumerated in the controller listener
require(
effectiveAdvertisedControllerListeners.size == listeners.size,
Expand All @@ -788,7 +760,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
} else if (isKRaftCombinedMode) {
// KRaft combined broker and controller
validateQuorumVotersAndQuorumBootstrapServerForKRaft()
validateControlPlaneListenerEmptyForKRaft()
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
validateAdvertisedControllerListenersNonEmptyForKRaftController()
validateControllerListenerNamesMustAppearInListenersForKRaftController()
Expand Down Expand Up @@ -820,17 +791,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
s"${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG} cannot use the nonroutable meta-address 0.0.0.0. "+
s"Use a routable IP address.")

// validate control.plane.listener.name config
if (controlPlaneListenerName.isDefined) {
require(advertisedBrokerListenerNames.contains(controlPlaneListenerName.get),
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
s"The valid options based on currently configured listeners are ${advertisedBrokerListenerNames.map(_.value).mkString(",")}")
// controlPlaneListenerName should be different from interBrokerListenerName
require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()),
s"${SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG}, when defined, should have a different value from the inter broker listener name. " +
s"Currently they both have the value ${controlPlaneListenerName.get}")
}

if (groupCoordinatorConfig.offsetTopicCompressionType == CompressionType.ZSTD)
require(interBrokerProtocolVersion.highestSupportedRecordVersion().value >= IBP_2_1_IV0.highestSupportedRecordVersion().value,
"offsets.topic.compression.codec zstd can only be used when inter.broker.protocol.version " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ class MetadataCacheControllerNodeProvider(
val quorumControllerNodeProvider: () => Option[ControllerInformation]
) extends ControllerNodeProvider {

private val zkControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
private val zkControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
private val zkControllerListenerName = config.interBrokerListenerName
private val zkControllerSecurityProtocol = config.interBrokerSecurityProtocol
private val zkControllerSaslMechanism = config.saslMechanismInterBrokerProtocol

val emptyZkControllerInfo = ControllerInformation(
Expand Down
3 changes: 0 additions & 3 deletions core/src/test/scala/unit/kafka/network/SocketServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ class SocketServerTest {
logLevelToRestore = kafkaLogger.getLevel
Configurator.setLevel(kafkaLogger.getName, Level.TRACE)

assertTrue(server.controlPlaneRequestChannelOpt.isEmpty)
}

@AfterEach
Expand Down Expand Up @@ -1542,8 +1541,6 @@ class SocketServerTest {
val testableServer = new TestableSocketServer(time = time)
testableServer.enableRequestProcessing(Map.empty).get(1, TimeUnit.MINUTES)

assertTrue(testableServer.controlPlaneRequestChannelOpt.isEmpty)

val proxyServer = new ProxyServer(testableServer)
try {
val testableSelector = testableServer.testableSelector
Expand Down
17 changes: 0 additions & 17 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -275,23 +275,6 @@ class KafkaConfigTest {
assertEquals(SecurityProtocol.SASL_SSL, controllerEndpoint.securityProtocol)
}

@Test
def testControlPlaneListenerNameNotAllowedWithKRaft(): Unit = {
val props = new Properties()
props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller")
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:9092,SSL://localhost:9093")
props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "SSL")
props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "2")
props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:9093")
props.setProperty(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG, "SSL")

assertFalse(isValidKafkaConfig(props))
assertBadConfigContainingMessage(props, "control.plane.listener.name is not supported in KRaft mode.")

props.remove(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG)
KafkaConfig.fromProps(props)
}

@Test
def testControllerListenerDefinedForKRaftController(): Unit = {
val props = new Properties()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ object SaslApiVersionsRequestTest {

// Configure control plane listener to make sure we have separate listeners for testing.
val serverProperties = new java.util.HashMap[String, String]()
serverProperties.put(SocketServerConfigs.CONTROL_PLANE_LISTENER_NAME_CONFIG, controlPlaneListenerName)
serverProperties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, s"$controlPlaneListenerName:$securityProtocol,$securityProtocol:$securityProtocol")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think controlPlaneListenerName is unnecessary to this test, but this test is disabled for now. Maybe we can revisit it in KAFKA-17631

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, #18330 handles it already

serverProperties.put("listeners", s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
serverProperties.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, s"$securityProtocol://localhost:0,$controlPlaneListenerName://localhost:0")
Expand Down
Loading