From bfe73852037fe4057a226813cb019b29b854ea22 Mon Sep 17 00:00:00 2001 From: Gonzalo Ortiz Date: Tue, 22 Nov 2022 17:28:27 +0100 Subject: [PATCH 1/2] Add chronicle bytes alternative to LArray. Still pending to add benchmarks and tests. --- pinot-segment-spi/pom.xml | 5 + .../segment/spi/memory/BasePinotLBuffer.java | 5 +- .../memory/ByteBufferPinotBufferFactory.java | 50 ++++ .../segment/spi/memory/ByteBufferUtil.java | 52 ++++ .../spi/memory/LArrayPinotBufferFactory.java | 36 +++ .../spi/memory/NonNativePinotDataBuffer.java | 253 ++++++++++++++++++ .../memory/OnlyNativePinotBufferFactory.java | 46 ++++ .../spi/memory/PinotBufferFactory.java | 39 +++ .../segment/spi/memory/PinotDataBuffer.java | 202 +++++++++++--- .../SmallWithFallbackPinotBufferFactory.java | 72 +++++ .../memory/chronicle/ChronicleDataBuffer.java | 217 +++++++++++++++ .../ChroniclePinotBufferFactory.java | 48 ++++ .../spi/memory/PinotDataBufferTest.java | 6 +- 13 files changed, 982 insertions(+), 49 deletions(-) create mode 100644 pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferPinotBufferFactory.java create mode 100644 pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferUtil.java create mode 100644 pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/LArrayPinotBufferFactory.java create mode 100644 pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java create mode 100644 pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/OnlyNativePinotBufferFactory.java create mode 100644 pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotBufferFactory.java create mode 100644 pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/SmallWithFallbackPinotBufferFactory.java create mode 100644 pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChronicleDataBuffer.java create mode 100644 pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChroniclePinotBufferFactory.java diff --git a/pinot-segment-spi/pom.xml b/pinot-segment-spi/pom.xml index d7fc083c0ea5..33b3283c80aa 100644 --- a/pinot-segment-spi/pom.xml +++ b/pinot-segment-spi/pom.xml @@ -76,6 +76,11 @@ org.xerial.larray larray-mmap + + net.openhft + chronicle-bytes + 2.23.31 + diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java index 115212db6916..3cf9c7710216 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java @@ -130,10 +130,11 @@ public long size() { @Override public PinotDataBuffer view(long start, long end, ByteOrder byteOrder) { + PinotNativeOrderLBuffer view = new PinotNativeOrderLBuffer(_buffer.view(start, end), false, false); if (byteOrder == NATIVE_ORDER) { - return new PinotNativeOrderLBuffer(_buffer.view(start, end), false, false); + return view; } else { - return new PinotNonNativeOrderLBuffer(_buffer.view(start, end), false, false); + return new NonNativePinotDataBuffer(view); } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferPinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferPinotBufferFactory.java new file mode 100644 index 000000000000..f6d6bd0cf285 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferPinotBufferFactory.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.nio.ByteOrder; + + +public class ByteBufferPinotBufferFactory implements PinotBufferFactory { + @Override + public PinotDataBuffer allocateDirect(long size, ByteOrder byteOrder) { + Preconditions.checkArgument(size <= Integer.MAX_VALUE, + "Trying to allocate {} bytes when max is {}", size, Integer.MAX_VALUE); + return PinotByteBuffer.allocateDirect((int) size, byteOrder); + } + + @Override + public PinotDataBuffer readFile(File file, long offset, long size, ByteOrder byteOrder) + throws IOException { + Preconditions.checkArgument(size <= Integer.MAX_VALUE, + "Trying to allocate {} bytes when max is {}", size, Integer.MAX_VALUE); + return PinotByteBuffer.loadFile(file, offset, (int) size, byteOrder); + } + + @Override + public PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size, ByteOrder byteOrder) + throws IOException { + Preconditions.checkArgument(size <= Integer.MAX_VALUE, + "Trying to allocate {} bytes when max is {}", size, Integer.MAX_VALUE); + return PinotByteBuffer.mapFile(file, readOnly, offset, (int) size, byteOrder); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferUtil.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferUtil.java new file mode 100644 index 000000000000..70c90f68cd1c --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferUtil.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; + + +public class ByteBufferUtil { + + private static final Constructor _dbbCC = findDirectByteBufferConstructor(); + + private ByteBufferUtil() { + } + + public static ByteBuffer newDirectByteBuffer(long addr, int size, Object att) { + _dbbCC.setAccessible(true); + try { + return _dbbCC.newInstance(Long.valueOf(addr), Integer.valueOf(size), att); + } catch (Exception e) { + throw new IllegalStateException("Failed to create DirectByteBuffer", e); + } + } + + @SuppressWarnings("unchecked") + private static Constructor findDirectByteBufferConstructor() { + try { + return (Constructor) Class.forName("java.nio.DirectByteBuffer") + .getDeclaredConstructor(Long.TYPE, Integer.TYPE, Object.class); + } catch (ClassNotFoundException e) { + throw new IllegalStateException("Failed to find java.nio.DirectByteBuffer", e); + } catch (NoSuchMethodException e) { + throw new IllegalStateException("Failed to find constructor f java.nio.DirectByteBuffer", e); + } + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/LArrayPinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/LArrayPinotBufferFactory.java new file mode 100644 index 000000000000..5f5c407aacb7 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/LArrayPinotBufferFactory.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import java.io.File; +import java.io.IOException; + + +public class LArrayPinotBufferFactory extends OnlyNativePinotBufferFactory { + @Override + protected PinotDataBuffer allocateDirect(long size) { + return PinotNativeOrderLBuffer.allocateDirect(size); + } + + @Override + protected PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size) + throws IOException { + return PinotNativeOrderLBuffer.mapFile(file, readOnly, offset, size); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java new file mode 100644 index 000000000000..755345666f65 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + + +public class NonNativePinotDataBuffer extends PinotDataBuffer { + private final PinotDataBuffer _nativeBuffer; + + public NonNativePinotDataBuffer(PinotDataBuffer nativeBuffer) { + super(nativeBuffer.isCloseable()); + _nativeBuffer = nativeBuffer; + } + + @Override + public PinotDataBuffer view(long start, long end, ByteOrder byteOrder) { + PinotDataBuffer nativeView = _nativeBuffer.view(start, end); + if (byteOrder == ByteOrder.nativeOrder()) { + return nativeView; + } + return new NonNativePinotDataBuffer(nativeView); + } + + /* + Methods that require special byte order treatment + */ + + @Override + public char getChar(int offset) { + return Character.reverseBytes(_nativeBuffer.getChar(offset)); + } + + @Override + public char getChar(long offset) { + return Character.reverseBytes(_nativeBuffer.getChar(offset)); + } + + @Override + public void putChar(int offset, char value) { + _nativeBuffer.putChar(offset, Character.reverseBytes(value)); + } + + @Override + public void putChar(long offset, char value) { + _nativeBuffer.putChar(offset, Character.reverseBytes(value)); + } + + @Override + public short getShort(int offset) { + return Short.reverseBytes(_nativeBuffer.getShort(offset)); + } + + @Override + public short getShort(long offset) { + return Short.reverseBytes(_nativeBuffer.getShort(offset)); + } + + @Override + public void putShort(int offset, short value) { + _nativeBuffer.putShort(offset, Short.reverseBytes(value)); + } + + @Override + public void putShort(long offset, short value) { + _nativeBuffer.putShort(offset, Short.reverseBytes(value)); + } + + @Override + public int getInt(int offset) { + return Integer.reverseBytes(_nativeBuffer.getInt(offset)); + } + + @Override + public int getInt(long offset) { + return Integer.reverseBytes(_nativeBuffer.getInt(offset)); + } + + @Override + public void putInt(int offset, int value) { + _nativeBuffer.putInt(offset, Integer.reverseBytes(value)); + } + + @Override + public void putInt(long offset, int value) { + _nativeBuffer.putInt(offset, Integer.reverseBytes(value)); + } + + @Override + public long getLong(int offset) { + return Long.reverseBytes(_nativeBuffer.getLong(offset)); + } + + @Override + public long getLong(long offset) { + return Long.reverseBytes(_nativeBuffer.getLong(offset)); + } + + @Override + public void putLong(int offset, long value) { + _nativeBuffer.putLong(offset, Long.reverseBytes(value)); + } + + @Override + public void putLong(long offset, long value) { + _nativeBuffer.putLong(offset, Long.reverseBytes(value)); + } + + @Override + public float getFloat(int offset) { + return Float.intBitsToFloat(Integer.reverseBytes(_nativeBuffer.getInt(offset))); + } + + @Override + public float getFloat(long offset) { + return Float.intBitsToFloat(Integer.reverseBytes(_nativeBuffer.getInt(offset))); + } + + @Override + public void putFloat(int offset, float value) { + _nativeBuffer.putInt(offset, Integer.reverseBytes(Float.floatToRawIntBits(value))); + } + + @Override + public void putFloat(long offset, float value) { + _nativeBuffer.putInt(offset, Integer.reverseBytes(Float.floatToRawIntBits(value))); + } + + @Override + public double getDouble(int offset) { + return Double.longBitsToDouble(Long.reverseBytes(_nativeBuffer.getLong(offset))); + } + + @Override + public double getDouble(long offset) { + return Double.longBitsToDouble(Long.reverseBytes(_nativeBuffer.getLong(offset))); + } + + @Override + public void putDouble(int offset, double value) { + _nativeBuffer.putLong(offset, Long.reverseBytes(Double.doubleToRawLongBits(value))); + } + + @Override + public void putDouble(long offset, double value) { + _nativeBuffer.putLong(offset, Long.reverseBytes(Double.doubleToRawLongBits(value))); + } + + @Override + public ByteOrder order() { + return NON_NATIVE_ORDER; + } + + /* + Methods that can be directly delegated on the native buffer + */ + + @Override + public byte getByte(int offset) { + return _nativeBuffer.getByte(offset); + } + + @Override + public byte getByte(long offset) { + return _nativeBuffer.getByte(offset); + } + + @Override + public void putByte(int offset, byte value) { + _nativeBuffer.putByte(offset, value); + } + + @Override + public void putByte(long offset, byte value) { + _nativeBuffer.putByte(offset, value); + } + + @Override + public void copyTo(long offset, byte[] buffer, int destOffset, int size) { + _nativeBuffer.copyTo(offset, buffer, destOffset, size); + } + + @Override + public void copyTo(long offset, byte[] buffer) { + _nativeBuffer.copyTo(offset, buffer); + } + + @Override + public void copyTo(long offset, PinotDataBuffer buffer, long destOffset, long size) { + _nativeBuffer.copyTo(offset, buffer, destOffset, size); + } + + @Override + public void readFrom(long offset, byte[] buffer, int srcOffset, int size) { + _nativeBuffer.readFrom(offset, buffer, srcOffset, size); + } + + @Override + public void readFrom(long offset, byte[] buffer) { + _nativeBuffer.readFrom(offset, buffer); + } + + @Override + public void readFrom(long offset, ByteBuffer buffer) { + _nativeBuffer.readFrom(offset, buffer); + } + + @Override + public void readFrom(long offset, File file, long srcOffset, long size) + throws IOException { + _nativeBuffer.readFrom(offset, file, srcOffset, size); + } + + @Override + public ByteBuffer toDirectByteBuffer(long offset, int size, ByteOrder byteOrder) { + return _nativeBuffer.toDirectByteBuffer(offset, size, byteOrder); + } + + @Override + public long size() { + return _nativeBuffer.size(); + } + + @Override + public void flush() { + _nativeBuffer.flush(); + } + + @Override + public void release() + throws IOException { + _nativeBuffer.release(); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/OnlyNativePinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/OnlyNativePinotBufferFactory.java new file mode 100644 index 000000000000..2bde4968a767 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/OnlyNativePinotBufferFactory.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteOrder; + + +public abstract class OnlyNativePinotBufferFactory implements PinotBufferFactory { + + @Override + public PinotDataBuffer allocateDirect(long size, ByteOrder byteOrder) { + return correctOrder(allocateDirect(size), byteOrder); + } + + protected abstract PinotDataBuffer allocateDirect(long size); + + @Override + public PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size, ByteOrder byteOrder) + throws IOException { + return correctOrder(mapFile(file, readOnly, offset, size), byteOrder); + } + + protected abstract PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size) throws IOException; + + private PinotDataBuffer correctOrder(PinotDataBuffer nativeBuffer, ByteOrder byteOrder) { + return byteOrder == ByteOrder.nativeOrder() ? nativeBuffer : new NonNativePinotDataBuffer(nativeBuffer); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotBufferFactory.java new file mode 100644 index 000000000000..873a21516db5 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotBufferFactory.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteOrder; + + +public interface PinotBufferFactory { + + PinotDataBuffer allocateDirect(long size, ByteOrder byteOrder); + + default PinotDataBuffer readFile(File file, long offset, long size, ByteOrder byteOrder) + throws IOException { + PinotDataBuffer buffer = allocateDirect(size, byteOrder); + buffer.readFrom(0, file, offset, size); + return buffer; + } + + PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size, ByteOrder byteOrder) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java index 7a2a494732b4..3514fb1312b8 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java @@ -31,6 +31,9 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import net.openhft.chronicle.core.Jvm; +import org.apache.pinot.segment.spi.memory.chronicle.ChroniclePinotBufferFactory; +import org.apache.pinot.spi.env.PinotConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,6 +102,52 @@ public String toString() { private static final AtomicLong ALLOCATION_FAILURE_COUNT = new AtomicLong(); private static final Map BUFFER_CONTEXT_MAP = new WeakHashMap<>(); + private static final String OFFHEAP_BUFFER_FACTORY_CONFIG = "pinot.offheap.buffer.factory"; + private static final String OFFHEAP_BUFFER_PRIORITIZE_BYTE_BUFFER_CONFIG = "pinot.offheap.prioritize.bytebuffer"; + + private static volatile PinotBufferFactory _factory = createDefaultFactory(); + + public static void useFactory(PinotBufferFactory factory) { + _factory = factory; + } + + private static PinotBufferFactory createDefaultFactory() { + String factoryClassName; + // TODO: If chronicle is going to be in their own package, use another way to get the runtime version + if (Jvm.majorVersion() > 11) { + factoryClassName = ChroniclePinotBufferFactory.class.getCanonicalName(); + } else { + factoryClassName = LArrayPinotBufferFactory.class.getCanonicalName(); + } + return createFactory(factoryClassName, true); + } + + private static PinotBufferFactory createFactory(String factoryClassName, boolean prioritizeByteBuffer) { + try { + LOGGER.info("Instantiating Pinot buffer factory class {}", factoryClassName); + PinotBufferFactory factory = (PinotBufferFactory) Class.forName(factoryClassName).getConstructor().newInstance(); + + if (prioritizeByteBuffer) { + factory = new SmallWithFallbackPinotBufferFactory(new ByteBufferPinotBufferFactory(), factory); + } + + return factory; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public static void loadFactory(PinotConfiguration configuration) { + boolean prioritizeByteBuffer = configuration.getProperty(OFFHEAP_BUFFER_PRIORITIZE_BYTE_BUFFER_CONFIG, true); + String factoryClassName = configuration.getProperty(OFFHEAP_BUFFER_FACTORY_CONFIG); + if (factoryClassName != null) { + _factory = createFactory(factoryClassName, prioritizeByteBuffer); + } else { + LOGGER.info("No custom Pinot buffer factory class found in configuration. Using default factory"); + _factory = createDefaultFactory(); + } + } + /** * Allocates a buffer using direct memory. *

NOTE: The contents of the allocated buffer are not defined. @@ -111,19 +160,10 @@ public String toString() { public static PinotDataBuffer allocateDirect(long size, ByteOrder byteOrder, @Nullable String description) { PinotDataBuffer buffer; try { - if (size <= Integer.MAX_VALUE) { - buffer = PinotByteBuffer.allocateDirect((int) size, byteOrder); - } else { - if (byteOrder == NATIVE_ORDER) { - buffer = PinotNativeOrderLBuffer.allocateDirect(size); - } else { - buffer = PinotNonNativeOrderLBuffer.allocateDirect(size); - } - } + buffer = _factory.allocateDirect(size, byteOrder); } catch (Exception e) { - LOGGER - .error("Caught exception while allocating direct buffer of size: {} with description: {}", size, description, - e); + LOGGER.error("Caught exception while allocating direct buffer of size: {} with description: {}", size, + description, e); LOGGER.error("Buffer stats: {}", getBufferStats()); ALLOCATION_FAILURE_COUNT.getAndIncrement(); throw e; @@ -144,15 +184,7 @@ public static PinotDataBuffer loadFile(File file, long offset, long size, ByteOr throws IOException { PinotDataBuffer buffer; try { - if (size <= Integer.MAX_VALUE) { - buffer = PinotByteBuffer.loadFile(file, offset, (int) size, byteOrder); - } else { - if (byteOrder == NATIVE_ORDER) { - buffer = PinotNativeOrderLBuffer.loadFile(file, offset, size); - } else { - buffer = PinotNonNativeOrderLBuffer.loadFile(file, offset, size); - } - } + buffer = _factory.readFile(file, offset, size, byteOrder); } catch (Exception e) { LOGGER.error("Caught exception while loading file: {} from offset: {} of size: {} with description: {}", file.getAbsolutePath(), offset, size, description, e); @@ -187,15 +219,7 @@ public static PinotDataBuffer mapFile(File file, boolean readOnly, long offset, throws IOException { PinotDataBuffer buffer; try { - if (size <= Integer.MAX_VALUE) { - buffer = PinotByteBuffer.mapFile(file, readOnly, offset, (int) size, byteOrder); - } else { - if (byteOrder == NATIVE_ORDER) { - buffer = PinotNativeOrderLBuffer.mapFile(file, readOnly, offset, size); - } else { - buffer = PinotNonNativeOrderLBuffer.mapFile(file, readOnly, offset, size); - } - } + buffer = _factory.mapFile(file, readOnly, offset, size, byteOrder); } catch (Exception e) { LOGGER.error("Caught exception while mapping file: {} from offset: {} of size: {} with description: {}", file.getAbsolutePath(), offset, size, description, e); @@ -286,70 +310,115 @@ public synchronized void close() } } - public abstract byte getByte(int offset); + public byte getByte(int offset) { + return getByte((long) offset); + } public abstract byte getByte(long offset); - public abstract void putByte(int offset, byte value); + public void putByte(int offset, byte value) { + putByte((long) offset, value); + } public abstract void putByte(long offset, byte value); - public abstract char getChar(int offset); + public char getChar(int offset) { + return getChar((long) offset); + } public abstract char getChar(long offset); - public abstract void putChar(int offset, char value); + public void putChar(int offset, char value) { + putChar((long) offset, value); + } public abstract void putChar(long offset, char value); - public abstract short getShort(int offset); + public short getShort(int offset) { + return getShort((long) offset); + } public abstract short getShort(long offset); - public abstract void putShort(int offset, short value); + public void putShort(int offset, short value) { + putShort((long) offset, value); + } public abstract void putShort(long offset, short value); - public abstract int getInt(int offset); + public int getInt(int offset) { + return getInt((long) offset); + } public abstract int getInt(long offset); - public abstract void putInt(int offset, int value); + public void putInt(int offset, int value) { + putInt((long) offset, value); + } public abstract void putInt(long offset, int value); - public abstract long getLong(int offset); + public long getLong(int offset) { + return getLong((long) offset); + } public abstract long getLong(long offset); - public abstract void putLong(int offset, long value); + public void putLong(int offset, long value) { + putLong((long) offset, value); + } public abstract void putLong(long offset, long value); - public abstract float getFloat(int offset); + public float getFloat(int offset) { + return getFloat((long) offset); + } public abstract float getFloat(long offset); - public abstract void putFloat(int offset, float value); + public void putFloat(int offset, float value) { + putFloat((long) offset, value); + } public abstract void putFloat(long offset, float value); - public abstract double getDouble(int offset); + public double getDouble(int offset) { + return getDouble((long) offset); + } public abstract double getDouble(long offset); - public abstract void putDouble(int offset, double value); + public void putDouble(int offset, double value) { + putDouble((long) offset, value); + } public abstract void putDouble(long offset, double value); + /** + * Given an array of bytes, copies the content of this object into the array of bytes. + * The first byte to be copied is the one that could be read with {@code this.getByte(offset)} + */ public abstract void copyTo(long offset, byte[] buffer, int destOffset, int size); + /** + * Given an array of bytes, copies the content of this object into the array of bytes. + * The first byte to be copied is the one that could be read with {@code this.getByte(offset)} + */ public void copyTo(long offset, byte[] buffer) { copyTo(offset, buffer, 0, buffer.length); } + /** + * Note: It is the responsibility of the caller to make sure arguments are checked before the methods are called. + * While some rudimentary checks are performed on the input, the checks are best effort and when performance is an + * overriding priority, as when methods of this class are optimized by the runtime compiler, some or all checks + * (if any) may be elided. Hence, the caller must not rely on the checks and corresponding exceptions! + */ public abstract void copyTo(long offset, PinotDataBuffer buffer, long destOffset, long size); + /** + * Given an array of bytes, writes the content in the specified position. + */ public abstract void readFrom(long offset, byte[] buffer, int srcOffset, int size); public void readFrom(long offset, byte[] buffer) { @@ -379,8 +448,49 @@ public PinotDataBuffer view(long start, long end) { return view(start, end, order()); } + /** + * Returns an ByteBuffer with the same content of this buffer. + * + * This receiver object and the returned ByteBuffer share the same memory address, but the receiver conserves the + * ownership. This means that: + *

    + *
  1. The returned ByteBuffer should not be released (aka freed in C). For example, its cleaner should not be + * called. Violations of this rule may produce segmentation faults
  2. + *
  3. The returned ByteBuffer should not be used once the receiver is released. + * Violations of this rule may produce segmentation faults
  4. + *
  5. A write made by either the receiver or the returned ByteBuffer will be seen by the other.
  6. + *
+ * + * Depending on the implementation, this may be a view (and therefore changes on any buffer will be seen by the other) + * or a copy (in which case the cost will be higher, but each copy will have their own lifecycle). + * + * @param byteOrder The byte order of the returned ByteBuffer. No special treatment is done if the order of the + * receiver buffer is different from the order requested. In other words: if this buffer was written + * in big endian and the direct buffer is requested in little endian, the integers read from each + * buffer will be different. + */ public abstract ByteBuffer toDirectByteBuffer(long offset, int size, ByteOrder byteOrder); + /** + * Returns an ByteBuffer with the same content of this buffer. + * + * This receiver object and the returned ByteBuffer share the same memory address, but the receiver conserves the + * ownership. This means that: + *
    + *
  1. The returned ByteBuffer should not be released (aka freed in C). For example, its cleaner should not be + * called. Violations of this rule may produce segmentation faults
  2. + *
  3. The returned ByteBuffer should not be used once the receiver is released. + * Violations of this rule may produce segmentation faults
  4. + *
  5. A write made by either the receiver or the returned ByteBuffer will be seen by the other.
  6. + *
+ * + * Depending on the implementation, this may be a view (and therefore changes on any buffer will be seen by the other) + * or a copy (in which case the cost will be higher, but each copy will have their own lifecycle). + * + */ + // TODO: Most calls to this method are just used to then read the content of the buffer. + // This is unnecessary an generates 2-5 unnecessary objects. We should benchmark whether there is some advantage on + // transforming this buffer into a IntBuffer/LongBuffer/etc when reading sequentially public ByteBuffer toDirectByteBuffer(long offset, int size) { return toDirectByteBuffer(offset, size, order()); } @@ -389,4 +499,8 @@ public ByteBuffer toDirectByteBuffer(long offset, int size) { public abstract void release() throws IOException; + + public boolean isCloseable() { + return _closeable; + } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/SmallWithFallbackPinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/SmallWithFallbackPinotBufferFactory.java new file mode 100644 index 000000000000..c16cd4f89f9d --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/SmallWithFallbackPinotBufferFactory.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteOrder; + + +/** + * A factory that receives two delegates, using one when the requested buffer can be indexes with integers and the other + * in the other case. + * + * This is commonly used to use ByteBuffers when possible. The utility of that is questionable, as it can increase the + * number of megamorphic calls in the hot path and also make errors related with -XX:MaxDirectMemorySize more + * indeterministic. But it is also the default behavior of Pinot, so it is kept as the default for compatibility + * reasons. + */ +public class SmallWithFallbackPinotBufferFactory implements PinotBufferFactory { + private final PinotBufferFactory _small; + private final PinotBufferFactory _fallback; + + public SmallWithFallbackPinotBufferFactory(PinotBufferFactory small, PinotBufferFactory fallback) { + _small = small; + _fallback = fallback; + } + + @Override + public PinotDataBuffer allocateDirect(long size, ByteOrder byteOrder) { + if (size <= Integer.MAX_VALUE) { + return _small.allocateDirect(size, byteOrder); + } else { + return _fallback.allocateDirect(size, byteOrder); + } + } + + @Override + public PinotDataBuffer readFile(File file, long offset, long size, ByteOrder byteOrder) + throws IOException { + if (size <= Integer.MAX_VALUE) { + return _small.readFile(file, offset, size, byteOrder); + } else { + return _fallback.readFile(file, offset, size, byteOrder); + } + } + + @Override + public PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size, ByteOrder byteOrder) + throws IOException { + if (size <= Integer.MAX_VALUE) { + return _small.mapFile(file, readOnly, offset, size, byteOrder); + } else { + return _fallback.mapFile(file, readOnly, offset, size, byteOrder); + } + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChronicleDataBuffer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChronicleDataBuffer.java new file mode 100644 index 000000000000..5fcd3cdca220 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChronicleDataBuffer.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory.chronicle; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; +import net.openhft.chronicle.bytes.Bytes; +import net.openhft.chronicle.bytes.BytesStore; +import net.openhft.chronicle.bytes.MappedBytesStore; +import org.apache.pinot.segment.spi.memory.ByteBufferUtil; +import org.apache.pinot.segment.spi.memory.NonNativePinotDataBuffer; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; + + +public class ChronicleDataBuffer extends PinotDataBuffer { + + private final BytesStore _store; + private final boolean _flushable; + + public ChronicleDataBuffer(BytesStore store, boolean closeable, boolean flushable) { + super(closeable); + _store = store; + _flushable = flushable; + } + + @Override + public byte getByte(long offset) { + return _store.readByte(offset); + } + + @Override + public void putByte(long offset, byte value) { + _store.writeByte(offset, value); + } + + @Override + public char getChar(long offset) { + return (char) _store.readUnsignedShort(offset); + } + + @Override + public void putChar(long offset, char value) { + _store.writeUnsignedShort(offset, value); + } + + @Override + public short getShort(long offset) { + return _store.readShort(offset); + } + + @Override + public void putShort(long offset, short value) { + _store.writeShort(offset, value); + } + + @Override + public int getInt(long offset) { + return _store.readInt(offset); + } + + @Override + public void putInt(long offset, int value) { + _store.writeInt(offset, value); + } + + @Override + public long getLong(long offset) { + return _store.readLong(offset); + } + + @Override + public void putLong(long offset, long value) { + _store.writeLong(offset, value); + } + + @Override + public float getFloat(long offset) { + return _store.readFloat(offset); + } + + @Override + public void putFloat(long offset, float value) { + _store.writeFloat(offset, value); + } + + @Override + public double getDouble(long offset) { + return _store.readDouble(offset); + } + + @Override + public void putDouble(long offset, double value) { + _store.writeDouble(offset, value); + } + + @Override + public void copyTo(long offset, byte[] buffer, int destOffset, int size) { + Bytes dest = Bytes.wrapForWrite(buffer); + dest.writePosition(destOffset); + dest.writeLimit(destOffset + size); + _store.copyTo(dest); + } + + @Override + public void copyTo(long offset, PinotDataBuffer buffer, long destOffset, long size) { + long actualSize = Math.min(size, buffer.size() - destOffset); + actualSize = Math.min(actualSize, _store.length() - offset); + + if (buffer instanceof ChronicleDataBuffer) { + ChronicleDataBuffer other = (ChronicleDataBuffer) buffer; + BytesStore dest = other._store.subBytes(destOffset, size); + _store.copyTo(dest); + } else { + long read = 0; + byte[] arr = new byte[4096]; + while (read < actualSize) { + long offsetOnStore = offset + read; + int bytesToRead = Math.min((int) (actualSize - read), 4096); + long inc = _store.read(offsetOnStore, arr, 0, bytesToRead); + assert inc == bytesToRead; + buffer.readFrom(offsetOnStore, arr, 0, bytesToRead); + read += inc; + } + } + } + + @Override + public void readFrom(long offset, byte[] buffer, int srcOffset, int size) { + _store.write(offset, buffer, srcOffset, size); + } + + @Override + public void readFrom(long offset, ByteBuffer buffer) { + _store.write(offset, buffer, buffer.position(), buffer.remaining()); + } + + @Override + public void readFrom(long offset, File file, long srcOffset, long size) + throws IOException { + try (FileInputStream fos = new FileInputStream(file); FileChannel channel = fos.getChannel()) { + channel.position(offset); + + int len; + byte[] buffer = new byte[4096]; + long offsetOnStore = offset; + while ((len = fos.read(buffer)) > 0) { + _store.write(offsetOnStore, buffer, 0, len); + offsetOnStore += len; + } + } + } + + @Override + public long size() { + return _store.capacity(); + } + + @Override + public ByteOrder order() { + return _store.byteOrder(); + } + + @Override + public PinotDataBuffer view(long start, long end, ByteOrder byteOrder) { + ChronicleDataBuffer buffer = new ChronicleDataBuffer(_store.subBytes(start, end - start), false, false); + + return byteOrder == ByteOrder.nativeOrder() ? buffer : new NonNativePinotDataBuffer(buffer); + } + + @Override + public ByteBuffer toDirectByteBuffer(long offset, int size, ByteOrder byteOrder) { + Object underlying = _store.underlyingObject(); + if (underlying instanceof ByteBuffer) { + return ((ByteBuffer) underlying) + .duplicate() + .position((int) offset) + .limit((int) offset + size) + .slice() + .order(byteOrder); + } + return ByteBufferUtil.newDirectByteBuffer(_store.addressForRead(size), size, _store) + .order(byteOrder); + } + + @Override + public void flush() { + if (_flushable && _store instanceof MappedBytesStore) { + ((MappedBytesStore) _store).syncUpTo(_store.capacity()); + } + } + + @Override + public void release() + throws IOException { + _store.releaseLast(); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChroniclePinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChroniclePinotBufferFactory.java new file mode 100644 index 000000000000..8e23e589b36a --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChroniclePinotBufferFactory.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory.chronicle; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import net.openhft.chronicle.bytes.BytesStore; +import net.openhft.chronicle.bytes.MappedBytes; +import org.apache.pinot.segment.spi.memory.OnlyNativePinotBufferFactory; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; + + +public class ChroniclePinotBufferFactory extends OnlyNativePinotBufferFactory { + @Override + protected PinotDataBuffer allocateDirect(long size) { + BytesStore store; + if (size < Integer.MAX_VALUE) { + store = BytesStore.wrap(ByteBuffer.allocateDirect((int) size)); + } else { + store = BytesStore.nativeStore(size); + } + return new ChronicleDataBuffer(store, true, false); + } + + @Override + protected PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size) + throws IOException { + MappedBytes mappedBytes = MappedBytes.singleMappedBytes(file, size, readOnly); + return new ChronicleDataBuffer(mappedBytes, true, true); + } +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java index 1538351c152b..0217bf8228ff 100644 --- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java @@ -603,15 +603,15 @@ public void testConstructors() randomAccessFile.setLength(FILE_OFFSET + LARGE_BUFFER_SIZE); try (PinotDataBuffer buffer1 = PinotDataBuffer .allocateDirect(LARGE_BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER, null)) { - Assert.assertTrue(buffer1 instanceof PinotNonNativeOrderLBuffer); + //Assert.assertTrue(buffer1 instanceof PinotNonNativeOrderLBuffer); testBufferStats(1, LARGE_BUFFER_SIZE, 0, 0); try (PinotDataBuffer buffer2 = PinotDataBuffer .loadFile(TEMP_FILE, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER, null)) { - Assert.assertTrue(buffer2 instanceof PinotNonNativeOrderLBuffer); + //Assert.assertTrue(buffer2 instanceof PinotNonNativeOrderLBuffer); testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0); try (PinotDataBuffer buffer3 = PinotDataBuffer .mapFile(TEMP_FILE, true, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER, null)) { - Assert.assertTrue(buffer3 instanceof PinotNonNativeOrderLBuffer); +// Assert.assertTrue(buffer3 instanceof PinotNonNativeOrderLBuffer); testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 1, LARGE_BUFFER_SIZE); } testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0); From 9ed70ca90bcce96d273691751618bd1c44fcd93c Mon Sep 17 00:00:00 2001 From: Gonzalo Ortiz Date: Fri, 25 Nov 2022 10:43:22 +0100 Subject: [PATCH 2/2] Added test to chronicle bytes. Still not working after finding some bugs in chronicle bytes --- .../segment/spi/memory/BasePinotLBuffer.java | 2 +- .../segment/spi/memory/ByteBufferUtil.java | 76 +++- .../spi/memory/PinotBufferFactory.java | 1 + .../segment/spi/memory/PinotDataBuffer.java | 18 +- .../memory/chronicle/ChronicleDataBuffer.java | 118 +++++-- .../ChroniclePinotBufferFactory.java | 14 +- .../spi/memory/PinotByteBufferTest.java | 7 + .../memory/PinotChronicleByteBufferTest.java | 10 + .../PinotDataBufferInstanceTestBase.java | 330 ++++++++++++++++++ .../spi/memory/PinotDataBufferStaticTest.java | 4 + .../spi/memory/PinotDataBufferTest.java | 323 +---------------- .../spi/memory/PinotDataBufferTestBase.java | 88 +++++ .../spi/memory/PinotLArrayByteBufferTest.java | 19 + pom.xml | 2 +- 14 files changed, 633 insertions(+), 379 deletions(-) create mode 100644 pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotByteBufferTest.java create mode 100644 pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotChronicleByteBufferTest.java create mode 100644 pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferInstanceTestBase.java create mode 100644 pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferStaticTest.java create mode 100644 pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTestBase.java create mode 100644 pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotLArrayByteBufferTest.java diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java index 3cf9c7710216..0361700874c8 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java @@ -140,7 +140,7 @@ public PinotDataBuffer view(long start, long end, ByteOrder byteOrder) { @Override public ByteBuffer toDirectByteBuffer(long offset, int size, ByteOrder byteOrder) { - return _buffer.toDirectByteBuffer(offset, size).order(byteOrder); + return ByteBufferUtil.newDirectByteBuffer(offset, size, this).order(byteOrder); } @Override diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferUtil.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferUtil.java index 70c90f68cd1c..7845129ee291 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferUtil.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferUtil.java @@ -18,35 +18,77 @@ */ package org.apache.pinot.segment.spi.memory; +import com.google.common.collect.Lists; import java.lang.reflect.Constructor; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; public class ByteBufferUtil { - private static final Constructor _dbbCC = findDirectByteBufferConstructor(); + private static final ByteBufferCreator _creator; + private static final List _SUPPLIERS = Lists.newArrayList( + () -> { + Constructor dbbCC = + (Constructor) Class.forName("java.nio.DirectByteBuffer") + .getDeclaredConstructor(Long.TYPE, Integer.TYPE, Object.class); + return (addr, size, att) -> { + dbbCC.setAccessible(true); + try { + return dbbCC.newInstance(Long.valueOf(addr), Integer.valueOf(size), att); + } catch (Exception e) { + throw new IllegalStateException("Failed to create DirectByteBuffer", e); + } + }; + }, + () -> { + Constructor dbbCC = + (Constructor) Class.forName("java.nio.DirectByteBuffer") + .getDeclaredConstructor(Long.TYPE, Integer.TYPE); + return (addr, size, att) -> { + dbbCC.setAccessible(true); + try { + return dbbCC.newInstance(Long.valueOf(addr), Integer.valueOf(size)); + } catch (Exception e) { + throw new IllegalStateException("Failed to create DirectByteBuffer", e); + } + }; + } + ); private ByteBufferUtil() { } - public static ByteBuffer newDirectByteBuffer(long addr, int size, Object att) { - _dbbCC.setAccessible(true); - try { - return _dbbCC.newInstance(Long.valueOf(addr), Integer.valueOf(size), att); - } catch (Exception e) { - throw new IllegalStateException("Failed to create DirectByteBuffer", e); + static { + ByteBufferCreator creator = null; + Exception firstException = null; + for (CreatorSupplier supplier : _SUPPLIERS) { + try { + creator = supplier.createCreator(); + } catch (ClassNotFoundException | NoSuchMethodException e) { + if (firstException == null) { + firstException = e; + } + } + } + if (creator == null) { + throw new IllegalStateException("Cannot find a way to instantiate DirectByteBuffer. " + + "Please verify you are using a supported JVM", firstException); } + _creator = creator; } - @SuppressWarnings("unchecked") - private static Constructor findDirectByteBufferConstructor() { - try { - return (Constructor) Class.forName("java.nio.DirectByteBuffer") - .getDeclaredConstructor(Long.TYPE, Integer.TYPE, Object.class); - } catch (ClassNotFoundException e) { - throw new IllegalStateException("Failed to find java.nio.DirectByteBuffer", e); - } catch (NoSuchMethodException e) { - throw new IllegalStateException("Failed to find constructor f java.nio.DirectByteBuffer", e); - } + public static ByteBuffer newDirectByteBuffer(long addr, int size, Object att) { + return _creator.newDirectByteBuffer(addr, size, att); + } + + private interface CreatorSupplier { + ByteBufferCreator createCreator() throws ClassNotFoundException, NoSuchMethodException; + } + + private interface ByteBufferCreator { + ByteBuffer newDirectByteBuffer(long addr, int size, Object att); } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotBufferFactory.java index 873a21516db5..290069223197 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotBufferFactory.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotBufferFactory.java @@ -29,6 +29,7 @@ public interface PinotBufferFactory { default PinotDataBuffer readFile(File file, long offset, long size, ByteOrder byteOrder) throws IOException { + // TODO: Shouldn't we allocate size - offset instead of size? PinotDataBuffer buffer = allocateDirect(size, byteOrder); buffer.readFrom(0, file, offset, size); return buffer; diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java index 3514fb1312b8..aa388998fc61 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java @@ -114,9 +114,12 @@ public static void useFactory(PinotBufferFactory factory) { private static PinotBufferFactory createDefaultFactory() { String factoryClassName; // TODO: If chronicle is going to be in their own package, use another way to get the runtime version - if (Jvm.majorVersion() > 11) { + int jvmVersion = Jvm.majorVersion(); + if (jvmVersion > 11) { + LOGGER.info("Using Chronicle Bytes as buffer on JVM version {}", jvmVersion); factoryClassName = ChroniclePinotBufferFactory.class.getCanonicalName(); } else { + LOGGER.info("Using LArray as buffer on JVM version {}", jvmVersion); factoryClassName = LArrayPinotBufferFactory.class.getCanonicalName(); } return createFactory(factoryClassName, true); @@ -503,4 +506,17 @@ public abstract void release() public boolean isCloseable() { return _closeable; } + + protected void checkLimits(long capacity, long offset, long size) { + if (offset < 0) { + throw new IllegalArgumentException("Offset " + offset + " cannot be negative"); + } + if (size < 0) { + throw new IllegalArgumentException("Size " + size + " cannot be negative"); + } + if (offset + size > capacity) { + throw new IllegalArgumentException("Size (" + size + ") + offset (" + offset + ") exceeds the capacity of " + + capacity); + } + } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChronicleDataBuffer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChronicleDataBuffer.java index 5fcd3cdca220..8219999e2705 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChronicleDataBuffer.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChronicleDataBuffer.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.segment.spi.memory.chronicle; +import com.google.common.base.Preconditions; import java.io.File; import java.io.FileInputStream; import java.io.IOException; @@ -26,7 +27,7 @@ import java.nio.channels.FileChannel; import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.BytesStore; -import net.openhft.chronicle.bytes.MappedBytesStore; +import net.openhft.chronicle.bytes.MappedBytes; import org.apache.pinot.segment.spi.memory.ByteBufferUtil; import org.apache.pinot.segment.spi.memory.NonNativePinotDataBuffer; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -34,13 +35,29 @@ public class ChronicleDataBuffer extends PinotDataBuffer { - private final BytesStore _store; + private final Bytes _store; private final boolean _flushable; + private boolean _closed = false; + private final long _viewStart; + private final long _viewEnd; - public ChronicleDataBuffer(BytesStore store, boolean closeable, boolean flushable) { + public ChronicleDataBuffer(Bytes store, boolean closeable, boolean flushable, long start, long end) { super(closeable); + Preconditions.checkArgument(start >= 0, "Invalid start " + start + ". It must be > 0"); + Preconditions.checkArgument(start <= end, "Invalid start " + start + " and end " + end + + ". Start cannot be greater than end"); + Preconditions.checkArgument(end + store.start() <= store.capacity(), "Invalid end " + end + + ". It cannot be greater than capacity (" + (store.capacity() - store.start()) + ")"); _store = store; _flushable = flushable; + _viewStart = start + store.start(); + _viewEnd = end + store.start(); + + _store.writePosition(_store.capacity()); + } + + public ChronicleDataBuffer(ChronicleDataBuffer other, long start, long end) { + this(other._store, false, other._flushable, start, end); } @Override @@ -60,57 +77,57 @@ public char getChar(long offset) { @Override public void putChar(long offset, char value) { - _store.writeUnsignedShort(offset, value); + _store.writeUnsignedShort(offset + _viewStart, value); } @Override public short getShort(long offset) { - return _store.readShort(offset); + return _store.readShort(offset + _viewStart); } @Override public void putShort(long offset, short value) { - _store.writeShort(offset, value); + _store.writeShort(offset + _viewStart, value); } @Override public int getInt(long offset) { - return _store.readInt(offset); + return _store.readInt(offset + _viewStart); } @Override public void putInt(long offset, int value) { - _store.writeInt(offset, value); + _store.writeInt(offset + _viewStart, value); } @Override public long getLong(long offset) { - return _store.readLong(offset); + return _store.readLong(offset + _viewStart); } @Override public void putLong(long offset, long value) { - _store.writeLong(offset, value); + _store.writeLong(offset + _viewStart, value); } @Override public float getFloat(long offset) { - return _store.readFloat(offset); + return _store.readFloat(offset + _viewStart); } @Override public void putFloat(long offset, float value) { - _store.writeFloat(offset, value); + _store.writeFloat(offset + _viewStart, value); } @Override public double getDouble(long offset) { - return _store.readDouble(offset); + return _store.readDouble(offset + _viewStart); } @Override public void putDouble(long offset, double value) { - _store.writeDouble(offset, value); + _store.writeDouble(offset + _viewStart, value); } @Override @@ -118,61 +135,78 @@ public void copyTo(long offset, byte[] buffer, int destOffset, int size) { Bytes dest = Bytes.wrapForWrite(buffer); dest.writePosition(destOffset); dest.writeLimit(destOffset + size); + _store.readPosition(offset + _viewStart); _store.copyTo(dest); } @Override public void copyTo(long offset, PinotDataBuffer buffer, long destOffset, long size) { long actualSize = Math.min(size, buffer.size() - destOffset); - actualSize = Math.min(actualSize, _store.length() - offset); + actualSize = Math.min(actualSize, _store.capacity() - offset + _viewStart); if (buffer instanceof ChronicleDataBuffer) { ChronicleDataBuffer other = (ChronicleDataBuffer) buffer; - BytesStore dest = other._store.subBytes(destOffset, size); - _store.copyTo(dest); + if (_store == other._store) { + _store.move(offset + _viewStart, destOffset + other._viewStart, size); + } else { + _store.readPosition(offset + _viewStart); + other._store.writePosition(destOffset + other._viewStart); + _store.copyTo(other._store); + } } else { - long read = 0; - byte[] arr = new byte[4096]; - while (read < actualSize) { - long offsetOnStore = offset + read; - int bytesToRead = Math.min((int) (actualSize - read), 4096); - long inc = _store.read(offsetOnStore, arr, 0, bytesToRead); - assert inc == bytesToRead; - buffer.readFrom(offsetOnStore, arr, 0, bytesToRead); - read += inc; + long copied = 0; + while (copied < actualSize) { + int subSize = Math.min((int) (actualSize - copied), Integer.MAX_VALUE); + _store.readPosition(offset + _viewStart + copied); + _store.readLimit(offset + _viewStart + copied + subSize); + BytesStore into = BytesStore.wrap(buffer.toDirectByteBuffer(destOffset, subSize)); + copied += _store.copyTo(into); } + _store.readLimit(_store.capacity()); } } @Override public void readFrom(long offset, byte[] buffer, int srcOffset, int size) { - _store.write(offset, buffer, srcOffset, size); + _store.write(offset + _viewStart, buffer, srcOffset, size); } @Override public void readFrom(long offset, ByteBuffer buffer) { - _store.write(offset, buffer, buffer.position(), buffer.remaining()); + ByteBuffer nativeBuf; + if (buffer.order() == ByteOrder.nativeOrder()) { + nativeBuf = buffer; + } else { + nativeBuf = buffer.duplicate().order(ByteOrder.nativeOrder()); + } + _store.write(offset + _viewStart, nativeBuf, nativeBuf.position(), nativeBuf.remaining()); } @Override public void readFrom(long offset, File file, long srcOffset, long size) throws IOException { + checkLimits(_viewEnd - _viewStart, offset, size); try (FileInputStream fos = new FileInputStream(file); FileChannel channel = fos.getChannel()) { - channel.position(offset); + if (srcOffset + size > file.length()) { + throw new IllegalArgumentException("Final position cannot be larger than the file length"); + } + channel.position(srcOffset); int len; byte[] buffer = new byte[4096]; - long offsetOnStore = offset; - while ((len = fos.read(buffer)) > 0) { + long offsetOnStore = offset + _viewStart; + long pendingBytes = size - srcOffset; + while (pendingBytes >= 0 && (len = fos.read(buffer, 0, Math.min((int) pendingBytes, 4096))) > 0) { _store.write(offsetOnStore, buffer, 0, len); offsetOnStore += len; + pendingBytes -= len; } } } @Override public long size() { - return _store.capacity(); + return _viewEnd - _viewStart; } @Override @@ -182,7 +216,11 @@ public ByteOrder order() { @Override public PinotDataBuffer view(long start, long end, ByteOrder byteOrder) { - ChronicleDataBuffer buffer = new ChronicleDataBuffer(_store.subBytes(start, end - start), false, false); + Preconditions.checkArgument(start >= 0, "Start cannot be negative"); + Preconditions.checkArgument(start < _viewEnd - _viewStart, + "Start cannot be larger than this buffer capacity"); + Preconditions.checkArgument(end <= _viewEnd, "End cannot be higher than this buffer capacity"); + ChronicleDataBuffer buffer = new ChronicleDataBuffer(this, start + _viewStart, end + _viewEnd); return byteOrder == ByteOrder.nativeOrder() ? buffer : new NonNativePinotDataBuffer(buffer); } @@ -198,20 +236,24 @@ public ByteBuffer toDirectByteBuffer(long offset, int size, ByteOrder byteOrder) .slice() .order(byteOrder); } - return ByteBufferUtil.newDirectByteBuffer(_store.addressForRead(size), size, _store) + return ByteBufferUtil.newDirectByteBuffer(_store.addressForRead(size) + offset + _viewStart, size, _store) .order(byteOrder); } @Override public void flush() { - if (_flushable && _store instanceof MappedBytesStore) { - ((MappedBytesStore) _store).syncUpTo(_store.capacity()); + if (_flushable && _store instanceof MappedBytes) { + _store.writePosition(_store.capacity()); + ((MappedBytes) _store).sync(); } } @Override - public void release() + public synchronized void release() throws IOException { - _store.releaseLast(); + if (!_closed) { + _closed = true; + _store.releaseLast(); + } } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChroniclePinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChroniclePinotBufferFactory.java index 8e23e589b36a..e6d0e39797f1 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChroniclePinotBufferFactory.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/chronicle/ChroniclePinotBufferFactory.java @@ -21,7 +21,7 @@ import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; -import net.openhft.chronicle.bytes.BytesStore; +import net.openhft.chronicle.bytes.Bytes; import net.openhft.chronicle.bytes.MappedBytes; import org.apache.pinot.segment.spi.memory.OnlyNativePinotBufferFactory; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; @@ -30,19 +30,19 @@ public class ChroniclePinotBufferFactory extends OnlyNativePinotBufferFactory { @Override protected PinotDataBuffer allocateDirect(long size) { - BytesStore store; + Bytes store; if (size < Integer.MAX_VALUE) { - store = BytesStore.wrap(ByteBuffer.allocateDirect((int) size)); + store = Bytes.wrapForWrite(ByteBuffer.allocateDirect((int) size)); } else { - store = BytesStore.nativeStore(size); + store = Bytes.allocateDirect(size); } - return new ChronicleDataBuffer(store, true, false); + return new ChronicleDataBuffer(store, true, false, 0, size); } @Override protected PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size) throws IOException { - MappedBytes mappedBytes = MappedBytes.singleMappedBytes(file, size, readOnly); - return new ChronicleDataBuffer(mappedBytes, true, true); + MappedBytes mappedBytes = MappedBytes.singleMappedBytes(file, offset + size, readOnly); + return new ChronicleDataBuffer(mappedBytes, true, true, offset, size + offset); } } diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotByteBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotByteBufferTest.java new file mode 100644 index 000000000000..97b144d6dd68 --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotByteBufferTest.java @@ -0,0 +1,7 @@ +package org.apache.pinot.segment.spi.memory; + +public class PinotByteBufferTest extends PinotDataBufferInstanceTestBase { + public PinotByteBufferTest() { + super(new ByteBufferPinotBufferFactory()); + } +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotChronicleByteBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotChronicleByteBufferTest.java new file mode 100644 index 000000000000..23afa356e594 --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotChronicleByteBufferTest.java @@ -0,0 +1,10 @@ +package org.apache.pinot.segment.spi.memory; + +import org.apache.pinot.segment.spi.memory.chronicle.ChroniclePinotBufferFactory; + + +public class PinotChronicleByteBufferTest extends PinotDataBufferInstanceTestBase { + public PinotChronicleByteBufferTest() { + super(new ChroniclePinotBufferFactory()); + } +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferInstanceTestBase.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferInstanceTestBase.java new file mode 100644 index 000000000000..4c6f2e548fd2 --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferInstanceTestBase.java @@ -0,0 +1,330 @@ +package org.apache.pinot.segment.spi.memory; + +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.concurrent.Future; +import net.openhft.chronicle.core.Jvm; +import org.apache.commons.io.FileUtils; +import org.testng.Assert; +import org.testng.SkipException; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public abstract class PinotDataBufferInstanceTestBase extends PinotDataBufferTestBase { + + public final PinotBufferFactory _factory; + + public PinotDataBufferInstanceTestBase(PinotBufferFactory factory) { + _factory = factory; + } + + @SuppressWarnings("RedundantExplicitClose") + @Test + public void testMultipleClose() + throws Exception { + try (PinotDataBuffer buffer = _factory.allocateDirect(BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) { + buffer.close(); + } + try (PinotDataBuffer buffer = _factory.allocateDirect(BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) { + buffer.close(); + } + try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { + randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); + try (PinotDataBuffer buffer = _factory.readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) { + buffer.close(); + } + try (PinotDataBuffer buffer = _factory.readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) { + buffer.close(); + } + try (PinotDataBuffer buffer = _factory + .mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) { + buffer.close(); + } + try (PinotDataBuffer buffer = + _factory.mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) { + buffer.close(); + } + } finally { + FileUtils.forceDelete(TEMP_FILE); + } + } + + @Test + public void testDirectBE() + throws Exception { + try (PinotDataBuffer buffer = _factory.allocateDirect(BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) { + Assert.assertSame(buffer.order(), ByteOrder.BIG_ENDIAN); + testPinotDataBuffer(buffer); + } + } + + @Test + public void testDirectLE() + throws Exception { + try (PinotDataBuffer buffer = PinotByteBuffer.allocateDirect(BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) { + Assert.assertSame(buffer.order(), ByteOrder.LITTLE_ENDIAN); + testPinotDataBuffer(buffer); + } + } + + @Test + public void testReadFileBE() + throws Exception { + try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { + randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); + try (PinotDataBuffer buffer = _factory + .readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) { + Assert.assertSame(buffer.order(), ByteOrder.BIG_ENDIAN); + testPinotDataBuffer(buffer); + } + } + } + + @Test + public void testReadFileLE() + throws Exception { + try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { + randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); + try (PinotDataBuffer buffer = _factory + .readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) { + Assert.assertSame(buffer.order(), ByteOrder.LITTLE_ENDIAN); + testPinotDataBuffer(buffer); + } + } + } + + @Test + public void testMapFileBE() + throws Exception { + try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { + randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); + try (PinotDataBuffer buffer = _factory + .mapFile(TEMP_FILE, false, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) { + Assert.assertSame(buffer.order(), ByteOrder.BIG_ENDIAN); + testPinotDataBuffer(buffer); + } + } + } + + @Test + public void testMapFileLE() + throws Exception { + try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { + randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); + try (PinotDataBuffer buffer = _factory + .mapFile(TEMP_FILE, false, FILE_OFFSET, BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) { + Assert.assertSame(buffer.order(), ByteOrder.LITTLE_ENDIAN); + testPinotDataBuffer(buffer); + } + } + } + + protected void testPinotDataBuffer(PinotDataBuffer buffer) + throws Exception { + Assert.assertEquals(buffer.size(), BUFFER_SIZE); + testReadWriteByte(buffer); + testReadWriteChar(buffer); + testReadWriteShort(buffer); + testReadWriteInt(buffer); + testReadWriteLong(buffer); + testReadWriteFloat(buffer); + testReadWriteDouble(buffer); + testReadWriteBytes(buffer); + testReadWritePinotDataBuffer(buffer); + testReadFromByteBuffer(buffer); + testConcurrentReadWrite(buffer); + } + + protected void testReadWriteByte(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int intOffset = RANDOM.nextInt(BUFFER_SIZE); + buffer.putByte(intOffset, _bytes[i]); + Assert.assertEquals(buffer.getByte(intOffset), _bytes[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + long longOffset = RANDOM.nextInt(BUFFER_SIZE); + buffer.putByte(longOffset, _bytes[i]); + Assert.assertEquals(buffer.getByte(longOffset), _bytes[i]); + } + } + + protected void testReadWriteChar(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(CHAR_ARRAY_LENGTH); + int intOffset = index * Byte.BYTES; + buffer.putChar(intOffset, _chars[i]); + Assert.assertEquals(buffer.getChar(intOffset), _chars[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(CHAR_ARRAY_LENGTH); + long longOffset = index * Byte.BYTES; + buffer.putChar(longOffset, _chars[i]); + Assert.assertEquals(buffer.getChar(longOffset), _chars[i]); + } + } + + protected void testReadWriteShort(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(SHORT_ARRAY_LENGTH); + int intOffset = index * Byte.BYTES; + buffer.putShort(intOffset, _shorts[i]); + Assert.assertEquals(buffer.getShort(intOffset), _shorts[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(SHORT_ARRAY_LENGTH); + long longOffset = index * Byte.BYTES; + buffer.putShort(longOffset, _shorts[i]); + Assert.assertEquals(buffer.getShort(longOffset), _shorts[i]); + } + } + + protected void testReadWriteInt(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(INT_ARRAY_LENGTH); + int intOffset = index * Byte.BYTES; + buffer.putInt(intOffset, _ints[i]); + Assert.assertEquals(buffer.getInt(intOffset), _ints[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(INT_ARRAY_LENGTH); + long longOffset = index * Byte.BYTES; + buffer.putInt(longOffset, _ints[i]); + Assert.assertEquals(buffer.getInt(longOffset), _ints[i]); + } + } + + protected void testReadWriteLong(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(LONG_ARRAY_LENGTH); + int intOffset = index * Byte.BYTES; + buffer.putLong(intOffset, _longs[i]); + Assert.assertEquals(buffer.getLong(intOffset), _longs[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(LONG_ARRAY_LENGTH); + long longOffset = index * Byte.BYTES; + buffer.putLong(longOffset, _longs[i]); + Assert.assertEquals(buffer.getLong(longOffset), _longs[i]); + } + } + + protected void testReadWriteFloat(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(FLOAT_ARRAY_LENGTH); + int intOffset = index * Byte.BYTES; + buffer.putFloat(intOffset, _floats[i]); + Assert.assertEquals(buffer.getFloat(intOffset), _floats[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(FLOAT_ARRAY_LENGTH); + long longOffset = index * Byte.BYTES; + buffer.putFloat(longOffset, _floats[i]); + Assert.assertEquals(buffer.getFloat(longOffset), _floats[i]); + } + } + + protected void testReadWriteDouble(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(DOUBLE_ARRAY_LENGTH); + int intOffset = index * Byte.BYTES; + buffer.putDouble(intOffset, _doubles[i]); + Assert.assertEquals(buffer.getDouble(intOffset), _doubles[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(DOUBLE_ARRAY_LENGTH); + long longOffset = index * Byte.BYTES; + buffer.putDouble(longOffset, _doubles[i]); + Assert.assertEquals(buffer.getDouble(longOffset), _doubles[i]); + } + } + + protected void testReadWriteBytes(PinotDataBuffer buffer) { + byte[] readBuffer = new byte[MAX_BYTES_LENGTH]; + byte[] writeBuffer = new byte[MAX_BYTES_LENGTH]; + for (int i = 0; i < NUM_ROUNDS; i++) { + int length = RANDOM.nextInt(MAX_BYTES_LENGTH); + int offset = RANDOM.nextInt(BUFFER_SIZE - length); + int arrayOffset = RANDOM.nextInt(MAX_BYTES_LENGTH - length); + System.arraycopy(_bytes, offset, readBuffer, arrayOffset, length); + buffer.readFrom(offset, readBuffer, arrayOffset, length); + buffer.copyTo(offset, writeBuffer, arrayOffset, length); + int end = arrayOffset + length; + for (int j = arrayOffset; j < end; j++) { + Assert.assertEquals(writeBuffer[j], readBuffer[j]); + } + } + } + + protected void testReadWritePinotDataBuffer(PinotDataBuffer buffer) { + testReadWritePinotDataBuffer(buffer, PinotByteBuffer.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NATIVE_ORDER), + PinotByteBuffer.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER)); + testReadWritePinotDataBuffer(buffer, + PinotByteBuffer.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER) + .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH), + PinotByteBuffer.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NATIVE_ORDER) + .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH)); + testReadWritePinotDataBuffer(buffer, _factory.allocateDirect(MAX_BYTES_LENGTH, ByteOrder.nativeOrder()), + _factory.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER)); + testReadWritePinotDataBuffer(buffer, + _factory.allocateDirect(2 * MAX_BYTES_LENGTH, ByteOrder.nativeOrder()) + .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH), + _factory.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER) + .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH)); + } + + protected void testReadWritePinotDataBuffer(PinotDataBuffer buffer, PinotDataBuffer readBuffer, + PinotDataBuffer writeBuffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int length = RANDOM.nextInt(MAX_BYTES_LENGTH); + int offset = RANDOM.nextInt(BUFFER_SIZE - length); + readBuffer.readFrom(0, _bytes, RANDOM.nextInt(BUFFER_SIZE - length), length); + readBuffer.copyTo(0, buffer, offset, length); + for (int j = 0; j < length; j++) { + Assert.assertEquals(buffer.getByte(j + offset), readBuffer.getByte(j)); + } + buffer.copyTo(offset, writeBuffer, 0, length); + for (int j = 0; j < length; j++) { + Assert.assertEquals(writeBuffer.getByte(j), readBuffer.getByte(j)); + } + } + } + + protected void testReadFromByteBuffer(PinotDataBuffer buffer) { + byte[] readBuffer = new byte[MAX_BYTES_LENGTH]; + for (int i = 0; i < NUM_ROUNDS; i++) { + int length = RANDOM.nextInt(MAX_BYTES_LENGTH); + int offset = RANDOM.nextInt(BUFFER_SIZE - length); + System.arraycopy(_bytes, offset, readBuffer, 0, length); + buffer.readFrom(offset, ByteBuffer.wrap(readBuffer, 0, length)); + for (int j = 0; j < length; j++) { + Assert.assertEquals(buffer.getByte(offset + j), readBuffer[j]); + } + } + } + + protected void testConcurrentReadWrite(PinotDataBuffer buffer) + throws Exception { + Future[] futures = new Future[NUM_ROUNDS]; + for (int i = 0; i < NUM_ROUNDS; i++) { + futures[i] = EXECUTOR_SERVICE.submit(() -> { + int length = RANDOM.nextInt(MAX_BYTES_LENGTH); + int offset = RANDOM.nextInt(BUFFER_SIZE - length); + byte[] readBuffer = new byte[length]; + byte[] writeBuffer = new byte[length]; + System.arraycopy(_bytes, offset, readBuffer, 0, length); + buffer.readFrom(offset, readBuffer); + buffer.copyTo(offset, writeBuffer); + Assert.assertTrue(Arrays.equals(readBuffer, writeBuffer)); + buffer.readFrom(offset, ByteBuffer.wrap(readBuffer)); + buffer.copyTo(offset, writeBuffer); + Assert.assertTrue(Arrays.equals(readBuffer, writeBuffer)); + }); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + futures[i].get(); + } + } +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferStaticTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferStaticTest.java new file mode 100644 index 000000000000..0f95263c2d13 --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferStaticTest.java @@ -0,0 +1,4 @@ +package org.apache.pinot.segment.spi.memory; + +public class PinotDataBufferStaticTest { +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java index 0217bf8228ff..bcf51a7890f2 100644 --- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java @@ -18,69 +18,20 @@ */ package org.apache.pinot.segment.spi.memory; -import java.io.File; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.ByteOrder; import java.util.Arrays; -import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import org.apache.commons.io.FileUtils; import org.testng.Assert; import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -public class PinotDataBufferTest { - private static final Random RANDOM = new Random(); - private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10); - private static final File TEMP_FILE = new File(FileUtils.getTempDirectory(), "PinotDataBufferTest"); - private static final int FILE_OFFSET = 10; // Not page-aligned - private static final int BUFFER_SIZE = 10_000; // Not page-aligned - private static final int CHAR_ARRAY_LENGTH = BUFFER_SIZE / Character.BYTES; - private static final int SHORT_ARRAY_LENGTH = BUFFER_SIZE / Short.BYTES; - private static final int INT_ARRAY_LENGTH = BUFFER_SIZE / Integer.BYTES; - private static final int LONG_ARRAY_LENGTH = BUFFER_SIZE / Long.BYTES; - private static final int FLOAT_ARRAY_LENGTH = BUFFER_SIZE / Float.BYTES; - private static final int DOUBLE_ARRAY_LENGTH = BUFFER_SIZE / Double.BYTES; - private static final int NUM_ROUNDS = 1000; - private static final int MAX_BYTES_LENGTH = 100; - private static final long LARGE_BUFFER_SIZE = Integer.MAX_VALUE + 2L; // Not page-aligned - - private byte[] _bytes = new byte[BUFFER_SIZE]; - private char[] _chars = new char[CHAR_ARRAY_LENGTH]; - private short[] _shorts = new short[SHORT_ARRAY_LENGTH]; - private int[] _ints = new int[INT_ARRAY_LENGTH]; - private long[] _longs = new long[LONG_ARRAY_LENGTH]; - private float[] _floats = new float[FLOAT_ARRAY_LENGTH]; - private double[] _doubles = new double[DOUBLE_ARRAY_LENGTH]; - - @BeforeClass - public void setUp() { - for (int i = 0; i < BUFFER_SIZE; i++) { - _bytes[i] = (byte) RANDOM.nextInt(); - } - for (int i = 0; i < CHAR_ARRAY_LENGTH; i++) { - _chars[i] = (char) RANDOM.nextInt(); - } - for (int i = 0; i < SHORT_ARRAY_LENGTH; i++) { - _shorts[i] = (short) RANDOM.nextInt(); - } - for (int i = 0; i < INT_ARRAY_LENGTH; i++) { - _ints[i] = RANDOM.nextInt(); - } - for (int i = 0; i < LONG_ARRAY_LENGTH; i++) { - _longs[i] = RANDOM.nextLong(); - } - for (int i = 0; i < FLOAT_ARRAY_LENGTH; i++) { - _floats[i] = RANDOM.nextFloat(); - } - for (int i = 0; i < DOUBLE_ARRAY_LENGTH; i++) { - _doubles[i] = RANDOM.nextDouble(); - } +public class PinotDataBufferTest extends PinotDataBufferInstanceTestBase { + public PinotDataBufferTest(PinotBufferFactory factory) { + super(factory); } @Test @@ -153,207 +104,6 @@ public void testPinotNonNativeOrderLBuffer() } } - private void testPinotDataBuffer(PinotDataBuffer buffer) - throws Exception { - Assert.assertEquals(buffer.size(), BUFFER_SIZE); - testReadWriteByte(buffer); - testReadWriteChar(buffer); - testReadWriteShort(buffer); - testReadWriteInt(buffer); - testReadWriteLong(buffer); - testReadWriteFloat(buffer); - testReadWriteDouble(buffer); - testReadWriteBytes(buffer); - testReadWritePinotDataBuffer(buffer); - testReadFromByteBuffer(buffer); - testConcurrentReadWrite(buffer); - } - - private void testReadWriteByte(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int intOffset = RANDOM.nextInt(BUFFER_SIZE); - buffer.putByte(intOffset, _bytes[i]); - Assert.assertEquals(buffer.getByte(intOffset), _bytes[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - long longOffset = RANDOM.nextInt(BUFFER_SIZE); - buffer.putByte(longOffset, _bytes[i]); - Assert.assertEquals(buffer.getByte(longOffset), _bytes[i]); - } - } - - private void testReadWriteChar(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(CHAR_ARRAY_LENGTH); - int intOffset = index * Byte.BYTES; - buffer.putChar(intOffset, _chars[i]); - Assert.assertEquals(buffer.getChar(intOffset), _chars[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(CHAR_ARRAY_LENGTH); - long longOffset = index * Byte.BYTES; - buffer.putChar(longOffset, _chars[i]); - Assert.assertEquals(buffer.getChar(longOffset), _chars[i]); - } - } - - private void testReadWriteShort(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(SHORT_ARRAY_LENGTH); - int intOffset = index * Byte.BYTES; - buffer.putShort(intOffset, _shorts[i]); - Assert.assertEquals(buffer.getShort(intOffset), _shorts[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(SHORT_ARRAY_LENGTH); - long longOffset = index * Byte.BYTES; - buffer.putShort(longOffset, _shorts[i]); - Assert.assertEquals(buffer.getShort(longOffset), _shorts[i]); - } - } - - private void testReadWriteInt(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(INT_ARRAY_LENGTH); - int intOffset = index * Byte.BYTES; - buffer.putInt(intOffset, _ints[i]); - Assert.assertEquals(buffer.getInt(intOffset), _ints[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(INT_ARRAY_LENGTH); - long longOffset = index * Byte.BYTES; - buffer.putInt(longOffset, _ints[i]); - Assert.assertEquals(buffer.getInt(longOffset), _ints[i]); - } - } - - private void testReadWriteLong(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(LONG_ARRAY_LENGTH); - int intOffset = index * Byte.BYTES; - buffer.putLong(intOffset, _longs[i]); - Assert.assertEquals(buffer.getLong(intOffset), _longs[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(LONG_ARRAY_LENGTH); - long longOffset = index * Byte.BYTES; - buffer.putLong(longOffset, _longs[i]); - Assert.assertEquals(buffer.getLong(longOffset), _longs[i]); - } - } - - private void testReadWriteFloat(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(FLOAT_ARRAY_LENGTH); - int intOffset = index * Byte.BYTES; - buffer.putFloat(intOffset, _floats[i]); - Assert.assertEquals(buffer.getFloat(intOffset), _floats[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(FLOAT_ARRAY_LENGTH); - long longOffset = index * Byte.BYTES; - buffer.putFloat(longOffset, _floats[i]); - Assert.assertEquals(buffer.getFloat(longOffset), _floats[i]); - } - } - - private void testReadWriteDouble(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(DOUBLE_ARRAY_LENGTH); - int intOffset = index * Byte.BYTES; - buffer.putDouble(intOffset, _doubles[i]); - Assert.assertEquals(buffer.getDouble(intOffset), _doubles[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(DOUBLE_ARRAY_LENGTH); - long longOffset = index * Byte.BYTES; - buffer.putDouble(longOffset, _doubles[i]); - Assert.assertEquals(buffer.getDouble(longOffset), _doubles[i]); - } - } - - private void testReadWriteBytes(PinotDataBuffer buffer) { - byte[] readBuffer = new byte[MAX_BYTES_LENGTH]; - byte[] writeBuffer = new byte[MAX_BYTES_LENGTH]; - for (int i = 0; i < NUM_ROUNDS; i++) { - int length = RANDOM.nextInt(MAX_BYTES_LENGTH); - int offset = RANDOM.nextInt(BUFFER_SIZE - length); - int arrayOffset = RANDOM.nextInt(MAX_BYTES_LENGTH - length); - System.arraycopy(_bytes, offset, readBuffer, arrayOffset, length); - buffer.readFrom(offset, readBuffer, arrayOffset, length); - buffer.copyTo(offset, writeBuffer, arrayOffset, length); - int end = arrayOffset + length; - for (int j = arrayOffset; j < end; j++) { - Assert.assertEquals(writeBuffer[j], readBuffer[j]); - } - } - } - - private void testReadWritePinotDataBuffer(PinotDataBuffer buffer) { - testReadWritePinotDataBuffer(buffer, PinotByteBuffer.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NATIVE_ORDER), - PinotByteBuffer.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER)); - testReadWritePinotDataBuffer(buffer, - PinotByteBuffer.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER) - .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH), - PinotByteBuffer.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NATIVE_ORDER) - .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH)); - testReadWritePinotDataBuffer(buffer, PinotNativeOrderLBuffer.allocateDirect(MAX_BYTES_LENGTH), - PinotNonNativeOrderLBuffer.allocateDirect(MAX_BYTES_LENGTH)); - testReadWritePinotDataBuffer(buffer, - PinotNonNativeOrderLBuffer.allocateDirect(2 * MAX_BYTES_LENGTH).view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH), - PinotNativeOrderLBuffer.allocateDirect(2 * MAX_BYTES_LENGTH).view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH)); - } - - private void testReadWritePinotDataBuffer(PinotDataBuffer buffer, PinotDataBuffer readBuffer, - PinotDataBuffer writeBuffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int length = RANDOM.nextInt(MAX_BYTES_LENGTH); - int offset = RANDOM.nextInt(BUFFER_SIZE - length); - readBuffer.readFrom(0, _bytes, RANDOM.nextInt(BUFFER_SIZE - length), length); - readBuffer.copyTo(0, buffer, offset, length); - buffer.copyTo(offset, writeBuffer, 0, length); - for (int j = 0; j < length; j++) { - Assert.assertEquals(writeBuffer.getByte(j), readBuffer.getByte(j)); - } - } - } - - private void testReadFromByteBuffer(PinotDataBuffer buffer) { - byte[] readBuffer = new byte[MAX_BYTES_LENGTH]; - for (int i = 0; i < NUM_ROUNDS; i++) { - int length = RANDOM.nextInt(MAX_BYTES_LENGTH); - int offset = RANDOM.nextInt(BUFFER_SIZE - length); - System.arraycopy(_bytes, offset, readBuffer, 0, length); - buffer.readFrom(offset, ByteBuffer.wrap(readBuffer, 0, length)); - for (int j = 0; j < length; j++) { - Assert.assertEquals(buffer.getByte(offset + j), readBuffer[j]); - } - } - } - - private void testConcurrentReadWrite(PinotDataBuffer buffer) - throws Exception { - Future[] futures = new Future[NUM_ROUNDS]; - for (int i = 0; i < NUM_ROUNDS; i++) { - futures[i] = EXECUTOR_SERVICE.submit(() -> { - int length = RANDOM.nextInt(MAX_BYTES_LENGTH); - int offset = RANDOM.nextInt(BUFFER_SIZE - length); - byte[] readBuffer = new byte[length]; - byte[] writeBuffer = new byte[length]; - System.arraycopy(_bytes, offset, readBuffer, 0, length); - buffer.readFrom(offset, readBuffer); - buffer.copyTo(offset, writeBuffer); - Assert.assertTrue(Arrays.equals(readBuffer, writeBuffer)); - buffer.readFrom(offset, ByteBuffer.wrap(readBuffer)); - buffer.copyTo(offset, writeBuffer); - Assert.assertTrue(Arrays.equals(readBuffer, writeBuffer)); - }); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - futures[i].get(); - } - } - @Test public void testPinotByteBufferReadWriteFile() throws Exception { @@ -510,46 +260,6 @@ private void testByteBuffer(ByteBuffer byteBuffer) { } } - @SuppressWarnings("RedundantExplicitClose") - @Test - public void testMultipleClose() - throws Exception { - try (PinotDataBuffer buffer = PinotByteBuffer.allocateDirect(BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotNativeOrderLBuffer.allocateDirect(BUFFER_SIZE)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotNonNativeOrderLBuffer.allocateDirect(BUFFER_SIZE)) { - buffer.close(); - } - try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { - randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); - try (PinotDataBuffer buffer = PinotByteBuffer - .loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotNativeOrderLBuffer.loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotNonNativeOrderLBuffer.loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotByteBuffer - .mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotNativeOrderLBuffer.mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotNonNativeOrderLBuffer.mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE)) { - buffer.close(); - } - } finally { - FileUtils.forceDelete(TEMP_FILE); - } - } - @Test public void testConstructors() throws Exception { @@ -580,15 +290,15 @@ public void testConstructors() randomAccessFile.setLength(FILE_OFFSET + LARGE_BUFFER_SIZE); try (PinotDataBuffer buffer1 = PinotDataBuffer .allocateDirect(LARGE_BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER, null)) { - Assert.assertTrue(buffer1 instanceof PinotNativeOrderLBuffer); + Assert.assertSame(buffer1.order(), PinotDataBuffer.NATIVE_ORDER); testBufferStats(1, LARGE_BUFFER_SIZE, 0, 0); try (PinotDataBuffer buffer2 = PinotDataBuffer .loadFile(TEMP_FILE, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER, null)) { - Assert.assertTrue(buffer2 instanceof PinotNativeOrderLBuffer); + Assert.assertSame(buffer2.order(), PinotDataBuffer.NATIVE_ORDER); testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0); try (PinotDataBuffer buffer3 = PinotDataBuffer .mapFile(TEMP_FILE, true, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER, null)) { - Assert.assertTrue(buffer3 instanceof PinotNativeOrderLBuffer); + Assert.assertSame(buffer3.order(), PinotDataBuffer.NATIVE_ORDER); testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 1, LARGE_BUFFER_SIZE); } testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0); @@ -603,15 +313,15 @@ public void testConstructors() randomAccessFile.setLength(FILE_OFFSET + LARGE_BUFFER_SIZE); try (PinotDataBuffer buffer1 = PinotDataBuffer .allocateDirect(LARGE_BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER, null)) { - //Assert.assertTrue(buffer1 instanceof PinotNonNativeOrderLBuffer); + Assert.assertSame(buffer1.order(), PinotDataBuffer.NON_NATIVE_ORDER); testBufferStats(1, LARGE_BUFFER_SIZE, 0, 0); try (PinotDataBuffer buffer2 = PinotDataBuffer .loadFile(TEMP_FILE, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER, null)) { - //Assert.assertTrue(buffer2 instanceof PinotNonNativeOrderLBuffer); + Assert.assertSame(buffer2.order(), PinotDataBuffer.NON_NATIVE_ORDER); testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0); try (PinotDataBuffer buffer3 = PinotDataBuffer .mapFile(TEMP_FILE, true, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER, null)) { -// Assert.assertTrue(buffer3 instanceof PinotNonNativeOrderLBuffer); + Assert.assertSame(buffer3.order(), PinotDataBuffer.NON_NATIVE_ORDER); testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 1, LARGE_BUFFER_SIZE); } testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0); @@ -623,19 +333,4 @@ public void testConstructors() FileUtils.forceDelete(TEMP_FILE); } } - - private void testBufferStats(int directBufferCount, long directBufferUsage, int mmapBufferCount, - long mmapBufferUsage) { - Assert.assertEquals(PinotDataBuffer.getAllocationFailureCount(), 0); - Assert.assertEquals(PinotDataBuffer.getDirectBufferCount(), directBufferCount); - Assert.assertEquals(PinotDataBuffer.getDirectBufferUsage(), directBufferUsage); - Assert.assertEquals(PinotDataBuffer.getMmapBufferCount(), mmapBufferCount); - Assert.assertEquals(PinotDataBuffer.getMmapBufferUsage(), mmapBufferUsage); - Assert.assertEquals(PinotDataBuffer.getBufferInfo().size(), directBufferCount + mmapBufferCount); - } - - @AfterClass - public void tearDown() { - EXECUTOR_SERVICE.shutdown(); - } } diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTestBase.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTestBase.java new file mode 100644 index 000000000000..0a3f40126755 --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTestBase.java @@ -0,0 +1,88 @@ +package org.apache.pinot.segment.spi.memory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.commons.io.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeClass; + + +public class PinotDataBufferTestBase { + + protected static final Random RANDOM = new Random(); + protected static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10); + protected static final File TEMP_FILE = new File(FileUtils.getTempDirectory(), "PinotDataBufferTest"); + protected static final int FILE_OFFSET = 10; // Not page-aligned + protected static final int BUFFER_SIZE = 10_000; // Not page-aligned + protected static final int CHAR_ARRAY_LENGTH = BUFFER_SIZE / Character.BYTES; + protected static final int SHORT_ARRAY_LENGTH = BUFFER_SIZE / Short.BYTES; + protected static final int INT_ARRAY_LENGTH = BUFFER_SIZE / Integer.BYTES; + protected static final int LONG_ARRAY_LENGTH = BUFFER_SIZE / Long.BYTES; + protected static final int FLOAT_ARRAY_LENGTH = BUFFER_SIZE / Float.BYTES; + protected static final int DOUBLE_ARRAY_LENGTH = BUFFER_SIZE / Double.BYTES; + protected static final int NUM_ROUNDS = 1000; + protected static final int MAX_BYTES_LENGTH = 100; + protected static final long LARGE_BUFFER_SIZE = Integer.MAX_VALUE + 2L; // Not page-aligned + + protected byte[] _bytes = new byte[BUFFER_SIZE]; + protected char[] _chars = new char[CHAR_ARRAY_LENGTH]; + protected short[] _shorts = new short[SHORT_ARRAY_LENGTH]; + protected int[] _ints = new int[INT_ARRAY_LENGTH]; + protected long[] _longs = new long[LONG_ARRAY_LENGTH]; + protected float[] _floats = new float[FLOAT_ARRAY_LENGTH]; + protected double[] _doubles = new double[DOUBLE_ARRAY_LENGTH]; + + @BeforeClass + public void setUp() { + for (int i = 0; i < BUFFER_SIZE; i++) { + _bytes[i] = (byte) RANDOM.nextInt(); + } + for (int i = 0; i < CHAR_ARRAY_LENGTH; i++) { + _chars[i] = (char) RANDOM.nextInt(); + } + for (int i = 0; i < SHORT_ARRAY_LENGTH; i++) { + _shorts[i] = (short) RANDOM.nextInt(); + } + for (int i = 0; i < INT_ARRAY_LENGTH; i++) { + _ints[i] = RANDOM.nextInt(); + } + for (int i = 0; i < LONG_ARRAY_LENGTH; i++) { + _longs[i] = RANDOM.nextLong(); + } + for (int i = 0; i < FLOAT_ARRAY_LENGTH; i++) { + _floats[i] = RANDOM.nextFloat(); + } + for (int i = 0; i < DOUBLE_ARRAY_LENGTH; i++) { + _doubles[i] = RANDOM.nextDouble(); + } + } + + protected void testBufferStats(int directBufferCount, long directBufferUsage, int mmapBufferCount, + long mmapBufferUsage) { + Assert.assertEquals(PinotDataBuffer.getAllocationFailureCount(), 0); + Assert.assertEquals(PinotDataBuffer.getDirectBufferCount(), directBufferCount); + Assert.assertEquals(PinotDataBuffer.getDirectBufferUsage(), directBufferUsage); + Assert.assertEquals(PinotDataBuffer.getMmapBufferCount(), mmapBufferCount); + Assert.assertEquals(PinotDataBuffer.getMmapBufferUsage(), mmapBufferUsage); + Assert.assertEquals(PinotDataBuffer.getBufferInfo().size(), directBufferCount + mmapBufferCount); + } + + @AfterTest + public void deleteFileIfExists() + throws IOException { + if (TEMP_FILE.exists()) { + FileUtils.forceDelete(TEMP_FILE); + } + } + + @AfterClass + public void tearDown() { + EXECUTOR_SERVICE.shutdown(); + } +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotLArrayByteBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotLArrayByteBufferTest.java new file mode 100644 index 000000000000..efca7eeea380 --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotLArrayByteBufferTest.java @@ -0,0 +1,19 @@ +package org.apache.pinot.segment.spi.memory; + +import net.openhft.chronicle.core.Jvm; +import org.testng.SkipException; +import org.testng.annotations.BeforeClass; + + +public class PinotLArrayByteBufferTest extends PinotDataBufferInstanceTestBase { + public PinotLArrayByteBufferTest() { + super(new LArrayPinotBufferFactory()); + } + + @BeforeClass + public void abortOnModernJava() { + if (Jvm.majorVersion() > 11) { +// throw new SkipException("Skipping LArray tests because they cannot run in Java " + Jvm.majorVersion()); + } + } +} diff --git a/pom.xml b/pom.xml index 0a937a5abd6e..7275856101c7 100644 --- a/pom.xml +++ b/pom.xml @@ -109,7 +109,7 @@ ${basedir} dev - 11 + 17 UTF-8 true