diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index a00fdecc58645..022298957faac 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -131,6 +131,7 @@ public Map errorCounts() { public static final class PartitionResponse { public Errors error; public long baseOffset; + public long lastOffset; public long logAppendTime; public long logStartOffset; public List recordErrors; @@ -153,8 +154,21 @@ public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long } public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List recordErrors, String errorMessage) { + this(error, baseOffset, INVALID_OFFSET, logAppendTime, logStartOffset, recordErrors, errorMessage); + } + + public PartitionResponse( + Errors error, + long baseOffset, + long lastOffset, + long logAppendTime, + long logStartOffset, + List recordErrors, + String errorMessage + ) { this.error = error; this.baseOffset = baseOffset; + this.lastOffset = lastOffset; this.logAppendTime = logAppendTime; this.logStartOffset = logStartOffset; this.recordErrors = recordErrors; @@ -167,6 +181,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; PartitionResponse that = (PartitionResponse) o; return baseOffset == that.baseOffset && + lastOffset == that.lastOffset && logAppendTime == that.logAppendTime && logStartOffset == that.logStartOffset && error == that.error && @@ -176,7 +191,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage); + return Objects.hash(error, baseOffset, lastOffset, logAppendTime, logStartOffset, recordErrors, errorMessage); } @Override @@ -187,6 +202,8 @@ public String toString() { b.append(error); b.append(",offset: "); b.append(baseOffset); + b.append(",lastOffset: "); + b.append(lastOffset); b.append(",logAppendTime: "); b.append(logAppendTime); b.append(", logStartOffset: "); diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala new file mode 100644 index 0000000000000..5ee576ff3c52e --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.cluster.PartitionListener +import kafka.server.{ActionQueue, ReplicaManager} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.RecordTooLargeException +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, TimestampType} +import org.apache.kafka.common.record.Record.EMPTY_HEADERS +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.common.utils.Time +import org.apache.kafka.coordinator.group.runtime.PartitionWriter +import org.apache.kafka.storage.internals.log.AppendOrigin + +import java.nio.ByteBuffer +import java.util +import scala.collection.Map + +/** + * ListenerAdapter adapts the PartitionListener interface to the + * PartitionWriter.Listener interface. + */ +private[group] class ListenerAdapter( + val listener: PartitionWriter.Listener +) extends PartitionListener { + override def onHighWatermarkUpdated( + tp: TopicPartition, + offset: Long + ): Unit = { + listener.onHighWatermarkUpdated(tp, offset) + } + + override def equals(that: Any): Boolean = that match { + case other: ListenerAdapter => listener.equals(other.listener) + case _ => false + } + + override def hashCode(): Int = { + listener.hashCode() + } + + override def toString: String = { + s"ListenerAdapter(listener=$listener)" + } +} + +class CoordinatorPartitionWriter[T]( + replicaManager: ReplicaManager, + serializer: PartitionWriter.Serializer[T], + compressionType: CompressionType, + time: Time +) extends PartitionWriter[T] { + // We use an action queue which directly executes actions. This is possible + // here because we don't hold any conflicting locks. + private val directActionQueue = new ActionQueue { + override def add(action: () => Unit): Unit = { + action() + } + + override def tryCompleteActions(): Unit = {} + } + + /** + * Register a PartitionWriter.Listener. + * + * @param tp The partition to register the listener to. + * @param listener The listener. + */ + override def registerListener( + tp: TopicPartition, + listener: PartitionWriter.Listener + ): Unit = { + replicaManager.maybeAddListener(tp, new ListenerAdapter(listener)) + } + + /** + * Deregister a PartitionWriter.Listener. + * + * @param tp The partition to deregister the listener from. + * @param listener The listener. + */ + override def deregisterListener( + tp: TopicPartition, + listener: PartitionWriter.Listener + ): Unit = { + replicaManager.removeListener(tp, new ListenerAdapter(listener)) + } + + /** + * Write records to the partitions. Records are written in one batch so + * atomicity is guaranteed. + * + * @param tp The partition to write records to. + * @param records The list of records. The records are written in a single batch. + * @return The log end offset right after the written records. + * @throws KafkaException Any KafkaException caught during the write operation. + */ + override def append( + tp: TopicPartition, + records: util.List[T] + ): Long = { + if (records.isEmpty) throw new IllegalStateException("records must be non-empty.") + + replicaManager.getLogConfig(tp) match { + case Some(logConfig) => + val magic = logConfig.recordVersion.value + val maxBatchSize = logConfig.maxMessageSize + val currentTimeMs = time.milliseconds() + + val recordsBuilder = MemoryRecords.builder( + ByteBuffer.allocate(math.min(16384, maxBatchSize)), + magic, + compressionType, + TimestampType.CREATE_TIME, + 0L, + maxBatchSize + ) + + records.forEach { record => + val keyBytes = serializer.serializeKey(record) + val valueBytes = serializer.serializeValue(record) + + if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, EMPTY_HEADERS)) recordsBuilder.append( + currentTimeMs, + keyBytes, + valueBytes, + EMPTY_HEADERS + ) else throw new RecordTooLargeException(s"Message batch size is ${recordsBuilder.estimatedSizeInBytes()} bytes " + + s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.") + } + + var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty + replicaManager.appendRecords( + timeout = 0L, + requiredAcks = 1, + internalTopicsAllowed = true, + origin = AppendOrigin.COORDINATOR, + entriesPerPartition = Map(tp -> recordsBuilder.build()), + responseCallback = results => appendResults = results, + // We can directly complete the purgatories here because we don't hold + // any conflicting locks. + actionQueue = directActionQueue + ) + + val partitionResult = appendResults.getOrElse(tp, + throw new IllegalStateException(s"Append status $appendResults should have partition $tp.")) + + if (partitionResult.error != Errors.NONE) { + throw partitionResult.error.exception() + } + + // Required offset. + partitionResult.lastOffset + 1 + + case None => + throw Errors.NOT_LEADER_OR_FOLLOWER.exception() + } + } +} diff --git a/core/src/main/scala/kafka/server/ActionQueue.scala b/core/src/main/scala/kafka/server/ActionQueue.scala index 1b6b8326fa4cf..d0791a98633fb 100644 --- a/core/src/main/scala/kafka/server/ActionQueue.scala +++ b/core/src/main/scala/kafka/server/ActionQueue.scala @@ -22,22 +22,32 @@ import java.util.concurrent.ConcurrentLinkedQueue import kafka.utils.Logging /** - * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords - * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking, - * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration. + * The action queue is used to collect actions which need to be executed later. */ -class ActionQueue extends Logging { - private val queue = new ConcurrentLinkedQueue[() => Unit]() +trait ActionQueue { /** * add action to this queue. * @param action action */ - def add(action: () => Unit): Unit = queue.add(action) + def add(action: () => Unit): Unit /** * try to complete all delayed actions */ + def tryCompleteActions(): Unit +} + +/** + * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords + * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking, + * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration. + */ +class DelayedActionQueue extends Logging with ActionQueue { + private val queue = new ConcurrentLinkedQueue[() => Unit]() + + def add(action: () => Unit): Unit = queue.add(action) + def tryCompleteActions(): Unit = { val maxToComplete = queue.size() var count = 0 diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index cf9e557aaac90..1d9e6d3801f3a 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -631,7 +631,7 @@ class ReplicaManager(val config: KafkaConfig, /** * TODO: move this action queue to handle thread so we can simplify concurrency handling */ - private val actionQueue = new ActionQueue + private val actionQueue = new DelayedActionQueue def tryCompleteActions(): Unit = actionQueue.tryCompleteActions() @@ -655,6 +655,7 @@ class ReplicaManager(val config: KafkaConfig, * @param requestLocal container for the stateful instances scoped to this request * @param transactionalId transactional ID if the request is from a producer and the producer is transactional * @param transactionStatePartition partition that holds the transactional state if transactionalId is present + * @param actionQueue the action queue to use. ReplicaManager#actionQueue is used by default. */ def appendRecords(timeout: Long, requiredAcks: Short, @@ -666,7 +667,8 @@ class ReplicaManager(val config: KafkaConfig, recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.NoCaching, transactionalId: String = null, - transactionStatePartition: Option[Int] = None): Unit = { + transactionStatePartition: Option[Int] = None, + actionQueue: ActionQueue = this.actionQueue): Unit = { if (isValidRequiredAcks(requiredAcks)) { val sTime = time.milliseconds @@ -722,6 +724,7 @@ class ReplicaManager(val config: KafkaConfig, new PartitionResponse( result.error, result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L), + result.info.lastOffset, result.info.logAppendTime, result.info.logStartOffset, result.info.recordErrors, @@ -731,23 +734,21 @@ class ReplicaManager(val config: KafkaConfig, } actionQueue.add { - () => - allResults.foreach { - case (topicPartition, result) => - val requestKey = TopicPartitionOperationKey(topicPartition) - result.info.leaderHwChange match { - case LeaderHwChange.INCREASED => - // some delayed operations may be unblocked after HW changed - delayedProducePurgatory.checkAndComplete(requestKey) - delayedFetchPurgatory.checkAndComplete(requestKey) - delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) - case LeaderHwChange.SAME => - // probably unblock some follower fetch requests since log end offset has been updated - delayedFetchPurgatory.checkAndComplete(requestKey) - case LeaderHwChange.NONE => - // nothing - } + () => allResults.foreach { case (topicPartition, result) => + val requestKey = TopicPartitionOperationKey(topicPartition) + result.info.leaderHwChange match { + case LeaderHwChange.INCREASED => + // some delayed operations may be unblocked after HW changed + delayedProducePurgatory.checkAndComplete(requestKey) + delayedFetchPurgatory.checkAndComplete(requestKey) + delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.SAME => + // probably unblock some follower fetch requests since log end offset has been updated + delayedFetchPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.NONE => + // nothing } + } } recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats }) @@ -764,7 +765,6 @@ class ReplicaManager(val config: KafkaConfig, // this is because while the delayed produce operation is being created, new // requests may arrive and hence make this operation completable. delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) - } else { // we can respond immediately val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus } diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index 492e384390683..ab97b1c08f626 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -180,7 +180,8 @@ object AbstractCoordinatorConcurrencyTest { processingStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.NoCaching, transactionalId: String = null, - transactionStatePartition: Option[Int]): Unit = { + transactionStatePartition: Option[Int], + actionQueue: ActionQueue = null): Unit = { if (entriesPerPartition.isEmpty) return diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala new file mode 100644 index 0000000000000..436458ccc4858 --- /dev/null +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.server.ReplicaManager +import kafka.utils.TestUtils +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.errors.{NotLeaderOrFollowerException, RecordTooLargeException} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch} +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse +import org.apache.kafka.common.utils.{MockTime, Time} +import org.apache.kafka.coordinator.group.runtime.PartitionWriter +import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig} +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} +import org.junit.jupiter.api.Test +import org.mockito.{ArgumentCaptor, ArgumentMatchers} +import org.mockito.Mockito.{mock, verify, when} + +import java.nio.charset.Charset +import java.util.{Collections, Properties} +import scala.collection.Map +import scala.jdk.CollectionConverters._ + +class StringKeyValueSerializer extends PartitionWriter.Serializer[(String, String)] { + override def serializeKey(record: (String, String)): Array[Byte] = { + record._1.getBytes(Charset.defaultCharset()) + } + + override def serializeValue(record: (String, String)): Array[Byte] = { + record._2.getBytes(Charset.defaultCharset()) + } +} + +class CoordinatorPartitionWriterTest { + @Test + def testRegisterDeregisterListener(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val partitionRecordWriter = new CoordinatorPartitionWriter( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + Time.SYSTEM + ) + + val listener = new PartitionWriter.Listener { + override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = {} + } + + partitionRecordWriter.registerListener(tp, listener) + verify(replicaManager).maybeAddListener(tp, new ListenerAdapter(listener)) + + partitionRecordWriter.deregisterListener(tp, listener) + verify(replicaManager).removeListener(tp, new ListenerAdapter(listener)) + + assertEquals( + new ListenerAdapter(listener), + new ListenerAdapter(listener) + ) + assertEquals( + new ListenerAdapter(listener).hashCode(), + new ListenerAdapter(listener).hashCode() + ) + } + + @Test + def testWriteRecords(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val time = new MockTime() + val partitionRecordWriter = new CoordinatorPartitionWriter( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + time + ) + + when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps( + Collections.emptyMap(), + new Properties() + ))) + + val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]]) + val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) + + when(replicaManager.appendRecords( + ArgumentMatchers.eq(0L), + ArgumentMatchers.eq(1.toShort), + ArgumentMatchers.eq(true), + ArgumentMatchers.eq(AppendOrigin.COORDINATOR), + recordsCapture.capture(), + callbackCapture.capture(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + )).thenAnswer( _ => { + callbackCapture.getValue.apply(Map( + tp -> new PartitionResponse( + Errors.NONE, + 5, + 10, + RecordBatch.NO_TIMESTAMP, + -1, + Collections.emptyList(), + "" + ) + )) + }) + + val records = List( + ("k0", "v0"), + ("k1", "v1"), + ("k2", "v2"), + ) + + assertEquals(11, partitionRecordWriter.append(tp, records.asJava)) + + val batch = recordsCapture.getValue.getOrElse(tp, + throw new AssertionError(s"No records for $tp")) + assertEquals(1, batch.batches().asScala.toList.size) + + val receivedRecords = batch.records.asScala.map { record => + ( + Charset.defaultCharset().decode(record.key).toString, + Charset.defaultCharset().decode(record.value).toString, + ) + }.toList + + assertEquals(records, receivedRecords) + } + + @Test + def testWriteRecordsWithFailure(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val time = new MockTime() + val partitionRecordWriter = new CoordinatorPartitionWriter( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + time + ) + + when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps( + Collections.emptyMap(), + new Properties() + ))) + + val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]]) + val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) + + when(replicaManager.appendRecords( + ArgumentMatchers.eq(0L), + ArgumentMatchers.eq(1.toShort), + ArgumentMatchers.eq(true), + ArgumentMatchers.eq(AppendOrigin.COORDINATOR), + recordsCapture.capture(), + callbackCapture.capture(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + )).thenAnswer(_ => { + callbackCapture.getValue.apply(Map( + tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER) + )) + }) + + val records = List( + ("k0", "v0"), + ("k1", "v1"), + ("k2", "v2"), + ) + + assertThrows(classOf[NotLeaderOrFollowerException], + () => partitionRecordWriter.append(tp, records.asJava)) + } + + @Test + def testWriteRecordTooLarge(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val partitionRecordWriter = new CoordinatorPartitionWriter( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + Time.SYSTEM + ) + + val maxBatchSize = 16384 + when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps( + Map(TopicConfig.MAX_MESSAGE_BYTES_CONFIG -> maxBatchSize).asJava, + new Properties() + ))) + + val randomBytes = TestUtils.randomBytes(maxBatchSize + 1) + // We need more than one record here because the first record + // is always allowed by the MemoryRecordsBuilder. + val records = List( + ("k0", new String(randomBytes)), + ("k1", new String(randomBytes)), + ) + + assertThrows(classOf[RecordTooLargeException], + () => partitionRecordWriter.append(tp, records.asJava)) + } + + @Test + def testWriteEmptyRecordList(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val partitionRecordWriter = new CoordinatorPartitionWriter( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + Time.SYSTEM + ) + + when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps( + Collections.emptyMap(), + new Properties() + ))) + + assertThrows(classOf[IllegalStateException], + () => partitionRecordWriter.append(tp, List.empty.asJava)) + } + + @Test + def testNonexistentPartition(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val partitionRecordWriter = new CoordinatorPartitionWriter( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + Time.SYSTEM + ) + + when(replicaManager.getLogConfig(tp)).thenReturn(None) + + val records = List( + ("k0", "v0"), + ("k1", "v1"), + ("k2", "v2"), + ) + + assertThrows(classOf[NotLeaderOrFollowerException], + () => partitionRecordWriter.append(tp, records.asJava)) + } +} diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 403a0c0885ae4..3a7f6f91a9df6 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -3865,6 +3865,7 @@ class GroupCoordinatorTest { any(), any(), any(), + any(), any() )).thenAnswer(_ => { capturedArgument.getValue.apply( @@ -3901,6 +3902,7 @@ class GroupCoordinatorTest { any(), any(), any(), + any(), any())).thenAnswer(_ => { capturedArgument.getValue.apply( Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> @@ -4047,6 +4049,7 @@ class GroupCoordinatorTest { any(), any(), any(), + any(), any()) ).thenAnswer(_ => { capturedArgument.getValue.apply( @@ -4082,6 +4085,7 @@ class GroupCoordinatorTest { any(), any(), any(), + any(), any()) ).thenAnswer(_ => { capturedArgument.getValue.apply( diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index da01e46162684..03de6484c7fa9 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -1185,6 +1185,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager).getMagic(any()) } @@ -1223,6 +1224,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager).getMagic(any()) } @@ -1299,6 +1301,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) // Will update sensor after commit assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count")) @@ -1341,6 +1344,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager).getMagic(any()) capturedResponseCallback.getValue.apply(Map(groupTopicPartition -> @@ -1401,6 +1405,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager).getMagic(any()) } @@ -1451,6 +1456,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager).getMagic(any()) } @@ -1603,6 +1609,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager).getMagic(any()) assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count")) @@ -1710,6 +1717,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager, times(2)).getMagic(any()) } @@ -2817,6 +2825,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) capturedArgument } @@ -2834,6 +2843,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) ).thenAnswer(_ => { capturedCallback.getValue.apply( diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 78c5a5e3ce8b6..67919e5c49c9c 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -658,6 +658,7 @@ class TransactionStateManagerTest { any(), any(), any(), + any(), any() ) @@ -703,6 +704,7 @@ class TransactionStateManagerTest { any(), any(), any(), + any(), any() ) @@ -745,6 +747,7 @@ class TransactionStateManagerTest { any(), any(), any(), + any(), any()) assertEquals(Set.empty, listExpirableTransactionalIds()) @@ -803,6 +806,7 @@ class TransactionStateManagerTest { any(), any(), any(), + any(), any() ) @@ -953,6 +957,7 @@ class TransactionStateManagerTest { any(), any(), any(), + any(), any() )).thenAnswer(_ => callbackCapture.getValue.apply( recordsCapture.getValue.map { case (topicPartition, records) => @@ -1105,6 +1110,7 @@ class TransactionStateManagerTest { any(), any(), any(), + any(), any()) ).thenAnswer(_ => capturedArgument.getValue.apply( Map(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) -> diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 8007c873ccc6a..3c16f4d5fc6c2 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -2310,6 +2310,7 @@ class KafkaApisTest { any(), any(), any(), + any(), any()) ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH)))) @@ -2378,7 +2379,8 @@ class KafkaApisTest { any(), any(), ArgumentMatchers.eq(transactionalId), - ArgumentMatchers.eq(Some(transactionCoordinatorPartition))) + ArgumentMatchers.eq(Some(transactionCoordinatorPartition)), + any()) } } @@ -2505,6 +2507,7 @@ class KafkaApisTest { any(), ArgumentMatchers.eq(requestLocal), any(), + any(), any()) ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))) @@ -2637,6 +2640,7 @@ class KafkaApisTest { any(), ArgumentMatchers.eq(requestLocal), any(), + any(), any()) ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))) @@ -2671,6 +2675,7 @@ class KafkaApisTest { any(), ArgumentMatchers.eq(requestLocal), any(), + any(), any()) } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java new file mode 100644 index 0000000000000..4736ebe5a4dcd --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; + +/** + * Serializer which serializes {{@link Record}} to bytes. + */ +public class RecordSerializer implements PartitionWriter.Serializer { + @Override + public byte[] serializeKey(Record record) { + // Record does not accept a null key. + return MessageUtil.toVersionPrefixedBytes( + record.key().version(), + record.key().message() + ); + } + + @Override + public byte[] serializeValue(Record record) { + // Tombstone is represented with a null value. + if (record.value() == null) { + return null; + } else { + return MessageUtil.toVersionPrefixedBytes( + record.value().version(), + record.value().message() + ); + } + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java new file mode 100644 index 0000000000000..1f8d4119f7ea5 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; + +/** + * A simple interface to write records to Partitions/Logs. It contains the minimum + * required for coordinators. + * + * @param The record type. + */ +public interface PartitionWriter { + + /** + * Serializer to translate T to bytes. + * + * @param The record type. + */ + interface Serializer { + /** + * Serializes the key of the record. + */ + byte[] serializeKey(T record); + + /** + * Serializes the value of the record. + */ + byte[] serializeValue(T record); + } + + /** + * Listener allowing to listen to high watermark changes. This is meant + * to be used in conjunction with {{@link PartitionWriter#append(TopicPartition, List)}}. + */ + interface Listener { + void onHighWatermarkUpdated( + TopicPartition tp, + long offset + ); + } + + /** + * Register a {{@link Listener}}. + * + * @param tp The partition to register the listener to. + * @param listener The listener. + */ + void registerListener( + TopicPartition tp, + Listener listener + ); + + /** + * Deregister a {{@link Listener}}. + * + * @param tp The partition to deregister the listener from. + * @param listener The listener. + */ + void deregisterListener( + TopicPartition tp, + Listener listener + ); + + /** + * Write records to the partitions. Records are written in one batch so + * atomicity is guaranteed. + * + * @param tp The partition to write records to. + * @param records The list of records. The records are written in a single batch. + * @return The log end offset right after the written records. + * @throws KafkaException Any KafkaException caught during the write operation. + */ + long append( + TopicPartition tp, + List records + ) throws KafkaException; +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerializerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerializerTest.java new file mode 100644 index 0000000000000..dfb4f375ea218 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerializerTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class RecordSerializerTest { + @Test + public void testSerializeKey() { + RecordSerializer serializer = new RecordSerializer(); + Record record = new Record( + new ApiMessageAndVersion( + new ConsumerGroupMetadataKey().setGroupId("group"), + (short) 1 + ), + new ApiMessageAndVersion( + new ConsumerGroupMetadataValue().setEpoch(10), + (short) 0 + ) + ); + + assertArrayEquals( + MessageUtil.toVersionPrefixedBytes(record.key().version(), record.key().message()), + serializer.serializeKey(record) + ); + } + + @Test + public void testSerializeValue() { + RecordSerializer serializer = new RecordSerializer(); + Record record = new Record( + new ApiMessageAndVersion( + new ConsumerGroupMetadataKey().setGroupId("group"), + (short) 1 + ), + new ApiMessageAndVersion( + new ConsumerGroupMetadataValue().setEpoch(10), + (short) 0 + ) + ); + + assertArrayEquals( + MessageUtil.toVersionPrefixedBytes(record.value().version(), record.value().message()), + serializer.serializeValue(record) + ); + } + + @Test + public void testSerializeNullValue() { + RecordSerializer serializer = new RecordSerializer(); + Record record = new Record( + new ApiMessageAndVersion( + new ConsumerGroupMetadataKey().setGroupId("group"), + (short) 1 + ), + null + ); + + assertNull(serializer.serializeValue(record)); + } +}