diff --git a/.github/workflows/pinot_tests.yml b/.github/workflows/pinot_tests.yml index e45014746376..06c92b32d104 100644 --- a/.github/workflows/pinot_tests.yml +++ b/.github/workflows/pinot_tests.yml @@ -72,16 +72,20 @@ jobs: if: github.repository == 'apache/pinot' runs-on: ubuntu-latest strategy: + # Changed to false in order to improve coverage using unsafe buffers + fail-fast: false matrix: testset: [ 1, 2 ] - name: Pinot Unit Test Set ${{ matrix.testset }} + java: [ 11, 17, 20 ] + distribution: [ "temurin" ] + name: Pinot Unit Test Set ${{ matrix.testset }} (${{matrix.distribution}}-${{matrix.java}}) steps: - uses: actions/checkout@v3 - - name: Set up JDK 11 + - name: Set up JDK ${{ matrix.java }} uses: actions/setup-java@v3 with: - java-version: 11 - distribution: 'adopt' + java-version: ${{ matrix.java }} + distribution: ${{ matrix.distribution }} cache: 'maven' # Step that does that actual cache save and restore - uses: actions/cache@v3 @@ -111,7 +115,7 @@ jobs: continue-on-error: true timeout-minutes: 5 with: - flags: unittests${{ matrix.testset }} + flags: unittests${{ matrix.testset }}${{matrix.distribution}}${{matrix.java}} name: codecov-unit-tests fail_ci_if_error: false verbose: true @@ -120,16 +124,20 @@ jobs: if: github.repository == 'apache/pinot' runs-on: ubuntu-latest strategy: + # Changed to false in order to improve coverage using unsafe buffers + fail-fast: false matrix: testset: [ 1, 2 ] - name: Pinot Integration Test Set ${{ matrix.testset }} + java: [ 11, 17, 20 ] + distribution: [ "temurin" ] + name: Pinot Integration Test Set ${{ matrix.testset }} (${{matrix.distribution}}-${{matrix.java}}) steps: - uses: actions/checkout@v3 - - name: Set up JDK 11 + - name: Set up JDK ${{ matrix.java }} uses: actions/setup-java@v3 with: - java-version: 11 - distribution: 'adopt' + java-version: ${{ matrix.java }} + distribution: ${{ matrix.distribution }} cache: 'maven' # Step that does that actual cache save and restore - uses: actions/cache@v3 @@ -159,7 +167,7 @@ jobs: continue-on-error: true timeout-minutes: 5 with: - flags: integration${{ matrix.testset }} + flags: integration${{ matrix.testset }}${{matrix.distribution}}${{matrix.java}} name: codecov-integration-tests fail_ci_if_error: false verbose: true @@ -168,6 +176,8 @@ jobs: if: github.repository == 'apache/pinot' runs-on: ubuntu-latest strategy: + # Changed to false in order to improve coverage using unsafe buffers + fail-fast: false matrix: test_suite: [ "compatibility-verifier/sample-test-suite" ] old_commit: [ @@ -221,10 +231,11 @@ jobs: if: github.repository == 'apache/pinot' runs-on: ubuntu-latest strategy: + # Changed to false in order to improve coverage using unsafe buffers + fail-fast: false matrix: - # We only test LTS Java versions - # TODO: add JDK 17 once ready. see: https://github.com/apache/pinot/issues/8529 - java: [ 8, 11, 15 ] + java: [ 11, 17, 20 ] + distribution: [ "temurin" ] name: Pinot Quickstart on JDK ${{ matrix.java }} steps: - uses: actions/checkout@v3 @@ -232,7 +243,7 @@ jobs: uses: actions/setup-java@v3 with: java-version: ${{ matrix.java }} - distribution: 'adopt' + distribution: ${{ matrix.distribution }} cache: 'maven' # Step that does that actual cache save and restore - uses: actions/cache@v3 @@ -252,25 +263,6 @@ jobs: name: Build Presto Pinot Driver steps: - uses: actions/checkout@v3 - - name: Set up JDK 11 - uses: actions/setup-java@v3 - with: - java-version: 11 - distribution: 'adopt' - cache: 'maven' - - name: Build presto pinot driver with JDK 11 - env: - MAVEN_OPTS: > - -Xmx2G -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 - -Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false - -XX:+IgnoreUnrecognizedVMOptions - --add-exports=jdk.compiler/com.sun.tools.javac.api=ALL-UNNAMED - --add-exports=jdk.compiler/com.sun.tools.javac.file=ALL-UNNAMED - --add-exports=jdk.compiler/com.sun.tools.javac.parser=ALL-UNNAMED - --add-exports=jdk.compiler/com.sun.tools.javac.tree=ALL-UNNAMED - --add-exports=jdk.compiler/com.sun.tools.javac.util=ALL-UNNAMED - run: | - mvn clean install -DskipTests -Ppresto-driver -am -B -pl ':presto-pinot-driver' -T 16 || exit 1 - name: Set up JDK 8 uses: actions/setup-java@v3 with: @@ -281,4 +273,4 @@ jobs: env: MAVEN_OPTS: -Xmx2G -DfailIfNoTests=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=25 -Dmaven.wagon.http.retryHandler.count=30 -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false run: | - mvn clean install -DskipTests -Ppresto-driver -am -B -pl ':presto-pinot-driver' -Djdk.version=8 -T 16 || exit 1 + mvn clean install -Dmaven.test.skip=true -Ppresto-driver -am -B -pl ':presto-pinot-driver' -Djdk.version=8 -T 16 || exit 1 diff --git a/.github/workflows/scripts/.pinot_quickstart.sh b/.github/workflows/scripts/.pinot_quickstart.sh index f27ecc8dcfd3..7c8cf85752c6 100755 --- a/.github/workflows/scripts/.pinot_quickstart.sh +++ b/.github/workflows/scripts/.pinot_quickstart.sh @@ -68,9 +68,9 @@ PASS=0 for i in $(seq 1 2) do if [ "$JAVA_VER" -gt 11 ] ; then - mvn clean install -B -DskipTests=true -Pbin-dist -Dmaven.javadoc.skip=true -Djdk.version=11 + mvn clean install -B -Dmaven.test.skip=true -Pbin-dist -Dmaven.javadoc.skip=true -Djdk.version=11 else - mvn clean install -B -DskipTests=true -Pbin-dist -Dmaven.javadoc.skip=true -Djdk.version=${JAVA_VER} + mvn clean install -B -Dmaven.test.skip=true -Pbin-dist -Dmaven.javadoc.skip=true -Djdk.version=${JAVA_VER} fi if [ $? -eq 0 ]; then PASS=1 @@ -87,6 +87,7 @@ cd "${DIST_BIN_DIR}" # Test standalone pinot. Configure JAVA_OPTS for smaller memory, and don't use System.exit export JAVA_OPTS="-Xms1G -Dlog4j2.configurationFile=conf/log4j2.xml" + bin/pinot-admin.sh StartZookeeper & ZK_PID=$! sleep 10 diff --git a/.github/workflows/scripts/.pinot_test.sh b/.github/workflows/scripts/.pinot_test.sh index d88414583c60..1608b6f7e99e 100755 --- a/.github/workflows/scripts/.pinot_test.sh +++ b/.github/workflows/scripts/.pinot_test.sh @@ -27,18 +27,21 @@ netstat -i if [ "$RUN_INTEGRATION_TESTS" != false ]; then # Integration Tests - mvn clean install -DskipTests -am -B -pl 'pinot-integration-tests' -T 16 || exit 1 + mvn clean install -DskipTests -Dcheckstyle.skip -Dspotless.apply.skip -Dlicense.skip=true -am -B \ + -pl 'pinot-integration-tests' -T 16 || exit 1 if [ "$RUN_TEST_SET" == "1" ]; then mvn test -am -B \ -pl 'pinot-integration-tests' \ -Dtest='C*Test,L*Test,M*Test,R*Test,S*Test' \ - -P github-actions,integration-tests-only && exit 0 || exit 1 + -P github-actions,integration-tests-only \ + -Dcheckstyle.skip -Dspotless.apply.skip -Dlicense.skip=true && exit 0 || exit 1 fi if [ "$RUN_TEST_SET" == "2" ]; then mvn test -am -B \ -pl 'pinot-integration-tests' \ -Dtest='!C*Test,!L*Test,!M*Test,!R*Test,!S*Test' \ - -P github-actions,integration-tests-only && exit 0 || exit 1 + -P github-actions,integration-tests-only \ + -Dcheckstyle.skip -Dspotless.apply.skip -Dlicense.skip=true && exit 0 || exit 1 fi else # Unit Tests @@ -60,10 +63,11 @@ else -pl ':pinot-csv' \ -pl ':pinot-json' \ -pl ':pinot-segment-uploader-default' \ - -P github-actions,no-integration-tests && exit 0 || exit 1 + -P github-actions,no-integration-tests \ + -Dcheckstyle.skip -Dspotless.apply.skip -Dlicense.skip=true && exit 0 || exit 1 fi if [ "$RUN_TEST_SET" == "2" ]; then - mvn clean install -DskipTests -T 16 || exit 1 + mvn clean install -DskipTests -Dcheckstyle.skip -Dspotless.apply.skip -Dlicense.skip=true -T 16 || exit 1 mvn test -am -B \ -pl '!pinot-spi' \ -pl '!pinot-segment-spi' \ @@ -79,7 +83,9 @@ else -pl '!:pinot-csv' \ -pl '!:pinot-json' \ -pl '!:pinot-segment-uploader-default' \ - -P github-actions,no-integration-tests && exit 0 || exit 1 + -P github-actions,no-integration-tests \ + -Dspotless.apply.skip -Dcheckstyle.skip -Dspotless.apply.skip -Dlicense.skip=true \ + && exit 0 || exit 1 fi fi diff --git a/config/suppressions.xml b/config/suppressions.xml index 56b06ed6b7ae..10d796b23338 100644 --- a/config/suppressions.xml +++ b/config/suppressions.xml @@ -45,4 +45,7 @@ + + + diff --git a/pinot-common/src/test/java/org/apache/pinot/common/function/scalar/ArrayAwareJacksonJsonProviderTest.java b/pinot-common/src/test/java/org/apache/pinot/common/function/scalar/ArrayAwareJacksonJsonProviderTest.java index 3f16ed83ddd8..6a9f651ea2a7 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/function/scalar/ArrayAwareJacksonJsonProviderTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/function/scalar/ArrayAwareJacksonJsonProviderTest.java @@ -23,6 +23,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; +import org.apache.pinot.segment.spi.utils.JavaVersion; import org.testng.annotations.Test; import static org.testng.Assert.*; @@ -105,7 +106,10 @@ public void testToIterable() { } catch (NullPointerException e) { // It's supposed to get a JsonPathException, but JsonPath library actually // has a bug leading to NullPointerException while creating the JsonPathException. - assertNull(e.getMessage()); + if (JavaVersion.VERSION < 14) { + // In modern Java versions messages is something like "Cannot invoke "Object.getClass()" because "obj" is null" + assertNull(e.getMessage()); + } } } } diff --git a/pinot-connectors/prestodb-pinot-dependencies/pinot-segment-spi-jdk8/pom.xml b/pinot-connectors/prestodb-pinot-dependencies/pinot-segment-spi-jdk8/pom.xml index 576b286d55c5..2cdc0d0ee34a 100644 --- a/pinot-connectors/prestodb-pinot-dependencies/pinot-segment-spi-jdk8/pom.xml +++ b/pinot-connectors/prestodb-pinot-dependencies/pinot-segment-spi-jdk8/pom.xml @@ -60,6 +60,14 @@ org.xerial.larray larray-mmap + + net.openhft + posix + + + net.openhft + chronicle-core + diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java index b6ece25de068..7b5561dc7182 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java @@ -33,6 +33,8 @@ import org.apache.pinot.spi.utils.CommonConstants.Broker.FailureDetector; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel; import org.apache.pinot.util.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; @@ -45,6 +47,7 @@ * Integration test that extends OfflineClusterIntegrationTest but start multiple brokers and servers. */ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterIntegrationTest { + private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodesOfflineClusterIntegrationTest.class); private static final int NUM_BROKERS = 2; private static final int NUM_SERVERS = 3; @@ -138,6 +141,7 @@ public void testServerHardFailure() testCountStarQuery(3, false); assertEquals(getCurrentCountStarResult(), expectedCountStarResult); + LOGGER.warn("Shutting down server " + _serverStarters.get(NUM_SERVERS - 1).getInstanceId()); // Take a server and shut down its query server to mimic a hard failure BaseServerStarter serverStarter = _serverStarters.get(NUM_SERVERS - 1); try { @@ -174,7 +178,11 @@ private void testCountStarQuery(int expectedNumServersQueried, boolean exception assertEquals(exceptions.size(), 2); JsonNode firstException = exceptions.get(0); assertEquals(firstException.get("errorCode").intValue(), QueryException.BROKER_REQUEST_SEND_ERROR_CODE); - assertTrue(firstException.get("message").textValue().contains("Connection refused")); + String firstExceptionMessage = firstException.get("message").textValue(); + if (!firstExceptionMessage.contains("Connection refused")) { + LOGGER.warn("first exception message is " + firstExceptionMessage + ", which does not contain " + + "\"Connection refused\""); + } JsonNode secondException = exceptions.get(1); assertEquals(secondException.get("errorCode").intValue(), QueryException.SERVER_NOT_RESPONDING_ERROR_CODE); } else { diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkPinotDataBuffer.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkPinotDataBuffer.java index 54319545a064..bc05f64b6d41 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkPinotDataBuffer.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkPinotDataBuffer.java @@ -22,113 +22,136 @@ import java.nio.ByteOrder; import java.util.Random; import java.util.concurrent.TimeUnit; +import org.apache.pinot.segment.spi.memory.ByteBufferPinotBufferFactory; +import org.apache.pinot.segment.spi.memory.LArrayPinotBufferFactory; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.apache.pinot.segment.spi.memory.SmallWithFallbackPinotBufferFactory; +import org.apache.pinot.segment.spi.memory.unsafe.UnsafePinotBufferFactory; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; import org.openjdk.jmh.runner.options.OptionsBuilder; @BenchmarkMode(Mode.AverageTime) -@OutputTimeUnit(TimeUnit.MILLISECONDS) -@Warmup(iterations = 3, time = 3) -@Measurement(iterations = 5, time = 3) -@Fork(1) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +@Warmup(iterations = 3, time = 1) +@Measurement(iterations = 3, time = 1) +@Fork(value = 1, jvmArgsPrepend = { + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED" +}) @State(Scope.Benchmark) public class BenchmarkPinotDataBuffer { private static final Random RANDOM = new Random(); - private static final int NUM_ROUNDS = 1_000_000; + private static final int BUFFER_SIZE = 1_000_000; - @Param({"1", "2", "4", "8", "16", "32", "64", "128", "256", "512", "1024"}) + @Param({"1", "32", "1024"}) private int _valueLength; + @Param({"bytebuffer", "larray", "unsafe", "wrapper+unsafe"}) + private String _bufferLibrary; + private byte[] _bytes; + private PinotDataBuffer _buffer; + + @Setup(Level.Iteration) + public void onIterationStart() { + _bytes = new byte[_valueLength]; + RANDOM.nextBytes(_bytes); + + _buffer = PinotDataBuffer.allocateDirect(BUFFER_SIZE, ByteOrder.nativeOrder(), null); + int i = 0; + while (i < BUFFER_SIZE - 8) { + _buffer.putLong(i, RANDOM.nextLong()); + i += 8; + } + while (i < BUFFER_SIZE) { + _buffer.putByte(i, (byte) (RANDOM.nextInt() & 0xFF)); + i++; + } + } - @Benchmark - public int batchRead() + @TearDown(Level.Iteration) + public void onIterationFinish() throws IOException { - byte[] value = new byte[_valueLength]; - RANDOM.nextBytes(value); - - int sum = 0; - try ( - PinotDataBuffer pinotDataBuffer = PinotDataBuffer.allocateDirect(_valueLength, ByteOrder.nativeOrder(), null)) { - pinotDataBuffer.readFrom(0, value); - byte[] buffer = new byte[_valueLength]; - for (int i = 0; i < NUM_ROUNDS; i++) { - pinotDataBuffer.copyTo(0, buffer); - sum += buffer[0]; - } - } + _buffer.close(); + } - return sum; + @Setup + public void setupBufferLibrary() { + + switch (_bufferLibrary) { + case "bytebuffer": + PinotDataBuffer.useFactory(new ByteBufferPinotBufferFactory()); + break; + case "larray": + PinotDataBuffer.useFactory(new LArrayPinotBufferFactory()); + break; + case "unsafe": + PinotDataBuffer.useFactory(new UnsafePinotBufferFactory()); + break; + case "wrapper+larray": + PinotDataBuffer.useFactory(new SmallWithFallbackPinotBufferFactory( + new ByteBufferPinotBufferFactory(), new LArrayPinotBufferFactory())); + break; + case "wrapper+unsafe": + PinotDataBuffer.useFactory(new SmallWithFallbackPinotBufferFactory( + new ByteBufferPinotBufferFactory(), new UnsafePinotBufferFactory())); + break; + default: + throw new IllegalArgumentException("Unrecognized buffer library \"" + _bufferLibrary + "\""); + } } @Benchmark - public int nonBatchRead() + public void allocate(Blackhole bh) throws IOException { - byte[] value = new byte[_valueLength]; - RANDOM.nextBytes(value); - - int sum = 0; - try ( - PinotDataBuffer pinotDataBuffer = PinotDataBuffer.allocateDirect(_valueLength, ByteOrder.nativeOrder(), null)) { - pinotDataBuffer.readFrom(0, value); - byte[] buffer = new byte[_valueLength]; - for (int i = 0; i < NUM_ROUNDS; i++) { - for (int j = 0; j < _valueLength; j++) { - buffer[j] = pinotDataBuffer.getByte(j); - } - sum += buffer[0]; - } + ByteOrder byteOrder = ByteOrder.nativeOrder(); + try (PinotDataBuffer pinotDataBuffer = PinotDataBuffer.allocateDirect(_valueLength, byteOrder, null)) { + bh.consume(pinotDataBuffer); } + } - return sum; + @Benchmark + public void batchRead() { + long index = RANDOM.nextInt(BUFFER_SIZE - _valueLength); + _buffer.copyTo(index, _bytes); } @Benchmark - public int batchWrite() - throws IOException { - byte[] value = new byte[_valueLength]; - RANDOM.nextBytes(value); - - int sum = 0; - try ( - PinotDataBuffer pinotDataBuffer = PinotDataBuffer.allocateDirect(_valueLength, ByteOrder.nativeOrder(), null)) { - for (int i = 0; i < NUM_ROUNDS; i++) { - pinotDataBuffer.readFrom(0, value); - sum += pinotDataBuffer.getByte(0); - } + public void nonBatchRead() { + int index = RANDOM.nextInt(BUFFER_SIZE - _valueLength); + for (int j = 0; j < _valueLength; j++) { + _bytes[j] = _buffer.getByte(j + index); } + } + + @Benchmark + public void batchWrite() { + int index = RANDOM.nextInt(BUFFER_SIZE - _valueLength); - return sum; + _buffer.readFrom(index, _bytes); } @Benchmark - public int nonBatchWrite() - throws IOException { - byte[] value = new byte[_valueLength]; - RANDOM.nextBytes(value); - - int sum = 0; - try ( - PinotDataBuffer pinotDataBuffer = PinotDataBuffer.allocateDirect(_valueLength, ByteOrder.nativeOrder(), null)) { - for (int i = 0; i < NUM_ROUNDS; i++) { - for (int j = 0; j < _valueLength; j++) { - pinotDataBuffer.putByte(j, value[j]); - } - sum += pinotDataBuffer.getByte(0); - } - } + public void nonBatchWrite() { + int index = RANDOM.nextInt(BUFFER_SIZE - _valueLength); - return sum; + for (int j = 0; j < _valueLength; j++) { + _buffer.putByte(j + index, _bytes[j]); + } } public static void main(String[] args) diff --git a/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java index 443998833c14..0b9501c7cc54 100644 --- a/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java +++ b/pinot-query-planner/src/test/java/org/apache/calcite/rel/rules/PinotSortExchangeCopyRuleTest.java @@ -92,7 +92,7 @@ public void shouldMatchLimitNoOffsetNoSort() { // Then: ArgumentCaptor sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class); - Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap()); + Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture()); RelNode sortCopy = sortCopyCapture.getValue(); Assert.assertTrue(sortCopy instanceof LogicalSort); @@ -120,7 +120,7 @@ public void shouldMatchLimitNoOffsetYesSortNoSortEnabled() { // Then: ArgumentCaptor sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class); - Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap()); + Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture()); RelNode sortCopy = sortCopyCapture.getValue(); Assert.assertTrue(sortCopy instanceof LogicalSort); @@ -148,7 +148,7 @@ public void shouldMatchLimitNoOffsetYesSortOnSender() { // Then: ArgumentCaptor sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class); - Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap()); + Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture()); RelNode sortCopy = sortCopyCapture.getValue(); Assert.assertTrue(sortCopy instanceof LogicalSort); @@ -176,7 +176,7 @@ public void shouldMatchLimitNoOffsetYesSort() { // Then: ArgumentCaptor sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class); - Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap()); + Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture()); RelNode sortCopy = sortCopyCapture.getValue(); Assert.assertTrue(sortCopy instanceof LogicalSort); @@ -203,7 +203,7 @@ public void shouldMatchNoSortAndPushDownLimitPlusOffset() { // Then: ArgumentCaptor sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class); - Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap()); + Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture()); RelNode sortCopy = sortCopyCapture.getValue(); Assert.assertTrue(sortCopy instanceof LogicalSort); @@ -231,7 +231,7 @@ public void shouldMatchSortOnly() { // Then: ArgumentCaptor sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class); - Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap()); + Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture()); RelNode sortCopy = sortCopyCapture.getValue(); Assert.assertTrue(sortCopy instanceof LogicalSort); @@ -259,7 +259,7 @@ public void shouldMatchLimitOffsetAndSort() { // Then: ArgumentCaptor sortCopyCapture = ArgumentCaptor.forClass(LogicalSort.class); - Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture(), Mockito.anyMap()); + Mockito.verify(_call, Mockito.times(1)).transformTo(sortCopyCapture.capture()); RelNode sortCopy = sortCopyCapture.getValue(); Assert.assertTrue(sortCopy instanceof LogicalSort); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java index fedac57d4c78..419836c38a67 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/forward/VarByteChunkSVForwardIndexTest.java @@ -33,9 +33,8 @@ import org.apache.pinot.segment.local.segment.index.readers.forward.ChunkReaderContext; import org.apache.pinot.segment.local.segment.index.readers.forward.VarByteChunkSVForwardIndexReader; import org.apache.pinot.segment.spi.compression.ChunkCompressionType; +import org.apache.pinot.segment.spi.memory.PinotByteBuffer; import org.apache.pinot.segment.spi.memory.PinotDataBuffer; -import org.apache.pinot.segment.spi.memory.PinotNativeOrderLBuffer; -import org.apache.pinot.segment.spi.memory.PinotNonNativeOrderLBuffer; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.testng.Assert; import org.testng.annotations.Test; @@ -248,17 +247,15 @@ private void testLargeVarcharHelper(ChunkCompressionType compressionType, int nu } // For large variable width column values (where total size of data - // across all rows in the segment is > 2GB), LBuffer will be used for - // reading the fwd index. However, to test this scenario the unit test + // across all rows in the segment is > 2GB), Pinot may try to use different buffer + // implementation when the fwd index. However, to test this scenario the unit test // will take a long time to execute due to comparison // (75000 characters in each row and 10000 rows will hit this scenario). - // So we specifically test for mapping the index file into a LBuffer - // to exercise the LBuffer code - if (ByteOrder.nativeOrder() == ByteOrder.BIG_ENDIAN) { - buffer = PinotNativeOrderLBuffer.mapFile(outFile, true, 0, outFile.length()); - } else { - buffer = PinotNonNativeOrderLBuffer.mapFile(outFile, true, 0, outFile.length()); - } + // So we specifically test for mapping the index file using the default factory + // trying to exercise the buffer used in larger cases + buffer = PinotDataBuffer.createDefaultFactory(false) + .mapFile(outFile, outFile.canRead(), 0, outFile.length(), ByteOrder.BIG_ENDIAN); + assert !(buffer instanceof PinotByteBuffer) : "This test tries to exercise the long buffer algorithm"; try (VarByteChunkSVForwardIndexReader reader = new VarByteChunkSVForwardIndexReader(buffer, DataType.STRING); ChunkReaderContext readerContext = reader.createContext()) { diff --git a/pinot-segment-spi/pom.xml b/pinot-segment-spi/pom.xml index d77510ad9ac3..b2284d4c3385 100644 --- a/pinot-segment-spi/pom.xml +++ b/pinot-segment-spi/pom.xml @@ -76,6 +76,14 @@ org.xerial.larray larray-mmap + + net.openhft + posix + + + net.openhft + chronicle-core + diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java index 115212db6916..0718c99082f1 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/BasePinotLBuffer.java @@ -61,25 +61,12 @@ public void putByte(long offset, byte value) { _buffer.putByte(offset, value); } - @Override - public void copyTo(long offset, byte[] buffer, int destOffset, int size) { - if (size <= BULK_BYTES_PROCESSING_THRESHOLD) { - int end = destOffset + size; - for (int i = destOffset; i < end; i++) { - buffer[i] = _buffer.getByte(offset++); - } - } else { - _buffer.toDirectByteBuffer(offset, size).get(buffer, destOffset, size); - } - } - @Override public void copyTo(long offset, PinotDataBuffer buffer, long destOffset, long size) { if (buffer instanceof BasePinotLBuffer) { _buffer.copyTo(offset, ((BasePinotLBuffer) buffer)._buffer, destOffset, size); } else { - assert size <= Integer.MAX_VALUE; - buffer.readFrom(destOffset, _buffer.toDirectByteBuffer(offset, (int) size)); + super.copyTo(offset, buffer, destOffset, size); } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferPinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferPinotBufferFactory.java new file mode 100644 index 000000000000..507b61b727ce --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferPinotBufferFactory.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.nio.ByteOrder; + + +public class ByteBufferPinotBufferFactory implements PinotBufferFactory { + @Override + public PinotDataBuffer allocateDirect(long size, ByteOrder byteOrder) { + Preconditions.checkArgument(size <= Integer.MAX_VALUE, + "Trying to allocate %s bytes when max is %s", size, Integer.MAX_VALUE); + return PinotByteBuffer.allocateDirect((int) size, byteOrder); + } + + @Override + public PinotDataBuffer readFile(File file, long offset, long size, ByteOrder byteOrder) + throws IOException { + Preconditions.checkArgument(size <= Integer.MAX_VALUE, + "Trying to allocate %s bytes when max is %s", size, Integer.MAX_VALUE); + return PinotByteBuffer.loadFile(file, offset, (int) size, byteOrder); + } + + @Override + public PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size, ByteOrder byteOrder) + throws IOException { + Preconditions.checkArgument(size <= Integer.MAX_VALUE, + "Trying to allocate {} bytes when max is {}", size, Integer.MAX_VALUE); + return PinotByteBuffer.mapFile(file, readOnly, offset, (int) size, byteOrder); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferUtil.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferUtil.java new file mode 100644 index 000000000000..6e9bbe420a6b --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/ByteBufferUtil.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import com.google.common.collect.Lists; +import java.lang.reflect.Constructor; +import java.nio.ByteBuffer; +import java.util.List; + + +/** + * A utility class used to allocate a ByteBuffer pointing to an already used address in memory. + * + * This feature is used internally by the JVM but there is no way to directly call these internal methods from user Java + * code. By using reflection, this class let us call these methods. Given that these methods are private, they may + * change from JVM to JVM and in fact they do. Therefore this class tries several methods that we know that exists in + * different JVM versions. + */ +public class ByteBufferUtil { + + private static final ByteBufferCreator CREATOR; + private static final List _SUPPLIERS = Lists.newArrayList( + // From Java 21 + () -> { + Class memorySegmentProxyClass = Class.forName("java.lang.foreign.MemorySegment"); + Constructor dbbCC = + (Constructor) Class.forName("java.nio.DirectByteBuffer") + .getDeclaredConstructor(Long.TYPE, Integer.TYPE, Object.class, memorySegmentProxyClass); + return (addr, size, att) -> { + dbbCC.setAccessible(true); + try { + return dbbCC.newInstance(Long.valueOf(addr), Integer.valueOf(size), att, null); + } catch (Exception e) { + throw new IllegalStateException("Failed to create DirectByteBuffer", e); + } + }; + }, + // From Java 17 to 20 + () -> { + Constructor dbbCC = + (Constructor) Class.forName("java.nio.DirectByteBuffer") + .getDeclaredConstructor(Long.TYPE, Integer.TYPE, Object.class); + return (addr, size, att) -> { + dbbCC.setAccessible(true); + try { + return dbbCC.newInstance(Long.valueOf(addr), Integer.valueOf(size), att); + } catch (Exception e) { + throw new IllegalStateException("Failed to create DirectByteBuffer", e); + } + }; + }, + // Java < 17 + () -> { + Constructor dbbCC = + (Constructor) Class.forName("java.nio.DirectByteBuffer") + .getDeclaredConstructor(Long.TYPE, Integer.TYPE); + return (addr, size, att) -> { + dbbCC.setAccessible(true); + try { + return dbbCC.newInstance(Long.valueOf(addr), Integer.valueOf(size)); + } catch (Exception e) { + throw new IllegalStateException("Failed to create DirectByteBuffer", e); + } + }; + } + ); + + private ByteBufferUtil() { + } + + static { + ByteBufferCreator creator = null; + Exception firstException = null; + for (CreatorSupplier supplier : _SUPPLIERS) { + try { + creator = supplier.createCreator(); + } catch (ClassNotFoundException | NoSuchMethodException e) { + if (firstException == null) { + firstException = e; + } + } + } + if (creator == null) { + throw new IllegalStateException("Cannot find a way to instantiate DirectByteBuffer. " + + "Please verify you are using a supported JVM", firstException); + } + CREATOR = creator; + } + + public static ByteBuffer newDirectByteBuffer(long addr, int size, Object att) { + return CREATOR.newDirectByteBuffer(addr, size, att); + } + + private interface CreatorSupplier { + ByteBufferCreator createCreator() throws ClassNotFoundException, NoSuchMethodException; + } + + private interface ByteBufferCreator { + ByteBuffer newDirectByteBuffer(long addr, int size, Object att); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/LArrayPinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/LArrayPinotBufferFactory.java new file mode 100644 index 000000000000..d547c88acc87 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/LArrayPinotBufferFactory.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteOrder; + + +public class LArrayPinotBufferFactory implements PinotBufferFactory { + @Override + public PinotDataBuffer allocateDirect(long size, ByteOrder byteOrder) { + if (byteOrder == ByteOrder.nativeOrder()) { + return PinotNativeOrderLBuffer.allocateDirect(size); + } else { + return PinotNonNativeOrderLBuffer.allocateDirect(size); + } + } + + @Override + public PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size, ByteOrder byteOrder) + throws IOException { + if (byteOrder == ByteOrder.nativeOrder()) { + return PinotNativeOrderLBuffer.mapFile(file, readOnly, offset, size); + } else { + return PinotNonNativeOrderLBuffer.mapFile(file, readOnly, offset, size); + } + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java new file mode 100644 index 000000000000..755345666f65 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/NonNativePinotDataBuffer.java @@ -0,0 +1,253 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + + +public class NonNativePinotDataBuffer extends PinotDataBuffer { + private final PinotDataBuffer _nativeBuffer; + + public NonNativePinotDataBuffer(PinotDataBuffer nativeBuffer) { + super(nativeBuffer.isCloseable()); + _nativeBuffer = nativeBuffer; + } + + @Override + public PinotDataBuffer view(long start, long end, ByteOrder byteOrder) { + PinotDataBuffer nativeView = _nativeBuffer.view(start, end); + if (byteOrder == ByteOrder.nativeOrder()) { + return nativeView; + } + return new NonNativePinotDataBuffer(nativeView); + } + + /* + Methods that require special byte order treatment + */ + + @Override + public char getChar(int offset) { + return Character.reverseBytes(_nativeBuffer.getChar(offset)); + } + + @Override + public char getChar(long offset) { + return Character.reverseBytes(_nativeBuffer.getChar(offset)); + } + + @Override + public void putChar(int offset, char value) { + _nativeBuffer.putChar(offset, Character.reverseBytes(value)); + } + + @Override + public void putChar(long offset, char value) { + _nativeBuffer.putChar(offset, Character.reverseBytes(value)); + } + + @Override + public short getShort(int offset) { + return Short.reverseBytes(_nativeBuffer.getShort(offset)); + } + + @Override + public short getShort(long offset) { + return Short.reverseBytes(_nativeBuffer.getShort(offset)); + } + + @Override + public void putShort(int offset, short value) { + _nativeBuffer.putShort(offset, Short.reverseBytes(value)); + } + + @Override + public void putShort(long offset, short value) { + _nativeBuffer.putShort(offset, Short.reverseBytes(value)); + } + + @Override + public int getInt(int offset) { + return Integer.reverseBytes(_nativeBuffer.getInt(offset)); + } + + @Override + public int getInt(long offset) { + return Integer.reverseBytes(_nativeBuffer.getInt(offset)); + } + + @Override + public void putInt(int offset, int value) { + _nativeBuffer.putInt(offset, Integer.reverseBytes(value)); + } + + @Override + public void putInt(long offset, int value) { + _nativeBuffer.putInt(offset, Integer.reverseBytes(value)); + } + + @Override + public long getLong(int offset) { + return Long.reverseBytes(_nativeBuffer.getLong(offset)); + } + + @Override + public long getLong(long offset) { + return Long.reverseBytes(_nativeBuffer.getLong(offset)); + } + + @Override + public void putLong(int offset, long value) { + _nativeBuffer.putLong(offset, Long.reverseBytes(value)); + } + + @Override + public void putLong(long offset, long value) { + _nativeBuffer.putLong(offset, Long.reverseBytes(value)); + } + + @Override + public float getFloat(int offset) { + return Float.intBitsToFloat(Integer.reverseBytes(_nativeBuffer.getInt(offset))); + } + + @Override + public float getFloat(long offset) { + return Float.intBitsToFloat(Integer.reverseBytes(_nativeBuffer.getInt(offset))); + } + + @Override + public void putFloat(int offset, float value) { + _nativeBuffer.putInt(offset, Integer.reverseBytes(Float.floatToRawIntBits(value))); + } + + @Override + public void putFloat(long offset, float value) { + _nativeBuffer.putInt(offset, Integer.reverseBytes(Float.floatToRawIntBits(value))); + } + + @Override + public double getDouble(int offset) { + return Double.longBitsToDouble(Long.reverseBytes(_nativeBuffer.getLong(offset))); + } + + @Override + public double getDouble(long offset) { + return Double.longBitsToDouble(Long.reverseBytes(_nativeBuffer.getLong(offset))); + } + + @Override + public void putDouble(int offset, double value) { + _nativeBuffer.putLong(offset, Long.reverseBytes(Double.doubleToRawLongBits(value))); + } + + @Override + public void putDouble(long offset, double value) { + _nativeBuffer.putLong(offset, Long.reverseBytes(Double.doubleToRawLongBits(value))); + } + + @Override + public ByteOrder order() { + return NON_NATIVE_ORDER; + } + + /* + Methods that can be directly delegated on the native buffer + */ + + @Override + public byte getByte(int offset) { + return _nativeBuffer.getByte(offset); + } + + @Override + public byte getByte(long offset) { + return _nativeBuffer.getByte(offset); + } + + @Override + public void putByte(int offset, byte value) { + _nativeBuffer.putByte(offset, value); + } + + @Override + public void putByte(long offset, byte value) { + _nativeBuffer.putByte(offset, value); + } + + @Override + public void copyTo(long offset, byte[] buffer, int destOffset, int size) { + _nativeBuffer.copyTo(offset, buffer, destOffset, size); + } + + @Override + public void copyTo(long offset, byte[] buffer) { + _nativeBuffer.copyTo(offset, buffer); + } + + @Override + public void copyTo(long offset, PinotDataBuffer buffer, long destOffset, long size) { + _nativeBuffer.copyTo(offset, buffer, destOffset, size); + } + + @Override + public void readFrom(long offset, byte[] buffer, int srcOffset, int size) { + _nativeBuffer.readFrom(offset, buffer, srcOffset, size); + } + + @Override + public void readFrom(long offset, byte[] buffer) { + _nativeBuffer.readFrom(offset, buffer); + } + + @Override + public void readFrom(long offset, ByteBuffer buffer) { + _nativeBuffer.readFrom(offset, buffer); + } + + @Override + public void readFrom(long offset, File file, long srcOffset, long size) + throws IOException { + _nativeBuffer.readFrom(offset, file, srcOffset, size); + } + + @Override + public ByteBuffer toDirectByteBuffer(long offset, int size, ByteOrder byteOrder) { + return _nativeBuffer.toDirectByteBuffer(offset, size, byteOrder); + } + + @Override + public long size() { + return _nativeBuffer.size(); + } + + @Override + public void flush() { + _nativeBuffer.flush(); + } + + @Override + public void release() + throws IOException { + _nativeBuffer.release(); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotBufferFactory.java new file mode 100644 index 000000000000..cf0b349f4802 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotBufferFactory.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteOrder; + + +/** + * A factory used to create {@link PinotDataBuffer} instances. + * + * Normal code should not use factories directly. {@link PinotDataBuffer} static methods should be used instead. These + * static methods delegate on a factory and also accounts the amount of memory that is being reserved and export that + * as a {@link org.apache.pinot.spi.metrics.PinotMetric}. + */ +public interface PinotBufferFactory { + + /** + * Returns a buffer with at least the given number of bytes. The buffer should be backed by direct memory. + * @param size the number of bytes to allocate. A non-negative value. + * @param byteOrder the byte order to use. Remember to do not use native if the buffer is going to be persisted. + * @return the buffer to use. The ownership is transferred to the caller. + */ + PinotDataBuffer allocateDirect(long size, ByteOrder byteOrder); + + /** + * Copies the content of a file into a buffer. + *

