From 4dd01b4e10d94f20f1fd7b564e20a95c86943082 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 4 May 2023 14:44:51 +0200 Subject: [PATCH 1/9] KAFKA-14462; [14/N]; Add PartitionWriter --- .../group/PartitionWriterImpl.scala | 144 +++++++++++ .../scala/kafka/server/ReplicaManager.scala | 49 ++-- .../group/PartitionWriterImplTest.scala | 226 ++++++++++++++++++ .../coordinator/group/RecordSerializer.java | 47 ++++ .../kafka/coordinator/group/Result.java | 2 +- .../group/runtime/PartitionWriter.java | 95 ++++++++ .../group/RecordSerializerTest.java | 82 +++++++ 7 files changed, 625 insertions(+), 20 deletions(-) create mode 100644 core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala create mode 100644 core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerializerTest.java diff --git a/core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala b/core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala new file mode 100644 index 0000000000000..6aabd323b2d46 --- /dev/null +++ b/core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.cluster.PartitionListener +import kafka.server.{ReplicaManager, RequestLocal} +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.RecordTooLargeException +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, TimestampType} +import org.apache.kafka.common.record.Record.EMPTY_HEADERS +import org.apache.kafka.common.utils.Time +import org.apache.kafka.coordinator.group.runtime.PartitionWriter +import org.apache.kafka.storage.internals.log.AppendOrigin + +import java.nio.ByteBuffer +import java.util +import scala.collection.Map + +private[group] class ListenerAdaptor( + val listener: PartitionWriter.Listener +) extends PartitionListener { + override def onHighWatermarkUpdated( + tp: TopicPartition, + offset: Long + ): Unit = { + listener.onHighWatermarkUpdated(tp, offset) + } + + override def equals(that: Any): Boolean = that match { + case other: ListenerAdaptor => listener.equals(other.listener) + case _ => false + } + + override def hashCode(): Int = { + listener.hashCode() + } + + override def toString: String = { + s"ListenerAdaptor(listener=$listener)" + } +} + +class PartitionWriterImpl[T]( + replicaManager: ReplicaManager, + serializer: PartitionWriter.Serializer[T], + compressionType: CompressionType, + time: Time +) extends PartitionWriter[T] { + + override def registerListener( + tp: TopicPartition, + listener: PartitionWriter.Listener + ): Unit = { + replicaManager.maybeAddListener(tp, new ListenerAdaptor(listener)) + } + + override def deregisterListener( + tp: TopicPartition, + listener: PartitionWriter.Listener + ): Unit = { + replicaManager.removeListener(tp, new ListenerAdaptor(listener)) + } + + override def append( + tp: TopicPartition, + records: util.List[T] + ): Long = { + if (records.isEmpty) { + throw new IllegalStateException("records must be non-empty.") + } + + replicaManager.getLogConfig(tp) match { + case Some(logConfig) => + val magic = logConfig.recordVersion.value + val maxBatchSize = logConfig.maxMessageSize + val currentTimeMs = time.milliseconds() + + val recordsBuilder = MemoryRecords.builder( + ByteBuffer.allocate(math.min(16384, maxBatchSize)), + magic, + compressionType, + TimestampType.CREATE_TIME, + 0L, + maxBatchSize + ) + + records.forEach { record => + val keyBytes = serializer.serializeKey(record) + val valueBytes = serializer.serializeValue(record) + + if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, EMPTY_HEADERS)) { + recordsBuilder.append( + currentTimeMs, + keyBytes, + valueBytes, + EMPTY_HEADERS + ) + } else { + throw new RecordTooLargeException(s"Message batch size is ${recordsBuilder.estimatedSizeInBytes()} bytes " + + s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.") + } + } + + val appendResults = replicaManager.appendToLocalLog( + internalTopicsAllowed = true, + origin = AppendOrigin.COORDINATOR, + entriesPerPartition = Map(tp -> recordsBuilder.build()), + requiredAcks = 1, + requestLocal = RequestLocal.NoCaching + ) + + val partitionResult = appendResults.getOrElse(tp, + throw new IllegalStateException("Append status %s should only have one partition %s" + .format(appendResults, tp))) + + // Complete delayed operations. + replicaManager.maybeCompletePurgatories( + tp, + partitionResult.info.leaderHwChange + ) + + // Required offset. + partitionResult.info.lastOffset + 1 + + case None => + throw Errors.NOT_LEADER_OR_FOLLOWER.exception() + } + } +} diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 5980ebef2219d..380d62702463b 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -687,19 +687,10 @@ class ReplicaManager(val config: KafkaConfig, () => allResults.foreach { case (topicPartition, result) => - val requestKey = TopicPartitionOperationKey(topicPartition) - result.info.leaderHwChange match { - case LeaderHwChange.INCREASED => - // some delayed operations may be unblocked after HW changed - delayedProducePurgatory.checkAndComplete(requestKey) - delayedFetchPurgatory.checkAndComplete(requestKey) - delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) - case LeaderHwChange.SAME => - // probably unblock some follower fetch requests since log end offset has been updated - delayedFetchPurgatory.checkAndComplete(requestKey) - case LeaderHwChange.NONE => - // nothing - } + maybeCompletePurgatories( + topicPartition, + result.info.leaderHwChange + ) } } @@ -769,6 +760,25 @@ class ReplicaManager(val config: KafkaConfig, } } + def maybeCompletePurgatories( + topicPartition: TopicPartition, + leaderHwChange: LeaderHwChange + ): Unit = { + val requestKey = TopicPartitionOperationKey(topicPartition) + leaderHwChange match { + case LeaderHwChange.INCREASED => + // some delayed operations may be unblocked after HW changed + delayedProducePurgatory.checkAndComplete(requestKey) + delayedFetchPurgatory.checkAndComplete(requestKey) + delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.SAME => + // probably unblock some follower fetch requests since log end offset has been updated + delayedFetchPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.NONE => + // nothing + } + } + /** * Delete records on leader replicas of the partition, and wait for delete records operation be propagated to other replicas; * the callback function will be triggered either when timeout or logStartOffset of all live replicas have reached the specified offset @@ -1019,13 +1029,14 @@ class ReplicaManager(val config: KafkaConfig, } /** - * Append the messages to the local replica logs + * Append the messages to the local replica logs. ReplicaManager#appendRecords should usually be + * used instead of this method. */ - private def appendToLocalLog(internalTopicsAllowed: Boolean, - origin: AppendOrigin, - entriesPerPartition: Map[TopicPartition, MemoryRecords], - requiredAcks: Short, - requestLocal: RequestLocal): Map[TopicPartition, LogAppendResult] = { + def appendToLocalLog(internalTopicsAllowed: Boolean, + origin: AppendOrigin, + entriesPerPartition: Map[TopicPartition, MemoryRecords], + requiredAcks: Short, + requestLocal: RequestLocal): Map[TopicPartition, LogAppendResult] = { val traceEnabled = isTraceEnabled def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = { val logStartOffset = onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1L) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala new file mode 100644 index 0000000000000..a96a37687e634 --- /dev/null +++ b/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package kafka.coordinator.group + +import kafka.server.{LogAppendResult, ReplicaManager, RequestLocal} +import kafka.utils.TestUtils +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.config.TopicConfig +import org.apache.kafka.common.errors.{NotLeaderOrFollowerException, RecordTooLargeException} +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordConversionStats} +import org.apache.kafka.common.utils.{MockTime, Time} +import org.apache.kafka.coordinator.group.runtime.PartitionWriter +import org.apache.kafka.storage.internals.log.{AppendOrigin, LeaderHwChange, LogAppendInfo, LogConfig} +import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} +import org.junit.jupiter.api.Test +import org.mockito.{ArgumentCaptor, ArgumentMatchers} +import org.mockito.Mockito.{mock, verify, when} + +import java.nio.charset.Charset +import java.util.{Collections, Optional, OptionalInt, Properties} +import scala.jdk.CollectionConverters._ + +class StringKeyValueSerializer extends PartitionWriter.Serializer[(String, String)] { + override def serializeKey(record: (String, String)): Array[Byte] = { + record._1.getBytes(Charset.defaultCharset()) + } + + override def serializeValue(record: (String, String)): Array[Byte] = { + record._2.getBytes(Charset.defaultCharset()) + } +} + +class PartitionWriterImplTest { + @Test + def testRegisterDeregisterListener(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val partitionRecordWriter = new PartitionWriterImpl( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + Time.SYSTEM + ) + + val listener = new PartitionWriter.Listener { + override def onHighWatermarkUpdated(tp: TopicPartition, offset: Long): Unit = {} + } + + partitionRecordWriter.registerListener(tp, listener) + verify(replicaManager).maybeAddListener(tp, new ListenerAdaptor(listener)) + + partitionRecordWriter.deregisterListener(tp, listener) + verify(replicaManager).removeListener(tp, new ListenerAdaptor(listener)) + + assertEquals( + new ListenerAdaptor(listener), + new ListenerAdaptor(listener) + ) + assertEquals( + new ListenerAdaptor(listener).hashCode(), + new ListenerAdaptor(listener).hashCode() + ) + } + + @Test + def testWriteRecords(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val time = new MockTime() + val partitionRecordWriter = new PartitionWriterImpl( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + time + ) + + when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps( + Collections.emptyMap(), + new Properties() + ))) + + val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]]) + when(replicaManager.appendToLocalLog( + ArgumentMatchers.eq(true), + ArgumentMatchers.eq(AppendOrigin.COORDINATOR), + recordsCapture.capture(), + ArgumentMatchers.eq(1), + ArgumentMatchers.eq(RequestLocal.NoCaching) + )).thenReturn(Map(tp -> LogAppendResult(new LogAppendInfo( + Optional.empty(), + 10, + OptionalInt.empty(), + RecordBatch.NO_TIMESTAMP, + -1L, + RecordBatch.NO_TIMESTAMP, + -1L, + RecordConversionStats.EMPTY, + CompressionType.NONE, + CompressionType.NONE, + -1, + -1, + false, + -1L, + Collections.emptyList(), + "", + LeaderHwChange.INCREASED + )))) + + val records = List( + ("k0", "v0"), + ("k1", "v1"), + ("k2", "v2"), + ) + + assertEquals(11, partitionRecordWriter.append( + tp, + records.asJava + )) + + verify(replicaManager).maybeCompletePurgatories( + tp, + LeaderHwChange.INCREASED + ) + + val batch = recordsCapture.getValue.getOrElse(tp, + throw new AssertionError(s"No records for $tp")) + assertEquals(1, batch.batches().asScala.toList.size) + + val receivedRecords = batch.records.asScala.map { record => + ( + Charset.defaultCharset().decode(record.key).toString, + Charset.defaultCharset().decode(record.value).toString, + + ) + }.toList + + assertEquals(records, receivedRecords) + } + + @Test + def testWriteRecordTooLarge(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val partitionRecordWriter = new PartitionWriterImpl( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + Time.SYSTEM + ) + + val maxBatchSize = 16384 + when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps( + Map(TopicConfig.MAX_MESSAGE_BYTES_CONFIG -> maxBatchSize).asJava, + new Properties() + ))) + + val randomBytes = TestUtils.randomBytes(maxBatchSize + 1) + // We need more than one record here because the first record + // is always allowed by the MemoryRecordsBuilder. + val records = List( + ("k0", new String(randomBytes)), + ("k1", new String(randomBytes)), + ) + + assertThrows(classOf[RecordTooLargeException], + () => partitionRecordWriter.append(tp, records.asJava)) + } + + @Test + def testWriteEmptyRecordList(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val partitionRecordWriter = new PartitionWriterImpl( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + Time.SYSTEM + ) + + when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps( + Collections.emptyMap(), + new Properties() + ))) + + assertThrows(classOf[IllegalStateException], + () => partitionRecordWriter.append(tp, List.empty.asJava)) + } + + @Test + def testInexistentPartition(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val partitionRecordWriter = new PartitionWriterImpl( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + Time.SYSTEM + ) + + when(replicaManager.getLogConfig(tp)).thenReturn(None) + + val records = List( + ("k0", "v0"), + ("k1", "v1"), + ("k2", "v2"), + ) + + assertThrows(classOf[NotLeaderOrFollowerException], + () => partitionRecordWriter.append(tp, records.asJava)) + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java new file mode 100644 index 0000000000000..4736ebe5a4dcd --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordSerializer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.runtime.PartitionWriter; + +/** + * Serializer which serializes {{@link Record}} to bytes. + */ +public class RecordSerializer implements PartitionWriter.Serializer { + @Override + public byte[] serializeKey(Record record) { + // Record does not accept a null key. + return MessageUtil.toVersionPrefixedBytes( + record.key().version(), + record.key().message() + ); + } + + @Override + public byte[] serializeValue(Record record) { + // Tombstone is represented with a null value. + if (record.value() == null) { + return null; + } else { + return MessageUtil.toVersionPrefixedBytes( + record.value().version(), + record.value().message() + ); + } + } +} diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Result.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Result.java index 57b96202192f2..f21fe86ad7890 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Result.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/Result.java @@ -25,7 +25,7 @@ * * @param The type of the response. */ -class Result { +public class Result { /** * The records. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java new file mode 100644 index 0000000000000..1f8d4119f7ea5 --- /dev/null +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/PartitionWriter.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.runtime; + +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.TopicPartition; + +import java.util.List; + +/** + * A simple interface to write records to Partitions/Logs. It contains the minimum + * required for coordinators. + * + * @param The record type. + */ +public interface PartitionWriter { + + /** + * Serializer to translate T to bytes. + * + * @param The record type. + */ + interface Serializer { + /** + * Serializes the key of the record. + */ + byte[] serializeKey(T record); + + /** + * Serializes the value of the record. + */ + byte[] serializeValue(T record); + } + + /** + * Listener allowing to listen to high watermark changes. This is meant + * to be used in conjunction with {{@link PartitionWriter#append(TopicPartition, List)}}. + */ + interface Listener { + void onHighWatermarkUpdated( + TopicPartition tp, + long offset + ); + } + + /** + * Register a {{@link Listener}}. + * + * @param tp The partition to register the listener to. + * @param listener The listener. + */ + void registerListener( + TopicPartition tp, + Listener listener + ); + + /** + * Deregister a {{@link Listener}}. + * + * @param tp The partition to deregister the listener from. + * @param listener The listener. + */ + void deregisterListener( + TopicPartition tp, + Listener listener + ); + + /** + * Write records to the partitions. Records are written in one batch so + * atomicity is guaranteed. + * + * @param tp The partition to write records to. + * @param records The list of records. The records are written in a single batch. + * @return The log end offset right after the written records. + * @throws KafkaException Any KafkaException caught during the write operation. + */ + long append( + TopicPartition tp, + List records + ) throws KafkaException; +} diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerializerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerializerTest.java new file mode 100644 index 0000000000000..dfb4f375ea218 --- /dev/null +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordSerializerTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group; + +import org.apache.kafka.common.protocol.MessageUtil; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey; +import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +public class RecordSerializerTest { + @Test + public void testSerializeKey() { + RecordSerializer serializer = new RecordSerializer(); + Record record = new Record( + new ApiMessageAndVersion( + new ConsumerGroupMetadataKey().setGroupId("group"), + (short) 1 + ), + new ApiMessageAndVersion( + new ConsumerGroupMetadataValue().setEpoch(10), + (short) 0 + ) + ); + + assertArrayEquals( + MessageUtil.toVersionPrefixedBytes(record.key().version(), record.key().message()), + serializer.serializeKey(record) + ); + } + + @Test + public void testSerializeValue() { + RecordSerializer serializer = new RecordSerializer(); + Record record = new Record( + new ApiMessageAndVersion( + new ConsumerGroupMetadataKey().setGroupId("group"), + (short) 1 + ), + new ApiMessageAndVersion( + new ConsumerGroupMetadataValue().setEpoch(10), + (short) 0 + ) + ); + + assertArrayEquals( + MessageUtil.toVersionPrefixedBytes(record.value().version(), record.value().message()), + serializer.serializeValue(record) + ); + } + + @Test + public void testSerializeNullValue() { + RecordSerializer serializer = new RecordSerializer(); + Record record = new Record( + new ApiMessageAndVersion( + new ConsumerGroupMetadataKey().setGroupId("group"), + (short) 1 + ), + null + ); + + assertNull(serializer.serializeValue(record)); + } +} From c4da8434ec8c58ab0556645462ef9f9ddd249d2e Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 4 May 2023 14:54:39 +0200 Subject: [PATCH 2/9] small cleanup --- .../unit/kafka/coordinator/group/PartitionWriterImplTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala index a96a37687e634..9204292f2d8a4 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala @@ -145,7 +145,6 @@ class PartitionWriterImplTest { ( Charset.defaultCharset().decode(record.key).toString, Charset.defaultCharset().decode(record.value).toString, - ) }.toList From 41c7cbfca302fd27fc4f7e887e0129bb011d576b Mon Sep 17 00:00:00 2001 From: David Jacot Date: Thu, 1 Jun 2023 16:49:19 +0200 Subject: [PATCH 3/9] address minor comments --- .../group/PartitionWriterImpl.scala | 43 +++++++++++++------ .../group/PartitionWriterImplTest.scala | 2 +- 2 files changed, 30 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala b/core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala index 6aabd323b2d46..15a637d1fc6ff 100644 --- a/core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala +++ b/core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala @@ -62,6 +62,12 @@ class PartitionWriterImpl[T]( time: Time ) extends PartitionWriter[T] { + /** + * Register a PartitionWriter.Listener. + * + * @param tp The partition to register the listener to. + * @param listener The listener. + */ override def registerListener( tp: TopicPartition, listener: PartitionWriter.Listener @@ -69,6 +75,12 @@ class PartitionWriterImpl[T]( replicaManager.maybeAddListener(tp, new ListenerAdaptor(listener)) } + /** + * Deregister a PartitionWriter.Listener. + * + * @param tp The partition to deregister the listener from. + * @param listener The listener. + */ override def deregisterListener( tp: TopicPartition, listener: PartitionWriter.Listener @@ -76,13 +88,20 @@ class PartitionWriterImpl[T]( replicaManager.removeListener(tp, new ListenerAdaptor(listener)) } + /** + * Write records to the partitions. Records are written in one batch so + * atomicity is guaranteed. + * + * @param tp The partition to write records to. + * @param records The list of records. The records are written in a single batch. + * @return The log end offset right after the written records. + * @throws KafkaException Any KafkaException caught during the write operation. + */ override def append( tp: TopicPartition, records: util.List[T] ): Long = { - if (records.isEmpty) { - throw new IllegalStateException("records must be non-empty.") - } + if (records.isEmpty) throw new IllegalStateException("records must be non-empty.") replicaManager.getLogConfig(tp) match { case Some(logConfig) => @@ -103,17 +122,13 @@ class PartitionWriterImpl[T]( val keyBytes = serializer.serializeKey(record) val valueBytes = serializer.serializeValue(record) - if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, EMPTY_HEADERS)) { - recordsBuilder.append( - currentTimeMs, - keyBytes, - valueBytes, - EMPTY_HEADERS - ) - } else { - throw new RecordTooLargeException(s"Message batch size is ${recordsBuilder.estimatedSizeInBytes()} bytes " + - s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.") - } + if (recordsBuilder.hasRoomFor(currentTimeMs, keyBytes, valueBytes, EMPTY_HEADERS)) recordsBuilder.append( + currentTimeMs, + keyBytes, + valueBytes, + EMPTY_HEADERS + ) else throw new RecordTooLargeException(s"Message batch size is ${recordsBuilder.estimatedSizeInBytes()} bytes " + + s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.") } val appendResults = replicaManager.appendToLocalLog( diff --git a/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala index 9204292f2d8a4..16e816789a924 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala @@ -201,7 +201,7 @@ class PartitionWriterImplTest { } @Test - def testInexistentPartition(): Unit = { + def testNonexistentPartition(): Unit = { val tp = new TopicPartition("foo", 0) val replicaManager = mock(classOf[ReplicaManager]) val partitionRecordWriter = new PartitionWriterImpl( From 08638b2293e084e64cf369c78757c6eb27c17a6a Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 2 Jun 2023 10:52:44 +0200 Subject: [PATCH 4/9] refactor --- .../common/requests/ProduceResponse.java | 19 +++++- .../group/PartitionWriterImpl.scala | 23 +++---- .../scala/kafka/server/ReplicaManager.scala | 62 +++++++----------- .../AbstractCoordinatorConcurrencyTest.scala | 3 +- .../group/PartitionWriterImplTest.scala | 64 ++++++++++--------- 5 files changed, 87 insertions(+), 84 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java index a00fdecc58645..022298957faac 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java @@ -131,6 +131,7 @@ public Map errorCounts() { public static final class PartitionResponse { public Errors error; public long baseOffset; + public long lastOffset; public long logAppendTime; public long logStartOffset; public List recordErrors; @@ -153,8 +154,21 @@ public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long } public PartitionResponse(Errors error, long baseOffset, long logAppendTime, long logStartOffset, List recordErrors, String errorMessage) { + this(error, baseOffset, INVALID_OFFSET, logAppendTime, logStartOffset, recordErrors, errorMessage); + } + + public PartitionResponse( + Errors error, + long baseOffset, + long lastOffset, + long logAppendTime, + long logStartOffset, + List recordErrors, + String errorMessage + ) { this.error = error; this.baseOffset = baseOffset; + this.lastOffset = lastOffset; this.logAppendTime = logAppendTime; this.logStartOffset = logStartOffset; this.recordErrors = recordErrors; @@ -167,6 +181,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; PartitionResponse that = (PartitionResponse) o; return baseOffset == that.baseOffset && + lastOffset == that.lastOffset && logAppendTime == that.logAppendTime && logStartOffset == that.logStartOffset && error == that.error && @@ -176,7 +191,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(error, baseOffset, logAppendTime, logStartOffset, recordErrors, errorMessage); + return Objects.hash(error, baseOffset, lastOffset, logAppendTime, logStartOffset, recordErrors, errorMessage); } @Override @@ -187,6 +202,8 @@ public String toString() { b.append(error); b.append(",offset: "); b.append(baseOffset); + b.append(",lastOffset: "); + b.append(lastOffset); b.append(",logAppendTime: "); b.append(logAppendTime); b.append(", logStartOffset: "); diff --git a/core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala b/core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala index 15a637d1fc6ff..fd5924212a5c5 100644 --- a/core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala +++ b/core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala @@ -17,12 +17,13 @@ package kafka.coordinator.group import kafka.cluster.PartitionListener -import kafka.server.{ReplicaManager, RequestLocal} +import kafka.server.ReplicaManager import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.RecordTooLargeException import org.apache.kafka.common.protocol.Errors import org.apache.kafka.common.record.{CompressionType, MemoryRecords, TimestampType} import org.apache.kafka.common.record.Record.EMPTY_HEADERS +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.Time import org.apache.kafka.coordinator.group.runtime.PartitionWriter import org.apache.kafka.storage.internals.log.AppendOrigin @@ -131,26 +132,22 @@ class PartitionWriterImpl[T]( s"in append to partition $tp which exceeds the maximum configured size of $maxBatchSize.") } - val appendResults = replicaManager.appendToLocalLog( + var appendResults: Map[TopicPartition, PartitionResponse] = Map.empty + replicaManager.appendRecords( + timeout = 0L, + requiredAcks = 1, internalTopicsAllowed = true, origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> recordsBuilder.build()), - requiredAcks = 1, - requestLocal = RequestLocal.NoCaching + responseCallback = results => appendResults = results, + actionQueueAdd = item => item() // Immediately complete the action queue item. ) val partitionResult = appendResults.getOrElse(tp, - throw new IllegalStateException("Append status %s should only have one partition %s" - .format(appendResults, tp))) - - // Complete delayed operations. - replicaManager.maybeCompletePurgatories( - tp, - partitionResult.info.leaderHwChange - ) + throw new IllegalStateException(s"Append status $appendResults should only have one partition $tp")) // Required offset. - partitionResult.info.lastOffset + 1 + partitionResult.lastOffset + 1 case None => throw Errors.NOT_LEADER_OR_FOLLOWER.exception() diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 380d62702463b..884ffb711c4c8 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -622,6 +622,7 @@ class ReplicaManager(val config: KafkaConfig, * @param requestLocal container for the stateful instances scoped to this request * @param transactionalId transactional ID if the request is from a producer and the producer is transactional * @param transactionStatePartition partition that holds the transactional state if transactionalId is present + * @param actionQueueAdd function to add an action to the action queue. */ def appendRecords(timeout: Long, requiredAcks: Short, @@ -633,7 +634,8 @@ class ReplicaManager(val config: KafkaConfig, recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.NoCaching, transactionalId: String = null, - transactionStatePartition: Option[Int] = None): Unit = { + transactionStatePartition: Option[Int] = None, + actionQueueAdd: (() => Unit) => Unit = actionQueue.add): Unit = { if (isValidRequiredAcks(requiredAcks)) { val sTime = time.milliseconds @@ -675,6 +677,7 @@ class ReplicaManager(val config: KafkaConfig, new PartitionResponse( result.error, result.info.firstOffset.map[Long](_.messageOffset).orElse(-1L), + result.info.lastOffset, result.info.logAppendTime, result.info.logStartOffset, result.info.recordErrors, @@ -683,15 +686,22 @@ class ReplicaManager(val config: KafkaConfig, ) // response status } - actionQueue.add { - () => - allResults.foreach { - case (topicPartition, result) => - maybeCompletePurgatories( - topicPartition, - result.info.leaderHwChange - ) + actionQueueAdd { + () => allResults.foreach { case (topicPartition, result) => + val requestKey = TopicPartitionOperationKey(topicPartition) + result.info.leaderHwChange match { + case LeaderHwChange.INCREASED => + // some delayed operations may be unblocked after HW changed + delayedProducePurgatory.checkAndComplete(requestKey) + delayedFetchPurgatory.checkAndComplete(requestKey) + delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.SAME => + // probably unblock some follower fetch requests since log end offset has been updated + delayedFetchPurgatory.checkAndComplete(requestKey) + case LeaderHwChange.NONE => + // nothing } + } } recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats }) @@ -708,7 +718,6 @@ class ReplicaManager(val config: KafkaConfig, // this is because while the delayed produce operation is being created, new // requests may arrive and hence make this operation completable. delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys) - } else { // we can respond immediately val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus } @@ -760,25 +769,6 @@ class ReplicaManager(val config: KafkaConfig, } } - def maybeCompletePurgatories( - topicPartition: TopicPartition, - leaderHwChange: LeaderHwChange - ): Unit = { - val requestKey = TopicPartitionOperationKey(topicPartition) - leaderHwChange match { - case LeaderHwChange.INCREASED => - // some delayed operations may be unblocked after HW changed - delayedProducePurgatory.checkAndComplete(requestKey) - delayedFetchPurgatory.checkAndComplete(requestKey) - delayedDeleteRecordsPurgatory.checkAndComplete(requestKey) - case LeaderHwChange.SAME => - // probably unblock some follower fetch requests since log end offset has been updated - delayedFetchPurgatory.checkAndComplete(requestKey) - case LeaderHwChange.NONE => - // nothing - } - } - /** * Delete records on leader replicas of the partition, and wait for delete records operation be propagated to other replicas; * the callback function will be triggered either when timeout or logStartOffset of all live replicas have reached the specified offset @@ -1028,15 +1018,11 @@ class ReplicaManager(val config: KafkaConfig, requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0 } - /** - * Append the messages to the local replica logs. ReplicaManager#appendRecords should usually be - * used instead of this method. - */ - def appendToLocalLog(internalTopicsAllowed: Boolean, - origin: AppendOrigin, - entriesPerPartition: Map[TopicPartition, MemoryRecords], - requiredAcks: Short, - requestLocal: RequestLocal): Map[TopicPartition, LogAppendResult] = { + private def appendToLocalLog(internalTopicsAllowed: Boolean, + origin: AppendOrigin, + entriesPerPartition: Map[TopicPartition, MemoryRecords], + requiredAcks: Short, + requestLocal: RequestLocal): Map[TopicPartition, LogAppendResult] = { val traceEnabled = isTraceEnabled def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = { val logStartOffset = onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1L) diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index 492e384390683..0751ef424cbf6 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -180,7 +180,8 @@ object AbstractCoordinatorConcurrencyTest { processingStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => (), requestLocal: RequestLocal = RequestLocal.NoCaching, transactionalId: String = null, - transactionStatePartition: Option[Int]): Unit = { + transactionStatePartition: Option[Int], + actionQueueAdd: (() => Unit) => Unit = null): Unit = { if (entriesPerPartition.isEmpty) return diff --git a/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala index 16e816789a924..a846fbeb23bba 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala @@ -16,22 +16,25 @@ */ package kafka.coordinator.group -import kafka.server.{LogAppendResult, ReplicaManager, RequestLocal} +import kafka.server.ReplicaManager import kafka.utils.TestUtils import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.errors.{NotLeaderOrFollowerException, RecordTooLargeException} -import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch, RecordConversionStats} +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.record.{CompressionType, MemoryRecords, RecordBatch} +import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse import org.apache.kafka.common.utils.{MockTime, Time} import org.apache.kafka.coordinator.group.runtime.PartitionWriter -import org.apache.kafka.storage.internals.log.{AppendOrigin, LeaderHwChange, LogAppendInfo, LogConfig} +import org.apache.kafka.storage.internals.log.{AppendOrigin, LogConfig} import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows} import org.junit.jupiter.api.Test import org.mockito.{ArgumentCaptor, ArgumentMatchers} import org.mockito.Mockito.{mock, verify, when} import java.nio.charset.Charset -import java.util.{Collections, Optional, OptionalInt, Properties} +import java.util.{Collections, Properties} +import scala.collection.Map import scala.jdk.CollectionConverters._ class StringKeyValueSerializer extends PartitionWriter.Serializer[(String, String)] { @@ -95,31 +98,35 @@ class PartitionWriterImplTest { val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] = ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]]) - when(replicaManager.appendToLocalLog( + val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) + + when(replicaManager.appendRecords( + ArgumentMatchers.eq(0L), + ArgumentMatchers.eq(1.toShort), ArgumentMatchers.eq(true), ArgumentMatchers.eq(AppendOrigin.COORDINATOR), recordsCapture.capture(), - ArgumentMatchers.eq(1), - ArgumentMatchers.eq(RequestLocal.NoCaching) - )).thenReturn(Map(tp -> LogAppendResult(new LogAppendInfo( - Optional.empty(), - 10, - OptionalInt.empty(), - RecordBatch.NO_TIMESTAMP, - -1L, - RecordBatch.NO_TIMESTAMP, - -1L, - RecordConversionStats.EMPTY, - CompressionType.NONE, - CompressionType.NONE, - -1, - -1, - false, - -1L, - Collections.emptyList(), - "", - LeaderHwChange.INCREASED - )))) + callbackCapture.capture(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + )).thenAnswer( _ => { + callbackCapture.getValue.apply(Map( + tp -> new PartitionResponse( + Errors.NONE, + 5, + 10, + RecordBatch.NO_TIMESTAMP, + -1, + Collections.emptyList(), + "" + ) + )) + }) val records = List( ("k0", "v0"), @@ -132,11 +139,6 @@ class PartitionWriterImplTest { records.asJava )) - verify(replicaManager).maybeCompletePurgatories( - tp, - LeaderHwChange.INCREASED - ) - val batch = recordsCapture.getValue.getOrElse(tp, throw new AssertionError(s"No records for $tp")) assertEquals(1, batch.batches().asScala.toList.size) From 5172c275b9e3f2d7bb34b4b70c8028dd8dc24163 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 2 Jun 2023 11:00:15 +0200 Subject: [PATCH 5/9] rename --- ...erImpl.scala => CoordinatorPartitionWriter.scala} | 2 +- ...st.scala => CoordinatorPartitionWriterTest.scala} | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) rename core/src/main/scala/kafka/coordinator/group/{PartitionWriterImpl.scala => CoordinatorPartitionWriter.scala} (99%) rename core/src/test/scala/unit/kafka/coordinator/group/{PartitionWriterImplTest.scala => CoordinatorPartitionWriterTest.scala} (95%) diff --git a/core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala similarity index 99% rename from core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala rename to core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index fd5924212a5c5..a4668c93f2ad8 100644 --- a/core/src/main/scala/kafka/coordinator/group/PartitionWriterImpl.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -56,7 +56,7 @@ private[group] class ListenerAdaptor( } } -class PartitionWriterImpl[T]( +class CoordinatorPartitionWriter[T]( replicaManager: ReplicaManager, serializer: PartitionWriter.Serializer[T], compressionType: CompressionType, diff --git a/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala similarity index 95% rename from core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala rename to core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala index a846fbeb23bba..732664b9e7d75 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/PartitionWriterImplTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala @@ -47,12 +47,12 @@ class StringKeyValueSerializer extends PartitionWriter.Serializer[(String, Strin } } -class PartitionWriterImplTest { +class CoordinatorPartitionWriterTest { @Test def testRegisterDeregisterListener(): Unit = { val tp = new TopicPartition("foo", 0) val replicaManager = mock(classOf[ReplicaManager]) - val partitionRecordWriter = new PartitionWriterImpl( + val partitionRecordWriter = new CoordinatorPartitionWriter( replicaManager, new StringKeyValueSerializer(), CompressionType.NONE, @@ -84,7 +84,7 @@ class PartitionWriterImplTest { val tp = new TopicPartition("foo", 0) val replicaManager = mock(classOf[ReplicaManager]) val time = new MockTime() - val partitionRecordWriter = new PartitionWriterImpl( + val partitionRecordWriter = new CoordinatorPartitionWriter( replicaManager, new StringKeyValueSerializer(), CompressionType.NONE, @@ -157,7 +157,7 @@ class PartitionWriterImplTest { def testWriteRecordTooLarge(): Unit = { val tp = new TopicPartition("foo", 0) val replicaManager = mock(classOf[ReplicaManager]) - val partitionRecordWriter = new PartitionWriterImpl( + val partitionRecordWriter = new CoordinatorPartitionWriter( replicaManager, new StringKeyValueSerializer(), CompressionType.NONE, @@ -186,7 +186,7 @@ class PartitionWriterImplTest { def testWriteEmptyRecordList(): Unit = { val tp = new TopicPartition("foo", 0) val replicaManager = mock(classOf[ReplicaManager]) - val partitionRecordWriter = new PartitionWriterImpl( + val partitionRecordWriter = new CoordinatorPartitionWriter( replicaManager, new StringKeyValueSerializer(), CompressionType.NONE, @@ -206,7 +206,7 @@ class PartitionWriterImplTest { def testNonexistentPartition(): Unit = { val tp = new TopicPartition("foo", 0) val replicaManager = mock(classOf[ReplicaManager]) - val partitionRecordWriter = new PartitionWriterImpl( + val partitionRecordWriter = new CoordinatorPartitionWriter( replicaManager, new StringKeyValueSerializer(), CompressionType.NONE, From 88f71e5169b604b73305e0cd427cb358dcee7318 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 2 Jun 2023 21:15:00 +0200 Subject: [PATCH 6/9] fix tests --- .../kafka/coordinator/group/GroupCoordinatorTest.scala | 4 ++++ .../coordinator/group/GroupMetadataManagerTest.scala | 10 ++++++++++ .../transaction/TransactionStateManagerTest.scala | 6 ++++++ .../test/scala/unit/kafka/server/KafkaApisTest.scala | 7 ++++++- 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala index 403a0c0885ae4..3a7f6f91a9df6 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala @@ -3865,6 +3865,7 @@ class GroupCoordinatorTest { any(), any(), any(), + any(), any() )).thenAnswer(_ => { capturedArgument.getValue.apply( @@ -3901,6 +3902,7 @@ class GroupCoordinatorTest { any(), any(), any(), + any(), any())).thenAnswer(_ => { capturedArgument.getValue.apply( Map(new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId) -> @@ -4047,6 +4049,7 @@ class GroupCoordinatorTest { any(), any(), any(), + any(), any()) ).thenAnswer(_ => { capturedArgument.getValue.apply( @@ -4082,6 +4085,7 @@ class GroupCoordinatorTest { any(), any(), any(), + any(), any()) ).thenAnswer(_ => { capturedArgument.getValue.apply( diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index da01e46162684..03de6484c7fa9 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -1185,6 +1185,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager).getMagic(any()) } @@ -1223,6 +1224,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager).getMagic(any()) } @@ -1299,6 +1301,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) // Will update sensor after commit assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count")) @@ -1341,6 +1344,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager).getMagic(any()) capturedResponseCallback.getValue.apply(Map(groupTopicPartition -> @@ -1401,6 +1405,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager).getMagic(any()) } @@ -1451,6 +1456,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager).getMagic(any()) } @@ -1603,6 +1609,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager).getMagic(any()) assertEquals(1, TestUtils.totalMetricValue(metrics, "offset-commit-count")) @@ -1710,6 +1717,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) verify(replicaManager, times(2)).getMagic(any()) } @@ -2817,6 +2825,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) capturedArgument } @@ -2834,6 +2843,7 @@ class GroupMetadataManagerTest { any(), any(), any(), + any(), any()) ).thenAnswer(_ => { capturedCallback.getValue.apply( diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala index 78c5a5e3ce8b6..67919e5c49c9c 100644 --- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala @@ -658,6 +658,7 @@ class TransactionStateManagerTest { any(), any(), any(), + any(), any() ) @@ -703,6 +704,7 @@ class TransactionStateManagerTest { any(), any(), any(), + any(), any() ) @@ -745,6 +747,7 @@ class TransactionStateManagerTest { any(), any(), any(), + any(), any()) assertEquals(Set.empty, listExpirableTransactionalIds()) @@ -803,6 +806,7 @@ class TransactionStateManagerTest { any(), any(), any(), + any(), any() ) @@ -953,6 +957,7 @@ class TransactionStateManagerTest { any(), any(), any(), + any(), any() )).thenAnswer(_ => callbackCapture.getValue.apply( recordsCapture.getValue.map { case (topicPartition, records) => @@ -1105,6 +1110,7 @@ class TransactionStateManagerTest { any(), any(), any(), + any(), any()) ).thenAnswer(_ => capturedArgument.getValue.apply( Map(new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId) -> diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 8007c873ccc6a..3c16f4d5fc6c2 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -2310,6 +2310,7 @@ class KafkaApisTest { any(), any(), any(), + any(), any()) ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH)))) @@ -2378,7 +2379,8 @@ class KafkaApisTest { any(), any(), ArgumentMatchers.eq(transactionalId), - ArgumentMatchers.eq(Some(transactionCoordinatorPartition))) + ArgumentMatchers.eq(Some(transactionCoordinatorPartition)), + any()) } } @@ -2505,6 +2507,7 @@ class KafkaApisTest { any(), ArgumentMatchers.eq(requestLocal), any(), + any(), any()) ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))) @@ -2637,6 +2640,7 @@ class KafkaApisTest { any(), ArgumentMatchers.eq(requestLocal), any(), + any(), any()) ).thenAnswer(_ => responseCallback.getValue.apply(Map(tp2 -> new PartitionResponse(Errors.NONE)))) @@ -2671,6 +2675,7 @@ class KafkaApisTest { any(), ArgumentMatchers.eq(requestLocal), any(), + any(), any()) } From 606a061461db05fa80fd4149a98e3be3d71b7f4f Mon Sep 17 00:00:00 2001 From: David Jacot Date: Fri, 2 Jun 2023 21:26:41 +0200 Subject: [PATCH 7/9] address comment --- .../kafka/coordinator/group/CoordinatorPartitionWriter.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index a4668c93f2ad8..138f5c6ee7ab8 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -144,7 +144,7 @@ class CoordinatorPartitionWriter[T]( ) val partitionResult = appendResults.getOrElse(tp, - throw new IllegalStateException(s"Append status $appendResults should only have one partition $tp")) + throw new IllegalStateException(s"Append status $appendResults should have partition $tp.")) // Required offset. partitionResult.lastOffset + 1 From f9daa7ff738fe7342b6783a1ada91f7a5c3d744e Mon Sep 17 00:00:00 2001 From: David Jacot Date: Mon, 5 Jun 2023 09:27:51 +0200 Subject: [PATCH 8/9] refactor --- .../group/CoordinatorPartitionWriter.scala | 27 ++++++++++++++----- .../main/scala/kafka/server/ActionQueue.scala | 22 ++++++++++----- .../scala/kafka/server/ReplicaManager.scala | 11 +++++--- .../AbstractCoordinatorConcurrencyTest.scala | 2 +- .../CoordinatorPartitionWriterTest.scala | 12 ++++----- 5 files changed, 51 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index 138f5c6ee7ab8..c354bac443788 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -17,7 +17,7 @@ package kafka.coordinator.group import kafka.cluster.PartitionListener -import kafka.server.ReplicaManager +import kafka.server.{ActionQueue, ReplicaManager} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.errors.RecordTooLargeException import org.apache.kafka.common.protocol.Errors @@ -32,7 +32,11 @@ import java.nio.ByteBuffer import java.util import scala.collection.Map -private[group] class ListenerAdaptor( +/** + * ListenerAdapter adapts the PartitionListener interface to the + * PartitionWriter.Listener interface. + */ +private[group] class ListenerAdapter( val listener: PartitionWriter.Listener ) extends PartitionListener { override def onHighWatermarkUpdated( @@ -43,7 +47,7 @@ private[group] class ListenerAdaptor( } override def equals(that: Any): Boolean = that match { - case other: ListenerAdaptor => listener.equals(other.listener) + case other: ListenerAdapter => listener.equals(other.listener) case _ => false } @@ -62,6 +66,15 @@ class CoordinatorPartitionWriter[T]( compressionType: CompressionType, time: Time ) extends PartitionWriter[T] { + // We use an action queue which directly executes actions. This is possible + // here because we don't hold any conflicting locks. + private val directActionQueue = new ActionQueue { + override def add(action: () => Unit): Unit = { + action() + } + + override def tryCompleteActions(): Unit = {} + } /** * Register a PartitionWriter.Listener. @@ -73,7 +86,7 @@ class CoordinatorPartitionWriter[T]( tp: TopicPartition, listener: PartitionWriter.Listener ): Unit = { - replicaManager.maybeAddListener(tp, new ListenerAdaptor(listener)) + replicaManager.maybeAddListener(tp, new ListenerAdapter(listener)) } /** @@ -86,7 +99,7 @@ class CoordinatorPartitionWriter[T]( tp: TopicPartition, listener: PartitionWriter.Listener ): Unit = { - replicaManager.removeListener(tp, new ListenerAdaptor(listener)) + replicaManager.removeListener(tp, new ListenerAdapter(listener)) } /** @@ -140,7 +153,9 @@ class CoordinatorPartitionWriter[T]( origin = AppendOrigin.COORDINATOR, entriesPerPartition = Map(tp -> recordsBuilder.build()), responseCallback = results => appendResults = results, - actionQueueAdd = item => item() // Immediately complete the action queue item. + // We can directly complete the purgatories here because we don't hold + // any conflicting locks. + actionQueue = directActionQueue ) val partitionResult = appendResults.getOrElse(tp, diff --git a/core/src/main/scala/kafka/server/ActionQueue.scala b/core/src/main/scala/kafka/server/ActionQueue.scala index 1b6b8326fa4cf..d0791a98633fb 100644 --- a/core/src/main/scala/kafka/server/ActionQueue.scala +++ b/core/src/main/scala/kafka/server/ActionQueue.scala @@ -22,22 +22,32 @@ import java.util.concurrent.ConcurrentLinkedQueue import kafka.utils.Logging /** - * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords - * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking, - * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration. + * The action queue is used to collect actions which need to be executed later. */ -class ActionQueue extends Logging { - private val queue = new ConcurrentLinkedQueue[() => Unit]() +trait ActionQueue { /** * add action to this queue. * @param action action */ - def add(action: () => Unit): Unit = queue.add(action) + def add(action: () => Unit): Unit /** * try to complete all delayed actions */ + def tryCompleteActions(): Unit +} + +/** + * This queue is used to collect actions which need to be executed later. One use case is that ReplicaManager#appendRecords + * produces record changes so we need to check and complete delayed requests. In order to avoid conflicting locking, + * we add those actions to this queue and then complete them at the end of KafkaApis.handle() or DelayedJoin.onExpiration. + */ +class DelayedActionQueue extends Logging with ActionQueue { + private val queue = new ConcurrentLinkedQueue[() => Unit]() + + def add(action: () => Unit): Unit = queue.add(action) + def tryCompleteActions(): Unit = { val maxToComplete = queue.size() var count = 0 diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index bad4b61579ba4..b5542aeb9dd30 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -631,7 +631,7 @@ class ReplicaManager(val config: KafkaConfig, /** * TODO: move this action queue to handle thread so we can simplify concurrency handling */ - private val actionQueue = new ActionQueue + private val actionQueue = new DelayedActionQueue def tryCompleteActions(): Unit = actionQueue.tryCompleteActions() @@ -655,7 +655,7 @@ class ReplicaManager(val config: KafkaConfig, * @param requestLocal container for the stateful instances scoped to this request * @param transactionalId transactional ID if the request is from a producer and the producer is transactional * @param transactionStatePartition partition that holds the transactional state if transactionalId is present - * @param actionQueueAdd function to add an action to the action queue. + * @param actionQueue the action queue to use. ReplicaManager#actionQueue is used by default. */ def appendRecords(timeout: Long, requiredAcks: Short, @@ -668,7 +668,7 @@ class ReplicaManager(val config: KafkaConfig, requestLocal: RequestLocal = RequestLocal.NoCaching, transactionalId: String = null, transactionStatePartition: Option[Int] = None, - actionQueueAdd: (() => Unit) => Unit = actionQueue.add): Unit = { + actionQueue: ActionQueue = this.actionQueue): Unit = { if (isValidRequiredAcks(requiredAcks)) { val sTime = time.milliseconds @@ -733,7 +733,7 @@ class ReplicaManager(val config: KafkaConfig, ) // response status } - actionQueueAdd { + actionQueue.add { () => allResults.foreach { case (topicPartition, result) => val requestKey = TopicPartitionOperationKey(topicPartition) result.info.leaderHwChange match { @@ -1066,6 +1066,9 @@ class ReplicaManager(val config: KafkaConfig, requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0 } + /** + * Append the messages to the local replica logs + */ private def appendToLocalLog(internalTopicsAllowed: Boolean, origin: AppendOrigin, entriesPerPartition: Map[TopicPartition, MemoryRecords], diff --git a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala index 0751ef424cbf6..ab97b1c08f626 100644 --- a/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/AbstractCoordinatorConcurrencyTest.scala @@ -181,7 +181,7 @@ object AbstractCoordinatorConcurrencyTest { requestLocal: RequestLocal = RequestLocal.NoCaching, transactionalId: String = null, transactionStatePartition: Option[Int], - actionQueueAdd: (() => Unit) => Unit = null): Unit = { + actionQueue: ActionQueue = null): Unit = { if (entriesPerPartition.isEmpty) return diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala index 732664b9e7d75..7f300c6785633 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala @@ -64,18 +64,18 @@ class CoordinatorPartitionWriterTest { } partitionRecordWriter.registerListener(tp, listener) - verify(replicaManager).maybeAddListener(tp, new ListenerAdaptor(listener)) + verify(replicaManager).maybeAddListener(tp, new ListenerAdapter(listener)) partitionRecordWriter.deregisterListener(tp, listener) - verify(replicaManager).removeListener(tp, new ListenerAdaptor(listener)) + verify(replicaManager).removeListener(tp, new ListenerAdapter(listener)) assertEquals( - new ListenerAdaptor(listener), - new ListenerAdaptor(listener) + new ListenerAdapter(listener), + new ListenerAdapter(listener) ) assertEquals( - new ListenerAdaptor(listener).hashCode(), - new ListenerAdaptor(listener).hashCode() + new ListenerAdapter(listener).hashCode(), + new ListenerAdapter(listener).hashCode() ) } From b20eefa827155a580f89b84ce1d79e633b362b60 Mon Sep 17 00:00:00 2001 From: David Jacot Date: Tue, 6 Jun 2023 07:58:16 +0200 Subject: [PATCH 9/9] address comments --- .../group/CoordinatorPartitionWriter.scala | 6 +- .../CoordinatorPartitionWriterTest.scala | 56 +++++++++++++++++-- 2 files changed, 57 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala index c354bac443788..5ee576ff3c52e 100644 --- a/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala +++ b/core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala @@ -56,7 +56,7 @@ private[group] class ListenerAdapter( } override def toString: String = { - s"ListenerAdaptor(listener=$listener)" + s"ListenerAdapter(listener=$listener)" } } @@ -161,6 +161,10 @@ class CoordinatorPartitionWriter[T]( val partitionResult = appendResults.getOrElse(tp, throw new IllegalStateException(s"Append status $appendResults should have partition $tp.")) + if (partitionResult.error != Errors.NONE) { + throw partitionResult.error.exception() + } + // Required offset. partitionResult.lastOffset + 1 diff --git a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala index 7f300c6785633..436458ccc4858 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala @@ -134,10 +134,7 @@ class CoordinatorPartitionWriterTest { ("k2", "v2"), ) - assertEquals(11, partitionRecordWriter.append( - tp, - records.asJava - )) + assertEquals(11, partitionRecordWriter.append(tp, records.asJava)) val batch = recordsCapture.getValue.getOrElse(tp, throw new AssertionError(s"No records for $tp")) @@ -153,6 +150,57 @@ class CoordinatorPartitionWriterTest { assertEquals(records, receivedRecords) } + @Test + def testWriteRecordsWithFailure(): Unit = { + val tp = new TopicPartition("foo", 0) + val replicaManager = mock(classOf[ReplicaManager]) + val time = new MockTime() + val partitionRecordWriter = new CoordinatorPartitionWriter( + replicaManager, + new StringKeyValueSerializer(), + CompressionType.NONE, + time + ) + + when(replicaManager.getLogConfig(tp)).thenReturn(Some(LogConfig.fromProps( + Collections.emptyMap(), + new Properties() + ))) + + val recordsCapture: ArgumentCaptor[Map[TopicPartition, MemoryRecords]] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, MemoryRecords]]) + val callbackCapture: ArgumentCaptor[Map[TopicPartition, PartitionResponse] => Unit] = + ArgumentCaptor.forClass(classOf[Map[TopicPartition, PartitionResponse] => Unit]) + + when(replicaManager.appendRecords( + ArgumentMatchers.eq(0L), + ArgumentMatchers.eq(1.toShort), + ArgumentMatchers.eq(true), + ArgumentMatchers.eq(AppendOrigin.COORDINATOR), + recordsCapture.capture(), + callbackCapture.capture(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any(), + ArgumentMatchers.any() + )).thenAnswer(_ => { + callbackCapture.getValue.apply(Map( + tp -> new PartitionResponse(Errors.NOT_LEADER_OR_FOLLOWER) + )) + }) + + val records = List( + ("k0", "v0"), + ("k1", "v1"), + ("k2", "v2"), + ) + + assertThrows(classOf[NotLeaderOrFollowerException], + () => partitionRecordWriter.append(tp, records.asJava)) + } + @Test def testWriteRecordTooLarge(): Unit = { val tp = new TopicPartition("foo", 0)