Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 0 additions & 109 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -345,14 +345,6 @@ class KafkaApis(val requestChannel: RequestChannel,
if (authorizedTopicsRequest.isEmpty) {
requestHelper.sendMaybeThrottle(request, responseBuilder.build())
CompletableFuture.completedFuture(())
} else if (request.header.apiVersion == 0) {
// For version 0, always store offsets in ZK.
commitOffsetsToZookeeper(
request,
offsetCommitRequest,
authorizedTopicsRequest,
responseBuilder
)
} else {
// For version > 0, store offsets in Coordinator.
commitOffsetsToCoordinator(
Expand All @@ -366,41 +358,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

private def commitOffsetsToZookeeper(
request: RequestChannel.Request,
offsetCommitRequest: OffsetCommitRequest,
authorizedTopicsRequest: mutable.ArrayBuffer[OffsetCommitRequestData.OffsetCommitRequestTopic],
responseBuilder: OffsetCommitResponse.Builder
): CompletableFuture[Unit] = {
val zkSupport = metadataSupport.requireZkOrThrow(
KafkaApis.unsupported("Version 0 offset commit requests"))

authorizedTopicsRequest.foreach { topic =>
topic.partitions.forEach { partition =>
val error = try {
if (partition.committedMetadata != null && partition.committedMetadata.length > config.groupCoordinatorConfig.offsetMetadataMaxSize) {
Errors.OFFSET_METADATA_TOO_LARGE
} else {
zkSupport.zkClient.setOrCreateConsumerOffset(
offsetCommitRequest.data.groupId,
new TopicPartition(topic.name, partition.partitionIndex),
partition.committedOffset
)
Errors.NONE
}
} catch {
case e: Throwable =>
Errors.forException(e)
}

responseBuilder.addPartition(topic.name, partition.partitionIndex, error)
}
}

requestHelper.sendMaybeThrottle(request, responseBuilder.build())
CompletableFuture.completedFuture[Unit](())
}

private def commitOffsetsToCoordinator(
request: RequestChannel.Request,
offsetCommitRequest: OffsetCommitRequest,
Expand Down Expand Up @@ -1067,61 +1024,6 @@ class KafkaApis(val requestChannel: RequestChannel,
* Handle an offset fetch request
*/
def handleOffsetFetchRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
val version = request.header.apiVersion
if (version == 0) {
handleOffsetFetchRequestFromZookeeper(request)
} else {
handleOffsetFetchRequestFromCoordinator(request)
}
}

private def handleOffsetFetchRequestFromZookeeper(request: RequestChannel.Request): CompletableFuture[Unit] = {
val header = request.header
val offsetFetchRequest = request.body[OffsetFetchRequest]

def createResponse(requestThrottleMs: Int): AbstractResponse = {
val offsetFetchResponse =
// reject the request if not authorized to the group
if (!authHelper.authorize(request.context, DESCRIBE, GROUP, offsetFetchRequest.groupId))
offsetFetchRequest.getErrorResponse(requestThrottleMs, Errors.GROUP_AUTHORIZATION_FAILED)
else {
val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.unsupported("Version 0 offset fetch requests"))
val (authorizedPartitions, unauthorizedPartitions) = partitionByAuthorized(
offsetFetchRequest.partitions.asScala, request.context)

// version 0 reads offsets from ZK
val authorizedPartitionData = authorizedPartitions.map { topicPartition =>
try {
if (!metadataCache.contains(topicPartition))
(topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
else {
val payloadOpt = zkSupport.zkClient.getConsumerOffset(offsetFetchRequest.groupId, topicPartition)
payloadOpt match {
case Some(payload) =>
(topicPartition, new OffsetFetchResponse.PartitionData(payload,
Optional.empty(), OffsetFetchResponse.NO_METADATA, Errors.NONE))
case None =>
(topicPartition, OffsetFetchResponse.UNKNOWN_PARTITION)
}
}
} catch {
case e: Throwable =>
(topicPartition, new OffsetFetchResponse.PartitionData(OffsetFetchResponse.INVALID_OFFSET,
Optional.empty(), OffsetFetchResponse.NO_METADATA, Errors.forException(e)))
}
}.toMap

val unauthorizedPartitionData = unauthorizedPartitions.map(_ -> OffsetFetchResponse.UNAUTHORIZED_PARTITION).toMap
new OffsetFetchResponse(requestThrottleMs, Errors.NONE, (authorizedPartitionData ++ unauthorizedPartitionData).asJava)
}
trace(s"Sending offset fetch response $offsetFetchResponse for correlation id ${header.correlationId} to client ${header.clientId}.")
offsetFetchResponse
}
requestHelper.sendResponseMaybeThrottle(request, createResponse)
CompletableFuture.completedFuture[Unit](())
}

private def handleOffsetFetchRequestFromCoordinator(request: RequestChannel.Request): CompletableFuture[Unit] = {
val offsetFetchRequest = request.body[OffsetFetchRequest]
val groups = offsetFetchRequest.groups()
val requireStable = offsetFetchRequest.requireStable()
Expand Down Expand Up @@ -1232,13 +1134,6 @@ class KafkaApis(val requestChannel: RequestChannel,
}
}

private def partitionByAuthorized(
seq: Seq[TopicPartition],
context: RequestContext
): (Seq[TopicPartition], Seq[TopicPartition]) = {
authHelper.partitionSeqByAuthorized(context, DESCRIBE, TOPIC, seq)(_.topic)
}

def handleFindCoordinatorRequest(request: RequestChannel.Request): Unit = {
val version = request.header.apiVersion
if (version < 4) {
Expand Down Expand Up @@ -4427,8 +4322,4 @@ object KafkaApis {
private[server] def shouldAlwaysForward(request: RequestChannel.Request): Exception = {
new UnsupportedVersionException(s"Should always be forwarded to the Active Controller when using a Raft-based metadata quorum: ${request.header.apiKey}")
}

private def unsupported(text: String): Exception = {
new UnsupportedVersionException(s"Unsupported when using a Raft-based metadata quorum: $text")
}
}