From 41be8fa19d6600685df091fb7235dfe42a30fc1f Mon Sep 17 00:00:00 2001
From: Chia-Ping Tsai <chia7712@gmail.com>
Date: Wed, 26 Aug 2020 01:44:05 +0800
Subject: [PATCH] KAFKA-10433 Reuse the ByteBuffer in validating compressed
 records

---
 .../kafka/common/record/BufferSupplier.java   | 31 +++++++++++++
 .../common/record/BufferSupplierTest.java     | 44 +++++++++++++++++++
 .../main/scala/kafka/log/LogValidator.scala   |  4 +-
 3 files changed, 77 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
index 1a6c92c712fba..c1d77f083b7f5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
@@ -22,6 +22,9 @@
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
 
 /**
  * Simple non-threadsafe interface for caching byte buffers. This is suitable for simple cases like ensuring that
@@ -48,6 +51,34 @@ public static BufferSupplier create() {
         return new DefaultSupplier();
     }
 
+    /**
+     * A thread-safe buffer supplier instance.
+     */
+    public static final BufferSupplier SINGLETON = new BufferSupplier() {
+        // We currently use a single block size, so optimise for that case
+        // visible for testing
+        final ConcurrentMap<Integer, Deque<ByteBuffer>> bufferMap = new ConcurrentHashMap<>(1);
+
+        @Override
+        public ByteBuffer get(int size) {
+            Deque<ByteBuffer> bufferQueue = bufferMap.get(size);
+            ByteBuffer buffer = bufferQueue == null ? null : bufferQueue.pollFirst();
+            return buffer == null ?
ByteBuffer.allocate(size) : buffer;
+        }
+
+        @Override
+        public void release(ByteBuffer buffer) {
+            buffer.clear();
+            bufferMap.computeIfAbsent(buffer.capacity(), size -> new ConcurrentLinkedDeque<>())
+                .addLast(buffer);
+        }
+
+        @Override
+        public void close() {
+            bufferMap.clear();
+        }
+    };
+
     /**
      * Supply a buffer with the required capacity. This may return a cached buffer or allocate a new instance.
      */
diff --git a/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java b/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java
index dea0c9854133e..4abd43fe0dce8 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/BufferSupplierTest.java
@@ -20,9 +20,17 @@
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.IntStream;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
 
 public class BufferSupplierTest {
 
@@ -43,4 +51,40 @@ public void testGrowableBuffer() {
         assertEquals(0, increased.position());
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testSingleton() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
+        int threadCount = 4;
+        ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);
+        AtomicBoolean closed = new AtomicBoolean(false);
+        try {
+            IntStream.range(0, threadCount).forEach(index ->
+                threadPool.execute(() -> {
+                    try {
+                        while (!closed.get()) {
+                            ByteBuffer buffer = BufferSupplier.SINGLETON.get(index % 2 == 0 ?
10 : 20);
+                            BufferSupplier.SINGLETON.release(buffer);
+                            TimeUnit.MILLISECONDS.sleep(10);
+                        }
+                    } catch (InterruptedException e) {
+                        // nothing
+                    }
+                })
+            );
+            TimeUnit.SECONDS.sleep(5);
+        } finally {
+            closed.set(true);
+            threadPool.shutdown();
+            assertTrue(threadPool.awaitTermination(30, TimeUnit.SECONDS));
+        }
+
+        ConcurrentMap<Integer, Deque<ByteBuffer>> bufferMap = (ConcurrentMap<Integer, Deque<ByteBuffer>>) BufferSupplier.SINGLETON
+            .getClass().getDeclaredField("bufferMap").get(BufferSupplier.SINGLETON);
+        assertEquals(2, bufferMap.size());
+        bufferMap.values().forEach(queue -> assertEquals(2, queue.size()));
+
+        BufferSupplier.SINGLETON.close();
+        assertEquals(0, bufferMap.size());
+    }
+
 }
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 8e1744683588f..629efabe135ea 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -398,9 +398,9 @@ private[log] object LogValidator extends Logging {
       // if we are on version 2 and beyond, and we know we are going for in place assignment,
       // then we can optimize the iterator to skip key / value / headers since they would not be used at all
       val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2)
-        batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)
+        batch.skipKeyValueIterator(BufferSupplier.SINGLETON)
       else
-        batch.streamingIterator(BufferSupplier.NO_CACHING)
+        batch.streamingIterator(BufferSupplier.SINGLETON)
       try {
         val recordErrors = new ArrayBuffer[ApiRecordError](0)