From b37593115e9903430e8d3e581c210720fab0329e Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 8 Jan 2024 13:42:20 +0100 Subject: [PATCH 1/8] KAFKA-14505; [4/N] Wire transaction verification --- build.gradle | 1 + checkstyle/import-control.xml | 1 + .../group/CoordinatorPartitionWriter.scala | 53 +++++++-- .../scala/kafka/server/ReplicaManager.scala | 6 +- .../AbstractCoordinatorConcurrencyTest.scala | 3 +- .../CoordinatorPartitionWriterTest.scala | 108 +++++++++++++----- .../group/GroupCoordinatorTest.scala | 14 ++- .../group/GroupMetadataManagerTest.scala | 16 ++- .../group/runtime/CoordinatorRuntime.java | 35 ++++-- .../group/runtime/PartitionWriter.java | 29 ++++- .../group/runtime/CoordinatorRuntimeTest.java | 87 ++++++++++++++ .../runtime/InMemoryPartitionWriter.java | 13 +++ 12 files changed, 306 insertions(+), 60 deletions(-) diff --git a/build.gradle b/build.gradle index ca98890bd5a39..13a2850a389fc 100644 --- a/build.gradle +++ b/build.gradle @@ -1314,6 +1314,7 @@ project(':group-coordinator') { implementation project(':server-common') implementation project(':clients') implementation project(':metadata') + implementation project(':storage') implementation libs.slf4jApi implementation libs.metrics diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 39a77326bdee2..bfe468c5b8794 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -248,6 +248,7 @@ + diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index c8c8625ced41f..33e33d54affee 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -17,7 +17,7 @@ package kafka.coordinator.group import kafka.cluster.PartitionListener -import kafka.server.{ActionQueue, ReplicaManager} +import kafka.server.{ActionQueue, ReplicaManager, RequestLocal} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.RecordTooLargeException import org.apache.kafka.common.protocol.Errors @@ -27,9 +27,10 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.utils.{BufferSupplier, Time} import org.apache.kafka.coordinator.group.runtime.PartitionWriter -import org.apache.kafka.storage.internals.log.AppendOrigin +import org.apache.kafka.storage.internals.log.{VerificationGuard} import java.util +import java.util.concurrent.CompletableFuture import scala.collection.Map /** @@ -121,6 +122,7 @@ class CoordinatorPartitionWriter[T]( tp: TopicPartition, producerId: Long, producerEpoch: Short, + verificationGuard: VerificationGuard, records: util.List[T] ): Long = { if (records.isEmpty) throw new IllegalStateException("records must be non-empty.") @@ -161,7 +163,7 @@ class CoordinatorPartitionWriter[T]( s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.") } - internalAppend(tp, recordsBuilder.build()) + internalAppend(tp, recordsBuilder.build(), verificationGuard) } finally { bufferSupplier.release(buffer) } @@ -201,18 +203,55 @@ class CoordinatorPartitionWriter[T]( )) } + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + override def maybeStartTransactionVerification( + tp: TopicPartition, + transactionalId: String, + producerId: Long, + producerEpoch: Short + ): CompletableFuture[VerificationGuard] = { + val future = new CompletableFuture[VerificationGuard]() + replicaManager.maybeStartTransactionVerificationForPartition( + topicPartition = tp, + transactionalId = transactionalId, + producerId = producerId, + producerEpoch = producerEpoch, + baseSequence = RecordBatch.NO_SEQUENCE, + requestLocal = RequestLocal.NoCaching, + callback = (error, _, verificationGuard) => { + if (error != Errors.NONE) { + future.completeExceptionally(error.exception) + } else { + future.complete(verificationGuard) + } + } + ) + future + } + private def internalAppend( tp: TopicPartition, - memoryRecords: MemoryRecords + memoryRecords: MemoryRecords, + verificationGuard: VerificationGuard = VerificationGuard.SENTINEL ): Long = { var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty - replicaManager.appendRecords( + replicaManager.appendForGroup( timeout = 0L, requiredAcks = 1, - internalTopicsAllowed = true, - origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> memoryRecords), responseCallback = results => appendResults = results, + requestLocal = RequestLocal.NoCaching, + verificationGuards = Map(tp -> verificationGuard), + delayedProduceLock = None, // We can directly complete the purgatories here because we don't hold // any conflicting locks. actionQueue = directActionQueue diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index b4d0560390036..48ec0d0fe18aa 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -989,6 +989,7 @@ class ReplicaManager(val config: KafkaConfig, * @param requestLocal container for the stateful instances scoped to this request -- this must correspond to the * thread calling this method * @param verificationGuards the mapping from topic partition to verification guards if transaction verification is used + * @param actionQueue the action queue to use */ def appendForGroup( timeout: Long, @@ -997,7 +998,8 @@ class ReplicaManager(val config: KafkaConfig, responseCallback: Map[TopicPartition, PartitionResponse] => Unit, delayedProduceLock: Option[Lock], requestLocal: RequestLocal, - verificationGuards: Map[TopicPartition, VerificationGuard] + verificationGuards: Map[TopicPartition, VerificationGuard], + actionQueue: ActionQueue = this.defaultActionQueue ): Unit = { if (!isValidRequiredAcks(requiredAcks)) { sendInvalidRequiredAcksResponse(entriesPerPartition, responseCallback) @@ -1025,7 +1027,7 @@ class ReplicaManager(val config: KafkaConfig, val allResults = localProduceResults val produceStatus = buildProducePartitionStatus(allResults) - addCompletePurgatoryAction(defaultActionQueue, allResults) + addCompletePurgatoryAction(actionQueue, allResults) maybeAddDelayedProduce( requiredAcks, diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index a0fb73ee31f35..086ee3fae1681 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -215,7 +215,8 @@ object AbstractCoordinatorConcurrencyTest { responseCallback: Map[TopicPartition, PartitionResponse] => Unit, delayedProduceLock: Option[Lock] = None, requestLocal: RequestLocal = RequestLocal.NoCaching, - verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty): Unit = { + verificationGuards: Map[TopicPartition, VerificationGuard] = Map.empty, + actionQueue: ActionQueue = null): Unit = { appendRecords(timeout, requiredAcks, true, AppendOrigin.COORDINATOR, entriesPerPartition, responseCallback, delayedProduceLock, requestLocal = requestLocal) } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala index 0cfbb8f821c4e..e43cc7956e731 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala @@ -16,7 +16,7 @@ */ package kafka.coordinator.group -import kafka.server.ReplicaManager +import kafka.server.{ReplicaManager, RequestLocal} import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.TopicConfig @@ -27,7 +27,8 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.utils.{MockTime, Time} import org.apache.kafka.coordinator.group.runtime.PartitionWriter -import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig} +import org.apache.kafka.storage.internals.log.{LogConfig, VerificationGuard} +import org.apache.kafka.test.TestUtils.assertFutureThrows import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows, assertTrue} import org.junit.jupiter.api.Test import org.junit.jupiter.params.ParameterizedTest @@ -104,17 +105,14 @@ class CoordinatorPartitionWriterTest { val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) - when(replicaManager.appendRecords( + when(replicaManager.appendForGroup( ArgumentMatchers.eq(0L), ArgumentMatchers.eq(1.toShort), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(AppendOrigin.COORDINATOR), recordsCapture.capture(), callbackCapture.capture(), ArgumentMatchers.any(), ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), + ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)), ArgumentMatchers.any() )).thenAnswer( _ => { callbackCapture.getValue.apply(Map( @@ -140,6 +138,7 @@ class CoordinatorPartitionWriterTest { tp, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, + VerificationGuard.SENTINEL, records.asJava )) @@ -168,6 +167,7 @@ class CoordinatorPartitionWriterTest { CompressionType.NONE, time ) + val verificationGuard = new VerificationGuard() when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps( Collections.emptyMap(), @@ -179,17 +179,14 @@ class CoordinatorPartitionWriterTest { val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) - when(replicaManager.appendRecords( + when(replicaManager.appendForGroup( ArgumentMatchers.eq(0L), ArgumentMatchers.eq(1.toShort), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(AppendOrigin.COORDINATOR), recordsCapture.capture(), callbackCapture.capture(), ArgumentMatchers.any(), ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), + ArgumentMatchers.eq(Map(tp -> verificationGuard)), ArgumentMatchers.any() )).thenAnswer(_ => { callbackCapture.getValue.apply(Map( @@ -215,6 +212,7 @@ class CoordinatorPartitionWriterTest { tp, 100L, 50.toShort, + verificationGuard, records.asJava )) @@ -260,17 +258,14 @@ class CoordinatorPartitionWriterTest { val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) - when(replicaManager.appendRecords( + when(replicaManager.appendForGroup( ArgumentMatchers.eq(0L), ArgumentMatchers.eq(1.toShort), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(AppendOrigin.COORDINATOR), recordsCapture.capture(), callbackCapture.capture(), ArgumentMatchers.any(), ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), + ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)), ArgumentMatchers.any() )).thenAnswer(_ => { callbackCapture.getValue.apply(Map( @@ -311,6 +306,58 @@ class CoordinatorPartitionWriterTest { assertEquals(List(controlRecordType), receivedRecords) } + @ParameterizedTest + @EnumSource(value = classOf[Errors], names = Array("NONE", "NOT_ENOUGH_REPLICAS")) + def testMaybeStartTransactionVerification(error: Errors): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val time = new MockTime() + val partitionRecordWriter = new CoordinatorPartitionWriter( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + time + ) + + val verificationGuard = if (error == Errors.NONE) { + new VerificationGuard() + } else { + VerificationGuard.SENTINEL + } + + val callbackCapture: ArgumentCaptor[(Errors, RequestLocal, VerificationGuard) => Unit] = + ArgumentCaptor.forClass(classOf[(Errors, RequestLocal, VerificationGuard) => Unit]) + + when(replicaManager.maybeStartTransactionVerificationForPartition( + ArgumentMatchers.eq(tp), + ArgumentMatchers.eq("transactional-id"), + ArgumentMatchers.eq(10L), + ArgumentMatchers.eq(5.toShort), + ArgumentMatchers.eq(RecordBatch.NO_SEQUENCE), + ArgumentMatchers.eq(RequestLocal.NoCaching), + callbackCapture.capture() + )).thenAnswer(_ => { + callbackCapture.getValue.apply( + error, + RequestLocal.NoCaching, + verificationGuard + ) + }) + + val future = partitionRecordWriter.maybeStartTransactionVerification( + tp, + "transactional-id", + 10L, + 5.toShort + ) + + if (error == Errors.NONE) { + assertEquals(verificationGuard, future.get) + } else { + assertFutureThrows(future, error.exception.getClass) + } + } + @Test def testWriteRecordsWithFailure(): Unit = { val tp = new TopicPartition("foo", 0) @@ -333,17 +380,14 @@ class CoordinatorPartitionWriterTest { val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) - when(replicaManager.appendRecords( + when(replicaManager.appendForGroup( ArgumentMatchers.eq(0L), ArgumentMatchers.eq(1.toShort), - ArgumentMatchers.eq(true), - ArgumentMatchers.eq(AppendOrigin.COORDINATOR), recordsCapture.capture(), callbackCapture.capture(), ArgumentMatchers.any(), ArgumentMatchers.any(), - ArgumentMatchers.any(), - ArgumentMatchers.any(), + ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)), ArgumentMatchers.any() )).thenAnswer(_ => { callbackCapture.getValue.apply(Map( @@ -361,8 +405,9 @@ class CoordinatorPartitionWriterTest { tp, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, - records.asJava) - ) + VerificationGuard.SENTINEL, + records.asJava + )) } @Test @@ -394,8 +439,9 @@ class CoordinatorPartitionWriterTest { tp, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, - records.asJava) - ) + VerificationGuard.SENTINEL, + records.asJava + )) } @Test @@ -418,8 +464,9 @@ class CoordinatorPartitionWriterTest { tp, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, - List.empty.asJava) - ) + VerificationGuard.SENTINEL, + List.empty.asJava + )) } @Test @@ -445,7 +492,8 @@ class CoordinatorPartitionWriterTest { tp, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, - records.asJava) - ) + VerificationGuard.SENTINEL, + records.asJava + )) } } diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 2b6e226ec64bc..817488423d7b4 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -19,7 +19,7 @@ package kafka.coordinator.group import java.util.{Optional, OptionalInt} import kafka.common.OffsetAndMetadata -import kafka.server.{DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal} +import kafka.server.{ActionQueue, DelayedOperationPurgatory, HostedPartition, KafkaConfig, ReplicaManager, RequestLocal} import kafka.utils._ import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid} import org.apache.kafka.common.protocol.Errors @@ -3902,7 +3902,8 @@ class GroupCoordinatorTest { capturedArgument.capture(), any[Option[ReentrantLock]], any(classOf[RequestLocal]), - any[Map[TopicPartition, VerificationGuard]] + any[Map[TopicPartition, VerificationGuard]], + any[ActionQueue] )).thenAnswer(_ => { capturedArgument.getValue.apply( Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> @@ -3934,7 +3935,8 @@ class GroupCoordinatorTest { capturedArgument.capture(), any[Option[ReentrantLock]], any(classOf[RequestLocal]), - any[Map[TopicPartition, VerificationGuard]] + any[Map[TopicPartition, VerificationGuard]], + any[ActionQueue] )).thenAnswer(_ => { capturedArgument.getValue.apply( Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> @@ -4077,7 +4079,8 @@ class GroupCoordinatorTest { capturedArgument.capture(), any[Option[ReentrantLock]], any(classOf[RequestLocal]), - any[Map[TopicPartition, VerificationGuard]] + any[Map[TopicPartition, VerificationGuard]], + any[ActionQueue] )).thenAnswer(_ => { capturedArgument.getValue.apply( Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> @@ -4119,7 +4122,8 @@ class GroupCoordinatorTest { capturedArgument.capture(), any[Option[ReentrantLock]], any(classOf[RequestLocal]), - any[Map[TopicPartition, VerificationGuard]] + any[Map[TopicPartition, VerificationGuard]], + any[ActionQueue] )).thenAnswer(_ => { capturedArgument.getValue.apply( Map(offsetTopicPartition -> diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index d43a12255989b..dd7712f0e2926 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -1181,6 +1181,7 @@ class GroupMetadataManagerTest { any(), any[Option[ReentrantLock]], any(), + any(), any()) verify(replicaManager).getMagic(any()) } @@ -1215,6 +1216,7 @@ class GroupMetadataManagerTest { any(), any[Option[ReentrantLock]], any(), + any(), any()) verify(replicaManager).getMagic(any()) } @@ -1288,6 +1290,7 @@ class GroupMetadataManagerTest { any(), any[Option[ReentrantLock]], any(), + any(), any()) // Will update sensor after commit assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count")) @@ -1328,7 +1331,8 @@ class GroupMetadataManagerTest { capturedResponseCallback.capture(), any[Option[ReentrantLock]], any(), - ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard))) + ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)), + any()) verify(replicaManager).getMagic(any()) capturedResponseCallback.getValue.apply(Map(groupTopicPartition -> new PartitionResponse(Errors.NONE, 0L, RecordBatch.NO_TIMESTAMP, 0L))) @@ -1386,7 +1390,8 @@ class GroupMetadataManagerTest { any(), any[Option[ReentrantLock]], any(), - ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard))) + ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)), + any()) verify(replicaManager).getMagic(any()) } @@ -1434,7 +1439,8 @@ class GroupMetadataManagerTest { any(), any[Option[ReentrantLock]], any(), - ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard))) + ArgumentMatchers.eq(Map(offsetTopicPartition -> verificationGuard)), + any()) verify(replicaManager).getMagic(any()) } @@ -1585,6 +1591,7 @@ class GroupMetadataManagerTest { any(), any[Option[ReentrantLock]], any(), + any(), any()) verify(replicaManager).getMagic(any()) assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count")) @@ -1690,6 +1697,7 @@ class GroupMetadataManagerTest { any(), any[Option[ReentrantLock]], any(), + any(), any()) verify(replicaManager, times(2)).getMagic(any()) } @@ -2797,6 +2805,7 @@ class GroupMetadataManagerTest { capturedArgument.capture(), any[Option[ReentrantLock]], any(), + any(), any()) capturedArgument } @@ -2810,6 +2819,7 @@ class GroupMetadataManagerTest { capturedCallback.capture(), any[Option[ReentrantLock]], any(), + any(), any() )).thenAnswer(_ => { capturedCallback.getValue.apply( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java index fffdd83b2568e..854e3e9d90343 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java @@ -35,6 +35,7 @@ import org.apache.kafka.image.MetadataImage; import org.apache.kafka.server.util.timer.Timer; import org.apache.kafka.server.util.timer.TimerTask; +import org.apache.kafka.storage.internals.log.VerificationGuard; import org.apache.kafka.timeline.SnapshotRegistry; import org.slf4j.Logger; @@ -580,6 +581,11 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { */ final short producerEpoch; + /** + * The verification guard. + */ + final VerificationGuard verificationGuard; + /** * The write operation to execute. */ @@ -627,6 +633,7 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { null, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, + VerificationGuard.SENTINEL, writeTimeout, op ); @@ -640,6 +647,7 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { * @param transactionalId The transactional id. * @param producerId The producer id. * @param producerEpoch The producer epoch. + * @param verificationGuard The verification guard. * @param writeTimeout The write operation timeout * @param op The write operation. */ @@ -649,6 +657,7 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { String transactionalId, long producerId, short producerEpoch, + VerificationGuard verificationGuard, Duration writeTimeout, CoordinatorWriteOperation op ) { @@ -658,6 +667,7 @@ class CoordinatorWriteEvent implements CoordinatorEvent, DeferredEvent { this.transactionalId = transactionalId; this.producerId = producerId; this.producerEpoch = producerEpoch; + this.verificationGuard = verificationGuard; this.future = new CompletableFuture<>(); this.createdTimeMs = time.milliseconds(); this.writeTimeout = writeTimeout; @@ -717,6 +727,7 @@ public void run() { tp, producerId, producerEpoch, + verificationGuard, result.records() ); context.coordinator.updateLastWrittenOffset(offset); @@ -1466,17 +1477,25 @@ public CompletableFuture scheduleTransactionalWriteOperation( ) { throwIfNotRunning(); log.debug("Scheduled execution of transactional write operation {}.", name); - CoordinatorWriteEvent event = new CoordinatorWriteEvent<>( - name, + return partitionWriter.maybeStartTransactionVerification( tp, transactionalId, producerId, - producerEpoch, - timeout, - op - ); - enqueue(event); - return event.future; + producerEpoch + ).thenCompose(verificationGuard -> { + CoordinatorWriteEvent event = new CoordinatorWriteEvent<>( + name, + tp, + transactionalId, + producerId, + producerEpoch, + verificationGuard, + timeout, + op + ); + enqueue(event); + return event.future; + }); } /** diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java index e4270dd8fcb71..0d2f8161c5d14 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java @@ -19,8 +19,10 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.requests.TransactionResult; +import org.apache.kafka.storage.internals.log.VerificationGuard; import java.util.List; +import java.util.concurrent.CompletableFuture; /** * A simple interface to write records to Partitions/Logs. It contains the minimum @@ -84,10 +86,11 @@ void deregisterListener( * Write records to the partitions. Records are written in one batch so * atomicity is guaranteed. * - * @param tp The partition to write records to. - * @param producerId The producer id. - * @param producerEpoch The producer epoch. - * @param records The list of records. The records are written in a single batch. + * @param tp The partition to write records to. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @param verificationGuard The verification guard. + * @param records The list of records. The records are written in a single batch. * @return The log end offset right after the written records. * @throws KafkaException Any KafkaException caught during the write operation. */ @@ -95,6 +98,7 @@ long append( TopicPartition tp, long producerId, short producerEpoch, + VerificationGuard verificationGuard, List records ) throws KafkaException; @@ -116,4 +120,21 @@ long appendEndTransactionMarker( int coordinatorEpoch, TransactionResult result ) throws KafkaException; + + /** + * Verify the transaction. + * + * @param tp The partition to write records to. + * @param transactionalId The transactional id. + * @param producerId The producer id. + * @param producerEpoch The producer epoch. + * @return A future containing the {@link VerificationGuard} or an exception. + * @throws KafkaException Any KafkaException caught during the operation. + */ + CompletableFuture maybeStartTransactionVerification( + TopicPartition tp, + String transactionalId, + long producerId, + short producerEpoch + ) throws KafkaException; } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java index e0d4dd08cdfd2..d9c3ea3417977 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java @@ -19,6 +19,8 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.NotCoordinatorException; +import org.apache.kafka.common.errors.NotEnoughReplicasException; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.TransactionResult; import org.apache.kafka.common.utils.LogContext; @@ -29,7 +31,9 @@ import org.apache.kafka.image.MetadataDelta; import org.apache.kafka.image.MetadataImage; import org.apache.kafka.image.MetadataProvenance; +import org.apache.kafka.server.util.FutureUtils; import org.apache.kafka.server.util.timer.MockTimer; +import org.apache.kafka.storage.internals.log.VerificationGuard; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.apache.kafka.timeline.TimelineHashSet; @@ -67,6 +71,8 @@ import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyShort; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; @@ -208,6 +214,7 @@ public long append( TopicPartition tp, long producerId, short producerEpoch, + VerificationGuard verificationGuard, List records ) throws KafkaException { if (records.size() <= maxRecordsInBatch) { @@ -215,6 +222,7 @@ public long append( tp, producerId, producerEpoch, + verificationGuard, records ); } else { @@ -1117,6 +1125,15 @@ public CoordinatorShardBuilder get() { // Verify that the listener was registered. verify(writer, times(1)).registerListener(eq(TP), any()); + // Prepare the transaction verification. + VerificationGuard guard = new VerificationGuard(); + when(writer.maybeStartTransactionVerification( + TP, + "transactional-id", + 100L, + (short) 50 + )).thenReturn(CompletableFuture.completedFuture(guard)); + // Schedule a transactional write. runtime.scheduleTransactionalWriteOperation( "tnx-write", @@ -1134,6 +1151,7 @@ public CoordinatorShardBuilder get() { eq(TP), eq(100L), eq((short) 50), + eq(guard), eq(Arrays.asList("record1", "record2")) ); @@ -1151,6 +1169,75 @@ public CoordinatorShardBuilder get() { ); } + @Test + public void testScheduleTransactionalWriteOpWhenVerificationFails() { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = mock(MockPartitionWriter.class); + MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); + MockCoordinatorShardBuilder shardBuilder = new MockCoordinatorShardBuilder() { + @Override + public MockCoordinatorShard build() { + return coordinator; + } + }; + MockCoordinatorShardBuilderSupplier shardBuilderSupplier = new MockCoordinatorShardBuilderSupplier() { + @Override + public CoordinatorShardBuilder get() { + return shardBuilder; + } + }; + + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(shardBuilderSupplier) + .withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class)) + .build(); + + // Schedule the loading. + runtime.scheduleLoadOperation(TP, 10); + + // Verify that the listener was registered. + verify(writer, times(1)).registerListener(eq(TP), any()); + + // Fail the transaction verification. + when(writer.maybeStartTransactionVerification( + TP, + "transactional-id", + 100L, + (short) 50 + )).thenReturn(FutureUtils.failedFuture(Errors.NOT_ENOUGH_REPLICAS.exception())); + + // Schedule a transactional write. + CompletableFuture future = runtime.scheduleTransactionalWriteOperation( + "tnx-write", + TP, + "transactional-id", + 100L, + (short) 50, + Duration.ofMillis(5000), + state -> new CoordinatorResult<>(Arrays.asList("record1", "record2"), "response") + ); + + // Verify that the future is failed with the expected exception. + assertFutureThrows(future, NotEnoughReplicasException.class); + + // Verify that the writer is not called. + verify(writer, times(0)).append( + any(), + anyLong(), + anyShort(), + any(), + any() + ); + } + @ParameterizedTest @EnumSource(value = TransactionResult.class) public void testScheduleTransactionCompletion(TransactionResult result) throws ExecutionException, InterruptedException, TimeoutException { diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java index 3cf8d480e46a6..7dea0b83f7ee9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/InMemoryPartitionWriter.java @@ -20,12 +20,14 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.RecordBatch; import org.apache.kafka.common.requests.TransactionResult; +import org.apache.kafka.storage.internals.log.VerificationGuard; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @@ -223,6 +225,7 @@ public long append( TopicPartition tp, long producerId, short producerEpoch, + VerificationGuard verificationGuard, List records ) throws KafkaException { PartitionState state = partitionState(tp); @@ -266,6 +269,16 @@ public long appendEndTransactionMarker( } } + @Override + public CompletableFuture maybeStartTransactionVerification( + TopicPartition tp, + String transactionalId, + long producerId, + short producerEpoch + ) throws KafkaException { + return CompletableFuture.completedFuture(new VerificationGuard()); + } + public void commit( TopicPartition tp, long offset From cf0feff7efeb6fe922ca30f773f0fc580339f5a2 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 9 Jan 2024 14:30:47 +0100 Subject: [PATCH 2/8] address minor comments --- .../kafka/coordinator/group/CoordinatorPartitionWriter.scala | 4 +++- .../kafka/coordinator/group/runtime/PartitionWriter.java | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index 33e33d54affee..dd6a49642160a 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -210,7 +210,9 @@ class CoordinatorPartitionWriter[T]( * @param transactionalId The transactional id. * @param producerId The producer id. * @param producerEpoch The producer epoch. - * @return A future containing the {@link VerificationGuard} or an exception. + * @return A future containing the {@link VerificationGuard} or an exception if the + * transaction requires a verification; or {@link VerificationGuard#SENTINEL} + * if the transaction has already been verified. * @throws KafkaException Any KafkaException caught during the operation. */ override def maybeStartTransactionVerification( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java index 0d2f8161c5d14..8609b36827e52 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java @@ -128,7 +128,9 @@ long appendEndTransactionMarker( * @param transactionalId The transactional id. * @param producerId The producer id. * @param producerEpoch The producer epoch. - * @return A future containing the {@link VerificationGuard} or an exception. + * @return A future containing the {@link VerificationGuard} or an exception if the + * transaction requires a verification; or {@link VerificationGuard#SENTINEL} + * if the transaction has already been verified. * @throws KafkaException Any KafkaException caught during the operation. */ CompletableFuture maybeStartTransactionVerification( From bcebe9ff395498b1de569700e737986b0ce693c9 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 9 Jan 2024 14:31:08 +0100 Subject: [PATCH 3/8] Revert "MINOR: disable test_transactions with new group coordinator" This reverts commit ba49006561cd1ead97c67743d08797445ff1a6d7. --- tests/kafkatest/tests/core/transactions_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/kafkatest/tests/core/transactions_test.py b/tests/kafkatest/tests/core/transactions_test.py index 90d459bb8d641..b9b39f355e46a 100644 --- a/tests/kafkatest/tests/core/transactions_test.py +++ b/tests/kafkatest/tests/core/transactions_test.py @@ -257,7 +257,7 @@ def setup_topics(self): check_order=[True, False], use_group_metadata=[True, False], metadata_quorum=quorum.all_kraft, - use_new_coordinator=[False] + use_new_coordinator=[True, False] ) def test_transactions(self, failure_mode, bounce_target, check_order, use_group_metadata, metadata_quorum=quorum.zk, use_new_coordinator=False): security_protocol = 'PLAINTEXT' From 118e1af3e6c5fad49ff52244ecb4a869b6c1c776 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 10 Jan 2024 08:49:15 +0100 Subject: [PATCH 4/8] unwrap exception before converting it --- .../apache/kafka/coordinator/group/GroupCoordinatorService.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java index 2c95333b45e8a..c69739ac99bbf 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java @@ -1106,6 +1106,8 @@ private static boolean isGroupIdNotEmpty(String groupId) { * @return The Errors instance associated with the given exception. */ private static Errors normalizeException(Throwable exception) { + exception = Errors.maybeUnwrapException(exception); + if (exception instanceof UnknownTopicOrPartitionException || exception instanceof NotEnoughReplicasException || exception instanceof TimeoutException) { From 36b462860c66f389a9d937224a63aa10072e81d3 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 10 Jan 2024 09:37:18 +0100 Subject: [PATCH 5/8] address minor comment --- .../kafka/coordinator/group/runtime/PartitionWriter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java index 8609b36827e52..d647fce149db9 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java @@ -128,9 +128,9 @@ long appendEndTransactionMarker( * @param transactionalId The transactional id. * @param producerId The producer id. * @param producerEpoch The producer epoch. - * @return A future containing the {@link VerificationGuard} or an exception if the - * transaction requires a verification; or {@link VerificationGuard#SENTINEL} - * if the transaction has already been verified. + * @return A future failed with any error encountered; or the {@link VerificationGuard} + * if the transaction required verification and {@link VerificationGuard#SENTINEL} + * if it did not. * @throws KafkaException Any KafkaException caught during the operation. */ CompletableFuture maybeStartTransactionVerification( From 8535113ae0a36f4a8166fb7dbdb0379a0d4f5014 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Wed, 10 Jan 2024 09:41:16 +0100 Subject: [PATCH 6/8] add test for the unwrapping --- .../group/GroupCoordinatorServiceTest.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java index 0d27cdb2f0bbc..8a89b9cbb1957 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.java @@ -87,6 +87,7 @@ import java.util.OptionalInt; import java.util.Properties; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -1909,6 +1910,56 @@ public void testCommitTransactionalOffsets() throws ExecutionException, Interrup assertEquals(response, future.get()); } + @Test + public void testCommitTransactionalOffsetsWithWrappedError() throws ExecutionException, InterruptedException { + CoordinatorRuntime runtime = mockRuntime(); + GroupCoordinatorService service = new GroupCoordinatorService( + new LogContext(), + createConfig(), + runtime, + new GroupCoordinatorMetrics() + ); + service.startup(() -> 1); + + TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData() + .setGroupId("foo") + .setTransactionalId("transactional-id") + .setProducerId(10L) + .setProducerEpoch((short) 5) + .setMemberId("member-id") + .setGenerationId(10) + .setTopics(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic() + .setName("topic") + .setPartitions(Collections.singletonList(new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition() + .setPartitionIndex(0) + .setCommittedOffset(100))))); + + TxnOffsetCommitResponseData response = new TxnOffsetCommitResponseData() + .setTopics(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic() + .setName("topic") + .setPartitions(Collections.singletonList(new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition() + .setPartitionIndex(0) + .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code()))))); + + when(runtime.scheduleTransactionalWriteOperation( + ArgumentMatchers.eq("txn-commit-offset"), + ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)), + ArgumentMatchers.eq("transactional-id"), + ArgumentMatchers.eq(10L), + ArgumentMatchers.eq((short) 5), + ArgumentMatchers.eq(Duration.ofMillis(5000)), + ArgumentMatchers.any() + )).thenReturn(FutureUtils.failedFuture(new CompletionException(Errors.NOT_ENOUGH_REPLICAS.exception()))); + + CompletableFuture future = service.commitTransactionalOffsets( + requestContext(ApiKeys.TXN_OFFSET_COMMIT), + request, + BufferSupplier.NO_CACHING + ); + + assertEquals(response, future.get()); + } + @Test public void testCompleteTransaction() throws ExecutionException, InterruptedException { CoordinatorRuntime runtime = mockRuntime(); From b5d57a07d69d6d8e12652b580980704c7c7b2489 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 11 Jan 2024 08:33:31 +0100 Subject: [PATCH 7/8] update javadoc --- .../group/CoordinatorPartitionWriter.scala | 41 +++---------------- 1 file changed, 5 insertions(+), 36 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index dd6a49642160a..2aba85b17af99 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -82,10 +82,7 @@ class CoordinatorPartitionWriter[T]( } /** - * Register a PartitionWriter.Listener. - * - * @param tp The partition to register the listener to. - * @param listener The listener. + * {@inheritDoc} */ override def registerListener( tp: TopicPartition, @@ -95,10 +92,7 @@ class CoordinatorPartitionWriter[T]( } /** - * Deregister a PartitionWriter.Listener. - * - * @param tp The partition to deregister the listener from. - * @param listener The listener. + * {@inheritDoc} */ override def deregisterListener( tp: TopicPartition, @@ -108,15 +102,7 @@ class CoordinatorPartitionWriter[T]( } /** - * Write records to the partitions. Records are written in one batch so - * atomicity is guaranteed. - * - * @param tp The partition to write records to. - * @param producerId The producer id. - * @param producerEpoch The producer epoch. - * @param records The list of records. The records are written in a single batch. - * @return The log end offset right after the written records. - * @throws KafkaException Any KafkaException caught during the write operation. + * {@inheritDoc} */ override def append( tp: TopicPartition, @@ -174,15 +160,7 @@ class CoordinatorPartitionWriter[T]( } /** - * Write the transaction end marker. - * - * @param tp The partition to write records to. - * @param producerId The producer id. - * @param producerEpoch The producer epoch. - * @param coordinatorEpoch The epoch of the transaction coordinator. - * @param result The transaction result. - * @return The log end offset right after the written records. - * @throws KafkaException Any KafkaException caught during the write operation. + * {@inheritDoc} */ override def appendEndTransactionMarker( tp: TopicPartition, @@ -204,16 +182,7 @@ class CoordinatorPartitionWriter[T]( } /** - * Verify the transaction. - * - * @param tp The partition to write records to. - * @param transactionalId The transactional id. - * @param producerId The producer id. - * @param producerEpoch The producer epoch. - * @return A future containing the {@link VerificationGuard} or an exception if the - * transaction requires a verification; or {@link VerificationGuard#SENTINEL} - * if the transaction has already been verified. - * @throws KafkaException Any KafkaException caught during the operation. + * {@inheritDoc} */ override def maybeStartTransactionVerification( tp: TopicPartition, From f711ad973f7e399ffd43e1f137abbe46a884daac Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 11 Jan 2024 08:40:22 +0100 Subject: [PATCH 8/8] small cleanup --- .../kafka/coordinator/group/CoordinatorPartitionWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index 2aba85b17af99..2c69372558f34 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -27,7 +27,7 @@ import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.requests.TransactionResult import org.apache.kafka.common.utils.{BufferSupplier, Time} import org.apache.kafka.coordinator.group.runtime.PartitionWriter -import org.apache.kafka.storage.internals.log.{VerificationGuard} +import org.apache.kafka.storage.internals.log.VerificationGuard import java.util import java.util.concurrent.CompletableFuture