From 3ebe588c253cb0f33f4188698319e0ccfd0aac30 Mon Sep 17 00:00:00 2001 From: PoAn Yang Date: Tue, 4 Jun 2024 18:01:37 +0800 Subject: [PATCH] KAFKA-16483: Remove preAppendErrors from createPutCacheCallback Signed-off-by: PoAn Yang --- .../group/GroupMetadataManager.scala | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index e89876e443ce3..b5fddac6f4070 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap import com.yammer.metrics.core.Gauge import kafka.common.OffsetAndMetadata import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError -import kafka.server.{LogAppendResult, ReplicaManager, RequestLocal} +import kafka.server.{ReplicaManager, RequestLocal} import kafka.utils.CoreUtils.inLock import kafka.utils.Implicits._ import kafka.utils._ @@ -377,14 +377,13 @@ class GroupMetadataManager(brokerId: Int, } private def createPutCacheCallback(isTxnOffsetCommit: Boolean, - group: GroupMetadata, - consumerId: String, - offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata], - filteredOffsetMetadata: Map[TopicIdPartition, OffsetAndMetadata], - responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, - producerId: Long, - records: Map[TopicPartition, MemoryRecords], - preAppendErrors: Map[TopicPartition, LogAppendResult] = Map.empty): Map[TopicPartition, PartitionResponse] => Unit = { + group: GroupMetadata, + consumerId: String, + offsetMetadata: immutable.Map[TopicIdPartition, OffsetAndMetadata], + filteredOffsetMetadata: Map[TopicIdPartition, OffsetAndMetadata], + responseCallback: immutable.Map[TopicIdPartition, Errors] => Unit, + producerId: Long, + records: Map[TopicPartition, MemoryRecords]): Map[TopicPartition, PartitionResponse] => Unit = { val offsetTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, partitionFor(group.groupId)) // set the callback function to insert offsets into cache after log append completed def putCacheCallback(responseStatus: Map[TopicPartition, PartitionResponse]): Unit = { @@ -435,8 +434,6 @@ class GroupMetadataManager(brokerId: Int, val commitStatus = offsetMetadata.map { case (topicIdPartition, offsetAndMetadata) => if (!validateOffsetMetadataLength(offsetAndMetadata.metadata)) (topicIdPartition, Errors.OFFSET_METADATA_TOO_LARGE) - else if (preAppendErrors.contains(topicIdPartition.topicPartition)) - (topicIdPartition, preAppendErrors(topicIdPartition.topicPartition).error) else (topicIdPartition, responseError) }