diff --git a/build.gradle b/build.gradle
index ca98890bd5a39..13a2850a389fc 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1314,6 +1314,7 @@ project(':group-coordinator') {
implementation project(':server-common')
implementation project(':clients')
implementation project(':metadata')
+ implementation project(':storage')
implementation libs.slf4jApi
implementation libs.metrics
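
Note: the new ':storage' dependency is what makes org.apache.kafka.storage.internals.log.VerificationGuard visible to the group-coordinator module; the checkstyle import-control change below adds the matching package allowance.
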
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 39a77326bdee2..bfe468c5b8794 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -248,6 +248,7 @@
+ <allow pkg="org.apache.kafka.storage.internals.log" />
diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
index c8c8625ced41f..2c69372558f34 100644
--- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
+++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala
@@ -17,7 +17,7 @@
package kafka.coordinator.group
import kafka.cluster.PartitionListener
-import kafka.server.{ActionQueue, ReplicaManager}
+import kafka.server.{ActionQueue, ReplicaManager, RequestLocal}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.RecordTooLargeException
import org.apache.kafka.common.protocol.Errors
@@ -27,9 +27,10 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.coordinator.group.runtime.PartitionWriter
-import org.apache.kafka.storage.internals.log.AppendOrigin
+import org.apache.kafka.storage.internals.log.VerificationGuard
import java.util
+import java.util.concurrent.CompletableFuture
import scala.collection.Map
/**
@@ -81,10 +82,7 @@ class CoordinatorPartitionWriter[T](
}
/**
- * Register a PartitionWriter.Listener.
- *
- * @param tp The partition to register the listener to.
- * @param listener The listener.
+ * {@inheritDoc}
*/
override def registerListener(
tp: TopicPartition,
@@ -94,10 +92,7 @@ class CoordinatorPartitionWriter[T](
}
/**
- * Deregister a PartitionWriter.Listener.
- *
- * @param tp The partition to deregister the listener from.
- * @param listener The listener.
+ * {@inheritDoc}
*/
override def deregisterListener(
tp: TopicPartition,
@@ -107,20 +102,13 @@ class CoordinatorPartitionWriter[T](
}
/**
- * Write records to the partitions. Records are written in one batch so
- * atomicity is guaranteed.
- *
- * @param tp The partition to write records to.
- * @param producerId The producer id.
- * @param producerEpoch The producer epoch.
- * @param records The list of records. The records are written in a single batch.
- * @return The log end offset right after the written records.
- * @throws KafkaException Any KafkaException caught during the write operation.
+ * {@inheritDoc}
*/
override def append(
tp: TopicPartition,
producerId: Long,
producerEpoch: Short,
+ verificationGuard: VerificationGuard,
records: util.List[T]
): Long = {
if (records.isEmpty) throw new IllegalStateException("records must be non-empty.")
@@ -161,7 +149,7 @@ class CoordinatorPartitionWriter[T](
s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.")
}
- internalAppend(tp, recordsBuilder.build())
+ internalAppend(tp, recordsBuilder.build(), verificationGuard)
} finally {
bufferSupplier.release(buffer)
}
@@ -172,15 +160,7 @@ class CoordinatorPartitionWriter[T](
}
/**
- * Write the transaction end marker.
- *
- * @param tp The partition to write records to.
- * @param producerId The producer id.
- * @param producerEpoch The producer epoch.
- * @param coordinatorEpoch The epoch of the transaction coordinator.
- * @param result The transaction result.
- * @return The log end offset right after the written records.
- * @throws KafkaException Any KafkaException caught during the write operation.
+ * {@inheritDoc}
*/
override def appendEndTransactionMarker(
tp: TopicPartition,
@@ -201,18 +181,48 @@ class CoordinatorPartitionWriter[T](
))
}
+ /**
+ * {@inheritDoc}
+ */
+ override def maybeStartTransactionVerification(
+ tp: TopicPartition,
+ transactionalId: String,
+ producerId: Long,
+ producerEpoch: Short
+ ): CompletableFuture[VerificationGuard] = {
+ val future = new CompletableFuture[VerificationGuard]()
+ replicaManager.maybeStartTransactionVerificationForPartition(
+ topicPartition = tp,
+ transactionalId = transactionalId,
+ producerId = producerId,
+ producerEpoch = producerEpoch,
+ baseSequence = RecordBatch.NO_SEQUENCE,
+ requestLocal = RequestLocal.NoCaching,
+ callback = (error, _, verificationGuard) => {
+ if (error != Errors.NONE) {
+ future.completeExceptionally(error.exception)
+ } else {
+ future.complete(verificationGuard)
+ }
+ }
+ )
+ future
+ }
+
private def internalAppend(
tp: TopicPartition,
- memoryRecords: MemoryRecords
+ memoryRecords: MemoryRecords,
+ verificationGuard: VerificationGuard = VerificationGuard.SENTINEL
): Long = {
var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty
- replicaManager.appendRecords(
+ replicaManager.appendForGroup(
timeout = 0L,
requiredAcks = 1,
- internalTopicsAllowed = true,
- origin = AppendOrigin.COORDINATOR,
entriesPerPartition = Map(tp -> memoryRecords),
responseCallback = results => appendResults = results,
+ requestLocal = RequestLocal.NoCaching,
+ verificationGuards = Map(tp -> verificationGuard),
+ delayedProduceLock = None,
// We can directly complete the purgatories here because we don't hold
// any conflicting locks.
actionQueue = directActionQueue
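
The Scala writer above adapts ReplicaManager's callback-style verification API into a CompletableFuture so the Java coordinator runtime can compose on it. A minimal sketch of that adaptation pattern; VerificationApi and its start method are hypothetical stand-ins, not the actual ReplicaManager signature:

    import java.util.concurrent.CompletableFuture;
    import java.util.function.BiConsumer;

    // Hypothetical callback-style API, shaped like
    // ReplicaManager.maybeStartTransactionVerificationForPartition above.
    interface VerificationApi {
        // Invokes the callback with an error (null on success) and a guard object.
        void start(String transactionalId, long producerId, short producerEpoch,
                   BiConsumer<Exception, Object> callback);
    }

    final class VerificationAdapter {
        // Bridges the callback into a future, mirroring the body of
        // maybeStartTransactionVerification in the Scala code above.
        static CompletableFuture<Object> verify(VerificationApi api, String transactionalId,
                                                long producerId, short producerEpoch) {
            CompletableFuture<Object> future = new CompletableFuture<>();
            api.start(transactionalId, producerId, producerEpoch, (error, guard) -> {
                if (error != null) future.completeExceptionally(error);
                else future.complete(guard);
            });
            return future;
        }
    }
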
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b4d0560390036..48ec0d0fe18aa 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -989,6 +989,7 @@ class ReplicaManager(val config: KafkaConfig,
* @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the
* thread calling this method
* @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used
+ * @param actionQueue the action queue to use
*/
def appendForGroup(
timeout: Long,
@@ -997,7 +998,8 @@ class ReplicaManager(val config: KafkaConfig,
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock],
requestLocal: RequestLocal,
- verificationGuards: Map[TopicPartition, VerificationGuard]
+ verificationGuards: Map[TopicPartition, VerificationGuard],
+ actionQueue: ActionQueue = this.defaultActionQueue
): Unit = {
if (!isValidRequiredAcks(requiredAcks)) {
sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback)
@@ -1025,7 +1027,7 @@ class ReplicaManager(val config: KafkaConfig,
val allResults = localProduceResults
val produceStatus = buildProducePartitionStatus(allResults)
- addCompletePurgatoryAction(defaultActionQueue, allResults)
+ addCompletePurgatoryAction(actionQueue, allResults)
maybeAddDelayedProduce(
requiredAcks,
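
The new actionQueue parameter defaults to the broker's defaultActionQueue, so existing appendForGroup callers are unchanged; the coordinator passes its directActionQueue so that purgatory completion runs inline, which is safe only because it holds no conflicting locks at that point. A rough sketch of the distinction, with hypothetical Java types rather than the real kafka.server.ActionQueue trait:

    import java.util.ArrayDeque;
    import java.util.Queue;

    interface Action { void run(); }

    // Deferred: actions are queued and completed later, typically by the request thread.
    final class DeferredActionQueue {
        private final Queue<Action> actions = new ArrayDeque<>();
        void add(Action action) { actions.add(action); }
        void tryCompleteActions() { Action a; while ((a = actions.poll()) != null) a.run(); }
    }

    // Direct: actions run as soon as they are added; only valid when the caller
    // holds no locks that the action might also need.
    final class DirectActionQueue {
        void add(Action action) { action.run(); }
    }
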
diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
index a0fb73ee31f35..086ee3fae1681 100644
--- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala
@@ -215,7 +215,8 @@ object AbstractCoordinatorConcurrencyTest {
responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
delayedProduceLock: Option[Lock] = None,
requestLocal: RequestLocal = RequestLocal.NoCaching,
- verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = {
+ verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty,
+ actionQueue: ActionQueue = null): Unit = {
appendRecords(timeout, requiredAcks, true, AppendOrigin.COORDINATOR, entriesPerPartition, responseCallback,
delayedProduceLock, requestLocal = requestLocal)
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
index 0cfbb8f821c4e..e43cc7956e731 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.coordinator.group
-import kafka.server.ReplicaManager
+import kafka.server.{ReplicaManager, RequestLocal}
import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
@@ -27,7 +27,8 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.coordinator.group.runtime.PartitionWriter
-import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig}
+import org.apache.kafka.storage.internals.log.{LogConfig, VerificationGuard}
+import org.apache.kafka.test.TestUtils.assertFutureThrows
import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue}
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
@@ -104,17 +105,14 @@ class CoordinatorPartitionWriterTest {
val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
- when(replicaManager.appendRecords(
+ when(replicaManager.appendForGroup(
ArgumentMatchers.eq(0L),
ArgumentMatchers.eq(1.toShort),
- ArgumentMatchers.eq(true),
- ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
recordsCapture.capture(),
callbackCapture.capture(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any(),
+ ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
ArgumentMatchers.any()
)).thenAnswer( _ => {
callbackCapture.getValue.apply(Map(
@@ -140,6 +138,7 @@ class CoordinatorPartitionWriterTest {
tp,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
+ VerificationGuard.SENTINEL,
records.asJava
))
@@ -168,6 +167,7 @@ class CoordinatorPartitionWriterTest {
CompressionType.NONE,
time
)
+ val verificationGuard = new VerificationGuard()
when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps(
Collections.emptyMap(),
@@ -179,17 +179,14 @@ class CoordinatorPartitionWriterTest {
val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
- when(replicaManager.appendRecords(
+ when(replicaManager.appendForGroup(
ArgumentMatchers.eq(0L),
ArgumentMatchers.eq(1.toShort),
- ArgumentMatchers.eq(true),
- ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
recordsCapture.capture(),
callbackCapture.capture(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any(),
+ ArgumentMatchers.eq(Map(tp -> verificationGuard)),
ArgumentMatchers.any()
)).thenAnswer(_ => {
callbackCapture.getValue.apply(Map(
@@ -215,6 +212,7 @@ class CoordinatorPartitionWriterTest {
tp,
100L,
50.toShort,
+ verificationGuard,
records.asJava
))
@@ -260,17 +258,14 @@ class CoordinatorPartitionWriterTest {
val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
- when(replicaManager.appendRecords(
+ when(replicaManager.appendForGroup(
ArgumentMatchers.eq(0L),
ArgumentMatchers.eq(1.toShort),
- ArgumentMatchers.eq(true),
- ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
recordsCapture.capture(),
callbackCapture.capture(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any(),
+ ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
ArgumentMatchers.any()
)).thenAnswer(_ => {
callbackCapture.getValue.apply(Map(
@@ -311,6 +306,58 @@ class CoordinatorPartitionWriterTest {
assertEquals(List(controlRecordType), receivedRecords)
}
+ @ParameterizedTest
+ @EnumSource(value = classOf[Errors], names = Array("NONE", "NOT_ENOUGH_REPLICAS"))
+ def testMaybeStartTransactionVerification(error: Errors): Unit = {
+ val tp = new TopicPartition("foo", 0)
+ val replicaManager = mock(classOf[ReplicaManager])
+ val time = new MockTime()
+ val partitionRecordWriter = new CoordinatorPartitionWriter(
+ replicaManager,
+ new StringKeyValueSerializer(),
+ CompressionType.NONE,
+ time
+ )
+
+ val verificationGuard = if (error == Errors.NONE) {
+ new VerificationGuard()
+ } else {
+ VerificationGuard.SENTINEL
+ }
+
+ val callbackCapture: ArgumentCaptor[(Errors, RequestLocal, VerificationGuard) => Unit] =
+ ArgumentCaptor.forClass(classOf[(Errors, RequestLocal, VerificationGuard) => Unit])
+
+ when(replicaManager.maybeStartTransactionVerificationForPartition(
+ ArgumentMatchers.eq(tp),
+ ArgumentMatchers.eq("transactional-id"),
+ ArgumentMatchers.eq(10L),
+ ArgumentMatchers.eq(5.toShort),
+ ArgumentMatchers.eq(RecordBatch.NO_SEQUENCE),
+ ArgumentMatchers.eq(RequestLocal.NoCaching),
+ callbackCapture.capture()
+ )).thenAnswer(_ => {
+ callbackCapture.getValue.apply(
+ error,
+ RequestLocal.NoCaching,
+ verificationGuard
+ )
+ })
+
+ val future = partitionRecordWriter.maybeStartTransactionVerification(
+ tp,
+ "transactional-id",
+ 10L,
+ 5.toShort
+ )
+
+ if (error == Errors.NONE) {
+ assertEquals(verificationGuard, future.get)
+ } else {
+ assertFutureThrows(future, error.exception.getClass)
+ }
+ }
+
@Test
def testWriteRecordsWithFailure(): Unit = {
val tp = new TopicPartition("foo", 0)
@@ -333,17 +380,14 @@ class CoordinatorPartitionWriterTest {
val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] =
ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit])
- when(replicaManager.appendRecords(
+ when(replicaManager.appendForGroup(
ArgumentMatchers.eq(0L),
ArgumentMatchers.eq(1.toShort),
- ArgumentMatchers.eq(true),
- ArgumentMatchers.eq(AppendOrigin.COORDINATOR),
recordsCapture.capture(),
callbackCapture.capture(),
ArgumentMatchers.any(),
ArgumentMatchers.any(),
- ArgumentMatchers.any(),
- ArgumentMatchers.any(),
+ ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
ArgumentMatchers.any()
)).thenAnswer(_ => {
callbackCapture.getValue.apply(Map(
@@ -361,8 +405,9 @@ class CoordinatorPartitionWriterTest {
tp,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
- records.asJava)
- )
+ VerificationGuard.SENTINEL,
+ records.asJava
+ ))
}
@Test
@@ -394,8 +439,9 @@ class CoordinatorPartitionWriterTest {
tp,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
- records.asJava)
- )
+ VerificationGuard.SENTINEL,
+ records.asJava
+ ))
}
@Test
@@ -418,8 +464,9 @@ class CoordinatorPartitionWriterTest {
tp,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
- List.empty.asJava)
- )
+ VerificationGuard.SENTINEL,
+ List.empty.asJava
+ ))
}
@Test
@@ -445,7 +492,8 @@ class CoordinatorPartitionWriterTest {
tp,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
- records.asJava)
- )
+ VerificationGuard.SENTINEL,
+ records.asJava
+ ))
}
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 2b6e226ec64bc..817488423d7b4 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -19,7 +19,7 @@ package kafka.coordinator.group
import java.util.{Optional, OptionalInt}
import kafka.common.OffsetAndMetadata
-import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal}
+import kafka.server.{ActionQueue, DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal}
import kafka.utils._
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
import org.apache.kafka.common.protocol.Errors
@@ -3902,7 +3902,8 @@ class GroupCoordinatorTest {
capturedArgument.capture(),
any[Option[ReentrantLock]],
any(classOf[RequestLocal]),
- any[Map[TopicPartition, VerificationGuard]]
+ any[Map[TopicPartition, VerificationGuard]],
+ any[ActionQueue]
)).thenAnswer(_ => {
capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@@ -3934,7 +3935,8 @@ class GroupCoordinatorTest {
capturedArgument.capture(),
any[Option[ReentrantLock]],
any(classOf[RequestLocal]),
- any[Map[TopicPartition, VerificationGuard]]
+ any[Map[TopicPartition, VerificationGuard]],
+ any[ActionQueue]
)).thenAnswer(_ => {
capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@@ -4077,7 +4079,8 @@ class GroupCoordinatorTest {
capturedArgument.capture(),
any[Option[ReentrantLock]],
any(classOf[RequestLocal]),
- any[Map[TopicPartition, VerificationGuard]]
+ any[Map[TopicPartition, VerificationGuard]],
+ any[ActionQueue]
)).thenAnswer(_ => {
capturedArgument.getValue.apply(
Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) ->
@@ -4119,7 +4122,8 @@ class GroupCoordinatorTest {
capturedArgument.capture(),
any[Option[ReentrantLock]],
any(classOf[RequestLocal]),
- any[Map[TopicPartition, VerificationGuard]]
+ any[Map[TopicPartition, VerificationGuard]],
+ any[ActionQueue]
)).thenAnswer(_ => {
capturedArgument.getValue.apply(
Map(offsetTopicPartition ->
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index d43a12255989b..dd7712f0e2926 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -1181,6 +1181,7 @@ class GroupMetadataManagerTest {
any(),
any[Option[ReentrantLock]],
any(),
+ any(),
any())
verify(replicaManager).getMagic(any())
}
@@ -1215,6 +1216,7 @@ class GroupMetadataManagerTest {
any(),
any[Option[ReentrantLock]],
any(),
+ any(),
any())
verify(replicaManager).getMagic(any())
}
@@ -1288,6 +1290,7 @@ class GroupMetadataManagerTest {
any(),
any[Option[ReentrantLock]],
any(),
+ any(),
any())
// Will update sensor after commit
assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
@@ -1328,7 +1331,8 @@ class GroupMetadataManagerTest {
capturedResponseCallback.capture(),
any[Option[ReentrantLock]],
any(),
- ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
+ ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)),
+ any())
verify(replicaManager).getMagic(any())
capturedResponseCallback.getValue.apply(Map(groupTopicPartition ->
new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L)))
@@ -1386,7 +1390,8 @@ class GroupMetadataManagerTest {
any(),
any[Option[ReentrantLock]],
any(),
- ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
+ ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)),
+ any())
verify(replicaManager).getMagic(any())
}
@@ -1434,7 +1439,8 @@ class GroupMetadataManagerTest {
any(),
any[Option[ReentrantLock]],
any(),
- ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)))
+ ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)),
+ any())
verify(replicaManager).getMagic(any())
}
@@ -1585,6 +1591,7 @@ class GroupMetadataManagerTest {
any(),
any[Option[ReentrantLock]],
any(),
+ any(),
any())
verify(replicaManager).getMagic(any())
assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count"))
@@ -1690,6 +1697,7 @@ class GroupMetadataManagerTest {
any(),
any[Option[ReentrantLock]],
any(),
+ any(),
any())
verify(replicaManager, times(2)).getMagic(any())
}
@@ -2797,6 +2805,7 @@ class GroupMetadataManagerTest {
capturedArgument.capture(),
any[Option[ReentrantLock]],
any(),
+ any(),
any())
capturedArgument
}
@@ -2810,6 +2819,7 @@ class GroupMetadataManagerTest {
capturedCallback.capture(),
any[Option[ReentrantLock]],
any(),
+ any(),
any()
)).thenAnswer(_ => {
capturedCallback.getValue.apply(
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
index 2c95333b45e8a..c69739ac99bbf 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
@@ -1106,6 +1106,8 @@ private static boolean isGroupIdNotEmpty(String groupId) {
* @return The Errors instance associated with the given exception.
*/
private static Errors normalizeException(Throwable exception) {
+ exception = Errors.maybeUnwrapException(exception);
+
if (exception instanceof UnknownTopicOrPartitionException ||
exception instanceof NotEnoughReplicasException ||
exception instanceof TimeoutException) {
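
Because scheduleTransactionalWriteOperation now composes futures (see CoordinatorRuntime below), verification errors reach this method wrapped in a CompletionException, so normalizeException unwraps before matching on the exception type. A small sketch of the unwrapping step, assuming semantics like Errors.maybeUnwrapException, i.e. peeling one layer of CompletionException or ExecutionException:

    import java.util.concurrent.CompletionException;
    import java.util.concurrent.ExecutionException;

    final class Unwrap {
        // Returns the cause when the throwable is a future wrapper,
        // otherwise returns the throwable itself.
        static Throwable maybeUnwrap(Throwable t) {
            if ((t instanceof CompletionException || t instanceof ExecutionException)
                    && t.getCause() != null) {
                return t.getCause();
            }
            return t;
        }
    }
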
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
index fffdd83b2568e..854e3e9d90343 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java
@@ -35,6 +35,7 @@
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.server.util.timer.TimerTask;
+import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.slf4j.Logger;
@@ -580,6 +581,11 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
*/
final short producerEpoch;
+ /**
+ * The verification guard.
+ */
+ final VerificationGuard verificationGuard;
+
/**
* The write operation to execute.
*/
@@ -627,6 +633,7 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
null,
RecordBatch.NO_PRODUCER_ID,
RecordBatch.NO_PRODUCER_EPOCH,
+ VerificationGuard.SENTINEL,
writeTimeout,
op
);
@@ -640,6 +647,7 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
* @param transactionalId The transactional id.
* @param producerId The producer id.
* @param producerEpoch The producer epoch.
+ * @param verificationGuard The verification guard.
* @param writeTimeout The write operation timeout
* @param op The write operation.
*/
@@ -649,6 +657,7 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
String transactionalId,
long producerId,
short producerEpoch,
+ VerificationGuard verificationGuard,
Duration writeTimeout,
CoordinatorWriteOperation op
) {
@@ -658,6 +667,7 @@ class CoordinatorWriteEvent<T> implements CoordinatorEvent, DeferredEvent {
this.transactionalId = transactionalId;
this.producerId = producerId;
this.producerEpoch = producerEpoch;
+ this.verificationGuard = verificationGuard;
this.future = new CompletableFuture<>();
this.createdTimeMs = time.milliseconds();
this.writeTimeout = writeTimeout;
@@ -717,6 +727,7 @@ public void run() {
tp,
producerId,
producerEpoch,
+ verificationGuard,
result.records()
);
context.coordinator.updateLastWrittenOffset(offset);
@@ -1466,17 +1477,25 @@ public <T> CompletableFuture<T> scheduleTransactionalWriteOperation(
) {
throwIfNotRunning();
log.debug("Scheduled execution of transactional write operation {}.", name);
- CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(
- name,
+ return partitionWriter.maybeStartTransactionVerification(
tp,
transactionalId,
producerId,
- producerEpoch,
- timeout,
- op
- );
- enqueue(event);
- return event.future;
+ producerEpoch
+ ).thenCompose(verificationGuard -> {
+ CoordinatorWriteEvent<T> event = new CoordinatorWriteEvent<>(
+ name,
+ tp,
+ transactionalId,
+ producerId,
+ producerEpoch,
+ verificationGuard,
+ timeout,
+ op
+ );
+ enqueue(event);
+ return event.future;
+ });
}
/**
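
The rewrite above relies on CompletableFuture.thenCompose short-circuiting: when verification fails, the composing function never runs, so no write event is enqueued and the caller's future carries the verification error (wrapped in a CompletionException, which is why GroupCoordinatorService.normalizeException unwraps it). A self-contained, plain-JDK demonstration of that property:

    import java.util.concurrent.CompletableFuture;

    public class ComposeDemo {
        public static void main(String[] args) {
            CompletableFuture<String> verification = new CompletableFuture<>();
            CompletableFuture<String> write = verification.thenCompose(guard ->
                // Only reached when verification completes successfully.
                CompletableFuture.completedFuture("written with " + guard));
            verification.completeExceptionally(new RuntimeException("NOT_ENOUGH_REPLICAS"));
            // Prints true: the write stage never ran and holds the wrapped error.
            System.out.println(write.isCompletedExceptionally());
        }
    }
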
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
index e4270dd8fcb71..d647fce149db9 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java
@@ -19,8 +19,10 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.storage.internals.log.VerificationGuard;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
/**
* A simple interface to write records to Partitions/Logs. It contains the minimum
@@ -84,10 +86,11 @@ void deregisterListener(
* Write records to the partitions. Records are written in one batch so
* atomicity is guaranteed.
*
- * @param tp The partition to write records to.
- * @param producerId The producer id.
- * @param producerEpoch The producer epoch.
- * @param records The list of records. The records are written in a single batch.
+ * @param tp The partition to write records to.
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @param verificationGuard The verification guard.
+ * @param records The list of records. The records are written in a single batch.
* @return The log end offset right after the written records.
* @throws KafkaException Any KafkaException caught during the write operation.
*/
@@ -95,6 +98,7 @@ long append(
TopicPartition tp,
long producerId,
short producerEpoch,
+ VerificationGuard verificationGuard,
List<T> records
) throws KafkaException;
@@ -116,4 +120,23 @@ long appendEndTransactionMarker(
int coordinatorEpoch,
TransactionResult result
) throws KafkaException;
+
+ /**
+ * Maybe starts a transaction verification for the partition.
+ *
+ * @param tp The partition to write records to.
+ * @param transactionalId The transactional id.
+ * @param producerId The producer id.
+ * @param producerEpoch The producer epoch.
+ * @return A future completed with the {@link VerificationGuard} if the transaction
+ * required verification, with {@link VerificationGuard#SENTINEL} if it did not,
+ * or completed exceptionally with any error encountered.
+ * @throws KafkaException Any KafkaException caught during the operation.
+ */
+ CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
+ TopicPartition tp,
+ String transactionalId,
+ long producerId,
+ short producerEpoch
+ ) throws KafkaException;
}
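
Taken together, the two transactional methods are intended to be called in sequence: start the verification first, then hand the resulting guard back on the append. A sketch of the intended call order against this interface, assuming a PartitionWriter<String> instance and a populated records list; error handling elided:

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.coordinator.group.runtime.PartitionWriter;

    import java.util.List;
    import java.util.concurrent.CompletableFuture;

    final class TransactionalAppend {
        static CompletableFuture<Long> write(PartitionWriter<String> writer,
                                             TopicPartition tp,
                                             String transactionalId,
                                             long producerId,
                                             short producerEpoch,
                                             List<String> records) {
            return writer
                .maybeStartTransactionVerification(tp, transactionalId, producerId, producerEpoch)
                // The guard (possibly VerificationGuard.SENTINEL) must be passed back
                // on the transactional append.
                .thenApply(guard -> writer.append(tp, producerId, producerEpoch, guard, records));
        }
    }
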
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
index 0d27cdb2f0bbc..8a89b9cbb1957 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java
@@ -87,6 +87,7 @@
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -1909,6 +1910,56 @@ public void testCommitTransactionalOffsets() throws ExecutionException, Interrup
assertEquals(response, future.get());
}
+ @Test
+ public void testCommitTransactionalOffsetsWithWrappedError() throws ExecutionException, InterruptedException {
+ CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
+ GroupCoordinatorService service = new GroupCoordinatorService(
+ new LogContext(),
+ createConfig(),
+ runtime,
+ new GroupCoordinatorMetrics()
+ );
+ service.startup(() -> 1);
+
+ TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
+ .setGroupId("foo")
+ .setTransactionalId("transactional-id")
+ .setProducerId(10L)
+ .setProducerEpoch((short) 5)
+ .setMemberId("member-id")
+ .setGenerationId(10)
+ .setTopics(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
+ .setPartitionIndex(0)
+ .setCommittedOffset(100)))));
+
+ TxnOffsetCommitResponseData response = new TxnOffsetCommitResponseData()
+ .setTopics(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
+ .setName("topic")
+ .setPartitions(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
+ .setPartitionIndex(0)
+ .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())))));
+
+ when(runtime.scheduleTransactionalWriteOperation(
+ ArgumentMatchers.eq("txn-commit-offset"),
+ ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
+ ArgumentMatchers.eq("transactional-id"),
+ ArgumentMatchers.eq(10L),
+ ArgumentMatchers.eq((short) 5),
+ ArgumentMatchers.eq(Duration.ofMillis(5000)),
+ ArgumentMatchers.any()
+ )).thenReturn(FutureUtils.failedFuture(new CompletionException(Errors.NOT_ENOUGH_REPLICAS.exception())));
+
+ CompletableFuture<TxnOffsetCommitResponseData> future = service.commitTransactionalOffsets(
+ requestContext(ApiKeys.TXN_OFFSET_COMMIT),
+ request,
+ BufferSupplier.NO_CACHING
+ );
+
+ assertEquals(response, future.get());
+ }
+
@Test
public void testCompleteTransaction() throws ExecutionException, InterruptedException {
CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
index e0d4dd08cdfd2..d9c3ea3417977 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java
@@ -19,6 +19,8 @@
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.NotCoordinatorException;
+import org.apache.kafka.common.errors.NotEnoughReplicasException;
+import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
@@ -29,7 +31,9 @@
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
+import org.apache.kafka.storage.internals.log.VerificationGuard;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
@@ -67,6 +71,8 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyShort;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -208,6 +214,7 @@ public long append(
TopicPartition tp,
long producerId,
short producerEpoch,
+ VerificationGuard verificationGuard,
List<String> records
) throws KafkaException {
if (records.size() <= maxRecordsInBatch) {
@@ -215,6 +222,7 @@ public long append(
tp,
producerId,
producerEpoch,
+ verificationGuard,
records
);
} else {
@@ -1117,6 +1125,15 @@ public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
// Verify that the listener was registered.
verify(writer, times(1)).registerListener(eq(TP), any());
+ // Prepare the transaction verification.
+ VerificationGuard guard = new VerificationGuard();
+ when(writer.maybeStartTransactionVerification(
+ TP,
+ "transactional-id",
+ 100L,
+ (short) 50
+ )).thenReturn(CompletableFuture.completedFuture(guard));
+
// Schedule a transactional write.
runtime.scheduleTransactionalWriteOperation(
"tnx-write",
@@ -1134,6 +1151,7 @@ public CoordinatorShardBuilder get() {
eq(TP),
eq(100L),
eq((short) 50),
+ eq(guard),
eq(Arrays.asList("record1", "record2"))
);
@@ -1151,6 +1169,75 @@ public CoordinatorShardBuilder get() {
);
}
+ @Test
+ public void testScheduleTransactionalWriteOpWhenVerificationFails() {
+ MockTimer timer = new MockTimer();
+ MockPartitionWriter writer = mock(MockPartitionWriter.class);
+ MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class);
+ MockCoordinatorShardBuilder shardBuilder = new MockCoordinatorShardBuilder() {
+ @Override
+ public MockCoordinatorShard build() {
+ return coordinator;
+ }
+ };
+ MockCoordinatorShardBuilderSupplier shardBuilderSupplier = new MockCoordinatorShardBuilderSupplier() {
+ @Override
+ public CoordinatorShardBuilder<MockCoordinatorShard, String> get() {
+ return shardBuilder;
+ }
+ };
+
+ CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+ new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+ .withTime(timer.time())
+ .withTimer(timer)
+ .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+ .withLoader(new MockCoordinatorLoader())
+ .withEventProcessor(new DirectEventProcessor())
+ .withPartitionWriter(writer)
+ .withCoordinatorShardBuilderSupplier(shardBuilderSupplier)
+ .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+ .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+ .build();
+
+ // Schedule the loading.
+ runtime.scheduleLoadOperation(TP, 10);
+
+ // Verify that the listener was registered.
+ verify(writer, times(1)).registerListener(eq(TP), any());
+
+ // Fail the transaction verification.
+ when(writer.maybeStartTransactionVerification(
+ TP,
+ "transactional-id",
+ 100L,
+ (short) 50
+ )).thenReturn(FutureUtils.failedFuture(Errors.NOT_ENOUGH_REPLICAS.exception()));
+
+ // Schedule a transactional write.
+ CompletableFuture<String> future = runtime.scheduleTransactionalWriteOperation(
+ "tnx-write",
+ TP,
+ "transactional-id",
+ 100L,
+ (short) 50,
+ Duration.ofMillis(5000),
+ state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response")
+ );
+
+ // Verify that the future is failed with the expected exception.
+ assertFutureThrows(future, NotEnoughReplicasException.class);
+
+ // Verify that the writer is not called.
+ verify(writer, times(0)).append(
+ any(),
+ anyLong(),
+ anyShort(),
+ any(),
+ any()
+ );
+ }
+
@ParameterizedTest
@EnumSource(value = TransactionResult.class)
public void testScheduleTransactionCompletion(TransactionResult result) throws ExecutionException, InterruptedException, TimeoutException {
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java
index 3cf8d480e46a6..7dea0b83f7ee9 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java
@@ -20,12 +20,14 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.storage.internals.log.VerificationGuard;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -223,6 +225,7 @@ public long append(
TopicPartition tp,
long producerId,
short producerEpoch,
+ VerificationGuard verificationGuard,
List<T> records
) throws KafkaException {
PartitionState state = partitionState(tp);
@@ -266,6 +269,16 @@ public long appendEndTransactionMarker(
}
}
+ @Override
+ public CompletableFuture<VerificationGuard> maybeStartTransactionVerification(
+ TopicPartition tp,
+ String transactionalId,
+ long producerId,
+ short producerEpoch
+ ) throws KafkaException {
+ return CompletableFuture.completedFuture(new VerificationGuard());
+ }
+
public void commit(
TopicPartition tp,
long offset
diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py
index 90d459bb8d641..b9b39f355e46a 100644
--- a/tests/kafkatest/tests/core/transactions_test.py
+++ b/tests/kafkatest/tests/core/transactions_test.py
@@ -257,7 +257,7 @@ def setup_topics(self):
check_order=[True, False],
use_group_metadata=[True, False],
metadata_quorum=quorum.all_kraft,
- use_new_coordinator=[False]
+ use_new_coordinator=[True, False]
)
def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk, use_new_coordinator=False):
security_protocol = 'PLAINTEXT'