Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import io.netty.buffer.NettyArrowBuf;
import io.netty.buffer.PooledByteBufAllocatorL;
import io.netty.util.internal.PlatformDependent;

/**
* ArrowBuf serves as a facade over underlying memory by providing
Expand Down Expand Up @@ -230,7 +231,8 @@ public ByteBuffer nioBuffer() {
}

public ByteBuffer nioBuffer(long index, int length) {
return asNettyBuffer().nioBuffer(index, length);
return length == 0 ? ByteBuffer.allocateDirect(0) :
PlatformDependent.directBuffer(memoryAddress() + index, length);
}

public long memoryAddress() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ private void setReaderAndWriterIndex() {
/* specialized handling for BitVector */
valueBuffer.writerIndex(getValidityBufferSizeFromCount(valueCount));
} else {
valueBuffer.writerIndex(valueCount * typeWidth);
valueBuffer.writerIndex((long) valueCount * typeWidth);
}
}
}
Expand Down Expand Up @@ -835,8 +835,8 @@ public void copyFrom(int fromIndex, int thisIndex, ValueVector from) {
BitVectorHelper.unsetBit(this.getValidityBuffer(), thisIndex);
} else {
BitVectorHelper.setBit(this.getValidityBuffer(), thisIndex);
PlatformDependent.copyMemory(from.getDataBuffer().memoryAddress() + fromIndex * typeWidth,
this.getDataBuffer().memoryAddress() + thisIndex * typeWidth, typeWidth);
PlatformDependent.copyMemory(from.getDataBuffer().memoryAddress() + (long) fromIndex * typeWidth,
this.getDataBuffer().memoryAddress() + (long) thisIndex * typeWidth, typeWidth);
}
}

Expand Down Expand Up @@ -878,7 +878,7 @@ public ArrowBufPointer getDataPointer(int index, ArrowBufPointer reuse) {
if (isNull(index)) {
reuse.set(null, 0, 0);
} else {
reuse.set(valueBuffer, index * typeWidth, typeWidth);
reuse.set(valueBuffer, (long) index * typeWidth, typeWidth);
}
return reuse;
}
Expand All @@ -893,8 +893,8 @@ public int hashCode(int index, ArrowBufHasher hasher) {
if (isNull(index)) {
return ArrowBufPointer.NULL_HASH_CODE;
}
int start = typeWidth * index;
int end = typeWidth * (index + 1);
long start = (long) typeWidth * index;
long end = (long) typeWidth * (index + 1);
return ByteFunctionHelpers.hash(hasher, this.getDataBuffer(), start, end);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public double getDensity() {
return 0.0D;
}
final int startOffset = offsetBuffer.getInt(0);
final int endOffset = offsetBuffer.getInt(valueCount * OFFSET_WIDTH);
final int endOffset = offsetBuffer.getInt((long) valueCount * OFFSET_WIDTH);
final double totalListSize = endOffset - startOffset;
return totalListSize / valueCount;
}
Expand Down Expand Up @@ -358,7 +358,7 @@ private void setReaderAndWriterIndex() {
} else {
final int lastDataOffset = getStartOffset(valueCount);
validityBuffer.writerIndex(getValidityBufferSizeFromCount(valueCount));
offsetBuffer.writerIndex((valueCount + 1) * OFFSET_WIDTH);
offsetBuffer.writerIndex((long) (valueCount + 1) * OFFSET_WIDTH);
valueBuffer.writerIndex(lastDataOffset);
}
}
Expand Down Expand Up @@ -503,7 +503,7 @@ public void reallocDataBuffer() {
if (lastValueAllocationSizeInBytes > 0) {
newAllocationSize = lastValueAllocationSizeInBytes;
} else {
newAllocationSize = INITIAL_BYTE_COUNT * 2;
newAllocationSize = INITIAL_BYTE_COUNT * 2L;
}
}
newAllocationSize = BaseAllocator.nextPowerOfTwo(newAllocationSize);
Expand Down Expand Up @@ -582,7 +582,7 @@ public int sizeOfValueBuffer() {
if (valueCount == 0) {
return 0;
}
return offsetBuffer.getInt(valueCount * OFFSET_WIDTH);
return offsetBuffer.getInt((long) valueCount * OFFSET_WIDTH);
}

