Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.arrow.vector.file;

import static org.apache.arrow.vector.schema.FBSerializables.writeAllStructsToVector;

import java.util.ArrayList;
import java.util.List;

Expand Down Expand Up @@ -52,10 +54,10 @@ public ArrowFooter(Footer footer) {

private static List<ArrowBlock> recordBatches(Footer footer) {
List<ArrowBlock> recordBatches = new ArrayList<>();
Block tempBLock = new Block();
Block tempBlock = new Block();
int recordBatchesLength = footer.recordBatchesLength();
for (int i = 0; i < recordBatchesLength; i++) {
Block block = footer.recordBatches(tempBLock, i);
Block block = footer.recordBatches(tempBlock, i);
recordBatches.add(new ArrowBlock(block.offset(), block.metaDataLength(), block.bodyLength()));
}
return recordBatches;
Expand Down Expand Up @@ -88,23 +90,16 @@ public List<ArrowBlock> getRecordBatches() {
public int writeTo(FlatBufferBuilder builder) {
  // Serializes this footer into the builder and returns the result of
  // Footer.endFooter(builder). The schema and both block vectors are written
  // before Footer.startFooter is called; the footer table then only references
  // them through the offsets obtained here.
  int schemaIndex = schema.getSchema(builder);

  // Each vector is opened once and closed once by the shared helper
  // (statically imported from FBSerializables).
  Footer.startDictionariesVector(builder, dictionaries.size());
  int dicsOffset = writeAllStructsToVector(builder, dictionaries);
  Footer.startRecordBatchesVector(builder, recordBatches.size());
  int rbsOffset = writeAllStructsToVector(builder, recordBatches);

  Footer.startFooter(builder);
  Footer.addSchema(builder, schemaIndex);
  Footer.addDictionaries(builder, dicsOffset);
  Footer.addRecordBatches(builder, rbsOffset);
  return Footer.endFooter(builder);
}

/**
 * Writes the given blocks into the vector currently open on the builder and
 * closes that vector.
 *
 * <p>The blocks are written last-to-first: the flatbuffer builder fills its
 * buffer from the back, so writing them in list order would make the finished
 * vector read back in reverse order.
 *
 * @param builder the builder on which a vector has already been started
 * @param blocks the blocks to store, in the order they must be read back
 * @return the value returned by {@code builder.endVector()}
 */
private int endVector(FlatBufferBuilder builder, List<ArrowBlock> blocks) {
  for (int i = blocks.size() - 1; i >= 0; i--) {
    blocks.get(i).writeTo(builder);
  }
  return builder.endVector();
}

@Override
public int hashCode() {
final int prime = 31;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,24 +161,27 @@ public void testWriteReadComplex() throws IOException {
@Test
public void testWriteReadMultipleRBs() throws IOException {
File file = new File("target/mytest_multiple.arrow");
int count = COUNT;
int[] counts = { 10, 5 };

// write
try (
BufferAllocator originalVectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
MapVector parent = new MapVector("parent", originalVectorAllocator, null);
FileOutputStream fileOutputStream = new FileOutputStream(file);) {
writeData(count, parent);
VectorUnloader vectorUnloader = newVectorUnloader(parent.getChild("root"));
Schema schema = vectorUnloader.getSchema();
writeData(counts[0], parent);
VectorUnloader vectorUnloader0 = newVectorUnloader(parent.getChild("root"));
Schema schema = vectorUnloader0.getSchema();
Assert.assertEquals(2, schema.getFields().size());
try (ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);) {
try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch()) {
try (ArrowRecordBatch recordBatch = vectorUnloader0.getRecordBatch()) {
Assert.assertEquals("RB #0", counts[0], recordBatch.getLength());
arrowWriter.writeRecordBatch(recordBatch);
}
parent.allocateNew();
writeData(count, parent);
try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch()) {
writeData(counts[1], parent); // if we write the same data we don't catch that the metadata is stored in the wrong order.
VectorUnloader vectorUnloader1 = newVectorUnloader(parent.getChild("root"));
try (ArrowRecordBatch recordBatch = vectorUnloader1.getRecordBatch()) {
Assert.assertEquals("RB #1", counts[1], recordBatch.getLength());
arrowWriter.writeRecordBatch(recordBatch);
}
}
Expand All @@ -195,21 +198,27 @@ public void testWriteReadMultipleRBs() throws IOException {
ArrowFooter footer = arrowReader.readFooter();
Schema schema = footer.getSchema();
LOGGER.debug("reading schema: " + schema);
int i = 0;
try (VectorSchemaRoot root = new VectorSchemaRoot(schema, vectorAllocator);) {
VectorLoader vectorLoader = new VectorLoader(root);
List<ArrowBlock> recordBatches = footer.getRecordBatches();
Assert.assertEquals(2, recordBatches.size());
long previousOffset = 0;
for (ArrowBlock rbBlock : recordBatches) {
Assert.assertTrue(rbBlock.getOffset() + " > " + previousOffset, rbBlock.getOffset() > previousOffset);
previousOffset = rbBlock.getOffset();
Assert.assertEquals(0, rbBlock.getOffset() % 8);
Assert.assertEquals(0, rbBlock.getMetadataLength() % 8);
try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
Assert.assertEquals("RB #" + i, counts[i], recordBatch.getLength());
List<ArrowBuffer> buffersLayout = recordBatch.getBuffersLayout();
for (ArrowBuffer arrowBuffer : buffersLayout) {
Assert.assertEquals(0, arrowBuffer.getOffset() % 8);
}
vectorLoader.load(recordBatch);
validateContent(count, root);
validateContent(counts[i], root);
}
++i;
}
}
}
Expand Down