Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import org.apache.kafka.server.purgatory.{DelayedOperationPurgatory, GroupJoinKe
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.storage.internals.log.VerificationGuard

import java.util.concurrent.CompletableFuture
import scala.collection.{Map, Seq, Set, immutable, mutable}
import scala.math.max

Expand Down Expand Up @@ -982,7 +983,7 @@ private[group] class GroupCoordinator(

def scheduleHandleTxnCompletion(producerId: Long,
offsetsPartitions: Iterable[TopicPartition],
transactionResult: TransactionResult): Unit = {
transactionResult: TransactionResult): CompletableFuture[Void] = {
require(offsetsPartitions.forall(_.topic == Topic.GROUP_METADATA_TOPIC_NAME))
val isCommit = transactionResult == TransactionResult.COMMIT
groupManager.scheduleHandleTxnCompletion(producerId, offsetsPartitions.map(_.partition).toSet, isCommit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -587,12 +587,16 @@ private[group] class GroupCoordinatorAdapter(
producerId: Long,
partitions: java.lang.Iterable[TopicPartition],
transactionResult: TransactionResult
): Unit = {
coordinator.scheduleHandleTxnCompletion(
producerId,
partitions.asScala,
transactionResult
)
): CompletableFuture[Void] = {
try {
coordinator.scheduleHandleTxnCompletion(
producerId,
partitions.asScala,
transactionResult
)
} catch {
case e: Throwable => FutureUtils.failedFuture(e)
}
}

override def onPartitionsDeleted(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.nio.ByteBuffer
import java.util.{Optional, OptionalInt, OptionalLong}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}
import java.util.function.Supplier
import com.yammer.metrics.core.Gauge
import kafka.coordinator.group.GroupMetadataManager.maybeConvertOffsetCommitError
Expand Down Expand Up @@ -931,9 +931,17 @@ class GroupMetadataManager(brokerId: Int,
* more group metadata locks to handle transaction completion, this operation is scheduled on
* the scheduler thread to avoid deadlocks.
*/
def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
scheduler.scheduleOnce(s"handleTxnCompletion-$producerId", () =>
handleTxnCompletion(producerId, completedPartitions, isCommit))
def scheduleHandleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): CompletableFuture[Void] = {
val future = new CompletableFuture[Void]()
scheduler.scheduleOnce(s"handleTxnCompletion-$producerId", () => {
try {
handleTxnCompletion(producerId, completedPartitions, isCommit)
future.complete(null)
} catch {
case e: Throwable => future.completeExceptionally(e)
}
})
future
}

private[group] def handleTxnCompletion(producerId: Long, completedPartitions: Set[Int], isCommit: Boolean): Unit = {
Expand Down
51 changes: 32 additions & 19 deletions core/src/main/scala/kafka/server/KafkaApis.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2376,28 +2376,41 @@ class KafkaApis(val requestChannel: RequestChannel,
trace(s"End transaction marker append for producer id $producerId completed with status: $currentErrors")
updateErrors(producerId, currentErrors)

if (!groupCoordinator.isNewGroupCoordinator) {
val successfulOffsetsPartitions = currentErrors.asScala.filter { case (topicPartition, error) =>
topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == Errors.NONE
}.keys

if (successfulOffsetsPartitions.nonEmpty) {
// as soon as the end transaction marker has been written for a transactional offset commit,
// call to the group coordinator to materialize the offsets into the cache
try {
groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result)
} catch {
case e: Exception =>
error(s"Received an exception while trying to update the offsets cache on transaction marker append", e)
val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
updateErrors(producerId, updatedErrors)
}
def maybeSendResponse(): Unit = {
if (numAppends.decrementAndGet() == 0) {
requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
}
}

if (numAppends.decrementAndGet() == 0)
requestHelper.sendResponseExemptThrottle(request, new WriteTxnMarkersResponse(errors))
// The new group coordinator uses GroupCoordinator#completeTransaction so we do
// not need to call GroupCoordinator#onTransactionCompleted here.
if (config.isNewGroupCoordinatorEnabled) {
maybeSendResponse()
return
}

val successfulOffsetsPartitions = currentErrors.asScala.filter { case (topicPartition, error) =>
topicPartition.topic == GROUP_METADATA_TOPIC_NAME && error == Errors.NONE
}.keys

// If no end transaction marker has been written to a __consumer_offsets partition, we do not
// need to call GroupCoordinator#onTransactionCompleted.
if (successfulOffsetsPartitions.isEmpty) {
maybeSendResponse()
return
}

// Otherwise, we call GroupCoordinator#onTransactionCompleted to materialize the offsets
// into the cache and we wait until the meterialization is completed.
groupCoordinator.onTransactionCompleted(producerId, successfulOffsetsPartitions.asJava, result).whenComplete { (_, exception) =>
if (exception != null) {
error(s"Received an exception while trying to update the offsets cache on transaction marker append", exception)
val updatedErrors = new ConcurrentHashMap[TopicPartition, Errors]()
successfulOffsetsPartitions.foreach(updatedErrors.put(_, Errors.UNKNOWN_SERVER_ERROR))
updateErrors(producerId, updatedErrors)
}
maybeSendResponse()
}
}

// TODO: The current append API makes doing separate writes per producerId a little easier, but it would
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.apache.kafka.common.message.OffsetDeleteRequestData.{OffsetDeleteRequ
import org.apache.kafka.common.message.OffsetDeleteResponseData.{OffsetDeleteResponsePartition, OffsetDeleteResponsePartitionCollection, OffsetDeleteResponseTopic, OffsetDeleteResponseTopicCollection}
import org.apache.kafka.common.network.{ClientInformation, ListenerName}
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, RequestHeader}
import org.apache.kafka.common.requests.{OffsetFetchResponse, RequestContext, RequestHeader, TransactionResult}
import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource
Expand All @@ -37,6 +37,7 @@ import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.mockito.ArgumentMatchers.any
import org.mockito.{ArgumentCaptor, ArgumentMatchers}
import org.mockito.Mockito.{mock, verify, when}

Expand Down Expand Up @@ -930,4 +931,26 @@ class GroupCoordinatorAdapterTest {
assertTrue(future.isCompletedExceptionally)
assertFutureThrows(future, classOf[UnsupportedVersionException])
}

@Test
def testOnTransactionCompletedWithUnexpectedException(): Unit = {
val groupCoordinator = mock(classOf[GroupCoordinator])
val adapter = new GroupCoordinatorAdapter(groupCoordinator, Time.SYSTEM)

when(groupCoordinator.scheduleHandleTxnCompletion(
any(),
any(),
any()
)).thenThrow(new IllegalStateException("Oh no!"))

val future = adapter.onTransactionCompleted(
10,
Seq.empty[TopicPartition].asJava,
TransactionResult.COMMIT
)

assertTrue(future.isDone)
assertTrue(future.isCompletedExceptionally)
assertFutureThrows(future, classOf[Exception])
}
}
126 changes: 126 additions & 0 deletions core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3337,6 +3337,132 @@ class KafkaApisTest extends Logging {
any())
}

@Test
def testHandleWriteTxnMarkersRequestWithOldGroupCoordinator(): Unit = {
val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
val offset1 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 1)
val foo0 = new TopicPartition("foo", 0)
val foo1 = new TopicPartition("foo", 1)

val allPartitions = List(
offset0,
offset1,
foo0,
foo1
)

val writeTxnMarkersRequest = new WriteTxnMarkersRequest.Builder(
ApiKeys.WRITE_TXN_MARKERS.latestVersion(),
List(
new TxnMarkerEntry(
1L,
1.toShort,
0,
TransactionResult.COMMIT,
List(offset0, foo0).asJava
),
new TxnMarkerEntry(
2L,
1.toShort,
0,
TransactionResult.ABORT,
List(offset1, foo1).asJava
)
).asJava
).build()

val requestChannelRequest = buildRequest(writeTxnMarkersRequest)

allPartitions.foreach { tp =>
when(replicaManager.getMagic(tp))
.thenReturn(Some(RecordBatch.MAGIC_VALUE_V2))
}

when(groupCoordinator.onTransactionCompleted(
ArgumentMatchers.eq(1L),
ArgumentMatchers.any(),
ArgumentMatchers.eq(TransactionResult.COMMIT)
)).thenReturn(CompletableFuture.completedFuture[Void](null))

when(groupCoordinator.onTransactionCompleted(
ArgumentMatchers.eq(2L),
ArgumentMatchers.any(),
ArgumentMatchers.eq(TransactionResult.ABORT)
)).thenReturn(FutureUtils.failedFuture[Void](Errors.NOT_CONTROLLER.exception))

val entriesPerPartition: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]])
val responseCallback: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])