/**
Expand Down Expand Up @@ -610,7 +610,7 @@ public int getBufferSizeFor(final int valueCount) {
final int validityBufferSize = getValidityBufferSizeFromCount(valueCount);
final int offsetBufferSize = (valueCount + 1) * OFFSET_WIDTH;
/* get the end offset for this valueCount */
final int dataBufferSize = offsetBuffer.getInt(valueCount * OFFSET_WIDTH);
final int dataBufferSize = offsetBuffer.getInt((long) valueCount * OFFSET_WIDTH);
return validityBufferSize + offsetBufferSize + dataBufferSize;
}

Expand Down Expand Up @@ -734,13 +734,13 @@ public void splitAndTransferTo(int startIndex, int length,
* in the target vector.
*/
private void splitAndTransferOffsetBuffer(int startIndex, int length, BaseVariableWidthVector target) {
final int start = offsetBuffer.getInt(startIndex * OFFSET_WIDTH);
final int end = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH);
final int start = offsetBuffer.getInt((long) startIndex * OFFSET_WIDTH);
final int end = offsetBuffer.getInt((long) (startIndex + length) * OFFSET_WIDTH);
final int dataLength = end - start;
target.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
for (int i = 0; i < length + 1; i++) {
final int relativeSourceOffset = offsetBuffer.getInt((startIndex + i) * OFFSET_WIDTH) - start;
target.offsetBuffer.setInt(i * OFFSET_WIDTH, relativeSourceOffset);
final int relativeSourceOffset = offsetBuffer.getInt((long) (startIndex + i) * OFFSET_WIDTH) - start;
target.offsetBuffer.setInt((long) i * OFFSET_WIDTH, relativeSourceOffset);
}
final ArrowBuf slicedBuffer = valueBuffer.slice(start, dataLength);
target.valueBuffer = transferBuffer(slicedBuffer, target.allocator);
Expand Down Expand Up @@ -922,7 +922,7 @@ public int getLastSet() {
* @return starting offset for the element
*/
public long getStartEnd(int index) {
return offsetBuffer.getLong(index * OFFSET_WIDTH);
return offsetBuffer.getLong((long) index * OFFSET_WIDTH);
}

/**
Expand Down Expand Up @@ -1106,7 +1106,7 @@ public void set(int index, int isSet, int start, int end, ArrowBuf buffer) {
final int dataLength = end - start;
fillHoles(index);
BitVectorHelper.setValidityBit(validityBuffer, index, isSet);
final int startOffset = offsetBuffer.getInt(index * OFFSET_WIDTH);
final int startOffset = offsetBuffer.getInt((long) index * OFFSET_WIDTH);
offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + dataLength);
valueBuffer.setBytes(startOffset, buffer, start, dataLength);
lastSet = index;
Expand All @@ -1129,8 +1129,8 @@ public void setSafe(int index, int isSet, int start, int end, ArrowBuf buffer) {
handleSafe(index, dataLength);
fillHoles(index);
BitVectorHelper.setValidityBit(validityBuffer, index, isSet);
final int startOffset = offsetBuffer.getInt(index * OFFSET_WIDTH);
offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + dataLength);
final int startOffset = offsetBuffer.getInt((long) index * OFFSET_WIDTH);
offsetBuffer.setInt((long) (index + 1) * OFFSET_WIDTH, startOffset + dataLength);
valueBuffer.setBytes(startOffset, buffer, start, dataLength);
lastSet = index;
}
Expand All @@ -1148,8 +1148,8 @@ public void set(int index, int start, int length, ArrowBuf buffer) {
assert index >= 0;
fillHoles(index);
BitVectorHelper.setBit(validityBuffer, index);
final int startOffset = offsetBuffer.getInt(index * OFFSET_WIDTH);
offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + length);
final int startOffset = offsetBuffer.getInt((long) index * OFFSET_WIDTH);
offsetBuffer.setInt((long) (index + 1) * OFFSET_WIDTH, startOffset + length);
final ArrowBuf bb = buffer.slice(start, length);
valueBuffer.setBytes(startOffset, bb);
lastSet = index;
Expand All @@ -1170,8 +1170,8 @@ public void setSafe(int index, int start, int length, ArrowBuf buffer) {
handleSafe(index, length);
fillHoles(index);
BitVectorHelper.setBit(validityBuffer, index);
final int startOffset = offsetBuffer.getInt(index * OFFSET_WIDTH);
offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + length);
final int startOffset = offsetBuffer.getInt((long) index * OFFSET_WIDTH);
offsetBuffer.setInt((long) (index + 1) * OFFSET_WIDTH, startOffset + length);
final ArrowBuf bb = buffer.slice(start, length);
valueBuffer.setBytes(startOffset, bb);
lastSet = index;
Expand All @@ -1198,7 +1198,7 @@ protected final void setBytes(int index, byte[] value, int start, int length) {
*/
final int startOffset = getStartOffset(index);
/* set new end offset */
offsetBuffer.setInt((index + 1) * OFFSET_WIDTH, startOffset + length);
offsetBuffer.setInt((long) (index + 1) * OFFSET_WIDTH, startOffset + length);
/* store the var length data in value buffer */
valueBuffer.setBytes(startOffset, value, start, length);
}
Expand All @@ -1215,7 +1215,7 @@ protected final int getstartOffset(int index) {
}

public final int getStartOffset(int index) {
return offsetBuffer.getInt(index * OFFSET_WIDTH);
return offsetBuffer.getInt((long) index * OFFSET_WIDTH);
}

protected final void handleSafe(int index, int dataLength) {
Expand Down Expand Up @@ -1260,9 +1260,9 @@ protected final void handleSafe(int index, int dataLength) {
* @return array of bytes
*/
public static byte[] get(final ArrowBuf data, final ArrowBuf offset, int index) {
final int currentStartOffset = offset.getInt(index * OFFSET_WIDTH);
final int currentStartOffset = offset.getInt((long) index * OFFSET_WIDTH);
final int dataLength =
offset.getInt((index + 1) * OFFSET_WIDTH) - currentStartOffset;
offset.getInt((long) (index + 1) * OFFSET_WIDTH) - currentStartOffset;
final byte[] result = new byte[dataLength];
data.getBytes(currentStartOffset, result, 0, dataLength);
return result;
Expand All @@ -1285,11 +1285,11 @@ public static byte[] get(final ArrowBuf data, final ArrowBuf offset, int index)
public static ArrowBuf set(ArrowBuf buffer, BufferAllocator allocator,
int valueCount, int index, int value) {
if (buffer == null) {
buffer = allocator.buffer(valueCount * OFFSET_WIDTH);
buffer = allocator.buffer((long) valueCount * OFFSET_WIDTH);
}
buffer.setInt(index * OFFSET_WIDTH, value);
buffer.setInt((long) index * OFFSET_WIDTH, value);
if (index == (valueCount - 1)) {
buffer.writerIndex(valueCount * OFFSET_WIDTH);
buffer.writerIndex((long) valueCount * OFFSET_WIDTH);
}

return buffer;
Expand All @@ -1309,17 +1309,17 @@ public void copyFrom(int fromIndex, int thisIndex, ValueVector from) {
if (from.isNull(fromIndex)) {
fillHoles(thisIndex);
BitVectorHelper.unsetBit(this.validityBuffer, thisIndex);
final int copyStart = offsetBuffer.getInt(thisIndex * OFFSET_WIDTH);
offsetBuffer.setInt((thisIndex + 1) * OFFSET_WIDTH, copyStart);
final int copyStart = offsetBuffer.getInt((long) thisIndex * OFFSET_WIDTH);
offsetBuffer.setInt((long) (thisIndex + 1) * OFFSET_WIDTH, copyStart);
} else {
final int start = from.getOffsetBuffer().getInt(fromIndex * OFFSET_WIDTH);
final int end = from.getOffsetBuffer().getInt((fromIndex + 1) * OFFSET_WIDTH);
final int start = from.getOffsetBuffer().getInt((long) fromIndex * OFFSET_WIDTH);
final int end = from.getOffsetBuffer().getInt((long) (fromIndex + 1) * OFFSET_WIDTH);
final int length = end - start;
fillHoles(thisIndex);
BitVectorHelper.setBit(this.validityBuffer, thisIndex);
final int copyStart = offsetBuffer.getInt(thisIndex * OFFSET_WIDTH);
final int copyStart = offsetBuffer.getInt((long) thisIndex * OFFSET_WIDTH);
from.getDataBuffer().getBytes(start, this.valueBuffer, copyStart, length);
offsetBuffer.setInt((thisIndex + 1) * OFFSET_WIDTH, copyStart + length);
offsetBuffer.setInt((long) (thisIndex + 1) * OFFSET_WIDTH, copyStart + length);
}
lastSet = thisIndex;
}
Expand All @@ -1341,17 +1341,17 @@ public void copyFromSafe(int fromIndex, int thisIndex, ValueVector from) {
fillHoles(thisIndex);
BitVectorHelper.unsetBit(this.validityBuffer, thisIndex);
final int copyStart = offsetBuffer.getInt(thisIndex * OFFSET_WIDTH);
offsetBuffer.setInt((thisIndex + 1) * OFFSET_WIDTH, copyStart);
offsetBuffer.setInt((long) (thisIndex + 1) * OFFSET_WIDTH, copyStart);
} else {
final int start = from.getOffsetBuffer().getInt(fromIndex * OFFSET_WIDTH);
final int end = from.getOffsetBuffer().getInt((fromIndex + 1) * OFFSET_WIDTH);
final int start = from.getOffsetBuffer().getInt((long) fromIndex * OFFSET_WIDTH);
final int end = from.getOffsetBuffer().getInt((long) (fromIndex + 1) * OFFSET_WIDTH);
final int length = end - start;
handleSafe(thisIndex, length);
fillHoles(thisIndex);
BitVectorHelper.setBit(this.validityBuffer, thisIndex);
final int copyStart = offsetBuffer.getInt(thisIndex * OFFSET_WIDTH);
final int copyStart = offsetBuffer.getInt((long) thisIndex * OFFSET_WIDTH);
from.getDataBuffer().getBytes(start, this.valueBuffer, copyStart, length);
offsetBuffer.setInt((thisIndex + 1) * OFFSET_WIDTH, copyStart + length);
offsetBuffer.setInt((long) (thisIndex + 1) * OFFSET_WIDTH, copyStart + length);
}
lastSet = thisIndex;
}
Expand All @@ -1366,8 +1366,8 @@ public ArrowBufPointer getDataPointer(int index, ArrowBufPointer reuse) {
if (isNull(index)) {
reuse.set(null, 0, 0);
} else {
int offset = offsetBuffer.getInt(index * OFFSET_WIDTH);
int length = offsetBuffer.getInt((index + 1) * OFFSET_WIDTH) - offset;
int offset = offsetBuffer.getInt((long) index * OFFSET_WIDTH);
int length = offsetBuffer.getInt((long) (index + 1) * OFFSET_WIDTH) - offset;
reuse.set(valueBuffer, offset, length);
}
return reuse;
Expand Down Expand Up @@ -1397,6 +1397,6 @@ public <OUT, IN> OUT accept(VectorVisitor<OUT, IN> visitor, IN value) {
* Gets the ending offset of a record, given its index.
*/
public final int getEndOffset(int index) {
return offsetBuffer.getInt((index + 1) * OFFSET_WIDTH);
return offsetBuffer.getInt((long) (index + 1) * OFFSET_WIDTH);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public byte[] get(int index) {
}
final int startOffset = getStartOffset(index);
final int dataLength =
offsetBuffer.getInt((index + 1) * OFFSET_WIDTH) - startOffset;
offsetBuffer.getInt((long) (index + 1) * OFFSET_WIDTH) - startOffset;
final byte[] result = new byte[dataLength];
valueBuffer.getBytes(startOffset, result, 0, dataLength);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public byte[] get(int index) {
}
final int startOffset = getStartOffset(index);
final int dataLength =
offsetBuffer.getInt((index + 1) * OFFSET_WIDTH) - startOffset;
offsetBuffer.getInt((long) (index + 1) * OFFSET_WIDTH) - startOffset;
final byte[] result = new byte[dataLength];
valueBuffer.getBytes(startOffset, result, 0, dataLength);
return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@ protected void ensureInitialized() throws IOException {
*/
protected void initialize() throws IOException {
Schema originalSchema = readSchema();
List<Field> fields = new ArrayList<>();
List<FieldVector> vectors = new ArrayList<>();
List<Field> fields = new ArrayList<>(originalSchema.getFields().size());
List<FieldVector> vectors = new ArrayList<>(originalSchema.getFields().size());
Map<Long, Dictionary> dictionaries = new HashMap<>();

// Convert fields with dictionaries to have the index type
Expand Down Expand Up @@ -222,10 +222,10 @@ protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
FieldVector vector = dictionary.getVector();
// if is deltaVector, concat it with non-delta vector with the same ID.
if (dictionaryBatch.isDelta()) {
FieldVector deltaVector = vector.getField().createVector(allocator);
load(dictionaryBatch, deltaVector);
VectorBatchAppender.batchAppend(vector, deltaVector);
deltaVector.close();
try (FieldVector deltaVector = vector.getField().createVector(allocator)) {
load(dictionaryBatch, deltaVector);
VectorBatchAppender.batchAppend(vector, deltaVector);
}
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private void checkDictionaries() throws IOException {
for (FieldVector vector : getVectorSchemaRoot().getFieldVectors()) {
DictionaryEncoding encoding = vector.getField().getDictionary();
if (encoding != null) {
// if the dictionaries it need is not available and the vector is not all null, something was wrong.
// if the dictionaries it needs is not available and the vector is not all null, something was wrong.
if (!dictionaries.containsKey(encoding.getId()) && vector.getNullCount() < vector.getValueCount()) {
throw new IOException("The dictionary was not available, id was:" + encoding.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,15 +123,19 @@ public void writeBatch() throws IOException {

protected ArrowBlock writeDictionaryBatch(ArrowDictionaryBatch batch) throws IOException {
ArrowBlock block = MessageSerializer.serialize(out, batch, option);
LOGGER.debug("DictionaryRecordBatch at {}, metadata: {}, body: {}",
block.getOffset(), block.getMetadataLength(), block.getBodyLength());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("DictionaryRecordBatch at {}, metadata: {}, body: {}",
block.getOffset(), block.getMetadataLength(), block.getBodyLength());
}
return block;
}

protected ArrowBlock writeRecordBatch(ArrowRecordBatch batch) throws IOException {
ArrowBlock block = MessageSerializer.serialize(out, batch, option);
LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",
block.getOffset(), block.getMetadataLength(), block.getBodyLength());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("RecordBatch at {}, metadata: {}, body: {}",
block.getOffset(), block.getMetadataLength(), block.getBodyLength());
}
return block;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public long readFully(ArrowBuf buffer, long length) throws IOException {
boolean fullRead = true;
long bytesLeft = length;
while (fullRead && bytesLeft > 0) {
int bytesToRead = (int) Math.min(length, Integer.MAX_VALUE);
int bytesToRead = (int) Math.min(bytesLeft, Integer.MAX_VALUE);
int n = readFully(buffer.nioBuffer(buffer.writerIndex(), bytesToRead));
buffer.writerIndex(buffer.writerIndex() + n);
fullRead = n == bytesToRead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.arrow.vector.ipc.message;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -218,16 +217,7 @@ public long computeBodyLength() {
for (int i = 0; i < buffers.size(); i++) {
ArrowBuf buffer = buffers.get(i);
ArrowBuffer layout = buffersLayout.get(i);
size += (layout.getOffset() - size);

long readableBytes = buffer.readableBytes();
while (readableBytes > 0) {
int nextRead = (int) Math.min(readableBytes, Integer.MAX_VALUE);
ByteBuffer nioBuffer =
buffer.nioBuffer(buffer.readerIndex(), nextRead);
readableBytes -= nextRead;
size += nioBuffer.remaining();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has significant performance overhead.

}
size = layout.getOffset() + buffer.readableBytes();

// round up size to the next multiple of 8
size = DataSizeRoundingUtil.roundUpTo8Multiple(size);
Expand Down
Loading