Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 0 additions & 48 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ class KafkaApis(val requestChannel: RequestChannel,
case ApiKeys.UNREGISTER_BROKER => forwardToController(request)
case ApiKeys.DESCRIBE_TRANSACTIONS => handleDescribeTransactionsRequest(request)
case ApiKeys.LIST_TRANSACTIONS => handleListTransactionsRequest(request)
case ApiKeys.ALLOCATE_PRODUCER_IDS => handleAllocateProducerIdsRequest(request)
case ApiKeys.DESCRIBE_QUORUM => forwardToController(request)
case ApiKeys.CONSUMER_GROUP_HEARTBEAT => handleConsumerGroupHeartbeat(request).exceptionally(handleError)
case ApiKeys.CONSUMER_GROUP_DESCRIBE => handleConsumerGroupDescribe(request).exceptionally(handleError)
Expand Down Expand Up @@ -2579,37 +2578,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

def handleAlterClientQuotasRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
val alterClientQuotasRequest = request.body[AlterClientQuotasRequest]

if (authHelper.authorize(request.context, ALTER_CONFIGS, CLUSTER, CLUSTER_NAME)) {
val result = zkSupport.adminManager.alterClientQuotas(alterClientQuotasRequest.entries.asScala,
alterClientQuotasRequest.validateOnly)

val entriesData = result.iterator.map { case (quotaEntity, apiError) =>
val entityData = quotaEntity.entries.asScala.iterator.map { case (key, value) =>
new AlterClientQuotasResponseData.EntityData()
.setEntityType(key)
.setEntityName(value)
}.toBuffer

new AlterClientQuotasResponseData.EntryData()
.setErrorCode(apiError.error.code)
.setErrorMessage(apiError.message)
.setEntity(entityData.asJava)
}.toBuffer

requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
new AlterClientQuotasResponse(new AlterClientQuotasResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setEntries(entriesData.asJava)))
} else {
requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
alterClientQuotasRequest.getErrorResponse(requestThrottleMs, Errors.CLUSTER_AUTHORIZATION_FAILED.exception))
}
}

def handleDescribeUserScramCredentialsRequest(request: RequestChannel.Request): Unit = {
val describeUserScramCredentialsRequest = request.body[DescribeUserScramCredentialsRequest]

Expand Down Expand Up @@ -2812,22 +2780,6 @@ class KafkaApis(val requestChannel: RequestChannel,
new ListTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs)))
}

def handleAllocateProducerIdsRequest(request: RequestChannel.Request): Unit = {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)

val allocateProducerIdsRequest = request.body[AllocateProducerIdsRequest]

if (!zkSupport.controller.isActive)
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
allocateProducerIdsRequest.getErrorResponse(throttleTimeMs, Errors.NOT_CONTROLLER.exception))
else
zkSupport.controller.allocateProducerIds(allocateProducerIdsRequest.data, producerIdsResponse =>
requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs =>
new AllocateProducerIdsResponse(producerIdsResponse.setThrottleTimeMs(throttleTimeMs)))
)
}

private def groupVersion(): GroupVersion = {
GroupVersion.fromFeatureLevel(metadataCache.features.finalizedFeatures.getOrDefault(GroupVersion.FEATURE_NAME, 0.toShort))
}
Expand Down
52 changes: 1 addition & 51 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, BROKER_LOGGER}
import org.apache.kafka.common.errors.{ClusterAuthorizationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.{KafkaFutureImpl, Topic}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic, AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, AddPartitionsToTxnTransactionCollection}
import org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
Expand All @@ -61,7 +61,6 @@ import org.apache.kafka.common.message._
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors, MessageUtil}
import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity}
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.FindCoordinatorRequest.CoordinatorType
import org.apache.kafka.common.requests.MetadataResponse.TopicMetadata
Expand Down Expand Up @@ -730,48 +729,6 @@ class KafkaApisTest extends Logging {
assertEquals(expectedResults, responseMap)
}

@Test
def testAlterClientQuotasWithAuthorizer(): Unit = {
val authorizer: Authorizer = mock(classOf[Authorizer])

authorizeResource(authorizer, AclOperation.ALTER_CONFIGS, ResourceType.CLUSTER,
Resource.CLUSTER_NAME, AuthorizationResult.DENIED)

val quotaEntity = new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, "user"))
val quotas = Seq(new ClientQuotaAlteration(quotaEntity, Seq.empty.asJavaCollection))

val requestHeader = new RequestHeader(ApiKeys.ALTER_CLIENT_QUOTAS, ApiKeys.ALTER_CLIENT_QUOTAS.latestVersion, clientId, 0)

val alterClientQuotasRequest = new AlterClientQuotasRequest.Builder(quotas.asJavaCollection, false)
.build(requestHeader.apiVersion)
val request = buildRequest(alterClientQuotasRequest,
fromPrivilegedListener = true, requestHeader = Option(requestHeader))

when(controller.isActive).thenReturn(true)
when(clientRequestQuotaManager.maybeRecordAndGetThrottleTimeMs(any[RequestChannel.Request](),
anyLong)).thenReturn(0)
kafkaApis = createKafkaApis(authorizer = Some(authorizer))
kafkaApis.handleAlterClientQuotasRequest(request)

val capturedResponse = verifyNoThrottling[AlterClientQuotasResponse](request)
verifyAlterClientQuotaResult(capturedResponse, Map(quotaEntity -> Errors.CLUSTER_AUTHORIZATION_FAILED))

verify(authorizer).authorize(any(), any())
verify(clientRequestQuotaManager).maybeRecordAndGetThrottleTimeMs(any(), anyLong)
}

private def verifyAlterClientQuotaResult(response: AlterClientQuotasResponse,
expected: Map[ClientQuotaEntity, Errors]): Unit = {
val futures = expected.keys.map(quotaEntity => quotaEntity -> new KafkaFutureImpl[Void]()).toMap
response.complete(futures.asJava)
futures.foreach {
case (entity, future) =>
future.whenComplete((_, thrown) =>
assertEquals(thrown, expected(entity).exception())
).isDone
}
}

@ParameterizedTest
@CsvSource(value = Array("0,1500", "1500,0", "3000,1000"))
def testKRaftControllerThrottleTimeEnforced(
Expand Down Expand Up @@ -10040,13 +9997,6 @@ class KafkaApisTest extends Logging {
setResourceType(BROKER_LOGGER.id()))),
response.data())
}

@Test
def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
metadataCache = MetadataCache.kRaftMetadataCache(brokerId, () => KRaftVersion.KRAFT_VERSION_0)
kafkaApis = createKafkaApis(raftSupport = true)
verifyShouldAlwaysForwardErrorMessage(kafkaApis.handleAlterClientQuotasRequest)
}

@Test
def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
Expand Down