Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/java.yml
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you mean to include this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lidavidm I was testing enabling Java logging via this PR, so this should be removed. Also, I haven't completed the required offset changes for all vector types yet.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you move this to draft if it's not ready for review?

Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ jobs:
env:
JDK: ${{ matrix.jdk }}
MAVEN: ${{ matrix.maven }}
MAVEN_OPTS: -Darrow.memory.debug.allocator=true
steps:
- name: Checkout Arrow
uses: actions/checkout@3df4ab11eba7bda6032a0b82a6bb43b11571feac # v4.0.0
Expand All @@ -86,6 +87,7 @@ jobs:
ARCHERY_DOCKER_USER: ${{ secrets.DOCKERHUB_USER }}
ARCHERY_DOCKER_PASSWORD: ${{ secrets.DOCKERHUB_TOKEN }}
DEVELOCITY_ACCESS_KEY: ${{ secrets.GE_ACCESS_TOKEN }}
JAVA_TOOL_OPTIONS: -Darrow.memory.debug.allocator=true
run: |
archery docker run \
-e CI=true \
Expand Down
4 changes: 2 additions & 2 deletions ci/scripts/java_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,13 @@ if [ "${ARROW_JAVA_JNI}" = "ON" ]; then
fi

# Use `2 * ncores` threads
${mvn} -T 2C clean install
${mvn} -T 2C clean install -Darrow.memory.debug.allocator=true

if [ "${BUILD_DOCS_JAVA}" == "ON" ]; then
# HTTP pooling is turned off to avoid download issues https://issues.apache.org/jira/browse/ARROW-11633
# GH-43378: Maven site plugins not compatible with multithreading
mkdir -p ${build_dir}/docs/java/reference
${mvn} -Dcheckstyle.skip=true -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false clean install site
${mvn} -Dcheckstyle.skip=true -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Darrow.memory.debug.allocator=true clean install site
rsync -a target/site/apidocs/ ${build_dir}/docs/java/reference
fi

Expand Down
9 changes: 7 additions & 2 deletions java/c/src/main/java/org/apache/arrow/c/ArrayImporter.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ private void doImport(ArrowArray.Snapshot snapshot) {
checkState(children[i] != NULL, "ArrowArray struct has NULL child at position %s", i);
ArrayImporter childImporter =
new ArrayImporter(allocator, childVectors.get(i), dictionaryProvider);
childImporter.importChild(this, ArrowArray.wrap(children[i]));
ArrowArray childArray = ArrowArray.wrap(children[i]);
ArrowArray.Snapshot childSnapshot = childArray.snapshot();
childSnapshot.offset = snapshot.offset;
childArray.save(childSnapshot);
childImporter.importChild(this, childArray);
}
}

