Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;

/**
 * Simple interface for caching byte buffers. Implementations are not thread-safe unless explicitly
 * documented otherwise (see {@code SINGLETON}). This is suitable for simple cases like ensuring that
Expand All @@ -48,6 +51,34 @@ public static BufferSupplier create() {
return new DefaultSupplier();
}

/**
* A thread-safe buffer supplier instance.
*/
public static final BufferSupplier SINGLETON = new BufferSupplier() {
    // Pools released buffers keyed by capacity. A single block size is the
    // common case, hence the initial table size of 1.
    // Field name is read reflectively by tests — do not rename.
    final ConcurrentMap<Integer, Deque<ByteBuffer>> bufferMap = new ConcurrentHashMap<>(1);

    @Override
    public ByteBuffer get(int size) {
        // Reuse a pooled buffer of the exact requested capacity when one is
        // available; otherwise allocate a fresh buffer.
        Deque<ByteBuffer> pooled = bufferMap.get(size);
        if (pooled != null) {
            ByteBuffer cached = pooled.pollFirst();
            if (cached != null)
                return cached;
        }
        return ByteBuffer.allocate(size);
    }

    @Override
    public void release(ByteBuffer buffer) {
        // Reset position/limit so the next user sees a fresh buffer, then
        // return it to the pool for its capacity.
        buffer.clear();
        Deque<ByteBuffer> pool = bufferMap.computeIfAbsent(buffer.capacity(), capacity -> new ConcurrentLinkedDeque<>());
        pool.addLast(buffer);
    }

    @Override
    public void close() {
        // Drop all pooled buffers; subsequent get() calls simply allocate.
        bufferMap.clear();
    }
};

/**
* Supply a buffer with the required capacity. This may return a cached buffer or allocate a new instance.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,17 @@
import org.junit.Test;

import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;

public class BufferSupplierTest {

Expand All @@ -43,4 +51,40 @@ public void testGrowableBuffer() {
assertEquals(0, increased.position());
}

@SuppressWarnings("unchecked")
@Test
public void testSingleton() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
    int threadCount = 4;
    ExecutorService threadPool = Executors.newFixedThreadPool(threadCount);
    AtomicBoolean closed = new AtomicBoolean(false);
    try {
        // Hammer the supplier concurrently: even-indexed threads use 10-byte
        // buffers, odd-indexed threads use 20-byte buffers.
        IntStream.range(0, threadCount).forEach(index ->
            threadPool.execute(() -> {
                try {
                    while (!closed.get()) {
                        ByteBuffer buffer = BufferSupplier.SINGLETON.get(index % 2 == 0 ? 10 : 20);
                        BufferSupplier.SINGLETON.release(buffer);
                        TimeUnit.MILLISECONDS.sleep(10);
                    }
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the executor can observe it.
                    Thread.currentThread().interrupt();
                }
            })
        );
        // One second of churn is plenty to exercise the pool; a longer fixed
        // sleep only slows the build without adding coverage.
        TimeUnit.SECONDS.sleep(1);
    } finally {
        closed.set(true);
        threadPool.shutdown();
        assertTrue(threadPool.awaitTermination(30, TimeUnit.SECONDS));
    }

    Field bufferMapField = BufferSupplier.SINGLETON.getClass().getDeclaredField("bufferMap");
    // The field lives in an anonymous class; make it accessible explicitly
    // rather than relying on same-package reflective access.
    bufferMapField.setAccessible(true);
    ConcurrentMap<Integer, Deque<ByteBuffer>> bufferMap =
        (ConcurrentMap<Integer, Deque<ByteBuffer>>) bufferMapField.get(BufferSupplier.SINGLETON);

    // Both buffer sizes (10 and 20) must have a pool after the run.
    assertEquals(2, bufferMap.size());
    // Each size is used by exactly two threads, each holding at most one
    // buffer at a time, so a pool ends with 1 or 2 buffers depending on how
    // the threads interleaved. Asserting an exact count of 2 would be flaky:
    // if the two threads never overlapped a get(), only one buffer was ever
    // allocated for that size.
    bufferMap.values().forEach(queue -> {
        int pooled = queue.size();
        assertTrue("unexpected pool size: " + pooled, pooled >= 1 && pooled <= 2);
    });

    BufferSupplier.SINGLETON.close();
    assertEquals(0, bufferMap.size());
}

}
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LogValidator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -398,9 +398,9 @@ private[log] object LogValidator extends Logging {
// if we are on version 2 and beyond, and we know we are going for in place assignment,
// then we can optimize the iterator to skip key / value / headers since they would not be used at all
val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= RecordBatch.MAGIC_VALUE_V2)
batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)
batch.skipKeyValueIterator(BufferSupplier.SINGLETON)
else
batch.streamingIterator(BufferSupplier.NO_CACHING)
batch.streamingIterator(BufferSupplier.SINGLETON)

try {
val recordErrors = new ArrayBuffer[ApiRecordError](0)
Expand Down