Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,8 @@ project(':server') {
implementation project(':server-common')
implementation project(':storage')
implementation project(':group-coordinator')
implementation project(':group-coordinator:group-coordinator-api')
implementation libs.commonsValidator
implementation project(':transaction-coordinator')
implementation project(':raft')
implementation libs.metrics
Expand Down
3 changes: 3 additions & 0 deletions checkstyle/import-control-server.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@
<allow pkg="org.apache.kafka.network" />
<allow pkg="org.apache.kafka.storage.internals.log" />
</subpackage>
<subpackage name="utils">
<allow class="org.apache.commons.validator.routines.InetAddressValidator" />
</subpackage>
</subpackage>

<subpackage name="security">
Expand Down
11 changes: 11 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/utils/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -1655,6 +1655,16 @@ public static void require(boolean requirement) {
throw new IllegalArgumentException("requirement failed");
}

/**
 * Validates the given requirement, failing fast when it does not hold.
 *
 * @param requirement the condition that must be true
 * @param message     detail appended to the exception message on failure
 * @throws IllegalArgumentException if {@code requirement} is false; the
 *         exception message is {@code "requirement failed: " + message}
 */
public static void require(boolean requirement, String message) {
    // Guard-clause form: return immediately when the requirement holds.
    if (requirement) {
        return;
    }
    throw new IllegalArgumentException("requirement failed: " + message);
}

/**
* Merge multiple {@link ConfigDef} into one
* @param configDefs List of {@link ConfigDef}
Expand All @@ -1664,6 +1674,7 @@ public static ConfigDef mergeConfigs(List<ConfigDef> configDefs) {
configDefs.forEach(configDef -> configDef.configKeys().values().forEach(all::define));
return all;
}

/**
* A runnable that can throw checked exception.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public void onMetadataUpdate(
@SuppressWarnings("ThrowableNotThrown")
private void onMetadataVersionChanged(MetadataVersion metadataVersion) {
try {
this.config.validateWithMetadataVersion(metadataVersion);
this.config.validator().validateWithMetadataVersion(metadataVersion);
} catch (Throwable t) {
this.faultHandler.handleFault(
"Broker configuration does not support the cluster MetadataVersion", t);
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/cluster/Broker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,6 @@ case class Broker(id: Int, endPoints: Seq[EndPoint], rack: Option[String], featu
val interBrokerEndpoint: Endpoint = endPoint(config.interBrokerListenerName).toJava
val brokerEndpoints: util.List[Endpoint] = endPoints.toList.map(_.toJava).asJava
Broker.ServerInfo(clusterResource, id, brokerEndpoints, interBrokerEndpoint,
config.earlyStartListeners.map(_.value()).asJava)
config.earlyStartListeners().asScala.map(_.value()).asJava)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ class ControllerChannelManager(controllerEpoch: () => Int,
private def addNewBroker(broker: Broker): Unit = {
val messageQueue = new LinkedBlockingQueue[QueueItem]
debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
val controllerToBrokerListenerName = config.controlPlaneListenerName.orElse(config.interBrokerListenerName)
val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.orElse(config.interBrokerSecurityProtocol)
val brokerNode = broker.node(controllerToBrokerListenerName)
val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
val (networkClient, reconfigurableChannelBuilder) = {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LogManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1567,7 +1567,7 @@ object LogManager {

val cleanerConfig = LogCleaner.cleanerConfig(config)

new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
new LogManager(logDirs = config.logDirs.asScala.map(new File(_).getAbsoluteFile),
initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
configRepository = configRepository,
initialDefaultConfig = defaultLogConfig,
Expand Down
31 changes: 18 additions & 13 deletions core/src/main/scala/kafka/network/SocketServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,14 @@ import org.apache.kafka.common.utils.{KafkaThread, LogContext, Time, Utils}
import org.apache.kafka.common.{Endpoint, KafkaException, MetricName, Reconfigurable}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.config.{ServerConfigs, QuotaConfigs}
import org.apache.kafka.server.config.{QuotaConfigs, ServerConfigs}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.FutureUtils
import org.slf4j.event.Level

import scala.collection._
import scala.collection.mutable.ArrayBuffer
import scala.compat.java8.OptionConverters.RichOptionalGeneric
import scala.jdk.CollectionConverters._
import scala.util.control.ControlThrowable

Expand Down Expand Up @@ -102,7 +103,7 @@ class SocketServer(val config: KafkaConfig,
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics)
// control-plane
private[network] var controlPlaneAcceptorOpt: Option[ControlPlaneAcceptor] = None
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.asScala.map(_ =>
new RequestChannel(20, ControlPlaneAcceptor.MetricPrefix, time, apiVersionManager.newRequestMetrics))

private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
Expand Down Expand Up @@ -171,10 +172,10 @@ class SocketServer(val config: KafkaConfig,
// structures. It does not start the acceptors and processors or their associated JVM
// threads.
if (apiVersionManager.listenerType.equals(ListenerType.CONTROLLER)) {
config.controllerListeners.foreach(createDataPlaneAcceptorAndProcessors)
config.controllerListeners.forEach(l => createDataPlaneAcceptorAndProcessors(EndPoint.fromJava(l)))
} else {
config.controlPlaneListener.foreach(createControlPlaneAcceptorAndProcessor)
config.dataPlaneListeners.foreach(createDataPlaneAcceptorAndProcessors)
config.controlPlaneListener.map(l => createControlPlaneAcceptorAndProcessor(EndPoint.fromJava(l)))
config.dataPlaneListeners.forEach(l => createDataPlaneAcceptorAndProcessors(EndPoint.fromJava(l)))
}

// Processors are now created by each Acceptor. However to preserve compatibility, we need to number the processors
Expand Down Expand Up @@ -266,7 +267,10 @@ class SocketServer(val config: KafkaConfig,
info(s"Created control-plane acceptor and processor for endpoint : ${endpoint.listenerName}")
}

private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
/** Maps each configured listener name to its (Scala-side) EndPoint. */
private def endpoints = config.listeners.asScala
  .map(EndPoint.fromJava)
  .map(ep => ep.listenerName -> ep)
  .toMap

protected def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, time, isPrivilegedListener, requestChannel, metrics, credentialProvider, logContext, memoryPool, apiVersionManager)
Expand Down Expand Up @@ -327,14 +331,15 @@ class SocketServer(val config: KafkaConfig,
/**
* This method is called to dynamically add listeners.
*/
def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
def addListeners(listenersAdded: Seq[Endpoint]): Unit = synchronized {
if (stopped) {
throw new RuntimeException("can't add new listeners: SocketServer is stopped.")
}
info(s"Adding data-plane listeners for endpoints $listenersAdded")
listenersAdded.foreach { endpoint =>
listenersAdded.foreach { l =>
val endpoint = EndPoint.fromJava(l)
createDataPlaneAcceptorAndProcessors(endpoint)
val acceptor = dataPlaneAcceptors.get(endpoint)
val acceptor = dataPlaneAcceptors.get(l)
// There is no authorizer future for this new listener endpoint. So start the
// listener once all authorizer futures are complete.
allAuthorizerFuturesComplete.whenComplete((_, e) => {
Expand Down Expand Up @@ -371,8 +376,8 @@ class SocketServer(val config: KafkaConfig,
info(s"Updating maxConnectionsPerIp: $maxConnectionsPerIp")
connectionQuotas.updateMaxConnectionsPerIp(maxConnectionsPerIp)
}
val maxConnectionsPerIpOverrides = newConfig.maxConnectionsPerIpOverrides
if (maxConnectionsPerIpOverrides != oldConfig.maxConnectionsPerIpOverrides) {
val maxConnectionsPerIpOverrides = newConfig.maxConnectionsPerIpOverrides.asScala.view.mapValues(_.toInt).toMap
if (maxConnectionsPerIpOverrides != oldConfig.maxConnectionsPerIpOverrides.asScala.view.mapValues(_.toInt).toMap) {
info(s"Updating maxConnectionsPerIpOverrides: ${maxConnectionsPerIpOverrides.map { case (k, v) => s"$k=$v" }.mkString(",")}")
connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
}
Expand Down Expand Up @@ -1435,7 +1440,7 @@ object ConnectionQuotas {
class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extends Logging with AutoCloseable {

@volatile private var defaultMaxConnectionsPerIp: Int = config.maxConnectionsPerIp
@volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.map { case (host, count) => (InetAddress.getByName(host), count) }
@volatile private var maxConnectionsPerIpOverrides = config.maxConnectionsPerIpOverrides.asScala.map { case (host, count) => (InetAddress.getByName(host), count.toInt) }.toMap
@volatile private var brokerMaxConnections = config.maxConnections
private val interBrokerListenerName = config.interBrokerListenerName
private val counts = mutable.Map[InetAddress, Int]()
Expand Down Expand Up @@ -1473,7 +1478,7 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, metrics: Metrics) extend
}

private[network] def updateMaxConnectionsPerIpOverride(overrideQuotas: Map[String, Int]): Unit = {
maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }
maxConnectionsPerIpOverrides = overrideQuotas.map { case (host, count) => (InetAddress.getByName(host), count) }.toMap
}

private[network] def updateBrokerMaxConnections(maxConnections: Int): Unit = {
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/scala/kafka/raft/RaftManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ object KafkaRaftManager {
*/
private def hasDifferentLogDir(config: KafkaConfig): Boolean = {
!config
.logDirs
.logDirs.asScala
.map(Paths.get(_).toAbsolutePath)
.contains(Paths.get(config.metadataLogDir).toAbsolutePath)
}
Expand All @@ -95,7 +95,7 @@ object KafkaRaftManager {
*/
def maybeDeleteMetadataLogDir(config: KafkaConfig): Unit = {
// These constraints are enforced in KafkaServer, but repeating them here to guard against future callers
if (config.processRoles.nonEmpty) {
if (!config.processRoles.isEmpty) {
throw new RuntimeException("Not deleting metadata log dir since this node is in KRaft mode.")
} else if (!config.migrationEnabled) {
throw new RuntimeException("Not deleting metadata log dir since migrations are not enabled.")
Expand Down Expand Up @@ -171,7 +171,7 @@ class KafkaRaftManager[T](
val differentMetadataLogDir = KafkaRaftManager.hasDifferentLogDir(config)

// Or this node is only a controller
val isOnlyController = config.processRoles == Set(ProcessRole.ControllerRole)
val isOnlyController = config.processRoles == Utils.mkSet(ProcessRole.ControllerRole)

if (differentMetadataLogDir || isOnlyController) {
Some(KafkaRaftManager.lockDataDir(new File(config.metadataLogDir)))
Expand Down Expand Up @@ -260,8 +260,8 @@ class KafkaRaftManager[T](
}

private def buildNetworkClient(): (ListenerName, NetworkClient) = {
val controllerListenerName = new ListenerName(config.controllerListenerNames.head)
val controllerSecurityProtocol = config.effectiveListenerSecurityProtocolMap.getOrElse(
val controllerListenerName = new ListenerName(config.controllerListenerNames.asScala.head)
val controllerSecurityProtocol = config.effectiveListenerSecurityProtocolMap.asScala.getOrElse(
controllerListenerName,
SecurityProtocol.forName(controllerListenerName.value())
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ class BrokerLifecycleManager(
setFeatures(features).
setIncarnationId(incarnationId).
setListeners(_advertisedListeners).
setRack(rack.orNull).
setRack(rack.orElse(null)).
setPreviousBrokerEpoch(previousBrokerEpoch.orElse(-1L)).
setLogDirs(sortedLogDirs)
if (isDebugEnabled) {
Expand Down
19 changes: 12 additions & 7 deletions core/src/main/scala/kafka/server/BrokerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.kafka.common.security.token.delegation.internals.DelegationTok
import org.apache.kafka.common.utils.{LogContext, Time}
import org.apache.kafka.common.{ClusterResource, TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.metrics.{GroupCoordinatorMetrics, GroupCoordinatorRuntimeMetrics}
import org.apache.kafka.coordinator.group.{CoordinatorRecord, GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService, CoordinatorRecordSerde}
import org.apache.kafka.coordinator.group.{CoordinatorRecord, CoordinatorRecordSerde, GroupCoordinator, GroupCoordinatorConfig, GroupCoordinatorService}
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker, MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo, VersionRange}
import org.apache.kafka.security.CredentialProvider
Expand All @@ -56,9 +56,10 @@ import java.util
import java.util.Optional
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.{Condition, ReentrantLock}
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeoutException, TimeUnit}
import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}
import java.util.stream.Collectors
import scala.collection.Map
import scala.compat.java8.OptionConverters.RichOptionForJava8
import scala.compat.java8.OptionConverters.{RichOptionForJava8, RichOptionalGeneric}
import scala.jdk.CollectionConverters._


Expand Down Expand Up @@ -255,7 +256,7 @@ class BrokerServer(
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)

val listenerInfo = ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()),
config.effectiveAdvertisedListeners.map(_.toJava).asJava).
config.effectiveAdvertisedListeners).
withWildcardHostnamesResolved().
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))

Expand Down Expand Up @@ -386,7 +387,7 @@ class BrokerServer(
})

// Create and initialize an authorizer if one is configured.
authorizer = config.createNewAuthorizer()
authorizer = config.createNewAuthorizer().asScala
authorizer.foreach(_.configure(config.originals))

// The FetchSessionCache is divided into config.numIoThreads shards, each responsible
Expand Down Expand Up @@ -530,13 +531,17 @@ class BrokerServer(
// authorizer future is completed.
val endpointReadyFutures = {
val builder = new EndpointReadyFutures.Builder()
val earlyStartListeners: util.Set[String] = config.earlyStartListeners.stream()
.map(_.value())
.collect(Collectors.toSet())

builder.build(authorizer.asJava,
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
listenerInfo.listeners().values(),
listenerInfo.firstListener(),
config.earlyStartListeners.map(_.value()).asJava))
earlyStartListeners))
}
val authorizerFutures = endpointReadyFutures.futures().asScala.toMap
val enableRequestProcessingFuture = socketServer.enableRequestProcessing(authorizerFutures)
Expand Down Expand Up @@ -621,7 +626,7 @@ class BrokerServer(

protected def createRemoteLogManager(): Option[RemoteLogManager] = {
if (config.remoteLogManagerConfig.isRemoteStorageSystemEnabled()) {
Some(new RemoteLogManager(config, config.brokerId, config.logDirs.head, clusterId, time,
Some(new RemoteLogManager(config, config.brokerId, config.logDirs.asScala.head, clusterId, time,
(tp: TopicPartition) => logManager.getLog(tp).asJava,
(tp: TopicPartition, remoteLogStartOffset: java.lang.Long) => {
logManager.getLog(tp).foreach { log =>
Expand Down
15 changes: 10 additions & 5 deletions core/src/main/scala/kafka/server/ControllerServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import java.util
import java.util.{Optional, OptionalLong}
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.stream.Collectors
import scala.compat.java8.OptionConverters._
import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -159,7 +160,7 @@ class ControllerServer(
metricsGroup.newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
}

authorizer = config.createNewAuthorizer()
authorizer = config.createNewAuthorizer().asScala
authorizer.foreach(_.configure(config.originals))

metadataCache = MetadataCache.kRaftMetadataCache(config.nodeId)
Expand Down Expand Up @@ -187,20 +188,24 @@ class ControllerServer(
credentialProvider,
apiVersionManager)

val listenerInfo = ListenerInfo.create(config.controllerListeners.map(_.toJava).asJava).
val listenerInfo = ListenerInfo.create(config.controllerListeners).
withWildcardHostnamesResolved().
withEphemeralPortsCorrected(name => socketServer.boundPort(new ListenerName(name)))
socketServerFirstBoundPortFuture.complete(listenerInfo.firstListener().port())

val endpointReadyFutures = {
val builder = new EndpointReadyFutures.Builder()
val earlyStartListeners: util.Set[String] = config.earlyStartListeners.stream()
.map(_.value())
.collect(Collectors.toSet())

builder.build(authorizer.asJava,
new KafkaAuthorizerServerInfo(
new ClusterResource(clusterId),
config.nodeId,
listenerInfo.listeners().values(),
listenerInfo.firstListener(),
config.earlyStartListeners.map(_.value()).asJava))
earlyStartListeners))
}

sharedServer.startForController()
Expand Down Expand Up @@ -234,7 +239,7 @@ class ControllerServer(
OptionalLong.empty()
}

val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of)
val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.asScala.fold(OptionalLong.empty)(interval => OptionalLong.of(interval))

quorumControllerMetrics = new QuorumControllerMetrics(Optional.of(KafkaYammerMetrics.defaultRegistry), time, config.migrationEnabled)

Expand Down Expand Up @@ -278,7 +283,7 @@ class ControllerServer(

if (config.migrationEnabled) {
val zkClient = KafkaZkClient.createZkClient("KRaft Migration", time, config, KafkaServer.zkClientConfigFromKafkaConfig(config))
val zkConfigEncoder = config.passwordEncoderSecret match {
val zkConfigEncoder = config.passwordEncoderSecret.asScala match {
case Some(secret) => PasswordEncoder.encrypting(secret,
config.passwordEncoderKeyFactoryAlgorithm,
config.passwordEncoderCipherAlgorithm,
Expand Down
Loading