diff --git a/java/algorithm/src/main/java/org/apache/arrow/algorithm/sort/FixedWidthInPlaceVectorSorter.java b/java/algorithm/src/main/java/org/apache/arrow/algorithm/sort/FixedWidthInPlaceVectorSorter.java index bac25cf8165..c5ecd8acb9b 100644 --- a/java/algorithm/src/main/java/org/apache/arrow/algorithm/sort/FixedWidthInPlaceVectorSorter.java +++ b/java/algorithm/src/main/java/org/apache/arrow/algorithm/sort/FixedWidthInPlaceVectorSorter.java @@ -46,6 +46,7 @@ public void sortInPlace(V vec, VectorValueComparator comparator) { this.comparator = comparator; this.pivotBuffer = (V) vec.getField().createVector(vec.getAllocator()); this.pivotBuffer.allocateNew(1); + this.pivotBuffer.setValueCount(1); comparator.attachVectors(vec, pivotBuffer); quickSort(); diff --git a/java/performance/src/test/java/org/apache/arrow/vector/util/TransferPairBenchmarks.java b/java/performance/src/test/java/org/apache/arrow/vector/util/TransferPairBenchmarks.java new file mode 100644 index 00000000000..235eca53c84 --- /dev/null +++ b/java/performance/src/test/java/org/apache/arrow/vector/util/TransferPairBenchmarks.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.util; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarCharVector; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +/** + * Benchmarks for {@link TransferPair}. + */ +@State(Scope.Benchmark) +public class TransferPairBenchmarks { + + private static final int VECTOR_LENGTH = 1024; + + private static final int ALLOCATOR_CAPACITY = 1024 * 1024; + + private BufferAllocator allocator; + + private IntVector intVector; + + private VarCharVector varCharVector; + + /** + * Setup benchmarks. + */ + @Setup + public void prepare() { + allocator = new RootAllocator(ALLOCATOR_CAPACITY); + intVector = new IntVector("intVector", allocator); + varCharVector = new VarCharVector("varcharVector", allocator); + + intVector.allocateNew(VECTOR_LENGTH); + varCharVector.allocateNew(VECTOR_LENGTH); + + for (int i = 0; i < VECTOR_LENGTH; i++) { + if (i % 3 == 0) { + intVector.setNull(i); + varCharVector.setNull(i); + } else { + intVector.setSafe(i, i * i); + varCharVector.setSafe(i, ("teststring" + i).getBytes(StandardCharsets.UTF_8)); + } + } + intVector.setValueCount(VECTOR_LENGTH); + varCharVector.setValueCount(VECTOR_LENGTH); + } + + /** + * Tear down benchmarks. + */ + @TearDown + public void tearDown() { + intVector.close(); + varCharVector.close();; + allocator.close(); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public int splitAndTransferIntVector() { + IntVector toVector = new IntVector("intVector", allocator); + toVector.setValueCount(VECTOR_LENGTH); + TransferPair transferPair = intVector.makeTransferPair(toVector); + transferPair.splitAndTransfer(0, VECTOR_LENGTH); + toVector.close(); + return 0; + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public int splitAndTransferVarcharVector() { + VarCharVector toVector = new VarCharVector("varcharVector", allocator); + toVector.setValueCount(VECTOR_LENGTH); + TransferPair transferPair = varCharVector.makeTransferPair(toVector); + transferPair.splitAndTransfer(0, VECTOR_LENGTH); + toVector.close(); + return 0; + } + + public static void main(String [] args) throws RunnerException { + Options opt = new OptionsBuilder() + .include(TransferPairBenchmarks.class.getSimpleName()) + .forks(1) + .build(); + + new Runner(opt).run(); + } +} diff --git a/java/vector/src/main/codegen/templates/UnionVector.java b/java/vector/src/main/codegen/templates/UnionVector.java index db3c8a89f5e..98d372aa97f 100644 --- a/java/vector/src/main/codegen/templates/UnionVector.java +++ b/java/vector/src/main/codegen/templates/UnionVector.java @@ -469,7 +469,10 @@ public void transfer() { @Override public void splitAndTransfer(int startIndex, int length) { - Preconditions.checkArgument(startIndex + length <= valueCount); + Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount, + "Invalid startIndex: %s", startIndex); + Preconditions.checkArgument(startIndex + length <= valueCount, + "Invalid length: %s", length); to.clear(); internalStructVectorTransferPair.splitAndTransfer(startIndex, length); final int startPoint = startIndex * TYPE_WIDTH; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java index ed91d6e659b..a4e94bcac09 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java @@ -584,7 +584,10 @@ public void transferTo(BaseFixedWidthVector target) { */ public void splitAndTransferTo(int startIndex, int length, BaseFixedWidthVector target) { - Preconditions.checkArgument(startIndex + length <= valueCount); + Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount, + "Invalid startIndex: %s", startIndex); + Preconditions.checkArgument(startIndex + length <= valueCount, + "Invalid length: %s", length); compareTypes(target, "splitAndTransferTo"); target.clear(); splitAndTransferValidityBuffer(startIndex, length, target); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java index 7e276445643..89395ef7e22 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java @@ -714,6 +714,10 @@ public void transferTo(BaseVariableWidthVector target) { */ public void splitAndTransferTo(int startIndex, int length, BaseVariableWidthVector target) { + Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount, + "Invalid startIndex: %s", startIndex); + Preconditions.checkArgument(startIndex + length <= valueCount, + "Invalid length: %s", length); compareTypes(target, "splitAndTransferTo"); target.clear(); splitAndTransferValidityBuffer(startIndex, length, target); @@ -750,7 +754,6 @@ private void splitAndTransferOffsetBuffer(int startIndex, int length, BaseVariab */ private void splitAndTransferValidityBuffer(int startIndex, int length, BaseVariableWidthVector target) { - Preconditions.checkArgument(startIndex + length <= valueCount); int firstByteSource = BitVectorHelper.byteIndex(startIndex); int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1); int byteSizeTarget = getValidityBufferSizeFromCount(length); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java index 2fa5fdef4ef..b41c62677fc 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BitVector.java @@ -158,6 +158,10 @@ public int getBufferSize() { * @param target destination vector */ public void splitAndTransferTo(int startIndex, int length, BaseFixedWidthVector target) { + Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount, + "Invalid startIndex: %s", startIndex); + Preconditions.checkArgument(startIndex + length <= valueCount, + "Invalid length: %s", length); compareTypes(target, "splitAndTransferTo"); target.clear(); target.validityBuffer = splitAndTransferBuffer(startIndex, length, target, @@ -174,7 +178,6 @@ private ArrowBuf splitAndTransferBuffer( BaseFixedWidthVector target, ArrowBuf sourceBuffer, ArrowBuf destBuffer) { - assert startIndex + length <= valueCount; int firstByteSource = BitVectorHelper.byteIndex(startIndex); int lastByteSource = BitVectorHelper.byteIndex(valueCount - 1); int byteSizeTarget = getValidityBufferSizeFromCount(length); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java index 653719feafe..8fa43fb06b7 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/FixedSizeListVector.java @@ -590,7 +590,10 @@ public void transfer() { @Override public void splitAndTransfer(int startIndex, int length) { - Preconditions.checkArgument(startIndex + length <= valueCount); + Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount, + "Invalid startIndex: %s", startIndex); + Preconditions.checkArgument(startIndex + length <= valueCount, + "Invalid length: %s", length); final int startPoint = listSize * startIndex; final int sliceLength = listSize * length; to.clear(); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index 59481d39399..312a3556082 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -493,7 +493,10 @@ public void transfer() { */ @Override public void splitAndTransfer(int startIndex, int length) { - Preconditions.checkArgument(startIndex + length <= valueCount); + Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount, + "Invalid startIndex: %s", startIndex); + Preconditions.checkArgument(startIndex + length <= valueCount, + "Invalid length: %s", length); final int startPoint = offsetBuffer.getInt(startIndex * OFFSET_WIDTH); final int sliceLength = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH) - startPoint; to.clear(); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java index ec922a032a9..7b22835963a 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/StructVector.java @@ -195,7 +195,10 @@ public void copyValueSafe(int fromIndex, int toIndex) { @Override public void splitAndTransfer(int startIndex, int length) { - Preconditions.checkArgument(startIndex + length <= valueCount); + Preconditions.checkArgument(startIndex >= 0 && startIndex < valueCount, + "Invalid startIndex: %s", startIndex); + Preconditions.checkArgument(startIndex + length <= valueCount, + "Invalid length: %s", length); target.clear(); splitAndTransferValidityBuffer(startIndex, length, target); super.splitAndTransfer(startIndex, length); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/TestSplitAndTransfer.java b/java/vector/src/test/java/org/apache/arrow/vector/TestSplitAndTransfer.java index 6405e256b22..d5e4a00d599 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/TestSplitAndTransfer.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/TestSplitAndTransfer.java @@ -18,16 +18,17 @@ package org.apache.arrow.vector; import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; -import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.util.TransferPair; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.jupiter.api.Assertions; public class TestSplitAndTransfer { private BufferAllocator allocator; @@ -41,6 +42,17 @@ public void init() { public void terminate() throws Exception { allocator.close(); } + + private void populateVarcharVector(final VarCharVector vector, int valueCount, String[] compareArray) { + for (int i = 0; i < valueCount; i += 3) { + final String s = String.format("%010d", i); + vector.set(i, s.getBytes()); + if (compareArray != null) { + compareArray[i] = s; + } + } + vector.setValueCount(valueCount); + } @Test /* VarCharVector */ public void test() throws Exception { @@ -50,12 +62,7 @@ public void test() throws Exception { final int valueCount = 500; final String[] compareArray = new String[valueCount]; - for (int i = 0; i < valueCount; i += 3) { - final String s = String.format("%010d", i); - varCharVector.set(i, s.getBytes()); - compareArray[i] = s; - } - varCharVector.setValueCount(valueCount); + populateVarcharVector(varCharVector, valueCount, compareArray); final TransferPair tp = varCharVector.getTransferPair(allocator); final VarCharVector newVarCharVector = (VarCharVector) tp.getTo(); @@ -89,11 +96,7 @@ public void testMemoryConstrainedTransfer() { final int valueCount = 1000; - for (int i = 0; i < valueCount; i += 3) { - final String s = String.format("%010d", i); - varCharVector.set(i, s.getBytes()); - } - varCharVector.setValueCount(valueCount); + populateVarcharVector(varCharVector, valueCount, null); final TransferPair tp = varCharVector.getTransferPair(allocator); final VarCharVector newVarCharVector = (VarCharVector) tp.getTo(); @@ -107,4 +110,143 @@ public void testMemoryConstrainedTransfer() { } } } + + @Test + public void testTransfer() { + try (final VarCharVector varCharVector = new VarCharVector("myvector", allocator)) { + varCharVector.allocateNew(10000, 1000); + + final int valueCount = 500; + final String[] compareArray = new String[valueCount]; + populateVarcharVector(varCharVector, valueCount, compareArray); + + final TransferPair tp = varCharVector.getTransferPair(allocator); + final VarCharVector newVarCharVector = (VarCharVector) tp.getTo(); + tp.transfer(); + + assertEquals(0, varCharVector.valueCount); + assertEquals(valueCount, newVarCharVector.valueCount); + + for (int i = 0; i < valueCount; i++) { + final boolean expectedSet = (i % 3) == 0; + if (expectedSet) { + final byte[] expectedValue = compareArray[i].getBytes(); + assertFalse(newVarCharVector.isNull(i)); + assertArrayEquals(expectedValue, newVarCharVector.get(i)); + } else { + assertTrue(newVarCharVector.isNull(i)); + } + } + + newVarCharVector.clear(); + } + } + + @Test + public void testCopyValueSafe() { + try (final VarCharVector varCharVector = new VarCharVector("myvector", allocator); + final VarCharVector newVarCharVector = new VarCharVector("newvector", allocator)) { + varCharVector.allocateNew(10000, 1000); + + final int valueCount = 500; + populateVarcharVector(varCharVector, valueCount, null); + + final TransferPair tp = varCharVector.makeTransferPair(newVarCharVector); + + // new vector memory is not pre-allocated, we expect copyValueSafe work fine. + for (int i = 0; i < valueCount; i++) { + tp.copyValueSafe(i, i); + } + newVarCharVector.setValueCount(valueCount); + + for (int i = 0; i < valueCount; i++) { + final boolean expectedSet = (i % 3) == 0; + if (expectedSet) { + assertFalse(varCharVector.isNull(i)); + assertFalse(newVarCharVector.isNull(i)); + assertArrayEquals(varCharVector.get(i), newVarCharVector.get(i)); + } else { + assertTrue(newVarCharVector.isNull(i)); + } + } + } + } + + @Test + public void testSplitAndTransferNon() { + try (final VarCharVector varCharVector = new VarCharVector("myvector", allocator)) { + + varCharVector.allocateNew(10000, 1000); + final int valueCount = 500; + populateVarcharVector(varCharVector, valueCount, null); + + final TransferPair tp = varCharVector.getTransferPair(allocator); + VarCharVector newVarCharVector = (VarCharVector) tp.getTo(); + + tp.splitAndTransfer(0, 0); + assertEquals(0, newVarCharVector.getValueCount()); + + newVarCharVector.clear(); + } + } + + @Test + public void testSplitAndTransferAll() { + try (final VarCharVector varCharVector = new VarCharVector("myvector", allocator)) { + + varCharVector.allocateNew(10000, 1000); + final int valueCount = 500; + populateVarcharVector(varCharVector, valueCount, null); + + final TransferPair tp = varCharVector.getTransferPair(allocator); + VarCharVector newVarCharVector = (VarCharVector) tp.getTo(); + + tp.splitAndTransfer(0, valueCount); + assertEquals(valueCount, newVarCharVector.getValueCount()); + + newVarCharVector.clear(); + } + } + + @Test + public void testInvalidStartIndex() { + try (final VarCharVector varCharVector = new VarCharVector("myvector", allocator); + final VarCharVector newVarCharVector = new VarCharVector("newvector", allocator)) { + + varCharVector.allocateNew(10000, 1000); + final int valueCount = 500; + populateVarcharVector(varCharVector, valueCount, null); + + final TransferPair tp = varCharVector.makeTransferPair(newVarCharVector); + + IllegalArgumentException e = Assertions.assertThrows( + IllegalArgumentException.class, + () -> tp.splitAndTransfer(valueCount, 10)); + + assertEquals("Invalid startIndex: 500", e.getMessage()); + + newVarCharVector.clear(); + } + } + + @Test + public void testInvalidLength() { + try (final VarCharVector varCharVector = new VarCharVector("myvector", allocator); + final VarCharVector newVarCharVector = new VarCharVector("newvector", allocator)) { + + varCharVector.allocateNew(10000, 1000); + final int valueCount = 500; + populateVarcharVector(varCharVector, valueCount, null); + + final TransferPair tp = varCharVector.makeTransferPair(newVarCharVector); + + IllegalArgumentException e = Assertions.assertThrows( + IllegalArgumentException.class, + () -> tp.splitAndTransfer(0, valueCount * 2)); + + assertEquals("Invalid length: 1000", e.getMessage()); + + newVarCharVector.clear(); + } + } }