diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java index 27b13ba220251..8c3ec12186dc4 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitRequest.java @@ -82,6 +82,11 @@ public Builder(final String transactionalId, .setGroupInstanceId(groupInstanceId.orElse(null)); } + public Builder(final TxnOffsetCommitRequestData data) { + super(ApiKeys.TXN_OFFSET_COMMIT); + this.data = data; + } + @Override public TxnOffsetCommitRequest build(short version) { if (version < 3 && groupMetadataSet()) { @@ -179,6 +184,11 @@ public TxnOffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) .setTopics(responseTopicData)); } + @Override + public TxnOffsetCommitResponse getErrorResponse(Throwable e) { + return getErrorResponse(AbstractResponse.DEFAULT_THROTTLE_TIME, e); + } + public static TxnOffsetCommitRequest parse(ByteBuffer buffer, short version) { return new TxnOffsetCommitRequest(new TxnOffsetCommitRequestData( new ByteBufferAccessor(buffer), version), version); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java index 18244fcb17c33..c47b36a2e0b3a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/TxnOffsetCommitResponse.java @@ -27,7 +27,9 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; +import java.util.function.Function; /** * Possible error codes: @@ -47,6 +49,84 @@ */ public class TxnOffsetCommitResponse extends AbstractResponse { + public static class Builder { + TxnOffsetCommitResponseData data = 
new TxnOffsetCommitResponseData(); + HashMap byTopicName = new HashMap<>(); + + private TxnOffsetCommitResponseTopic getOrCreateTopic( + String topicName + ) { + TxnOffsetCommitResponseTopic topic = byTopicName.get(topicName); + if (topic == null) { + topic = new TxnOffsetCommitResponseTopic().setName(topicName); + data.topics().add(topic); + byTopicName.put(topicName, topic); + } + return topic; + } + + public Builder addPartition( + String topicName, + int partitionIndex, + Errors error + ) { + final TxnOffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); + + topicResponse.partitions().add(new TxnOffsetCommitResponsePartition() + .setPartitionIndex(partitionIndex) + .setErrorCode(error.code())); + + return this; + } + + public
<P>
Builder addPartitions( + String topicName, + List
<P>
partitions, + Function<P, Integer> partitionIndex, + Errors error + ) { + final TxnOffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); + + partitions.forEach(partition -> { + topicResponse.partitions().add(new TxnOffsetCommitResponsePartition() + .setPartitionIndex(partitionIndex.apply(partition)) + .setErrorCode(error.code())); + }); + + return this; + } + + public Builder merge( + TxnOffsetCommitResponseData newData + ) { + if (data.topics().isEmpty()) { + // If the current data is empty, we can discard it and use the new data. + data = newData; + } else { + // Otherwise, we have to merge them together. + newData.topics().forEach(newTopic -> { + TxnOffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name()); + if (existingTopic == null) { + // If no topic exists, we can directly copy the new topic data. + data.topics().add(newTopic); + byTopicName.put(newTopic.name(), newTopic); + } else { + // Otherwise, we add the partitions to the existing one. Note we + // expect non-overlapping partitions here as we don't verify + // if the partition is already in the list before adding it. 
+ existingTopic.partitions().addAll(newTopic.partitions()); + } + }); + } + + return this; + } + + public TxnOffsetCommitResponse build() { + return new TxnOffsetCommitResponse(data); + } + } + private final TxnOffsetCommitResponseData data; public TxnOffsetCommitResponse(TxnOffsetCommitResponseData data) { diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index 1a97079ce3923..2e5059bf62d2b 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -20,7 +20,7 @@ import kafka.common.OffsetAndMetadata import kafka.server.RequestLocal import kafka.utils.Implicits.MapExtensionMethods import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData} +import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData} import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.RecordBatch import org.apache.kafka.common.requests.{OffsetCommitRequest, RequestContext} @@ -360,21 +360,13 @@ class GroupCoordinatorAdapter( 
request.topics.forEach { topic => topic.partitions.forEach { partition => val tp = new TopicPartition(topic.name, partition.partitionIndex) - partitions += tp -> new OffsetAndMetadata( - offset = partition.committedOffset, - leaderEpoch = partition.committedLeaderEpoch match { - case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer] - case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch) - }, - metadata = partition.committedMetadata match { - case null => OffsetAndMetadata.NoMetadata - case metadata => metadata - }, - commitTimestamp = partition.commitTimestamp match { - case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs - case customTimestamp => customTimestamp - }, - expireTimestamp = expireTimeMs + partitions += tp -> createOffsetAndMetadata( + currentTimeMs, + partition.committedOffset, + partition.committedLeaderEpoch, + partition.committedMetadata, + partition.commitTimestamp, + expireTimeMs ) } } @@ -391,4 +383,91 @@ class GroupCoordinatorAdapter( future } + + override def commitTransactionalOffsets( + context: RequestContext, + request: TxnOffsetCommitRequestData, + bufferSupplier: BufferSupplier + ): CompletableFuture[TxnOffsetCommitResponseData] = { + val currentTimeMs = time.milliseconds + val future = new CompletableFuture[TxnOffsetCommitResponseData]() + + def callback(results: Map[TopicPartition, Errors]): Unit = { + val response = new TxnOffsetCommitResponseData() + val byTopics = new mutable.HashMap[String, TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic]() + + results.forKeyValue { (tp, error) => + val topic = byTopics.get(tp.topic) match { + case Some(existingTopic) => + existingTopic + case None => + val newTopic = new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic().setName(tp.topic) + byTopics += tp.topic -> newTopic + response.topics.add(newTopic) + newTopic + } + + topic.partitions.add(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + 
.setPartitionIndex(tp.partition) + .setErrorCode(error.code)) + } + + future.complete(response) + } + + val partitions = new mutable.HashMap[TopicPartition, OffsetAndMetadata]() + request.topics.forEach { topic => + topic.partitions.forEach { partition => + val tp = new TopicPartition(topic.name, partition.partitionIndex) + partitions += tp -> createOffsetAndMetadata( + currentTimeMs, + partition.committedOffset, + partition.committedLeaderEpoch, + partition.committedMetadata, + OffsetCommitRequest.DEFAULT_TIMESTAMP, // means that currentTimeMs is used. + None + ) + } + } + + coordinator.handleTxnCommitOffsets( + request.groupId, + request.producerId, + request.producerEpoch, + request.memberId, + Option(request.groupInstanceId), + request.generationId, + partitions.toMap, + callback, + RequestLocal(bufferSupplier) + ) + + future + } + + private def createOffsetAndMetadata( + currentTimeMs: Long, + offset: Long, + leaderEpoch: Int, + metadata: String, + commitTimestamp: Long, + expireTimestamp: Option[Long] + ): OffsetAndMetadata = { + new OffsetAndMetadata( + offset = offset, + leaderEpoch = leaderEpoch match { + case RecordBatch.NO_PARTITION_LEADER_EPOCH => Optional.empty[Integer] + case committedLeaderEpoch => Optional.of[Integer](committedLeaderEpoch) + }, + metadata = metadata match { + case null => OffsetAndMetadata.NoMetadata + case metadata => metadata + }, + commitTimestamp = commitTimestamp match { + case OffsetCommitRequest.DEFAULT_TIMESTAMP => currentTimeMs + case customTimestamp => customTimestamp + }, + expireTimestamp = expireTimestamp + ) + } } diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 42768fa98f9fb..fbdf86f330355 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -19,7 +19,6 @@ package kafka.server import kafka.admin.AdminUtils import kafka.api.ElectLeadersRequestOps -import kafka.common.OffsetAndMetadata 
import kafka.controller.ReplicaAssignment import kafka.coordinator.group._ import kafka.coordinator.transaction.{InitProducerIdResult, TransactionCoordinator} @@ -80,7 +79,7 @@ import java.util.concurrent.atomic.AtomicInteger import java.util.{Collections, Optional} import scala.annotation.nowarn import scala.collection.mutable.ArrayBuffer -import scala.collection.{Map, Seq, Set, immutable, mutable} +import scala.collection.{Map, Seq, Set, mutable} import scala.jdk.CollectionConverters._ import scala.util.{Failure, Success, Try} @@ -207,7 +206,7 @@ class KafkaApis(val requestChannel: RequestChannel, case ApiKeys.ADD_OFFSETS_TO_TXN => handleAddOffsetsToTxnRequest(request, requestLocal) case ApiKeys.END_TXN => handleEndTxnRequest(request, requestLocal) case ApiKeys.WRITE_TXN_MARKERS => handleWriteTxnMarkersRequest(request, requestLocal) - case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal) + case ApiKeys.TXN_OFFSET_COMMIT => handleTxnOffsetCommitRequest(request, requestLocal).exceptionally(handleError) case ApiKeys.DESCRIBE_ACLS => handleDescribeAcls(request) case ApiKeys.CREATE_ACLS => maybeForwardToController(request, handleCreateAcls) case ApiKeys.DELETE_ACLS => maybeForwardToController(request, handleDeleteAcls) @@ -2500,90 +2499,108 @@ class KafkaApis(val requestChannel: RequestChannel, } } - def handleTxnOffsetCommitRequest(request: RequestChannel.Request, requestLocal: RequestLocal): Unit = { + def handleTxnOffsetCommitRequest( + request: RequestChannel.Request, + requestLocal: RequestLocal + ): CompletableFuture[Unit] = { ensureInterBrokerVersion(IBP_0_11_0_IV0) - val header = request.header val txnOffsetCommitRequest = request.body[TxnOffsetCommitRequest] + def sendResponse(response: TxnOffsetCommitResponse): Unit = { + // We need to replace COORDINATOR_LOAD_IN_PROGRESS with COORDINATOR_NOT_AVAILABLE + // for older producer client from 0.11 to prior 2.0, which could potentially crash due + // to unexpected loading error. 
This bug is fixed later by KAFKA-7296. Clients using + // txn commit protocol >= 2 (version 2.3 and onwards) are guaranteed to have + // the fix to check for the loading error. + if (txnOffsetCommitRequest.version < 2) { + response.data.topics.forEach { topic => + topic.partitions.forEach { partition => + if (partition.errorCode == Errors.COORDINATOR_LOAD_IN_PROGRESS.code) { + partition.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code) + } + } + } + } + + requestHelper.sendMaybeThrottle(request, response) + } + // authorize for the transactionalId and the consumer group. Note that we skip producerId authorization // since it is implied by transactionalId authorization - if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.data.transactionalId)) - requestHelper.sendErrorResponseMaybeThrottle(request, Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception) - else if (!authHelper.authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) - requestHelper.sendErrorResponseMaybeThrottle(request, Errors.GROUP_AUTHORIZATION_FAILED.exception) - else { - val unauthorizedTopicErrors = mutable.Map[TopicPartition, Errors]() - val nonExistingTopicErrors = mutable.Map[TopicPartition, Errors]() - val authorizedTopicCommittedOffsets = mutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]() - val committedOffsets = txnOffsetCommitRequest.offsets.asScala - val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, committedOffsets)(_._1.topic) + if (!authHelper.authorize(request.context, WRITE, TRANSACTIONAL_ID, txnOffsetCommitRequest.data.transactionalId)) { + sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) + } else if (!authHelper.authorize(request.context, READ, GROUP, txnOffsetCommitRequest.data.groupId)) { + 
sendResponse(txnOffsetCommitRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception)) + CompletableFuture.completedFuture[Unit](()) + } else { + val authorizedTopics = authHelper.filterByAuthorized( + request.context, + READ, + TOPIC, + txnOffsetCommitRequest.data.topics.asScala + )(_.name) - for ((topicPartition, commitedOffset) <- committedOffsets) { - if (!authorizedTopics.contains(topicPartition.topic)) - unauthorizedTopicErrors += topicPartition -> Errors.TOPIC_AUTHORIZATION_FAILED - else if (!metadataCache.contains(topicPartition)) - nonExistingTopicErrors += topicPartition -> Errors.UNKNOWN_TOPIC_OR_PARTITION - else - authorizedTopicCommittedOffsets += (topicPartition -> commitedOffset) - } + val responseBuilder = new TxnOffsetCommitResponse.Builder() + val authorizedTopicCommittedOffsets = new mutable.ArrayBuffer[TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic]() + txnOffsetCommitRequest.data.topics.forEach { topic => + if (!authorizedTopics.contains(topic.name)) { + // If the topic is not authorized, we add the topic and all its partitions + // to the response with TOPIC_AUTHORIZATION_FAILED. + responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition]( + topic.name, topic.partitions, _.partitionIndex, Errors.TOPIC_AUTHORIZATION_FAILED) + } else if (!metadataCache.contains(topic.name)) { + // If the topic is unknown, we add the topic and all its partitions + // to the response with UNKNOWN_TOPIC_OR_PARTITION. + responseBuilder.addPartitions[TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition]( + topic.name, topic.partitions, _.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) + } else { + // Otherwise, we check all partitions to ensure that they all exist. 
+ val topicWithValidPartitions = new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic().setName(topic.name) - // the callback for sending an offset commit response - def sendResponseCallback(authorizedTopicErrors: Map[TopicPartition, Errors]): Unit = { - val combinedCommitStatus = mutable.Map() ++= authorizedTopicErrors ++= unauthorizedTopicErrors ++= nonExistingTopicErrors - if (isDebugEnabled) - combinedCommitStatus.forKeyValue { (topicPartition, error) => - if (error != Errors.NONE) { - debug(s"TxnOffsetCommit with correlation id ${header.correlationId} from client ${header.clientId} " + - s"on partition $topicPartition failed due to ${error.exceptionName}") + topic.partitions.forEach { partition => + if (metadataCache.getPartitionInfo(topic.name, partition.partitionIndex).nonEmpty) { + topicWithValidPartitions.partitions.add(partition) + } else { + responseBuilder.addPartition(topic.name, partition.partitionIndex, Errors.UNKNOWN_TOPIC_OR_PARTITION) } } - // We need to replace COORDINATOR_LOAD_IN_PROGRESS with COORDINATOR_NOT_AVAILABLE - // for older producer client from 0.11 to prior 2.0, which could potentially crash due - // to unexpected loading error. This bug is fixed later by KAFKA-7296. Clients using - // txn commit protocol >= 2 (version 2.3 and onwards) are guaranteed to have - // the fix to check for the loading error. 
- if (txnOffsetCommitRequest.version < 2) { - combinedCommitStatus ++= combinedCommitStatus.collect { - case (tp, error) if error == Errors.COORDINATOR_LOAD_IN_PROGRESS => tp -> Errors.COORDINATOR_NOT_AVAILABLE + if (!topicWithValidPartitions.partitions.isEmpty) { + authorizedTopicCommittedOffsets += topicWithValidPartitions } } - - requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => - new TxnOffsetCommitResponse(requestThrottleMs, combinedCommitStatus.asJava)) } - if (authorizedTopicCommittedOffsets.isEmpty) - sendResponseCallback(Map.empty) - else { - val offsetMetadata = convertTxnOffsets(authorizedTopicCommittedOffsets.toMap) - groupCoordinator.handleTxnCommitOffsets( - txnOffsetCommitRequest.data.groupId, - txnOffsetCommitRequest.data.producerId, - txnOffsetCommitRequest.data.producerEpoch, - txnOffsetCommitRequest.data.memberId, - Option(txnOffsetCommitRequest.data.groupInstanceId), - txnOffsetCommitRequest.data.generationId, - offsetMetadata, - sendResponseCallback, - requestLocal) + if (authorizedTopicCommittedOffsets.isEmpty) { + sendResponse(responseBuilder.build()) + CompletableFuture.completedFuture[Unit](()) + } else { + val txnOffsetCommitRequestData = new TxnOffsetCommitRequestData() + .setGroupId(txnOffsetCommitRequest.data.groupId) + .setMemberId(txnOffsetCommitRequest.data.memberId) + .setGenerationId(txnOffsetCommitRequest.data.generationId) + .setGroupInstanceId(txnOffsetCommitRequest.data.groupInstanceId) + .setProducerEpoch(txnOffsetCommitRequest.data.producerEpoch) + .setProducerId(txnOffsetCommitRequest.data.producerId) + .setTransactionalId(txnOffsetCommitRequest.data.transactionalId) + .setTopics(authorizedTopicCommittedOffsets.asJava) + + newGroupCoordinator.commitTransactionalOffsets( + request.context, + txnOffsetCommitRequestData, + requestLocal.bufferSupplier + ).handle[Unit] { (response, exception) => + if (exception != null) { + sendResponse(txnOffsetCommitRequest.getErrorResponse(exception)) + } else { + 
sendResponse(responseBuilder.merge(response).build()) + } + } } } } - private def convertTxnOffsets(offsetsMap: immutable.Map[TopicPartition, TxnOffsetCommitRequest.CommittedOffset]): immutable.Map[TopicPartition, OffsetAndMetadata] = { - val currentTimestamp = time.milliseconds - offsetsMap.map { case (topicPartition, partitionData) => - val metadata = if (partitionData.metadata == null) OffsetAndMetadata.NoMetadata else partitionData.metadata - topicPartition -> new OffsetAndMetadata( - offset = partitionData.offset, - leaderEpoch = partitionData.leaderEpoch, - metadata = metadata, - commitTimestamp = currentTimestamp, - expireTimestamp = None) - } - } - def handleDescribeAcls(request: RequestChannel.Request): Unit = { aclApis.handleDescribeAcls(request) } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala index 9ecbdd54e064c..8ce2b8caa837d 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala @@ -21,7 +21,7 @@ import kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallbac import kafka.server.RequestLocal import kafka.utils.MockTime import org.apache.kafka.common.TopicPartition -import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData} +import org.apache.kafka.common.message.{DeleteGroupsResponseData, DescribeGroupsResponseData, HeartbeatRequestData, HeartbeatResponseData, JoinGroupRequestData, JoinGroupResponseData, 
LeaveGroupRequestData, LeaveGroupResponseData, ListGroupsRequestData, ListGroupsResponseData, OffsetCommitRequestData, OffsetCommitResponseData, OffsetFetchRequestData, OffsetFetchResponseData, SyncGroupRequestData, SyncGroupResponseData, TxnOffsetCommitRequestData, TxnOffsetCommitResponseData} import org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember import org.apache.kafka.common.network.{ClientInformation, ListenerName} @@ -680,4 +680,76 @@ class GroupCoordinatorAdapterTest { assertTrue(future.isDone) assertEquals(expectedResponseData, future.get()) } + + @Test + def testCommitTransactionalOffsets(): Unit = { + val groupCoordinator = mock(classOf[GroupCoordinator]) + val time = new MockTime() + val adapter = new GroupCoordinatorAdapter(groupCoordinator, time) + val now = time.milliseconds() + + val ctx = makeContext(ApiKeys.TXN_OFFSET_COMMIT, ApiKeys.TXN_OFFSET_COMMIT.latestVersion) + val data = new TxnOffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setGenerationId(10) + .setProducerEpoch(1) + .setProducerId(2) + .setTransactionalId("transaction-id") + .setTopics(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100) + .setCommittedLeaderEpoch(1) + ).asJava) + ).asJava) + val bufferSupplier = BufferSupplier.create() + + val future = adapter.commitTransactionalOffsets(ctx, data, bufferSupplier) + assertFalse(future.isDone) + + val capturedCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit]) + + verify(groupCoordinator).handleTxnCommitOffsets( + ArgumentMatchers.eq(data.groupId), + ArgumentMatchers.eq(data.producerId), + ArgumentMatchers.eq(data.producerEpoch), + 
ArgumentMatchers.eq(data.memberId), + ArgumentMatchers.eq(None), + ArgumentMatchers.eq(data.generationId), + ArgumentMatchers.eq(Map( + new TopicPartition("foo", 0) -> new OffsetAndMetadata( + offset = 100, + leaderEpoch = Optional.of[Integer](1), + metadata = "", + commitTimestamp = now, + expireTimestamp = None + ) + )), + capturedCallback.capture(), + ArgumentMatchers.eq(RequestLocal(bufferSupplier)) + ) + + capturedCallback.getValue.apply(Map( + new TopicPartition("foo", 0) -> Errors.NONE + )) + + val expectedData = new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code) + ).asJava) + ).asJava) + + assertTrue(future.isDone) + assertEquals(expectedData, future.get()) + } } diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 81c2c9ffbe2f4..c40dff4be9ddd 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -1565,59 +1565,299 @@ class KafkaApisTest { } @Test - def shouldReplaceCoordinatorNotAvailableWithLoadInProcessInTxnOffsetCommitWithOlderClient(): Unit = { + def testHandleTxnOffsetCommitRequest(): Unit = { + addTopicToMetadataCache("foo", numPartitions = 1) + + val txnOffsetCommitRequest = new TxnOffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setGenerationId(10) + .setProducerId(20) + .setProducerEpoch(30) + .setGroupInstanceId("instance-id") + .setTransactionalId("transactional-id") + .setTopics(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10)).asJava)).asJava) + 
+ val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build()) + + val future = new CompletableFuture[TxnOffsetCommitResponseData]() + when(newGroupCoordinator.commitTransactionalOffsets( + requestChannelRequest.context, + txnOffsetCommitRequest, + RequestLocal.NoCaching.bufferSupplier + )).thenReturn(future) + + createKafkaApis().handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + // This is the response returned by the group coordinator. + val txnOffsetCommitResponse = new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + future.complete(txnOffsetCommitResponse) + val response = verifyNoThrottling[TxnOffsetCommitResponse](requestChannelRequest) + assertEquals(txnOffsetCommitResponse, response.data) + } + + @Test + def testHandleTxnOffsetCommitRequestFutureFailed(): Unit = { + addTopicToMetadataCache("foo", numPartitions = 1) + + val txnOffsetCommitRequest = new TxnOffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10)).asJava)).asJava) + + val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build()) + + val future = new CompletableFuture[TxnOffsetCommitResponseData]() + when(newGroupCoordinator.commitTransactionalOffsets( + requestChannelRequest.context, + txnOffsetCommitRequest, + RequestLocal.NoCaching.bufferSupplier + )).thenReturn(future) + + createKafkaApis().handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + val 
expectedTxnOffsetCommitResponse = new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NOT_COORDINATOR.code)).asJava)).asJava) + + future.completeExceptionally(Errors.NOT_COORDINATOR.exception) + val response = verifyNoThrottling[TxnOffsetCommitResponse](requestChannelRequest) + assertEquals(expectedTxnOffsetCommitResponse, response.data) + } + + @Test + def testHandleTxnOffsetCommitRequestTopicsAndPartitionsValidation(): Unit = { + addTopicToMetadataCache("foo", numPartitions = 2) + addTopicToMetadataCache("bar", numPartitions = 2) + + val txnOffsetCommitRequest = new TxnOffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + // foo exists but only has 2 partitions. + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(20), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(2) + .setCommittedOffset(30)).asJava), + // bar exists. + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("bar") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(40), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(50)).asJava), + // zar does not exist. 
+ new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("zar") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(60), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(70)).asJava)).asJava) + + val requestChannelRequest = buildRequest(new TxnOffsetCommitRequest.Builder(txnOffsetCommitRequest).build()) + + // This is the request expected by the group coordinator. + val expectedTnxOffsetCommitRequest = new TxnOffsetCommitRequestData() + .setGroupId("group") + .setMemberId("member") + .setTopics(List( + // foo exists but only has 2 partitions. + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(10), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(20)).asJava), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("bar") + .setPartitions(List( + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(40), + new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(1) + .setCommittedOffset(50)).asJava)).asJava) + + val future = new CompletableFuture[TxnOffsetCommitResponseData]() + when(newGroupCoordinator.commitTransactionalOffsets( + requestChannelRequest.context, + expectedTnxOffsetCommitRequest, + RequestLocal.NoCaching.bufferSupplier + )).thenReturn(future) + + createKafkaApis().handle( + requestChannelRequest, + RequestLocal.NoCaching + ) + + // This is the response returned by the group coordinator. 
+ val txnOffsetCommitResponse = new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("bar") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + val expectedTxnOffsetCommitResponse = new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("foo") + .setPartitions(List( + // foo-2 is first because partitions failing the validation + // are put in the response first. + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(2) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava), + // zar is before bar because topics failing the validation are + // put in the response first. 
+ new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("zar") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)).asJava), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("bar") + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.NONE.code), + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(1) + .setErrorCode(Errors.NONE.code)).asJava)).asJava) + + future.complete(txnOffsetCommitResponse) + val response = verifyNoThrottling[TxnOffsetCommitResponse](requestChannelRequest) + assertEquals(expectedTxnOffsetCommitResponse, response.data) + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.TXN_OFFSET_COMMIT) + def shouldReplaceCoordinatorNotAvailableWithLoadInProcessInTxnOffsetCommitWithOlderClient(version: Short): Unit = { val topic = "topic" addTopicToMetadataCache(topic, numPartitions = 2) - for (version <- ApiKeys.TXN_OFFSET_COMMIT.oldestVersion to ApiKeys.TXN_OFFSET_COMMIT.latestVersion) { - reset(replicaManager, clientRequestQuotaManager, requestChannel, groupCoordinator) + val topicPartition = new TopicPartition(topic, 1) + val capturedResponse: ArgumentCaptor[TxnOffsetCommitResponse] = ArgumentCaptor.forClass(classOf[TxnOffsetCommitResponse]) - val topicPartition = new TopicPartition(topic, 1) - val capturedResponse: ArgumentCaptor[TxnOffsetCommitResponse] = ArgumentCaptor.forClass(classOf[TxnOffsetCommitResponse]) - val responseCallback: ArgumentCaptor[Map[TopicPartition, Errors] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, Errors] => Unit]) + val partitionOffsetCommitData = new 
TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty()) + val groupId = "groupId" - val partitionOffsetCommitData = new TxnOffsetCommitRequest.CommittedOffset(15L, "", Optional.empty()) - val groupId = "groupId" + val producerId = 15L + val epoch = 0.toShort - val producerId = 15L - val epoch = 0.toShort + val offsetCommitRequest = new TxnOffsetCommitRequest.Builder( + "txnId", + groupId, + producerId, + epoch, + Map(topicPartition -> partitionOffsetCommitData).asJava, + ).build(version) + val request = buildRequest(offsetCommitRequest) - val offsetCommitRequest = new TxnOffsetCommitRequest.Builder( - "txnId", - groupId, - producerId, - epoch, - Map(topicPartition -> partitionOffsetCommitData).asJava, - ).build(version.toShort) - val request = buildRequest(offsetCommitRequest) + val requestLocal = RequestLocal.withThreadConfinedCaching + val future = new CompletableFuture[TxnOffsetCommitResponseData]() + when(newGroupCoordinator.commitTransactionalOffsets( + request.context, + offsetCommitRequest.data, + requestLocal.bufferSupplier + )).thenReturn(future) - val requestLocal = RequestLocal.withThreadConfinedCaching - when(groupCoordinator.handleTxnCommitOffsets( - ArgumentMatchers.eq(groupId), - ArgumentMatchers.eq(producerId), - ArgumentMatchers.eq(epoch), - anyString, - ArgumentMatchers.eq(Option.empty), - anyInt, - any(), - responseCallback.capture(), - ArgumentMatchers.eq(requestLocal) - )).thenAnswer(_ => responseCallback.getValue.apply(Map(topicPartition -> Errors.COORDINATOR_LOAD_IN_PROGRESS))) + future.complete(new TxnOffsetCommitResponseData() + .setTopics(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName(topicPartition.topic) + .setPartitions(List( + new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(topicPartition.partition) + .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code) + ).asJava) + ).asJava)) - createKafkaApis().handleTxnOffsetCommitRequest(request, requestLocal) 
+ createKafkaApis().handleTxnOffsetCommitRequest(request, requestLocal) - verify(requestChannel).sendResponse( - ArgumentMatchers.eq(request), - capturedResponse.capture(), - ArgumentMatchers.eq(None) - ) - val response = capturedResponse.getValue + verify(requestChannel).sendResponse( + ArgumentMatchers.eq(request), + capturedResponse.capture(), + ArgumentMatchers.eq(None) + ) + val response = capturedResponse.getValue - if (version < 2) { - assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, response.errors().get(topicPartition)) - } else { - assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, response.errors().get(topicPartition)) - } + if (version < 2) { + assertEquals(Errors.COORDINATOR_NOT_AVAILABLE, response.errors().get(topicPartition)) + } else { + assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS, response.errors().get(topicPartition)) } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java index 30d4841082216..d110d23cec933 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java @@ -32,6 +32,8 @@ import org.apache.kafka.common.message.OffsetFetchResponseData; import org.apache.kafka.common.message.SyncGroupRequestData; import org.apache.kafka.common.message.SyncGroupResponseData; +import org.apache.kafka.common.message.TxnOffsetCommitRequestData; +import org.apache.kafka.common.message.TxnOffsetCommitResponseData; import org.apache.kafka.common.requests.RequestContext; import org.apache.kafka.common.utils.BufferSupplier; @@ -181,5 +183,20 @@ CompletableFuture commitOffsets( OffsetCommitRequestData request, BufferSupplier bufferSupplier ); + + /** + * Commit transactional offsets for a given Group. + * + * @param context The request context. 
+ * @param request The TxnOffsetCommitRequest data. + * @param bufferSupplier The buffer supplier tied to the request thread. + * + * @return A future yielding the response or an exception. + */ + CompletableFuture commitTransactionalOffsets( + RequestContext context, + TxnOffsetCommitRequestData request, + BufferSupplier bufferSupplier + ); }