Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 3 additions & 14 deletions core/src/main/scala/kafka/server/KafkaConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion
import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs}
import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.server.util.Csv
Expand Down Expand Up @@ -188,14 +188,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
/**
 * Returns this config's values with any `prefix`-scoped overrides applied.
 * Pure delegation to `AbstractConfig.valuesWithPrefixOverride`; exists to give
 * callers access to the protected superclass method.
 */
def valuesFromThisConfigWithPrefixOverride(prefix: String): util.Map[String, AnyRef] =
super.valuesWithPrefixOverride(prefix)

/** ********* Zookeeper Configuration ***********/
val zkConnect: String = getString(ZkConfigs.ZK_CONNECT_CONFIG)
val zkSessionTimeoutMs: Int = getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG)
val zkConnectionTimeoutMs: Int =
Option(getInt(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG)).map(_.toInt).getOrElse(getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG))
val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG)
val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG)
Copy link
Copy Markdown
Member

@ijuma ijuma Jan 22, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also remove them from ZkConfigs?


// Single RemoteLogManagerConfig built from this KafkaConfig, cached so every
// call to the accessor below returns the same instance.
private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this)
def remoteLogManagerConfig = _remoteLogManagerConfig

Expand Down Expand Up @@ -231,9 +223,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
val controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS)
val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS)

def requiresZookeeper: Boolean = processRoles.isEmpty
def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty

private def parseProcessRoles(): Set[ProcessRole] = {
val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map {
case "broker" => ProcessRole.BrokerRole
Expand Down Expand Up @@ -610,7 +599,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])
.map { case (listenerName, protocolName) =>
ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)
}
if (usesSelfManagedQuorum && !originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) {
if (!originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) {
// Nothing was specified explicitly for listener.security.protocol.map, so we are using the default value.
// Add PLAINTEXT mappings for controller listeners as long as there is no SSL or SASL_{PLAINTEXT,SSL} in use
Expand Down Expand Up @@ -734,7 +723,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _])

val listenerNames = listeners.map(_.listenerName).toSet
if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) {
// validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located)
// validations for all broker setups (i.e. broker-only and co-located)
validateAdvertisedBrokerListenersNonEmptyForBroker()
require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. " +
Expand Down
9 changes: 1 addition & 8 deletions core/src/main/scala/kafka/server/Server.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ trait Server {
object Server {
val MetricsPrefix: String = "kafka.server"
val ClusterIdLabel: String = "kafka.cluster.id"
val BrokerIdLabel: String = "kafka.broker.id"
val NodeIdLabel: String = "kafka.node.id"

def initializeMetrics(
Expand Down Expand Up @@ -69,13 +68,7 @@ object Server {
): KafkaMetricsContext = {
val contextLabels = new java.util.HashMap[String, Object]
contextLabels.put(ClusterIdLabel, clusterId)

if (config.usesSelfManagedQuorum) {
contextLabels.put(NodeIdLabel, config.nodeId.toString)
} else {
contextLabels.put(BrokerIdLabel, config.brokerId.toString)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove BrokerIdLabel and MockMetricsReporter.BROKERID

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

addressed it :)

}

contextLabels.put(NodeIdLabel, config.nodeId.toString)
contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX))
new KafkaMetricsContext(MetricsPrefix, contextLabels)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.Utils
import org.apache.kafka.metadata.authorizer.StandardAuthorizer
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs}
import org.apache.kafka.server.config.ServerConfigs
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo}

Expand Down Expand Up @@ -79,7 +79,6 @@ class DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS
import DescribeAuthorizedOperationsTest._

override val brokerCount = 1
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName)

var client: Admin = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs, ZkConfigs}
import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils}
import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows}
import org.apache.logging.log4j.core.config.Configurator
Expand Down Expand Up @@ -4082,7 +4082,7 @@ object PlaintextAdminIntegrationTest {
new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET)
))
alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"), OpType.SET)))
alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET)))
alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://localhost:0,INTERNAL://localhost:0"), OpType.SET)))
var alterResult = admin.incrementalAlterConfigs(alterConfigs)

assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)
Expand Down Expand Up @@ -4111,7 +4111,7 @@ object PlaintextAdminIntegrationTest {
new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET)
))
alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"), OpType.SET)))
alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET)))
alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://localhost:0,INTERNAL://localhost:0"), OpType.SET)))
alterResult = admin.incrementalAlterConfigs(alterConfigs, new AlterConfigsOptions().validateOnly(true))

assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package kafka.api
import kafka.security.JaasTestUtils
import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.MethodSource
Expand All @@ -26,7 +25,6 @@ import scala.jdk.CollectionConverters._
class SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup {
private val kafkaClientSaslMechanism = "PLAIN"
private val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN")
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))
override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@ package kafka.api
import kafka.security.JaasTestUtils
import kafka.utils.TestUtils
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.server.config.ZkConfigs
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout}