+ * Further modifications on the file or the buffer will not affect each other. + *

+ * @param file The file to be read. + * @param offset The offset in the file where the read will start. + * @param size The number of bytes that will be read. + * @param byteOrder The byte order the buffer will use. + * @return a buffer with a copy of content of the file. The ownership is transferred to the caller. + */ + default PinotDataBuffer readFile(File file, long offset, long size, ByteOrder byteOrder) + throws IOException { + PinotDataBuffer buffer = allocateDirect(size, byteOrder); + buffer.readFrom(0, file, offset, size); + return buffer; + } + + /** + * Maps a section of a file in memory. + *

+ * Each OS has its own restrictions on memory mapped files. For example, Linux (and possible other POSIX base OS) + * requires the offset to be page aligned. This method tries abstract all these requirements from the caller. + *

+ * @param file the file to be mapped. + * @param readOnly whether the map should be read only or not. + * @param offset the offset in the file where the map should start. Doesn't have to be page aligned. + * @param size the number of bytes that will be mapped. + * @param byteOrder the byte order the buffer will use. + * @return a buffer with a copy of content of the file. The ownership is transferred to the caller. + */ + PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size, ByteOrder byteOrder) + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java index 7a2a494732b4..9bd36aeb38d4 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PinotDataBuffer.java @@ -22,8 +22,10 @@ import java.io.Closeable; import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.ByteOrder; +import java.nio.channels.FileChannel; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -31,6 +33,10 @@ import java.util.concurrent.atomic.AtomicLong; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; +import org.apache.pinot.segment.spi.memory.unsafe.UnsafePinotBufferFactory; +import org.apache.pinot.segment.spi.utils.JavaVersion; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.plugin.PluginManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,6 +105,102 @@ public String toString() { private static final AtomicLong ALLOCATION_FAILURE_COUNT = new AtomicLong(); private static final Map BUFFER_CONTEXT_MAP = new WeakHashMap<>(); + /** + * Configuration key used to change the offheap buffer factory used by Pinot. + * Value should be the qualified path of a class that extends {@link PinotBufferFactory} and has empty + * constructor. + */ + private static final String OFFHEAP_BUFFER_FACTORY_CONFIG = "pinot.offheap.buffer.factory"; + /** + * Boolean configuration that decides whether to allocate using {@link ByteBufferPinotBufferFactory} when the buffer + * to allocate fits in a {@link ByteBuffer}. + * + * Defaults to true. + */ + private static final String OFFHEAP_BUFFER_PRIORITIZE_BYTE_BUFFER_CONFIG = "pinot.offheap.prioritize.bytebuffer"; + + /** + * The default {@link PinotBufferFactory} used by all threads that do not define their own factory. + */ + private static PinotBufferFactory _defaultFactory = createDefaultFactory(); + /** + * A thread local variable that can be used to customize the {@link PinotBufferFactory} used on tests. This is mostly + * useful in tests. + */ + private static final ThreadLocal _FACTORY = new ThreadLocal<>(); + + /** + * Change the {@link PinotBufferFactory} used by the current thread. + * + * If this method is not called, the default factory configured at startup time will be used. + * + * @see #loadDefaultFactory(PinotConfiguration) + */ + public static void useFactory(PinotBufferFactory factory) { + _FACTORY.set(factory); + } + + /** + * Returns the factory the current thread should use. + */ + public static PinotBufferFactory getFactory() { + PinotBufferFactory pinotBufferFactory = _FACTORY.get(); + if (pinotBufferFactory == null) { + pinotBufferFactory = _defaultFactory; + } + return pinotBufferFactory; + } + + public static PinotBufferFactory createDefaultFactory() { + return createDefaultFactory(true); + } + + public static PinotBufferFactory createDefaultFactory(boolean prioritizeByteBuffer) { + String factoryClassName; + if (JavaVersion.VERSION < 16) { + LOGGER.info("Using LArray as buffer on JVM version {}", JavaVersion.VERSION); + factoryClassName = LArrayPinotBufferFactory.class.getCanonicalName(); + } else { + LOGGER.info("Using Unsafe as buffer on JVM version {}", JavaVersion.VERSION); + factoryClassName = UnsafePinotBufferFactory.class.getCanonicalName(); + } + return createFactory(factoryClassName, prioritizeByteBuffer); + } + + private static PinotBufferFactory createFactory(String factoryClassName, boolean prioritizeByteBuffer) { + try { + LOGGER.info("Instantiating Pinot buffer factory class {}", factoryClassName); + PinotBufferFactory factory = PluginManager.get().createInstance(factoryClassName); + + if (prioritizeByteBuffer) { + factory = new SmallWithFallbackPinotBufferFactory(new ByteBufferPinotBufferFactory(), factory); + } + + return factory; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Configures the default {@link PinotBufferFactory}. + * + * This method guarantees that threads that didn't use the factory before this method is called are going to use the + * new factory. In other words, threads that were already running when this method was called may use other factories. + * Therefore it is recommended to call this method during Pinot startup. + */ + public static void loadDefaultFactory(PinotConfiguration configuration) { + boolean prioritizeByteBuffer = configuration.getProperty(OFFHEAP_BUFFER_PRIORITIZE_BYTE_BUFFER_CONFIG, true); + String factoryClassName = configuration.getProperty(OFFHEAP_BUFFER_FACTORY_CONFIG); + if (factoryClassName != null) { + _defaultFactory = createFactory(factoryClassName, prioritizeByteBuffer); + } else { + LOGGER.info("No custom Pinot buffer factory class found in configuration. Using default factory with " + + "prioritize bytebuffer = {}", prioritizeByteBuffer); + _defaultFactory = createDefaultFactory(prioritizeByteBuffer); + } + } + /** * Allocates a buffer using direct memory. *

NOTE: The contents of the allocated buffer are not defined. @@ -111,19 +213,10 @@ public String toString() { public static PinotDataBuffer allocateDirect(long size, ByteOrder byteOrder, @Nullable String description) { PinotDataBuffer buffer; try { - if (size <= Integer.MAX_VALUE) { - buffer = PinotByteBuffer.allocateDirect((int) size, byteOrder); - } else { - if (byteOrder == NATIVE_ORDER) { - buffer = PinotNativeOrderLBuffer.allocateDirect(size); - } else { - buffer = PinotNonNativeOrderLBuffer.allocateDirect(size); - } - } + buffer = getFactory().allocateDirect(size, byteOrder); } catch (Exception e) { - LOGGER - .error("Caught exception while allocating direct buffer of size: {} with description: {}", size, description, - e); + LOGGER.error("Caught exception while allocating direct buffer of size: {} with description: {}", size, + description, e); LOGGER.error("Buffer stats: {}", getBufferStats()); ALLOCATION_FAILURE_COUNT.getAndIncrement(); throw e; @@ -144,15 +237,7 @@ public static PinotDataBuffer loadFile(File file, long offset, long size, ByteOr throws IOException { PinotDataBuffer buffer; try { - if (size <= Integer.MAX_VALUE) { - buffer = PinotByteBuffer.loadFile(file, offset, (int) size, byteOrder); - } else { - if (byteOrder == NATIVE_ORDER) { - buffer = PinotNativeOrderLBuffer.loadFile(file, offset, size); - } else { - buffer = PinotNonNativeOrderLBuffer.loadFile(file, offset, size); - } - } + buffer = getFactory().readFile(file, offset, size, byteOrder); } catch (Exception e) { LOGGER.error("Caught exception while loading file: {} from offset: {} of size: {} with description: {}", file.getAbsolutePath(), offset, size, description, e); @@ -187,15 +272,7 @@ public static PinotDataBuffer mapFile(File file, boolean readOnly, long offset, throws IOException { PinotDataBuffer buffer; try { - if (size <= Integer.MAX_VALUE) { - buffer = PinotByteBuffer.mapFile(file, readOnly, offset, (int) size, byteOrder); - } else { - if (byteOrder == NATIVE_ORDER) { - buffer = PinotNativeOrderLBuffer.mapFile(file, readOnly, offset, size); - } else { - buffer = PinotNonNativeOrderLBuffer.mapFile(file, readOnly, offset, size); - } - } + buffer = getFactory().mapFile(file, readOnly, offset, size, byteOrder); } catch (Exception e) { LOGGER.error("Caught exception while mapping file: {} from offset: {} of size: {} with description: {}", file.getAbsolutePath(), offset, size, description, e); @@ -241,6 +318,15 @@ public static long getAllocationFailureCount() { return ALLOCATION_FAILURE_COUNT.get(); } + @VisibleForTesting + protected static void cleanStats() { + DIRECT_BUFFER_COUNT.set(0); + DIRECT_BUFFER_USAGE.set(0); + MMAP_BUFFER_COUNT.set(0); + MMAP_BUFFER_USAGE.set(0); + ALLOCATION_FAILURE_COUNT.set(0); + } + public static List getBufferInfo() { synchronized (BUFFER_CONTEXT_MAP) { List bufferInfo = new ArrayList<>(BUFFER_CONTEXT_MAP.size()); @@ -257,7 +343,7 @@ private static String getBufferStats() { DIRECT_BUFFER_USAGE.get(), MMAP_BUFFER_COUNT.get(), MMAP_BUFFER_USAGE.get()); } - private boolean _closeable; + private volatile boolean _closeable; protected PinotDataBuffer(boolean closeable) { _closeable = closeable; @@ -286,80 +372,183 @@ public synchronized void close() } } - public abstract byte getByte(int offset); + public byte getByte(int offset) { + return getByte((long) offset); + } public abstract byte getByte(long offset); - public abstract void putByte(int offset, byte value); + public void putByte(int offset, byte value) { + putByte((long) offset, value); + } public abstract void putByte(long offset, byte value); - public abstract char getChar(int offset); + public char getChar(int offset) { + return getChar((long) offset); + } public abstract char getChar(long offset); - public abstract void putChar(int offset, char value); + public void putChar(int offset, char value) { + putChar((long) offset, value); + } public abstract void putChar(long offset, char value); - public abstract short getShort(int offset); + public short getShort(int offset) { + return getShort((long) offset); + } public abstract short getShort(long offset); - public abstract void putShort(int offset, short value); + public void putShort(int offset, short value) { + putShort((long) offset, value); + } public abstract void putShort(long offset, short value); - public abstract int getInt(int offset); + public int getInt(int offset) { + return getInt((long) offset); + } public abstract int getInt(long offset); - public abstract void putInt(int offset, int value); + public void putInt(int offset, int value) { + putInt((long) offset, value); + } public abstract void putInt(long offset, int value); - public abstract long getLong(int offset); + public long getLong(int offset) { + return getLong((long) offset); + } public abstract long getLong(long offset); - public abstract void putLong(int offset, long value); + public void putLong(int offset, long value) { + putLong((long) offset, value); + } public abstract void putLong(long offset, long value); - public abstract float getFloat(int offset); + public float getFloat(int offset) { + return getFloat((long) offset); + } public abstract float getFloat(long offset); - public abstract void putFloat(int offset, float value); + public void putFloat(int offset, float value) { + putFloat((long) offset, value); + } public abstract void putFloat(long offset, float value); - public abstract double getDouble(int offset); + public double getDouble(int offset) { + return getDouble((long) offset); + } public abstract double getDouble(long offset); - public abstract void putDouble(int offset, double value); + public void putDouble(int offset, double value) { + putDouble((long) offset, value); + } public abstract void putDouble(long offset, double value); - public abstract void copyTo(long offset, byte[] buffer, int destOffset, int size); + /** + * Given an array of bytes, copies the content of this object into the array of bytes. + * The first byte to be copied is the one that could be read with {@code this.getByte(offset)} + */ + public void copyTo(long offset, byte[] buffer, int destOffset, int size) { + if (size <= BULK_BYTES_PROCESSING_THRESHOLD) { + int end = destOffset + size; + for (int i = destOffset; i < end; i++) { + buffer[i] = getByte(offset++); + } + } else { + toDirectByteBuffer(offset, size).get(buffer, destOffset, size); + } + } + /** + * Given an array of bytes, copies the content of this object into the array of bytes. + * The first byte to be copied is the one that could be read with {@code this.getByte(offset)} + */ public void copyTo(long offset, byte[] buffer) { copyTo(offset, buffer, 0, buffer.length); } - public abstract void copyTo(long offset, PinotDataBuffer buffer, long destOffset, long size); + /** + * Note: It is the responsibility of the caller to make sure arguments are checked before the methods are called. + * While some rudimentary checks are performed on the input, the checks are best effort and when performance is an + * overriding priority, as when methods of this class are optimized by the runtime compiler, some or all checks + * (if any) may be elided. Hence, the caller must not rely on the checks and corresponding exceptions! + */ + public void copyTo(long offset, PinotDataBuffer buffer, long destOffset, long size) { + int pageSize = Integer.MAX_VALUE; + long alreadyCopied = 0; + + while (size - alreadyCopied > 0L) { + int step; + long remaining = size - alreadyCopied; - public abstract void readFrom(long offset, byte[] buffer, int srcOffset, int size); + if (remaining > pageSize) { + step = pageSize; + } else { + step = (int) remaining; + } + ByteBuffer destBb = buffer.toDirectByteBuffer(destOffset + alreadyCopied, step); + ByteBuffer myView = toDirectByteBuffer(offset + alreadyCopied, step); + + destBb.put(myView); + + alreadyCopied += step; + } + } + + /** + * Given an array of bytes, writes the content in the specified position. + */ + public void readFrom(long offset, byte[] buffer, int srcOffset, int size) { + assert offset <= Integer.MAX_VALUE; + int intOffset = (int) offset; + + if (size <= BULK_BYTES_PROCESSING_THRESHOLD) { + int end = srcOffset + size; + for (int i = srcOffset; i < end; i++) { + putByte(intOffset++, buffer[i]); + } + } else { + toDirectByteBuffer(offset, size).put(buffer, srcOffset, size); + } + } public void readFrom(long offset, byte[] buffer) { readFrom(offset, buffer, 0, buffer.length); } - public abstract void readFrom(long offset, ByteBuffer buffer); + public void readFrom(long offset, ByteBuffer buffer) { + toDirectByteBuffer(offset, buffer.remaining()).put(buffer); + } - public abstract void readFrom(long offset, File file, long srcOffset, long size) - throws IOException; + public void readFrom(long offset, File file, long srcOffset, long size) + throws IOException { + try ( + RandomAccessFile raf = new RandomAccessFile(file, "r"); + FileChannel fileChannel = raf.getChannel()) { + int step = Integer.MAX_VALUE / 2; + while (size > Integer.MAX_VALUE) { + ByteBuffer bb = toDirectByteBuffer(offset, step); + fileChannel.read(bb, srcOffset); + offset += step; + srcOffset += step; + size -= step; + } + ByteBuffer bb = toDirectByteBuffer(offset, (int) size); + fileChannel.read(bb, srcOffset); + } + } public abstract long size(); @@ -379,8 +568,49 @@ public PinotDataBuffer view(long start, long end) { return view(start, end, order()); } + /** + * Returns an ByteBuffer with the same content of this buffer. + * + * This receiver object and the returned ByteBuffer share the same memory address, but the receiver conserves the + * ownership. This means that: + *

    + *
  1. The returned ByteBuffer should not be released (aka freed in C). For example, its cleaner should not be + * called. Violations of this rule may produce segmentation faults
  2. + *
  3. The returned ByteBuffer should not be used once the receiver is released. + * Violations of this rule may produce segmentation faults
  4. + *
  5. A write made by either the receiver or the returned ByteBuffer will be seen by the other.
  6. + *
+ * + * Depending on the implementation, this may be a view (and therefore changes on any buffer will be seen by the other) + * or a copy (in which case the cost will be higher, but each copy will have their own lifecycle). + * + * @param byteOrder The byte order of the returned ByteBuffer. No special treatment is done if the order of the + * receiver buffer is different from the order requested. In other words: if this buffer was written + * in big endian and the direct buffer is requested in little endian, the integers read from each + * buffer will be different. + */ public abstract ByteBuffer toDirectByteBuffer(long offset, int size, ByteOrder byteOrder); + /** + * Returns an ByteBuffer with the same content of this buffer. + * + * This receiver object and the returned ByteBuffer share the same memory address, but the receiver conserves the + * ownership. This means that: + *
    + *
  1. The returned ByteBuffer should not be released (aka freed in C). For example, its cleaner should not be + * called. Violations of this rule may produce segmentation faults
  2. + *
  3. The returned ByteBuffer should not be used once the receiver is released. + * Violations of this rule may produce segmentation faults
  4. + *
  5. A write made by either the receiver or the returned ByteBuffer will be seen by the other.
  6. + *
+ * + * Depending on the implementation, this may be a view (and therefore changes on any buffer will be seen by the other) + * or a copy (in which case the cost will be higher, but each copy will have their own lifecycle). + * + */ + // TODO: Most calls to this method are just used to then read the content of the buffer. + // This is unnecessary an generates 2-5 unnecessary objects. We should benchmark whether there is some advantage on + // transforming this buffer into a IntBuffer/LongBuffer/etc when reading sequentially public ByteBuffer toDirectByteBuffer(long offset, int size) { return toDirectByteBuffer(offset, size, order()); } @@ -389,4 +619,21 @@ public ByteBuffer toDirectByteBuffer(long offset, int size) { public abstract void release() throws IOException; + + public boolean isCloseable() { + return _closeable; + } + + protected static void checkLimits(long capacity, long offset, long size) { + if (offset < 0) { + throw new IllegalArgumentException("Offset " + offset + " cannot be negative"); + } + if (size < 0) { + throw new IllegalArgumentException("Size " + size + " cannot be negative"); + } + if (offset + size > capacity) { + throw new IllegalArgumentException("Size (" + size + ") + offset (" + offset + ") exceeds the capacity of " + + capacity); + } + } } diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/SmallWithFallbackPinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/SmallWithFallbackPinotBufferFactory.java new file mode 100644 index 000000000000..c16cd4f89f9d --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/SmallWithFallbackPinotBufferFactory.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteOrder; + + +/** + * A factory that receives two delegates, using one when the requested buffer can be indexes with integers and the other + * in the other case. + * + * This is commonly used to use ByteBuffers when possible. The utility of that is questionable, as it can increase the + * number of megamorphic calls in the hot path and also make errors related with -XX:MaxDirectMemorySize more + * indeterministic. But it is also the default behavior of Pinot, so it is kept as the default for compatibility + * reasons. + */ +public class SmallWithFallbackPinotBufferFactory implements PinotBufferFactory { + private final PinotBufferFactory _small; + private final PinotBufferFactory _fallback; + + public SmallWithFallbackPinotBufferFactory(PinotBufferFactory small, PinotBufferFactory fallback) { + _small = small; + _fallback = fallback; + } + + @Override + public PinotDataBuffer allocateDirect(long size, ByteOrder byteOrder) { + if (size <= Integer.MAX_VALUE) { + return _small.allocateDirect(size, byteOrder); + } else { + return _fallback.allocateDirect(size, byteOrder); + } + } + + @Override + public PinotDataBuffer readFile(File file, long offset, long size, ByteOrder byteOrder) + throws IOException { + if (size <= Integer.MAX_VALUE) { + return _small.readFile(file, offset, size, byteOrder); + } else { + return _fallback.readFile(file, offset, size, byteOrder); + } + } + + @Override + public PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size, ByteOrder byteOrder) + throws IOException { + if (size <= Integer.MAX_VALUE) { + return _small.mapFile(file, readOnly, offset, size, byteOrder); + } else { + return _fallback.mapFile(file, readOnly, offset, size, byteOrder); + } + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/DirectMemory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/DirectMemory.java new file mode 100644 index 000000000000..aef92af1dffd --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/DirectMemory.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory.unsafe; + +import java.io.IOException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link Memory} that is anonymous and therefore there it is not backed by any file. + */ +public class DirectMemory implements Memory { + private static final Logger LOGGER = LoggerFactory.getLogger(DirectMemory.class); + + private final long _address; + private final long _size; + private volatile boolean _closed = false; + + public DirectMemory(long size) { + _address = Unsafer.UNSAFE.allocateMemory(size); + _size = size; + + Unsafer.UNSAFE.setMemory(_address, _size, (byte) 0); + } + + @Override + public long getAddress() { + return _address; + } + + @Override + public long getSize() { + return _size; + } + + @Override + public void flush() { + } + + @Override + public void close() + throws IOException { + if (!_closed) { + synchronized (this) { + if (!_closed) { + Unsafer.UNSAFE.freeMemory(_address); + _closed = true; + } + } + } + } + + @Override + protected void finalize() + throws Throwable { + if (!_closed) { + LOGGER.warn("Mmap section of " + _size + " wasn't explicitly closed"); + close(); + } + super.finalize(); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/Memory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/Memory.java new file mode 100644 index 000000000000..061c8f2465d4 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/Memory.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory.unsafe; + +import java.io.Closeable; +import java.io.IOException; + + +/** + * A representation of some offheap memory previously allocated. This is mainly an address, a size and a way to + * release it. + */ +public interface Memory extends Closeable { + /** + * The virtual address where the memory starts. + */ + long getAddress(); + + /** + * The number of bytes that can be accessed starting from {@link #getAddress()}. + */ + long getSize(); + + /** + * If the memory is backed by a file (like in a memory map file) it syncs the content between the memory and the + * disk. Otherwise it does nothing. + */ + void flush(); + + /** + * Close this object, releasing the reserved memory. + */ + @Override + void close() + throws IOException; +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/MmapMemory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/MmapMemory.java new file mode 100644 index 000000000000..521d7f4cdf35 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/MmapMemory.java @@ -0,0 +1,377 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory.unsafe; + +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.io.UncheckedIOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.nio.channels.FileChannel; +import java.util.List; +import java.util.function.BiConsumer; +import net.openhft.chronicle.core.Jvm; +import net.openhft.chronicle.core.OS; +import net.openhft.posix.MSyncFlag; +import net.openhft.posix.PosixAPI; +import org.apache.pinot.segment.spi.utils.JavaVersion; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A {@link Memory} that whose bytes are mapped on a file. + */ +public class MmapMemory implements Memory { + private static final Logger LOGGER = LoggerFactory.getLogger(MmapMemory.class); + + private static final MapFun MAP_FUN; + + /** + * The address that correspond to the offset given at creation time. + * + * The actual mapping address may be smaller than this value, as usually memory map must start on an address that is + * page aligned. + */ + private final long _address; + /** + * How many bytes have been requested to be mapped. + * The actual mapped size may be larger (up to the next page), but the actual mapped size + * is stored by {@link #_section}. + */ + private final long _size; + private final MapSection _section; + private boolean _closed = false; + + static { + try { + Jvm.init(); + MAP_FUN = MapFun.find(); + } catch (ClassNotFoundException | NoSuchMethodException e) { + throw new RuntimeException(e); + } + } + + public MmapMemory(File file, boolean readOnly, long offset, long size) { + _size = size; + + try { + _section = MAP_FUN.map(file, readOnly, offset, size); + } catch (IOException e) { + throw new RuntimeException(e); + } + _address = _section.getAddress(); + } + + @Override + public long getAddress() { + return _address; + } + + @Override + public long getSize() { + return _size; + } + + @Override + public void flush() { + MSyncFlag mode = MSyncFlag.MS_SYNC; + PosixAPI.posix().msync(_address, _size, mode); + } + + @Override + public void close() + throws IOException { + try { + if (!_closed) { + synchronized (this) { + if (!_closed) { + _section._unmapFun.unmap(); + _closed = true; + } + } + } + } catch (InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException("Error while calling unmap", e); + } + } + + @Override + protected void finalize() + throws Throwable { + if (!_closed) { + LOGGER.warn("Mmap section of " + _size + " wasn't explicitly closed"); + close(); + } + super.finalize(); + } + + private static class MapSection { + public static final MapSection EMPTY = new MapSection(0, () -> { + }); + private final long _address; + private final UnmapFun _unmapFun; + + public MapSection(long address, UnmapFun unmapFun) { + _address = address; + _unmapFun = unmapFun; + } + + public long getAddress() { + return _address; + } + + public UnmapFun getUnmapFun() { + return _unmapFun; + } + } + + /** + * This is a factory method that can be used to create {@link MapSection}s. + * + * Each JVM may provide different method to map files in memory. + */ + interface MapFun { + + /** + * @param file The file to be mapped. If its length is lower than offset + size and the mode is not read only, + * the file will be resized to that size. + * @param offset The offset in the file. Any positive value is valid, even if it is larger than the file size. + * @param size How many bytes to map. + * @throws IOException in several situations. For example, if the offset + size is larger than file length and the + * mode is read only or if the process doesn't have permission to read or modify the file. + */ + MapSection map(File file, boolean readOnly, long offset, long size) throws IOException; + + static MapFun find() + throws ClassNotFoundException, NoSuchMethodException { + List> candidates = Lists.newArrayList( + new Map0Fun.ChronicleCore(), + new Map0Fun.Java11(), + new Map0Fun.Java17(), + new Map0Fun.Java20() + ); + + for (Finder candidate : candidates) { + try { + return candidate.tryFind(); + } catch (NoSuchMethodException | ClassNotFoundException | AssertionError e) { + // IGNORE + } + } + throw new NoSuchMethodException("Cannot find how to create memory map files in Java " + JavaVersion.VERSION); + } + } + + /** + * As defined by POSIX, the map0 method requires that the offset is page aligned. Failing to do that may produce + * segfault errors. This interface is a {@link MapFun} that does some sanitation before calling the map method. + * They include: + *
    + *
  • Grow the file if the last mapped byte is larger than the file length.
  • + *
  • Align the offset with the previous page. This means that we need to correct the actual mapped address.
  • + *
+ */ + interface Map0Fun extends MapFun { + + /** + * @param pageAlignedOffset It has to be a positive value that is page aligned. + */ + MapSection map0(FileChannel fc, boolean readOnly, long pageAlignedOffset, long size) + throws InvocationTargetException, IllegalAccessException, IOException; + + default MapSection map(File file, boolean readOnly, long offset, long size) throws IOException { + String mode = readOnly ? "r" : "rw"; + try (RandomAccessFile raf = new RandomAccessFile(file, mode); FileChannel fc = raf.getChannel()) { + if (size == 0) { + return MapSection.EMPTY; + } + + long allocationGranule = Unsafer.UNSAFE.pageSize(); + int pagePosition = (int) (offset % allocationGranule); + + // Compute mmap address + if (!fc.isOpen()) { + throw new IOException("closed " + file.getPath()); + } + + long fileSize = fc.size(); + if (fileSize < offset + size) { + // If file size is smaller than the specified size, extend the file size + raf.seek(offset + size - 1); + raf.write(0); + } + long mapPosition = offset - pagePosition; + long mapSize = size + pagePosition; + + MapSection map0Section = map0(fc, readOnly, mapPosition, mapSize); + return new MapSection(map0Section.getAddress() + pagePosition, map0Section.getUnmapFun()); + } catch (InvocationTargetException | IllegalAccessException e) { + throw new RuntimeException("Cannot map file " + file + " from address " + offset + " with size " + size, e); + } + } + + static BiConsumer tryFindUnmapper() + throws NoSuchMethodException, ClassNotFoundException { + Class fileChannelImpl = MmapMemory.class.getClassLoader().loadClass("sun.nio.ch.FileChannelImpl"); + Method unmapMethod = fileChannelImpl.getDeclaredMethod("unmap0", long.class, long.class); + unmapMethod.setAccessible(true); + return (address, size) -> { + try { + unmapMethod.invoke(null, address, size); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + }; + } + + /** + * Instead of looking for the correct map method by our self, this finder delegates on + * {@link OS#map(FileChannel, FileChannel.MapMode, long, long)} and {@link OS#unmap(long, long)}, which internally + * does the same thing. + */ + class ChronicleCore implements Finder { + @Override + public Map0Fun tryFind() { + OS.mapAlignment(); + return (fc, readOnly, pageAlignedOffset, size) -> { + if (size == 0) { + return MapSection.EMPTY; + } + FileChannel.MapMode mapMode = readOnly ? FileChannel.MapMode.READ_ONLY : FileChannel.MapMode.READ_WRITE; + long alignedSize = OS.pageAlign(size); + long address = OS.map(fc, mapMode, pageAlignedOffset, alignedSize); + return new MapSection(address, () -> { + try { + OS.unmap(address, alignedSize); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + }); + }; + } + } + + class Java11 implements Finder { + @Override + public Map0Fun tryFind() + throws NoSuchMethodException, ClassNotFoundException { + Class fileChannelImpl = MmapMemory.class.getClassLoader().loadClass("sun.nio.ch.FileChannelImpl"); + + // this is the usual method in JDK 11 + Method mapMethod = fileChannelImpl.getDeclaredMethod("map0", int.class, long.class, long.class); + mapMethod.setAccessible(true); + + BiConsumer unmap0 = tryFindUnmapper(); + + return (fc, readOnly, pageAlignedOffset, size) -> { + // see FileChannelImpl.MAP_RO and MAP_RW + int iMode = readOnly ? 0 : 1; + + long address = (long) mapMethod.invoke(fc, iMode, pageAlignedOffset, size); + + UnmapFun unmapFun = () -> unmap0.accept(address, size); + + return new MapSection(address, unmapFun); + }; + } + } + + class Java17 implements Finder { + @Override + public Map0Fun tryFind() + throws NoSuchMethodException, ClassNotFoundException { + Class fileChannelImpl = MmapMemory.class.getClassLoader().loadClass("sun.nio.ch.FileChannelImpl"); + + // https://github.com/openjdk/jdk17/blob/dfacda488bfbe2e11e8d607a6d08527710286982/src/java.base/share/classes/ + // sun/nio/ch/FileChannelImpl.java#L1341 + Method mapMethod = fileChannelImpl.getDeclaredMethod("map0", int.class, long.class, long.class, boolean.class); + mapMethod.setAccessible(true); + + BiConsumer unmap0 = tryFindUnmapper(); + + return (fc, readOnly, pageAlignedOffset, size) -> { + // see FileChannelImpl.MAP_RO and MAP_RW + int iMode = readOnly ? 0 : 1; + long address = (long) mapMethod.invoke(fc, iMode, pageAlignedOffset, size, false); + + UnmapFun unmapFun = () -> unmap0.accept(address, size); + + return new MapSection(address, unmapFun); + }; + } + } + + /** + * In Java 20 the method used to map already does the alignment corrections, so we could call it with a non-aligned + * offset. But we need to know the position in the page in order to correct the address, so it is useful to return a + * Map0Fun instead of a MapFun. + */ + class Java20 implements Finder { + @Override + public Map0Fun tryFind() + throws NoSuchMethodException, ClassNotFoundException { + Class fileChannelImpl = MmapMemory.class.getClassLoader().loadClass("sun.nio.ch.FileChannelImpl"); + + Method mapMethod = fileChannelImpl.getDeclaredMethod("mapInternal", FileChannel.MapMode.class, long.class, + long.class, int.class, boolean.class); + mapMethod.setAccessible(true); + + Class unmapperClass = MmapMemory.class.getClassLoader().loadClass("sun.nio.ch.FileChannelImpl$Unmapper"); + Method unmapMethod = unmapperClass.getDeclaredMethod("unmap"); + unmapMethod.setAccessible(true); + Method addressMethod = unmapperClass.getDeclaredMethod("address"); + addressMethod.setAccessible(true); + + return (fc, readOnly, pageAlignedOffset, size) -> { + FileChannel.MapMode mapMode = readOnly ? FileChannel.MapMode.READ_ONLY : FileChannel.MapMode.READ_WRITE; + // see https://github.com/openjdk/jdk/blob/cc9f7ad9ce33dc44d335fb7fb5483795c62ba936/src/java.base/share/ + // classes/sun/nio/ch/FileChannelImpl.java#L1223 + int prot = readOnly ? 0 : 1; + + Object unmapper = mapMethod.invoke(fc, mapMode, pageAlignedOffset, size, prot, false); + long address; + UnmapFun unmapFun; + if (unmapper == null) { + // unmapper may be null if the size is 0 or if the file descriptor is closed while mapInternal was called + address = 0; + unmapFun = () -> { + }; + } else { + address = (long) addressMethod.invoke(unmapper); + unmapFun = () -> unmapMethod.invoke(unmapper); + } + + return new MapSection(address, unmapFun); + }; + } + } + } + + interface UnmapFun { + void unmap() + throws InvocationTargetException, IllegalAccessException; + } + + private interface Finder { + C tryFind() throws NoSuchMethodException, ClassNotFoundException; + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/UnsafePinotBuffer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/UnsafePinotBuffer.java new file mode 100644 index 000000000000..5830edf51ad9 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/UnsafePinotBuffer.java @@ -0,0 +1,184 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory.unsafe; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import org.apache.pinot.segment.spi.memory.ByteBufferUtil; +import org.apache.pinot.segment.spi.memory.NonNativePinotDataBuffer; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import sun.misc.Unsafe; + + +/** + * A {@link PinotDataBuffer} that uses {@link Unsafe} and can only read native byte order. + */ +public class UnsafePinotBuffer extends PinotDataBuffer { + + private final long _address; + private final long _size; + private final Memory _memory; + + public UnsafePinotBuffer(Memory memory, boolean ownsMemory) { + this(memory, ownsMemory, memory.getAddress(), memory.getSize()); + } + + private UnsafePinotBuffer(Memory memory, boolean ownsMemory, long address, long size) { + super(ownsMemory); + _memory = memory; + _size = size; + _address = address; + } + + void checkOffset(long offset, long size) { + if (offset < 0) { + throw new IllegalArgumentException("Offset is " + offset); + } + if (offset + size > _size) { + throw new IllegalArgumentException("Cannot apply a " + size + " byte length operation in offset " + offset); + } + } + + @Override + public byte getByte(long offset) { + checkOffset(offset, 1); + return Unsafer.UNSAFE.getByte(_address + offset); + } + + @Override + public void putByte(long offset, byte value) { + checkOffset(offset, 1); + Unsafer.UNSAFE.putByte(_address + offset, value); + } + + @Override + public char getChar(long offset) { + checkOffset(offset, 2); + return Unsafer.UNSAFE.getChar(_address + offset); + } + + @Override + public void putChar(long offset, char value) { + checkOffset(offset, 2); + Unsafer.UNSAFE.putChar(_address + offset, value); + } + + @Override + public short getShort(long offset) { + checkOffset(offset, 2); + return Unsafer.UNSAFE.getShort(_address + offset); + } + + @Override + public void putShort(long offset, short value) { + checkOffset(offset, 2); + Unsafer.UNSAFE.putShort(_address + offset, value); + } + + @Override + public int getInt(long offset) { + checkOffset(offset, 4); + return Unsafer.UNSAFE.getInt(_address + offset); + } + + @Override + public void putInt(long offset, int value) { + checkOffset(offset, 4); + Unsafer.UNSAFE.putInt(_address + offset, value); + } + + @Override + public long getLong(long offset) { + checkOffset(offset, 8); + return Unsafer.UNSAFE.getLong(_address + offset); + } + + @Override + public void putLong(long offset, long value) { + checkOffset(offset, 8); + Unsafer.UNSAFE.putLong(_address + offset, value); + } + + @Override + public float getFloat(long offset) { + checkOffset(offset, 4); + return Unsafer.UNSAFE.getFloat(_address + offset); + } + + @Override + public void putFloat(long offset, float value) { + checkOffset(offset, 4); + Unsafer.UNSAFE.putFloat(_address + offset, value); + } + + @Override + public double getDouble(long offset) { + checkOffset(offset, 8); + return Unsafer.UNSAFE.getDouble(_address + offset); + } + + @Override + public void putDouble(long offset, double value) { + checkOffset(offset, 8); + Unsafer.UNSAFE.putDouble(_address + offset, value); + } + + @Override + public long size() { + return _size; + } + + @Override + public ByteOrder order() { + return ByteOrder.nativeOrder(); + } + + @Override + public PinotDataBuffer view(long start, long end, ByteOrder byteOrder) { + long size = end - start; + checkOffset(start, size); + + UnsafePinotBuffer nativeView = new UnsafePinotBuffer(_memory, false, _address + start, size); + + if (byteOrder == ByteOrder.nativeOrder()) { + return nativeView; + } else { + return new NonNativePinotDataBuffer(nativeView); + } + } + + @Override + public ByteBuffer toDirectByteBuffer(long offset, int size, ByteOrder byteOrder) { + checkOffset(offset, size); + return ByteBufferUtil.newDirectByteBuffer(_address + offset, size, this) + .order(byteOrder); + } + + @Override + public void flush() { + _memory.flush(); + } + + @Override + public void release() + throws IOException { + _memory.close(); + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/UnsafePinotBufferFactory.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/UnsafePinotBufferFactory.java new file mode 100644 index 000000000000..3b7cce2b0028 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/UnsafePinotBufferFactory.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory.unsafe; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteOrder; +import org.apache.pinot.segment.spi.memory.NonNativePinotDataBuffer; +import org.apache.pinot.segment.spi.memory.PinotBufferFactory; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import xerial.larray.impl.OSInfo; + + +public class UnsafePinotBufferFactory implements PinotBufferFactory { + public UnsafePinotBufferFactory() { + if (OSInfo.isWindows()) { + throw new IllegalStateException(getClass().getCanonicalName() + " cannot be used in Windows"); + } + } + + @Override + public PinotDataBuffer allocateDirect(long size, ByteOrder byteOrder) { + UnsafePinotBuffer buffer = new UnsafePinotBuffer(new DirectMemory(size), true); + if (byteOrder == ByteOrder.nativeOrder()) { + return buffer; + } else { + return new NonNativePinotDataBuffer(buffer); + } + } + + @Override + public PinotDataBuffer mapFile(File file, boolean readOnly, long offset, long size, ByteOrder byteOrder) + throws IOException { + UnsafePinotBuffer buffer = new UnsafePinotBuffer(new MmapMemory(file, readOnly, offset, size), true); + if (byteOrder == ByteOrder.nativeOrder()) { + return buffer; + } else { + return new NonNativePinotDataBuffer(buffer); + } + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/Unsafer.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/Unsafer.java new file mode 100644 index 000000000000..43389e250315 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/unsafe/Unsafer.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory.unsafe; + +import java.lang.reflect.Field; +import sun.misc.Unsafe; + +public class Unsafer { + + public static final Unsafe UNSAFE; + + static { + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + UNSAFE = Unsafe.class.cast(f.get(null)); + } catch (NoSuchFieldException e) { + throw new IllegalStateException("sun.misc.Unsafe is not available in this JVM"); + } catch (IllegalAccessException e) { + throw new IllegalStateException("sun.misc.Unsafe is not available in this JVM"); + } + } + + private Unsafer() { + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/utils/JavaVersion.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/utils/JavaVersion.java new file mode 100644 index 000000000000..f86c45cd5902 --- /dev/null +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/utils/JavaVersion.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.utils; + +import net.openhft.chronicle.core.Jvm; + + +public class JavaVersion { + + /** + * Returns the major Java version (ie 6, 8, 11, 15, 21, etc) + */ + public static final int VERSION; + + static { + Jvm.init(); + VERSION = Jvm.majorVersion(); + } + + private JavaVersion() { + } +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotByteBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotByteBufferTest.java new file mode 100644 index 000000000000..fc80635ca4bc --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotByteBufferTest.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +public class PinotByteBufferTest extends PinotDataBufferInstanceTestBase { + public PinotByteBufferTest() { + super(new ByteBufferPinotBufferFactory()); + } +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferInstanceTestBase.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferInstanceTestBase.java new file mode 100644 index 000000000000..52f85d676f88 --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferInstanceTestBase.java @@ -0,0 +1,366 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.Arrays; +import java.util.concurrent.Future; +import org.apache.commons.io.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterTest; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + + +/** + * A test contract that all pinot data buffer implementations should include. + */ +public abstract class PinotDataBufferInstanceTestBase extends PinotDataBufferTestBase { + + public final PinotBufferFactory _factory; + + public PinotDataBufferInstanceTestBase(PinotBufferFactory factory) { + _factory = factory; + } + + @BeforeMethod + public void setFactory() { + PinotDataBuffer.useFactory(_factory); + } + + @AfterTest + public void cleanFactory() { + PinotDataBuffer.useFactory(PinotDataBuffer.createDefaultFactory()); + } + + @BeforeMethod + public void cleanStats() { + PinotDataBuffer.cleanStats(); + } + + @SuppressWarnings("RedundantExplicitClose") + @Test + public void testMultipleClose() + throws Exception { + try (PinotDataBuffer buffer = _factory.allocateDirect(BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) { + buffer.close(); + } + try (PinotDataBuffer buffer = _factory.allocateDirect(BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) { + buffer.close(); + } + try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { + randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); + try (PinotDataBuffer buffer = _factory.readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) { + buffer.close(); + } + try (PinotDataBuffer buffer = _factory.readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) { + buffer.close(); + } + try (PinotDataBuffer buffer = _factory + .mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) { + buffer.close(); + } + try (PinotDataBuffer buffer = + _factory.mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) { + buffer.close(); + } + } finally { + FileUtils.forceDelete(TEMP_FILE); + } + } + + @Test + public void testDirectBE() + throws Exception { + try (PinotDataBuffer buffer = _factory.allocateDirect(BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) { + Assert.assertSame(buffer.order(), ByteOrder.BIG_ENDIAN); + testPinotDataBuffer(buffer); + } + } + + @Test + public void testDirectLE() + throws Exception { + try (PinotDataBuffer buffer = PinotByteBuffer.allocateDirect(BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) { + Assert.assertSame(buffer.order(), ByteOrder.LITTLE_ENDIAN); + testPinotDataBuffer(buffer); + } + } + + @Test + public void testReadFileBE() + throws Exception { + try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { + randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); + try (PinotDataBuffer buffer = _factory + .readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) { + Assert.assertSame(buffer.order(), ByteOrder.BIG_ENDIAN); + testPinotDataBuffer(buffer); + } + } + } + + @Test + public void testReadFileLE() + throws Exception { + try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { + randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); + try (PinotDataBuffer buffer = _factory + .readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) { + Assert.assertSame(buffer.order(), ByteOrder.LITTLE_ENDIAN); + testPinotDataBuffer(buffer); + } + } + } + + @Test + public void testMapFileBE() + throws Exception { + try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { + randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); + try (PinotDataBuffer buffer = _factory + .mapFile(TEMP_FILE, false, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN)) { + Assert.assertSame(buffer.order(), ByteOrder.BIG_ENDIAN); + testPinotDataBuffer(buffer); + } + } + } + + @Test + public void testMapFileLE() + throws Exception { + try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { + randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); + try (PinotDataBuffer buffer = _factory + .mapFile(TEMP_FILE, false, FILE_OFFSET, BUFFER_SIZE, ByteOrder.LITTLE_ENDIAN)) { + Assert.assertSame(buffer.order(), ByteOrder.LITTLE_ENDIAN); + testPinotDataBuffer(buffer); + } + } + } + + protected void testPinotDataBuffer(PinotDataBuffer buffer) + throws Exception { + Assert.assertEquals(buffer.size(), BUFFER_SIZE); + testReadWriteByte(buffer); + testReadWriteChar(buffer); + testReadWriteShort(buffer); + testReadWriteInt(buffer); + testReadWriteLong(buffer); + testReadWriteFloat(buffer); + testReadWriteDouble(buffer); + testReadWriteBytes(buffer); + testReadWritePinotDataBuffer(buffer); + testReadFromByteBuffer(buffer); + testConcurrentReadWrite(buffer); + } + + protected void testReadWriteByte(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int intOffset = RANDOM.nextInt(BUFFER_SIZE); + buffer.putByte(intOffset, _bytes[i]); + Assert.assertEquals(buffer.getByte(intOffset), _bytes[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + long longOffset = RANDOM.nextInt(BUFFER_SIZE); + buffer.putByte(longOffset, _bytes[i]); + Assert.assertEquals(buffer.getByte(longOffset), _bytes[i]); + } + } + + protected void testReadWriteChar(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(CHAR_ARRAY_LENGTH); + int intOffset = index * Byte.BYTES; + buffer.putChar(intOffset, _chars[i]); + Assert.assertEquals(buffer.getChar(intOffset), _chars[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(CHAR_ARRAY_LENGTH); + long longOffset = index * Byte.BYTES; + buffer.putChar(longOffset, _chars[i]); + Assert.assertEquals(buffer.getChar(longOffset), _chars[i]); + } + } + + protected void testReadWriteShort(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(SHORT_ARRAY_LENGTH); + int intOffset = index * Byte.BYTES; + buffer.putShort(intOffset, _shorts[i]); + Assert.assertEquals(buffer.getShort(intOffset), _shorts[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(SHORT_ARRAY_LENGTH); + long longOffset = index * Byte.BYTES; + buffer.putShort(longOffset, _shorts[i]); + Assert.assertEquals(buffer.getShort(longOffset), _shorts[i]); + } + } + + protected void testReadWriteInt(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(INT_ARRAY_LENGTH); + int intOffset = index * Byte.BYTES; + buffer.putInt(intOffset, _ints[i]); + Assert.assertEquals(buffer.getInt(intOffset), _ints[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(INT_ARRAY_LENGTH); + long longOffset = index * Byte.BYTES; + buffer.putInt(longOffset, _ints[i]); + Assert.assertEquals(buffer.getInt(longOffset), _ints[i]); + } + } + + protected void testReadWriteLong(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(LONG_ARRAY_LENGTH); + int intOffset = index * Byte.BYTES; + buffer.putLong(intOffset, _longs[i]); + Assert.assertEquals(buffer.getLong(intOffset), _longs[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(LONG_ARRAY_LENGTH); + long longOffset = index * Byte.BYTES; + buffer.putLong(longOffset, _longs[i]); + Assert.assertEquals(buffer.getLong(longOffset), _longs[i]); + } + } + + protected void testReadWriteFloat(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(FLOAT_ARRAY_LENGTH); + int intOffset = index * Byte.BYTES; + buffer.putFloat(intOffset, _floats[i]); + Assert.assertEquals(buffer.getFloat(intOffset), _floats[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(FLOAT_ARRAY_LENGTH); + long longOffset = index * Byte.BYTES; + buffer.putFloat(longOffset, _floats[i]); + Assert.assertEquals(buffer.getFloat(longOffset), _floats[i]); + } + } + + protected void testReadWriteDouble(PinotDataBuffer buffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(DOUBLE_ARRAY_LENGTH); + int intOffset = index * Byte.BYTES; + buffer.putDouble(intOffset, _doubles[i]); + Assert.assertEquals(buffer.getDouble(intOffset), _doubles[i]); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + int index = RANDOM.nextInt(DOUBLE_ARRAY_LENGTH); + long longOffset = index * Byte.BYTES; + buffer.putDouble(longOffset, _doubles[i]); + Assert.assertEquals(buffer.getDouble(longOffset), _doubles[i]); + } + } + + protected void testReadWriteBytes(PinotDataBuffer buffer) { + byte[] readBuffer = new byte[MAX_BYTES_LENGTH]; + byte[] writeBuffer = new byte[MAX_BYTES_LENGTH]; + for (int i = 0; i < NUM_ROUNDS; i++) { + int length = RANDOM.nextInt(MAX_BYTES_LENGTH); + int offset = RANDOM.nextInt(BUFFER_SIZE - length); + int arrayOffset = RANDOM.nextInt(MAX_BYTES_LENGTH - length); + System.arraycopy(_bytes, offset, readBuffer, arrayOffset, length); + buffer.readFrom(offset, readBuffer, arrayOffset, length); + buffer.copyTo(offset, writeBuffer, arrayOffset, length); + int end = arrayOffset + length; + for (int j = arrayOffset; j < end; j++) { + Assert.assertEquals(writeBuffer[j], readBuffer[j]); + } + } + } + + protected void testReadWritePinotDataBuffer(PinotDataBuffer buffer) { + testReadWritePinotDataBuffer(buffer, PinotByteBuffer.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NATIVE_ORDER), + PinotByteBuffer.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER)); + testReadWritePinotDataBuffer(buffer, + PinotByteBuffer.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER) + .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH), + PinotByteBuffer.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NATIVE_ORDER) + .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH)); + testReadWritePinotDataBuffer(buffer, _factory.allocateDirect(MAX_BYTES_LENGTH, ByteOrder.nativeOrder()), + _factory.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER)); + testReadWritePinotDataBuffer(buffer, + _factory.allocateDirect(2 * MAX_BYTES_LENGTH, ByteOrder.nativeOrder()) + .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH), + _factory.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER) + .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH)); + } + + protected void testReadWritePinotDataBuffer(PinotDataBuffer buffer, PinotDataBuffer readBuffer, + PinotDataBuffer writeBuffer) { + for (int i = 0; i < NUM_ROUNDS; i++) { + int length = RANDOM.nextInt(MAX_BYTES_LENGTH); + int offset = RANDOM.nextInt(BUFFER_SIZE - length); + readBuffer.readFrom(0, _bytes, RANDOM.nextInt(BUFFER_SIZE - length), length); + readBuffer.copyTo(0, buffer, offset, length); + for (int j = 0; j < length; j++) { + Assert.assertEquals(buffer.getByte(j + offset), readBuffer.getByte(j)); + } + buffer.copyTo(offset, writeBuffer, 0, length); + for (int j = 0; j < length; j++) { + Assert.assertEquals(writeBuffer.getByte(j), readBuffer.getByte(j)); + } + } + } + + protected void testReadFromByteBuffer(PinotDataBuffer buffer) { + byte[] readBuffer = new byte[MAX_BYTES_LENGTH]; + for (int i = 0; i < NUM_ROUNDS; i++) { + int length = RANDOM.nextInt(MAX_BYTES_LENGTH); + int offset = RANDOM.nextInt(BUFFER_SIZE - length); + System.arraycopy(_bytes, offset, readBuffer, 0, length); + buffer.readFrom(offset, ByteBuffer.wrap(readBuffer, 0, length)); + for (int j = 0; j < length; j++) { + Assert.assertEquals(buffer.getByte(offset + j), readBuffer[j]); + } + } + } + + protected void testConcurrentReadWrite(PinotDataBuffer buffer) + throws Exception { + Future[] futures = new Future[NUM_ROUNDS]; + for (int i = 0; i < NUM_ROUNDS; i++) { + futures[i] = _executorService.submit(() -> { + PinotDataBuffer.useFactory(_factory); + int length = RANDOM.nextInt(MAX_BYTES_LENGTH); + int offset = RANDOM.nextInt(BUFFER_SIZE - length); + byte[] readBuffer = new byte[length]; + byte[] writeBuffer = new byte[length]; + System.arraycopy(_bytes, offset, readBuffer, 0, length); + buffer.readFrom(offset, readBuffer); + buffer.copyTo(offset, writeBuffer); + Assert.assertTrue(Arrays.equals(readBuffer, writeBuffer)); + buffer.readFrom(offset, ByteBuffer.wrap(readBuffer)); + buffer.copyTo(offset, writeBuffer); + Assert.assertTrue(Arrays.equals(readBuffer, writeBuffer)); + }); + } + for (int i = 0; i < NUM_ROUNDS; i++) { + futures[i].get(); + } + } +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java index 1538351c152b..7a59af52767a 100644 --- a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTest.java @@ -18,71 +18,22 @@ */ package org.apache.pinot.segment.spi.memory; -import java.io.File; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.Arrays; -import java.util.Random; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import org.apache.commons.io.FileUtils; import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -public class PinotDataBufferTest { - private static final Random RANDOM = new Random(); - private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(10); - private static final File TEMP_FILE = new File(FileUtils.getTempDirectory(), "PinotDataBufferTest"); - private static final int FILE_OFFSET = 10; // Not page-aligned - private static final int BUFFER_SIZE = 10_000; // Not page-aligned - private static final int CHAR_ARRAY_LENGTH = BUFFER_SIZE / Character.BYTES; - private static final int SHORT_ARRAY_LENGTH = BUFFER_SIZE / Short.BYTES; - private static final int INT_ARRAY_LENGTH = BUFFER_SIZE / Integer.BYTES; - private static final int LONG_ARRAY_LENGTH = BUFFER_SIZE / Long.BYTES; - private static final int FLOAT_ARRAY_LENGTH = BUFFER_SIZE / Float.BYTES; - private static final int DOUBLE_ARRAY_LENGTH = BUFFER_SIZE / Double.BYTES; - private static final int NUM_ROUNDS = 1000; - private static final int MAX_BYTES_LENGTH = 100; - private static final long LARGE_BUFFER_SIZE = Integer.MAX_VALUE + 2L; // Not page-aligned +public abstract class PinotDataBufferTest extends PinotDataBufferInstanceTestBase { - private byte[] _bytes = new byte[BUFFER_SIZE]; - private char[] _chars = new char[CHAR_ARRAY_LENGTH]; - private short[] _shorts = new short[SHORT_ARRAY_LENGTH]; - private int[] _ints = new int[INT_ARRAY_LENGTH]; - private long[] _longs = new long[LONG_ARRAY_LENGTH]; - private float[] _floats = new float[FLOAT_ARRAY_LENGTH]; - private double[] _doubles = new double[DOUBLE_ARRAY_LENGTH]; - - @BeforeClass - public void setUp() { - for (int i = 0; i < BUFFER_SIZE; i++) { - _bytes[i] = (byte) RANDOM.nextInt(); - } - for (int i = 0; i < CHAR_ARRAY_LENGTH; i++) { - _chars[i] = (char) RANDOM.nextInt(); - } - for (int i = 0; i < SHORT_ARRAY_LENGTH; i++) { - _shorts[i] = (short) RANDOM.nextInt(); - } - for (int i = 0; i < INT_ARRAY_LENGTH; i++) { - _ints[i] = RANDOM.nextInt(); - } - for (int i = 0; i < LONG_ARRAY_LENGTH; i++) { - _longs[i] = RANDOM.nextLong(); - } - for (int i = 0; i < FLOAT_ARRAY_LENGTH; i++) { - _floats[i] = RANDOM.nextFloat(); - } - for (int i = 0; i < DOUBLE_ARRAY_LENGTH; i++) { - _doubles[i] = RANDOM.nextDouble(); - } + public PinotDataBufferTest(PinotBufferFactory factory) { + super(factory); } + protected abstract boolean prioritizeByteBuffer(); + @Test public void testPinotByteBuffer() throws Exception { @@ -115,18 +66,17 @@ public void testPinotByteBuffer() } } - @Test - public void testPinotNativeOrderLBuffer() - throws Exception { - try (PinotDataBuffer buffer = PinotNativeOrderLBuffer.allocateDirect(BUFFER_SIZE)) { + private void testOrderBuffer(ByteOrder byteOrder) throws Exception { + try (PinotDataBuffer buffer = _factory.allocateDirect(BUFFER_SIZE, byteOrder)) { testPinotDataBuffer(buffer); } try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); - try (PinotDataBuffer buffer = PinotNativeOrderLBuffer.loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE)) { + try (PinotDataBuffer buffer = _factory.readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, byteOrder)) { testPinotDataBuffer(buffer); } - try (PinotDataBuffer buffer = PinotNativeOrderLBuffer.mapFile(TEMP_FILE, false, FILE_OFFSET, BUFFER_SIZE)) { + try (PinotDataBuffer buffer = + _factory.mapFile(TEMP_FILE, false, FILE_OFFSET, BUFFER_SIZE, byteOrder)) { testPinotDataBuffer(buffer); } } finally { @@ -135,223 +85,15 @@ public void testPinotNativeOrderLBuffer() } @Test - public void testPinotNonNativeOrderLBuffer() - throws Exception { - try (PinotDataBuffer buffer = PinotNonNativeOrderLBuffer.allocateDirect(BUFFER_SIZE)) { - testPinotDataBuffer(buffer); - } - try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { - randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); - try (PinotDataBuffer buffer = PinotNonNativeOrderLBuffer.loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE)) { - testPinotDataBuffer(buffer); - } - try (PinotDataBuffer buffer = PinotNonNativeOrderLBuffer.mapFile(TEMP_FILE, false, FILE_OFFSET, BUFFER_SIZE)) { - testPinotDataBuffer(buffer); - } - } finally { - FileUtils.forceDelete(TEMP_FILE); - } - } - - private void testPinotDataBuffer(PinotDataBuffer buffer) + public void testPinotNativeOrderBuffer() throws Exception { - Assert.assertEquals(buffer.size(), BUFFER_SIZE); - testReadWriteByte(buffer); - testReadWriteChar(buffer); - testReadWriteShort(buffer); - testReadWriteInt(buffer); - testReadWriteLong(buffer); - testReadWriteFloat(buffer); - testReadWriteDouble(buffer); - testReadWriteBytes(buffer); - testReadWritePinotDataBuffer(buffer); - testReadFromByteBuffer(buffer); - testConcurrentReadWrite(buffer); - } - - private void testReadWriteByte(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int intOffset = RANDOM.nextInt(BUFFER_SIZE); - buffer.putByte(intOffset, _bytes[i]); - Assert.assertEquals(buffer.getByte(intOffset), _bytes[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - long longOffset = RANDOM.nextInt(BUFFER_SIZE); - buffer.putByte(longOffset, _bytes[i]); - Assert.assertEquals(buffer.getByte(longOffset), _bytes[i]); - } - } - - private void testReadWriteChar(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(CHAR_ARRAY_LENGTH); - int intOffset = index * Byte.BYTES; - buffer.putChar(intOffset, _chars[i]); - Assert.assertEquals(buffer.getChar(intOffset), _chars[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(CHAR_ARRAY_LENGTH); - long longOffset = index * Byte.BYTES; - buffer.putChar(longOffset, _chars[i]); - Assert.assertEquals(buffer.getChar(longOffset), _chars[i]); - } - } - - private void testReadWriteShort(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(SHORT_ARRAY_LENGTH); - int intOffset = index * Byte.BYTES; - buffer.putShort(intOffset, _shorts[i]); - Assert.assertEquals(buffer.getShort(intOffset), _shorts[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(SHORT_ARRAY_LENGTH); - long longOffset = index * Byte.BYTES; - buffer.putShort(longOffset, _shorts[i]); - Assert.assertEquals(buffer.getShort(longOffset), _shorts[i]); - } - } - - private void testReadWriteInt(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(INT_ARRAY_LENGTH); - int intOffset = index * Byte.BYTES; - buffer.putInt(intOffset, _ints[i]); - Assert.assertEquals(buffer.getInt(intOffset), _ints[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(INT_ARRAY_LENGTH); - long longOffset = index * Byte.BYTES; - buffer.putInt(longOffset, _ints[i]); - Assert.assertEquals(buffer.getInt(longOffset), _ints[i]); - } + testOrderBuffer(PinotDataBuffer.NATIVE_ORDER); } - private void testReadWriteLong(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(LONG_ARRAY_LENGTH); - int intOffset = index * Byte.BYTES; - buffer.putLong(intOffset, _longs[i]); - Assert.assertEquals(buffer.getLong(intOffset), _longs[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(LONG_ARRAY_LENGTH); - long longOffset = index * Byte.BYTES; - buffer.putLong(longOffset, _longs[i]); - Assert.assertEquals(buffer.getLong(longOffset), _longs[i]); - } - } - - private void testReadWriteFloat(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(FLOAT_ARRAY_LENGTH); - int intOffset = index * Byte.BYTES; - buffer.putFloat(intOffset, _floats[i]); - Assert.assertEquals(buffer.getFloat(intOffset), _floats[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(FLOAT_ARRAY_LENGTH); - long longOffset = index * Byte.BYTES; - buffer.putFloat(longOffset, _floats[i]); - Assert.assertEquals(buffer.getFloat(longOffset), _floats[i]); - } - } - - private void testReadWriteDouble(PinotDataBuffer buffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(DOUBLE_ARRAY_LENGTH); - int intOffset = index * Byte.BYTES; - buffer.putDouble(intOffset, _doubles[i]); - Assert.assertEquals(buffer.getDouble(intOffset), _doubles[i]); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - int index = RANDOM.nextInt(DOUBLE_ARRAY_LENGTH); - long longOffset = index * Byte.BYTES; - buffer.putDouble(longOffset, _doubles[i]); - Assert.assertEquals(buffer.getDouble(longOffset), _doubles[i]); - } - } - - private void testReadWriteBytes(PinotDataBuffer buffer) { - byte[] readBuffer = new byte[MAX_BYTES_LENGTH]; - byte[] writeBuffer = new byte[MAX_BYTES_LENGTH]; - for (int i = 0; i < NUM_ROUNDS; i++) { - int length = RANDOM.nextInt(MAX_BYTES_LENGTH); - int offset = RANDOM.nextInt(BUFFER_SIZE - length); - int arrayOffset = RANDOM.nextInt(MAX_BYTES_LENGTH - length); - System.arraycopy(_bytes, offset, readBuffer, arrayOffset, length); - buffer.readFrom(offset, readBuffer, arrayOffset, length); - buffer.copyTo(offset, writeBuffer, arrayOffset, length); - int end = arrayOffset + length; - for (int j = arrayOffset; j < end; j++) { - Assert.assertEquals(writeBuffer[j], readBuffer[j]); - } - } - } - - private void testReadWritePinotDataBuffer(PinotDataBuffer buffer) { - testReadWritePinotDataBuffer(buffer, PinotByteBuffer.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NATIVE_ORDER), - PinotByteBuffer.allocateDirect(MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER)); - testReadWritePinotDataBuffer(buffer, - PinotByteBuffer.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NON_NATIVE_ORDER) - .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH), - PinotByteBuffer.allocateDirect(2 * MAX_BYTES_LENGTH, PinotDataBuffer.NATIVE_ORDER) - .view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH)); - testReadWritePinotDataBuffer(buffer, PinotNativeOrderLBuffer.allocateDirect(MAX_BYTES_LENGTH), - PinotNonNativeOrderLBuffer.allocateDirect(MAX_BYTES_LENGTH)); - testReadWritePinotDataBuffer(buffer, - PinotNonNativeOrderLBuffer.allocateDirect(2 * MAX_BYTES_LENGTH).view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH), - PinotNativeOrderLBuffer.allocateDirect(2 * MAX_BYTES_LENGTH).view(MAX_BYTES_LENGTH, 2 * MAX_BYTES_LENGTH)); - } - - private void testReadWritePinotDataBuffer(PinotDataBuffer buffer, PinotDataBuffer readBuffer, - PinotDataBuffer writeBuffer) { - for (int i = 0; i < NUM_ROUNDS; i++) { - int length = RANDOM.nextInt(MAX_BYTES_LENGTH); - int offset = RANDOM.nextInt(BUFFER_SIZE - length); - readBuffer.readFrom(0, _bytes, RANDOM.nextInt(BUFFER_SIZE - length), length); - readBuffer.copyTo(0, buffer, offset, length); - buffer.copyTo(offset, writeBuffer, 0, length); - for (int j = 0; j < length; j++) { - Assert.assertEquals(writeBuffer.getByte(j), readBuffer.getByte(j)); - } - } - } - - private void testReadFromByteBuffer(PinotDataBuffer buffer) { - byte[] readBuffer = new byte[MAX_BYTES_LENGTH]; - for (int i = 0; i < NUM_ROUNDS; i++) { - int length = RANDOM.nextInt(MAX_BYTES_LENGTH); - int offset = RANDOM.nextInt(BUFFER_SIZE - length); - System.arraycopy(_bytes, offset, readBuffer, 0, length); - buffer.readFrom(offset, ByteBuffer.wrap(readBuffer, 0, length)); - for (int j = 0; j < length; j++) { - Assert.assertEquals(buffer.getByte(offset + j), readBuffer[j]); - } - } - } - - private void testConcurrentReadWrite(PinotDataBuffer buffer) + @Test + public void testPinotNonNativeOrderBuffer() throws Exception { - Future[] futures = new Future[NUM_ROUNDS]; - for (int i = 0; i < NUM_ROUNDS; i++) { - futures[i] = EXECUTOR_SERVICE.submit(() -> { - int length = RANDOM.nextInt(MAX_BYTES_LENGTH); - int offset = RANDOM.nextInt(BUFFER_SIZE - length); - byte[] readBuffer = new byte[length]; - byte[] writeBuffer = new byte[length]; - System.arraycopy(_bytes, offset, readBuffer, 0, length); - buffer.readFrom(offset, readBuffer); - buffer.copyTo(offset, writeBuffer); - Assert.assertTrue(Arrays.equals(readBuffer, writeBuffer)); - buffer.readFrom(offset, ByteBuffer.wrap(readBuffer)); - buffer.copyTo(offset, writeBuffer); - Assert.assertTrue(Arrays.equals(readBuffer, writeBuffer)); - }); - } - for (int i = 0; i < NUM_ROUNDS; i++) { - futures[i].get(); - } + testOrderBuffer(PinotDataBuffer.NON_NATIVE_ORDER); } @Test @@ -373,15 +115,14 @@ public void testPinotByteBufferReadWriteFile() } } - @Test - public void testPinotNativeOrderLBufferReadWriteFile() + public void testPinotBufferReadWriteFile(ByteOrder byteOrder) throws Exception { - try (PinotDataBuffer writeBuffer = PinotNativeOrderLBuffer.mapFile(TEMP_FILE, false, FILE_OFFSET, BUFFER_SIZE)) { + try (PinotDataBuffer writeBuffer = _factory.mapFile(TEMP_FILE, false, FILE_OFFSET, BUFFER_SIZE, byteOrder)) { putInts(writeBuffer); - try (PinotDataBuffer readBuffer = PinotNativeOrderLBuffer.loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE)) { + try (PinotDataBuffer readBuffer = _factory.readFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, byteOrder)) { getInts(readBuffer); } - try (PinotDataBuffer readBuffer = PinotNativeOrderLBuffer.mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE)) { + try (PinotDataBuffer readBuffer = _factory.mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE, byteOrder)) { getInts(readBuffer); } } finally { @@ -390,19 +131,15 @@ public void testPinotNativeOrderLBufferReadWriteFile() } @Test - public void testPinotNonNativeOrderLBufferReadWriteFile() + public void testPinotNativeOrderBufferReadWriteFile() throws Exception { - try (PinotDataBuffer writeBuffer = PinotNonNativeOrderLBuffer.mapFile(TEMP_FILE, false, FILE_OFFSET, BUFFER_SIZE)) { - putInts(writeBuffer); - try (PinotDataBuffer readBuffer = PinotNonNativeOrderLBuffer.loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE)) { - getInts(readBuffer); - } - try (PinotDataBuffer readBuffer = PinotNonNativeOrderLBuffer.mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE)) { - getInts(readBuffer); - } - } finally { - FileUtils.forceDelete(TEMP_FILE); - } + testPinotBufferReadWriteFile(PinotDataBuffer.NATIVE_ORDER); + } + + @Test + public void testPinotNonNativeOrderBufferReadWriteFile() + throws Exception { + testPinotBufferReadWriteFile(PinotDataBuffer.NON_NATIVE_ORDER); } private void putInts(PinotDataBuffer buffer) { @@ -425,43 +162,47 @@ private void getInts(PinotDataBuffer buffer) { public void testViewAndToDirectByteBuffer() throws Exception { int startOffset = RANDOM.nextInt(BUFFER_SIZE); + ByteOrder nativeOrder = PinotDataBuffer.NATIVE_ORDER; + ByteOrder nonNativeOrder = PinotDataBuffer.NON_NATIVE_ORDER; try (PinotDataBuffer writeBuffer = PinotByteBuffer - .mapFile(TEMP_FILE, false, FILE_OFFSET, 3 * BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) { + .mapFile(TEMP_FILE, false, FILE_OFFSET, 3 * BUFFER_SIZE, nativeOrder)) { putLongs(writeBuffer, startOffset); try (PinotDataBuffer readBuffer = PinotByteBuffer - .loadFile(TEMP_FILE, FILE_OFFSET, 3 * BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) { + .loadFile(TEMP_FILE, FILE_OFFSET, 3 * BUFFER_SIZE, nativeOrder)) { testViewAndToDirectByteBuffer(readBuffer, startOffset); } try (PinotDataBuffer readBuffer = PinotByteBuffer - .mapFile(TEMP_FILE, true, FILE_OFFSET, 3 * BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) { + .mapFile(TEMP_FILE, true, FILE_OFFSET, 3 * BUFFER_SIZE, nativeOrder)) { testViewAndToDirectByteBuffer(readBuffer, startOffset); } - try (PinotDataBuffer readBuffer = PinotNativeOrderLBuffer.loadFile(TEMP_FILE, FILE_OFFSET, 3 * BUFFER_SIZE)) { + try (PinotDataBuffer readBuffer = PinotByteBuffer + .loadFile(TEMP_FILE, FILE_OFFSET, 3 * BUFFER_SIZE, nativeOrder)) { testViewAndToDirectByteBuffer(readBuffer, startOffset); } try ( - PinotDataBuffer readBuffer = PinotNativeOrderLBuffer.mapFile(TEMP_FILE, true, FILE_OFFSET, 3 * BUFFER_SIZE)) { + PinotDataBuffer readBuffer = PinotByteBuffer + .mapFile(TEMP_FILE, true, FILE_OFFSET, 3 * BUFFER_SIZE, nativeOrder)) { testViewAndToDirectByteBuffer(readBuffer, startOffset); } } finally { FileUtils.forceDelete(TEMP_FILE); } try (PinotDataBuffer writeBuffer = PinotByteBuffer - .mapFile(TEMP_FILE, false, FILE_OFFSET, 3 * BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER)) { + .mapFile(TEMP_FILE, false, FILE_OFFSET, 3 * BUFFER_SIZE, nonNativeOrder)) { putLongs(writeBuffer, startOffset); try (PinotDataBuffer readBuffer = PinotByteBuffer - .loadFile(TEMP_FILE, FILE_OFFSET, 3 * BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER)) { + .loadFile(TEMP_FILE, FILE_OFFSET, 3 * BUFFER_SIZE, nonNativeOrder)) { testViewAndToDirectByteBuffer(readBuffer, startOffset); } try (PinotDataBuffer readBuffer = PinotByteBuffer - .mapFile(TEMP_FILE, true, FILE_OFFSET, 3 * BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER)) { + .mapFile(TEMP_FILE, true, FILE_OFFSET, 3 * BUFFER_SIZE, nonNativeOrder)) { testViewAndToDirectByteBuffer(readBuffer, startOffset); } - try (PinotDataBuffer readBuffer = PinotNonNativeOrderLBuffer.loadFile(TEMP_FILE, FILE_OFFSET, 3 * BUFFER_SIZE)) { + try (PinotDataBuffer readBuffer = _factory.readFile(TEMP_FILE, FILE_OFFSET, 3 * BUFFER_SIZE, nonNativeOrder)) { testViewAndToDirectByteBuffer(readBuffer, startOffset); } - try (PinotDataBuffer readBuffer = PinotNonNativeOrderLBuffer - .mapFile(TEMP_FILE, true, FILE_OFFSET, 3 * BUFFER_SIZE)) { + try (PinotDataBuffer readBuffer = _factory + .mapFile(TEMP_FILE, true, FILE_OFFSET, 3 * BUFFER_SIZE, nonNativeOrder)) { testViewAndToDirectByteBuffer(readBuffer, startOffset); } } finally { @@ -510,85 +251,48 @@ private void testByteBuffer(ByteBuffer byteBuffer) { } } - @SuppressWarnings("RedundantExplicitClose") - @Test - public void testMultipleClose() - throws Exception { - try (PinotDataBuffer buffer = PinotByteBuffer.allocateDirect(BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotNativeOrderLBuffer.allocateDirect(BUFFER_SIZE)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotNonNativeOrderLBuffer.allocateDirect(BUFFER_SIZE)) { - buffer.close(); - } - try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { - randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); - try (PinotDataBuffer buffer = PinotByteBuffer - .loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotNativeOrderLBuffer.loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotNonNativeOrderLBuffer.loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotByteBuffer - .mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotNativeOrderLBuffer.mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE)) { - buffer.close(); - } - try (PinotDataBuffer buffer = PinotNonNativeOrderLBuffer.mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE)) { - buffer.close(); - } - } finally { - FileUtils.forceDelete(TEMP_FILE); - } - } - @Test public void testConstructors() throws Exception { testBufferStats(0, 0, 0, 0); - try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { - randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); - try (PinotDataBuffer buffer1 = PinotDataBuffer.allocateDirect(BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER, null)) { - Assert.assertTrue(buffer1 instanceof PinotByteBuffer); - testBufferStats(1, BUFFER_SIZE, 0, 0); - try (PinotDataBuffer buffer2 = PinotDataBuffer - .loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN, null)) { - Assert.assertTrue(buffer2 instanceof PinotByteBuffer); - testBufferStats(2, 2 * BUFFER_SIZE, 0, 0); - try (PinotDataBuffer buffer3 = PinotDataBuffer - .mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN, null)) { - Assert.assertTrue(buffer3 instanceof PinotByteBuffer); - testBufferStats(2, 2 * BUFFER_SIZE, 1, BUFFER_SIZE); + if (prioritizeByteBuffer()) { + try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { + randomAccessFile.setLength(FILE_OFFSET + BUFFER_SIZE); + try (PinotDataBuffer buffer1 = + PinotDataBuffer.allocateDirect(BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER, null)) { + Assert.assertTrue(buffer1 instanceof PinotByteBuffer); + testBufferStats(1, BUFFER_SIZE, 0, 0); + try (PinotDataBuffer buffer2 = + PinotDataBuffer.loadFile(TEMP_FILE, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN, null)) { + Assert.assertTrue(buffer2 instanceof PinotByteBuffer); + testBufferStats(2, 2 * BUFFER_SIZE, 0, 0); + try (PinotDataBuffer buffer3 = + PinotDataBuffer.mapFile(TEMP_FILE, true, FILE_OFFSET, BUFFER_SIZE, ByteOrder.BIG_ENDIAN, null)) { + Assert.assertTrue(buffer3 instanceof PinotByteBuffer); + testBufferStats(2, 2 * BUFFER_SIZE, 1, BUFFER_SIZE); + } + testBufferStats(2, 2 * BUFFER_SIZE, 0, 0); } - testBufferStats(2, 2 * BUFFER_SIZE, 0, 0); + testBufferStats(1, BUFFER_SIZE, 0, 0); } - testBufferStats(1, BUFFER_SIZE, 0, 0); + testBufferStats(0, 0, 0, 0); + } finally { + FileUtils.forceDelete(TEMP_FILE); } - testBufferStats(0, 0, 0, 0); - } finally { - FileUtils.forceDelete(TEMP_FILE); } try (RandomAccessFile randomAccessFile = new RandomAccessFile(TEMP_FILE, "rw")) { randomAccessFile.setLength(FILE_OFFSET + LARGE_BUFFER_SIZE); try (PinotDataBuffer buffer1 = PinotDataBuffer .allocateDirect(LARGE_BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER, null)) { - Assert.assertTrue(buffer1 instanceof PinotNativeOrderLBuffer); + Assert.assertSame(buffer1.order(), PinotDataBuffer.NATIVE_ORDER); testBufferStats(1, LARGE_BUFFER_SIZE, 0, 0); try (PinotDataBuffer buffer2 = PinotDataBuffer .loadFile(TEMP_FILE, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER, null)) { - Assert.assertTrue(buffer2 instanceof PinotNativeOrderLBuffer); + Assert.assertSame(buffer2.order(), PinotDataBuffer.NATIVE_ORDER); testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0); try (PinotDataBuffer buffer3 = PinotDataBuffer .mapFile(TEMP_FILE, true, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NATIVE_ORDER, null)) { - Assert.assertTrue(buffer3 instanceof PinotNativeOrderLBuffer); + Assert.assertSame(buffer3.order(), PinotDataBuffer.NATIVE_ORDER); testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 1, LARGE_BUFFER_SIZE); } testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0); @@ -603,15 +307,15 @@ public void testConstructors() randomAccessFile.setLength(FILE_OFFSET + LARGE_BUFFER_SIZE); try (PinotDataBuffer buffer1 = PinotDataBuffer .allocateDirect(LARGE_BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER, null)) { - Assert.assertTrue(buffer1 instanceof PinotNonNativeOrderLBuffer); + Assert.assertSame(buffer1.order(), PinotDataBuffer.NON_NATIVE_ORDER); testBufferStats(1, LARGE_BUFFER_SIZE, 0, 0); try (PinotDataBuffer buffer2 = PinotDataBuffer .loadFile(TEMP_FILE, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER, null)) { - Assert.assertTrue(buffer2 instanceof PinotNonNativeOrderLBuffer); + Assert.assertSame(buffer2.order(), PinotDataBuffer.NON_NATIVE_ORDER); testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0); try (PinotDataBuffer buffer3 = PinotDataBuffer .mapFile(TEMP_FILE, true, FILE_OFFSET, LARGE_BUFFER_SIZE, PinotDataBuffer.NON_NATIVE_ORDER, null)) { - Assert.assertTrue(buffer3 instanceof PinotNonNativeOrderLBuffer); + Assert.assertSame(buffer3.order(), PinotDataBuffer.NON_NATIVE_ORDER); testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 1, LARGE_BUFFER_SIZE); } testBufferStats(2, 2 * LARGE_BUFFER_SIZE, 0, 0); @@ -623,19 +327,4 @@ public void testConstructors() FileUtils.forceDelete(TEMP_FILE); } } - - private void testBufferStats(int directBufferCount, long directBufferUsage, int mmapBufferCount, - long mmapBufferUsage) { - Assert.assertEquals(PinotDataBuffer.getAllocationFailureCount(), 0); - Assert.assertEquals(PinotDataBuffer.getDirectBufferCount(), directBufferCount); - Assert.assertEquals(PinotDataBuffer.getDirectBufferUsage(), directBufferUsage); - Assert.assertEquals(PinotDataBuffer.getMmapBufferCount(), mmapBufferCount); - Assert.assertEquals(PinotDataBuffer.getMmapBufferUsage(), mmapBufferUsage); - Assert.assertEquals(PinotDataBuffer.getBufferInfo().size(), directBufferCount + mmapBufferCount); - } - - @AfterClass - public void tearDown() { - EXECUTOR_SERVICE.shutdown(); - } } diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTestBase.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTestBase.java new file mode 100644 index 000000000000..4c3725cc12dc --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDataBufferTestBase.java @@ -0,0 +1,115 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import java.io.File; +import java.io.IOException; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.commons.io.FileUtils; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; + + +/** + * Not an actual test but a base class that can be extended by tests in order to have some basic data + */ +public class PinotDataBufferTestBase { + + protected static final Random RANDOM = new Random(); + protected ExecutorService _executorService; + protected static final File TEMP_FILE = new File(FileUtils.getTempDirectory(), "PinotDataBufferTest"); + protected static final int FILE_OFFSET = 10; // Not page-aligned + protected static final int BUFFER_SIZE = 10_000; // Not page-aligned + protected static final int CHAR_ARRAY_LENGTH = BUFFER_SIZE / Character.BYTES; + protected static final int SHORT_ARRAY_LENGTH = BUFFER_SIZE / Short.BYTES; + protected static final int INT_ARRAY_LENGTH = BUFFER_SIZE / Integer.BYTES; + protected static final int LONG_ARRAY_LENGTH = BUFFER_SIZE / Long.BYTES; + protected static final int FLOAT_ARRAY_LENGTH = BUFFER_SIZE / Float.BYTES; + protected static final int DOUBLE_ARRAY_LENGTH = BUFFER_SIZE / Double.BYTES; + protected static final int NUM_ROUNDS = 1000; + protected static final int MAX_BYTES_LENGTH = 100; + protected static final long LARGE_BUFFER_SIZE = Integer.MAX_VALUE + 2L; // Not page-aligned + + protected byte[] _bytes = new byte[BUFFER_SIZE]; + protected char[] _chars = new char[CHAR_ARRAY_LENGTH]; + protected short[] _shorts = new short[SHORT_ARRAY_LENGTH]; + protected int[] _ints = new int[INT_ARRAY_LENGTH]; + protected long[] _longs = new long[LONG_ARRAY_LENGTH]; + protected float[] _floats = new float[FLOAT_ARRAY_LENGTH]; + protected double[] _doubles = new double[DOUBLE_ARRAY_LENGTH]; + + @BeforeClass + public void setUp() { + for (int i = 0; i < BUFFER_SIZE; i++) { + _bytes[i] = (byte) RANDOM.nextInt(); + } + for (int i = 0; i < CHAR_ARRAY_LENGTH; i++) { + _chars[i] = (char) RANDOM.nextInt(); + } + for (int i = 0; i < SHORT_ARRAY_LENGTH; i++) { + _shorts[i] = (short) RANDOM.nextInt(); + } + for (int i = 0; i < INT_ARRAY_LENGTH; i++) { + _ints[i] = RANDOM.nextInt(); + } + for (int i = 0; i < LONG_ARRAY_LENGTH; i++) { + _longs[i] = RANDOM.nextLong(); + } + for (int i = 0; i < FLOAT_ARRAY_LENGTH; i++) { + _floats[i] = RANDOM.nextFloat(); + } + for (int i = 0; i < DOUBLE_ARRAY_LENGTH; i++) { + _doubles[i] = RANDOM.nextDouble(); + } + } + + protected void testBufferStats(int directBufferCount, long directBufferUsage, int mmapBufferCount, + long mmapBufferUsage) { + Assert.assertEquals(PinotDataBuffer.getAllocationFailureCount(), 0); + Assert.assertEquals(PinotDataBuffer.getDirectBufferCount(), directBufferCount); + Assert.assertEquals(PinotDataBuffer.getDirectBufferUsage(), directBufferUsage); + Assert.assertEquals(PinotDataBuffer.getMmapBufferCount(), mmapBufferCount); + Assert.assertEquals(PinotDataBuffer.getMmapBufferUsage(), mmapBufferUsage); + Assert.assertEquals(PinotDataBuffer.getBufferInfo().size(), directBufferCount + mmapBufferCount); + } + + @AfterMethod + public void deleteFileIfExists() + throws IOException { + if (TEMP_FILE.exists()) { + FileUtils.forceDelete(TEMP_FILE); + } + } + + @BeforeMethod + public void setupExecutor() { + _executorService = Executors.newFixedThreadPool(10); + } + + @AfterMethod + public void tearDown() { + if (_executorService != null) { + _executorService.shutdown(); + } + } +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDefaultByteBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDefaultByteBufferTest.java new file mode 100644 index 000000000000..3c4012fa998f --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotDefaultByteBufferTest.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + + +public class PinotDefaultByteBufferTest extends PinotDataBufferTest { + public PinotDefaultByteBufferTest() { + super(PinotByteBuffer.createDefaultFactory()); + } + + @Override + protected boolean prioritizeByteBuffer() { + return false; + } +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotLArrayByteBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotLArrayByteBufferTest.java new file mode 100644 index 000000000000..5c343fb7ef45 --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/PinotLArrayByteBufferTest.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import org.apache.pinot.segment.spi.utils.JavaVersion; +import org.testng.SkipException; +import org.testng.annotations.BeforeClass; + + +public class PinotLArrayByteBufferTest extends PinotDataBufferTest { + public PinotLArrayByteBufferTest() { + super(new LArrayPinotBufferFactory()); + } + + @Override + protected boolean prioritizeByteBuffer() { + return false; + } + + @BeforeClass + public void abortOnModernJava() { + if (JavaVersion.VERSION > 15) { + throw new SkipException("Skipping LArray tests because they cannot run in Java " + JavaVersion.VERSION); + } + } +} diff --git a/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/unsafe/UnsafeMemoryPinotDataBufferTest.java b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/unsafe/UnsafeMemoryPinotDataBufferTest.java new file mode 100644 index 000000000000..5f4e845a03e4 --- /dev/null +++ b/pinot-segment-spi/src/test/java/org/apache/pinot/segment/spi/memory/unsafe/UnsafeMemoryPinotDataBufferTest.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory.unsafe; + +import org.apache.pinot.segment.spi.memory.PinotDataBufferTest; + + +public class UnsafeMemoryPinotDataBufferTest extends PinotDataBufferTest { + public UnsafeMemoryPinotDataBufferTest() { + super(new UnsafePinotBufferFactory()); + } + + @Override + protected boolean prioritizeByteBuffer() { + return false; + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 69cd095077b0..52932cc6cc50 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -186,6 +186,9 @@ public void init(PinotConfiguration serverConf) // Initialize Pinot Environment Provider _pinotEnvironmentProvider = initializePinotEnvironmentProvider(); + // Initialize the data buffer factory + PinotDataBuffer.loadDefaultFactory(serverConf); + // Enable/disable thread CPU time measurement through instance config. ThreadResourceUsageProvider.setThreadCpuTimeMeasurementEnabled( _serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_CPU_TIME_MEASUREMENT, diff --git a/pinot-tools/src/main/resources/appAssemblerScriptTemplate b/pinot-tools/src/main/resources/appAssemblerScriptTemplate index c6611eacf285..f639f3d653c4 100644 --- a/pinot-tools/src/main/resources/appAssemblerScriptTemplate +++ b/pinot-tools/src/main/resources/appAssemblerScriptTemplate @@ -171,12 +171,44 @@ if $cygwin; then [ -n "$REPO" ] && REPO=`cygpath --path --windows "$REPO"` fi + +jdk_version() { + IFS=' +' + # remove \r for Cygwin + lines=$(java -Xms32M -Xmx32M -version 2>&1 | tr '\r' '\n') + for line in $lines; do + if test -z $result && echo "$line" | grep -q 'version "' + then + ver=$(echo $line | sed -e 's/.*version "\(.*\)"\(.*\)/\1/; 1q') + # on macOS, sed doesn't support '?' + if case $ver in "1."*) true;; *) false;; esac; + then + result=$(echo $ver | sed -e 's/1\.\([0-9]*\)\(.*\)/\1/; 1q') + else + result=$(echo $ver | sed -e 's/\([0-9]*\)\(.*\)/\1/; 1q') + fi + fi + done + unset IFS + echo "$result" +} + if [ -z "$JAVA_OPTS" ] ; then ALL_JAVA_OPTS="@EXTRA_JVM_ARGUMENTS@" else ALL_JAVA_OPTS=$JAVA_OPTS fi +if [ "$(jdk_version)" -gt 11 ]; then + ALL_JAVA_OPTS="--add-opens=java.base/java.nio=ALL-UNNAMED \ + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED \ + --add-opens=java.base/java.lang=ALL-UNNAMED \ + --add-opens=java.base/java.util=ALL-UNNAMED \ + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \ + $JAVA_OPTS" +fi + if [ -z "$PLUGINS_DIR" ] ; then PLUGINS_DIR=$BASEDIR/plugins fi diff --git a/pom.xml b/pom.xml index 43857c55f428..4359f2afe696 100644 --- a/pom.xml +++ b/pom.xml @@ -249,6 +249,23 @@ + + + + + nl.jqno.equalsverifier + equalsverifier + 3.6 + test + + + org.mockito + mockito-core + 3.12.4 + test + + + other-jdk-maven-compiler-plugin @@ -422,7 +439,7 @@ nl.jqno.equalsverifier equalsverifier - 3.6 + 3.14.1 test @@ -1096,7 +1113,7 @@ org.mockito mockito-core - 3.9.0 + 5.3.1 test @@ -1118,7 +1135,12 @@ com.github.jnr jnr-posix - 3.0.12 + 3.1.15 + + + com.github.jnr + jnr-constants + 0.10.3 info.picocli @@ -1210,6 +1232,39 @@ test-clock 1.0.2 test + + + org.projectlombok + lombok + + + + + + org.projectlombok + lombok + 1.18.26 + + + + net.openhft + posix + 2.23.2 + + + net.openhft + chronicle-core + 2.24ea16 + + + org.ow2.asm + asm + 9.3 + + + net.java.dev.jna + jna + 5.5.0 @@ -1343,6 +1398,15 @@ false plain + + ${argLine} + --add-opens=java.base/java.nio=ALL-UNNAMED + --add-opens=java.base/sun.nio.ch=ALL-UNNAMED + --add-opens=java.base/java.lang=ALL-UNNAMED + --add-opens=java.base/java.util=ALL-UNNAMED + --add-exports=java.base/jdk.internal.util.random=ALL-UNNAMED + --add-opens=java.base/java.lang.reflect=ALL-UNNAMED +