diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index e243d40bbb2ff..6aea088d8c6f4 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils import org.apache.kafka.server.ProcessRole import org.apache.kafka.server.authorizer.Authorizer import org.apache.kafka.server.common.MetadataVersion -import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs} +import org.apache.kafka.server.config.{AbstractKafkaConfig, DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.MetricConfigs import org.apache.kafka.server.util.Csv @@ -188,14 +188,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) def valuesFromThisConfigWithPrefixOverride(prefix: String): util.Map[String, AnyRef] = super.valuesWithPrefixOverride(prefix) - /** ********* Zookeeper Configuration ***********/ - val zkConnect: String = getString(ZkConfigs.ZK_CONNECT_CONFIG) - val zkSessionTimeoutMs: Int = getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG) - val zkConnectionTimeoutMs: Int = - Option(getInt(ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG)).map(_.toInt).getOrElse(getInt(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG)) - val zkEnableSecureAcls: Boolean = getBoolean(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG) - val zkMaxInFlightRequests: Int = getInt(ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG) - private val _remoteLogManagerConfig = new RemoteLogManagerConfig(this) def remoteLogManagerConfig = _remoteLogManagerConfig @@ -231,9 +223,6 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val 
controllerPerformanceSamplePeriodMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_SAMPLE_PERIOD_MS) val controllerPerformanceAlwaysLogThresholdMs: Long = getLong(KRaftConfigs.CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS) - def requiresZookeeper: Boolean = processRoles.isEmpty - def usesSelfManagedQuorum: Boolean = processRoles.nonEmpty - private def parseProcessRoles(): Set[ProcessRole] = { val roles = getList(KRaftConfigs.PROCESS_ROLES_CONFIG).asScala.map { case "broker" => ProcessRole.BrokerRole @@ -610,7 +599,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) .map { case (listenerName, protocolName) => ListenerName.normalised(listenerName) -> getSecurityProtocol(protocolName, SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG) } - if (usesSelfManagedQuorum && !originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) { + if (!originals.containsKey(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG)) { // Nothing was specified explicitly for listener.security.protocol.map, so we are using the default value, // and we are using KRaft. // Add PLAINTEXT mappings for controller listeners as long as there is no SSL or SASL_{PLAINTEXT,SSL} in use @@ -734,7 +723,7 @@ class KafkaConfig private(doLog: Boolean, val props: util.Map[_, _]) val listenerNames = listeners.map(_.listenerName).toSet if (processRoles.isEmpty || processRoles.contains(ProcessRole.BrokerRole)) { - // validations for all broker setups (i.e. ZooKeeper and KRaft broker-only and KRaft co-located) + // validations for all broker setups (i.e. broker-only and co-located) validateAdvertisedBrokerListenersNonEmptyForBroker() require(advertisedBrokerListenerNames.contains(interBrokerListenerName), s"${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} must be a listener name defined in ${SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG}. 
" + diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala index d85060cc72db2..b2b2e21898a69 100644 --- a/core/src/main/scala/kafka/server/Server.scala +++ b/core/src/main/scala/kafka/server/Server.scala @@ -33,7 +33,6 @@ trait Server { object Server { val MetricsPrefix: String = "kafka.server" val ClusterIdLabel: String = "kafka.cluster.id" - val BrokerIdLabel: String = "kafka.broker.id" val NodeIdLabel: String = "kafka.node.id" def initializeMetrics( @@ -69,13 +68,7 @@ object Server { ): KafkaMetricsContext = { val contextLabels = new java.util.HashMap[String, Object] contextLabels.put(ClusterIdLabel, clusterId) - - if (config.usesSelfManagedQuorum) { - contextLabels.put(NodeIdLabel, config.nodeId.toString) - } else { - contextLabels.put(BrokerIdLabel, config.brokerId.toString) - } - + contextLabels.put(NodeIdLabel, config.nodeId.toString) contextLabels.putAll(config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX)) new KafkaMetricsContext(MetricsPrefix, contextLabels) } diff --git a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala index 0f23b93e31cf6..c7426c0d78efb 100644 --- a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala +++ b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.utils.Utils import org.apache.kafka.metadata.authorizer.StandardAuthorizer import org.apache.kafka.security.authorizer.AclEntry import org.apache.kafka.server.authorizer.Authorizer -import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs} +import org.apache.kafka.server.config.ServerConfigs import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull} import org.junit.jupiter.api.{AfterEach, BeforeEach, Disabled, Test, TestInfo} @@ -79,7 +79,6 @@ class 
DescribeAuthorizedOperationsTest extends IntegrationTestHarness with SaslS import DescribeAuthorizedOperationsTest._ override val brokerCount = 1 - this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") this.serverConfig.setProperty(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, classOf[StandardAuthorizer].getName) var client: Admin = _ diff --git a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala index 0f4174e1250d2..d20bd5abc950d 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala @@ -53,7 +53,7 @@ import org.apache.kafka.controller.ControllerRequestContextUtil.ANONYMOUS_CONTEX import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig} import org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.security.authorizer.AclEntry -import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs, ZkConfigs} +import org.apache.kafka.server.config.{QuotaConfig, ServerConfigs, ServerLogConfigs} import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogFileUtils} import org.apache.kafka.test.TestUtils.{DEFAULT_MAX_WAIT_MS, assertFutureThrows} import org.apache.logging.log4j.core.config.Configurator @@ -4082,7 +4082,7 @@ object PlaintextAdminIntegrationTest { new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET) )) alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "snappy"), OpType.SET))) - alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET))) + alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new 
ConfigEntry(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://localhost:0,INTERNAL://localhost:0"), OpType.SET))) var alterResult = admin.incrementalAlterConfigs(alterConfigs) assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) @@ -4111,7 +4111,7 @@ object PlaintextAdminIntegrationTest { new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4"), OpType.SET) )) alterConfigs.put(topicResource2, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(TopicConfig.COMPRESSION_TYPE_CONFIG, "gzip"), OpType.SET))) - alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181"), OpType.SET))) + alterConfigs.put(brokerResource, util.Arrays.asList(new AlterConfigOp(new ConfigEntry(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "EXTERNAL://localhost:0,INTERNAL://localhost:0"), OpType.SET))) alterResult = admin.incrementalAlterConfigs(alterConfigs, new AlterConfigsOptions().validateOnly(true)) assertEquals(Set(topicResource1, topicResource2, brokerResource).asJava, alterResult.values.keySet) diff --git a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala index 30a33c2ab647c..b41ccb6316caf 100644 --- a/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslMultiMechanismConsumerTest.scala @@ -15,7 +15,6 @@ package kafka.api import kafka.security.JaasTestUtils import kafka.utils.{TestInfoUtils, TestUtils} import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} import org.junit.jupiter.params.ParameterizedTest import org.junit.jupiter.params.provider.MethodSource @@ -26,7 +25,6 @@ import scala.jdk.CollectionConverters._ class 
SaslMultiMechanismConsumerTest extends BaseConsumerTest with SaslSetup { private val kafkaClientSaslMechanism = "PLAIN" private val kafkaServerSaslMechanisms = List("GSSAPI", "PLAIN") - this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) override protected val serverSaslProperties = Some(kafkaServerSaslProperties(kafkaServerSaslMechanisms, kafkaClientSaslMechanism)) diff --git a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala index 460ebe2cb4e75..22c3077f4f9b8 100644 --- a/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala +++ b/core/src/test/scala/integration/kafka/api/SaslSslConsumerTest.scala @@ -15,12 +15,10 @@ package kafka.api import kafka.security.JaasTestUtils import kafka.utils.TestUtils import org.apache.kafka.common.security.auth.SecurityProtocol -import org.apache.kafka.server.config.ZkConfigs import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo, Timeout} @Timeout(600) class SaslSslConsumerTest extends BaseConsumerTest with SaslSetup { - this.serverConfig.setProperty(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true") override protected def securityProtocol = SecurityProtocol.SASL_SSL override protected lazy val trustStoreFile = Some(TestUtils.tempFile("truststore", ".jks")) diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 10b42f96b4e54..3e77c76d9b4cf 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -34,7 +34,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.raft.QuorumConfig import 
org.apache.kafka.network.SocketServerConfigs import org.apache.kafka.server.authorizer._ -import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs, ZkConfigs} +import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, ServerConfigs, ServerLogConfigs} import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig import org.apache.kafka.server.metrics.{KafkaYammerMetrics, MetricConfigs} import org.apache.kafka.server.util.KafkaScheduler @@ -253,7 +253,7 @@ class DynamicBrokerConfigTest { val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12") verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, securityPropsWithoutListenerPrefix) - val nonDynamicProps = Map(ZkConfigs.ZK_CONNECT_CONFIG -> "somehost:2181") + val nonDynamicProps = Map(KRaftConfigs.NODE_ID_CONFIG -> "123") verifyConfigUpdateWithInvalidConfig(config, origProps, validProps, nonDynamicProps) // Test update of configs with invalid type diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala index 31c192ff9ef30..f937ea6c81d72 100755 --- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala @@ -49,6 +49,17 @@ import org.junit.jupiter.api.function.Executable import scala.jdk.CollectionConverters._ class KafkaConfigTest { + + def createDefaultConfig(): Properties = { + val props = new Properties() + props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "broker,controller") + props.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") + props.setProperty(KRaftConfigs.NODE_ID_CONFIG, "1") + props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000") + props.setProperty(QuorumConfig.QUORUM_VOTERS_CONFIG, "2@localhost:5000") + 
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "PLAINTEXT:PLAINTEXT,CONTROLLER:SASL_SSL") + props + } @Test def testLogRetentionTimeHoursProvided(): Unit = { @@ -547,9 +558,7 @@ class KafkaConfigTest { @Test def testListenerNameMissingFromListenerSecurityProtocolMap(): Unit = { - val props = new Properties() - props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + val props = createDefaultConfig() props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091,REPLICATION://localhost:9092") props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL") @@ -558,9 +567,7 @@ class KafkaConfigTest { @Test def testInterBrokerListenerNameMissingFromListenerSecurityProtocolMap(): Unit = { - val props = new Properties() - props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + val props = createDefaultConfig() props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091") props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "REPLICATION") @@ -569,9 +576,7 @@ class KafkaConfigTest { @Test def testInterBrokerListenerNameAndSecurityProtocolSet(): Unit = { - val props = new Properties() - props.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") - props.setProperty(ZkConfigs.ZK_CONNECT_CONFIG, "localhost:2181") + val props = createDefaultConfig() props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, "SSL://localhost:9091") props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, "SSL") @@ -794,11 +799,6 @@ class KafkaConfigTest { KafkaConfig.configNames.foreach { name => name match { - case ZkConfigs.ZK_CONNECT_CONFIG => // ignore string - case ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number") - case ZkConfigs.ZK_CONNECTION_TIMEOUT_MS_CONFIG => assertPropertyInvalid(baseProperties, 
name, "not_a_number") - case ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean") - case ZkConfigs.ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_number", "0") case ZkConfigs.ZK_SSL_CLIENT_ENABLE_CONFIG => assertPropertyInvalid(baseProperties, name, "not_a_boolean") case ZkConfigs.ZK_CLIENT_CNXN_SOCKET_CONFIG => //ignore string case ZkConfigs.ZK_SSL_KEY_STORE_LOCATION_CONFIG => //ignore string @@ -1181,7 +1181,5 @@ class KafkaConfigTest { defaults.setProperty(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "CONTROLLER://localhost:9092") defaults.setProperty(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER") - // For ZkConnectionTimeoutMs - defaults.setProperty(ZkConfigs.ZK_SESSION_TIMEOUT_MS_CONFIG, "1234") defaults.setProperty(ServerConfigs.BROKER_ID_GENERATION_ENABLE_CONFIG, "false") defaults.setProperty(ServerConfigs.RESERVED_BROKER_MAX_ID_CONFIG, "1") defaults.setProperty(ServerConfigs.BROKER_ID_CONFIG, "1") @@ -1198,7 +1196,6 @@ class KafkaConfigTest { defaults.setProperty(MetricConfigs.METRIC_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString) val config = KafkaConfig.fromProps(defaults) - assertEquals(1234, config.zkConnectionTimeoutMs) assertEquals(false, config.brokerIdGenerationEnable) assertEquals(1, config.maxReservedBrokerId) assertEquals(1, config.brokerId) diff --git a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala index e07ae3032ca6b..f7e729740a7c5 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaMetricsReporterTest.scala @@ -45,7 +45,6 @@ object KafkaMetricsReporterTest { MockMetricsReporter.JMXPREFIX.set(contextLabelOrNull("_namespace", metricsContext)) MockMetricsReporter.CLUSTERID.set(contextLabelOrNull("kafka.cluster.id", metricsContext)) -
MockMetricsReporter.BROKERID.set(contextLabelOrNull("kafka.broker.id", metricsContext)) MockMetricsReporter.NODEID.set(contextLabelOrNull("kafka.node.id", metricsContext)) } @@ -58,7 +57,6 @@ object KafkaMetricsReporterTest { object MockMetricsReporter { val JMXPREFIX: AtomicReference[String] = new AtomicReference[String] - val BROKERID : AtomicReference[String] = new AtomicReference[String] val NODEID : AtomicReference[String] = new AtomicReference[String] val CLUSTERID : AtomicReference[String] = new AtomicReference[String] } @@ -84,7 +82,6 @@ class KafkaMetricsReporterTest extends QuorumTestHarness { @ValueSource(strings = Array("kraft")) def testMetricsContextNamespacePresent(quorum: String): Unit = { assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.CLUSTERID.get()) - assertNull(KafkaMetricsReporterTest.MockMetricsReporter.BROKERID.get()) assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.NODEID.get()) assertNotNull(KafkaMetricsReporterTest.MockMetricsReporter.JMXPREFIX.get()) diff --git a/core/src/test/scala/unit/kafka/server/ServerTest.scala b/core/src/test/scala/unit/kafka/server/ServerTest.scala index 4b2b900b3757d..5b60d3e08f887 100644 --- a/core/src/test/scala/unit/kafka/server/ServerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ServerTest.scala @@ -29,7 +29,7 @@ import scala.jdk.CollectionConverters._ class ServerTest { @Test - def testCreateSelfManagedKafkaMetricsContext(): Unit = { + def testCreateKafkaMetricsContext(): Unit = { val nodeId = 0 val clusterId = Uuid.randomUuid().toString diff --git a/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java index 0fd251edd160e..b3b1b06911c61 100644 --- a/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java +++ b/server/src/main/java/org/apache/kafka/server/config/ZkConfigs.java @@ -22,23 +22,15 @@ import java.util.Map; import java.util.stream.Collectors; -import static 
org.apache.kafka.common.config.ConfigDef.Importance.HIGH; import static org.apache.kafka.common.config.ConfigDef.Importance.LOW; import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM; -import static org.apache.kafka.common.config.ConfigDef.Range.atLeast; import static org.apache.kafka.common.config.ConfigDef.Type.BOOLEAN; -import static org.apache.kafka.common.config.ConfigDef.Type.INT; import static org.apache.kafka.common.config.ConfigDef.Type.LIST; import static org.apache.kafka.common.config.ConfigDef.Type.PASSWORD; import static org.apache.kafka.common.config.ConfigDef.Type.STRING; public final class ZkConfigs { /** ********* Zookeeper Configuration ***********/ - public static final String ZK_CONNECT_CONFIG = "zookeeper.connect"; - public static final String ZK_SESSION_TIMEOUT_MS_CONFIG = "zookeeper.session.timeout.ms"; - public static final String ZK_CONNECTION_TIMEOUT_MS_CONFIG = "zookeeper.connection.timeout.ms"; - public static final String ZK_ENABLE_SECURE_ACLS_CONFIG = "zookeeper.set.acl"; - public static final String ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG = "zookeeper.max.in.flight.requests"; public static final String ZK_SSL_CLIENT_ENABLE_CONFIG = "zookeeper.ssl.client.enable"; public static final String ZK_CLIENT_CNXN_SOCKET_CONFIG = "zookeeper.clientCnxnSocket"; public static final String ZK_SSL_KEY_STORE_LOCATION_CONFIG = "zookeeper.ssl.keystore.location"; @@ -54,15 +46,6 @@ public final class ZkConfigs { public static final String ZK_SSL_CRL_ENABLE_CONFIG = "zookeeper.ssl.crl.enable"; public static final String ZK_SSL_OCSP_ENABLE_CONFIG = "zookeeper.ssl.ocsp.enable"; - public static final String ZK_CONNECT_DOC = "Specifies the ZooKeeper connection string in the form hostname:port where host and port are the " + - "host and port of a ZooKeeper server. 
To allow connecting through other ZooKeeper nodes when that ZooKeeper machine is " + - "down you can also specify multiple hosts in the form hostname1:port1,hostname2:port2,hostname3:port3.\n" + - "The server can also have a ZooKeeper chroot path as part of its ZooKeeper connection string which puts its data under some path in the global ZooKeeper namespace. " + - "For example to give a chroot path of /chroot/path you would give the connection string as hostname1:port1,hostname2:port2,hostname3:port3/chroot/path."; - public static final String ZK_SESSION_TIMEOUT_MS_DOC = "Zookeeper session timeout"; - public static final String ZK_CONNECTION_TIMEOUT_MS_DOC = "The max time that the client waits to establish a connection to ZooKeeper. If not set, the value in " + ZK_SESSION_TIMEOUT_MS_CONFIG + " is used"; - public static final String ZK_ENABLE_SECURE_ACLS_DOC = "Set client to use secure ACLs"; - public static final String ZK_MAX_IN_FLIGHT_REQUESTS_DOC = "The maximum number of unacknowledged requests the client will send to ZooKeeper before blocking."; public static final String ZK_SSL_CLIENT_ENABLE_DOC; public static final String ZK_CLIENT_CNXN_SOCKET_DOC; public static final String ZK_SSL_KEY_STORE_LOCATION_DOC; @@ -81,9 +64,6 @@ public final class ZkConfigs { // a map from the Kafka config to the corresponding ZooKeeper Java system property public static final Map ZK_SSL_CONFIG_TO_SYSTEM_PROPERTY_MAP; - public static final int ZK_SESSION_TIMEOUT_MS = 18000; - public static final boolean ZK_ENABLE_SECURE_ACLS = false; - public static final int ZK_MAX_IN_FLIGHT_REQUESTS = 10; public static final boolean ZK_SSL_CLIENT_ENABLE = false; public static final String ZK_SSL_PROTOCOL = "TLSv1.2"; public static final String ZK_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM = "HTTPS"; @@ -152,11 +132,6 @@ public final class ZkConfigs { } public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(ZK_CONNECT_CONFIG, STRING, null, HIGH, ZK_CONNECT_DOC) - 
.define(ZK_SESSION_TIMEOUT_MS_CONFIG, INT, ZK_SESSION_TIMEOUT_MS, HIGH, ZK_SESSION_TIMEOUT_MS_DOC) - .define(ZK_CONNECTION_TIMEOUT_MS_CONFIG, INT, null, HIGH, ZK_CONNECTION_TIMEOUT_MS_DOC) - .define(ZK_ENABLE_SECURE_ACLS_CONFIG, BOOLEAN, ZK_ENABLE_SECURE_ACLS, HIGH, ZK_ENABLE_SECURE_ACLS_DOC) - .define(ZK_MAX_IN_FLIGHT_REQUESTS_CONFIG, INT, ZK_MAX_IN_FLIGHT_REQUESTS, atLeast(1), HIGH, ZK_MAX_IN_FLIGHT_REQUESTS_DOC) .define(ZK_SSL_CLIENT_ENABLE_CONFIG, BOOLEAN, ZK_SSL_CLIENT_ENABLE, MEDIUM, ZK_SSL_CLIENT_ENABLE_DOC) .define(ZK_CLIENT_CNXN_SOCKET_CONFIG, STRING, null, MEDIUM, ZK_CLIENT_CNXN_SOCKET_DOC) .define(ZK_SSL_KEY_STORE_LOCATION_CONFIG, STRING, null, MEDIUM, ZK_SSL_KEY_STORE_LOCATION_DOC)