From 2b35ebbf7e3c71f47da865ca8084464eb2a24e2a Mon Sep 17 00:00:00 2001 From: Mingdao Yang Date: Fri, 10 Jan 2025 02:13:22 +0800 Subject: [PATCH 1/5] KAFKA-18399 Remove ZooKeeper from KafkaApis: ALTER_CLIENT_QUOTAS, handleAllocateProducerIdsRequest --- .../main/scala/kafka/server/KafkaApis.scala | 43 +------------------ 1 file changed, 2 insertions(+), 41 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 085df39c8865b..bc630091e6e1c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2580,34 +2580,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request)) - val alterClientQuotasRequest = request.body[AlterClientQuotasRequest] - - if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) { - val result = zkSupport.adminManager.alterClientQuotas(alterClientQuotasRequest.entries.asScala, - alterClientQuotasRequest.validateOnly) - - val entriesData = result.iterator.map { case (quotaEntity, apiError) => - val entityData = quotaEntity.entries.asScala.iterator.map { case (key, value) => - new AlterClientQuotasResponseData.EntityData() - .setEntityType(key) - .setEntityName(value) - }.toBuffer - - new AlterClientQuotasResponseData.EntryData() - .setErrorCode(apiError.error.code) - .setErrorMessage(apiError.message) - .setEntity(entityData.asJava) - }.toBuffer - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new AlterClientQuotasResponse(new AlterClientQuotasResponseData() - .setThrottleTimeMs(requestThrottleMs) - .setEntries(entriesData.asJava))) - } else { - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - alterClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception)) - } + throw KafkaApis.shouldAlwaysForward(request) } def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = { @@ -2813,19 +2786,7 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = { - val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request)) - authHelper.authorizeClusterOperation(request, CLUSTER_ACTION) - - val allocateProducerIdsRequest = request.body[AllocateProducerIdsRequest] - - if (!zkSupport.controller.isActive) - requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => - allocateProducerIdsRequest.getErrorResponse(throttleTimeMs, Errors.NOT_CONTROLLER.exception)) - else - zkSupport.controller.allocateProducerIds(allocateProducerIdsRequest.data, producerIdsResponse => - requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => - new AllocateProducerIdsResponse(producerIdsResponse.setThrottleTimeMs(throttleTimeMs))) - ) + throw KafkaApis.shouldNeverReceive(request) } private def groupVersion(): GroupVersion = { From a1383722e819c38f7eb0075f9a72ebcb2e6e0e51 Mon Sep 17 00:00:00 2001 From: Mingdao Yang Date: Tue, 14 Jan 2025 01:20:56 +0800 Subject: [PATCH 2/5] Remove testAlterClientQuotasWithAuthorizer --- .../unit/kafka/server/KafkaApisTest.scala | 42 ------------------- 1 file changed, 42 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index cdc77a8213e9d..a50dd09f1b935 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -730,48 +730,6 @@ class KafkaApisTest extends Logging { assertEquals(expectedResults, responseMap) } - @Test - def testAlterClientQuotasWithAuthorizer(): Unit = { - val authorizer: Authorizer = mock(classOf[Authorizer]) - - authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER, - Resource.CLUSTER_NAME, AuthorizationResult.DENIED) - - val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user")) - val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection)) - - val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion, clientId, 0) - - val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false) - .build(requestHeader.apiVersion) - val request = buildRequest(alterClientQuotasRequest, - fromPrivilegedListener = true, requestHeader = Option(requestHeader)) - - when(controller.isActive).thenReturn(true) - when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](), - anyLong)).thenReturn(0) - kafkaApis = createKafkaApis(authorizer = Some(authorizer)) - kafkaApis.handleAlterClientQuotasRequest(request) - - val capturedResponse = verifyNoThrottling[AlterClientQuotasResponse](request) - verifyAlterClientQuotaResult(capturedResponse, Map(quotaEntity -> Errors.CLUSTER_AUTHORIZATION_FAILED)) - - verify(authorizer).authorize(any(), any()) - verify(clientRequestQuotaManager).maybeRecordAndGetThrottleTimeMs(any(), anyLong) - } - - private def verifyAlterClientQuotaResult(response: AlterClientQuotasResponse, - expected: Map[ClientQuotaEntity, Errors]): Unit = { - val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap - response.complete(futures.asJava) - futures.foreach { - case (entity, future) => - future.whenComplete((_, thrown) => - assertEquals(thrown, expected(entity).exception()) - ).isDone - } - } - @ParameterizedTest @CsvSource(value = Array("0,1500", "1500,0", "3000,1000")) def testKRaftControllerThrottleTimeEnforced( From e6b2977ef7be4209393e4db86f8151b9973c0c26 Mon Sep 17 00:00:00 2001 From: Mingdao Yang Date: Tue, 14 Jan 2025 02:31:40 +0800 Subject: [PATCH 3/5] Remove unused import --- core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index a50dd09f1b935..497631d82f83c 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -35,7 +35,7 @@ import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.ConfigResource import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER} import org.apache.kafka.common.errors.{ClusterAuthorizationException, UnsupportedVersionException} -import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic} +import org.apache.kafka.common.internals.Topic import org.apache.kafka.common.memory.MemoryPool import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection} import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult @@ -61,7 +61,6 @@ import org.apache.kafka.common.message._ import org.apache.kafka.common.metrics.Metrics import org.apache.kafka.common.network.{ClientInformation, ListenerName} import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil} -import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity} import org.apache.kafka.common.record._ import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata From 904543457e2c3e13e485238b930cab9c9b24a289 Mon Sep 17 00:00:00 2001 From: Mingdao Yang Date: Tue, 14 Jan 2025 03:02:44 +0800 Subject: [PATCH 4/5] Remove unused handleAlterClientQuotasRequest --- core/src/main/scala/kafka/server/KafkaApis.scala | 4 ---- core/src/test/scala/unit/kafka/server/KafkaApisTest.scala | 7 ------- 2 files changed, 11 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index bc630091e6e1c..6315b9d323a06 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2579,10 +2579,6 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = { - throw KafkaApis.shouldAlwaysForward(request) - } - def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = { val describeUserScramCredentialsRequest = request.body[DescribeUserScramCredentialsRequest] diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 497631d82f83c..7957e2e197b7a 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -9997,13 +9997,6 @@ class KafkaApisTest extends Logging { setResourceType(BROKER_LOGGER.id()))), response.data()) } - - @Test - def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = { - metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0) - kafkaApis = createKafkaApis(raftSupport = true) - verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest) - } @Test def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = { From cedebdaa5a686444e84adea39a7c4753baad5efb Mon Sep 17 00:00:00 2001 From: Mingdao Yang Date: Tue, 14 Jan 2025 04:20:06 +0800 Subject: [PATCH 5/5] Remove handleAllocateProducerIdsRequest --- core/src/main/scala/kafka/server/KafkaApis.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6315b9d323a06..387a88645e837 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -224,7 +224,6 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.UNREGISTER_BROKER => forwardToController(request) case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request) case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request) - case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request) case ApiKeys.DESCRIBE_QUORUM => forwardToController(request) case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError) case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError) @@ -2781,10 +2780,6 @@ class KafkaApis(val requestChannel: RequestChannel, new ListTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs))) } - def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = { - throw KafkaApis.shouldNeverReceive(request) - } - private def groupVersion(): GroupVersion = { GroupVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(GroupVersion.FEATURE_NAME, 0.toShort)) }