@Timeout(600)
class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup {
this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
override protected def securityProtocol = SecurityProtocol.SASL_SSL
override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs}
import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs}
import org.apache.kafka.server.util.KafkaScheduler
Expand Down Expand Up @@ -253,7 +253,7 @@ class DynamicBrokerConfigTest {

val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, securityPropsWithoutListenerPrefix)
val nonDynamicProps = Map(ZkConfigs.ZK_CONNECT_CONFIG -> "somehost:2181")
val nonDynamicProps = Map(KRaftConfigs.NODE_ID_CONFIG -> "123")
verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps)

// Test update of configs with invalid type
Expand Down
30 changes: 14 additions & 16 deletions core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,17 @@ import org.junit.jupiter.api.function.Executable
import scala.jdk.CollectionConverters._

class KafkaConfigTest {

/**
 * Builds a minimal valid KRaft combined-mode (broker+controller) configuration
 * for use as the starting point of the listener-related tests below.
 *
 * Note: the test overwrites `listeners` afterwards in several tests, so only the
 * KRaft-required keys here need to stay internally consistent.
 */
def createDefaultConfig(): Properties = {
  val props = new Properties()
  props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller")
  props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
  props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1")
  props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
  // In combined mode this node is itself a voter, so the voter id must equal
  // node.id ("1"); a mismatched id (previously "2@...") makes every KafkaConfig
  // built from these properties invalid.
  props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "1@localhost:5000")
  props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL")
  props
}

@Test
def testLogRetentionTimeHoursProvided(): Unit = {
Expand Down Expand Up @@ -547,9 +558,7 @@ class KafkaConfigTest {

@Test
def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
val props = new Properties()
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
val props = createDefaultConfig()

props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091,REPLICATION://localhost:9092")
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL")
Expand All @@ -558,9 +567,7 @@ class KafkaConfigTest {

@Test
def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): Unit = {
val props = new Properties()
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
val props = createDefaultConfig()

props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091")
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "REPLICATION")
Expand All @@ -569,9 +576,7 @@ class KafkaConfigTest {

@Test
def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = {
val props = new Properties()
props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181")
val props = createDefaultConfig()

props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091")
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL")
Expand Down Expand Up @@ -794,11 +799,6 @@ class KafkaConfigTest {

KafkaConfig.configNames.foreach { name =>
name match {
case ZkConfigs.ZK_CONNECT_CONFIG => // ignore string
case ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number")
case ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0")
case ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean")
case ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG => //ignore string
case ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG => //ignore string
Expand Down Expand Up @@ -1181,7 +1181,6 @@ class KafkaConfigTest {
defaults.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092")
defaults.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
// NOTE(review): stale comment — the ZkConnectionTimeoutMs/ZK_SESSION_TIMEOUT_MS setting it described was removed; this line should be dropped.
defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, "1234")
defaults.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "false")
defaults.setProperty(ServerConfigs.RESERVED_BROKER_MAX_ID_CONFIG, "1")
defaults.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1")
Expand All @@ -1198,7 +1197,6 @@ class KafkaConfigTest {
defaults.setProperty(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString)

val config = KafkaConfig.fromProps(defaults)
assertEquals(1234, config.zkConnectionTimeoutMs)
assertEquals(false, config.brokerIdGenerationEnable)
assertEquals(1, config.maxReservedBrokerId)
assertEquals(1, config.brokerId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ object KafkaMetricsReporterTest {

MockMetricsReporter.JMXPREFIX.set(contextLabelOrNull("_namespace", metricsContext))
MockMetricsReporter.CLUSTERID.set(contextLabelOrNull("kafka.cluster.id", metricsContext))
MockMetricsReporter.BROKERID.set(contextLabelOrNull("kafka.broker.id", metricsContext))
MockMetricsReporter.NODEID.set(contextLabelOrNull("kafka.node.id", metricsContext))
}

Expand All @@ -58,7 +57,6 @@ object KafkaMetricsReporterTest {

// Captures the metrics-context labels observed by the mock reporter so the
// test body can assert on them after broker startup.
object MockMetricsReporter {
// Value of the "_namespace" context label (the metrics prefix).
val JMXPREFIX: AtomicReference[String] = new AtomicReference[String]
// Value of the "kafka.broker.id" label — NOTE(review): the code that set this
// is removed elsewhere in this change, so it presumably stays null now.
val BROKERID : AtomicReference[String] = new AtomicReference[String]
// Value of the "kafka.node.id" label.
val NODEID : AtomicReference[String] = new AtomicReference[String]
// Value of the "kafka.cluster.id" label.
val CLUSTERID : AtomicReference[String] = new AtomicReference[String]
}
Expand All @@ -84,7 +82,6 @@ class KafkaMetricsReporterTest extends QuorumTestHarness {
@ValueSource(strings = Array("kraft"))
def testMetricsContextNamespacePresent(quorum: String): Unit = {
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get())
assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get())
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get())
assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get())

Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/server/ServerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._
class ServerTest {

@Test
def testCreateSelfManagedKafkaMetricsContext(): Unit = {
def testCreateKafkaMetricsContext(): Unit = {
val nodeId = 0
val clusterId = Uuid.randomUuid().toString

Expand Down
Loading