From 1908217dbb7c0f0c4efb8c63c537e55bff9d3b8e Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 15 Jan 2025 20:07:39 +0800 Subject: [PATCH 01/13] fix the error test --- .../kafka/controller/KafkaController.scala | 4 +-- .../main/scala/kafka/server/KafkaConfig.scala | 32 ++++--------------- .../unit/kafka/server/KafkaConfigTest.scala | 23 ++++++++++--- 3 files changed, 27 insertions(+), 32 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 973a2ec648242..2afa793fe69f8 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -416,7 +416,7 @@ class KafkaController(val config: KafkaConfig, FeatureZNodeStatus.Enabled, brokerFeatures.defaultFinalizedFeatures.asScala.map { case (k, v) => (k, v.shortValue()) } )) - featureCache.waitUntilFeatureEpochOrThrow(newVersion, config.zkConnectionTimeoutMs) + featureCache.waitUntilFeatureEpochOrThrow(newVersion, 0) } else { val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) val newFeatures = existingFeatureZNode.status match { @@ -431,7 +431,7 @@ class KafkaController(val config: KafkaConfig, val newFeatureZNode = FeatureZNode(config.interBrokerProtocolVersion, FeatureZNodeStatus.Enabled, newFeatures) if (!newFeatureZNode.equals(existingFeatureZNode)) { val newVersion = updateFeatureZNode(newFeatureZNode) - featureCache.waitUntilFeatureEpochOrThrow(newVersion, config.zkConnectionTimeoutMs) + featureCache.waitUntilFeatureEpochOrThrow(newVersion, 0) } } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 563ebd603b8af..4d2d0d1b31ff4 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -43,7 +43,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareCoordinatorConfig, ZkConfigs} +import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ShareCoordinatorConfig} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.MetricConfigs import org.apache.kafka.server.util.Csv @@ -186,15 +186,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) private[server] def valuesFromThisConfig: util.Map[String, _] = super.values def valuesFromThisConfigWithPrefixOverride(prefix: String): util.Map[String, AnyRef] = super.valuesWithPrefixOverride(prefix) - - /** ********* Zookeeper Configuration ***********/ - val zkConnect: String = getString(ZkConfigs.ZK_CONNECT_CONFIG) - val zkSessionTimeoutMs: Int = getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG) - val zkConnectionTimeoutMs: Int = - Option(getInt(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG)).map(_.toInt).getOrElse(getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG)) - val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG) - val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG) - + private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) def remoteLogManagerConfig = _remoteLogManagerConfig @@ -635,22 +627,12 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) if (nodeId != brokerId) { throw new ConfigException(s"You must set `${KRaftConfigs.NODE_ID_CONFIG}` to the same value as `${ServerConfigs.BROKER_ID_CONFIG}`.") } - if (requiresZookeeper) { - if (zkConnect == null) { - throw new ConfigException(s"Missing required configuration `${ZkConfigs.ZK_CONNECT_CONFIG}` which has no default value.") - } - if (brokerIdGenerationEnable) { - require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id") - } else { - require(brokerId >= 0, "broker.id must be greater than or equal to 0") - } - } else { - // KRaft-based metadata quorum - if (nodeId < 0) { - throw new ConfigException(s"Missing configuration `${KRaftConfigs.NODE_ID_CONFIG}` which is required " + - s"when `process.roles` is defined (i.e. when running in KRaft mode).") - } + // KRaft-based metadata quorum + if (nodeId < 0) { + throw new ConfigException(s"Missing configuration `${KRaftConfigs.NODE_ID_CONFIG}` which is required " + + s"when `process.roles` is defined (i.e. when running in KRaft mode).") } + require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1") require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0") require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1") diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 2926b0471809a..7a50bcf8160ba 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -773,7 +773,12 @@ class KafkaConfigTest { def testFromPropsInvalid(): Unit = { def baseProperties: Properties = { val validRequiredProperties = new Properties() - validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181") + validRequiredProperties.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller") + validRequiredProperties.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0") + validRequiredProperties.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + validRequiredProperties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093") + validRequiredProperties.setProperty("listeners", "CONTROLLER://:9093") + validRequiredProperties.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093") validRequiredProperties } // to ensure a basis is valid - bootstraps all needed validation @@ -1070,7 +1075,12 @@ class KafkaConfigTest { def testDynamicLogConfigs(): Unit = { def baseProperties: Properties = { val validRequiredProperties = new Properties() - validRequiredProperties.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "127.0.0.1:2181") + validRequiredProperties.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller") + validRequiredProperties.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0") + validRequiredProperties.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + validRequiredProperties.setProperty("controller.quorum.bootstrap.servers", "localhost:9093") + validRequiredProperties.setProperty("listeners", "CONTROLLER://:9093") + validRequiredProperties.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093") validRequiredProperties } @@ -1180,8 +1190,6 @@ class KafkaConfigTest { defaults.setProperty(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString) val config = KafkaConfig.fromProps(defaults) - assertEquals("127.0.0.1:2181", config.zkConnect) - assertEquals(1234, config.zkConnectionTimeoutMs) assertEquals(false, config.brokerIdGenerationEnable) assertEquals(1, config.maxReservedBrokerId) assertEquals(1, config.brokerId) @@ -1568,7 +1576,12 @@ class KafkaConfigTest { @Test def testSaslJwksEndpointRetryDefaults(): Unit = { val props = new Properties() - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller") + props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "0") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + props.setProperty("controller.quorum.bootstrap.servers", "localhost:9093") + props.setProperty("listeners", "CONTROLLER://:9093") + props.setProperty("advertised.listeners", "CONTROLLER://127.0.0.1:9093") val config = KafkaConfig.fromProps(props) assertNotNull(config.getLong(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MS)) assertNotNull(config.getLong(SaslConfigs.SASL_OAUTHBEARER_JWKS_ENDPOINT_RETRY_BACKOFF_MAX_MS)) From dbc87c44b0e1f19b557338502c7e1e354d8aa412 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 15 Jan 2025 20:13:49 +0800 Subject: [PATCH 02/13] remove the zookeeper comment --- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 4d2d0d1b31ff4..f1abdb485fc6b 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -731,7 +731,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val listenerNames = listeners.map(_.listenerName).toSet if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) { - // validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located) + // validations for all broker setups (i.e. KRaft broker-only and KRaft co-located) validateAdvertisedBrokerListenersNonEmptyForBroker() require(advertisedBrokerListenerNames.contains(interBrokerListenerName), s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " + From 7bc0660c1db8516ea4bfb049caf16689697a1ea7 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Thu, 16 Jan 2025 08:24:30 +0800 Subject: [PATCH 03/13] resolve conflict --- core/src/main/scala/kafka/controller/KafkaController.scala | 4 ++-- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 2afa793fe69f8..8091e709e530a 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -416,7 +416,7 @@ class KafkaController(val config: KafkaConfig, FeatureZNodeStatus.Enabled, brokerFeatures.defaultFinalizedFeatures.asScala.map { case (k, v) => (k, v.shortValue()) } )) - featureCache.waitUntilFeatureEpochOrThrow(newVersion, 0) + featureCache.waitUntilFeatureEpochOrThrow(newVersion, 18000) } else { val existingFeatureZNode = FeatureZNode.decode(mayBeFeatureZNodeBytes.get) val newFeatures = existingFeatureZNode.status match { @@ -431,7 +431,7 @@ class KafkaController(val config: KafkaConfig, val newFeatureZNode = FeatureZNode(config.interBrokerProtocolVersion, FeatureZNodeStatus.Enabled, newFeatures) if (!newFeatureZNode.equals(existingFeatureZNode)) { val newVersion = updateFeatureZNode(newFeatureZNode) - featureCache.waitUntilFeatureEpochOrThrow(newVersion, 0) + featureCache.waitUntilFeatureEpochOrThrow(newVersion, 18000) } } } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 8bf908a5bec13..3fe43843c76b4 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs} +import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.MetricConfigs import org.apache.kafka.server.util.Csv From c9b8a346e4e24b7d643805399b8d56f8a0c9ea50 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Thu, 16 Jan 2025 08:27:26 +0800 Subject: [PATCH 04/13] update some missing --- core/src/main/scala/kafka/server/KafkaConfig.scala | 3 +-- core/src/main/scala/kafka/server/Server.scala | 8 +------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 3fe43843c76b4..4725d1f292e8a 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -224,7 +224,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS) def requiresZookeeper: Boolean = processRoles.isEmpty - def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty private def parseProcessRoles(): Set[ProcessRole] = { val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map { @@ -602,7 +601,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) .map { case (listenerName, protocolName) => ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG) } - if (usesSelfManagedQuorum && !originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) { + if (!originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) { // Nothing was specified explicitly for listener.security.protocol.map, so we are using the default value, // and we are using KRaft. // Add PLAINTEXT mappings for controller listeners as long as there is no SSL or SASL_{PLAINTEXT,SSL} in use diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala index d85060cc72db2..a4e3adecd823d 100644 --- a/core/src/main/scala/kafka/server/Server.scala +++ b/core/src/main/scala/kafka/server/Server.scala @@ -69,13 +69,7 @@ object Server { ): KafkaMetricsContext = { val contextLabels = new java.util.HashMap[String, Object] contextLabels.put(ClusterIdLabel, clusterId) - - if (config.usesSelfManagedQuorum) { - contextLabels.put(NodeIdLabel, config.nodeId.toString) - } else { - contextLabels.put(BrokerIdLabel, config.brokerId.toString) - } - + contextLabels.put(BrokerIdLabel, config.brokerId.toString) contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)) new KafkaMetricsContext(MetricsPrefix, contextLabels) } From 57c63342b3caf86e758658dc71b7a34bc6de5b4c Mon Sep 17 00:00:00 2001 From: m1a2st Date: Thu, 16 Jan 2025 08:29:33 +0800 Subject: [PATCH 05/13] do more clean up --- core/src/main/scala/kafka/server/DynamicBrokerConfig.scala | 2 +- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 -- core/src/main/scala/kafka/server/MetadataSupport.scala | 4 ++-- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index ea99be3bcb15a..4d70a8690ee01 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -1004,7 +1004,7 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi } private def verifyListenerRegistrationAlterationSupported(): Unit = { - if (!server.config.requiresZookeeper) { + if (true) { throw new ConfigException("Advertised listeners cannot be altered when using a " + "Raft-based metadata quorum.") } diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 4725d1f292e8a..f4b70044b3f1c 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -223,8 +223,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS) val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS) - def requiresZookeeper: Boolean = processRoles.isEmpty - private def parseProcessRoles(): Set[ProcessRole] = { val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map { case "broker" => ProcessRole.BrokerRole diff --git a/core/src/main/scala/kafka/server/MetadataSupport.scala b/core/src/main/scala/kafka/server/MetadataSupport.scala index 83a52e83f69a0..f1999696e1a08 100644 --- a/core/src/main/scala/kafka/server/MetadataSupport.scala +++ b/core/src/main/scala/kafka/server/MetadataSupport.scala @@ -77,7 +77,7 @@ case class ZkSupport(adminManager: ZkAdminManager, override def requireRaftOrThrow(createException: => Exception): RaftSupport = throw createException override def ensureConsistentWith(config: KafkaConfig): Unit = { - if (!config.requiresZookeeper) { + if (false) { throw new IllegalStateException("Config specifies Raft but metadata support instance is for ZooKeeper") } } @@ -97,7 +97,7 @@ case class RaftSupport(fwdMgr: ForwardingManager, override def requireRaftOrThrow(createException: => Exception): RaftSupport = this override def ensureConsistentWith(config: KafkaConfig): Unit = { - if (config.requiresZookeeper) { + if (false) { throw new IllegalStateException("Config specifies ZooKeeper but metadata support instance is for Raft") } } From 5b687b3885c8fb63a9bf262151d1bf7f98d7b3c9 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Thu, 16 Jan 2025 19:38:49 +0800 Subject: [PATCH 06/13] resolve the conflict --- .../main/scala/kafka/server/KafkaConfig.scala | 17 +++-------------- .../unit/kafka/server/KafkaConfigTest.scala | 1 - 2 files changed, 3 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index e243d40bbb2ff..c0bc365f75e3d 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs} +import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.MetricConfigs import org.apache.kafka.server.util.Csv @@ -188,14 +188,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) def valuesFromThisConfigWithPrefixOverride(prefix: String): util.Map[String, AnyRef] = super.valuesWithPrefixOverride(prefix) - /** ********* Zookeeper Configuration ***********/ - val zkConnect: String = getString(ZkConfigs.ZK_CONNECT_CONFIG) - val zkSessionTimeoutMs: Int = getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG) - val zkConnectionTimeoutMs: Int = - Option(getInt(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG)).map(_.toInt).getOrElse(getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG)) - val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG) - val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG) - private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) def remoteLogManagerConfig = _remoteLogManagerConfig @@ -231,9 +223,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS) val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS) - def requiresZookeeper: Boolean = processRoles.isEmpty - def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty - private def parseProcessRoles(): Set[ProcessRole] = { val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map { case "broker" => ProcessRole.BrokerRole @@ -610,7 +599,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) .map { case (listenerName, protocolName) => ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG) } - if (usesSelfManagedQuorum && !originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) { + if (!originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) { // Nothing was specified explicitly for listener.security.protocol.map, so we are using the default value, // and we are using KRaft. // Add PLAINTEXT mappings for controller listeners as long as there is no SSL or SASL_{PLAINTEXT,SSL} in use @@ -734,7 +723,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val listenerNames = listeners.map(_.listenerName).toSet if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) { - // validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located) + // validations for all broker setups (i.e. KRaft broker-only and KRaft co-located) validateAdvertisedBrokerListenersNonEmptyForBroker() require(advertisedBrokerListenerNames.contains(interBrokerListenerName), s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " + diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 971018ffc190f..aaa4bb253849f 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -1198,7 +1198,6 @@ class KafkaConfigTest { defaults.setProperty(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString) val config = KafkaConfig.fromProps(defaults) - assertEquals(1234, config.zkConnectionTimeoutMs) assertEquals(false, config.brokerIdGenerationEnable) assertEquals(1, config.maxReservedBrokerId) assertEquals(1, config.brokerId) From 719905aa6e909f7ba29552207af1aa5b7aed3ec9 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Sat, 18 Jan 2025 21:50:29 +0800 Subject: [PATCH 07/13] addressed by comments --- core/src/main/scala/kafka/controller/KafkaController.scala | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 core/src/main/scala/kafka/controller/KafkaController.scala diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala deleted file mode 100644 index e69de29bb2d1d..0000000000000 From 964449118def900942c9d5492c37552b8312d386 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 22 Jan 2025 14:18:51 +0800 Subject: [PATCH 08/13] addressed by comment --- .../DescribeAuthorizedOperationsTest.scala | 3 +- .../api/PlaintextAdminIntegrationTest.scala | 6 ++-- .../api/SaslMultiMechanismConsumerTest.scala | 2 -- .../kafka/api/SaslSslConsumerTest.scala | 2 -- .../server/DynamicBrokerConfigTest.scala | 2 +- .../unit/kafka/server/KafkaConfigTest.scala | 29 +++++++++---------- .../apache/kafka/server/config/ZkConfigs.java | 25 ---------------- 7 files changed, 19 insertions(+), 50 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala index 0f23b93e31cf6..c7426c0d78efb 100644 --- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala +++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.metadata.authorizer.StandardAuthorizer import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.authorizer.Authorizer -import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs} +import org.apache.kafka.server.config.ServerConfigs import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull} import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo} @@ -79,7 +79,6 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS import DescribeAuthorizedOperationsTest._ override val brokerCount = 1 - this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName) var client: Admin = _ diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 0f4174e1250d2..a11dee5097533 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -53,7 +53,7 @@ import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.security.authorizer.AclEntry -import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs, ZkConfigs} +import org.apache.kafka.server.config.{KRaftConfigs, QuotaConfig, ServerConfigs, ServerLogConfigs} import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils} import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows} import org.apache.logging.log4j.core.config.Configurator @@ -4082,7 +4082,7 @@ object PlaintextAdminIntegrationTest { new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET) )) alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"), OpType.SET))) - alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET))) + alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(KRaftConfigs.NODE_ID_CONFIG, "123"), OpType.SET))) var alterResult = admin.incrementalAlterConfigs(alterConfigs) assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) @@ -4111,7 +4111,7 @@ object PlaintextAdminIntegrationTest { new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET) )) alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"), OpType.SET))) - alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET))) + alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(KRaftConfigs.NODE_ID_CONFIG, "123"), OpType.SET))) alterResult = admin.incrementalAlterConfigs(alterConfigs, new AlterConfigsOptions().validateOnly(true)) assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala index 30a33c2ab647c..b41ccb6316caf 100644 --- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala @@ -15,7 +15,6 @@ package kafka.api import kafka.security.JaasTestUtils import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource @@ -26,7 +25,6 @@ import scala.jdk.CollectionConverters._ class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup { private val kafkaClientSaslMechanism = "PLAIN" private val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN") - this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) diff --git a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala index 460ebe2cb4e75..22c3077f4f9b8 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala @@ -15,12 +15,10 @@ package kafka.api import kafka.security.JaasTestUtils import kafka.utils.TestUtils import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} @Timeout(600) class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup { - this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 10b42f96b4e54..e3b99f4753ece 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -253,7 +253,7 @@ class DynamicBrokerConfigTest { val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12") verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, securityPropsWithoutListenerPrefix) - val nonDynamicProps = Map(ZkConfigs.ZK_CONNECT_CONFIG -> "somehost:2181") + val nonDynamicProps = Map(KRaftConfigs.NODE_ID_CONFIG -> "123") verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps) // Test update of configs with invalid type diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 5ec8faa1b2717..f937ea6c81d72 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -49,6 +49,17 @@ import org.junit.jupiter.api.function.Executable import scala.jdk.CollectionConverters._ class KafkaConfigTest { + + def createDefaultConfig(): Properties = { + val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1") + props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000") + props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:5000") + props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL") + props + } @Test def testLogRetentionTimeHoursProvided(): Unit = { @@ -547,9 +558,7 @@ class KafkaConfigTest { @Test def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = { - val props = new Properties() - props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + val props = createDefaultConfig() props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091,REPLICATION://localhost:9092") props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL") @@ -558,9 +567,7 @@ class KafkaConfigTest { @Test def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): Unit = { - val props = new Properties() - props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + val props = createDefaultConfig() props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091") props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "REPLICATION") @@ -569,9 +576,7 @@ class KafkaConfigTest { @Test def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = { - val props = new Properties() - props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + val props = createDefaultConfig() props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091") props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL") @@ -794,11 +799,6 @@ class KafkaConfigTest { KafkaConfig.configNames.foreach { name => name match { - case ZkConfigs.ZK_CONNECT_CONFIG => // ignore string - case ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") - case ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") - case ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean") - case ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean") case ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG => //ignore string case ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG => //ignore string @@ -1181,7 +1181,6 @@ class KafkaConfigTest { defaults.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092") defaults.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") // For ZkConnectionTimeoutMs - defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, "1234") defaults.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "false") defaults.setProperty(ServerConfigs.RESERVED_BROKER_MAX_ID_CONFIG, "1") defaults.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") diff --git a/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java index 0fd251edd160e..b3b1b06911c61 100644 --- a/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java +++ b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java @@ -22,23 +22,15 @@ import java.util.Map; import java.util.stream.Collectors; -import static org.apache.kafka.common.config.ConfigDef.Importance.HIGH; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; -import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; -import static org.apache.kafka.common.config.ConfigDef.Type.INT; import static org.apache.kafka.common.config.ConfigDef.Type.LIST; import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD; import static org.apache.kafka.common.config.ConfigDef.Type.STRING; public final class ZkConfigs { /** ********* Zookeeper Configuration ***********/ - public static final String ZK_CONNECT_CONFIG = "zookeeper.connect"; - public static final String ZK_SESSION_TIMEOUT_MS_CONFIG = "zookeeper.session.timeout.ms"; - public static final String ZK_CONNECTION_TIMEOUT_MS_CONFIG = "zookeeper.connection.timeout.ms"; - public static final String ZK_ENABLE_SECURE_ACLS_CONFIG = "zookeeper.set.acl"; - public static final String ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG = "zookeeper.max.in.flight.requests"; public static final String ZK_SSL_CLIENT_ENABLE_CONFIG = "zookeeper.ssl.client.enable"; public static final String ZK_CLIENT_CNXN_SOCKET_CONFIG = "zookeeper.clientCnxnSocket"; public static final String ZK_SSL_KEY_STORE_LOCATION_CONFIG = "zookeeper.ssl.keystore.location"; @@ -54,15 +46,6 @@ public final class ZkConfigs { public static final String ZK_SSL_CRL_ENABLE_CONFIG = "zookeeper.ssl.crl.enable"; public static final String ZK_SSL_OCSP_ENABLE_CONFIG = "zookeeper.ssl.ocsp.enable"; - public static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper connection string in the form hostname:port where host and port are the " + - "host and port of a ZooKeeper server. To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is " + - "down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.\n" + - "The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. " + - "For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path."; - public static final String ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session timeout"; - public static final String ZK_CONNECTION_TIMEOUT_MS_DOC = "The max time that the client waits to establish a connection to ZooKeeper. If not set, the value in " + ZK_SESSION_TIMEOUT_MS_CONFIG + " is used"; - public static final String ZK_ENABLE_SECURE_ACLS_DOC = "Set client to use secure ACLs"; - public static final String ZK_MAX_IN_FLIGHT_REQUESTS_DOC = "The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking."; public static final String ZK_SSL_CLIENT_ENABLE_DOC; public static final String ZK_CLIENT_CNXN_SOCKET_DOC; public static final String ZK_SSL_KEY_STORE_LOCATION_DOC; @@ -81,9 +64,6 @@ public final class ZkConfigs { // a map from the Kafka config to the corresponding ZooKeeper Java system property public static final Map ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP; - public static final int ZK_SESSION_TIMEOUT_MS = 18000; - public static final boolean ZK_ENABLE_SECURE_ACLS = false; - public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10; public static final boolean ZK_SSL_CLIENT_ENABLE = false; public static final String ZK_SSL_PROTOCOL = "TLSv1.2"; public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "HTTPS"; @@ -152,11 +132,6 @@ public final class ZkConfigs { } public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(ZK_CONNECT_CONFIG, STRING, null, HIGH, ZK_CONNECT_DOC) - .define(ZK_SESSION_TIMEOUT_MS_CONFIG, INT, ZK_SESSION_TIMEOUT_MS, HIGH, ZK_SESSION_TIMEOUT_MS_DOC) - .define(ZK_CONNECTION_TIMEOUT_MS_CONFIG, INT, null, HIGH, ZK_CONNECTION_TIMEOUT_MS_DOC) - .define(ZK_ENABLE_SECURE_ACLS_CONFIG, BOOLEAN, ZK_ENABLE_SECURE_ACLS, HIGH, ZK_ENABLE_SECURE_ACLS_DOC) - .define(ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG, INT, ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, ZK_MAX_IN_FLIGHT_REQUESTS_DOC) .define(ZK_SSL_CLIENT_ENABLE_CONFIG, BOOLEAN, ZK_SSL_CLIENT_ENABLE, MEDIUM, ZK_SSL_CLIENT_ENABLE_DOC) .define(ZK_CLIENT_CNXN_SOCKET_CONFIG, STRING, null, MEDIUM, ZK_CLIENT_CNXN_SOCKET_DOC) .define(ZK_SSL_KEY_STORE_LOCATION_CONFIG, STRING, null, MEDIUM, ZK_SSL_KEY_STORE_LOCATION_DOC) From b6992fc8566036f8ded73d26d6efeb0414d8637e Mon Sep 17 00:00:00 2001 From: m1a2st Date: Wed, 22 Jan 2025 14:24:47 +0800 Subject: [PATCH 09/13] remove unused import --- .../test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index e3b99f4753ece..3e77c76d9b4cf 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.server.authorizer._ -import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs} +import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs} import org.apache.kafka.server.util.KafkaScheduler From 909aabcb23c27cb84e035dfa6665158f56b6c11e Mon Sep 17 00:00:00 2001 From: m1a2st Date: Thu, 23 Jan 2025 12:22:23 +0800 Subject: [PATCH 10/13] fix the test --- core/src/main/scala/kafka/server/Server.scala | 2 +- core/src/test/scala/unit/kafka/server/ServerTest.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala index a4e3adecd823d..ca1b4b2f2c0f0 100644 --- a/core/src/main/scala/kafka/server/Server.scala +++ b/core/src/main/scala/kafka/server/Server.scala @@ -69,7 +69,7 @@ object Server { ): KafkaMetricsContext = { val contextLabels = new java.util.HashMap[String, Object] contextLabels.put(ClusterIdLabel, clusterId) - contextLabels.put(BrokerIdLabel, config.brokerId.toString) + contextLabels.put(NodeIdLabel, config.nodeId.toString) contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)) new KafkaMetricsContext(MetricsPrefix, contextLabels) } diff --git a/core/src/test/scala/unit/kafka/server/ServerTest.scala b/core/src/test/scala/unit/kafka/server/ServerTest.scala index 4b2b900b3757d..5b60d3e08f887 100644 --- a/core/src/test/scala/unit/kafka/server/ServerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerTest.scala @@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._ class ServerTest { @Test - def testCreateSelfManagedKafkaMetricsContext(): Unit = { + def testCreateKafkaMetricsContext(): Unit = { val nodeId = 0 val clusterId = Uuid.randomUuid().toString From 221fde785f41fe362fb96ad76cf814e23823eb21 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Thu, 23 Jan 2025 12:32:15 +0800 Subject: [PATCH 11/13] addressed by comments in PlaintextAdminIntegrationTest --- .../kafka/api/PlaintextAdminIntegrationTest.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index a11dee5097533..d20bd5abc950d 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -53,7 +53,7 @@ import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.security.authorizer.AclEntry -import org.apache.kafka.server.config.{KRaftConfigs, QuotaConfig, ServerConfigs, ServerLogConfigs} +import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs} import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils} import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows} import org.apache.logging.log4j.core.config.Configurator @@ -4082,7 +4082,7 @@ object PlaintextAdminIntegrationTest { new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET) )) alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"), OpType.SET))) - alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(KRaftConfigs.NODE_ID_CONFIG, "123"), OpType.SET))) + alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://localhost:0,INTERNAL://localhost:0"), OpType.SET))) var alterResult = admin.incrementalAlterConfigs(alterConfigs) assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) @@ -4111,7 +4111,7 @@ object PlaintextAdminIntegrationTest { new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET) )) alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"), OpType.SET))) - alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(KRaftConfigs.NODE_ID_CONFIG, "123"), OpType.SET))) + alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://localhost:0,INTERNAL://localhost:0"), OpType.SET))) alterResult = admin.incrementalAlterConfigs(alterConfigs, new AlterConfigsOptions().validateOnly(true)) assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) From fe5f2091dece2f115aca5de086623e7bae2cdf3c Mon Sep 17 00:00:00 2001 From: m1a2st Date: Thu, 23 Jan 2025 12:33:39 +0800 Subject: [PATCH 12/13] update the comments --- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index c0bc365f75e3d..6aea088d8c6f4 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -723,7 +723,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val listenerNames = listeners.map(_.listenerName).toSet if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) { - // validations for all broker setups (i.e. KRaft broker-only and KRaft co-located) + // validations for all broker setups (i.e. broker-only and co-located) validateAdvertisedBrokerListenersNonEmptyForBroker() require(advertisedBrokerListenerNames.contains(interBrokerListenerName), s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " + From 46a9460d8bdf923cfcedca4488a3d8bad6b33f30 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Fri, 24 Jan 2025 19:34:31 +0800 Subject: [PATCH 13/13] addressed by comments --- core/src/main/scala/kafka/server/Server.scala | 1 - .../scala/unit/kafka/server/KafkaMetricsReporterTest.scala | 3 --- 2 files changed, 4 deletions(-) diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala index ca1b4b2f2c0f0..b2b2e21898a69 100644 --- a/core/src/main/scala/kafka/server/Server.scala +++ b/core/src/main/scala/kafka/server/Server.scala @@ -33,7 +33,6 @@ trait Server { object Server { val MetricsPrefix: String = "kafka.server" val ClusterIdLabel: String = "kafka.cluster.id" - val BrokerIdLabel: String = "kafka.broker.id" val NodeIdLabel: String = "kafka.node.id" def initializeMetrics( diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala index e07ae3032ca6b..f7e729740a7c5 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala @@ -45,7 +45,6 @@ object KafkaMetricsReporterTest { MockMetricsReporter.JMXPREFIX.set(contextLabelOrNull("_namespace", metricsContext)) MockMetricsReporter.CLUSTERID.set(contextLabelOrNull("kafka.cluster.id", metricsContext)) - MockMetricsReporter.BROKERID.set(contextLabelOrNull("kafka.broker.id", metricsContext)) MockMetricsReporter.NODEID.set(contextLabelOrNull("kafka.node.id", metricsContext)) } @@ -58,7 +57,6 @@ object KafkaMetricsReporterTest { object MockMetricsReporter { val JMXPREFIX: AtomicReference[String] = new AtomicReference[String] - val BROKERID : AtomicReference[String] = new AtomicReference[String] val NODEID : AtomicReference[String] = new AtomicReference[String] val CLUSTERID : AtomicReference[String] = new AtomicReference[String] } @@ -84,7 +82,6 @@ class KafkaMetricsReporterTest extends QuorumTestHarness { @ValueSource(strings = Array("kraft")) def testMetricsContextNamespacePresent(quorum: String): Unit = { assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get()) - assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get()) assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get()) assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())