Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,17 @@
public class VectorUnloader {

private final VectorSchemaRoot root;
private final boolean includeNullCount;
private final boolean alignBuffers;

public VectorUnloader(VectorSchemaRoot root) {
this(root, true, true);
}

public VectorUnloader(VectorSchemaRoot root, boolean includeNullCount, boolean alignBuffers) {
this.root = root;
this.includeNullCount = includeNullCount;
this.alignBuffers = alignBuffers;
}

public ArrowRecordBatch getRecordBatch() {
Expand All @@ -40,12 +48,12 @@ public ArrowRecordBatch getRecordBatch() {
for (FieldVector vector : root.getFieldVectors()) {
appendNodes(vector, nodes, buffers);
}
return new ArrowRecordBatch(root.getRowCount(), nodes, buffers);
return new ArrowRecordBatch(root.getRowCount(), nodes, buffers, alignBuffers);
}

private void appendNodes(FieldVector vector, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
Accessor accessor = vector.getAccessor();
nodes.add(new ArrowFieldNode(accessor.getValueCount(), accessor.getNullCount()));
nodes.add(new ArrowFieldNode(accessor.getValueCount(), includeNullCount ? accessor.getNullCount() : -1));
List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
List<ArrowVectorType> expectedBuffers = vector.getField().getTypeLayout().getVectorTypes();
if (fieldBuffers.size() != expectedBuffers.size()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,16 @@ public class ArrowRecordBatch implements ArrowMessage {

private boolean closed = false;

public ArrowRecordBatch(int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
this(length, nodes, buffers, true);
}

/**
* @param length how many rows in this batch
* @param nodes field level info
* @param buffers will be retained until this recordBatch is closed
*/
public ArrowRecordBatch(int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers) {
public ArrowRecordBatch(int length, List<ArrowFieldNode> nodes, List<ArrowBuf> buffers, boolean alignBuffers) {
super();
this.length = length;
this.nodes = nodes;
Expand All @@ -66,7 +70,7 @@ public ArrowRecordBatch(int length, List<ArrowFieldNode> nodes, List<ArrowBuf> b
arrowBuffers.add(new ArrowBuffer(0, offset, size));
LOGGER.debug(String.format("Buffer in RecordBatch at %d, length: %d", offset, size));
offset += size;
if (offset % 8 != 0) { // align on 8 byte boundaries
if (alignBuffers && offset % 8 != 0) { // align on 8 byte boundaries
offset += 8 - (offset % 8);
}
}
Expand Down