From 12d18b8ec8062533df5fd6390a1c9177fd8afae0 Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Fri, 15 Jan 2021 12:32:43 -0800
Subject: [PATCH 1/6] MINOR: Drop enable.metadata.quorum config

---
 .../apache/kafka/clients/NodeApiVersions.java |  4 +--
 .../apache/kafka/common/config/ConfigDef.java |  2 +-
 .../apache/kafka/common/protocol/ApiKeys.java | 30 +++++++++----------
 .../kafka/common/protocol/Protocol.java       |  2 +-
 .../common/requests/ApiVersionsResponse.java  |  4 +--
 .../kafka/clients/NodeApiVersionsTest.java    |  8 ++---
 .../requests/ApiVersionsResponseTest.java     |  6 ++--
 core/src/main/scala/kafka/Kafka.scala         |  5 ++--
 .../scala/kafka/network/RequestChannel.scala  |  8 ++---
 .../scala/kafka/network/SocketServer.scala    | 14 ++++-----
 .../kafka/server/ForwardingManager.scala      |  4 +--
 .../main/scala/kafka/server/KafkaApis.scala   | 28 ++++++++---------
 .../main/scala/kafka/server/KafkaConfig.scala | 18 ++++++-----
 .../main/scala/kafka/server/KafkaServer.scala | 19 ++++++------
 .../scala/kafka/tools/TestRaftServer.scala    |  2 +-
 .../admin/BrokerApiVersionsCommandTest.scala  |  2 +-
 .../scala/unit/kafka/api/ApiVersionTest.scala |  2 +-
 .../integration/KafkaServerTestHarness.scala  | 11 +++++--
 .../AbstractApiVersionsRequestTest.scala      |  2 +-
 ...reateTopicsRequestWithForwardingTest.scala |  6 +---
 .../kafka/server/ForwardingManagerTest.scala  |  2 +-
 .../unit/kafka/server/KafkaApisTest.scala     | 13 +++++---
 .../unit/kafka/server/RequestQuotaTest.scala  |  6 ++--
 .../scala/unit/kafka/utils/TestUtils.scala    |  6 +++-
 24 files changed, 108 insertions(+), 96 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
index 7588dee312931..658d481308ead 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NodeApiVersions.java
@@ -62,7 +62,7 @@ public static NodeApiVersions create() {
      */
     public static NodeApiVersions create(Collection<ApiVersion> overrides) {
         List<ApiVersion> apiVersions = new LinkedList<>(overrides);
-        for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
             boolean exists = false;
             for (ApiVersion apiVersion : apiVersions) {
                 if (apiVersion.apiKey() == apiKey.id) {
@@ -170,7 +170,7 @@ public String toString(boolean lineBreaks) {
 
         // Also handle the case where some apiKey types are not specified at all in the given ApiVersions,
         // which may happen when the remote is too old.
-        for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
             if (!apiKeysText.containsKey(apiKey.id)) {
                 StringBuilder bld = new StringBuilder();
                 bld.append(apiKey.name).append("(").
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 156e08fd7123f..0de11c834d1d5 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -427,7 +427,7 @@ public ConfigDef defineInternal(final String name, final Type type, final Object
     }
 
     /**
-     * Get the configuration keys
+ss     * Get the configuration keys
      * @return a map containing all configuration keys
      */
    public Map<String, ConfigKey> configKeys() {
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
index ca1f4b8dfdd4d..67586d541cf2c 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java
@@ -90,15 +90,15 @@ public enum ApiKeys {
     ALTER_CLIENT_QUOTAS(ApiMessageType.ALTER_CLIENT_QUOTAS, false, true),
     DESCRIBE_USER_SCRAM_CREDENTIALS(ApiMessageType.DESCRIBE_USER_SCRAM_CREDENTIALS),
     ALTER_USER_SCRAM_CREDENTIALS(ApiMessageType.ALTER_USER_SCRAM_CREDENTIALS, false, true),
-    VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, false),
-    BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, false),
-    END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, false),
-    DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false, false),
+    VOTE(ApiMessageType.VOTE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
+    BEGIN_QUORUM_EPOCH(ApiMessageType.BEGIN_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
+    END_QUORUM_EPOCH(ApiMessageType.END_QUORUM_EPOCH, true, RecordBatch.MAGIC_VALUE_V0, false, true),
+    DESCRIBE_QUORUM(ApiMessageType.DESCRIBE_QUORUM, true, RecordBatch.MAGIC_VALUE_V0, false, true),
     ALTER_ISR(ApiMessageType.ALTER_ISR, true),
     UPDATE_FEATURES(ApiMessageType.UPDATE_FEATURES, false, true),
-    ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, false),
-    FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, false),
-    DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER, false, RecordBatch.MAGIC_VALUE_V0, false, true);
+    ENVELOPE(ApiMessageType.ENVELOPE, true, RecordBatch.MAGIC_VALUE_V0, false, true),
+    FETCH_SNAPSHOT(ApiMessageType.FETCH_SNAPSHOT, false, RecordBatch.MAGIC_VALUE_V0, false, true),
+    DESCRIBE_CLUSTER(ApiMessageType.DESCRIBE_CLUSTER);
 
     // The generator ensures every `ApiMessageType` has a unique id
     private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values())
@@ -116,8 +116,8 @@ public enum ApiKeys {
     /** indicates the minimum required inter broker magic required to support the API */
     public final byte minRequiredInterBrokerMagic;
 
-    /** indicates whether the API is enabled and should be exposed in ApiVersions **/
-    public final boolean isEnabled;
+    /** indicates whether this is an API which is only exposed by the KIP-500 controller **/
+    public final boolean isControllerOnlyApi;
 
     /** indicates whether the API is enabled for forwarding **/
     public final boolean forwardable;
@@ -139,7 +139,7 @@ public enum ApiKeys {
     }
 
     ApiKeys(ApiMessageType messageType, boolean clusterAction, byte minRequiredInterBrokerMagic, boolean forwardable) {
-        this(messageType, clusterAction, minRequiredInterBrokerMagic, forwardable, true);
+        this(messageType, clusterAction, minRequiredInterBrokerMagic, forwardable, false);
     }
 
     ApiKeys(
@@ -147,14 +147,14 @@
         boolean clusterAction,
         byte minRequiredInterBrokerMagic,
         boolean forwardable,
-        boolean isEnabled
+        boolean isControllerOnlyApi
     ) {
         this.messageType = messageType;
         this.id = messageType.apiKey();
         this.name = messageType.name;
         this.clusterAction = clusterAction;
         this.minRequiredInterBrokerMagic = minRequiredInterBrokerMagic;
-        this.isEnabled = isEnabled;
+        this.isControllerOnlyApi = isControllerOnlyApi;
         this.requiresDelayedAllocation = forwardable || shouldRetainsBufferReference(messageType.requestSchemas());
         this.forwardable = forwardable;
@@ -210,7 +210,7 @@ private static String toHtml() {
         b.append("Name\n");
         b.append("Key\n");
         b.append("");
-        for (ApiKeys key : ApiKeys.enabledApis()) {
+        for (ApiKeys key : ApiKeys.brokerApis()) {
             b.append("\n");
             b.append("");
             b.append("" + key.name + "");
@@ -242,9 +242,9 @@ public void visit(Type field) {
         return hasBuffer.get();
     }
 
-    public static List<ApiKeys> enabledApis() {
+    public static List<ApiKeys> brokerApis() {
         return Arrays.stream(values())
-            .filter(api -> api.isEnabled)
+            .filter(api -> !api.isControllerOnlyApi)
             .collect(Collectors.toList());
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 0d573dbde4763..f31c613a8c51b 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -133,7 +133,7 @@ public static String toHtml() {
             b.append("\n");
             schemaToFieldTableHtml(ResponseHeaderData.SCHEMAS[i], b);
         }
-        for (ApiKeys key : ApiKeys.enabledApis()) {
+        for (ApiKeys key : ApiKeys.brokerApis()) {
             // Key
             b.append("");
             b.append("");
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index 3a6ff2fe9c6a0..bb62c96a32e90 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -119,7 +119,7 @@ private static ApiVersionsResponse createApiVersionsResponse(
 
     public static ApiVersionCollection defaultApiKeys(final byte minMagic) {
         ApiVersionCollection apiKeys = new ApiVersionCollection();
-        for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
             if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
                 apiKeys.add(ApiVersionsResponse.toApiVersion(apiKey));
             }
@@ -137,7 +137,7 @@ public static ApiVersionCollection defaultApiKeys(final byte minMagic) {
 
     public static ApiVersionCollection intersectControllerApiVersions(final byte minMagic,
                                                                       final Map<ApiKeys, ApiVersion> activeControllerApiVersions) {
         ApiVersionCollection apiKeys = new ApiVersionCollection();
-        for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
             if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
                 ApiVersion brokerApiVersion = toApiVersion(apiKey);
diff --git a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
index 01d8cf6bdc434..7c19d9f098866 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NodeApiVersionsTest.java
@@ -38,7 +38,7 @@ public void testUnsupportedVersionsToString() {
         NodeApiVersions versions = new NodeApiVersions(new ApiVersionCollection());
         StringBuilder bld = new StringBuilder();
         String prefix = "(";
-        for (ApiKeys apiKey : ApiKeys.enabledApis()) {
+        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
             bld.append(prefix).append(apiKey.name).
                 append("(").append(apiKey.id).append("): UNSUPPORTED");
             prefix = ", ";
@@ -143,10 +143,10 @@ public void testUsableVersionLatestVersions() {
             .setMaxVersion((short) 1));
         NodeApiVersions versions = new NodeApiVersions(versionList);
         for (ApiKeys apiKey: ApiKeys.values()) {
-            if (apiKey.isEnabled) {
-                assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
-            } else {
+            if (apiKey.isControllerOnlyApi) {
                 assertNull(versions.apiVersion(apiKey));
+            } else {
+                assertEquals(apiKey.latestVersion(), versions.latestUsableVersion(apiKey));
             }
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
index f387a06a26068..38a586917624c 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/ApiVersionsResponseTest.java
@@ -40,7 +40,7 @@ public class ApiVersionsResponseTest {
 
     @Test
     public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() {
-        assertEquals(apiKeysInResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE), new HashSet<>(ApiKeys.enabledApis()));
+        assertEquals(apiKeysInResponse(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE), new HashSet<>(ApiKeys.brokerApis()));
         assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().supportedFeatures().isEmpty());
         assertTrue(ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeatures().isEmpty());
         assertEquals(ApiVersionsResponse.UNKNOWN_FINALIZED_FEATURES_EPOCH, ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().finalizedFeaturesEpoch());
@@ -49,9 +49,9 @@ public void shouldCreateApiResponseThatHasAllApiKeysSupportedByBroker() {
     @Test
     public void shouldHaveCorrectDefaultApiVersionsResponse() {
         Collection<ApiVersion> apiVersions = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.data().apiKeys();
-        assertEquals(apiVersions.size(), ApiKeys.enabledApis().size(), "API versions for all API keys must be maintained.");
+        assertEquals(apiVersions.size(), ApiKeys.brokerApis().size(), "API versions for all API keys must be maintained.");
 
-        for (ApiKeys key : ApiKeys.enabledApis()) {
+        for (ApiKeys key : ApiKeys.brokerApis()) {
             ApiVersion version = ApiVersionsResponse.DEFAULT_API_VERSIONS_RESPONSE.apiVersion(key.id);
             assertNotNull(version, "Could not find ApiVersion for API " + key.name);
             assertEquals(version.minVersion(), key.oldestVersion(), "Incorrect min version for Api " + key.name);
diff --git a/core/src/main/scala/kafka/Kafka.scala b/core/src/main/scala/kafka/Kafka.scala
index 1e1f345cf336b..4e278c95e34f8 100755
--- a/core/src/main/scala/kafka/Kafka.scala
+++ b/core/src/main/scala/kafka/Kafka.scala
@@ -65,11 +65,12 @@ object Kafka extends Logging {
 
   private def buildServer(props: Properties): Server = {
     val config = KafkaConfig.fromProps(props, false)
-    if (config.processRoles.isEmpty) {
+    if (config.requiresZookeeper) {
       new KafkaServer(
         config,
         Time.SYSTEM,
-        threadNamePrefix = None
+        threadNamePrefix = None,
+        enableForwarding = false
       )
     } else {
       new KafkaRaftServer(
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 777c940077253..7d3112560d851 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -59,11 +59,11 @@ object RequestChannel extends Logging {
     val sanitizedUser: String = Sanitizer.sanitize(principal.getName)
   }
 
-  class Metrics(allowDisabledApis: Boolean = false) {
+  class Metrics(allowControllerOnlyApis: Boolean = false) {
 
     private val metricsMap = mutable.Map[String, RequestMetrics]()
 
-    (ApiKeys.values.toSeq.filter(_.isEnabled || allowDisabledApis).map(_.name) ++
+    (ApiKeys.values.toSeq.filter(!_.isControllerOnlyApi || allowControllerOnlyApis).map(_.name) ++
       Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName)).foreach { name =>
       metricsMap.put(name, new RequestMetrics(name))
     }
@@ -337,9 +337,9 @@
 class RequestChannel(val queueSize: Int,
                      val metricNamePrefix: String,
                      time: Time,
-                     allowDisabledApis: Boolean = false) extends KafkaMetricsGroup {
+                     allowControllerOnlyApis: Boolean = false) extends KafkaMetricsGroup {
   import RequestChannel._
-  val metrics = new RequestChannel.Metrics(allowDisabledApis)
+  val metrics = new RequestChannel.Metrics(allowControllerOnlyApis)
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
   val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 9345452ceea45..da02c5ef4efd2 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -77,7 +77,7 @@ class SocketServer(val config: KafkaConfig,
                    val metrics: Metrics,
                    val time: Time,
                    val credentialProvider: CredentialProvider,
-                   val allowDisabledApis: Boolean = false)
+                   val allowControllerOnlyApis: Boolean = false)
   extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
 
   private val maxQueuedRequests = config.queuedMaxRequests
@@ -93,12 +93,12 @@ class SocketServer(val config: KafkaConfig,
   // data-plane
   private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
   private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
-  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, allowDisabledApis)
+  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, DataPlaneMetricPrefix, time, allowControllerOnlyApis)
   // control-plane
   private var controlPlaneProcessorOpt : Option[Processor] = None
   private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
   val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ =>
-    new RequestChannel(20, ControlPlaneMetricPrefix, time, allowDisabledApis))
+    new RequestChannel(20, ControlPlaneMetricPrefix, time, allowControllerOnlyApis))
 
   private var nextProcessorId = 0
   val connectionQuotas = new ConnectionQuotas(config, time, metrics)
@@ -429,7 +429,7 @@ class SocketServer(val config: KafkaConfig,
       memoryPool,
       logContext,
       isPrivilegedListener = isPrivilegedListener,
-      allowDisabledApis = allowDisabledApis
+      allowControllerOnlyApis = allowControllerOnlyApis
     )
   }
@@ -790,7 +790,7 @@ private[kafka] class Processor(val id: Int,
                                logContext: LogContext,
                                connectionQueueSize: Int = ConnectionQueueSize,
                                isPrivilegedListener: Boolean = false,
-                               allowDisabledApis: Boolean = false) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+                               allowControllerOnlyApis: Boolean = false) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
   private object ConnectionId {
     def fromString(s: String): Option[ConnectionId] = s.split("-") match {
@@ -981,10 +981,10 @@ private[kafka] class Processor(val id: Int,
 
   protected def parseRequestHeader(buffer: ByteBuffer): RequestHeader = {
     val header = RequestHeader.parse(buffer)
-    if (header.apiKey.isEnabled || allowDisabledApis) {
+    if (!header.apiKey.isControllerOnlyApi || allowControllerOnlyApis) {
       header
     } else {
-      throw new InvalidRequestException("Received request for disabled api key " + header.apiKey)
+      throw new InvalidRequestException("Received request for KIP-500 controller-only api key " + header.apiKey)
     }
   }
diff --git a/core/src/main/scala/kafka/server/ForwardingManager.scala b/core/src/main/scala/kafka/server/ForwardingManager.scala
index 8d285e80ae0c8..20fda299d08f1 100644
--- a/core/src/main/scala/kafka/server/ForwardingManager.scala
+++ b/core/src/main/scala/kafka/server/ForwardingManager.scala
@@ -36,7 +36,7 @@ trait ForwardingManager {
     responseCallback: Option[AbstractResponse] => Unit
   ): Unit
 
-  def controllerApiVersions(): Option[NodeApiVersions]
+  def controllerApiVersions: Option[NodeApiVersions]
 
   def start(): Unit = {}
 
@@ -140,7 +140,7 @@ class ForwardingManagerImpl(
     channelManager.sendRequest(envelopeRequest, new ForwardingResponseHandler)
   }
 
-  override def controllerApiVersions(): Option[NodeApiVersions] =
+  override def controllerApiVersions: Option[NodeApiVersions] =
     channelManager.controllerApiVersions()
 
   private def parseResponse(
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 09c1ba49aca99..9004cef6063de 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -101,7 +101,7 @@ class KafkaApis(val requestChannel: RequestChannel,
                 val groupCoordinator: GroupCoordinator,
                 val txnCoordinator: TransactionCoordinator,
                 val controller: KafkaController,
-                val forwardingManager: ForwardingManager,
+                val forwardingManager: Option[ForwardingManager],
                 val zkClient: KafkaZkClient,
                 val brokerId: Int,
                 val config: KafkaConfig,
@@ -131,7 +131,7 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   private def isForwardingEnabled(request: RequestChannel.Request): Boolean = {
-    config.metadataQuorumEnabled && request.context.principalSerde.isPresent
+    forwardingManager.isDefined && request.context.principalSerde.isPresent
   }
 
   private def maybeForwardToController(
@@ -149,12 +149,12 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
-    if (!request.isForwarded && !controller.isActive && isForwardingEnabled(request)) {
-      forwardingManager.forwardRequest(request, responseCallback)
-    } else {
-      // When the KIP-500 mode is off or the principal serde is undefined, forwarding is not supported,
-      // therefore requests are handled directly.
-      handler(request)
+    forwardingManager match {
+      case Some(mgr) if !request.isForwarded && !controller.isActive =>
+        mgr.forwardRequest(request, responseCallback)
+
+      case _ =>
+        handler(request)
     }
   }
@@ -1742,11 +1742,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     else {
       val supportedFeatures = brokerFeatures.supportedFeatures
       val finalizedFeaturesOpt = finalizedFeatureCache.get
-      val controllerApiVersions = if (isForwardingEnabled(request)) {
-        forwardingManager.controllerApiVersions()
-      } else {
-        None
-      }
+      val controllerApiVersions = forwardingManager.flatMap(_.controllerApiVersions)
 
       val apiVersionsResponse =
         finalizedFeaturesOpt match {
@@ -3251,9 +3247,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     // If forwarding is not yet enabled or this request has been received on an invalid endpoint,
     // then we treat the request as unparsable and close the connection.
-    if (!config.metadataQuorumEnabled) {
+    if (!isForwardingEnabled(request)) {
       info(s"Closing connection ${request.context.connectionId} because it sent an `Envelope` " +
-        s"request, which is not accepted without enabling the internal config ${KafkaConfig.EnableMetadataQuorumProp}")
+        "request even though forwarding has not been enabled")
       requestHelper.closeConnection(request, Collections.emptyMap())
       return
     } else if (!request.context.fromPrivilegedListener) {
@@ -3279,7 +3275,7 @@ class KafkaApis(val requestChannel: RequestChannel,
       val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer)
 
       val forwardedApi = forwardedRequestHeader.apiKey
-      if (!forwardedApi.forwardable || !forwardedApi.isEnabled) {
+      if (!forwardedApi.forwardable) {
         throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding")
       }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index fac691eb8dc1c..9e0d8160762d6 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -363,7 +363,6 @@ object KafkaConfig {
   val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMaxMsProp = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
-  val EnableMetadataQuorumProp = "enable.metadata.quorum"
   val ProcessRolesProp = "process.roles"
 
   /************* Authorizer Configuration ***********/
@@ -648,6 +647,9 @@ object KafkaConfig {
   val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
   val ConnectionSetupTimeoutMsDoc = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC
   val ConnectionSetupTimeoutMaxMsDoc = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC
+  val ProcessRolesDoc = "[ALPHA] The roles that this process plays: 'broker', 'controller', or 'broker,controller' if it is both. " +
+    "This configuration is only for clusters upgraded for KIP-500, which replaces the dependence on Zookeeper with " +
+    "a self-managed Raft quorum. Leave this config undefined or empty for Zookeeper clusters."
 
   /************* Authorizer Configuration ***********/
   val AuthorizerClassNameDoc = s"The fully qualified name of a class that implements s${classOf[Authorizer].getName}" +
    " interface, which is used by the broker for authorization. This config also supports authorizers that implement the deprecated" +
@@ -1009,7 +1011,7 @@ object KafkaConfig {
     new ConfigDef()
 
       /** ********* Zookeeper Configuration ***********/
-      .define(ZkConnectProp, STRING, HIGH, ZkConnectDoc)
+      .define(ZkConnectProp, STRING, "", HIGH, ZkConnectDoc)
       .define(ZkSessionTimeoutMsProp, INT, Defaults.ZkSessionTimeoutMs, HIGH, ZkSessionTimeoutMsDoc)
       .define(ZkConnectionTimeoutMsProp, INT, null, HIGH, ZkConnectionTimeoutMsDoc)
       .define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, ZkSyncTimeMsDoc)
@@ -1044,9 +1046,6 @@ object KafkaConfig {
       .define(RequestTimeoutMsProp, INT, Defaults.RequestTimeoutMs, HIGH, RequestTimeoutMsDoc)
       .define(ConnectionSetupTimeoutMsProp, LONG, Defaults.ConnectionSetupTimeoutMs, MEDIUM, ConnectionSetupTimeoutMsDoc)
       .define(ConnectionSetupTimeoutMaxMsProp, LONG, Defaults.ConnectionSetupTimeoutMaxMs, MEDIUM, ConnectionSetupTimeoutMaxMsDoc)
-
-      // Experimental flag to turn on APIs required for the internal metadata quorum (KIP-500)
-      .defineInternal(EnableMetadataQuorumProp, BOOLEAN, false, LOW)
       .defineInternal(ProcessRolesProp, LIST, Collections.emptyList(), ValidList.in("broker", "controller"), HIGH, ProcessRolesDoc)
 
       /************* Authorizer Configuration ***********/
@@ -1480,6 +1479,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   var brokerId: Int = getInt(KafkaConfig.BrokerIdProp)
   val processRoles = parseProcessRoles()
 
+  def requiresZookeeper: Boolean = processRoles.isEmpty
+
   private def parseProcessRoles(): Set[ProcessRole] = {
     val roles = getList(KafkaConfig.ProcessRolesProp).asScala.map {
       case "broker" => BrokerRole
@@ -1619,9 +1620,6 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   /** ********* Feature configuration ***********/
   def isFeatureVersioningSupported = interBrokerProtocolVersion >= KAFKA_2_7_IV0
 
-  /** ********* Experimental metadata quorum configuration ***********/
-  def metadataQuorumEnabled = getBoolean(KafkaConfig.EnableMetadataQuorumProp)
-
   /** ********* Group coordinator configuration ***********/
   val groupMinSessionTimeoutMs = getInt(KafkaConfig.GroupMinSessionTimeoutMsProp)
   val groupMaxSessionTimeoutMs = getInt(KafkaConfig.GroupMaxSessionTimeoutMsProp)
@@ -1910,5 +1908,9 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" +
       s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
       s" authentication responses from timing out")
+
+    if (requiresZookeeper && zkConnect.isEmpty) {
+      throw new ConfigException(s"Missing required configuration '${KafkaConfig.ZkConnectProp}' which has no default value.")
+    }
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index b1748ef107135..fabd0199a006a 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -87,6 +87,7 @@ class KafkaServer(
   val config: KafkaConfig,
   time: Time = Time.SYSTEM,
   threadNamePrefix: Option[String] = None,
+  enableForwarding: Boolean = false
 ) extends Server with Logging with KafkaMetricsGroup {
 
   private val startupComplete = new AtomicBoolean(false)
@@ -129,7 +130,7 @@ class KafkaServer(
 
   var kafkaController: KafkaController = null
 
-  var forwardingManager: ForwardingManager = null
+  var forwardingManager: Option[ForwardingManager] = None
 
   var alterIsrManager: AlterIsrManager = null
@@ -254,10 +255,10 @@ class KafkaServer(
     // Delay starting processors until the end of the initialization sequence to ensure
     // that credentials have been loaded before processing authentications.
     //
-    // Note that we allow the use of disabled APIs when experimental support for
-    // the internal metadata quorum has been enabled
+    // Note that we allow the use of KIP-500 controller APIs when forwarding is enabled
+    // so that the Envelope request is exposed. This is only used in testing currently.
     socketServer = new SocketServer(config, metrics, time, credentialProvider,
-      allowDisabledApis = config.metadataQuorumEnabled)
+      allowControllerOnlyApis = enableForwarding)
     socketServer.startup(startProcessingRequests = false)
 
     /* start replica manager */
@@ -293,15 +294,15 @@ class KafkaServer(
       kafkaController = new KafkaController(config, zkClient, time, metrics, brokerInfo, brokerEpoch, tokenManager, brokerFeatures, featureCache, threadNamePrefix)
       kafkaController.startup()
 
-      if (config.metadataQuorumEnabled) {
-        forwardingManager = ForwardingManager(
+      if (enableForwarding) {
+        this.forwardingManager = Some(ForwardingManager(
           config,
           metadataCache,
           time,
           metrics,
           threadNamePrefix
-        )
-        forwardingManager.start()
+        ))
+        forwardingManager.foreach(_.start())
       }
 
       adminManager = new ZkAdminManager(config, metrics, metadataCache, zkClient)
@@ -685,7 +686,7 @@ class KafkaServer(
         CoreUtils.swallow(alterIsrManager.shutdown(), this)
 
         if (forwardingManager != null)
-          CoreUtils.swallow(forwardingManager.shutdown(), this)
+          CoreUtils.swallow(forwardingManager.foreach(_.shutdown()), this)
 
         if (logManager != null)
           CoreUtils.swallow(logManager.shutdown(), this)
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 9ac3f0f4904ad..65c1cab54d72d 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -66,7 +66,7 @@ class TestRaftServer(
     tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
     credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
-    socketServer = new SocketServer(config, metrics, time, credentialProvider, allowDisabledApis = true)
+    socketServer = new SocketServer(config, metrics, time, credentialProvider, allowControllerOnlyApis = true)
     socketServer.startup(startProcessingRequests = false)
 
     raftManager = new KafkaRaftManager[Array[Byte]](
diff --git a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
index 5dc6ba4ecbca2..6224591addb3e 100644
--- a/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
+++ b/core/src/test/scala/integration/kafka/admin/BrokerApiVersionsCommandTest.scala
@@ -55,7 +55,7 @@ class BrokerApiVersionsCommandTest extends KafkaServerTestHarness {
     assertTrue(lineIter.hasNext)
     assertEquals(s"$brokerList (id: 0 rack: null) -> (", lineIter.next())
     val nodeApiVersions = NodeApiVersions.create
-    val enabledApis = ApiKeys.enabledApis.asScala
+    val enabledApis = ApiKeys.brokerApis.asScala
     for (apiKey <- enabledApis) {
       val apiVersion = nodeApiVersions.apiVersion(apiKey)
       assertNotNull(apiVersion)
diff --git a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
index f3c16fcfaf8c3..1e86876057326 100644
--- a/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
+++ b/core/src/test/scala/unit/kafka/api/ApiVersionTest.scala
@@ -232,7 +232,7 @@ class ApiVersionTest {
       Features.emptySupportedFeatures,
       None
     )
-    assertEquals(new util.HashSet[ApiKeys](ApiKeys.enabledApis), apiKeysInResponse(response))
+    assertEquals(new util.HashSet[ApiKeys](ApiKeys.brokerApis), apiKeysInResponse(response))
     assertEquals(AbstractResponse.DEFAULT_THROTTLE_TIME, response.throttleTimeMs)
     assertTrue(response.data.supportedFeatures.isEmpty)
     assertTrue(response.data.finalizedFeatures.isEmpty)
diff --git a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
index 8eae07e69e56b..63ba97620092c 100755
--- a/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
+++ b/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
@@ -85,6 +85,7 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
   protected def serverSaslProperties: Option[Properties] = None
   protected def clientSaslProperties: Option[Properties] = None
   protected def brokerTime(brokerId: Int): Time = Time.SYSTEM
+  protected def enableForwarding: Boolean = false
 
   @BeforeEach
   override def setUp(): Unit = {
@@ -98,8 +99,14 @@ abstract class KafkaServerTestHarness extends ZooKeeperTestHarness {
 
     // Add each broker to `servers` buffer as soon as it is created to ensure that brokers
     // are shutdown cleanly in tearDown even if a subsequent broker fails to start
-    for (config <- configs)
-      servers += TestUtils.createServer(config, time = brokerTime(config.brokerId))
+    for (config <- configs) {
+      servers += TestUtils.createServer(
+        config,
+        time = brokerTime(config.brokerId),
+        threadNamePrefix = None,
+        enableForwarding
+      )
+    }
     brokerList = TestUtils.bootstrapServers(servers, listenerName)
     alive = new Array[Boolean](servers.length)
     Arrays.fill(alive, true)
diff --git a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
index fd599c6e4ea5b..d71764e433ba9 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractApiVersionsRequestTest.scala
@@ -47,7 +47,7 @@ abstract class AbstractApiVersionsRequestTest extends BaseRequestTest {
   }
 
   def validateApiVersionsResponse(apiVersionsResponse: ApiVersionsResponse, listenerName: ListenerName = interBrokerListenerName): Unit = {
-    val expectedApis = ApiKeys.enabledApis()
+    val expectedApis = ApiKeys.brokerApis()
     if (listenerName == controlPlaneListenerName) {
       expectedApis.add(ApiKeys.ENVELOPE)
     }
diff --git a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
index b35286d44b1ac..a538cebcc7c11 100644
--- a/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
+++ b/core/src/test/scala/unit/kafka/server/CreateTopicsRequestWithForwardingTest.scala
@@ -17,8 +17,6 @@
 
 package kafka.server
 
-import java.util.Properties
-
 import org.apache.kafka.common.protocol.Errors
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Test
@@ -27,9 +25,7 @@ import scala.jdk.CollectionConverters._
 
 class CreateTopicsRequestWithForwardingTest extends AbstractCreateTopicsRequestTest {
 
-  override def brokerPropertyOverrides(properties: Properties): Unit = {
-    properties.put(KafkaConfig.EnableMetadataQuorumProp, true.toString)
-  }
+  override def enableForwarding: Boolean = true
 
   @Test
   def testForwardToController(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
index d94d06599fc5b..200635faf15bc 100644
--- a/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ForwardingManagerTest.scala
@@ -191,7 +191,7 @@ class ForwardingManagerTest {
       startTimeNanos = time.nanoseconds(),
       memoryPool = MemoryPool.NONE,
       buffer = requestBuffer,
-      metrics = new RequestChannel.Metrics(allowDisabledApis = true),
+      metrics = new RequestChannel.Metrics(allowControllerOnlyApis = true),
       envelope = None
     )
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 22a97cee10f79..909ecc68b6687 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -124,14 +124,19 @@ class KafkaApisTest {
     val properties = TestUtils.createBrokerConfig(brokerId, "zk")
     properties.put(KafkaConfig.InterBrokerProtocolVersionProp, interBrokerProtocolVersion.toString)
     properties.put(KafkaConfig.LogMessageFormatVersionProp, interBrokerProtocolVersion.toString)
-    properties.put(KafkaConfig.EnableMetadataQuorumProp, enableForwarding.toString)
+
+    val forwardingManagerOpt = if (enableForwarding)
+      Some(this.forwardingManager)
+    else
+      None
+
     new KafkaApis(requestChannel,
       replicaManager,
       adminManager,
       groupCoordinator,
       txnCoordinator,
       controller,
-      forwardingManager,
+      forwardingManagerOpt,
       zkClient,
       brokerId,
       new KafkaConfig(properties),
@@ -601,7 +606,7 @@ class KafkaApisTest {
     val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion, clientId, 0)
 
     val permittedVersion: Short = 0
-    EasyMock.expect(forwardingManager.controllerApiVersions()).andReturn(
+    EasyMock.expect(forwardingManager.controllerApiVersions).andReturn(
       Some(NodeApiVersions.create(ApiKeys.ALTER_CONFIGS.id, permittedVersion, permittedVersion)))
 
     val capturedResponse = expectNoThrottling()
@@ -637,7 +642,7 @@ class KafkaApisTest {
     val requestHeader = new RequestHeader(ApiKeys.API_VERSIONS, ApiKeys.API_VERSIONS.latestVersion, clientId, 0)
 
-    EasyMock.expect(forwardingManager.controllerApiVersions()).andReturn(None)
+    EasyMock.expect(forwardingManager.controllerApiVersions).andReturn(None)
 
     val capturedResponse = expectNoThrottling()
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ba074b8e89154..698ecefe751c5 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -171,7 +171,7 @@ class RequestQuotaTest extends BaseRequestTest {
   def testUnauthorizedThrottle(): Unit = {
     RequestQuotaTest.principal = RequestQuotaTest.UnauthorizedPrincipal
 
-    for (apiKey <- ApiKeys.enabledApis.asScala) {
+    for (apiKey <- ApiKeys.brokerApis.asScala) {
       submitTest(apiKey, () => checkUnauthorizedRequestThrottle(apiKey))
     }
 
@@ -739,9 +739,9 @@ class RequestQuotaTest extends BaseRequestTest {
 }
 
 object RequestQuotaTest {
-  val ClusterActions = ApiKeys.enabledApis.asScala.filter(_.clusterAction).toSet
+  val ClusterActions = ApiKeys.brokerApis.asScala.filter(_.clusterAction).toSet
   val SaslActions = Set(ApiKeys.SASL_HANDSHAKE, ApiKeys.SASL_AUTHENTICATE)
-  val ClientActions = ApiKeys.enabledApis.asScala.toSet -- ClusterActions -- SaslActions
+  val ClientActions = ApiKeys.brokerApis.asScala.toSet -- ClusterActions -- SaslActions
 
   val UnauthorizedPrincipal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Unauthorized")
 
   // Principal used for all client connections. This is modified by tests which
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ee4c9f7f56121..f786240f5cd8f 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -156,7 +156,11 @@ object TestUtils extends Logging {
   }
 
   def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String]): KafkaServer = {
-    val server = new KafkaServer(config, time, threadNamePrefix = threadNamePrefix)
+    createServer(config, time, threadNamePrefix, enableForwarding = false)
+  }
+
+  def createServer(config: KafkaConfig, time: Time, threadNamePrefix: Option[String], enableForwarding: Boolean): KafkaServer = {
+    val server = new KafkaServer(config, time, threadNamePrefix, enableForwarding)
     server.startup()
     server
   }

From 0d87e52c4136404be045f708b0f0d99b18549e3e Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Tue, 19 Jan 2021 16:31:48 -0800
Subject: [PATCH 2/6] Remove notes about `zookeeper.connect` in raft README

---
 raft/README.md | 9 +--------
 1 file changed, 1 insertion(+), 8 deletions(-)

diff --git a/raft/README.md b/raft/README.md
index 383470bc4f253..6b41879544f2b 100644
--- a/raft/README.md
+++ b/raft/README.md
@@ -12,8 +12,7 @@ Below we describe the details to set this up.
     bin/test-raft-server-start.sh config/raft.properties
 
 ### Run Multi Node Quorum ###
-Create 3 separate raft quorum properties as the following
-(note that the `zookeeper.connect` config is required, but unused):
+Create 3 separate raft quorum properties as the following:
 
 `cat << EOF >> config/raft-quorum-1.properties`
 
@@ -21,8 +20,6 @@ Create 3 separate raft quorum properties as the following
     listeners=PLAINTEXT://localhost:9092
     controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
     log.dirs=/tmp/raft-logs-1
-
-    zookeeper.connect=localhost:2181
 EOF
 
 `cat << EOF >> config/raft-quorum-2.properties`
@@ -31,8 +28,6 @@ Create 3 separate raft quorum properties as the following
     listeners=PLAINTEXT://localhost:9093
     controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
     log.dirs=/tmp/raft-logs-2
-
-    zookeeper.connect=localhost:2181
 EOF
 
 `cat << EOF >> config/raft-quorum-3.properties`
@@ -41,8 +36,6 @@ Create 3 separate raft quorum properties as the following
    listeners=PLAINTEXT://localhost:9094
    controller.quorum.voters=1@localhost:9092,2@localhost:9093,3@localhost:9094
    log.dirs=/tmp/raft-logs-3
-
-    zookeeper.connect=localhost:2181
 EOF
 
 Open up 3 separate terminals, and run individual commands:

From 8c8a4ee9f4c98801854ed4b4cad55faa658d5fe2 Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Tue, 19 Jan 2021 16:53:14 -0800
Subject: [PATCH 3/6] Fix checkstyle error in `ApiVersionsResponse`

---
 .../org/apache/kafka/common/requests/ApiVersionsResponse.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
index bb62c96a32e90..2bf9360f126d6 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ApiVersionsResponse.java
@@ -119,7 +119,7 @@ private static ApiVersionsResponse createApiVersionsResponse(
 
     public static ApiVersionCollection defaultApiKeys(final byte minMagic) {
         ApiVersionCollection apiKeys = new ApiVersionCollection();
-        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
+        for (ApiKeys apiKey : ApiKeys.brokerApis()) {
             if (apiKey.minRequiredInterBrokerMagic <= minMagic) {
                 apiKeys.add(ApiVersionsResponse.toApiVersion(apiKey));
             }

From 95430126a60313781298dc7c7fb4610f31194a74 Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Tue, 19 Jan 2021 17:27:19 -0800
Subject: [PATCH 4/6] Fix compilation error in `MetadataRequestBenchmark`

---
 .../apache/kafka/jmh/metadata/MetadataRequestBenchmark.java | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 32cb417bc24fe..20897392bb453 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -22,7 +22,6 @@
 import kafka.coordinator.transaction.TransactionCoordinator;
 import kafka.network.RequestChannel;
 import kafka.network.RequestConvertToJson;
-import kafka.server.ZkAdminManager;
 import kafka.server.BrokerFeatures;
 import kafka.server.BrokerTopicStats;
 import kafka.server.ClientQuotaManager;
@@ -30,7 +29,6 @@
 import kafka.server.ControllerMutationQuotaManager;
 import kafka.server.FetchManager;
 import kafka.server.FinalizedFeatureCache;
-import kafka.server.ForwardingManager;
 import kafka.server.KafkaApis;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaConfig$;
@@ -38,6 +36,7 @@
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicationQuotaManager;
+import kafka.server.ZkAdminManager;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.message.UpdateMetadataRequestData.UpdateMetadataBroker;
@@ -99,7 +98,6 @@ public class MetadataRequestBenchmark {
     private ZkAdminManager adminManager = Mockito.mock(ZkAdminManager.class);
     private TransactionCoordinator transactionCoordinator = Mockito.mock(TransactionCoordinator.class);
     private KafkaController kafkaController = Mockito.mock(KafkaController.class);
-    private ForwardingManager forwardingManager = Mockito.mock(ForwardingManager.class);
     private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
     private Metrics metrics = new Metrics();
     private int brokerId = 1;
@@ -176,7 +174,7 @@ private KafkaApis createKafkaApis() {
             groupCoordinator,
             transactionCoordinator,
             kafkaController,
-            forwardingManager,
+            Option.empty(),
             kafkaZkClient,
             brokerId,
             new KafkaConfig(kafkaProps),

From 1cfba6a1ca0484e917f1e87d23f977eb64c1cccc Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Wed, 20 Jan 2021 12:16:09 -0800
Subject: [PATCH 5/6] Fix failing tests and make `process.roles` internal again

---
 .../java/org/apache/kafka/common/config/ConfigDef.java | 2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala     | 9 +++------
 2 files changed, 4 insertions(+), 7 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 0de11c834d1d5..156e08fd7123f 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -427,7 +427,7 @@ public ConfigDef defineInternal(final String name, final Type type, final Object
     }
 
     /**
-ss     * Get the configuration keys
+     * Get the configuration keys
      * @return a map containing all configuration keys
      */
     public Map<String, ConfigKey> configKeys() {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 9e0d8160762d6..e9334596ddb00 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -647,7 +647,7 @@ object KafkaConfig {
   val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
   val ConnectionSetupTimeoutMsDoc = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_DOC
   val ConnectionSetupTimeoutMaxMsDoc = CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_DOC
-  val ProcessRolesDoc = "[ALPHA] The roles that this process plays: 'broker', 'controller', or 'broker,controller' if it is both. " +
+  val ProcessRolesDoc = "The roles that this process plays: 'broker', 'controller', or 'broker,controller' if it is both. " +
     "This configuration is only for clusters upgraded for KIP-500, which replaces the dependence on Zookeeper with " +
     "a self-managed Raft quorum. Leave this config undefined or empty for Zookeeper clusters."
 
   /************* Authorizer Configuration ***********/
@@ -999,9 +999,6 @@ object KafkaConfig {
   val PasswordEncoderKeyLengthDoc = "The key length used for encoding dynamically configured passwords."
   val PasswordEncoderIterationsDoc = "The iteration count used for encoding dynamically configured passwords."
 
-  /** ********* Experimental metadata quorum configuration ***********/
-  val ProcessRolesDoc = "This configuration determines what roles this process should play: broker, controller, or both"
-
   private val configDef = {
     import ConfigDef.Importance._
     import ConfigDef.Range._
@@ -1011,7 +1008,7 @@ object KafkaConfig {
     new ConfigDef()
 
       /** ********* Zookeeper Configuration ***********/
-      .define(ZkConnectProp, STRING, "", HIGH, ZkConnectDoc)
+      .define(ZkConnectProp, STRING, null, HIGH, ZkConnectDoc)
       .define(ZkSessionTimeoutMsProp, INT, Defaults.ZkSessionTimeoutMs, HIGH, ZkSessionTimeoutMsDoc)
       .define(ZkConnectionTimeoutMsProp, INT, null, HIGH, ZkConnectionTimeoutMsDoc)
       .define(ZkSyncTimeMsProp, INT, Defaults.ZkSyncTimeMs, LOW, ZkSyncTimeMsDoc)
@@ -1909,7 +1906,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
       s" authentication responses from timing out")
 
-    if (requiresZookeeper && zkConnect.isEmpty) {
+    if (requiresZookeeper && zkConnect == null) {
       throw new ConfigException(s"Missing required configuration '${KafkaConfig.ZkConnectProp}' which has no default value.")
     }
   }

From 50fa8e955b3f146b624224917374eaf06575a71b Mon Sep 17 00:00:00 2001
From: Jason Gustafson
Date: Thu, 21 Jan 2021 11:44:49 -0800
Subject: [PATCH 6/6] Add test cases for zookeeper.connect/process.roles
 requirement

---
 .../main/scala/kafka/tools/TestRaftServer.scala |  5 +++++
 .../unit/kafka/server/KafkaConfigTest.scala     | 17 +++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 65c1cab54d72d..68b2d0609e644 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -413,6 +413,11 @@ object TestRaftServer extends Logging {
     val configFile = opts.options.valueOf(opts.configOpt)
     val serverProps = Utils.loadProps(configFile)
+
+    // KafkaConfig requires either `process.roles` or `zookeeper.connect`. Neither are
+    // actually used by the test server, so we fill in `process.roles` with an arbitrary value.
+    serverProps.put(KafkaConfig.ProcessRolesProp, "controller")
+
     val config = KafkaConfig.fromProps(serverProps, doLog = false)
     val throughput = opts.options.valueOf(opts.throughputOpt)
     val recordSize = opts.options.valueOf(opts.recordSizeOpt)
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 863dfd834dbc5..6cbf04a690986 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -995,4 +995,21 @@ class KafkaConfigTest {
     val raftConfig = new RaftConfig(KafkaConfig.fromProps(props))
     assertEquals(expectedVoters, raftConfig.quorumVoterConnections())
   }
+
+  @Test
+  def testZookeeperConnectRequiredIfEmptyProcessRoles(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "")
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+    assertFalse(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testZookeeperConnectNotRequiredIfNonEmptyProcessRoles(): Unit = {
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
+    assertTrue(isValidKafkaConfig(props))
+  }
+
 }
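Taken together, the series replaces the internal `enable.metadata.quorum` flag with the `process.roles` config: when `process.roles` is empty the server runs in ZooKeeper mode and `zookeeper.connect` becomes mandatory; when it is non-empty, the requirement is dropped. The sketch below illustrates that validation behavior. It is a hypothetical standalone snippet, not part of the patches: it assumes the post-series `KafkaConfig` is on the classpath and simply mirrors the two new `KafkaConfigTest` cases from PATCH 6/6.

    import java.util.Properties
    import kafka.server.KafkaConfig
    import org.apache.kafka.common.config.ConfigException

    object ProcessRolesValidationSketch extends App {
      // ZooKeeper mode: process.roles is empty, so zookeeper.connect must be set.
      val zkMode = new Properties()
      zkMode.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
      try {
        KafkaConfig.fromProps(zkMode, false)
      } catch {
        // Expected after PATCH 5/6: "Missing required configuration 'zookeeper.connect' ..."
        case e: ConfigException => println(e.getMessage)
      }

      // KIP-500 mode: a non-empty process.roles drops the zookeeper.connect requirement.
      val raftMode = new Properties()
      raftMode.put(KafkaConfig.ProcessRolesProp, "broker")
      raftMode.put(KafkaConfig.ListenersProp, "PLAINTEXT://127.0.0.1:9092")
      assert(!KafkaConfig.fromProps(raftMode, false).requiresZookeeper)
    }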