when(replicaManager.appendRecords(
ArgumentMatchers.eq(ServerConfigs.REQUEST_TIMEOUT_MS_DEFAULT.toLong),
ArgumentMatchers.eq(-1),
ArgumentMatchers.eq(true),
ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
entriesPerPartition.capture(),
responseCallback.capture(),
any(),
any(),
ArgumentMatchers.eq(RequestLocal.noCaching()),
any(),
any()
)).thenAnswer { _ =>
responseCallback.getValue.apply(
entriesPerPartition.getValue.keySet.map { tp =>
tp -> new PartitionResponse(Errors.NONE)
}.toMap
)
}
kafkaApis = createKafkaApis(overrideProperties = Map(
GroupCoordinatorConfig.NEW_GROUP_COORDINATOR_ENABLE_CONFIG -> "false"
))
kafkaApis.handleWriteTxnMarkersRequest(requestChannelRequest, RequestLocal.noCaching())

val expectedResponse = new WriteTxnMarkersResponseData()
.setMarkers(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
.setProducerId(1L)
.setTopics(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
.setName(Topic.GROUP_METADATA_TOPIC_NAME)
.setPartitions(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code)
).asJava),
new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
.setName("foo")
.setPartitions(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
.setPartitionIndex(0)
.setErrorCode(Errors.NONE.code)
).asJava)
).asJava),
new WriteTxnMarkersResponseData.WritableTxnMarkerResult()
.setProducerId(2L)
.setTopics(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
.setName(Topic.GROUP_METADATA_TOPIC_NAME)
.setPartitions(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
.setPartitionIndex(1)
.setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code)
).asJava),
new WriteTxnMarkersResponseData.WritableTxnMarkerTopicResult()
.setName("foo")
.setPartitions(List(
new WriteTxnMarkersResponseData.WritableTxnMarkerPartitionResult()
.setPartitionIndex(1)
.setErrorCode(Errors.NONE.code)
).asJava)
).asJava)
).asJava)

