diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 836adcc9793b3..6a55e51d3aeb4 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -38,6 +38,7 @@ import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, GroupJoinKe
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.apache.kafka.storage.internals.log.VerificationGuard

+import java.util.concurrent.CompletableFuture
 import scala.collection.{Map, Seq, Set, immutable, mutable}
 import scala.math.max
@@ -982,7 +983,7 @@ private[group] class GroupCoordinator(
   def scheduleHandleTxnCompletion(producerId: Long,
                                   offsetsPartitions: Iterable[TopicPartition],
-                                  transactionResult: TransactionResult): Unit = {
+                                  transactionResult: TransactionResult): CompletableFuture[Void] = {
     require(offsetsPartitions.forall(_.topic == Topic.GROUP_METADATA_TOPIC_NAME))
     val isCommit = transactionResult == TransactionResult.COMMIT
     groupManager.scheduleHandleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
   }
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
index d8d47c46116c1..44e2723fac705 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala
@@ -587,12 +587,16 @@ private[group] class GroupCoordinatorAdapter(
     producerId: Long,
     partitions: java.lang.Iterable[TopicPartition],
     transactionResult: TransactionResult
-  ): Unit = {
-    coordinator.scheduleHandleTxnCompletion(
-      producerId,
-      partitions.asScala,
-      transactionResult
-    )
+  ): CompletableFuture[Void] = {
+    try {
+      coordinator.scheduleHandleTxnCompletion(
+        producerId,
+        partitions.asScala,
+        transactionResult
+      )
+    } catch {
+      case e: Throwable => FutureUtils.failedFuture(e)
+    }
   }

   override def onPartitionsDeleted(
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 8c30efd81f8f7..0c667aa7fa5cf 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 import java.util.{Optional, OptionalInt, OptionalLong}
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
 import java.util.function.Supplier
 import com.yammer.metrics.core.Gauge
 import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError
@@ -931,9 +931,17 @@ class GroupMetadataManager(brokerId: Int,
    * more group metadata locks to handle transaction completion, this operation is scheduled on
    * the scheduler thread to avoid deadlocks.
    */
-  def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
-    scheduler.scheduleOnce(s"handleTxnCompletion-$producerId", () =>
-      handleTxnCompletion(producerId, completedPartitions, isCommit))
+  def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): CompletableFuture[Void] = {
+    val future = new CompletableFuture[Void]()
+    scheduler.scheduleOnce(s"handleTxnCompletion-$producerId", () => {
+      try {
+        handleTxnCompletion(producerId, completedPartitions, isCommit)
+        future.complete(null)
+      } catch {
+        case e: Throwable => future.completeExceptionally(e)
+      }
+    })
+    future
   }

   private[group] def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
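The GroupMetadataManager change above is the heart of the patch: scheduleHandleTxnCompletion used to fire-and-forget a task onto the scheduler thread (taking group metadata locks there rather than on the request thread, to avoid deadlocks), and it now allocates a CompletableFuture up front and completes it, normally or exceptionally, from inside the scheduled task. A minimal, self-contained sketch of that bridging pattern follows; a plain ExecutorService stands in for Kafka's scheduler, and the names (scheduleWithFuture, work) are illustrative, not from the patch:

    import java.util.concurrent.{CompletableFuture, Executors}

    object TxnCompletionFutureSketch {
      // Stand-in for KafkaScheduler#scheduleOnce: runs the task on another
      // thread and gives the caller no way to observe its outcome.
      private val executor = Executors.newSingleThreadExecutor()

      def scheduleOnce(name: String, task: Runnable): Unit = {
        executor.submit(task)
      }

      // The pattern used by the patch: allocate the future first, do the real
      // work inside the scheduled task, and route both outcomes into the future.
      def scheduleWithFuture(work: () => Unit): CompletableFuture[Void] = {
        val future = new CompletableFuture[Void]()
        scheduleOnce("sketch", () => {
          try {
            work()
            future.complete(null)
          } catch {
            case e: Throwable => future.completeExceptionally(e)
          }
        })
        future
      }

      def main(args: Array[String]): Unit = {
        scheduleWithFuture(() => println("materializing offsets"))
          .whenComplete((_, e) => println(if (e != null) s"failed: $e" else "done"))
        executor.shutdown()
      }
    }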
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 405caf240b848..eb75829faea7c 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -2376,28 +2376,41 @@ class KafkaApis(val requestChannel: RequestChannel,
       trace(s"End transaction marker append for producer id $producerId completed with status: $currentErrors")
       updateErrors(producerId, currentErrors)

-      if (!groupCoordinator.isNewGroupCoordinator) {
-        val successfulOffsetsPartitions = currentErrors.asScala.filter { case (topicPartition, error) =>
-          topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == Errors.NONE
-        }.keys
-
-        if (successfulOffsetsPartitions.nonEmpty) {
-          // as soon as the end transaction marker has been written for a transactional offset commit,
-          // call to the group coordinator to materialize the offsets into the cache
-          try {
-            groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result)
-          } catch {
-            case e: Exception =>
-              error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
-              val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
-              successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
-              updateErrors(producerId, updatedErrors)
-          }
-        }
+      def maybeSendResponse(): Unit = {
+        if (numAppends.decrementAndGet() == 0) {
+          requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
+        }
       }

-      if (numAppends.decrementAndGet() == 0)
-        requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
+      // The new group coordinator uses GroupCoordinator#completeTransaction so we do
+      // not need to call GroupCoordinator#onTransactionCompleted here.
+      if (config.isNewGroupCoordinatorEnabled) {
+        maybeSendResponse()
+        return
+      }
+
+      val successfulOffsetsPartitions = currentErrors.asScala.filter { case (topicPartition, error) =>
+        topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == Errors.NONE
+      }.keys
+
+      // If no end transaction marker has been written to a __consumer_offsets partition, we do not
+      // need to call GroupCoordinator#onTransactionCompleted.
+      if (successfulOffsetsPartitions.isEmpty) {
+        maybeSendResponse()
+        return
+      }
+
+      // Otherwise, we call GroupCoordinator#onTransactionCompleted to materialize the offsets
+      // into the cache and we wait until the materialization is completed.
+      groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result).whenComplete { (_, exception) =>
+        if (exception != null) {
+          error(s"Received an exception while trying to update the offsets cache on transaction marker append", exception)
+          val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
+          successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
+          updateErrors(producerId, updatedErrors)
+        }
+        maybeSendResponse()
+      }
     }

     // TODO: The current append API makes doing separate writes per producerId a little easier, but it would
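The KafkaApis rewrite above funnels every exit path through maybeSendResponse, which decrements the shared numAppends counter and sends the WriteTxnMarkersResponse only once the final append (or coordinator future) completes. A stripped-down sketch of that countdown pattern, assuming nothing from KafkaApis beyond what the diff shows (handleAll and sendResponse are illustrative names):

    import java.util.concurrent.CompletableFuture
    import java.util.concurrent.atomic.AtomicInteger

    object CountdownResponseSketch {
      // One future per producer id; whichever completion lands last sends the response.
      def handleAll(appends: Seq[CompletableFuture[Void]])(sendResponse: () => Unit): Unit = {
        val numAppends = new AtomicInteger(appends.size)

        def maybeSendResponse(): Unit = {
          // decrementAndGet reaches zero exactly once, so the response is
          // sent exactly once, and only after every append has completed.
          if (numAppends.decrementAndGet() == 0) sendResponse()
        }

        appends.foreach(_.whenComplete((_, _) => maybeSendResponse()))
      }

      def main(args: Array[String]): Unit = {
        val commit = new CompletableFuture[Void]()
        val abort = new CompletableFuture[Void]()
        handleAll(Seq(commit, abort))(() => println("response sent"))
        commit.complete(null)
        abort.completeExceptionally(new IllegalStateException("append failed")) // still counts down
      }
    }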
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
index 8d631fe2b8008..6fc021e48e860 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequ
 import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
 import org.apache.kafka.common.network.{ClientInformation, ListenerName}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, RequestHeader}
+import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, RequestHeader, TransactionResult}
 import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
 import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
@@ -37,6 +37,7 @@ import org.apache.kafka.test.TestUtils.assertFutureThrows
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
+import org.mockito.ArgumentMatchers.any
 import org.mockito.{ArgumentCaptor, ArgumentMatchers}
 import org.mockito.Mockito.{mock, verify, when}
@@ -930,4 +931,26 @@
     assertTrue(future.isCompletedExceptionally)
     assertFutureThrows(future, classOf[UnsupportedVersionException])
   }
+
+  @Test
+  def testOnTransactionCompletedWithUnexpectedException(): Unit = {
+    val groupCoordinator = mock(classOf[GroupCoordinator])
+    val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)
+
+    when(groupCoordinator.scheduleHandleTxnCompletion(
+      any(),
+      any(),
+      any()
+    )).thenThrow(new IllegalStateException("Oh no!"))
+
+    val future = adapter.onTransactionCompleted(
+      10,
+      Seq.empty[TopicPartition].asJava,
+      TransactionResult.COMMIT
+    )
+
+    assertTrue(future.isDone)
+    assertTrue(future.isCompletedExceptionally)
+    assertFutureThrows(future, classOf[Exception])
+  }
 }
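The adapter test above stubs scheduleHandleTxnCompletion to throw and expects the adapter to surface the exception through the returned future via FutureUtils.failedFuture. For reference, such a helper amounts to the sketch below; this illustrates the idea and is not Kafka's implementation (CompletableFuture gained a failedFuture factory only in Java 9, which is presumably why a server-util helper is used):

    import java.util.concurrent.CompletableFuture

    object FailedFutureSketch {
      // Build a future that is already completed exceptionally.
      def failedFuture[T](t: Throwable): CompletableFuture[T] = {
        val future = new CompletableFuture[T]()
        future.completeExceptionally(t)
        future
      }

      def main(args: Array[String]): Unit = {
        val future = failedFuture[Void](new IllegalStateException("Oh no!"))
        assert(future.isDone && future.isCompletedExceptionally)
      }
    }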
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 27cd6644bd91f..a2ca922226c45 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -3337,6 +3337,132 @@ class KafkaApisTest extends Logging {
       any())
   }

+  @Test
+  def testHandleWriteTxnMarkersRequestWithOldGroupCoordinator(): Unit = {
+    val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
+    val offset1 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)
+    val foo0 = new TopicPartition("foo", 0)
+    val foo1 = new TopicPartition("foo", 1)
+
+    val allPartitions = List(
+      offset0,
+      offset1,
+      foo0,
+      foo1
+    )
+
+    val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
+      ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
+      List(
+        new TxnMarkerEntry(
+          1L,
+          1.toShort,
+          0,
+          TransactionResult.COMMIT,
+          List(offset0, foo0).asJava
+        ),
+        new TxnMarkerEntry(
+          2L,
+          1.toShort,
+          0,
+          TransactionResult.ABORT,
+          List(offset1, foo1).asJava
+        )
+      ).asJava
+    ).build()
+
+    val requestChannelRequest = buildRequest(writeTxnMarkersRequest)
+
+    allPartitions.foreach { tp =>
+      when(replicaManager.getMagic(tp))
+        .thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
+    }
+
+    when(groupCoordinator.onTransactionCompleted(
+      ArgumentMatchers.eq(1L),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.eq(TransactionResult.COMMIT)
+    )).thenReturn(CompletableFuture.completedFuture[Void](null))
+
+    when(groupCoordinator.onTransactionCompleted(
+      ArgumentMatchers.eq(2L),
+      ArgumentMatchers.any(),
+      ArgumentMatchers.eq(TransactionResult.ABORT)
+    )).thenReturn(FutureUtils.failedFuture[Void](Errors.NOT_CONTROLLER.exception))
+
+    val entriesPerPartition: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
+    val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
+      ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
+
+    when(replicaManager.appendRecords(
+      ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
+      ArgumentMatchers.eq(-1),
+      ArgumentMatchers.eq(true),
+      ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
+      entriesPerPartition.capture(),
+      responseCallback.capture(),
+      any(),
+      any(),
+      ArgumentMatchers.eq(RequestLocal.noCaching()),
+      any(),
+      any()
+    )).thenAnswer { _ =>
+      responseCallback.getValue.apply(
+        entriesPerPartition.getValue.keySet.map { tp =>
+          tp -> new PartitionResponse(Errors.NONE)
+        }.toMap
+      )
+    }
+    kafkaApis = createKafkaApis(overrideProperties = Map(
+      GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false"
+    ))
+    kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.noCaching())
+
+    val expectedResponse = new WriteTxnMarkersResponseData()
+      .setMarkers(List(
+        new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
+          .setProducerId(1L)
+          .setTopics(List(
+            new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+              .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+              .setPartitions(List(
+                new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+                  .setPartitionIndex(0)
+                  .setErrorCode(Errors.NONE.code)
+              ).asJava),
+            new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+              .setName("foo")
+              .setPartitions(List(
+                new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+                  .setPartitionIndex(0)
+                  .setErrorCode(Errors.NONE.code)
+              ).asJava)
+          ).asJava),
+        new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
+          .setProducerId(2L)
+          .setTopics(List(
+            new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+              .setName(Topic.GROUP_METADATA_TOPIC_NAME)
+              .setPartitions(List(
+                new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+                  .setPartitionIndex(1)
+                  .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
+              ).asJava),
+            new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
+              .setName("foo")
+              .setPartitions(List(
+                new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
+                  .setPartitionIndex(1)
+                  .setErrorCode(Errors.NONE.code)
+              ).asJava)
+          ).asJava)
+      ).asJava)
+
+    val response = verifyNoThrottling[WriteTxnMarkersResponse](requestChannelRequest)
+    assertEquals(normalize(expectedResponse), normalize(response.data))
+  }
+
   @Test
   def testHandleWriteTxnMarkersRequestWithNewGroupCoordinator(): Unit = {
     val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
index 223cff9720eaf..b57963aed9247 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinator.java
@@ -339,11 +339,18 @@ CompletableFuture<Void> completeTransaction(
     /**
      * Commit or abort the pending transactional offsets for the given partitions.
      *
+     * This method is only used by the old group coordinator. Internally, the old
+     * group coordinator completes the transaction asynchronously in order to
+     * avoid deadlocks. Hence, this method returns a future that the caller
+     * can wait on.
+     *
      * @param producerId        The producer id.
      * @param partitions        The partitions.
      * @param transactionResult The result of the transaction.
+     *
+     * @return A future yielding the result.
      */
-    void onTransactionCompleted(
+    CompletableFuture<Void> onTransactionCompleted(
         long producerId,
         Iterable<TopicPartition> partitions,
         TransactionResult transactionResult
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 96d0253a09ec1..8b8c4bb0f9985 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -1086,7 +1086,7 @@ public CompletableFuture<Void> completeTransaction(
      * See {@link GroupCoordinator#onTransactionCompleted(long, Iterable, TransactionResult)}.
      */
     @Override
-    public void onTransactionCompleted(
+    public CompletableFuture<Void> onTransactionCompleted(
         long producerId,
         Iterable<TopicPartition> partitions,
         TransactionResult transactionResult
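With the interface change above, callers of onTransactionCompleted choose how to consume the result: request handlers compose on the future, as the KafkaApis hunk does, while tests can block on it. A short sketch of both styles (method names are illustrative, not from the patch):

    import java.util.concurrent.{CompletableFuture, TimeUnit}

    object AwaitTxnCompletionSketch {
      // Non-blocking style for request-handler threads: react when done.
      def compose(future: CompletableFuture[Void]): Unit =
        future.whenComplete((_, e) => if (e != null) println(s"failed: $e"))

      // Blocking style, acceptable in tests: bound the wait with a timeout.
      def await(future: CompletableFuture[Void]): Unit =
        future.get(5, TimeUnit.SECONDS)
    }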