From 70df0cc402b47f41fc1b31bdcf4b3ba0c532e063 Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Wed, 27 Sep 2017 11:51:57 -0700 Subject: [PATCH 1/2] set lastSet in JsonFileReader and initialize lastSet for ListVector --- .../arrow/vector/complex/ListVector.java | 2 +- .../vector/file/json/JsonFileReader.java | 44 +++++++++++-------- 2 files changed, 27 insertions(+), 19 deletions(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index 470317f8c07..6511efcb7d5 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -368,7 +368,7 @@ public UnionVector promoteToUnion() { return vector; } - private int lastSet; + private int lastSet = 0; public class Accessor extends BaseRepeatedAccessor { diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java index 8bb0f26d978..fbddf41aa69 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java @@ -44,6 +44,8 @@ import org.apache.arrow.vector.Float4Vector; import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.NullableVarBinaryVector; +import org.apache.arrow.vector.NullableVarCharVector; import org.apache.arrow.vector.SmallIntVector; import org.apache.arrow.vector.TimeMicroVector; import org.apache.arrow.vector.TimeMilliVector; @@ -63,10 +65,10 @@ import org.apache.arrow.vector.UInt4Vector; import org.apache.arrow.vector.UInt8Vector; import org.apache.arrow.vector.ValueVector; -import org.apache.arrow.vector.ValueVector.Mutator; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.dictionary.Dictionary; import org.apache.arrow.vector.dictionary.DictionaryProvider; import org.apache.arrow.vector.schema.ArrowVectorType; @@ -84,7 +86,6 @@ import com.google.common.base.Objects; public class JsonFileReader implements AutoCloseable, DictionaryProvider { - private final File inputFile; private final JsonParser parser; private final BufferAllocator allocator; private Schema schema; @@ -93,7 +94,6 @@ public class JsonFileReader implements AutoCloseable, DictionaryProvider { public JsonFileReader(File inputFile, BufferAllocator allocator) throws JsonParseException, IOException { super(); - this.inputFile = inputFile; this.allocator = allocator; MappingJsonFactory jsonFactory = new MappingJsonFactory(); this.parser = jsonFactory.createParser(inputFile); @@ -216,11 +216,6 @@ public VectorSchemaRoot read() throws IOException { } } - /* - * TODO: This method doesn't load some vectors correctly. For instance, it doesn't initialize - * `lastSet` in ListVector, VarCharVector, NullableVarBinaryVector A better way of implementing - * this function is to use `loadFieldBuffers` methods in FieldVector. - */ private void readVector(Field field, FieldVector vector) throws JsonParseException, IOException { List vectorTypes = field.getTypeLayout().getVectorTypes(); List fieldInnerVectors = vector.getFieldInnerVectors(); @@ -234,29 +229,42 @@ private void readVector(Field field, FieldVector vector) throws JsonParseExcepti if (started && !Objects.equal(field.getName(), name)) { throw new IllegalArgumentException("Expected field " + field.getName() + " but got " + name); } + + // Initialize the vector with required capacity int count = readNextField("count", Integer.class); + vector.setInitialCapacity(count); vector.allocateNew(); - vector.getMutator().setValueCount(count); + + // Read inner vectors for (int v = 0; v < vectorTypes.size(); v++) { ArrowVectorType vectorType = vectorTypes.get(v); - BufferBacked innerVector = fieldInnerVectors.get(v); + ValueVector valueVector = (ValueVector) fieldInnerVectors.get(v); nextFieldIs(vectorType.getName()); readToken(START_ARRAY); - ValueVector valueVector = (ValueVector) innerVector; - int innerVectorCount = vectorType.equals(OFFSET) ? count + 1 : count; - valueVector.setInitialCapacity(innerVectorCount); - valueVector.allocateNew(); - for (int i = 0; i < innerVectorCount; i++) { parser.nextToken(); setValueFromParser(valueVector, i); } - Mutator mutator = valueVector.getMutator(); - mutator.setValueCount(innerVectorCount); readToken(END_ARRAY); } - // if children + + // Set lastSet before valueCount to prevent setValueCount from filling empty values + switch (vector.getMinorType()) { + case LIST: + // ListVector starts lastSet from index 0 + ((ListVector) vector).getMutator().setLastSet(count); + break; + case VARBINARY: + ((NullableVarBinaryVector) vector).getMutator().setLastSet(count - 1); + break; + case VARCHAR: + ((NullableVarCharVector) vector).getMutator().setLastSet(count - 1); + break; + } + vector.getMutator().setValueCount(count); + + // read child vectors, if any List fields = field.getChildren(); if (!fields.isEmpty()) { List vectorChildren = vector.getChildrenFromFields(); From 8f97a3dbf45dfd6e9457286b681d7228809b9d1b Mon Sep 17 00:00:00 2001 From: Bryan Cutler Date: Thu, 28 Sep 2017 16:48:23 -0700 Subject: [PATCH 2/2] added test for VarBinaryVector that checks lastSet after reading --- .../vector/file/json/JsonFileReader.java | 6 ++- .../arrow/vector/file/BaseFileTest.java | 49 +++++++++++++++++++ .../arrow/vector/file/TestArrowFile.java | 41 ++++++++++++++++ .../arrow/vector/file/json/TestJSONFile.java | 29 +++++++++++ 4 files changed, 124 insertions(+), 1 deletion(-) diff --git a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java index fbddf41aa69..c6ebd61aa07 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/file/json/JsonFileReader.java @@ -216,6 +216,10 @@ public VectorSchemaRoot read() throws IOException { } } + /** + * TODO: A better way of implementing this function is to use `loadFieldBuffers` methods in + * FieldVector to set the inner-vector data as done in `ArrowFileReader`. + */ private void readVector(Field field, FieldVector vector) throws JsonParseException, IOException { List vectorTypes = field.getTypeLayout().getVectorTypes(); List fieldInnerVectors = vector.getFieldInnerVectors(); @@ -252,7 +256,7 @@ private void readVector(Field field, FieldVector vector) throws JsonParseExcepti // Set lastSet before valueCount to prevent setValueCount from filling empty values switch (vector.getMinorType()) { case LIST: - // ListVector starts lastSet from index 0 + // ListVector starts lastSet from index 0, so lastSet value is always last index written + 1 ((ListVector) vector).getMutator().setLastSet(count); break; case VARBINARY: diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java b/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java index c05d5904977..ba62de0a6d9 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/BaseFileTest.java @@ -32,6 +32,7 @@ import org.apache.arrow.vector.NullableDecimalVector; import org.apache.arrow.vector.NullableIntVector; import org.apache.arrow.vector.NullableTimeMilliVector; +import org.apache.arrow.vector.NullableVarBinaryVector; import org.apache.arrow.vector.NullableVarCharVector; import org.apache.arrow.vector.ValueVector.Accessor; import org.apache.arrow.vector.VectorSchemaRoot; @@ -541,4 +542,52 @@ public void writeUnionData(int count, NullableMapVector parent) { writer.setValueCount(count); varchar.release(); } + + protected void writeVarBinaryData(int count, NullableMapVector parent) { + Assert.assertTrue(count < 100); + ComplexWriter writer = new ComplexWriterImpl("root", parent); + MapWriter rootWriter = writer.rootAsMap(); + ListWriter listWriter = rootWriter.list("list"); + ArrowBuf varbin = allocator.buffer(count); + for (int i = 0; i < count; i++) { + varbin.setByte(i, i); + listWriter.setPosition(i); + listWriter.startList(); + for (int j = 0; j < i % 3; j++) { + listWriter.varBinary().writeVarBinary(0, i + 1, varbin); + } + listWriter.endList(); + } + writer.setValueCount(count); + varbin.release(); + } + + protected void validateVarBinary(int count, VectorSchemaRoot root) { + Assert.assertEquals(count, root.getRowCount()); + ListVector listVector = (ListVector) root.getVector("list"); + byte[] expectedArray = new byte[count]; + int numVarBinaryValues = 0; + for (int i = 0; i < count; i++) { + expectedArray[i] = (byte) i; + Object obj = listVector.getAccessor().getObject(i); + List objList = (List) obj; + if (i % 3 == 0) { + Assert.assertTrue(objList.isEmpty()); + } else { + byte[] expected = Arrays.copyOfRange(expectedArray, 0, i + 1); + for (int j = 0; j < i % 3; j++) { + byte[] result = (byte[]) objList.get(j); + Assert.assertArrayEquals(result, expected); + numVarBinaryValues++; + } + } + } + + // ListVector lastSet should be the index of last value + 1 + Assert.assertEquals(listVector.getMutator().getLastSet(), count); + + // NullableVarBinaryVector lastSet should be the index of last value + NullableVarBinaryVector binaryVector = (NullableVarBinaryVector) listVector.getChildrenFromFields().get(0); + Assert.assertEquals(binaryVector.getMutator().getLastSet(), numVarBinaryValues - 1); + } } diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java index c483ba7de91..81e58989fcc 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/TestArrowFile.java @@ -622,6 +622,47 @@ public void testWriteReadFixedSizeList() throws IOException { } } + @Test + public void testWriteReadVarBin() throws IOException { + File file = new File("target/mytest_varbin.arrow"); + ByteArrayOutputStream stream = new ByteArrayOutputStream(); + int count = COUNT; + + // write + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + NullableMapVector parent = NullableMapVector.empty("parent", vectorAllocator)) { + writeVarBinaryData(count, parent); + VectorSchemaRoot root = new VectorSchemaRoot(parent.getChild("root")); + validateVarBinary(count, root); + write(parent.getChild("root"), file, stream); + } + + // read from file + try ( + BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + FileInputStream fileInputStream = new FileInputStream(file); + ArrowFileReader arrowReader = new ArrowFileReader(fileInputStream.getChannel(), readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("reading schema: " + schema); + Assert.assertTrue(arrowReader.loadNextBatch()); + validateVarBinary(count, root); + } + + // read from stream + try ( + BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE); + ByteArrayInputStream input = new ByteArrayInputStream(stream.toByteArray()); + ArrowStreamReader arrowReader = new ArrowStreamReader(input, readerAllocator)) { + VectorSchemaRoot root = arrowReader.getVectorSchemaRoot(); + Schema schema = root.getSchema(); + LOGGER.debug("reading schema: " + schema); + Assert.assertTrue(arrowReader.loadNextBatch()); + validateVarBinary(count, root); + } + } + /** * Writes the contents of parents to file. If outStream is non-null, also writes it diff --git a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java b/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java index 960567fc870..ee90d340d7c 100644 --- a/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java +++ b/java/vector/src/test/java/org/apache/arrow/vector/file/json/TestJSONFile.java @@ -285,4 +285,33 @@ public void testSetStructLength() throws IOException { } } + @Test + public void testWriteReadVarBinJSON() throws IOException { + File file = new File("target/mytest_varbin.json"); + int count = COUNT; + + // write + try ( + BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE); + NullableMapVector parent = NullableMapVector.empty("parent", vectorAllocator)) { + writeVarBinaryData(count, parent); + VectorSchemaRoot root = new VectorSchemaRoot(parent.getChild("root")); + validateVarBinary(count, root); + writeJSON(file, new VectorSchemaRoot(parent.getChild("root")), null); + } + + // read + try ( + BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE)) { + JsonFileReader reader = new JsonFileReader(file, readerAllocator); + Schema schema = reader.start(); + LOGGER.debug("reading schema: " + schema); + + // initialize vectors + try (VectorSchemaRoot root = reader.read();) { + validateVarBinary(count, root); + } + reader.close(); + } + } }