Expand All @@ -124,7 +128,8 @@ private void doImport(ArrowArray.Snapshot snapshot) {
NativeUtil.toJavaArray(snapshot.buffers, checkedCastToInt(snapshot.n_buffers));

try (final BufferImportTypeVisitor visitor =
new BufferImportTypeVisitor(allocator, underlyingAllocation, fieldNode, bufferPointers)) {
new BufferImportTypeVisitor(
allocator, underlyingAllocation, fieldNode, snapshot.offset, bufferPointers)) {
final List<ArrowBuf> buffers;
if (bufferPointers == null || bufferPointers.length == 0) {
buffers = Collections.emptyList();
Expand Down
131 changes: 97 additions & 34 deletions java/c/src/main/java/org/apache/arrow/c/BufferImportTypeVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.arrow.util.AutoCloseables;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.arrow.vector.BaseVariableWidthViewVector;
import org.apache.arrow.vector.BitVectorHelper;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DateMilliVector;
import org.apache.arrow.vector.DurationVector;
Expand Down Expand Up @@ -59,17 +60,20 @@ class BufferImportTypeVisitor implements ArrowType.ArrowTypeVisitor<List<ArrowBu
private final BufferAllocator allocator;
private final ReferenceCountedArrowArray underlyingAllocation;
private final ArrowFieldNode fieldNode;
private final long arrowArrayOffset;
private final long[] buffers;
private final List<ArrowBuf> imported;

/**
 * Creates a visitor that imports the buffers of a single field node.
 *
 * @param allocator allocator stored for use by the import helpers
 * @param underlyingAllocation handle stored for use when importing buffers
 * @param fieldNode node supplying the value count used when sizing buffers
 * @param arrowArrayOffset number of leading slots the offset-aware import helpers skip
 * @param buffers raw buffer addresses, indexed by buffer position
 */
BufferImportTypeVisitor(
    BufferAllocator allocator,
    ReferenceCountedArrowArray underlyingAllocation,
    ArrowFieldNode fieldNode,
    long arrowArrayOffset,
    long[] buffers) {
  // Per-visitor bookkeeping first: starts empty and collects buffers as they are imported.
  this.imported = new ArrayList<>();
  // What is being imported and where the logical data starts.
  this.fieldNode = fieldNode;
  this.arrowArrayOffset = arrowArrayOffset;
  this.buffers = buffers;
  // Collaborators used to materialise the buffers.
  this.allocator = allocator;
  this.underlyingAllocation = underlyingAllocation;
}
Expand Down Expand Up @@ -110,14 +114,63 @@ private ArrowBuf importFixedBits(ArrowType type, int index, long bitsPerSlot) {
return importBuffer(type, index, capacity);
}

/**
 * Imports a buffer whose slots are {@code bitsPerSlot} bits wide and returns only the window that
 * starts {@code arrowArrayOffset} slots in and covers {@code fieldNode.getLength()} slots.
 *
 * <p>When the window starts on a byte boundary the imported buffer is sliced. Otherwise the
 * window's bits are copied into a freshly allocated buffer so that the first logical slot lands
 * on bit 0 of the result; that copy is added to {@code imported} like the original code did.
 *
 * @param type the Arrow type being imported, forwarded to {@code importBuffer}
 * @param index position of the buffer in {@code buffers}
 * @param bitsPerSlot width of one slot in bits
 * @return a buffer whose bit 0 corresponds to the first logical slot
 * @throws ArithmeticException if an unaligned window ends beyond {@code Integer.MAX_VALUE} bits
 */
private ArrowBuf importFixedBitsWithOffset(ArrowType type, int index, long bitsPerSlot) {
  // TODO: merge with importFixedBits
  final long valueCount = fieldNode.getLength();

  // The source buffer spans the skipped slots as well as the logical ones.
  final long totalBits = (arrowArrayOffset + valueCount) * bitsPerSlot;
  final ArrowBuf buf = importBuffer(type, index, DataSizeRoundingUtil.divideBy8Ceil(totalBits));

  final long startBit = arrowArrayOffset * bitsPerSlot;
  final long windowBits = valueCount * bitsPerSlot;
  final long windowBytes = DataSizeRoundingUtil.divideBy8Ceil(windowBits);

  // Byte-aligned start: the window is addressable as a plain slice. The start byte must be
  // rounded down (exact here), never up, or the leading values would be skipped.
  if ((startBit & 7) == 0) {
    return buf.slice(startBit >> 3, windowBytes);
  }

  // Unaligned start: a slice cannot begin mid-byte, so shift the bits into a new buffer.
  // The bit helpers are called with int indices (as the original code did); fail loudly
  // instead of silently wrapping when the window does not fit.
  final int endBit = Math.toIntExact(startBit + windowBits);
  final int firstBit = (int) startBit;

  final ArrowBuf bufCopy = allocator.buffer(windowBytes);
  // Register the copy right away so it is tracked together with the other imported buffers.
  imported.add(bufCopy);
  bufCopy.setZero(0, bufCopy.capacity());
  for (int src = firstBit, dst = 0; src < endBit; src++, dst++) {
    // Destination is pre-zeroed, so only set bits need to be written.
    if (BitVectorHelper.get(buf, src) == 1) {
      BitVectorHelper.setBit(bufCopy, dst);
    }
  }
  return bufCopy;
}

/**
 * Imports the buffer at {@code index}, sized as one {@code bytesPerSlot}-wide slot per value of
 * the field node. No array offset is applied here.
 */
private ArrowBuf importFixedBytes(ArrowType type, int index, long bytesPerSlot) {
  return importBuffer(type, index, fieldNode.getLength() * bytesPerSlot);
}

/**
 * Imports the buffer at {@code index} sized for the skipped slots plus the logical slots, then
 * returns the slice that begins after the {@code arrowArrayOffset} skipped slots and spans
 * {@code fieldNode.getLength()} slots of {@code bytesPerSlot} bytes each.
 */
private ArrowBuf importFixedBytesWithOffset(ArrowType type, int index, long bytesPerSlot) {
  final long valueCount = fieldNode.getLength();
  final long skippedBytes = arrowArrayOffset * bytesPerSlot;
  final long keptBytes = valueCount * bytesPerSlot;
  // Import everything from the start of the buffer so the slice bounds are valid.
  final ArrowBuf whole = importBuffer(type, index, bytesPerSlot * (valueCount + arrowArrayOffset));
  return whole.slice(skippedBytes, keptBytes);
}

private ArrowBuf importOffsets(ArrowType type, long bytesPerSlot) {
final long capacity = bytesPerSlot * (fieldNode.getLength() + 1);
return importBuffer(type, 1, capacity);
final long capacity = bytesPerSlot * (fieldNode.getLength() + arrowArrayOffset + 1);
ArrowBuf offsets = importBuffer(type, 1, capacity);
return offsets.slice(
arrowArrayOffset * bytesPerSlot, (long) (fieldNode.getLength() + 1) * bytesPerSlot);
}

private ArrowBuf importData(ArrowType type, long capacity) {
Expand All @@ -137,6 +190,20 @@ private ArrowBuf maybeImportBitmap(ArrowType type) {
return importFixedBits(type, 0, /*bitsPerSlot=*/ 1);
}

/**
 * Imports buffer 0 as a one-bit-per-slot bitmap via {@code importFixedBitsWithOffset}.
 *
 * @return the imported bitmap, or {@code null} when the address of buffer 0 is {@code NULL}
 * @throws IllegalStateException presumably, via {@code checkState}, when no buffers are present
 */
private ArrowBuf maybeImportBitmapWithOffset(ArrowType type) {
  // TODO: merge with maybeImportBitmap
  final int bufferCount = buffers.length;
  checkState(
      bufferCount > 0,
      "Expected at least %s buffers for type %s, but found %s",
      1,
      type,
      bufferCount);
  return buffers[0] == NULL ? null : importFixedBitsWithOffset(type, 0, /*bitsPerSlot=*/ 1);
}

@Override
public List<ArrowBuf> visit(ArrowType.Null type) {
checkState(
Expand All @@ -155,18 +222,19 @@ public List<ArrowBuf> visit(ArrowType.Struct type) {

@Override
public List<ArrowBuf> visit(ArrowType.List type) {
return Arrays.asList(maybeImportBitmap(type), importOffsets(type, ListVector.OFFSET_WIDTH));
return Arrays.asList(
maybeImportBitmapWithOffset(type), importOffsets(type, ListVector.OFFSET_WIDTH));
}

@Override
public List<ArrowBuf> visit(ArrowType.LargeList type) {
return Arrays.asList(
maybeImportBitmap(type), importOffsets(type, LargeListVector.OFFSET_WIDTH));
maybeImportBitmapWithOffset(type), importOffsets(type, LargeListVector.OFFSET_WIDTH));
}

@Override
public List<ArrowBuf> visit(ArrowType.FixedSizeList type) {
return Collections.singletonList(maybeImportBitmap(type));
return Collections.singletonList(maybeImportBitmapWithOffset(type));
}

@Override
Expand All @@ -190,7 +258,8 @@ public List<ArrowBuf> visit(ArrowType.Map type) {

@Override
public List<ArrowBuf> visit(ArrowType.Int type) {
return Arrays.asList(maybeImportBitmap(type), importFixedBits(type, 1, type.getBitWidth()));
return Arrays.asList(
maybeImportBitmapWithOffset(type), importFixedBitsWithOffset(type, 1, type.getBitWidth()));
}

@Override
Expand All @@ -212,19 +281,16 @@ public List<ArrowBuf> visit(ArrowType.FloatingPoint type) {

@Override
public List<ArrowBuf> visit(ArrowType.Utf8 type) {
try (ArrowBuf offsets = importOffsets(type, VarCharVector.OFFSET_WIDTH)) {
final int start = offsets.getInt(0);
final int end = offsets.getInt(fieldNode.getLength() * (long) VarCharVector.OFFSET_WIDTH);
checkState(
end >= start,
"Offset buffer for type %s is malformed: start: %s, end: %s",
type,
start,
end);
final int len = end - start;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure whether this line was needed here, or am I missing something?

offsets.getReferenceManager().retain();
return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len));
}
ArrowBuf offsets = importOffsets(type, VarCharVector.OFFSET_WIDTH);
final int start = offsets.getInt(0);
final int end = offsets.getInt((fieldNode.getLength()) * (long) VarCharVector.OFFSET_WIDTH);
checkState(
end >= start,
"Offset buffer for type %s is malformed: start: %s, end: %s",
type,
start,
end);
return Arrays.asList(maybeImportBitmapWithOffset(type), offsets, importData(type, end));
}

private List<ArrowBuf> visitVariableWidthView(ArrowType type) {
Expand All @@ -238,8 +304,8 @@ private List<ArrowBuf> visitVariableWidthView(ArrowType type) {
importBuffer(type, variadicSizeBufferIndex, variadicSizeBufferCapacity);

ArrowBuf view =
importFixedBytes(type, viewBufferIndex, BaseVariableWidthViewVector.ELEMENT_SIZE);
buffers.add(maybeImportBitmap(type));
importFixedBytesWithOffset(type, viewBufferIndex, BaseVariableWidthViewVector.ELEMENT_SIZE);
buffers.add(maybeImportBitmapWithOffset(type));
buffers.add(view);

// 0th buffer is validity buffer
Expand Down Expand Up @@ -280,19 +346,16 @@ public List<ArrowBuf> visit(ArrowType.LargeUtf8 type) {

@Override
public List<ArrowBuf> visit(ArrowType.Binary type) {
try (ArrowBuf offsets = importOffsets(type, VarBinaryVector.OFFSET_WIDTH)) {
final int start = offsets.getInt(0);
final int end = offsets.getInt(fieldNode.getLength() * (long) VarBinaryVector.OFFSET_WIDTH);
checkState(
end >= start,
"Offset buffer for type %s is malformed: start: %s, end: %s",
type,
start,
end);
final int len = end - start;
offsets.getReferenceManager().retain();
return Arrays.asList(maybeImportBitmap(type), offsets, importData(type, len));
}
ArrowBuf offsets = importOffsets(type, VarBinaryVector.OFFSET_WIDTH);
final int start = offsets.getInt(0);
final int end = offsets.getInt(fieldNode.getLength() * (long) VarBinaryVector.OFFSET_WIDTH);
checkState(
end >= start,
"Offset buffer for type %s is malformed: start: %s, end: %s",
type,
start,
end);
return Arrays.asList(maybeImportBitmapWithOffset(type), offsets, importData(type, end));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void importBuffer() throws Exception {
// Note values are all dummy values here
try (BufferImportTypeVisitor notEmptyDataVisitor =
new BufferImportTypeVisitor(
allocator, dummyHandle, new ArrowFieldNode(/* length= */ 1, 0), new long[] {0})) {
allocator, dummyHandle, new ArrowFieldNode(/* length= */ 1, 0), 0, new long[] {0})) {

// Too few buffers
assertThrows(
Expand All @@ -82,7 +82,7 @@ allocator, dummyHandle, new ArrowFieldNode(/* length= */ 1, 0), new long[] {0}))

try (BufferImportTypeVisitor emptyDataVisitor =
new BufferImportTypeVisitor(
allocator, dummyHandle, new ArrowFieldNode(/* length= */ 0, 0), new long[] {0})) {
allocator, dummyHandle, new ArrowFieldNode(/* length= */ 0, 0), 0, new long[] {0})) {

// Too few buffers
assertThrows(
Expand All @@ -106,7 +106,7 @@ void cleanupAfterFailure() throws Exception {
long address = MemoryUtil.allocateMemory(16);
try (BufferImportTypeVisitor visitor =
new BufferImportTypeVisitor(
allocator, dummyHandle, new ArrowFieldNode(0, 0), new long[] {address})) {
allocator, dummyHandle, new ArrowFieldNode(0, 0), 0, new long[] {address})) {
// This fails, but only after we've already imported a buffer.
assertThrows(IllegalStateException.class, () -> visitor.visit(new ArrowType.Int(32, true)));
} finally {
Expand All @@ -123,7 +123,8 @@ void bufferAssociatedWithAllocator() throws Exception {
long baseline = allocator.getAllocatedMemory();
ArrowFieldNode fieldNode = new ArrowFieldNode(fieldLength, 0);
try (BufferImportTypeVisitor visitor =
new BufferImportTypeVisitor(allocator, dummyHandle, fieldNode, new long[] {0, address})) {
new BufferImportTypeVisitor(
allocator, dummyHandle, fieldNode, 0, new long[] {0, address})) {
List<ArrowBuf> buffers = visitor.visit(new ArrowType.Int(32, true));
assertThat(buffers).hasSize(2);
assertThat(buffers.get(0)).isNull();
Expand Down
Loading