diff --git a/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java b/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java index a2fdce618ae..823298d4e15 100644 --- a/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java +++ b/java/memory/src/main/java/org/apache/arrow/memory/ArrowBuf.java @@ -34,6 +34,7 @@ import io.netty.buffer.NettyArrowBuf; import io.netty.buffer.PooledByteBufAllocatorL; +import io.netty.util.internal.PlatformDependent; /** * ArrowBuf serves as a facade over underlying memory by providing @@ -230,7 +231,8 @@ public ByteBuffer nioBuffer() { } public ByteBuffer nioBuffer(long index, int length) { - return asNettyBuffer().nioBuffer(index, length); + return length == 0 ? ByteBuffer.allocateDirect(0) : + PlatformDependent.directBuffer(memoryAddress() + index, length); } public long memoryAddress() { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java index 0a657b39d8f..ee47f6dd812 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseFixedWidthVector.java @@ -525,7 +525,7 @@ private void setReaderAndWriterIndex() { /* specialized handling for BitVector */ valueBuffer.writerIndex(getValidityBufferSizeFromCount(valueCount)); } else { - valueBuffer.writerIndex(valueCount * typeWidth); + valueBuffer.writerIndex((long) valueCount * typeWidth); } } } @@ -835,8 +835,8 @@ public void copyFrom(int fromIndex, int thisIndex, ValueVector from) { BitVectorHelper.unsetBit(this.getValidityBuffer(), thisIndex); } else { BitVectorHelper.setBit(this.getValidityBuffer(), thisIndex); - PlatformDependent.copyMemory(from.getDataBuffer().memoryAddress() + fromIndex * typeWidth, - this.getDataBuffer().memoryAddress() + thisIndex * typeWidth, typeWidth); + PlatformDependent.copyMemory(from.getDataBuffer().memoryAddress() + (long) fromIndex * typeWidth, + this.getDataBuffer().memoryAddress() + (long) thisIndex * typeWidth, typeWidth); } } @@ -878,7 +878,7 @@ public ArrowBufPointer getDataPointer(int index, ArrowBufPointer reuse) { if (isNull(index)) { reuse.set(null, 0, 0); } else { - reuse.set(valueBuffer, index * typeWidth, typeWidth); + reuse.set(valueBuffer, (long) index * typeWidth, typeWidth); } return reuse; } @@ -893,8 +893,8 @@ public int hashCode(int index, ArrowBufHasher hasher) { if (isNull(index)) { return ArrowBufPointer.NULL_HASH_CODE; } - int start = typeWidth * index; - int end = typeWidth * (index + 1); + long start = (long) typeWidth * index; + long end = (long) typeWidth * (index + 1); return ByteFunctionHelpers.hash(hasher, this.getDataBuffer(), start, end); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java index 4ebf679d229..6103d5913ea 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java @@ -199,7 +199,7 @@ public double getDensity() { return 0.0D; } final int startOffset = offsetBuffer.getInt(0); - final int endOffset = offsetBuffer.getInt(valueCount * OFFSET_WIDTH); + final int endOffset = offsetBuffer.getInt((long) valueCount * OFFSET_WIDTH); final double totalListSize = endOffset - startOffset; return totalListSize / valueCount; } @@ -358,7 +358,7 @@ private void setReaderAndWriterIndex() { } else { final int lastDataOffset = getStartOffset(valueCount); validityBuffer.writerIndex(getValidityBufferSizeFromCount(valueCount)); - offsetBuffer.writerIndex((valueCount + 1) * OFFSET_WIDTH); + offsetBuffer.writerIndex((long) (valueCount + 1) * OFFSET_WIDTH); valueBuffer.writerIndex(lastDataOffset); } } @@ -503,7 +503,7 @@ public void reallocDataBuffer() { if (lastValueAllocationSizeInBytes > 0) { newAllocationSize = lastValueAllocationSizeInBytes; } else { - newAllocationSize = INITIAL_BYTE_COUNT * 2; + newAllocationSize = INITIAL_BYTE_COUNT * 2L; } } newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize); @@ -582,7 +582,7 @@ public int sizeOfValueBuffer() { if (valueCount == 0) { return 0; } - return offsetBuffer.getInt(valueCount * OFFSET_WIDTH); + return offsetBuffer.getInt((long) valueCount * OFFSET_WIDTH); } /** @@ -610,7 +610,7 @@ public int getBufferSizeFor(final int valueCount) { final int validityBufferSize = getValidityBufferSizeFromCount(valueCount); final int offsetBufferSize = (valueCount + 1) * OFFSET_WIDTH; /* get the end offset for this valueCount */ - final int dataBufferSize = offsetBuffer.getInt(valueCount * OFFSET_WIDTH); + final int dataBufferSize = offsetBuffer.getInt((long) valueCount * OFFSET_WIDTH); return validityBufferSize + offsetBufferSize + dataBufferSize; } @@ -734,13 +734,13 @@ public void splitAndTransferTo(int startIndex, int length, * in the target vector. */ private void splitAndTransferOffsetBuffer(int startIndex, int length, BaseVariableWidthVector target) { - final int start = offsetBuffer.getInt(startIndex * OFFSET_WIDTH); - final int end = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH); + final int start = offsetBuffer.getInt((long) startIndex * OFFSET_WIDTH); + final int end = offsetBuffer.getInt((long) (startIndex + length) * OFFSET_WIDTH); final int dataLength = end - start; - target.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH); + target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH); for (int i = 0; i < length + 1; i++) { - final int relativeSourceOffset = offsetBuffer.getInt((startIndex + i) * OFFSET_WIDTH) - start; - target.offsetBuffer.setInt(i * OFFSET_WIDTH, relativeSourceOffset); + final int relativeSourceOffset = offsetBuffer.getInt((long) (startIndex + i) * OFFSET_WIDTH) - start; + target.offsetBuffer.setInt((long) i * OFFSET_WIDTH, relativeSourceOffset); } final ArrowBuf slicedBuffer = valueBuffer.slice(start, dataLength); target.valueBuffer = transferBuffer(slicedBuffer, target.allocator); @@ -922,7 +922,7 @@ public int getLastSet() { * @return starting offset for the element */ public long getStartEnd(int index) { - return offsetBuffer.getLong(index * OFFSET_WIDTH); + return offsetBuffer.getLong((long) index * OFFSET_WIDTH); } /** @@ -1106,7 +1106,7 @@ public void set(int index, int isSet, int start, int end, ArrowBuf buffer) { final int dataLength = end - start; fillHoles(index); BitVectorHelper.setValidityBit(validityBuffer, index, isSet); - final int startOffset = offsetBuffer.getInt(index * OFFSET_WIDTH); + final int startOffset = offsetBuffer.getInt((long) index * OFFSET_WIDTH); offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + dataLength); valueBuffer.setBytes(startOffset, buffer, start, dataLength); lastSet = index; @@ -1129,8 +1129,8 @@ public void setSafe(int index, int isSet, int start, int end, ArrowBuf buffer) { handleSafe(index, dataLength); fillHoles(index); BitVectorHelper.setValidityBit(validityBuffer, index, isSet); - final int startOffset = offsetBuffer.getInt(index * OFFSET_WIDTH); - offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + dataLength); + final int startOffset = offsetBuffer.getInt((long) index * OFFSET_WIDTH); + offsetBuffer.setInt((long) (index + 1) * OFFSET_WIDTH, startOffset + dataLength); valueBuffer.setBytes(startOffset, buffer, start, dataLength); lastSet = index; } @@ -1148,8 +1148,8 @@ public void set(int index, int start, int length, ArrowBuf buffer) { assert index >= 0; fillHoles(index); BitVectorHelper.setBit(validityBuffer, index); - final int startOffset = offsetBuffer.getInt(index * OFFSET_WIDTH); - offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + length); + final int startOffset = offsetBuffer.getInt((long) index * OFFSET_WIDTH); + offsetBuffer.setInt((long) (index + 1) * OFFSET_WIDTH, startOffset + length); final ArrowBuf bb = buffer.slice(start, length); valueBuffer.setBytes(startOffset, bb); lastSet = index; @@ -1170,8 +1170,8 @@ public void setSafe(int index, int start, int length, ArrowBuf buffer) { handleSafe(index, length); fillHoles(index); BitVectorHelper.setBit(validityBuffer, index); - final int startOffset = offsetBuffer.getInt(index * OFFSET_WIDTH); - offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + length); + final int startOffset = offsetBuffer.getInt((long) index * OFFSET_WIDTH); + offsetBuffer.setInt((long) (index + 1) * OFFSET_WIDTH, startOffset + length); final ArrowBuf bb = buffer.slice(start, length); valueBuffer.setBytes(startOffset, bb); lastSet = index; @@ -1198,7 +1198,7 @@ protected final void setBytes(int index, byte[] value, int start, int length) { */ final int startOffset = getStartOffset(index); /* set new end offset */ - offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + length); + offsetBuffer.setInt((long) (index + 1) * OFFSET_WIDTH, startOffset + length); /* store the var length data in value buffer */ valueBuffer.setBytes(startOffset, value, start, length); } @@ -1215,7 +1215,7 @@ protected final int getstartOffset(int index) { } public final int getStartOffset(int index) { - return offsetBuffer.getInt(index * OFFSET_WIDTH); + return offsetBuffer.getInt((long) index * OFFSET_WIDTH); } protected final void handleSafe(int index, int dataLength) { @@ -1260,9 +1260,9 @@ protected final void handleSafe(int index, int dataLength) { * @return array of bytes */ public static byte[] get(final ArrowBuf data, final ArrowBuf offset, int index) { - final int currentStartOffset = offset.getInt(index * OFFSET_WIDTH); + final int currentStartOffset = offset.getInt((long) index * OFFSET_WIDTH); final int dataLength = - offset.getInt((index + 1) * OFFSET_WIDTH) - currentStartOffset; + offset.getInt((long) (index + 1) * OFFSET_WIDTH) - currentStartOffset; final byte[] result = new byte[dataLength]; data.getBytes(currentStartOffset, result, 0, dataLength); return result; @@ -1285,11 +1285,11 @@ public static byte[] get(final ArrowBuf data, final ArrowBuf offset, int index) public static ArrowBuf set(ArrowBuf buffer, BufferAllocator allocator, int valueCount, int index, int value) { if (buffer == null) { - buffer = allocator.buffer(valueCount * OFFSET_WIDTH); + buffer = allocator.buffer((long) valueCount * OFFSET_WIDTH); } - buffer.setInt(index * OFFSET_WIDTH, value); + buffer.setInt((long) index * OFFSET_WIDTH, value); if (index == (valueCount - 1)) { - buffer.writerIndex(valueCount * OFFSET_WIDTH); + buffer.writerIndex((long) valueCount * OFFSET_WIDTH); } return buffer; @@ -1309,17 +1309,17 @@ public void copyFrom(int fromIndex, int thisIndex, ValueVector from) { if (from.isNull(fromIndex)) { fillHoles(thisIndex); BitVectorHelper.unsetBit(this.validityBuffer, thisIndex); - final int copyStart = offsetBuffer.getInt(thisIndex * OFFSET_WIDTH); - offsetBuffer.setInt((thisIndex + 1) * OFFSET_WIDTH, copyStart); + final int copyStart = offsetBuffer.getInt((long) thisIndex * OFFSET_WIDTH); + offsetBuffer.setInt((long) (thisIndex + 1) * OFFSET_WIDTH, copyStart); } else { - final int start = from.getOffsetBuffer().getInt(fromIndex * OFFSET_WIDTH); - final int end = from.getOffsetBuffer().getInt((fromIndex + 1) * OFFSET_WIDTH); + final int start = from.getOffsetBuffer().getInt((long) fromIndex * OFFSET_WIDTH); + final int end = from.getOffsetBuffer().getInt((long) (fromIndex + 1) * OFFSET_WIDTH); final int length = end - start; fillHoles(thisIndex); BitVectorHelper.setBit(this.validityBuffer, thisIndex); - final int copyStart = offsetBuffer.getInt(thisIndex * OFFSET_WIDTH); + final int copyStart = offsetBuffer.getInt((long) thisIndex * OFFSET_WIDTH); from.getDataBuffer().getBytes(start, this.valueBuffer, copyStart, length); - offsetBuffer.setInt((thisIndex + 1) * OFFSET_WIDTH, copyStart + length); + offsetBuffer.setInt((long) (thisIndex + 1) * OFFSET_WIDTH, copyStart + length); } lastSet = thisIndex; } @@ -1341,17 +1341,17 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) { fillHoles(thisIndex); BitVectorHelper.unsetBit(this.validityBuffer, thisIndex); final int copyStart = offsetBuffer.getInt(thisIndex * OFFSET_WIDTH); - offsetBuffer.setInt((thisIndex + 1) * OFFSET_WIDTH, copyStart); + offsetBuffer.setInt((long) (thisIndex + 1) * OFFSET_WIDTH, copyStart); } else { - final int start = from.getOffsetBuffer().getInt(fromIndex * OFFSET_WIDTH); - final int end = from.getOffsetBuffer().getInt((fromIndex + 1) * OFFSET_WIDTH); + final int start = from.getOffsetBuffer().getInt((long) fromIndex * OFFSET_WIDTH); + final int end = from.getOffsetBuffer().getInt((long) (fromIndex + 1) * OFFSET_WIDTH); final int length = end - start; handleSafe(thisIndex, length); fillHoles(thisIndex); BitVectorHelper.setBit(this.validityBuffer, thisIndex); - final int copyStart = offsetBuffer.getInt(thisIndex * OFFSET_WIDTH); + final int copyStart = offsetBuffer.getInt((long) thisIndex * OFFSET_WIDTH); from.getDataBuffer().getBytes(start, this.valueBuffer, copyStart, length); - offsetBuffer.setInt((thisIndex + 1) * OFFSET_WIDTH, copyStart + length); + offsetBuffer.setInt((long) (thisIndex + 1) * OFFSET_WIDTH, copyStart + length); } lastSet = thisIndex; } @@ -1366,8 +1366,8 @@ public ArrowBufPointer getDataPointer(int index, ArrowBufPointer reuse) { if (isNull(index)) { reuse.set(null, 0, 0); } else { - int offset = offsetBuffer.getInt(index * OFFSET_WIDTH); - int length = offsetBuffer.getInt((index + 1) * OFFSET_WIDTH) - offset; + int offset = offsetBuffer.getInt((long) index * OFFSET_WIDTH); + int length = offsetBuffer.getInt((long) (index + 1) * OFFSET_WIDTH) - offset; reuse.set(valueBuffer, offset, length); } return reuse; @@ -1397,6 +1397,6 @@ public OUT accept(VectorVisitor visitor, IN value) { * Gets the ending offset of a record, given its index. */ public final int getEndOffset(int index) { - return offsetBuffer.getInt((index + 1) * OFFSET_WIDTH); + return offsetBuffer.getInt((long) (index + 1) * OFFSET_WIDTH); } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VarBinaryVector.java b/java/vector/src/main/java/org/apache/arrow/vector/VarBinaryVector.java index 34e820d2539..798d30fe4a8 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VarBinaryVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VarBinaryVector.java @@ -114,7 +114,7 @@ public byte[] get(int index) { } final int startOffset = getStartOffset(index); final int dataLength = - offsetBuffer.getInt((index + 1) * OFFSET_WIDTH) - startOffset; + offsetBuffer.getInt((long) (index + 1) * OFFSET_WIDTH) - startOffset; final byte[] result = new byte[dataLength]; valueBuffer.getBytes(startOffset, result, 0, dataLength); return result; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VarCharVector.java b/java/vector/src/main/java/org/apache/arrow/vector/VarCharVector.java index 1c4b7087152..e725e2d2839 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/VarCharVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/VarCharVector.java @@ -111,7 +111,7 @@ public byte[] get(int index) { } final int startOffset = getStartOffset(index); final int dataLength = - offsetBuffer.getInt((index + 1) * OFFSET_WIDTH) - startOffset; + offsetBuffer.getInt((long) (index + 1) * OFFSET_WIDTH) - startOffset; final byte[] result = new byte[dataLength]; valueBuffer.getBytes(startOffset, result, 0, dataLength); return result; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java index e441eb004e7..3408c4541d2 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowReader.java @@ -168,8 +168,8 @@ protected void ensureInitialized() throws IOException { */ protected void initialize() throws IOException { Schema originalSchema = readSchema(); - List fields = new ArrayList<>(); - List vectors = new ArrayList<>(); + List fields = new ArrayList<>(originalSchema.getFields().size()); + List vectors = new ArrayList<>(originalSchema.getFields().size()); Map dictionaries = new HashMap<>(); // Convert fields with dictionaries to have the index type @@ -222,10 +222,10 @@ protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) { FieldVector vector = dictionary.getVector(); // if is deltaVector, concat it with non-delta vector with the same ID. if (dictionaryBatch.isDelta()) { - FieldVector deltaVector = vector.getField().createVector(allocator); - load(dictionaryBatch, deltaVector); - VectorBatchAppender.batchAppend(vector, deltaVector); - deltaVector.close(); + try (FieldVector deltaVector = vector.getField().createVector(allocator)) { + load(dictionaryBatch, deltaVector); + VectorBatchAppender.batchAppend(vector, deltaVector); + } return; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java index 1786f05a162..e11989af4c0 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowStreamReader.java @@ -144,7 +144,7 @@ private void checkDictionaries() throws IOException { for (FieldVector vector : getVectorSchemaRoot().getFieldVectors()) { DictionaryEncoding encoding = vector.getField().getDictionary(); if (encoding != null) { - // if the dictionaries it need is not available and the vector is not all null, something was wrong. + // if the dictionaries it needs is not available and the vector is not all null, something was wrong. if (!dictionaries.containsKey(encoding.getId()) && vector.getNullCount() < vector.getValueCount()) { throw new IOException("The dictionary was not available, id was:" + encoding.getId()); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java index f4144fd84ba..b3ee0afa886 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ArrowWriter.java @@ -123,15 +123,19 @@ public void writeBatch() throws IOException { protected ArrowBlock writeDictionaryBatch(ArrowDictionaryBatch batch) throws IOException { ArrowBlock block = MessageSerializer.serialize(out, batch, option); - LOGGER.debug("DictionaryRecordBatch at {}, metadata: {}, body: {}", - block.getOffset(), block.getMetadataLength(), block.getBodyLength()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("DictionaryRecordBatch at {}, metadata: {}, body: {}", + block.getOffset(), block.getMetadataLength(), block.getBodyLength()); + } return block; } protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException { ArrowBlock block = MessageSerializer.serialize(out, batch, option); - LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}", - block.getOffset(), block.getMetadataLength(), block.getBodyLength()); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}", + block.getOffset(), block.getMetadataLength(), block.getBodyLength()); + } return block; } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java index cf114a11750..db79661a8d4 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/ReadChannel.java @@ -83,7 +83,7 @@ public long readFully(ArrowBuf buffer, long length) throws IOException { boolean fullRead = true; long bytesLeft = length; while (fullRead && bytesLeft > 0) { - int bytesToRead = (int) Math.min(length, Integer.MAX_VALUE); + int bytesToRead = (int) Math.min(bytesLeft, Integer.MAX_VALUE); int n = readFully(buffer.nioBuffer(buffer.writerIndex(), bytesToRead)); buffer.writerIndex(buffer.writerIndex() + n); fullRead = n == bytesToRead; diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java index d660a566454..a07daaa581c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/ArrowRecordBatch.java @@ -17,7 +17,6 @@ package org.apache.arrow.vector.ipc.message; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -218,16 +217,7 @@ public long computeBodyLength() { for (int i = 0; i < buffers.size(); i++) { ArrowBuf buffer = buffers.get(i); ArrowBuffer layout = buffersLayout.get(i); - size += (layout.getOffset() - size); - - long readableBytes = buffer.readableBytes(); - while (readableBytes > 0) { - int nextRead = (int) Math.min(readableBytes, Integer.MAX_VALUE); - ByteBuffer nioBuffer = - buffer.nioBuffer(buffer.readerIndex(), nextRead); - readableBytes -= nextRead; - size += nioBuffer.remaining(); - } + size = layout.getOffset() + buffer.readableBytes(); // round up size to the next multiple of 8 size = DataSizeRoundingUtil.roundUpTo8Multiple(size); diff --git a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java index f827292a53c..1c7968d7fa7 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/ipc/message/MessageChannelReader.java @@ -63,7 +63,7 @@ public MessageResult readNext() throws IOException { // Read message body data if defined in message if (result.messageHasBody()) { - int bodyLength = (int) result.getMessageBodyLength(); + long bodyLength = result.getMessageBodyLength(); bodyBuffer = MessageSerializer.readMessageBody(in, bodyLength, allocator); } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ITTestLargeVector.java b/java/vector/src/test/java/org/apache/arrow/vector/ITTestLargeVector.java index 28461a04336..8b824d6a291 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/ITTestLargeVector.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/ITTestLargeVector.java @@ -181,6 +181,45 @@ public void testLargeFixedSizeBinaryVector() { logger.trace("Successfully released the large vector."); } + @Test + public void testLargeVarCharVector() { + logger.trace("Testing large var char vector."); + + final long bufSize = 4 * 1024 * 1024 * 1024L; + final int vecLength = (int) (bufSize / BaseVariableWidthVector.OFFSET_WIDTH); + final String strElement = "a"; + + try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + VarCharVector largeVec = new VarCharVector("vec", allocator)) { + largeVec.allocateNew(vecLength); + + logger.trace("Successfully allocated a vector with capacity " + vecLength); + + for (int i = 0; i < vecLength; i++) { + largeVec.setSafe(i, strElement.getBytes()); + + if ((i + 1) % 10000 == 0) { + logger.trace("Successfully written " + (i + 1) + " values"); + } + } + largeVec.setValueCount(vecLength); + assertTrue(largeVec.getOffsetBuffer().readableBytes() > Integer.MAX_VALUE); + assertTrue(largeVec.getDataBuffer().readableBytes() < Integer.MAX_VALUE); + logger.trace("Successfully written " + vecLength + " values"); + + for (int i = 0; i < vecLength; i++) { + byte[] val = largeVec.get(i); + assertEquals(strElement, new String(val)); + + if ((i + 1) % 10000 == 0) { + logger.trace("Successfully read " + (i + 1) + " values"); + } + } + logger.trace("Successfully read " + vecLength + " values"); + } + logger.trace("Successfully released the large vector."); + } + @Test public void testLargeLargeVarCharVector() { logger.trace("Testing large large var char vector."); diff --git a/java/vector/src/test/java/org/apache/arrow/vector/ipc/ITTestIPCWithLargeArrowBuffers.java b/java/vector/src/test/java/org/apache/arrow/vector/ipc/ITTestIPCWithLargeArrowBuffers.java new file mode 100644 index 00000000000..d3c91fd1443 --- /dev/null +++ b/java/vector/src/test/java/org/apache/arrow/vector/ipc/ITTestIPCWithLargeArrowBuffers.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.vector.ipc; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.Map; + +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.dictionary.Dictionary; +import org.apache.arrow.vector.dictionary.DictionaryProvider; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.DictionaryEncoding; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Integration test for reading/writing {@link org.apache.arrow.vector.VectorSchemaRoot} with + * large (more than 2GB) buffers by {@link ArrowReader} and {@link ArrowWriter}.. + * To run this test, please make sure there is at least 8GB free memory, and 8GB + * free.disk space in the system. + */ +public class ITTestIPCWithLargeArrowBuffers { + + private static final Logger logger = LoggerFactory.getLogger(ITTestIPCWithLargeArrowBuffers.class); + + // 4GB buffer size + static final long BUFFER_SIZE = 4 * 1024 * 1024 * 1024L; + + static final int DICTIONARY_VECTOR_SIZE = (int) (BUFFER_SIZE / BigIntVector.TYPE_WIDTH); + + static final int ENCODED_VECTOR_SIZE = (int) (BUFFER_SIZE / IntVector.TYPE_WIDTH); + + static final String FILE_NAME = "largeArrowData.data"; + + static final long DICTIONARY_ID = 123L; + + static final ArrowType.Int ENCODED_VECTOR_TYPE = new ArrowType.Int(32, true); + + static final DictionaryEncoding DICTIONARY_ENCODING = + new DictionaryEncoding(DICTIONARY_ID, false, ENCODED_VECTOR_TYPE); + + static final FieldType ENCODED_FIELD_TYPE = + new FieldType(true, ENCODED_VECTOR_TYPE, DICTIONARY_ENCODING, null); + + static final Field ENCODED_VECTOR_FIELD = new Field("encoded vector", ENCODED_FIELD_TYPE, null); + + private void testWriteLargeArrowData(boolean streamMode) throws IOException { + // simulate encoding big int as int + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + BigIntVector dictVector = new BigIntVector("dic vector", allocator); + FileOutputStream out = new FileOutputStream(FILE_NAME); + IntVector encodedVector = (IntVector) ENCODED_VECTOR_FIELD.createVector(allocator)) { + + // prepare dictionary provider. + DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider(); + Dictionary dictionary = new Dictionary(dictVector, DICTIONARY_ENCODING); + provider.put(dictionary); + + // populate the dictionary vector + dictVector.allocateNew(DICTIONARY_VECTOR_SIZE); + for (int i = 0; i < DICTIONARY_VECTOR_SIZE; i++) { + dictVector.set(i, i); + } + dictVector.setValueCount(DICTIONARY_VECTOR_SIZE); + assertTrue(dictVector.getDataBuffer().capacity() > Integer.MAX_VALUE); + logger.trace("Populating dictionary vector finished"); + + // populate the encoded vector + encodedVector.allocateNew(ENCODED_VECTOR_SIZE); + for (int i = 0; i < ENCODED_VECTOR_SIZE; i++) { + encodedVector.set(i, i % DICTIONARY_VECTOR_SIZE); + } + encodedVector.setValueCount(ENCODED_VECTOR_SIZE); + assertTrue(encodedVector.getDataBuffer().capacity() > Integer.MAX_VALUE); + logger.trace("Populating encoded vector finished"); + + // build vector schema root and write data. + try (VectorSchemaRoot root = + new VectorSchemaRoot( + Arrays.asList(ENCODED_VECTOR_FIELD), Arrays.asList(encodedVector), ENCODED_VECTOR_SIZE); + ArrowWriter writer = streamMode ? + new ArrowStreamWriter(root, provider, out) : + new ArrowFileWriter(root, provider, out.getChannel())) { + writer.start(); + writer.writeBatch(); + writer.end(); + logger.trace("Writing data finished"); + } + } + + assertTrue(new File(FILE_NAME).exists()); + } + + private void testReadLargeArrowData(boolean streamMode) throws IOException { + try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + FileInputStream in = new FileInputStream(FILE_NAME); + ArrowReader reader = streamMode ? + new ArrowStreamReader(in, allocator) : + new ArrowFileReader(in.getChannel(), allocator)) { + + // verify schema + Schema readSchema = reader.getVectorSchemaRoot().getSchema(); + assertEquals(1, readSchema.getFields().size()); + assertEquals(ENCODED_VECTOR_FIELD, readSchema.getFields().get(0)); + logger.trace("Verifying schema finished"); + + // verify vector schema root + assertTrue(reader.loadNextBatch()); + VectorSchemaRoot root = reader.getVectorSchemaRoot(); + + assertEquals(ENCODED_VECTOR_SIZE, root.getRowCount()); + assertEquals(1, root.getFieldVectors().size()); + assertTrue(root.getFieldVectors().get(0) instanceof IntVector); + + IntVector encodedVector = (IntVector) root.getVector(0); + for (int i = 0; i < ENCODED_VECTOR_SIZE; i++) { + assertEquals(i % DICTIONARY_VECTOR_SIZE, encodedVector.get(i)); + } + logger.trace("Verifying encoded vector finished"); + + // verify dictionary + Map dictVectors = reader.getDictionaryVectors(); + assertEquals(1, dictVectors.size()); + Dictionary dictionary = dictVectors.get(DICTIONARY_ID); + assertNotNull(dictionary); + + assertTrue(dictionary.getVector() instanceof BigIntVector); + BigIntVector dictVector = (BigIntVector) dictionary.getVector(); + assertEquals(DICTIONARY_VECTOR_SIZE, dictVector.getValueCount()); + for (int i = 0; i < DICTIONARY_VECTOR_SIZE; i++) { + assertEquals(i, dictVector.get(i)); + } + logger.trace("Verifying dictionary vector finished"); + + // ensure no more data available + assertFalse(reader.loadNextBatch()); + } finally { + File dataFile = new File(FILE_NAME); + dataFile.delete(); + assertFalse(dataFile.exists()); + } + } + + @Test + public void testIPC() throws IOException { + logger.trace("Start testing reading/writing large arrow stream data"); + testWriteLargeArrowData(true); + testReadLargeArrowData(true); + logger.trace("Finish testing reading/writing large arrow stream data"); + + logger.trace("Start testing reading/writing large arrow file data"); + testWriteLargeArrowData(false); + testReadLargeArrowData(false); + logger.trace("Finish testing reading/writing large arrow file data"); + } +}