val response = verifyNoThrottling[WriteTxnMarkersResponse](requestChannelRequest)
assertEquals(normalize(expectedResponse), normalize(response.data))
}

@Test
def testHandleWriteTxnMarkersRequestWithNewGroupCoordinator(): Unit = {
val offset0 = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,11 +339,18 @@ CompletableFuture<Void> completeTransaction(
/**
* Commit or abort the pending transactional offsets for the given partitions.
*
* This method is only used by the old group coordinator. Internally, the old
* group coordinator completes the transaction asynchronously in order to
* avoid deadlocks. Hence, this method returns a future that the caller
* can wait on.
*
* @param producerId The producer id.
* @param partitions The partitions.
* @param transactionResult The result of the transaction.
*
* @return A future yielding the result.
*/
void onTransactionCompleted(
CompletableFuture<Void> onTransactionCompleted(
long producerId,
Iterable<TopicPartition> partitions,
TransactionResult transactionResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1086,7 +1086,7 @@ public CompletableFuture<Void> completeTransaction(
* See {@link GroupCoordinator#onTransactionCompleted(long, Iterable, TransactionResult)}.
*/
@Override
public void onTransactionCompleted(
public CompletableFuture<Void> onTransactionCompleted(
long producerId,
Iterable<TopicPartition> partitions,
TransactionResult transactionResult
Expand Down