@@ -368,7 +368,7 @@ public UnionVector promoteToUnion() {
return vector;
}

private int lastSet;
private int lastSet = 0;

public class Accessor extends BaseRepeatedAccessor {

@@ -44,6 +44,8 @@
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.NullableVarBinaryVector;
import org.apache.arrow.vector.NullableVarCharVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeMicroVector;
import org.apache.arrow.vector.TimeMilliVector;
@@ -63,10 +65,10 @@
import org.apache.arrow.vector.UInt4Vector;
import org.apache.arrow.vector.UInt8Vector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.ValueVector.Mutator;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.complex.ListVector;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.schema.ArrowVectorType;
@@ -84,7 +86,6 @@
import com.google.common.base.Objects;

public class JsonFileReader implements AutoCloseable, DictionaryProvider {
private final File inputFile;
private final JsonParser parser;
private final BufferAllocator allocator;
private Schema schema;
@@ -93,7 +94,6 @@ public class JsonFileReader implements AutoCloseable, DictionaryProvider {

public JsonFileReader(File inputFile, BufferAllocator allocator) throws JsonParseException, IOException {
super();
this.inputFile = inputFile;
this.allocator = allocator;
MappingJsonFactory jsonFactory = new MappingJsonFactory();
this.parser = jsonFactory.createParser(inputFile);
@@ -216,10 +216,9 @@ public VectorSchemaRoot read() throws IOException {
}
}

/*
* TODO: This method doesn't load some vectors correctly. For instance, it doesn't initialize
* `lastSet` in ListVector, VarCharVector, NullableVarBinaryVector A better way of implementing
* this function is to use `loadFieldBuffers` methods in FieldVector.
/**
* TODO: A better way of implementing this function is to use `loadFieldBuffers` methods in
* FieldVector to set the inner-vector data as done in `ArrowFileReader`.
*/
private void readVector(Field field, FieldVector vector) throws JsonParseException, IOException {
List<ArrowVectorType> vectorTypes = field.getTypeLayout().getVectorTypes();
@@ -234,29 +233,42 @@ private void readVector(Field field, FieldVector vector) throws JsonParseException, IOException {
if (started && !Objects.equal(field.getName(), name)) {
throw new IllegalArgumentException("Expected field " + field.getName() + " but got " + name);
}

// Initialize the vector with required capacity
int count = readNextField("count", Integer.class);
vector.setInitialCapacity(count);
vector.allocateNew();
vector.getMutator().setValueCount(count);

// Read inner vectors
for (int v = 0; v < vectorTypes.size(); v++) {
ArrowVectorType vectorType = vectorTypes.get(v);
BufferBacked innerVector = fieldInnerVectors.get(v);
ValueVector valueVector = (ValueVector) fieldInnerVectors.get(v);
nextFieldIs(vectorType.getName());
readToken(START_ARRAY);
ValueVector valueVector = (ValueVector) innerVector;

int innerVectorCount = vectorType.equals(OFFSET) ? count + 1 : count;
valueVector.setInitialCapacity(innerVectorCount);
valueVector.allocateNew();

for (int i = 0; i < innerVectorCount; i++) {
parser.nextToken();
setValueFromParser(valueVector, i);
}
Mutator mutator = valueVector.getMutator();
mutator.setValueCount(innerVectorCount);
readToken(END_ARRAY);
}
// if children

// Set lastSet before valueCount to prevent setValueCount from filling empty values
switch (vector.getMinorType()) {
Contributor: I am not crazy about this because case matching makes it hard to maintain for new types of Vectors. I think the proper way to do this is to use loadFieldBuffers in this class, since loadFieldBuffers should initialize vectors correctly. I think ArrowFileReader uses it too.

Member Author: Yeah, it's not ideal in that sense, but loadFieldBuffers uses nodes that come from FB data in ArrowRecordBatches. I think it was designed only for that type of data, so I'm not sure how to apply it here for JSON.

case LIST:
Contributor: Is the case matching here complete? I think so, but can you confirm these are the only vectors that have "lastSet"?

Member Author: Yeah, I double-checked and this is it.

// ListVector starts lastSet from index 0, so lastSet value is always last index written + 1
((ListVector) vector).getMutator().setLastSet(count);
break;
case VARBINARY:
((NullableVarBinaryVector) vector).getMutator().setLastSet(count - 1);
break;
case VARCHAR:
((NullableVarCharVector) vector).getMutator().setLastSet(count - 1);
break;
}
vector.getMutator().setValueCount(count);

// read child vectors, if any
List<Field> fields = field.getChildren();
if (!fields.isEmpty()) {
List<FieldVector> vectorChildren = vector.getChildrenFromFields();
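For readers skimming the diff, the new lastSet handling in readVector can be condensed into a standalone helper. The sketch below is only illustrative and not part of the patch; the helper name is made up, but every call it uses (getMinorType, and setLastSet on the mutators of ListVector, NullableVarBinaryVector, and NullableVarCharVector in this 0.x-era Java API) appears in the change above.

import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.NullableVarBinaryVector;
import org.apache.arrow.vector.NullableVarCharVector;
import org.apache.arrow.vector.complex.ListVector;

final class LastSetFixup {

  // Must run before vector.getMutator().setValueCount(count); otherwise
  // setValueCount fills the positions past lastSet with empty values.
  static void fixLastSet(FieldVector vector, int count) {
    switch (vector.getMinorType()) {
      case LIST:
        // ListVector counts lastSet from 0, so it is "last written index + 1".
        ((ListVector) vector).getMutator().setLastSet(count);
        break;
      case VARBINARY:
        // The variable-width vectors track the index of the last written value.
        ((NullableVarBinaryVector) vector).getMutator().setLastSet(count - 1);
        break;
      case VARCHAR:
        ((NullableVarCharVector) vector).getMutator().setLastSet(count - 1);
        break;
      default:
        // Per the review thread above, no other vector type tracks lastSet.
        break;
    }
  }
}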
@@ -32,6 +32,7 @@
import org.apache.arrow.vector.NullableDecimalVector;
import org.apache.arrow.vector.NullableIntVector;
import org.apache.arrow.vector.NullableTimeMilliVector;
import org.apache.arrow.vector.NullableVarBinaryVector;
import org.apache.arrow.vector.NullableVarCharVector;
import org.apache.arrow.vector.ValueVector.Accessor;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -541,4 +542,52 @@ public void writeUnionData(int count, NullableMapVector parent) {
writer.setValueCount(count);
varchar.release();
}

protected void writeVarBinaryData(int count, NullableMapVector parent) {
Assert.assertTrue(count < 100);
ComplexWriter writer = new ComplexWriterImpl("root", parent);
MapWriter rootWriter = writer.rootAsMap();
ListWriter listWriter = rootWriter.list("list");
ArrowBuf varbin = allocator.buffer(count);
for (int i = 0; i < count; i++) {
varbin.setByte(i, i);
listWriter.setPosition(i);
listWriter.startList();
for (int j = 0; j < i % 3; j++) {
listWriter.varBinary().writeVarBinary(0, i + 1, varbin);
}
listWriter.endList();
}
writer.setValueCount(count);
varbin.release();
}

protected void validateVarBinary(int count, VectorSchemaRoot root) {
Assert.assertEquals(count, root.getRowCount());
ListVector listVector = (ListVector) root.getVector("list");
byte[] expectedArray = new byte[count];
int numVarBinaryValues = 0;
for (int i = 0; i < count; i++) {
expectedArray[i] = (byte) i;
Object obj = listVector.getAccessor().getObject(i);
List<?> objList = (List) obj;
if (i % 3 == 0) {
Assert.assertTrue(objList.isEmpty());
} else {
byte[] expected = Arrays.copyOfRange(expectedArray, 0, i + 1);
for (int j = 0; j < i % 3; j++) {
byte[] result = (byte[]) objList.get(j);
Assert.assertArrayEquals(result, expected);
numVarBinaryValues++;
}
}
}

// ListVector lastSet should be the index of last value + 1
Assert.assertEquals(listVector.getMutator().getLastSet(), count);

// NullableVarBinaryVector lastSet should be the index of last value
NullableVarBinaryVector binaryVector = (NullableVarBinaryVector) listVector.getChildrenFromFields().get(0);
Assert.assertEquals(binaryVector.getMutator().getLastSet(), numVarBinaryValues - 1);
Member Author: @icexelloss I added tests for ListVector with NullableVarBinaryVector that also check lastSet values. Please take another look when you get a chance, thanks!

Contributor: I thought the tests were already there covering this scenario in TestListVector.java. I remember adding tests for setLastSet().

Contributor: @siddharthteotia I think the issue is TestListVector doesn't test the case where a ListVector is created from the JSON reader.

}
}
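As a quick sanity check on the numbers these two helpers agree on (an illustrative snippet, not part of the patch; the class name is made up): row i in writeVarBinaryData writes i % 3 varbinary entries, so the inner NullableVarBinaryVector ends up holding the sum of i % 3 values, its lastSet is one less than that total, and the outer ListVector's lastSet equals count.

public class VarBinaryCountCheck {
  public static void main(String[] args) {
    int count = 10; // stand-in for the COUNT rows written by writeVarBinaryData
    int numVarBinaryValues = 0;
    for (int i = 0; i < count; i++) {
      numVarBinaryValues += i % 3; // rows where i % 3 == 0 write an empty list
    }
    // For count = 10 this prints 9 inner values, so the inner vector's
    // lastSet should be 8, while the outer ListVector's lastSet is 10.
    System.out.println("inner values = " + numVarBinaryValues
        + ", inner lastSet = " + (numVarBinaryValues - 1)
        + ", list lastSet = " + count);
  }
}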
@@ -622,6 +622,47 @@ public void testWriteReadFixedSizeList() throws IOException {
}
}

@Test
public void testWriteReadVarBin() throws IOException {
File file = new File("target/mytest_varbin.arrow");
ByteArrayOutputStream stream = new ByteArrayOutputStream();
int count = COUNT;

// write
try (
BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
NullableMapVector parent = NullableMapVector.empty("parent", vectorAllocator)) {
writeVarBinaryData(count, parent);
VectorSchemaRoot root = new VectorSchemaRoot(parent.getChild("root"));
validateVarBinary(count, root);
write(parent.getChild("root"), file, stream);
}

// read from file
try (
BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
FileInputStream fileInputStream = new FileInputStream(file);
ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
Schema schema = root.getSchema();
LOGGER.debug("reading schema: " + schema);
Assert.assertTrue(arrowReader.loadNextBatch());
validateVarBinary(count, root);
}

// read from stream
try (
BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray());
ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
Schema schema = root.getSchema();
LOGGER.debug("reading schema: " + schema);
Assert.assertTrue(arrowReader.loadNextBatch());
validateVarBinary(count, root);
}
}


/**
* Writes the contents of parents to file. If outStream is non-null, also writes it
@@ -285,4 +285,33 @@ public void testSetStructLength() throws IOException {
}
}

@Test
public void testWriteReadVarBinJSON() throws IOException {
File file = new File("target/mytest_varbin.json");
int count = COUNT;

// write
try (
BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
NullableMapVector parent = NullableMapVector.empty("parent", vectorAllocator)) {
writeVarBinaryData(count, parent);
VectorSchemaRoot root = new VectorSchemaRoot(parent.getChild("root"));
validateVarBinary(count, root);
writeJSON(file, new VectorSchemaRoot(parent.getChild("root")), null);
}

// read
try (
BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE)) {
JsonFileReader reader = new JsonFileReader(file, readerAllocator);
Schema schema = reader.start();
LOGGER.debug("reading schema: " + schema);

// initialize vectors
try (VectorSchemaRoot root = reader.read();) {
validateVarBinary(count, root);
}
reader.close();
}
}
}