From e7060bdc542df412eace00ab719c0b11ba534020 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 23 Nov 2022 16:03:32 +0100 Subject: [PATCH 1/4] KAFKA-14367; Add `TxnOffsetCommit` to the new `GroupCoordinator` interface --- .../requests/TxnOffsetCommitRequest.java | 10 + .../server/builders/KafkaApisBuilder.java | 2 +- .../group/GroupCoordinatorAdapter.scala | 77 ++++- .../scala/kafka/server/BrokerServer.scala | 2 +- .../main/scala/kafka/server/KafkaApis.scala | 211 ++++++++---- .../main/scala/kafka/server/KafkaServer.scala | 2 +- .../group/GroupCoordinatorAdapterTest.scala | 95 ++++- .../unit/kafka/server/KafkaApisTest.scala | 324 +++++++++++++++--- .../coordinator/group/GroupCoordinator.java | 17 + 9 files changed, 616 insertions(+), 124 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java index 27b13ba220251..8c3ec12186dc4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java @@ -82,6 +82,11 @@ public Builder(final String transactionalId, .setGroupInstanceId(groupInstanceId.orElse(null)); } + public Builder(final TxnOffsetCommitRequestData data) { + super(ApiKeys.TXN_OFFSET_COMMIT); + this.data = data; + } + @Override public TxnOffsetCommitRequest build(short version) { if (version < 3 && groupMetadataSet()) { @@ -179,6 +184,11 @@ public TxnOffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) .setTopics(responseTopicData)); } + @Override + public TxnOffsetCommitResponse getErrorResponse(Throwable e) { + return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e); + } + public static TxnOffsetCommitRequest parse(ByteBuffer buffer, short version) { return new TxnOffsetCommitRequest(new TxnOffsetCommitRequestData( new ByteBufferAccessor(buffer), version), version); diff 
--git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java index 95c18a6f3e3f4..18cd42c77cba1 100644 --- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java +++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java @@ -179,7 +179,7 @@ public KafkaApis build() { metadataSupport, replicaManager, groupCoordinator, - new GroupCoordinatorAdapter(groupCoordinator), + new GroupCoordinatorAdapter(groupCoordinator, time), txnCoordinator, autoTopicCreationManager, brokerId, diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index 4e96e4373a73a..5a3d82c2ec994 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -16,15 +16,20 @@ */ package kafka.coordinator.group +import kafka.common.OffsetAndMetadata import kafka.server.RequestLocal import kafka.utils.Implicits.MapExtensionMethods -import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, SyncGroupRequestData, SyncGroupResponseData} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.RecordBatch import 
org.apache.kafka.common.requests.RequestContext -import org.apache.kafka.common.utils.BufferSupplier +import org.apache.kafka.common.utils.{BufferSupplier, Time} import java.util +import java.util.Optional import java.util.concurrent.CompletableFuture -import scala.collection.immutable +import scala.collection.{immutable, mutable} import scala.jdk.CollectionConverters._ /** @@ -32,7 +37,8 @@ import scala.jdk.CollectionConverters._ * that exposes the new org.apache.kafka.coordinator.group.GroupCoordinator interface. */ class GroupCoordinatorAdapter( - val coordinator: GroupCoordinator + private val coordinator: GroupCoordinator, + private val time: Time ) extends org.apache.kafka.coordinator.group.GroupCoordinator { override def joinGroup( @@ -234,4 +240,67 @@ class GroupCoordinatorAdapter( } CompletableFuture.completedFuture(results) } + + override def commitTransactionalOffsets( + context: RequestContext, + request: TxnOffsetCommitRequestData, + bufferSupplier: BufferSupplier + ): CompletableFuture[TxnOffsetCommitResponseData] = { + val future = new CompletableFuture[TxnOffsetCommitResponseData]() + + def callback(results: Map[TopicPartition, Errors]): Unit = { + val response = new TxnOffsetCommitResponseData() + val byTopics = new util.HashMap[String, TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]() + + results.forKeyValue { (tp, error) => + var topic = byTopics.get(tp.topic) + if (topic == null) { + topic = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setName(tp.topic) + byTopics.put(tp.topic, topic) + response.topics.add(topic) + } + topic.partitions.add(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(tp.partition) + .setErrorCode(error.code)) + } + + future.complete(response) + } + + val currentTimestamp = time.milliseconds + val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]() + + request.topics.forEach { topic => + topic.partitions.forEach { partition => + val tp = new 
TopicPartition(topic.name, partition.partitionIndex) + partitions += tp -> new OffsetAndMetadata( + offset = partition.committedOffset, + leaderEpoch = partition.committedLeaderEpoch match { + case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer] + case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch) + }, + metadata = partition.committedMetadata match { + case null => OffsetAndMetadata.NoMetadata + case metadata => metadata + }, + commitTimestamp = currentTimestamp, + expireTimestamp = None + ) + } + } + + coordinator.handleTxnCommitOffsets( + request.groupId, + request.producerId, + request.producerEpoch, + request.memberId, + Option(request.groupInstanceId), + request.generationId, + partitions.toMap, + callback, + RequestLocal(bufferSupplier) + ) + + future + } } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index cbe700358f6f6..dafffdc3d0c16 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -407,7 +407,7 @@ class BrokerServer( metadataSupport = raftSupport, replicaManager = replicaManager, groupCoordinator = groupCoordinator, - newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator), + newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator, time), txnCoordinator = transactionCoordinator, autoTopicCreationManager = autoTopicCreationManager, brokerId = config.nodeId, diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 662b61a5fd39a..8786a5cd5f018 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -208,7 +208,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request, requestLocal) case ApiKeys.END_TXN => handleEndTxnRequest(request, requestLocal) case 
ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal) - case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal) + case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal).exceptionally(handleError) case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request) case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls) case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls) @@ -2464,87 +2464,166 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleTxnOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { + def handleTxnOffsetCommitRequest( + request: RequestChannel.Request, + requestLocal: RequestLocal + ): CompletableFuture[Unit] = { ensureInterBrokerVersion(IBP_0_11_0_IV0) - val header = request.header val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest] + def sendResponse(response: TxnOffsetCommitResponse): Unit = { + trace(s"Sending tnx offset commit response $response for correlation id ${request.header.correlationId} to " + + s"client ${request.header.clientId}.") + + if (isDebugEnabled) { + response.data.topics.forEach { topic => + topic.partitions.forEach { partition => + if (partition.errorCode != Errors.NONE.code) { + debug(s"TxnOffsetCommit with correlation id ${request.header.correlationId} from client ${request.header.clientId} " + + s"on partition ${topic.name}-${partition.partitionIndex} failed due to ${Errors.forCode(partition.errorCode)}") + } + } + } + } + + // We need to replace COORDINATOR_LOAD_IN_PROGRESS with COORDINATOR_NOT_AVAILABLE + // for older producer client from 0.11 to prior 2.0, which could potentially crash due + // to unexpected loading error. This bug is fixed later by KAFKA-7296. Clients using + // txn commit protocol >= 2 (version 2.3 and onwards) are guaranteed to have + // the fix to check for the loading error. 
+ if (txnOffsetCommitRequest.version < 2) { + response.data.topics.forEach { topic => + topic.partitions.forEach { partition => + if (partition.errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code) { + partition.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code) + } + } + } + } + + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { + response.maybeSetThrottleTimeMs(requestThrottleMs) + response + }) + } + // authorize for the transactionalId and the consumer group. Note that we skip producerId authorization // since it is implied by transactionalId authorization - if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.data.transactionalId)) - requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) - else if (!authHelper.authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) - requestHelper.sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception) - else { - val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() - val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() - val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]() - val committedOffsets = txnOffsetCommitRequest.offsets.asScala - val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, committedOffsets)(_._1.topic) + if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.data.transactionalId)) { + sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) + } else if (!authHelper.authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) { + sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + 
val txnOffsetCommitResponseData = new TxnOffsetCommitResponseData() + val topicsPendingPartitions = new mutable.HashMap[String, TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]() + + def makePartitionResponse( + partitionIndex: Int, + error: Errors + ): TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition = { + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(partitionIndex) + .setErrorCode(error.code) + } - for ((topicPartition, commitedOffset) <- committedOffsets) { - if (!authorizedTopics.contains(topicPartition.topic)) - unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED - else if (!metadataCache.contains(topicPartition)) - nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION - else - authorizedTopicCommittedOffsets += (topicPartition -> commitedOffset) + def addTopicToResponse( + topic: TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic, + error: Errors + ): Unit = { + val topicResponse = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setName(topic.name) + txnOffsetCommitResponseData.topics.add(topicResponse) + topic.partitions.forEach { partition => + topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, error)) + } } - // the callback for sending an offset commit response - def sendResponseCallback(authorizedTopicErrors: Map[TopicPartition, Errors]): Unit = { - val combinedCommitStatus = mutable.Map() ++= authorizedTopicErrors ++= unauthorizedTopicErrors ++= nonExistingTopicErrors - if (isDebugEnabled) - combinedCommitStatus.forKeyValue { (topicPartition, error) => - if (error != Errors.NONE) { - debug(s"TxnOffsetCommit with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${error.exceptionName}") + val authorizedTopics = authHelper.filterByAuthorized( + request.context, + READ, + TOPIC, + txnOffsetCommitRequest.data.topics.asScala + 
)(_.name) + + val authorizedTopicCommittedOffsets = new util.ArrayList[TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic]() + txnOffsetCommitRequest.data.topics.forEach { topic => + if (!authorizedTopics.contains(topic.name)) { + addTopicToResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED) + } else if (!metadataCache.contains(topic.name)) { + addTopicToResponse(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION) + } else { + var topicRequestWithValidPartitions: TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic = null + var topicResponseWithInvalidPartitions: TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic = null + + topic.partitions.forEach { partition => + if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) { + if (topicRequestWithValidPartitions == null) { + topicRequestWithValidPartitions = new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic().setName(topic.name) + } + topicRequestWithValidPartitions.partitions.add(partition) + } else { + if (topicResponseWithInvalidPartitions == null) { + topicResponseWithInvalidPartitions = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setName(topic.name) + } + topicResponseWithInvalidPartitions.partitions.add(makePartitionResponse(partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)) } } - // We need to replace COORDINATOR_LOAD_IN_PROGRESS with COORDINATOR_NOT_AVAILABLE - // for older producer client from 0.11 to prior 2.0, which could potentially crash due - // to unexpected loading error. This bug is fixed later by KAFKA-7296. Clients using - // txn commit protocol >= 2 (version 2.3 and onwards) are guaranteed to have - // the fix to check for the loading error. 
- if (txnOffsetCommitRequest.version < 2) { - combinedCommitStatus ++= combinedCommitStatus.collect { - case (tp, error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS => tp -> Errors.COORDINATOR_NOT_AVAILABLE + if (topicRequestWithValidPartitions != null) { + authorizedTopicCommittedOffsets.add(topicRequestWithValidPartitions) } - } - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new TxnOffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) + if (topicResponseWithInvalidPartitions != null) { + txnOffsetCommitResponseData.topics.add(topicResponseWithInvalidPartitions) + // We keep track of topics with both known and unknown partitions such + // that we can merge the response from the coordinator into it later on. + if (topicRequestWithValidPartitions != null) { + topicsPendingPartitions += topic.name -> topicResponseWithInvalidPartitions + } + } + } } - if (authorizedTopicCommittedOffsets.isEmpty) - sendResponseCallback(Map.empty) - else { - val offsetMetadata = convertTxnOffsets(authorizedTopicCommittedOffsets.toMap) - groupCoordinator.handleTxnCommitOffsets( - txnOffsetCommitRequest.data.groupId, - txnOffsetCommitRequest.data.producerId, - txnOffsetCommitRequest.data.producerEpoch, - txnOffsetCommitRequest.data.memberId, - Option(txnOffsetCommitRequest.data.groupInstanceId), - txnOffsetCommitRequest.data.generationId, - offsetMetadata, - sendResponseCallback, - requestLocal) - } - } - } + if (authorizedTopicCommittedOffsets.isEmpty) { + sendResponse(new TxnOffsetCommitResponse(txnOffsetCommitResponseData)) + CompletableFuture.completedFuture[Unit](()) + } else { + val txnOffsetCommitRequestData = new TxnOffsetCommitRequestData() + .setGroupId(txnOffsetCommitRequest.data.groupId) + .setMemberId(txnOffsetCommitRequest.data.memberId) + .setGenerationId(txnOffsetCommitRequest.data.generationId) + .setGroupInstanceId(txnOffsetCommitRequest.data.groupInstanceId) + .setProducerEpoch(txnOffsetCommitRequest.data.producerEpoch) + 
.setProducerId(txnOffsetCommitRequest.data.producerId) + .setTransactionalId(txnOffsetCommitRequest.data.transactionalId) + .setTopics(authorizedTopicCommittedOffsets) + + newGroupCoordinator.commitTransactionalOffsets( + request.context, + txnOffsetCommitRequestData, + requestLocal.bufferSupplier + ).handle[Unit] { (response, exception) => + if (exception != null) { + sendResponse(txnOffsetCommitRequest.getErrorResponse(exception)) + } else { + if (txnOffsetCommitResponseData.topics.isEmpty) { + sendResponse(new TxnOffsetCommitResponse(response)) + } else { + response.topics.forEach { topic => + topicsPendingPartitions.get(topic.name) match { + case None => + txnOffsetCommitResponseData.topics.add(topic) - private def convertTxnOffsets(offsetsMap: immutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]): immutable.Map[TopicPartition, OffsetAndMetadata] = { - val currentTimestamp = time.milliseconds - offsetsMap.map { case (topicPartition, partitionData) => - val metadata = if (partitionData.metadata == null) OffsetAndMetadata.NoMetadata else partitionData.metadata - topicPartition -> new OffsetAndMetadata( - offset = partitionData.offset, - leaderEpoch = partitionData.leaderEpoch, - metadata = metadata, - commitTimestamp = currentTimestamp, - expireTimestamp = None) + case Some(existingTopicResponse) => + existingTopicResponse.partitions.addAll(topic.partitions) + } + } + sendResponse(new TxnOffsetCommitResponse(txnOffsetCommitResponseData)) + } + } + } + } } } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 11e81fb55ae18..8da5a2a795249 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -497,7 +497,7 @@ class KafkaServer( metadataSupport = zkSupport, replicaManager = replicaManager, groupCoordinator = groupCoordinator, - newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator), + 
newGroupCoordinator = new GroupCoordinatorAdapter(groupCoordinator, time), txnCoordinator = transactionCoordinator, autoTopicCreationManager = autoTopicCreationManager, brokerId = config.brokerId, diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala index 323547474c98d..6c7770ac11f65 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala @@ -16,16 +16,19 @@ */ package kafka.coordinator.group +import kafka.common.OffsetAndMetadata import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback, SyncGroupCallback} import kafka.server.RequestLocal -import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, SyncGroupRequestData, SyncGroupResponseData} +import kafka.utils.MockTime +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData} import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember import org.apache.kafka.common.network.{ClientInformation, ListenerName} import org.apache.kafka.common.protocol.{ApiKeys, Errors} import org.apache.kafka.common.requests.{RequestContext, RequestHeader} import 
org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol} -import org.apache.kafka.common.utils.BufferSupplier +import org.apache.kafka.common.utils.{BufferSupplier, Time} import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue} import org.junit.jupiter.api.Test @@ -34,6 +37,7 @@ import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.mockito.Mockito.{mock, verify, when} import java.net.InetAddress +import java.util.Optional import scala.jdk.CollectionConverters._ class GroupCoordinatorAdapterTest { @@ -58,7 +62,7 @@ class GroupCoordinatorAdapterTest { @ApiKeyVersionsSource(apiKey = ApiKeys.JOIN_GROUP) def testJoinGroup(version: Short): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val ctx = makeContext(ApiKeys.JOIN_GROUP, version) val request = new JoinGroupRequestData() @@ -146,7 +150,7 @@ class GroupCoordinatorAdapterTest { @ApiKeyVersionsSource(apiKey = ApiKeys.SYNC_GROUP) def testSyncGroup(version: Short): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val ctx = makeContext(ApiKeys.SYNC_GROUP, version) val data = new SyncGroupRequestData() @@ -213,7 +217,7 @@ class GroupCoordinatorAdapterTest { @Test def testHeartbeat(): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val ctx = makeContext(ApiKeys.HEARTBEAT, ApiKeys.HEARTBEAT.latestVersion) val data = new HeartbeatRequestData() @@ -244,7 +248,7 @@ class GroupCoordinatorAdapterTest { def testLeaveGroup(): Unit = { val groupCoordinator = 
mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val ctx = makeContext(ApiKeys.LEAVE_GROUP, ApiKeys.LEAVE_GROUP.latestVersion) val data = new LeaveGroupRequestData() @@ -313,7 +317,7 @@ class GroupCoordinatorAdapterTest { expectedStatesFilter: Set[String] ): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val ctx = makeContext(ApiKeys.LIST_GROUPS, ApiKeys.LIST_GROUPS.latestVersion) val data = new ListGroupsRequestData() @@ -348,7 +352,7 @@ class GroupCoordinatorAdapterTest { @Test def testDescribeGroup(): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val groupId1 = "group-1" val groupId2 = "group-2" @@ -405,7 +409,7 @@ class GroupCoordinatorAdapterTest { @Test def testDeleteGroups(): Unit = { val groupCoordinator = mock(classOf[GroupCoordinator]) - val adapter = new GroupCoordinatorAdapter(groupCoordinator) + val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM) val ctx = makeContext(ApiKeys.DELETE_GROUPS, ApiKeys.DELETE_GROUPS.latestVersion) val groupIds = List("group-1", "group-2", "group-3") @@ -436,4 +440,77 @@ class GroupCoordinatorAdapterTest { assertEquals(expectedResults, future.get()) } + + @Test + def testCommitTransactionalOffsets(): Unit = { + val groupCoordinator = mock(classOf[GroupCoordinator]) + val time = new MockTime() + val adapter = new GroupCoordinatorAdapter(groupCoordinator, time) + val now = time.milliseconds() + + val ctx = makeContext(ApiKeys.TXN_OFFSET_COMMIT, ApiKeys.TXN_OFFSET_COMMIT.latestVersion) + val data = new TxnOffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + 
.setGenerationId(10) + .setProducerEpoch(1) + .setProducerId(2) + .setTransactionalId("transaction-id") + .setTopics(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100) + .setCommittedLeaderEpoch(1) + ).asJava) + ).asJava) + val bufferSupplier = BufferSupplier.create() + + val future = adapter.commitTransactionalOffsets(ctx, data, bufferSupplier) + assertFalse(future.isDone) + + val capturedCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit]) + + verify(groupCoordinator).handleTxnCommitOffsets( + ArgumentMatchers.eq(data.groupId), + ArgumentMatchers.eq(data.producerId), + ArgumentMatchers.eq(data.producerEpoch), + ArgumentMatchers.eq(data.memberId), + ArgumentMatchers.eq(None), + ArgumentMatchers.eq(data.generationId), + ArgumentMatchers.eq(Map( + new TopicPartition("foo", 0) -> new OffsetAndMetadata( + offset = 100, + leaderEpoch = Optional.of[Integer](1), + metadata = "", + commitTimestamp = now, + expireTimestamp = None + ) + )), + capturedCallback.capture(), + ArgumentMatchers.eq(RequestLocal(bufferSupplier)) + ) + + capturedCallback.getValue.apply(Map( + new TopicPartition("foo", 0) -> Errors.NONE + )) + + val expectedData = new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code) + ).asJava) + ).asJava + ) + + assertTrue(future.isDone) + assertEquals(expectedData, future.get()) + } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 4b039b5543f7e..af8d839bb37c8 100644 --- 
a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -1331,59 +1331,299 @@ class KafkaApisTest { } @Test - def shouldReplaceCoordinatorNotAvailableWithLoadInProcessInTxnOffsetCommitWithOlderClient(): Unit = { + def testHandleTxnOffsetCommitRequest(): Unit = { + addTopicToMetadataCache("foo", numPartitions = 1) + + val txnOffsetCommitRequest = new TxnOffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setGenerationId(10) + .setProducerId(20) + .setProducerEpoch(30) + .setGroupInstanceId("instance-id") + .setTransactionalId("transactional-id") + .setTopics(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10)).asJava)).asJava) + + val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build()) + + val future = new CompletableFuture[TxnOffsetCommitResponseData]() + when(newGroupCoordinator.commitTransactionalOffsets( + requestChannelRequest.context, + txnOffsetCommitRequest, + RequestLocal.NoCaching.bufferSupplier + )).thenReturn(future) + + createKafkaApis().handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + // This is the response returned by the group coordinator. 
+ val txnOffsetCommitResponse = new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + future.complete(txnOffsetCommitResponse) + val response = verifyNoThrottling[TxnOffsetCommitResponse](requestChannelRequest) + assertEquals(txnOffsetCommitResponse, response.data) + } + + @Test + def testHandleTxnOffsetCommitRequestFutureFailed(): Unit = { + addTopicToMetadataCache("foo", numPartitions = 1) + + val txnOffsetCommitRequest = new TxnOffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10)).asJava)).asJava) + + val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build()) + + val future = new CompletableFuture[TxnOffsetCommitResponseData]() + when(newGroupCoordinator.commitTransactionalOffsets( + requestChannelRequest.context, + txnOffsetCommitRequest, + RequestLocal.NoCaching.bufferSupplier + )).thenReturn(future) + + createKafkaApis().handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + val expectedTxnOffsetCommitResponse = new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NOT_COORDINATOR.code)).asJava)).asJava) + + future.completeExceptionally(Errors.NOT_COORDINATOR.exception) + val response = verifyNoThrottling[TxnOffsetCommitResponse](requestChannelRequest) + 
assertEquals(expectedTxnOffsetCommitResponse, response.data) + } + + @Test + def testHandleTxnOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = { + addTopicToMetadataCache("foo", numPartitions = 2) + addTopicToMetadataCache("bar", numPartitions = 2) + + val txnOffsetCommitRequest = new TxnOffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + // foo exists but only has 2 partitions. + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(20), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(2) + .setCommittedOffset(30)).asJava), + // bar exists. + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("bar") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(40), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(50)).asJava), + // zar does not exist. + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("zar") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(60), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(70)).asJava)).asJava) + + val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build()) + + // This is the request expected by the group coordinator. 
+ val expectedTnxOffsetCommitRequest = new TxnOffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + // foo exists but only has 2 partitions. + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(20)).asJava), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("bar") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(40), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(50)).asJava)).asJava) + + val future = new CompletableFuture[TxnOffsetCommitResponseData]() + when(newGroupCoordinator.commitTransactionalOffsets( + requestChannelRequest.context, + expectedTnxOffsetCommitRequest, + RequestLocal.NoCaching.bufferSupplier + )).thenReturn(future) + + createKafkaApis().handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + // This is the response returned by the group coordinator. 
+ val txnOffsetCommitResponse = new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("bar") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + val expectedTxnOffsetCommitResponse = new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + // foo-2 is first because partitions failing the validation + // are put in the response first. + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(2) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava), + // zar is before bar because topics failing the validation are + // put in the response first. 
+ new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("zar") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)).asJava), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("bar") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + future.complete(txnOffsetCommitResponse) + val response = verifyNoThrottling[TxnOffsetCommitResponse](requestChannelRequest) + assertEquals(expectedTxnOffsetCommitResponse, response.data) + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT) + def shouldReplaceCoordinatorNotAvailableWithLoadInProcessInTxnOffsetCommitWithOlderClient(version: Short): Unit = { val topic = "topic" addTopicToMetadataCache(topic, numPartitions = 2) - for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to ApiKeys.TXN_OFFSET_COMMIT.latestVersion) { - reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator) + val topicPartition = new TopicPartition(topic, 1) + val capturedResponse: ArgumentCaptor[TxnOffsetCommitResponse] = ArgumentCaptor.forClass(classOf[TxnOffsetCommitResponse]) - val topicPartition = new TopicPartition(topic, 1) - val capturedResponse: ArgumentCaptor[TxnOffsetCommitResponse] = ArgumentCaptor.forClass(classOf[TxnOffsetCommitResponse]) - val responseCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit]) + val partitionOffsetCommitData = new 
TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty()) + val groupId = "groupId" - val partitionOffsetCommitData = new TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty()) - val groupId = "groupId" + val producerId = 15L + val epoch = 0.toShort - val producerId = 15L - val epoch = 0.toShort + val offsetCommitRequest = new TxnOffsetCommitRequest.Builder( + "txnId", + groupId, + producerId, + epoch, + Map(topicPartition -> partitionOffsetCommitData).asJava, + ).build(version) + val request = buildRequest(offsetCommitRequest) - val offsetCommitRequest = new TxnOffsetCommitRequest.Builder( - "txnId", - groupId, - producerId, - epoch, - Map(topicPartition -> partitionOffsetCommitData).asJava, - ).build(version.toShort) - val request = buildRequest(offsetCommitRequest) + val requestLocal = RequestLocal.withThreadConfinedCaching + val future = new CompletableFuture[TxnOffsetCommitResponseData]() + when(newGroupCoordinator.commitTransactionalOffsets( + request.context, + offsetCommitRequest.data, + requestLocal.bufferSupplier + )).thenReturn(future) - val requestLocal = RequestLocal.withThreadConfinedCaching - when(groupCoordinator.handleTxnCommitOffsets( - ArgumentMatchers.eq(groupId), - ArgumentMatchers.eq(producerId), - ArgumentMatchers.eq(epoch), - anyString, - ArgumentMatchers.eq(Option.empty), - anyInt, - any(), - responseCallback.capture(), - ArgumentMatchers.eq(requestLocal) - )).thenAnswer(_ => responseCallback.getValue.apply(Map(topicPartition -> Errors.COORDINATOR_LOAD_IN_PROGRESS))) + future.complete(new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName(topicPartition.topic) + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(topicPartition.partition) + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code) + ).asJava) + ).asJava)) - createKafkaApis().handleTxnOffsetCommitRequest(request, requestLocal) 
+ createKafkaApis().handleTxnOffsetCommitRequest(request, requestLocal) - verify(requestChannel).sendResponse( - ArgumentMatchers.eq(request), - capturedResponse.capture(), - ArgumentMatchers.eq(None) - ) - val response = capturedResponse.getValue + verify(requestChannel).sendResponse( + ArgumentMatchers.eq(request), + capturedResponse.capture(), + ArgumentMatchers.eq(None) + ) + val response = capturedResponse.getValue - if (version < 2) { - assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, response.errors().get(topicPartition)) - } else { - assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, response.errors().get(topicPartition)) - } + if (version < 2) { + assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, response.errors().get(topicPartition)) + } else { + assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, response.errors().get(topicPartition)) } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index 1faa8a07fac9a..5508a400fdb1c 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -28,6 +28,8 @@ import org.apache.kafka.common.message.ListGroupsResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.message.TxnOffsetCommitRequestData; +import org.apache.kafka.common.message.TxnOffsetCommitResponseData; import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.utils.BufferSupplier; @@ -132,5 +134,20 @@ CompletableFuture delet List groupIds, BufferSupplier bufferSupplier ); + + /** + * Commit transactional offsets for a given Group. + * + * @param context The request context. + * @param request The TxnOffsetCommitRequest data. 
+ * @param bufferSupplier The buffer supplier tied to the request thread. + * + * @return A future yielding the response or an exception. + */ + CompletableFuture commitTransactionalOffsets( + RequestContext context, + TxnOffsetCommitRequestData request, + BufferSupplier bufferSupplier + ); } From a8d54231632355493b9d6e24431227199b44daea Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 9 Jan 2023 17:07:25 +0100 Subject: [PATCH 2/4] refactor --- .../requests/TxnOffsetCommitResponse.java | 78 ++++++++++++++ .../main/scala/kafka/server/KafkaApis.scala | 101 ++++-------------- 2 files changed, 98 insertions(+), 81 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java index 18244fcb17c33..39b8ab5145eab 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java @@ -27,7 +27,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Function; /** * Possible error codes: @@ -47,6 +49,82 @@ */ public class TxnOffsetCommitResponse extends AbstractResponse { + public static class Builder { + TxnOffsetCommitResponseData data = new TxnOffsetCommitResponseData(); + HashMap byTopicName = new HashMap<>(); + + private TxnOffsetCommitResponseTopic getOrCreateTopic( + String topicName + ) { + TxnOffsetCommitResponseTopic topic = byTopicName.get(topicName); + if (topic == null) { + topic = new TxnOffsetCommitResponseTopic().setName(topicName); + data.topics().add(topic); + byTopicName.put(topicName, topic); + } + return topic; + } + + public Builder addPartition( + String topicName, + int partitionIndex, + Errors error + ) { + final TxnOffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); + + 
topicResponse.partitions().add(new TxnOffsetCommitResponsePartition() + .setPartitionIndex(partitionIndex) + .setErrorCode(error.code())); + + return this; + } + + public

Builder addPartitions( + String topicName, + List

partitions, + Function partitionIndex, + Errors error + ) { + final TxnOffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); + + partitions.forEach(partition -> { + topicResponse.partitions().add(new TxnOffsetCommitResponsePartition() + .setPartitionIndex(partitionIndex.apply(partition)) + .setErrorCode(error.code())); + }); + + return this; + } + + public Builder merge( + TxnOffsetCommitResponseData newData + ) { + if (data.topics().isEmpty()) { + // If the current data is empty, we can discard it and use the new data. + data = newData; + } else { + // Otherwise, we have to merge them together. + newData.topics().forEach(newTopic -> { + TxnOffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name()); + if (existingTopic == null) { + // If no topic exists, we can directly copy the new topic data. + data.topics().add(newTopic); + byTopicName.put(newTopic.name(), newTopic); + } else { + // Otherwise, we add the partitions to the existing one. + existingTopic.partitions().addAll(newTopic.partitions()); + } + }); + } + + return this; + } + + public TxnOffsetCommitResponse build() { + return new TxnOffsetCommitResponse(data); + } + } + private final TxnOffsetCommitResponseData data; public TxnOffsetCommitResponse(TxnOffsetCommitResponseData data) { diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 8786a5cd5f018..1788c96a1730c 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -2472,20 +2472,6 @@ class KafkaApis(val requestChannel: RequestChannel, val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest] def sendResponse(response: TxnOffsetCommitResponse): Unit = { - trace(s"Sending tnx offset commit response $response for correlation id ${request.header.correlationId} to " + - s"client ${request.header.clientId}.") - - if (isDebugEnabled) { - response.data.topics.forEach { topic => - 
topic.partitions.forEach { partition => - if (partition.errorCode != Errors.NONE.code) { - debug(s"TxnOffsetCommit with correlation id ${request.header.correlationId} from client ${request.header.clientId} " + - s"on partition ${topic.name}-${partition.partitionIndex} failed due to ${Errors.forCode(partition.errorCode)}") - } - } - } - } - // We need to replace COORDINATOR_LOAD_IN_PROGRESS with COORDINATOR_NOT_AVAILABLE // for older producer client from 0.11 to prior 2.0, which could potentially crash due // to unexpected loading error. This bug is fixed later by KAFKA-7296. Clients using @@ -2501,10 +2487,7 @@ class KafkaApis(val requestChannel: RequestChannel, } } - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { - response.maybeSetThrottleTimeMs(requestThrottleMs) - response - }) + requestHelper.sendMaybeThrottle(request, response) } // authorize for the transactionalId and the consumer group. Note that we skip producerId authorization @@ -2516,29 +2499,6 @@ class KafkaApis(val requestChannel: RequestChannel, sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) CompletableFuture.completedFuture[Unit](()) } else { - val txnOffsetCommitResponseData = new TxnOffsetCommitResponseData() - val topicsPendingPartitions = new mutable.HashMap[String, TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]() - - def makePartitionResponse( - partitionIndex: Int, - error: Errors - ): TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition = { - new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() - .setPartitionIndex(partitionIndex) - .setErrorCode(error.code) - } - - def addTopicToResponse( - topic: TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic, - error: Errors - ): Unit = { - val topicResponse = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setName(topic.name) - txnOffsetCommitResponseData.topics.add(topicResponse) - topic.partitions.forEach { partition => - 
topicResponse.partitions.add(makePartitionResponse(partition.partitionIndex, error)) - } - } - val authorizedTopics = authHelper.filterByAuthorized( request.context, READ, @@ -2546,47 +2506,39 @@ class KafkaApis(val requestChannel: RequestChannel, txnOffsetCommitRequest.data.topics.asScala )(_.name) - val authorizedTopicCommittedOffsets = new util.ArrayList[TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic]() + val responseBuilder = new TxnOffsetCommitResponse.Builder() + val authorizedTopicCommittedOffsets = new mutable.ArrayBuffer[TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic]() txnOffsetCommitRequest.data.topics.forEach { topic => if (!authorizedTopics.contains(topic.name)) { - addTopicToResponse(topic, Errors.TOPIC_AUTHORIZATION_FAILED) + // If the topic is not authorized, we add the topic and all its partitions + // to the response with TOPIC_AUTHORIZATION_FAILED. + responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition]( + topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) } else if (!metadataCache.contains(topic.name)) { - addTopicToResponse(topic, Errors.UNKNOWN_TOPIC_OR_PARTITION) + // If the topic is unknown, we add the topic and all its partitions + // to the response with UNKNOWN_TOPIC_OR_PARTITION. + responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition]( + topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) } else { - var topicRequestWithValidPartitions: TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic = null - var topicResponseWithInvalidPartitions: TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic = null + // Otherwise, we check all partitions to ensure that they all exist. 
+ val topicWithValidPartitions = new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic().setName(topic.name) topic.partitions.forEach { partition => if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) { - if (topicRequestWithValidPartitions == null) { - topicRequestWithValidPartitions = new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic().setName(topic.name) - } - topicRequestWithValidPartitions.partitions.add(partition) + topicWithValidPartitions.partitions.add(partition) } else { - if (topicResponseWithInvalidPartitions == null) { - topicResponseWithInvalidPartitions = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setName(topic.name) - } - topicResponseWithInvalidPartitions.partitions.add(makePartitionResponse(partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION)) + responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) } } - if (topicRequestWithValidPartitions != null) { - authorizedTopicCommittedOffsets.add(topicRequestWithValidPartitions) - } - - if (topicResponseWithInvalidPartitions != null) { - txnOffsetCommitResponseData.topics.add(topicResponseWithInvalidPartitions) - // We keep track of topics with both known and unknown partitions such - // that we can merge the response from the coordinator into it later on. 
- if (topicRequestWithValidPartitions != null) { - topicsPendingPartitions += topic.name -> topicResponseWithInvalidPartitions - } + if (!topicWithValidPartitions.partitions.isEmpty) { + authorizedTopicCommittedOffsets += topicWithValidPartitions } } } if (authorizedTopicCommittedOffsets.isEmpty) { - sendResponse(new TxnOffsetCommitResponse(txnOffsetCommitResponseData)) + sendResponse(responseBuilder.build()) CompletableFuture.completedFuture[Unit](()) } else { val txnOffsetCommitRequestData = new TxnOffsetCommitRequestData() @@ -2597,7 +2549,7 @@ class KafkaApis(val requestChannel: RequestChannel, .setProducerEpoch(txnOffsetCommitRequest.data.producerEpoch) .setProducerId(txnOffsetCommitRequest.data.producerId) .setTransactionalId(txnOffsetCommitRequest.data.transactionalId) - .setTopics(authorizedTopicCommittedOffsets) + .setTopics(authorizedTopicCommittedOffsets.asJava) newGroupCoordinator.commitTransactionalOffsets( request.context, @@ -2607,20 +2559,7 @@ class KafkaApis(val requestChannel: RequestChannel, if (exception != null) { sendResponse(txnOffsetCommitRequest.getErrorResponse(exception)) } else { - if (txnOffsetCommitResponseData.topics.isEmpty) { - sendResponse(new TxnOffsetCommitResponse(response)) - } else { - response.topics.forEach { topic => - topicsPendingPartitions.get(topic.name) match { - case None => - txnOffsetCommitResponseData.topics.add(topic) - - case Some(existingTopicResponse) => - existingTopicResponse.partitions.addAll(topic.partitions) - } - } - sendResponse(new TxnOffsetCommitResponse(txnOffsetCommitResponseData)) - } + sendResponse(responseBuilder.merge(response).build()) } } } From 6d17c95c0c61a4c2b5eaaec9a7b4d485e061835f Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 12 Jan 2023 16:24:18 +0100 Subject: [PATCH 3/4] fixup --- .../kafka/coordinator/group/GroupCoordinatorAdapterTest.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala index 4ad0637a4c277..74aac6a2899ee 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala @@ -677,8 +677,7 @@ class GroupCoordinatorAdapterTest { .setPartitionIndex(0) .setErrorCode(Errors.NONE.code) ).asJava) - ).asJava - ) + ).asJava) assertTrue(future.isDone) assertEquals(expectedData, future.get()) From 0d14658f8e742392b278a5c29bfad45394fc3ace Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 12 Jan 2023 18:44:24 +0100 Subject: [PATCH 4/4] refactor --- .../requests/TxnOffsetCommitResponse.java | 4 +- .../group/GroupCoordinatorAdapter.scala | 86 +++++++++++-------- 2 files changed, 54 insertions(+), 36 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java index 39b8ab5145eab..c47b36a2e0b3a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java @@ -111,7 +111,9 @@ public Builder merge( data.topics().add(newTopic); byTopicName.put(newTopic.name(), newTopic); } else { - // Otherwise, we add the partitions to the existing one. + // Otherwise, we add the partitions to the existing one. Note we + // expect non-overlapping partitions here as we don't verify + // if the partition is already in the list before adding it. 
existingTopic.partitions().addAll(newTopic.partitions()); } }); diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index 0130ffcf9eb57..2e5059bf62d2b 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -360,21 +360,13 @@ class GroupCoordinatorAdapter( request.topics.forEach { topic => topic.partitions.forEach { partition => val tp = new TopicPartition(topic.name, partition.partitionIndex) - partitions += tp -> new OffsetAndMetadata( - offset = partition.committedOffset, - leaderEpoch = partition.committedLeaderEpoch match { - case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer] - case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch) - }, - metadata = partition.committedMetadata match { - case null => OffsetAndMetadata.NoMetadata - case metadata => metadata - }, - commitTimestamp = partition.commitTimestamp match { - case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs - case customTimestamp => customTimestamp - }, - expireTimestamp = expireTimeMs + partitions += tp -> createOffsetAndMetadata( + currentTimeMs, + partition.committedOffset, + partition.committedLeaderEpoch, + partition.committedMetadata, + partition.commitTimestamp, + expireTimeMs ) } } @@ -397,19 +389,24 @@ class GroupCoordinatorAdapter( request: TxnOffsetCommitRequestData, bufferSupplier: BufferSupplier ): CompletableFuture[TxnOffsetCommitResponseData] = { + val currentTimeMs = time.milliseconds val future = new CompletableFuture[TxnOffsetCommitResponseData]() def callback(results: Map[TopicPartition, Errors]): Unit = { val response = new TxnOffsetCommitResponseData() - val byTopics = new util.HashMap[String, TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]() + val byTopics = new mutable.HashMap[String, 
TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]() results.forKeyValue { (tp, error) => - var topic = byTopics.get(tp.topic) - if (topic == null) { - topic = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setName(tp.topic) - byTopics.put(tp.topic, topic) - response.topics.add(topic) + val topic = byTopics.get(tp.topic) match { + case Some(existingTopic) => + existingTopic + case None => + val newTopic = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setName(tp.topic) + byTopics += tp.topic -> newTopic + response.topics.add(newTopic) + newTopic } + topic.partitions.add(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() .setPartitionIndex(tp.partition) .setErrorCode(error.code)) @@ -418,24 +415,17 @@ class GroupCoordinatorAdapter( future.complete(response) } - val currentTimestamp = time.milliseconds val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]() - request.topics.forEach { topic => topic.partitions.forEach { partition => val tp = new TopicPartition(topic.name, partition.partitionIndex) - partitions += tp -> new OffsetAndMetadata( - offset = partition.committedOffset, - leaderEpoch = partition.committedLeaderEpoch match { - case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer] - case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch) - }, - metadata = partition.committedMetadata match { - case null => OffsetAndMetadata.NoMetadata - case metadata => metadata - }, - commitTimestamp = currentTimestamp, - expireTimestamp = None + partitions += tp -> createOffsetAndMetadata( + currentTimeMs, + partition.committedOffset, + partition.committedLeaderEpoch, + partition.committedMetadata, + OffsetCommitRequest.DEFAULT_TIMESTAMP, // means that currentTimeMs is used. 
+ None ) } } @@ -454,4 +444,30 @@ class GroupCoordinatorAdapter( future } + + private def createOffsetAndMetadata( + currentTimeMs: Long, + offset: Long, + leaderEpoch: Int, + metadata: String, + commitTimestamp: Long, + expireTimestamp: Option[Long] + ): OffsetAndMetadata = { + new OffsetAndMetadata( + offset = offset, + leaderEpoch = leaderEpoch match { + case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer] + case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch) + }, + metadata = metadata match { + case null => OffsetAndMetadata.NoMetadata + case metadata => metadata + }, + commitTimestamp = commitTimestamp match { + case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs + case customTimestamp => customTimestamp + }, + expireTimestamp = expireTimestamp + ) + } }