BUFFER_CONTEXT_MAP = new WeakHashMap<>();
+ private static final String OFFHEAP_BUFFER_FACTORY_CONFIG = "pinot.offheap.buffer.factory";
+ private static final String OFFHEAP_BUFFER_PRIORITIZE_BYTE_BUFFER_CONFIG = "pinot.offheap.prioritize.bytebuffer";
+
+ private static volatile PinotBufferFactory _factory = createDefaultFactory();
+
+ public static void useFactory(PinotBufferFactory factory) {
+ _factory = factory;
+ }
+
+ private static PinotBufferFactory createDefaultFactory() {
+ String factoryClassName;
+ // TODO: If chronicle is going to be in their own package, use another way to get the runtime version
+ int jvmVersion = Jvm.majorVersion();
+ if (jvmVersion > 11) {
+ LOGGER.info("Using Chronicle Bytes as buffer on JVM version {}", jvmVersion);
+ factoryClassName = ChroniclePinotBufferFactory.class.getCanonicalName();
+ } else {
+ LOGGER.info("Using LArray as buffer on JVM version {}", jvmVersion);
+ factoryClassName = LArrayPinotBufferFactory.class.getCanonicalName();
+ }
+ return createFactory(factoryClassName, true);
+ }
+
+ private static PinotBufferFactory createFactory(String factoryClassName, boolean prioritizeByteBuffer) {
+ try {
+ LOGGER.info("Instantiating Pinot buffer factory class {}", factoryClassName);
+ PinotBufferFactory factory = (PinotBufferFactory) Class.forName(factoryClassName).getConstructor().newInstance();
+
+ if (prioritizeByteBuffer) {
+ factory = new SmallWithFallbackPinotBufferFactory(new ByteBufferPinotBufferFactory(), factory);
+ }
+
+ return factory;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void loadFactory(PinotConfiguration configuration) {
+ boolean prioritizeByteBuffer = configuration.getProperty(OFFHEAP_BUFFER_PRIORITIZE_BYTE_BUFFER_CONFIG, true);
+ String factoryClassName = configuration.getProperty(OFFHEAP_BUFFER_FACTORY_CONFIG);
+ if (factoryClassName != null) {
+ _factory = createFactory(factoryClassName, prioritizeByteBuffer);
+ } else {
+ LOGGER.info("No custom Pinot buffer factory class found in configuration. Using default factory");
+ _factory = createDefaultFactory();
+ }
+ }
+
/**
* Allocates a buffer using direct memory.
* NOTE: The contents of the allocated buffer are not defined.
@@ -111,19 +163,10 @@ public String toString() {
public static PinotDataBuffer allocateDirect(long size, ByteOrder byteOrder, @Nullable String description) {
PinotDataBuffer buffer;
try {
- if (size <= Integer.MAX_VALUE) {
- buffer = PinotByteBuffer.allocateDirect((int) size, byteOrder);
- } else {
- if (byteOrder == NATIVE_ORDER) {
- buffer = PinotNativeOrderLBuffer.allocateDirect(size);
- } else {
- buffer = PinotNonNativeOrderLBuffer.allocateDirect(size);
- }
- }
+ buffer = _factory.allocateDirect(size, byteOrder);
} catch (Exception e) {
- LOGGER
- .error("Caught exception while allocating direct buffer of size: {} with description: {}", size, description,
- e);
+ LOGGER.error("Caught exception while allocating direct buffer of size: {} with description: {}", size,
+ description, e);
LOGGER.error("Buffer stats: {}", getBufferStats());
ALLOCATION_FAILURE_COUNT.getAndIncrement();
throw e;
@@ -144,15 +187,7 @@ public static PinotDataBuffer loadFile(File file, long offset, long size, ByteOr
throws IOException {
PinotDataBuffer buffer;
try {
- if (size <= Integer.MAX_VALUE) {
- buffer = PinotByteBuffer.loadFile(file, offset, (int) size, byteOrder);
- } else {
- if (byteOrder == NATIVE_ORDER) {
- buffer = PinotNativeOrderLBuffer.loadFile(file, offset, size);
- } else {
- buffer = PinotNonNativeOrderLBuffer.loadFile(file, offset, size);
- }
- }
+ buffer = _factory.readFile(file, offset, size, byteOrder);
} catch (Exception e) {
LOGGER.error("Caught exception while loading file: {} from offset: {} of size: {} with description: {}",
file.getAbsolutePath(), offset, size, description, e);
@@ -187,15 +222,7 @@ public static PinotDataBuffer mapFile(File file, boolean readOnly, long offset,
throws IOException {
PinotDataBuffer buffer;
try {
- if (size <= Integer.MAX_VALUE) {
- buffer = PinotByteBuffer.mapFile(file, readOnly, offset, (int) size, byteOrder);
- } else {
- if (byteOrder == NATIVE_ORDER) {
- buffer = PinotNativeOrderLBuffer.mapFile(file, readOnly, offset, size);
- } else {
- buffer = PinotNonNativeOrderLBuffer.mapFile(file, readOnly, offset, size);
- }
- }
+ buffer = _factory.mapFile(file, readOnly, offset, size, byteOrder);
} catch (Exception e) {
LOGGER.error("Caught exception while mapping file: {} from offset: {} of size: {} with description: {}",
file.getAbsolutePath(), offset, size, description, e);
@@ -286,70 +313,115 @@ public synchronized void close()
}
}
- public abstract byte getByte(int offset);
+ public byte getByte(int offset) {
+ return getByte((long) offset);
+ }
public abstract byte getByte(long offset);
- public abstract void putByte(int offset, byte value);
+ public void putByte(int offset, byte value) {
+ putByte((long) offset, value);
+ }
public abstract void putByte(long offset, byte value);
- public abstract char getChar(int offset);
+ public char getChar(int offset) {
+ return getChar((long) offset);
+ }
public abstract char getChar(long offset);
- public abstract void putChar(int offset, char value);
+ public void putChar(int offset, char value) {
+ putChar((long) offset, value);
+ }
public abstract void putChar(long offset, char value);
- public abstract short getShort(int offset);
+ public short getShort(int offset) {
+ return getShort((long) offset);
+ }
public abstract short getShort(long offset);
- public abstract void putShort(int offset, short value);
+ public void putShort(int offset, short value) {
+ putShort((long) offset, value);
+ }
public abstract void putShort(long offset, short value);
- public abstract int getInt(int offset);
+ public int getInt(int offset) {
+ return getInt((long) offset);
+ }
public abstract int getInt(long offset);
- public abstract void putInt(int offset, int value);
+ public void putInt(int offset, int value) {
+ putInt((long) offset, value);
+ }
public abstract void putInt(long offset, int value);
- public abstract long getLong(int offset);
+ public long getLong(int offset) {
+ return getLong((long) offset);
+ }
public abstract long getLong(long offset);
- public abstract void putLong(int offset, long value);
+ public void putLong(int offset, long value) {
+ putLong((long) offset, value);
+ }
public abstract void putLong(long offset, long value);
- public abstract float getFloat(int offset);
+ public float getFloat(int offset) {
+ return getFloat((long) offset);
+ }
public abstract float getFloat(long offset);
- public abstract void putFloat(int offset, float value);
+ public void putFloat(int offset, float value) {
+ putFloat((long) offset, value);
+ }
public abstract void putFloat(long offset, float value);
- public abstract double getDouble(int offset);
+ public double getDouble(int offset) {
+ return getDouble((long) offset);
+ }
public abstract double getDouble(long offset);
- public abstract void putDouble(int offset, double value);
+ public void putDouble(int offset, double value) {
+ putDouble((long) offset, value);
+ }
public abstract void putDouble(long offset, double value);
+ /**
+ * Given an array of bytes, copies the content of this object into the array of bytes.
+ * The first byte to be copied is the one that could be read with {@code this.getByte(offset)}
+ */
public abstract void copyTo(long offset, byte[] buffer, int destOffset, int size);
+ /**
+ * Given an array of bytes, copies the content of this object into the array of bytes.
+ * The first byte to be copied is the one that could be read with {@code this.getByte(offset)}
+ */
public void copyTo(long offset, byte[] buffer) {
copyTo(offset, buffer, 0, buffer.length);
}
+ /**
+ * Note: It is the responsibility of the caller to make sure arguments are checked before the methods are called.
+ * While some rudimentary checks are performed on the input, the checks are best effort and when performance is an
+ * overriding priority, as when methods of this class are optimized by the runtime compiler, some or all checks
+ * (if any) may be elided. Hence, the caller must not rely on the checks and corresponding exceptions!
+ */
public abstract void copyTo(long offset, PinotDataBuffer buffer, long destOffset, long size);
+ /**
+ * Given an array of bytes, writes the content in the specified position.
+ */
public abstract void readFrom(long offset, byte[] buffer, int srcOffset, int size);
public void readFrom(long offset, byte[] buffer) {
@@ -379,8 +451,49 @@ public PinotDataBuffer view(long start, long end) {
return view(start, end, order());
}
+ /**
+ * Returns an ByteBuffer with the same content of this buffer.
+ *
+ * This receiver object and the returned ByteBuffer share the same memory address, but the receiver conserves the
+ * ownership. This means that:
+ *
+ * - The returned ByteBuffer should not be released (aka freed in C). For example, its cleaner should not be
+ * called. Violations of this rule may produce segmentation faults
+ * - The returned ByteBuffer should not be used once the receiver is released.
+ * Violations of this rule may produce segmentation faults
+ * - A write made by either the receiver or the returned ByteBuffer will be seen by the other.
+ *
+ *
+ * Depending on the implementation, this may be a view (and therefore changes on any buffer will be seen by the other)
+ * or a copy (in which case the cost will be higher, but each copy will have their own lifecycle).
+ *
+ * @param byteOrder The byte order of the returned ByteBuffer. No special treatment is done if the order of the
+ * receiver buffer is different from the order requested. In other words: if this buffer was written
+ * in big endian and the direct buffer is requested in little endian, the integers read from each
+ * buffer will be different.
+ */
public abstract ByteBuffer toDirectByteBuffer(long offset, int size, ByteOrder byteOrder);
+ /**
+ * Returns an ByteBuffer with the same content of this buffer.
+ *
+ * This receiver object and the returned ByteBuffer share the same memory address, but the receiver conserves the
+ * ownership. This means that:
+ *
+ * - The returned ByteBuffer should not be released (aka freed in C). For example, its cleaner should not be
+ * called. Violations of this rule may produce segmentation faults
+ * - The returned ByteBuffer should not be used once the receiver is released.
+ * Violations of this rule may produce segmentation faults
+ * - A write made by either the receiver or the returned ByteBuffer will be seen by the other.
+ *
+ *
+ * Depending on the implementation, this may be a view (and therefore changes on any buffer will be seen by the other)
+ * or a copy (in which case the cost will be higher, but each copy will have their own lifecycle).
+ *
+ */
+ // TODO: Most calls to this method are just used to then read the content of the buffer.
+ // This is unnecessary an generates 2-5 unnecessary objects. We should benchmark whether there is some advantage on
+ // transforming this buffer into a IntBuffer/LongBuffer/etc when reading sequentially
public ByteBuffer toDirectByteBuffer(long offset, int size) {
return toDirectByteBuffer(offset, size, order());
}
@@ -389,4 +502,21 @@ public ByteBuffer toDirectByteBuffer(long offset, int size) {
public abstract void release()
throws IOException;
+
+ public boolean isCloseable() {
+ return _closeable;
+ }
+
+ protected void checkLimits(long capacity, long offset, long size) {
+ if (offset < 0) {
+ throw new IllegalArgumentException("Offset " + offset + " cannot be negative");
+ }
+ if (size < 0) {
+ throw new IllegalArgumentException("Size " + size + " cannot be negative");
+ }
+ if (offset + size > capacity) {
+ throw new IllegalArgumentException("Size (" + size + ") + offset (" + offset + ") exceeds the capacity of "
+ + capacity);
+ }
+ }
}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/SmallWithFallbackPinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/SmallWithFallbackPinotBufferFactory.java
new file mode 100644
index 000000000000..c16cd4f89f9d
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/SmallWithFallbackPinotBufferFactory.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteOrder;
+
+
+/**
+ * A factory that receives two delegates, using one when the requested buffer can be indexes with integers and the other
+ * in the other case.
+ *
+ * This is commonly used to use ByteBuffers when possible. The utility of that is questionable, as it can increase the
+ * number of megamorphic calls in the hot path and also make errors related with -XX:MaxDirectMemorySize more
+ * indeterministic. But it is also the default behavior of Pinot, so it is kept as the default for compatibility
+ * reasons.
+ */
+public class SmallWithFallbackPinotBufferFactory implements PinotBufferFactory {
+ private final PinotBufferFactory _small;
+ private final PinotBufferFactory _fallback;
+
+ public SmallWithFallbackPinotBufferFactory(PinotBufferFactory small, PinotBufferFactory fallback) {
+ _small = small;
+ _fallback = fallback;
+ }
+
+ @Override
+ public PinotDataBuffer allocateDirect(long size, ByteOrder byteOrder) {
+ if (size <= Integer.MAX_VALUE) {
+ return _small.allocateDirect(size, byteOrder);
+ } else {
+ return _fallback.allocateDirect(size, byteOrder);
+ }
+ }
+
+ @Override
+ public PinotDataBuffer readFile(File file, long offset, long size, ByteOrder byteOrder)
+ throws IOException {
+ if (size <= Integer.MAX_VALUE) {
+ return _small.readFile(file, offset, size, byteOrder);
+ } else {
+ return _fallback.readFile(file, offset, size, byteOrder);
+ }
+ }
+
+ @Override
+ public PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size, ByteOrder byteOrder)
+ throws IOException {
+ if (size <= Integer.MAX_VALUE) {
+ return _small.mapFile(file, readOnly, offset, size, byteOrder);
+ } else {
+ return _fallback.mapFile(file, readOnly, offset, size, byteOrder);
+ }
+ }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChronicleDataBuffer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChronicleDataBuffer.java
new file mode 100644
index 000000000000..8219999e2705
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChronicleDataBuffer.java
@@ -0,0 +1,259 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory.chronicle;
+
+import com.google.common.base.Preconditions;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import net.openhft.chronicle.bytes.Bytes;
+import net.openhft.chronicle.bytes.BytesStore;
+import net.openhft.chronicle.bytes.MappedBytes;
+import org.apache.pinot.segment.spi.memory.ByteBufferUtil;
+import org.apache.pinot.segment.spi.memory.NonNativePinotDataBuffer;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+
+
+public class ChronicleDataBuffer extends PinotDataBuffer {
+
+ private final Bytes> _store;
+ private final boolean _flushable;
+ private boolean _closed = false;
+ private final long _viewStart;
+ private final long _viewEnd;
+
+ public ChronicleDataBuffer(Bytes> store, boolean closeable, boolean flushable, long start, long end) {
+ super(closeable);
+ Preconditions.checkArgument(start >= 0, "Invalid start " + start + ". It must be > 0");
+ Preconditions.checkArgument(start <= end, "Invalid start " + start + " and end " + end
+ + ". Start cannot be greater than end");
+ Preconditions.checkArgument(end + store.start() <= store.capacity(), "Invalid end " + end
+ + ". It cannot be greater than capacity (" + (store.capacity() - store.start()) + ")");
+ _store = store;
+ _flushable = flushable;
+ _viewStart = start + store.start();
+ _viewEnd = end + store.start();
+
+ _store.writePosition(_store.capacity());
+ }
+
+ public ChronicleDataBuffer(ChronicleDataBuffer other, long start, long end) {
+ this(other._store, false, other._flushable, start, end);
+ }
+
+ @Override
+ public byte getByte(long offset) {
+ return _store.readByte(offset);
+ }
+
+ @Override
+ public void putByte(long offset, byte value) {
+ _store.writeByte(offset, value);
+ }
+
+ @Override
+ public char getChar(long offset) {
+ return (char) _store.readUnsignedShort(offset);
+ }
+
+ @Override
+ public void putChar(long offset, char value) {
+ _store.writeUnsignedShort(offset + _viewStart, value);
+ }
+
+ @Override
+ public short getShort(long offset) {
+ return _store.readShort(offset + _viewStart);
+ }
+
+ @Override
+ public void putShort(long offset, short value) {
+ _store.writeShort(offset + _viewStart, value);
+ }
+
+ @Override
+ public int getInt(long offset) {
+ return _store.readInt(offset + _viewStart);
+ }
+
+ @Override
+ public void putInt(long offset, int value) {
+ _store.writeInt(offset + _viewStart, value);
+ }
+
+ @Override
+ public long getLong(long offset) {
+ return _store.readLong(offset + _viewStart);
+ }
+
+ @Override
+ public void putLong(long offset, long value) {
+ _store.writeLong(offset + _viewStart, value);
+ }
+
+ @Override
+ public float getFloat(long offset) {
+ return _store.readFloat(offset + _viewStart);
+ }
+
+ @Override
+ public void putFloat(long offset, float value) {
+ _store.writeFloat(offset + _viewStart, value);
+ }
+
+ @Override
+ public double getDouble(long offset) {
+ return _store.readDouble(offset + _viewStart);
+ }
+
+ @Override
+ public void putDouble(long offset, double value) {
+ _store.writeDouble(offset + _viewStart, value);
+ }
+
+ @Override
+ public void copyTo(long offset, byte[] buffer, int destOffset, int size) {
+ Bytes dest = Bytes.wrapForWrite(buffer);
+ dest.writePosition(destOffset);
+ dest.writeLimit(destOffset + size);
+ _store.readPosition(offset + _viewStart);
+ _store.copyTo(dest);
+ }
+
+ @Override
+ public void copyTo(long offset, PinotDataBuffer buffer, long destOffset, long size) {
+ long actualSize = Math.min(size, buffer.size() - destOffset);
+ actualSize = Math.min(actualSize, _store.capacity() - offset + _viewStart);
+
+ if (buffer instanceof ChronicleDataBuffer) {
+ ChronicleDataBuffer other = (ChronicleDataBuffer) buffer;
+ if (_store == other._store) {
+ _store.move(offset + _viewStart, destOffset + other._viewStart, size);
+ } else {
+ _store.readPosition(offset + _viewStart);
+ other._store.writePosition(destOffset + other._viewStart);
+ _store.copyTo(other._store);
+ }
+ } else {
+ long copied = 0;
+ while (copied < actualSize) {
+ int subSize = Math.min((int) (actualSize - copied), Integer.MAX_VALUE);
+ _store.readPosition(offset + _viewStart + copied);
+ _store.readLimit(offset + _viewStart + copied + subSize);
+ BytesStore, ?> into = BytesStore.wrap(buffer.toDirectByteBuffer(destOffset, subSize));
+ copied += _store.copyTo(into);
+ }
+ _store.readLimit(_store.capacity());
+ }
+ }
+
+ @Override
+ public void readFrom(long offset, byte[] buffer, int srcOffset, int size) {
+ _store.write(offset + _viewStart, buffer, srcOffset, size);
+ }
+
+ @Override
+ public void readFrom(long offset, ByteBuffer buffer) {
+ ByteBuffer nativeBuf;
+ if (buffer.order() == ByteOrder.nativeOrder()) {
+ nativeBuf = buffer;
+ } else {
+ nativeBuf = buffer.duplicate().order(ByteOrder.nativeOrder());
+ }
+ _store.write(offset + _viewStart, nativeBuf, nativeBuf.position(), nativeBuf.remaining());
+ }
+
+ @Override
+ public void readFrom(long offset, File file, long srcOffset, long size)
+ throws IOException {
+ checkLimits(_viewEnd - _viewStart, offset, size);
+ try (FileInputStream fos = new FileInputStream(file); FileChannel channel = fos.getChannel()) {
+ if (srcOffset + size > file.length()) {
+ throw new IllegalArgumentException("Final position cannot be larger than the file length");
+ }
+ channel.position(srcOffset);
+
+ int len;
+ byte[] buffer = new byte[4096];
+ long offsetOnStore = offset + _viewStart;
+ long pendingBytes = size - srcOffset;
+ while (pendingBytes >= 0 && (len = fos.read(buffer, 0, Math.min((int) pendingBytes, 4096))) > 0) {
+ _store.write(offsetOnStore, buffer, 0, len);
+ offsetOnStore += len;
+ pendingBytes -= len;
+ }
+ }
+ }
+
+ @Override
+ public long size() {
+ return _viewEnd - _viewStart;
+ }
+
+ @Override
+ public ByteOrder order() {
+ return _store.byteOrder();
+ }
+
+ @Override
+ public PinotDataBuffer view(long start, long end, ByteOrder byteOrder) {
+ Preconditions.checkArgument(start >= 0, "Start cannot be negative");
+ Preconditions.checkArgument(start < _viewEnd - _viewStart,
+ "Start cannot be larger than this buffer capacity");
+ Preconditions.checkArgument(end <= _viewEnd, "End cannot be higher than this buffer capacity");
+ ChronicleDataBuffer buffer = new ChronicleDataBuffer(this, start + _viewStart, end + _viewEnd);
+
+ return byteOrder == ByteOrder.nativeOrder() ? buffer : new NonNativePinotDataBuffer(buffer);
+ }
+
+ @Override
+ public ByteBuffer toDirectByteBuffer(long offset, int size, ByteOrder byteOrder) {
+ Object underlying = _store.underlyingObject();
+ if (underlying instanceof ByteBuffer) {
+ return ((ByteBuffer) underlying)
+ .duplicate()
+ .position((int) offset)
+ .limit((int) offset + size)
+ .slice()
+ .order(byteOrder);
+ }
+ return ByteBufferUtil.newDirectByteBuffer(_store.addressForRead(size) + offset + _viewStart, size, _store)
+ .order(byteOrder);
+ }
+
+ @Override
+ public void flush() {
+ if (_flushable && _store instanceof MappedBytes) {
+ _store.writePosition(_store.capacity());
+ ((MappedBytes) _store).sync();
+ }
+ }
+
+ @Override
+ public synchronized void release()
+ throws IOException {
+ if (!_closed) {
+ _closed = true;
+ _store.releaseLast();
+ }
+ }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChroniclePinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChroniclePinotBufferFactory.java
new file mode 100644
index 000000000000..e6d0e39797f1
--- /dev/null
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChroniclePinotBufferFactory.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory.chronicle;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import net.openhft.chronicle.bytes.Bytes;
+import net.openhft.chronicle.bytes.MappedBytes;
+import org.apache.pinot.segment.spi.memory.OnlyNativePinotBufferFactory;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+
+
+public class ChroniclePinotBufferFactory extends OnlyNativePinotBufferFactory {
+ @Override
+ protected PinotDataBuffer allocateDirect(long size) {
+ Bytes> store;
+ if (size < Integer.MAX_VALUE) {
+ store = Bytes.wrapForWrite(ByteBuffer.allocateDirect((int) size));
+ } else {
+ store = Bytes.allocateDirect(size);
+ }
+ return new ChronicleDataBuffer(store, true, false, 0, size);
+ }
+
+ @Override
+ protected PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size)
+ throws IOException {
+ MappedBytes mappedBytes = MappedBytes.singleMappedBytes(file, offset + size, readOnly);
+ return new ChronicleDataBuffer(mappedBytes, true, true, offset, size + offset);
+ }
+}
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotByteBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotByteBufferTest.java
new file mode 100644
index 000000000000..97b144d6dd68
--- /dev/null
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotByteBufferTest.java
@@ -0,0 +1,7 @@
+package org.apache.pinot.segment.spi.memory;
+
+public class PinotByteBufferTest extends PinotDataBufferInstanceTestBase {
+ public PinotByteBufferTest() {
+ super(new ByteBufferPinotBufferFactory());
+ }
+}
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotChronicleByteBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotChronicleByteBufferTest.java
new file mode 100644
index 000000000000..23afa356e594
--- /dev/null
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotChronicleByteBufferTest.java
@@ -0,0 +1,10 @@
+package org.apache.pinot.segment.spi.memory;
+
+import org.apache.pinot.segment.spi.memory.chronicle.ChroniclePinotBufferFactory;
+
+
+public class PinotChronicleByteBufferTest extends PinotDataBufferInstanceTestBase {
+ public PinotChronicleByteBufferTest() {
+ super(new ChroniclePinotBufferFactory());
+ }
+}
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferInstanceTestBase.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferInstanceTestBase.java
new file mode 100644
index 000000000000..4c6f2e548fd2
--- /dev/null
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferInstanceTestBase.java
@@ -0,0 +1,330 @@
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Arrays;
+import java.util.concurrent.Future;
+import net.openhft.chronicle.core.Jvm;
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.SkipException;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public abstract class PinotDataBufferInstanceTestBase extends PinotDataBufferTestBase {
+
+ public final PinotBufferFactory _factory;
+
+ public PinotDataBufferInstanceTestBase(PinotBufferFactory factory) {
+ _factory = factory;
+ }
+
+ @SuppressWarnings("RedundantExplicitClose")
+ @Test
+ public void testMultipleClose()
+ throws Exception {
+ try (PinotDataBuffer buffer = _factory.allocateDirect(BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) {
+ buffer.close();
+ }
+ try (PinotDataBuffer buffer = _factory.allocateDirect(BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) {
+ buffer.close();
+ }
+ try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) {
+ randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE);
+ try (PinotDataBuffer buffer = _factory.readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) {
+ buffer.close();
+ }
+ try (PinotDataBuffer buffer = _factory.readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) {
+ buffer.close();
+ }
+ try (PinotDataBuffer buffer = _factory
+ .mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) {
+ buffer.close();
+ }
+ try (PinotDataBuffer buffer =
+ _factory.mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) {
+ buffer.close();
+ }
+ } finally {
+ FileUtils.forceDelete(TEMP_FILE);
+ }
+ }
+
+ @Test
+ public void testDirectBE()
+ throws Exception {
+ try (PinotDataBuffer buffer = _factory.allocateDirect(BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) {
+ Assert.assertSame(buffer.order(), ByteOrder.BIG_ENDIAN);
+ testPinotDataBuffer(buffer);
+ }
+ }
+
+ @Test
+ public void testDirectLE()
+ throws Exception {
+ try (PinotDataBuffer buffer = PinotByteBuffer.allocateDirect(BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) {
+ Assert.assertSame(buffer.order(), ByteOrder.LITTLE_ENDIAN);
+ testPinotDataBuffer(buffer);
+ }
+ }
+
+ @Test
+ public void testReadFileBE()
+ throws Exception {
+ try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) {
+ randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE);
+ try (PinotDataBuffer buffer = _factory
+ .readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) {
+ Assert.assertSame(buffer.order(), ByteOrder.BIG_ENDIAN);
+ testPinotDataBuffer(buffer);
+ }
+ }
+ }
+
+ @Test
+ public void testReadFileLE()
+ throws Exception {
+ try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) {
+ randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE);
+ try (PinotDataBuffer buffer = _factory
+ .readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) {
+ Assert.assertSame(buffer.order(), ByteOrder.LITTLE_ENDIAN);
+ testPinotDataBuffer(buffer);
+ }
+ }
+ }
+
+ @Test
+ public void testMapFileBE()
+ throws Exception {
+ try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) {
+ randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE);
+ try (PinotDataBuffer buffer = _factory
+ .mapFile(TEMP_FILE, false, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) {
+ Assert.assertSame(buffer.order(), ByteOrder.BIG_ENDIAN);
+ testPinotDataBuffer(buffer);
+ }
+ }
+ }
+
+ @Test
+ public void testMapFileLE()
+ throws Exception {
+ try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) {
+ randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE);
+ try (PinotDataBuffer buffer = _factory
+ .mapFile(TEMP_FILE, false, FILE_OFFSET, BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) {
+ Assert.assertSame(buffer.order(), ByteOrder.LITTLE_ENDIAN);
+ testPinotDataBuffer(buffer);
+ }
+ }
+ }
+
+ protected void testPinotDataBuffer(PinotDataBuffer buffer)
+ throws Exception {
+ Assert.assertEquals(buffer.size(), BUFFER_SIZE);
+ testReadWriteByte(buffer);
+ testReadWriteChar(buffer);
+ testReadWriteShort(buffer);
+ testReadWriteInt(buffer);
+ testReadWriteLong(buffer);
+ testReadWriteFloat(buffer);
+ testReadWriteDouble(buffer);
+ testReadWriteBytes(buffer);
+ testReadWritePinotDataBuffer(buffer);
+ testReadFromByteBuffer(buffer);
+ testConcurrentReadWrite(buffer);
+ }
+
+ protected void testReadWriteByte(PinotDataBuffer buffer) {
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int intOffset = RANDOM.nextInt(BUFFER_SIZE);
+ buffer.putByte(intOffset, _bytes[i]);
+ Assert.assertEquals(buffer.getByte(intOffset), _bytes[i]);
+ }
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ long longOffset = RANDOM.nextInt(BUFFER_SIZE);
+ buffer.putByte(longOffset, _bytes[i]);
+ Assert.assertEquals(buffer.getByte(longOffset), _bytes[i]);
+ }
+ }
+
+ protected void testReadWriteChar(PinotDataBuffer buffer) {
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int index = RANDOM.nextInt(CHAR_ARRAY_LENGTH);
+ int intOffset = index * Byte.BYTES;
+ buffer.putChar(intOffset, _chars[i]);
+ Assert.assertEquals(buffer.getChar(intOffset), _chars[i]);
+ }
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int index = RANDOM.nextInt(CHAR_ARRAY_LENGTH);
+ long longOffset = index * Byte.BYTES;
+ buffer.putChar(longOffset, _chars[i]);
+ Assert.assertEquals(buffer.getChar(longOffset), _chars[i]);
+ }
+ }
+
+ protected void testReadWriteShort(PinotDataBuffer buffer) {
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int index = RANDOM.nextInt(SHORT_ARRAY_LENGTH);
+ int intOffset = index * Byte.BYTES;
+ buffer.putShort(intOffset, _shorts[i]);
+ Assert.assertEquals(buffer.getShort(intOffset), _shorts[i]);
+ }
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int index = RANDOM.nextInt(SHORT_ARRAY_LENGTH);
+ long longOffset = index * Byte.BYTES;
+ buffer.putShort(longOffset, _shorts[i]);
+ Assert.assertEquals(buffer.getShort(longOffset), _shorts[i]);
+ }
+ }
+
+ protected void testReadWriteInt(PinotDataBuffer buffer) {
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int index = RANDOM.nextInt(INT_ARRAY_LENGTH);
+ int intOffset = index * Byte.BYTES;
+ buffer.putInt(intOffset, _ints[i]);
+ Assert.assertEquals(buffer.getInt(intOffset), _ints[i]);
+ }
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int index = RANDOM.nextInt(INT_ARRAY_LENGTH);
+ long longOffset = index * Byte.BYTES;
+ buffer.putInt(longOffset, _ints[i]);
+ Assert.assertEquals(buffer.getInt(longOffset), _ints[i]);
+ }
+ }
+
+ protected void testReadWriteLong(PinotDataBuffer buffer) {
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int index = RANDOM.nextInt(LONG_ARRAY_LENGTH);
+ int intOffset = index * Byte.BYTES;
+ buffer.putLong(intOffset, _longs[i]);
+ Assert.assertEquals(buffer.getLong(intOffset), _longs[i]);
+ }
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int index = RANDOM.nextInt(LONG_ARRAY_LENGTH);
+ long longOffset = index * Byte.BYTES;
+ buffer.putLong(longOffset, _longs[i]);
+ Assert.assertEquals(buffer.getLong(longOffset), _longs[i]);
+ }
+ }
+
+ protected void testReadWriteFloat(PinotDataBuffer buffer) {
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int index = RANDOM.nextInt(FLOAT_ARRAY_LENGTH);
+ int intOffset = index * Byte.BYTES;
+ buffer.putFloat(intOffset, _floats[i]);
+ Assert.assertEquals(buffer.getFloat(intOffset), _floats[i]);
+ }
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int index = RANDOM.nextInt(FLOAT_ARRAY_LENGTH);
+ long longOffset = index * Byte.BYTES;
+ buffer.putFloat(longOffset, _floats[i]);
+ Assert.assertEquals(buffer.getFloat(longOffset), _floats[i]);
+ }
+ }
+
+ protected void testReadWriteDouble(PinotDataBuffer buffer) {
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int index = RANDOM.nextInt(DOUBLE_ARRAY_LENGTH);
+ int intOffset = index * Byte.BYTES;
+ buffer.putDouble(intOffset, _doubles[i]);
+ Assert.assertEquals(buffer.getDouble(intOffset), _doubles[i]);
+ }
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int index = RANDOM.nextInt(DOUBLE_ARRAY_LENGTH);
+ long longOffset = index * Byte.BYTES;
+ buffer.putDouble(longOffset, _doubles[i]);
+ Assert.assertEquals(buffer.getDouble(longOffset), _doubles[i]);
+ }
+ }
+
+ protected void testReadWriteBytes(PinotDataBuffer buffer) {
+ byte[] readBuffer = new byte[MAX_BYTES_LENGTH];
+ byte[] writeBuffer = new byte[MAX_BYTES_LENGTH];
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int length = RANDOM.nextInt(MAX_BYTES_LENGTH);
+ int offset = RANDOM.nextInt(BUFFER_SIZE - length);
+ int arrayOffset = RANDOM.nextInt(MAX_BYTES_LENGTH - length);
+ System.arraycopy(_bytes, offset, readBuffer, arrayOffset, length);
+ buffer.readFrom(offset, readBuffer, arrayOffset, length);
+ buffer.copyTo(offset, writeBuffer, arrayOffset, length);
+ int end = arrayOffset + length;
+ for (int j = arrayOffset; j < end; j++) {
+ Assert.assertEquals(writeBuffer[j], readBuffer[j]);
+ }
+ }
+ }
+
+ protected void testReadWritePinotDataBuffer(PinotDataBuffer buffer) {
+ testReadWritePinotDataBuffer(buffer, PinotByteBuffer.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NATIVE_ORDER),
+ PinotByteBuffer.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER));
+ testReadWritePinotDataBuffer(buffer,
+ PinotByteBuffer.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER)
+ .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH),
+ PinotByteBuffer.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NATIVE_ORDER)
+ .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH));
+ testReadWritePinotDataBuffer(buffer, _factory.allocateDirect(MAX_BYTES_LENGTH, ByteOrder.nativeOrder()),
+ _factory.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER));
+ testReadWritePinotDataBuffer(buffer,
+ _factory.allocateDirect(2 * MAX_BYTES_LENGTH, ByteOrder.nativeOrder())
+ .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH),
+ _factory.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER)
+ .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH));
+ }
+
+ protected void testReadWritePinotDataBuffer(PinotDataBuffer buffer, PinotDataBuffer readBuffer,
+ PinotDataBuffer writeBuffer) {
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int length = RANDOM.nextInt(MAX_BYTES_LENGTH);
+ int offset = RANDOM.nextInt(BUFFER_SIZE - length);
+ readBuffer.readFrom(0, _bytes, RANDOM.nextInt(BUFFER_SIZE - length), length);
+ readBuffer.copyTo(0, buffer, offset, length);
+ for (int j = 0; j < length; j++) {
+ Assert.assertEquals(buffer.getByte(j + offset), readBuffer.getByte(j));
+ }
+ buffer.copyTo(offset, writeBuffer, 0, length);
+ for (int j = 0; j < length; j++) {
+ Assert.assertEquals(writeBuffer.getByte(j), readBuffer.getByte(j));
+ }
+ }
+ }
+
+ protected void testReadFromByteBuffer(PinotDataBuffer buffer) {
+ byte[] readBuffer = new byte[MAX_BYTES_LENGTH];
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ int length = RANDOM.nextInt(MAX_BYTES_LENGTH);
+ int offset = RANDOM.nextInt(BUFFER_SIZE - length);
+ System.arraycopy(_bytes, offset, readBuffer, 0, length);
+ buffer.readFrom(offset, ByteBuffer.wrap(readBuffer, 0, length));
+ for (int j = 0; j < length; j++) {
+ Assert.assertEquals(buffer.getByte(offset + j), readBuffer[j]);
+ }
+ }
+ }
+
+ protected void testConcurrentReadWrite(PinotDataBuffer buffer)
+ throws Exception {
+ Future[] futures = new Future[NUM_ROUNDS];
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ futures[i] = EXECUTOR_SERVICE.submit(() -> {
+ int length = RANDOM.nextInt(MAX_BYTES_LENGTH);
+ int offset = RANDOM.nextInt(BUFFER_SIZE - length);
+ byte[] readBuffer = new byte[length];
+ byte[] writeBuffer = new byte[length];
+ System.arraycopy(_bytes, offset, readBuffer, 0, length);
+ buffer.readFrom(offset, readBuffer);
+ buffer.copyTo(offset, writeBuffer);
+ Assert.assertTrue(Arrays.equals(readBuffer, writeBuffer));
+ buffer.readFrom(offset, ByteBuffer.wrap(readBuffer));
+ buffer.copyTo(offset, writeBuffer);
+ Assert.assertTrue(Arrays.equals(readBuffer, writeBuffer));
+ });
+ }
+ for (int i = 0; i < NUM_ROUNDS; i++) {
+ futures[i].get();
+ }
+ }
+}
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferStaticTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferStaticTest.java
new file mode 100644
index 000000000000..0f95263c2d13
--- /dev/null
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferStaticTest.java
@@ -0,0 +1,4 @@
+package org.apache.pinot.segment.spi.memory;
+
+public class PinotDataBufferStaticTest {
+}
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java
index 1538351c152b..bcf51a7890f2 100644
--- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java
@@ -18,69 +18,20 @@
*/
package org.apache.pinot.segment.spi.memory;
-import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
-import java.util.Random;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.io.FileUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class PinotDataBufferTest {
- private static final Random RANDOM = new Random();
- private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);
- private static final File TEMP_FILE = new File(FileUtils.getTempDirectory(), "PinotDataBufferTest");
- private static final int FILE_OFFSET = 10; // Not page-aligned
- private static final int BUFFER_SIZE = 10_000; // Not page-aligned
- private static final int CHAR_ARRAY_LENGTH = BUFFER_SIZE / Character.BYTES;
- private static final int SHORT_ARRAY_LENGTH = BUFFER_SIZE / Short.BYTES;
- private static final int INT_ARRAY_LENGTH = BUFFER_SIZE / Integer.BYTES;
- private static final int LONG_ARRAY_LENGTH = BUFFER_SIZE / Long.BYTES;
- private static final int FLOAT_ARRAY_LENGTH = BUFFER_SIZE / Float.BYTES;
- private static final int DOUBLE_ARRAY_LENGTH = BUFFER_SIZE / Double.BYTES;
- private static final int NUM_ROUNDS = 1000;
- private static final int MAX_BYTES_LENGTH = 100;
- private static final long LARGE_BUFFER_SIZE = Integer.MAX_VALUE + 2L; // Not page-aligned
-
- private byte[] _bytes = new byte[BUFFER_SIZE];
- private char[] _chars = new char[CHAR_ARRAY_LENGTH];
- private short[] _shorts = new short[SHORT_ARRAY_LENGTH];
- private int[] _ints = new int[INT_ARRAY_LENGTH];
- private long[] _longs = new long[LONG_ARRAY_LENGTH];
- private float[] _floats = new float[FLOAT_ARRAY_LENGTH];
- private double[] _doubles = new double[DOUBLE_ARRAY_LENGTH];
-
- @BeforeClass
- public void setUp() {
- for (int i = 0; i < BUFFER_SIZE; i++) {
- _bytes[i] = (byte) RANDOM.nextInt();
- }
- for (int i = 0; i < CHAR_ARRAY_LENGTH; i++) {
- _chars[i] = (char) RANDOM.nextInt();
- }
- for (int i = 0; i < SHORT_ARRAY_LENGTH; i++) {
- _shorts[i] = (short) RANDOM.nextInt();
- }
- for (int i = 0; i < INT_ARRAY_LENGTH; i++) {
- _ints[i] = RANDOM.nextInt();
- }
- for (int i = 0; i < LONG_ARRAY_LENGTH; i++) {
- _longs[i] = RANDOM.nextLong();
- }
- for (int i = 0; i < FLOAT_ARRAY_LENGTH; i++) {
- _floats[i] = RANDOM.nextFloat();
- }
- for (int i = 0; i < DOUBLE_ARRAY_LENGTH; i++) {
- _doubles[i] = RANDOM.nextDouble();
- }
+public class PinotDataBufferTest extends PinotDataBufferInstanceTestBase {
+ public PinotDataBufferTest(PinotBufferFactory factory) {
+ super(factory);
}
@Test
@@ -153,207 +104,6 @@ public void testPinotNonNativeOrderLBuffer()
}
}
- private void testPinotDataBuffer(PinotDataBuffer buffer)
- throws Exception {
- Assert.assertEquals(buffer.size(), BUFFER_SIZE);
- testReadWriteByte(buffer);
- testReadWriteChar(buffer);
- testReadWriteShort(buffer);
- testReadWriteInt(buffer);
- testReadWriteLong(buffer);
- testReadWriteFloat(buffer);
- testReadWriteDouble(buffer);
- testReadWriteBytes(buffer);
- testReadWritePinotDataBuffer(buffer);
- testReadFromByteBuffer(buffer);
- testConcurrentReadWrite(buffer);
- }
-
- private void testReadWriteByte(PinotDataBuffer buffer) {
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int intOffset = RANDOM.nextInt(BUFFER_SIZE);
- buffer.putByte(intOffset, _bytes[i]);
- Assert.assertEquals(buffer.getByte(intOffset), _bytes[i]);
- }
- for (int i = 0; i < NUM_ROUNDS; i++) {
- long longOffset = RANDOM.nextInt(BUFFER_SIZE);
- buffer.putByte(longOffset, _bytes[i]);
- Assert.assertEquals(buffer.getByte(longOffset), _bytes[i]);
- }
- }
-
- private void testReadWriteChar(PinotDataBuffer buffer) {
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int index = RANDOM.nextInt(CHAR_ARRAY_LENGTH);
- int intOffset = index * Byte.BYTES;
- buffer.putChar(intOffset, _chars[i]);
- Assert.assertEquals(buffer.getChar(intOffset), _chars[i]);
- }
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int index = RANDOM.nextInt(CHAR_ARRAY_LENGTH);
- long longOffset = index * Byte.BYTES;
- buffer.putChar(longOffset, _chars[i]);
- Assert.assertEquals(buffer.getChar(longOffset), _chars[i]);
- }
- }
-
- private void testReadWriteShort(PinotDataBuffer buffer) {
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int index = RANDOM.nextInt(SHORT_ARRAY_LENGTH);
- int intOffset = index * Byte.BYTES;
- buffer.putShort(intOffset, _shorts[i]);
- Assert.assertEquals(buffer.getShort(intOffset), _shorts[i]);
- }
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int index = RANDOM.nextInt(SHORT_ARRAY_LENGTH);
- long longOffset = index * Byte.BYTES;
- buffer.putShort(longOffset, _shorts[i]);
- Assert.assertEquals(buffer.getShort(longOffset), _shorts[i]);
- }
- }
-
- private void testReadWriteInt(PinotDataBuffer buffer) {
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int index = RANDOM.nextInt(INT_ARRAY_LENGTH);
- int intOffset = index * Byte.BYTES;
- buffer.putInt(intOffset, _ints[i]);
- Assert.assertEquals(buffer.getInt(intOffset), _ints[i]);
- }
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int index = RANDOM.nextInt(INT_ARRAY_LENGTH);
- long longOffset = index * Byte.BYTES;
- buffer.putInt(longOffset, _ints[i]);
- Assert.assertEquals(buffer.getInt(longOffset), _ints[i]);
- }
- }
-
- private void testReadWriteLong(PinotDataBuffer buffer) {
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int index = RANDOM.nextInt(LONG_ARRAY_LENGTH);
- int intOffset = index * Byte.BYTES;
- buffer.putLong(intOffset, _longs[i]);
- Assert.assertEquals(buffer.getLong(intOffset), _longs[i]);
- }
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int index = RANDOM.nextInt(LONG_ARRAY_LENGTH);
- long longOffset = index * Byte.BYTES;
- buffer.putLong(longOffset, _longs[i]);
- Assert.assertEquals(buffer.getLong(longOffset), _longs[i]);
- }
- }
-
- private void testReadWriteFloat(PinotDataBuffer buffer) {
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int index = RANDOM.nextInt(FLOAT_ARRAY_LENGTH);
- int intOffset = index * Byte.BYTES;
- buffer.putFloat(intOffset, _floats[i]);
- Assert.assertEquals(buffer.getFloat(intOffset), _floats[i]);
- }
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int index = RANDOM.nextInt(FLOAT_ARRAY_LENGTH);
- long longOffset = index * Byte.BYTES;
- buffer.putFloat(longOffset, _floats[i]);
- Assert.assertEquals(buffer.getFloat(longOffset), _floats[i]);
- }
- }
-
- private void testReadWriteDouble(PinotDataBuffer buffer) {
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int index = RANDOM.nextInt(DOUBLE_ARRAY_LENGTH);
- int intOffset = index * Byte.BYTES;
- buffer.putDouble(intOffset, _doubles[i]);
- Assert.assertEquals(buffer.getDouble(intOffset), _doubles[i]);
- }
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int index = RANDOM.nextInt(DOUBLE_ARRAY_LENGTH);
- long longOffset = index * Byte.BYTES;
- buffer.putDouble(longOffset, _doubles[i]);
- Assert.assertEquals(buffer.getDouble(longOffset), _doubles[i]);
- }
- }
-
- private void testReadWriteBytes(PinotDataBuffer buffer) {
- byte[] readBuffer = new byte[MAX_BYTES_LENGTH];
- byte[] writeBuffer = new byte[MAX_BYTES_LENGTH];
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int length = RANDOM.nextInt(MAX_BYTES_LENGTH);
- int offset = RANDOM.nextInt(BUFFER_SIZE - length);
- int arrayOffset = RANDOM.nextInt(MAX_BYTES_LENGTH - length);
- System.arraycopy(_bytes, offset, readBuffer, arrayOffset, length);
- buffer.readFrom(offset, readBuffer, arrayOffset, length);
- buffer.copyTo(offset, writeBuffer, arrayOffset, length);
- int end = arrayOffset + length;
- for (int j = arrayOffset; j < end; j++) {
- Assert.assertEquals(writeBuffer[j], readBuffer[j]);
- }
- }
- }
-
- private void testReadWritePinotDataBuffer(PinotDataBuffer buffer) {
- testReadWritePinotDataBuffer(buffer, PinotByteBuffer.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NATIVE_ORDER),
- PinotByteBuffer.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER));
- testReadWritePinotDataBuffer(buffer,
- PinotByteBuffer.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER)
- .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH),
- PinotByteBuffer.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NATIVE_ORDER)
- .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH));
- testReadWritePinotDataBuffer(buffer, PinotNativeOrderLBuffer.allocateDirect(MAX_BYTES_LENGTH),
- PinotNonNativeOrderLBuffer.allocateDirect(MAX_BYTES_LENGTH));
- testReadWritePinotDataBuffer(buffer,
- PinotNonNativeOrderLBuffer.allocateDirect(2 * MAX_BYTES_LENGTH).view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH),
- PinotNativeOrderLBuffer.allocateDirect(2 * MAX_BYTES_LENGTH).view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH));
- }
-
- private void testReadWritePinotDataBuffer(PinotDataBuffer buffer, PinotDataBuffer readBuffer,
- PinotDataBuffer writeBuffer) {
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int length = RANDOM.nextInt(MAX_BYTES_LENGTH);
- int offset = RANDOM.nextInt(BUFFER_SIZE - length);
- readBuffer.readFrom(0, _bytes, RANDOM.nextInt(BUFFER_SIZE - length), length);
- readBuffer.copyTo(0, buffer, offset, length);
- buffer.copyTo(offset, writeBuffer, 0, length);
- for (int j = 0; j < length; j++) {
- Assert.assertEquals(writeBuffer.getByte(j), readBuffer.getByte(j));
- }
- }
- }
-
- private void testReadFromByteBuffer(PinotDataBuffer buffer) {
- byte[] readBuffer = new byte[MAX_BYTES_LENGTH];
- for (int i = 0; i < NUM_ROUNDS; i++) {
- int length = RANDOM.nextInt(MAX_BYTES_LENGTH);
- int offset = RANDOM.nextInt(BUFFER_SIZE - length);
- System.arraycopy(_bytes, offset, readBuffer, 0, length);
- buffer.readFrom(offset, ByteBuffer.wrap(readBuffer, 0, length));
- for (int j = 0; j < length; j++) {
- Assert.assertEquals(buffer.getByte(offset + j), readBuffer[j]);
- }
- }
- }
-
- private void testConcurrentReadWrite(PinotDataBuffer buffer)
- throws Exception {
- Future[] futures = new Future[NUM_ROUNDS];
- for (int i = 0; i < NUM_ROUNDS; i++) {
- futures[i] = EXECUTOR_SERVICE.submit(() -> {
- int length = RANDOM.nextInt(MAX_BYTES_LENGTH);
- int offset = RANDOM.nextInt(BUFFER_SIZE - length);
- byte[] readBuffer = new byte[length];
- byte[] writeBuffer = new byte[length];
- System.arraycopy(_bytes, offset, readBuffer, 0, length);
- buffer.readFrom(offset, readBuffer);
- buffer.copyTo(offset, writeBuffer);
- Assert.assertTrue(Arrays.equals(readBuffer, writeBuffer));
- buffer.readFrom(offset, ByteBuffer.wrap(readBuffer));
- buffer.copyTo(offset, writeBuffer);
- Assert.assertTrue(Arrays.equals(readBuffer, writeBuffer));
- });
- }
- for (int i = 0; i < NUM_ROUNDS; i++) {
- futures[i].get();
- }
- }
-
@Test
public void testPinotByteBufferReadWriteFile()
throws Exception {
@@ -510,46 +260,6 @@ private void testByteBuffer(ByteBuffer byteBuffer) {
}
}
- @SuppressWarnings("RedundantExplicitClose")
- @Test
- public void testMultipleClose()
- throws Exception {
- try (PinotDataBuffer buffer = PinotByteBuffer.allocateDirect(BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) {
- buffer.close();
- }
- try (PinotDataBuffer buffer = PinotNativeOrderLBuffer.allocateDirect(BUFFER_SIZE)) {
- buffer.close();
- }
- try (PinotDataBuffer buffer = PinotNonNativeOrderLBuffer.allocateDirect(BUFFER_SIZE)) {
- buffer.close();
- }
- try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) {
- randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE);
- try (PinotDataBuffer buffer = PinotByteBuffer
- .loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) {
- buffer.close();
- }
- try (PinotDataBuffer buffer = PinotNativeOrderLBuffer.loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE)) {
- buffer.close();
- }
- try (PinotDataBuffer buffer = PinotNonNativeOrderLBuffer.loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE)) {
- buffer.close();
- }
- try (PinotDataBuffer buffer = PinotByteBuffer
- .mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) {
- buffer.close();
- }
- try (PinotDataBuffer buffer = PinotNativeOrderLBuffer.mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE)) {
- buffer.close();
- }
- try (PinotDataBuffer buffer = PinotNonNativeOrderLBuffer.mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE)) {
- buffer.close();
- }
- } finally {
- FileUtils.forceDelete(TEMP_FILE);
- }
- }
-
@Test
public void testConstructors()
throws Exception {
@@ -580,15 +290,15 @@ public void testConstructors()
randomAccessFile.setLength(FILE_OFFSET + LARGE_BUFFER_SIZE);
try (PinotDataBuffer buffer1 = PinotDataBuffer
.allocateDirect(LARGE_BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER, null)) {
- Assert.assertTrue(buffer1 instanceof PinotNativeOrderLBuffer);
+ Assert.assertSame(buffer1.order(), PinotDataBuffer.NATIVE_ORDER);
testBufferStats(1, LARGE_BUFFER_SIZE, 0, 0);
try (PinotDataBuffer buffer2 = PinotDataBuffer
.loadFile(TEMP_FILE, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER, null)) {
- Assert.assertTrue(buffer2 instanceof PinotNativeOrderLBuffer);
+ Assert.assertSame(buffer2.order(), PinotDataBuffer.NATIVE_ORDER);
testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0);
try (PinotDataBuffer buffer3 = PinotDataBuffer
.mapFile(TEMP_FILE, true, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER, null)) {
- Assert.assertTrue(buffer3 instanceof PinotNativeOrderLBuffer);
+ Assert.assertSame(buffer3.order(), PinotDataBuffer.NATIVE_ORDER);
testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 1, LARGE_BUFFER_SIZE);
}
testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0);
@@ -603,15 +313,15 @@ public void testConstructors()
randomAccessFile.setLength(FILE_OFFSET + LARGE_BUFFER_SIZE);
try (PinotDataBuffer buffer1 = PinotDataBuffer
.allocateDirect(LARGE_BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER, null)) {
- Assert.assertTrue(buffer1 instanceof PinotNonNativeOrderLBuffer);
+ Assert.assertSame(buffer1.order(), PinotDataBuffer.NON_NATIVE_ORDER);
testBufferStats(1, LARGE_BUFFER_SIZE, 0, 0);
try (PinotDataBuffer buffer2 = PinotDataBuffer
.loadFile(TEMP_FILE, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER, null)) {
- Assert.assertTrue(buffer2 instanceof PinotNonNativeOrderLBuffer);
+ Assert.assertSame(buffer2.order(), PinotDataBuffer.NON_NATIVE_ORDER);
testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0);
try (PinotDataBuffer buffer3 = PinotDataBuffer
.mapFile(TEMP_FILE, true, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER, null)) {
- Assert.assertTrue(buffer3 instanceof PinotNonNativeOrderLBuffer);
+ Assert.assertSame(buffer3.order(), PinotDataBuffer.NON_NATIVE_ORDER);
testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 1, LARGE_BUFFER_SIZE);
}
testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0);
@@ -623,19 +333,4 @@ public void testConstructors()
FileUtils.forceDelete(TEMP_FILE);
}
}
-
- private void testBufferStats(int directBufferCount, long directBufferUsage, int mmapBufferCount,
- long mmapBufferUsage) {
- Assert.assertEquals(PinotDataBuffer.getAllocationFailureCount(), 0);
- Assert.assertEquals(PinotDataBuffer.getDirectBufferCount(), directBufferCount);
- Assert.assertEquals(PinotDataBuffer.getDirectBufferUsage(), directBufferUsage);
- Assert.assertEquals(PinotDataBuffer.getMmapBufferCount(), mmapBufferCount);
- Assert.assertEquals(PinotDataBuffer.getMmapBufferUsage(), mmapBufferUsage);
- Assert.assertEquals(PinotDataBuffer.getBufferInfo().size(), directBufferCount + mmapBufferCount);
- }
-
- @AfterClass
- public void tearDown() {
- EXECUTOR_SERVICE.shutdown();
- }
}
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTestBase.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTestBase.java
new file mode 100644
index 000000000000..0a3f40126755
--- /dev/null
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTestBase.java
@@ -0,0 +1,88 @@
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Random;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.io.FileUtils;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeClass;
+
+
+public class PinotDataBufferTestBase {
+
+ protected static final Random RANDOM = new Random();
+ protected static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10);
+ protected static final File TEMP_FILE = new File(FileUtils.getTempDirectory(), "PinotDataBufferTest");
+ protected static final int FILE_OFFSET = 10; // Not page-aligned
+ protected static final int BUFFER_SIZE = 10_000; // Not page-aligned
+ protected static final int CHAR_ARRAY_LENGTH = BUFFER_SIZE / Character.BYTES;
+ protected static final int SHORT_ARRAY_LENGTH = BUFFER_SIZE / Short.BYTES;
+ protected static final int INT_ARRAY_LENGTH = BUFFER_SIZE / Integer.BYTES;
+ protected static final int LONG_ARRAY_LENGTH = BUFFER_SIZE / Long.BYTES;
+ protected static final int FLOAT_ARRAY_LENGTH = BUFFER_SIZE / Float.BYTES;
+ protected static final int DOUBLE_ARRAY_LENGTH = BUFFER_SIZE / Double.BYTES;
+ protected static final int NUM_ROUNDS = 1000;
+ protected static final int MAX_BYTES_LENGTH = 100;
+ protected static final long LARGE_BUFFER_SIZE = Integer.MAX_VALUE + 2L; // Not page-aligned
+
+ protected byte[] _bytes = new byte[BUFFER_SIZE];
+ protected char[] _chars = new char[CHAR_ARRAY_LENGTH];
+ protected short[] _shorts = new short[SHORT_ARRAY_LENGTH];
+ protected int[] _ints = new int[INT_ARRAY_LENGTH];
+ protected long[] _longs = new long[LONG_ARRAY_LENGTH];
+ protected float[] _floats = new float[FLOAT_ARRAY_LENGTH];
+ protected double[] _doubles = new double[DOUBLE_ARRAY_LENGTH];
+
+ @BeforeClass
+ public void setUp() {
+ for (int i = 0; i < BUFFER_SIZE; i++) {
+ _bytes[i] = (byte) RANDOM.nextInt();
+ }
+ for (int i = 0; i < CHAR_ARRAY_LENGTH; i++) {
+ _chars[i] = (char) RANDOM.nextInt();
+ }
+ for (int i = 0; i < SHORT_ARRAY_LENGTH; i++) {
+ _shorts[i] = (short) RANDOM.nextInt();
+ }
+ for (int i = 0; i < INT_ARRAY_LENGTH; i++) {
+ _ints[i] = RANDOM.nextInt();
+ }
+ for (int i = 0; i < LONG_ARRAY_LENGTH; i++) {
+ _longs[i] = RANDOM.nextLong();
+ }
+ for (int i = 0; i < FLOAT_ARRAY_LENGTH; i++) {
+ _floats[i] = RANDOM.nextFloat();
+ }
+ for (int i = 0; i < DOUBLE_ARRAY_LENGTH; i++) {
+ _doubles[i] = RANDOM.nextDouble();
+ }
+ }
+
+ protected void testBufferStats(int directBufferCount, long directBufferUsage, int mmapBufferCount,
+ long mmapBufferUsage) {
+ Assert.assertEquals(PinotDataBuffer.getAllocationFailureCount(), 0);
+ Assert.assertEquals(PinotDataBuffer.getDirectBufferCount(), directBufferCount);
+ Assert.assertEquals(PinotDataBuffer.getDirectBufferUsage(), directBufferUsage);
+ Assert.assertEquals(PinotDataBuffer.getMmapBufferCount(), mmapBufferCount);
+ Assert.assertEquals(PinotDataBuffer.getMmapBufferUsage(), mmapBufferUsage);
+ Assert.assertEquals(PinotDataBuffer.getBufferInfo().size(), directBufferCount + mmapBufferCount);
+ }
+
+ @AfterTest
+ public void deleteFileIfExists()
+ throws IOException {
+ if (TEMP_FILE.exists()) {
+ FileUtils.forceDelete(TEMP_FILE);
+ }
+ }
+
+ @AfterClass
+ public void tearDown() {
+ EXECUTOR_SERVICE.shutdown();
+ }
+}
diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotLArrayByteBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotLArrayByteBufferTest.java
new file mode 100644
index 000000000000..efca7eeea380
--- /dev/null
+++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotLArrayByteBufferTest.java
@@ -0,0 +1,19 @@
+package org.apache.pinot.segment.spi.memory;
+
+import net.openhft.chronicle.core.Jvm;
+import org.testng.SkipException;
+import org.testng.annotations.BeforeClass;
+
+
+public class PinotLArrayByteBufferTest extends PinotDataBufferInstanceTestBase {
+ public PinotLArrayByteBufferTest() {
+ super(new LArrayPinotBufferFactory());
+ }
+
+ @BeforeClass
+ public void abortOnModernJava() {
+ if (Jvm.majorVersion() > 11) {
+// throw new SkipException("Skipping LArray tests because they cannot run in Java " + Jvm.majorVersion());
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 0a937a5abd6e..7275856101c7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,7 +109,7 @@
${basedir}
dev
- 11
+ 17
UTF-8
true