From 73edd51c604bbaa758a4d720795e8aacd4ba701e Mon Sep 17 00:00:00 2001 From: tianchen Date: Wed, 14 Aug 2019 22:24:31 +0800 Subject: [PATCH 01/11] initial support array and map --- .../org/apache/arrow/AvroToArrowUtils.java | 52 +++++++---- .../arrow/consumers/AvroArraysConsumer.java | 82 +++++++++++++++++ .../arrow/consumers/AvroMapConsumer.java | 88 ++++++++++++++++++ .../org/apache/arrow/AvroToArrowTest.java | 90 +++++++++++++++++-- .../src/test/resources/schema/test_array.avsc | 23 +++++ .../src/test/resources/schema/test_map.avsc | 23 +++++ .../vector/complex/AbstractStructVector.java | 2 +- .../arrow/vector/complex/ListVector.java | 4 + 8 files changed, 342 insertions(+), 22 deletions(-) create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java create mode 100644 java/adapter/avro/src/test/resources/schema/test_array.avsc create mode 100644 java/adapter/avro/src/test/resources/schema/test_map.avsc diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java index 77f34df1479..de8d35c9152 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -27,18 +27,7 @@ import java.util.Map; import java.util.stream.Collectors; -import org.apache.arrow.consumers.AvroBooleanConsumer; -import org.apache.arrow.consumers.AvroBytesConsumer; -import org.apache.arrow.consumers.AvroDoubleConsumer; -import org.apache.arrow.consumers.AvroFloatConsumer; -import org.apache.arrow.consumers.AvroIntConsumer; -import org.apache.arrow.consumers.AvroLongConsumer; -import org.apache.arrow.consumers.AvroNullConsumer; -import org.apache.arrow.consumers.AvroStringConsumer; -import org.apache.arrow.consumers.AvroUnionsConsumer; -import org.apache.arrow.consumers.CompositeAvroConsumer; -import org.apache.arrow.consumers.Consumer; -import org.apache.arrow.consumers.NullableTypeConsumer; +import org.apache.arrow.consumers.*; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.BigIntVector; @@ -51,6 +40,9 @@ import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.ZeroVector; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.complex.StructVector; import org.apache.arrow.vector.complex.UnionVector; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.UnionMode; @@ -105,6 +97,10 @@ private static Consumer createConsumer( switch (type) { case UNION: return createUnionConsumer(schema, name, allocator); + case ARRAY: + return createArrayConsumer(schema, name, allocator); + case MAP: + return createMapConsumer(schema, name, allocator); case STRING: arrowType = new ArrowType.Utf8(); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); @@ -164,6 +160,34 @@ private static Consumer createConsumer( return consumer; } + private static Consumer createArrayConsumer(Schema schema, String name, BufferAllocator allocator) { + + Consumer delegate = createConsumer(schema.getElementType(), schema.getElementType().getName(), allocator); + ListVector listVector = ListVector.empty(name, allocator); + listVector.setDataVector(delegate.getVector()); + + return new AvroArraysConsumer(listVector, delegate); + } + + private static Consumer createMapConsumer(Schema schema, String name, BufferAllocator allocator) { + + StructVector structVector = + new StructVector(name, allocator, FieldType.nullable(ArrowType.Struct.INSTANCE), null); + + Consumer keyConsumer = new AvroStringConsumer(new VarCharVector("key", allocator)); + Consumer valueConsumer = createConsumer(schema.getValueType(), schema.getValueType().getName(), allocator); + + + structVector.putChild(keyConsumer.getVector().getField().getName(), keyConsumer.getVector()); + structVector.putChild(valueConsumer.getVector().getField().getName(), valueConsumer.getVector()); + structVector.allocateNewSafe(); + + MapVector mapVector = MapVector.empty(name, allocator, false); + mapVector.setDataVector(structVector); + + return new AvroMapConsumer(mapVector, keyConsumer, valueConsumer); + } + private static Consumer createUnionConsumer(Schema schema, String name, BufferAllocator allocator) { int size = schema.getTypes().size(); long nullCount = schema.getTypes().stream().filter(s -> s.getType() == Type.NULL).count(); @@ -227,10 +251,6 @@ public static VectorSchemaRoot avroToArrowVectors(Schema schema, Decoder decoder consumers.add(consumer); vectors.add(consumer.getVector()); } - } else if (type == Type.MAP) { - throw new UnsupportedOperationException(); - } else if (type == Type.ARRAY) { - throw new UnsupportedOperationException(); } else if (type == Type.ENUM) { throw new UnsupportedOperationException(); } else { diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java new file mode 100644 index 00000000000..abb9765ef1f --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import org.apache.arrow.vector.BitVectorHelper; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.impl.IntWriterImpl; +import org.apache.arrow.vector.complex.impl.UnionListWriter; +import org.apache.arrow.vector.complex.writer.BaseWriter; +import org.apache.arrow.vector.complex.writer.IntWriter; +import org.apache.avro.io.Decoder; + +import java.io.IOException; + +/** + * Consumer which consume int type values from avro decoder. + * Write the data to {@link ListVector}. + */ +public class AvroArraysConsumer implements Consumer { + + private final ListVector vector; + + private final Consumer delegate; + + /** + * Instantiate a ArrayConsumer. + */ + public AvroArraysConsumer(ListVector vector, Consumer delegete) { + this.vector = vector; + this.delegate = delegete; + } + + @Override + public void consume(Decoder decoder) throws IOException { + int count = (int) decoder.arrayNext(); + + int idx = vector.getValueCount(); + vector.startNewValue(idx); + for (int i = 0; i < count; i++) { + delegate.consume(decoder); + } + decoder.skipArray(); + + int end = vector.getOffsetBuffer().getInt(idx * 4) + count; + vector.getOffsetBuffer().setInt((idx + 1) * 4, end); + BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), vector.getValueCount()); + + vector.setValueCount(idx + 1); + } + + @Override + public void addNull() { + vector.setValueCount(vector.getValueCount() + 1); + } + + @Override + public void setPosition(int index) { + vector.startNewValue(index); + } + + @Override + public FieldVector getVector() { + return this.vector; + } +} diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java new file mode 100644 index 00000000000..beea504b78e --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import org.apache.arrow.vector.BitVectorHelper; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.avro.io.Decoder; + +import java.io.IOException; + +/** + * Consumer which consume map type values from avro decoder. + * Write the data to {@link MapVector}. + */ +public class AvroMapConsumer implements Consumer { + + private final MapVector vector; + + private final Consumer keyDelegate; + private final Consumer valueDelegate; + + /** + * Instantiate a AvroMapConsumer. + */ + public AvroMapConsumer(MapVector vector, Consumer keyDelegate, Consumer valueDelegate) { + this.vector = vector; + this.keyDelegate = keyDelegate; + this.valueDelegate = valueDelegate; + } + + @Override + public void consume(Decoder decoder) throws IOException { + int count = (int) decoder.mapNext(); + + int idx = vector.getValueCount(); + vector.startNewValue(idx); + for (int i = 0; i < count; i++) { + keyDelegate.consume(decoder); + valueDelegate.consume(decoder); + } + decoder.skipMap(); + + int end = vector.getOffsetBuffer().getInt(idx * 4) + count; + vector.getOffsetBuffer().setInt((idx + 1) * 4, end); + + + int dataValueCount = vector.getDataVector().getValueCount(); + for (int i = dataValueCount; i < end; i++) { + BitVectorHelper.setValidityBitToOne(vector.getDataVector().getValidityBuffer(), i); + } + + BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), vector.getValueCount()); + vector.setValueCount(idx + 1); + // set data vector valueCount + vector.getDataVector().setValueCount(end); + } + + @Override + public void addNull() { + vector.setValueCount(vector.getValueCount() + 1); + } + + @Override + public void setPosition(int index) { + vector.startNewValue(index); + } + + @Override + public FieldVector getVector() { + return this.vector; + } +} diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java index 47840d61f65..029a1210cb6 100644 --- a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java +++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java @@ -27,15 +27,17 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Objects; +import java.util.*; import org.apache.arrow.memory.BaseAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.ListVector; +import org.apache.arrow.vector.complex.MapVector; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.util.JsonStringArrayList; +import org.apache.arrow.vector.util.JsonStringHashMap; import org.apache.arrow.vector.util.Text; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -286,6 +288,52 @@ public void testNullableBooleanType() throws Exception { checkRecordResult(schema, data, root); } + @Test + public void testArrayType() throws Exception { + Schema schema = getSchema("test_array.avsc"); + List data = new ArrayList(Arrays.asList( + Arrays.asList("11", "222", "999"), + Arrays.asList("12222", "2333", "1000"), + Arrays.asList("1rrr", "2ggg"), + Arrays.asList("1vvv", "2bbb"), + Arrays.asList("1fff", "2"))); + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkArrayResult(data, (ListVector) vector); + } + + @Test + public void testMapType() throws Exception { + Schema schema = getSchema("test_map.avsc"); + + List keys = Arrays.asList("key1", "key2", "key3", "key4", "key5", "key6"); + List vals = Arrays.asList("val1", "val2", "val3", "val4", "val5", "val6"); + + List data = new ArrayList<>(); + LinkedHashMap map1 = new LinkedHashMap(); + map1.put(keys.get(0), vals.get(0)); + map1.put(keys.get(1), vals.get(1)); + data.add(map1); + + LinkedHashMap map2 = new LinkedHashMap(); + map2.put(keys.get(2), vals.get(2)); + map2.put(keys.get(3), vals.get(3)); + data.add(map2); + + LinkedHashMap map3 = new LinkedHashMap(); + map3.put(keys.get(4), vals.get(4)); + map3.put(keys.get(5), vals.get(5)); + data.add(map3); + + VectorSchemaRoot root = writeAndRead(schema, data); + MapVector vector = (MapVector) root.getFieldVectors().get(0); + + checkPrimitiveResult(keys, vector.getDataVector().getChildrenFromFields().get(0)); + checkPrimitiveResult(vals, vector.getDataVector().getChildrenFromFields().get(1)); + } + @Test public void testUnionType() throws Exception { Schema schema = getSchema("test_union.avsc"); @@ -332,7 +380,39 @@ public void testNullableUnionType() throws Exception { checkPrimitiveResult(expected, vector); } - private void checkPrimitiveResult(ArrayList data, FieldVector vector) { + private void checkArrayResult(List expected, ListVector vector) { + assertEquals(expected.size(), vector.getValueCount()); + for (int i = 0; i < expected.size(); i++) { + checkPrimitiveResult(expected.get(i), (JsonStringArrayList) vector.getObject(i)); + } + } + + private void checkMapResult(List expected, MapVector vector) { + assertEquals(expected.size(), vector.getValueCount()); + for (int i = 0; i < expected.size(); i++) { + assertEquals(expected.get(i), vector.getObject(i).toString()); + } + } + + private void checkPrimitiveResult(List expected, List actual) { + assertEquals(expected.size(), actual.size()); + for (int i = 0; i < expected.size(); i++) { + Object value1 = expected.get(i); + Object value2 = actual.get(i); + if (value1 == null) { + assertTrue(value2 == null); + continue; + } + if (value2 instanceof byte[]) { + value2 = ByteBuffer.wrap((byte[]) value2); + } else if (value2 instanceof Text) { + value2 = value2.toString(); + } + assertTrue(Objects.equals(value1, value2)); + } + } + + private void checkPrimitiveResult(List data, FieldVector vector) { assertEquals(data.size(), vector.getValueCount()); for (int i = 0; i < data.size(); i++) { Object value1 = data.get(i); diff --git a/java/adapter/avro/src/test/resources/schema/test_array.avsc b/java/adapter/avro/src/test/resources/schema/test_array.avsc new file mode 100644 index 00000000000..5b75a4031d8 --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_array.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "array", + "items": "string", + "name": "testArray" +} diff --git a/java/adapter/avro/src/test/resources/schema/test_map.avsc b/java/adapter/avro/src/test/resources/schema/test_map.avsc new file mode 100644 index 00000000000..0dfa3a595bb --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_map.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "map", + "values": "string", + "name": "testMap" +} diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java index 25762fda88c..3295e4e97c2 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java @@ -192,7 +192,7 @@ protected ValueVector add(String childName, FieldType fieldType) { * @param name the name of the child to add * @param vector the vector to add as a child */ - protected void putChild(String name, FieldVector vector) { + public void putChild(String name, FieldVector vector) { putVector(name, vector); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index 81dc8803311..fd9ef0e544d 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -372,6 +372,10 @@ public FieldVector getDataVector() { return vector; } + public void setDataVector(FieldVector vector) { + this.vector = vector; + } + @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator) { return getTransferPair(ref, allocator, null); From 37789bee8ca6b78c65b5b8da8a82916ab1c3baa3 Mon Sep 17 00:00:00 2001 From: tianchen Date: Fri, 16 Aug 2019 13:31:30 +0800 Subject: [PATCH 02/11] support fixed type --- .../org/apache/arrow/AvroToArrowUtils.java | 37 ++++++++- .../arrow/consumers/AvroArraysConsumer.java | 15 ++-- .../arrow/consumers/AvroFixedConsumer.java | 79 +++++++++++++++++++ .../arrow/consumers/AvroMapConsumer.java | 9 ++- .../consumers/CompositeAvroConsumer.java | 15 +--- .../org/apache/arrow/AvroToArrowTest.java | 31 +++++++- .../src/test/resources/schema/test_fixed.avsc | 23 ++++++ 7 files changed, 180 insertions(+), 29 deletions(-) create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java create mode 100644 java/adapter/avro/src/test/resources/schema/test_fixed.avsc diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java index de8d35c9152..4d2148d6f1e 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -20,6 +20,7 @@ import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE; import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE; +import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -27,12 +28,27 @@ import java.util.Map; import java.util.stream.Collectors; -import org.apache.arrow.consumers.*; +import org.apache.arrow.consumers.AvroArraysConsumer; +import org.apache.arrow.consumers.AvroBooleanConsumer; +import org.apache.arrow.consumers.AvroBytesConsumer; +import org.apache.arrow.consumers.AvroDoubleConsumer; +import org.apache.arrow.consumers.AvroFixedConsumer; +import org.apache.arrow.consumers.AvroFloatConsumer; +import org.apache.arrow.consumers.AvroIntConsumer; +import org.apache.arrow.consumers.AvroLongConsumer; +import org.apache.arrow.consumers.AvroMapConsumer; +import org.apache.arrow.consumers.AvroNullConsumer; +import org.apache.arrow.consumers.AvroStringConsumer; +import org.apache.arrow.consumers.AvroUnionsConsumer; +import org.apache.arrow.consumers.CompositeAvroConsumer; +import org.apache.arrow.consumers.Consumer; +import org.apache.arrow.consumers.NullableTypeConsumer; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; import org.apache.arrow.vector.Float4Vector; import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.IntVector; @@ -107,6 +123,12 @@ private static Consumer createConsumer( vector = fieldType.createNewSingleVector(name, allocator, null); consumer = new AvroStringConsumer((VarCharVector) vector); break; + case FIXED: + arrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize()); + fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); + vector = fieldType.createNewSingleVector(name, allocator, null); + consumer = new AvroFixedConsumer((FixedSizeBinaryVector) vector, schema.getFixedSize()); + break; case INT: arrowType = new ArrowType.Int(32, /*signed=*/true); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); @@ -266,10 +288,17 @@ public static VectorSchemaRoot avroToArrowVectors(Schema schema, Decoder decoder VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0); - CompositeAvroConsumer compositeConsumer = null; + CompositeAvroConsumer compositeConsumer = new CompositeAvroConsumer(consumers); + + int valueCount = 0; try { - compositeConsumer = new CompositeAvroConsumer(consumers); - compositeConsumer.consume(decoder, root); + while (true) { + compositeConsumer.consume(decoder, root); + valueCount++; + } + } catch (EOFException eof) { + // reach the end of encoder stream. + root.setRowCount(valueCount); } catch (Exception e) { compositeConsumer.close(); throw new RuntimeException("Error occurs while consume process.", e); diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java index abb9765ef1f..99a4f58299c 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java @@ -17,18 +17,13 @@ package org.apache.arrow.consumers; +import java.io.IOException; + import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.IntVector; import org.apache.arrow.vector.complex.ListVector; -import org.apache.arrow.vector.complex.impl.IntWriterImpl; -import org.apache.arrow.vector.complex.impl.UnionListWriter; -import org.apache.arrow.vector.complex.writer.BaseWriter; -import org.apache.arrow.vector.complex.writer.IntWriter; import org.apache.avro.io.Decoder; -import java.io.IOException; - /** * Consumer which consume int type values from avro decoder. * Write the data to {@link ListVector}. @@ -79,4 +74,10 @@ public void setPosition(int index) { public FieldVector getVector() { return this.vector; } + + @Override + public void close() throws Exception { + vector.close(); + delegate.close(); + } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java new file mode 100644 index 00000000000..e47ec9d6037 --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.FixedSizeBinaryVector; +import org.apache.arrow.vector.complex.impl.FixedSizeBinaryWriterImpl; +import org.apache.arrow.vector.complex.writer.FixedSizeBinaryWriter; +import org.apache.avro.io.Decoder; + +import io.netty.buffer.ArrowBuf; + +/** + * Consumer which consume fixed type values from avro decoder. + * Write the data to {@link org.apache.arrow.vector.FixedSizeBinaryVector}. + */ +public class AvroFixedConsumer implements Consumer { + + private final FixedSizeBinaryWriter writer; + private final FixedSizeBinaryVector vector; + + private final ArrowBuf cacheBuffer; + private final byte[] reuseBytes; + + /** + * Instantiate a AvroFixedConsumer. + */ + public AvroFixedConsumer(FixedSizeBinaryVector vector, int size) { + this.vector = vector; + this.writer = new FixedSizeBinaryWriterImpl(vector); + cacheBuffer = vector.getAllocator().buffer(size); + reuseBytes = new byte[size]; + } + + @Override + public void consume(Decoder decoder) throws IOException { + decoder.readFixed(reuseBytes); + cacheBuffer.setBytes(0, reuseBytes); + writer.writeFixedSizeBinary(cacheBuffer); + writer.setPosition(writer.getPosition() + 1); + } + + @Override + public void addNull() { + writer.setPosition(writer.getPosition() + 1); + } + + @Override + public void setPosition(int index) { + writer.setPosition(index); + } + + @Override + public FieldVector getVector() { + return vector; + } + + @Override + public void close() throws Exception { + writer.close(); + } +} diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java index beea504b78e..cb422c004b5 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java @@ -17,13 +17,13 @@ package org.apache.arrow.consumers; +import java.io.IOException; + import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.complex.MapVector; import org.apache.avro.io.Decoder; -import java.io.IOException; - /** * Consumer which consume map type values from avro decoder. * Write the data to {@link MapVector}. @@ -85,4 +85,9 @@ public void setPosition(int index) { public FieldVector getVector() { return this.vector; } + + @Override + public void close() throws Exception { + + } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java index 0f4b3c3afd9..b334a83ad6a 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java @@ -17,7 +17,6 @@ package org.apache.arrow.consumers; -import java.io.EOFException; import java.io.IOException; import java.util.List; @@ -40,18 +39,8 @@ public CompositeAvroConsumer(List consumers) { * Consume decoder data and write into {@link VectorSchemaRoot}. */ public void consume(Decoder decoder, VectorSchemaRoot root) throws IOException { - int valueCount = 0; - while (true) { - try { - for (Consumer consumer : consumers) { - consumer.consume(decoder); - } - valueCount++; - //reach end will throw EOFException. - } catch (EOFException eofException) { - root.setRowCount(valueCount); - break; - } + for (Consumer consumer : consumers) { + consumer.consume(decoder); } } diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java index 029a1210cb6..bd5383523ce 100644 --- a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java +++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java @@ -27,7 +27,11 @@ import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Objects; import org.apache.arrow.memory.BaseAllocator; import org.apache.arrow.memory.RootAllocator; @@ -35,9 +39,7 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; -import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.util.JsonStringArrayList; -import org.apache.arrow.vector.util.JsonStringHashMap; import org.apache.arrow.vector.util.Text; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; @@ -334,6 +336,26 @@ public void testMapType() throws Exception { checkPrimitiveResult(vals, vector.getDataVector().getChildrenFromFields().get(1)); } + @Test + public void testFixedType() throws Exception { + Schema schema = getSchema("test_fixed.avsc"); + + List data = new ArrayList<>(); + List expected = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + byte[] value = ("value" + i).getBytes(StandardCharsets.UTF_8); + expected.add(value); + GenericData.Fixed fixed = new GenericData.Fixed(schema); + fixed.bytes(value); + data.add(fixed); + } + + VectorSchemaRoot root = writeAndRead(schema, data); + FieldVector vector = root.getFieldVectors().get(0); + + checkPrimitiveResult(expected, vector); + } + @Test public void testUnionType() throws Exception { Schema schema = getSchema("test_union.avsc"); @@ -423,6 +445,9 @@ private void checkPrimitiveResult(List data, FieldVector vector) { } if (value2 instanceof byte[]) { value2 = ByteBuffer.wrap((byte[]) value2); + if (value1 instanceof byte[]) { + value1 = ByteBuffer.wrap((byte[]) value1); + } } else if (value2 instanceof Text) { value2 = value2.toString(); } diff --git a/java/adapter/avro/src/test/resources/schema/test_fixed.avsc b/java/adapter/avro/src/test/resources/schema/test_fixed.avsc new file mode 100644 index 00000000000..a4d96e9ab55 --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_fixed.avsc @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "fixed", + "size": 6, + "name": "testFixed" +} From 790471f6e87ff7ab85a892f1627dd0f857a84341 Mon Sep 17 00:00:00 2001 From: tianchen Date: Fri, 16 Aug 2019 20:13:12 +0800 Subject: [PATCH 03/11] fix style --- .../avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java | 1 - .../main/java/org/apache/arrow/consumers/AvroMapConsumer.java | 4 +++- .../main/java/org/apache/arrow/vector/complex/ListVector.java | 3 +++ 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java index 4d2148d6f1e..167c1b4f89b 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -199,7 +199,6 @@ private static Consumer createMapConsumer(Schema schema, String name, BufferAllo Consumer keyConsumer = new AvroStringConsumer(new VarCharVector("key", allocator)); Consumer valueConsumer = createConsumer(schema.getValueType(), schema.getValueType().getName(), allocator); - structVector.putChild(keyConsumer.getVector().getField().getName(), keyConsumer.getVector()); structVector.putChild(valueConsumer.getVector().getField().getName(), valueConsumer.getVector()); structVector.allocateNewSafe(); diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java index cb422c004b5..0fa422d1137 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java @@ -88,6 +88,8 @@ public FieldVector getVector() { @Override public void close() throws Exception { - + vector.close(); + keyDelegate.close(); + valueDelegate.close(); } } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index fd9ef0e544d..ccc0305cee4 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -372,6 +372,9 @@ public FieldVector getDataVector() { return vector; } + /** + * Directly set the underlying vector. + */ public void setDataVector(FieldVector vector) { this.vector = vector; } From f435f66d00193ab7b7ab5a794cb903e093a889fd Mon Sep 17 00:00:00 2001 From: tianchen Date: Sat, 17 Aug 2019 12:08:07 +0800 Subject: [PATCH 04/11] add comments and fix array/map consumers --- .../org/apache/arrow/AvroToArrowUtils.java | 4 ++++ .../arrow/consumers/AvroArraysConsumer.java | 19 ++++++++++++++++--- .../arrow/consumers/AvroMapConsumer.java | 18 +++++++++++++++--- 3 files changed, 35 insertions(+), 6 deletions(-) diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java index 167c1b4f89b..9992a01fd28 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -193,16 +193,20 @@ private static Consumer createArrayConsumer(Schema schema, String name, BufferAl private static Consumer createMapConsumer(Schema schema, String name, BufferAllocator allocator) { + // create struct vector as underlying vector of the map vector, its children vectors are from delegates. StructVector structVector = new StructVector(name, allocator, FieldType.nullable(ArrowType.Struct.INSTANCE), null); + // crate keyConsumer and valueConsumer to consume key/value respectively. Consumer keyConsumer = new AvroStringConsumer(new VarCharVector("key", allocator)); Consumer valueConsumer = createConsumer(schema.getValueType(), schema.getValueType().getName(), allocator); + // directly put delegates vectors to struct vector. structVector.putChild(keyConsumer.getVector().getField().getName(), keyConsumer.getVector()); structVector.putChild(valueConsumer.getVector().getField().getName(), valueConsumer.getVector()); structVector.allocateNewSafe(); + // crate map vector and set the struct vector as its underlying vector. MapVector mapVector = MapVector.empty(name, allocator, false); mapVector.setDataVector(structVector); diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java index 99a4f58299c..16a7d20cf2b 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java @@ -34,6 +34,11 @@ public class AvroArraysConsumer implements Consumer { private final Consumer delegate; + /** + * Indicated whether has read the first block of this array. + */ + private boolean firstRead; + /** * Instantiate a ArrayConsumer. */ @@ -44,16 +49,24 @@ public AvroArraysConsumer(ListVector vector, Consumer delegete) { @Override public void consume(Decoder decoder) throws IOException { - int count = (int) decoder.arrayNext(); + + long count; + if (!firstRead) { + count = decoder.readArrayStart(); + firstRead = true; + } else { + do { + count = decoder.arrayNext(); + } while (count == 0); + } int idx = vector.getValueCount(); vector.startNewValue(idx); for (int i = 0; i < count; i++) { delegate.consume(decoder); } - decoder.skipArray(); - int end = vector.getOffsetBuffer().getInt(idx * 4) + count; + int end = (int) (vector.getOffsetBuffer().getInt(idx * 4) + count); vector.getOffsetBuffer().setInt((idx + 1) * 4, end); BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), vector.getValueCount()); diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java index 0fa422d1137..fef96dd48ea 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java @@ -35,6 +35,11 @@ public class AvroMapConsumer implements Consumer { private final Consumer keyDelegate; private final Consumer valueDelegate; + /** + * Indicated whether has read the first block of this map. + */ + private boolean firstRead; + /** * Instantiate a AvroMapConsumer. */ @@ -46,7 +51,15 @@ public AvroMapConsumer(MapVector vector, Consumer keyDelegate, Consumer valueDel @Override public void consume(Decoder decoder) throws IOException { - int count = (int) decoder.mapNext(); + long count; + if (!firstRead) { + count = decoder.readMapStart(); + firstRead = true; + } else { + do { + count = decoder.mapNext(); + } while (count == 0); + } int idx = vector.getValueCount(); vector.startNewValue(idx); @@ -54,9 +67,8 @@ public void consume(Decoder decoder) throws IOException { keyDelegate.consume(decoder); valueDelegate.consume(decoder); } - decoder.skipMap(); - int end = vector.getOffsetBuffer().getInt(idx * 4) + count; + int end = (int) (vector.getOffsetBuffer().getInt(idx * 4) + count); vector.getOffsetBuffer().setInt((idx + 1) * 4, end); From bb5346ebb2731aa761f36e4b0a69365f1d097ff8 Mon Sep 17 00:00:00 2001 From: tianchen Date: Tue, 20 Aug 2019 00:35:11 +0800 Subject: [PATCH 05/11] fix array and map consumer --- .../java/org/apache/arrow/consumers/AvroArraysConsumer.java | 5 ++--- .../java/org/apache/arrow/consumers/AvroMapConsumer.java | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java index 16a7d20cf2b..dc89a97dec5 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java @@ -55,9 +55,7 @@ public void consume(Decoder decoder) throws IOException { count = decoder.readArrayStart(); firstRead = true; } else { - do { - count = decoder.arrayNext(); - } while (count == 0); + count = decoder.arrayNext(); } int idx = vector.getValueCount(); @@ -71,6 +69,7 @@ public void consume(Decoder decoder) throws IOException { BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), vector.getValueCount()); vector.setValueCount(idx + 1); + decoder.skipArray(); } @Override diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java index fef96dd48ea..0b3d80ad402 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java @@ -56,9 +56,7 @@ public void consume(Decoder decoder) throws IOException { count = decoder.readMapStart(); firstRead = true; } else { - do { - count = decoder.mapNext(); - } while (count == 0); + count = decoder.mapNext(); } int idx = vector.getValueCount(); @@ -81,6 +79,7 @@ public void consume(Decoder decoder) throws IOException { vector.setValueCount(idx + 1); // set data vector valueCount vector.getDataVector().setValueCount(end); + decoder.skipMap(); } @Override From 54c66621b5caa60b2c5d7ed61299a8113390e249 Mon Sep 17 00:00:00 2001 From: tianchen Date: Tue, 20 Aug 2019 21:36:05 +0800 Subject: [PATCH 06/11] avoid mem copy --- .../main/java/org/apache/arrow/AvroToArrowUtils.java | 4 ++-- .../org/apache/arrow/consumers/AvroArraysConsumer.java | 2 +- .../org/apache/arrow/consumers/AvroBytesConsumer.java | 9 +-------- .../org/apache/arrow/consumers/AvroFixedConsumer.java | 7 +------ .../org/apache/arrow/consumers/AvroMapConsumer.java | 1 - .../org/apache/arrow/consumers/AvroStringConsumer.java | 10 +--------- 6 files changed, 6 insertions(+), 27 deletions(-) diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java index 9992a01fd28..44c52c4d020 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -195,7 +195,7 @@ private static Consumer createMapConsumer(Schema schema, String name, BufferAllo // create struct vector as underlying vector of the map vector, its children vectors are from delegates. StructVector structVector = - new StructVector(name, allocator, FieldType.nullable(ArrowType.Struct.INSTANCE), null); + new StructVector(name, allocator, FieldType.nullable(ArrowType.Struct.INSTANCE), /*callBack=*/null); // crate keyConsumer and valueConsumer to consume key/value respectively. Consumer keyConsumer = new AvroStringConsumer(new VarCharVector("key", allocator)); @@ -207,7 +207,7 @@ private static Consumer createMapConsumer(Schema schema, String name, BufferAllo structVector.allocateNewSafe(); // crate map vector and set the struct vector as its underlying vector. - MapVector mapVector = MapVector.empty(name, allocator, false); + MapVector mapVector = MapVector.empty(name, allocator, /*keySorted=*/false); mapVector.setDataVector(structVector); return new AvroMapConsumer(mapVector, keyConsumer, valueConsumer); diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java index dc89a97dec5..ec7ca3b1efd 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java @@ -25,7 +25,7 @@ import org.apache.avro.io.Decoder; /** - * Consumer which consume int type values from avro decoder. + * Consumer which consume array type values from avro decoder. * Write the data to {@link ListVector}. */ public class AvroArraysConsumer implements Consumer { diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java index c0cfaec5c9b..ea1e8315365 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java @@ -24,7 +24,6 @@ import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl; import org.apache.arrow.vector.complex.writer.VarBinaryWriter; -import org.apache.arrow.vector.holders.VarBinaryHolder; import org.apache.avro.io.Decoder; /** @@ -57,17 +56,11 @@ public void addNull() { } private void writeValue(Decoder decoder) throws IOException { - VarBinaryHolder holder = new VarBinaryHolder(); // cacheBuffer is initialized null and create in the first consume, // if its capacity < size to read, decoder will create a new one with new capacity. cacheBuffer = decoder.readBytes(cacheBuffer); - - holder.start = 0; - holder.end = cacheBuffer.limit(); - holder.buffer = vector.getAllocator().buffer(cacheBuffer.limit()); - holder.buffer.setBytes(0, cacheBuffer, 0, cacheBuffer.limit()); - writer.write(holder); + vector.setSafe(writer.getPosition(), cacheBuffer, 0, cacheBuffer.limit()); } @Override diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java index e47ec9d6037..77edefc7087 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java @@ -25,8 +25,6 @@ import org.apache.arrow.vector.complex.writer.FixedSizeBinaryWriter; import org.apache.avro.io.Decoder; -import io.netty.buffer.ArrowBuf; - /** * Consumer which consume fixed type values from avro decoder. * Write the data to {@link org.apache.arrow.vector.FixedSizeBinaryVector}. @@ -36,7 +34,6 @@ public class AvroFixedConsumer implements Consumer { private final FixedSizeBinaryWriter writer; private final FixedSizeBinaryVector vector; - private final ArrowBuf cacheBuffer; private final byte[] reuseBytes; /** @@ -45,15 +42,13 @@ public class AvroFixedConsumer implements Consumer { public AvroFixedConsumer(FixedSizeBinaryVector vector, int size) { this.vector = vector; this.writer = new FixedSizeBinaryWriterImpl(vector); - cacheBuffer = vector.getAllocator().buffer(size); reuseBytes = new byte[size]; } @Override public void consume(Decoder decoder) throws IOException { decoder.readFixed(reuseBytes); - cacheBuffer.setBytes(0, reuseBytes); - writer.writeFixedSizeBinary(cacheBuffer); + vector.setSafe(writer.getPosition(), reuseBytes); writer.setPosition(writer.getPosition() + 1); } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java index 0b3d80ad402..1c70eb9def4 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java @@ -69,7 +69,6 @@ public void consume(Decoder decoder) throws IOException { int end = (int) (vector.getOffsetBuffer().getInt(idx * 4) + count); vector.getOffsetBuffer().setInt((idx + 1) * 4, end); - int dataValueCount = vector.getDataVector().getValueCount(); for (int i = dataValueCount; i < end; i++) { BitVectorHelper.setValidityBitToOne(vector.getDataVector().getValidityBuffer(), i); diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java index 850d699607c..baca6f06e94 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java @@ -24,7 +24,6 @@ import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.complex.impl.VarCharWriterImpl; import org.apache.arrow.vector.complex.writer.VarCharWriter; -import org.apache.arrow.vector.holders.VarCharHolder; import org.apache.avro.io.Decoder; /** @@ -57,18 +56,11 @@ public void addNull() { } private void writeValue(Decoder decoder) throws IOException { - VarCharHolder holder = new VarCharHolder(); // cacheBuffer is initialized null and create in the first consume, // if its capacity < size to read, decoder will create a new one with new capacity. cacheBuffer = decoder.readBytes(cacheBuffer); - - holder.start = 0; - holder.end = cacheBuffer.limit(); - holder.buffer = vector.getAllocator().buffer(cacheBuffer.limit()); - holder.buffer.setBytes(0, cacheBuffer, 0, cacheBuffer.limit()); - - writer.write(holder); + vector.setSafe(writer.getPosition(), cacheBuffer, 0, cacheBuffer.limit()); } @Override From a415e93559614ed9e71a4bc57be02250dabb3693 Mon Sep 17 00:00:00 2001 From: tianchen Date: Wed, 21 Aug 2019 12:42:25 +0800 Subject: [PATCH 07/11] fix array/map consume logic --- .../arrow/consumers/AvroArraysConsumer.java | 28 ++++++------------- .../arrow/consumers/AvroMapConsumer.java | 25 ++++++----------- 2 files changed, 17 insertions(+), 36 deletions(-) diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java index ec7ca3b1efd..8be627e1c6a 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java @@ -34,42 +34,32 @@ public class AvroArraysConsumer implements Consumer { private final Consumer delegate; - /** - * Indicated whether has read the first block of this array. - */ - private boolean firstRead; - /** * Instantiate a ArrayConsumer. */ - public AvroArraysConsumer(ListVector vector, Consumer delegete) { + public AvroArraysConsumer(ListVector vector, Consumer delegate) { this.vector = vector; - this.delegate = delegete; + this.delegate = delegate; } @Override public void consume(Decoder decoder) throws IOException { - long count; - if (!firstRead) { - count = decoder.readArrayStart(); - firstRead = true; - } else { - count = decoder.arrayNext(); - } - int idx = vector.getValueCount(); vector.startNewValue(idx); - for (int i = 0; i < count; i++) { - delegate.consume(decoder); + long totalCount = 0; + for (long count = decoder.readArrayStart(); count != 0; count = decoder.arrayNext()) { + totalCount += count; + for (int element = 0; element < count; element++) { + delegate.consume(decoder); + } } - int end = (int) (vector.getOffsetBuffer().getInt(idx * 4) + count); + int end = (int) (vector.getOffsetBuffer().getInt(idx * 4) + totalCount); vector.getOffsetBuffer().setInt((idx + 1) * 4, end); BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), vector.getValueCount()); vector.setValueCount(idx + 1); - decoder.skipArray(); } @Override diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java index 1c70eb9def4..c26a9862dab 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java @@ -35,11 +35,6 @@ public class AvroMapConsumer implements Consumer { private final Consumer keyDelegate; private final Consumer valueDelegate; - /** - * Indicated whether has read the first block of this map. - */ - private boolean firstRead; - /** * Instantiate a AvroMapConsumer. */ @@ -51,22 +46,19 @@ public AvroMapConsumer(MapVector vector, Consumer keyDelegate, Consumer valueDel @Override public void consume(Decoder decoder) throws IOException { - long count; - if (!firstRead) { - count = decoder.readMapStart(); - firstRead = true; - } else { - count = decoder.mapNext(); - } int idx = vector.getValueCount(); vector.startNewValue(idx); - for (int i = 0; i < count; i++) { - keyDelegate.consume(decoder); - valueDelegate.consume(decoder); + long totalCount = 0; + for (long count = decoder.readArrayStart(); count != 0; count = decoder.arrayNext()) { + totalCount += count; + for (int element = 0; element < count; element++) { + keyDelegate.consume(decoder); + valueDelegate.consume(decoder); + } } - int end = (int) (vector.getOffsetBuffer().getInt(idx * 4) + count); + int end = (int) (vector.getOffsetBuffer().getInt(idx * 4) + totalCount); vector.getOffsetBuffer().setInt((idx + 1) * 4, end); int dataValueCount = vector.getDataVector().getValueCount(); @@ -78,7 +70,6 @@ public void consume(Decoder decoder) throws IOException { vector.setValueCount(idx + 1); // set data vector valueCount vector.getDataVector().setValueCount(end); - decoder.skipMap(); } @Override From 363952039b8f811b568528d628d48eb3f16690f6 Mon Sep 17 00:00:00 2001 From: tianchen Date: Fri, 23 Aug 2019 20:33:51 +0800 Subject: [PATCH 08/11] refactor --- .../org/apache/arrow/AvroToArrowUtils.java | 214 ++++++++++++++---- .../arrow/consumers/AvroArraysConsumer.java | 19 +- .../arrow/consumers/AvroBooleanConsumer.java | 15 +- .../arrow/consumers/AvroBytesConsumer.java | 27 +-- .../arrow/consumers/AvroDoubleConsumer.java | 15 +- .../arrow/consumers/AvroFixedConsumer.java | 16 +- .../arrow/consumers/AvroFloatConsumer.java | 15 +- .../arrow/consumers/AvroIntConsumer.java | 15 +- .../arrow/consumers/AvroLongConsumer.java | 15 +- .../arrow/consumers/AvroMapConsumer.java | 31 +-- .../arrow/consumers/AvroStringConsumer.java | 25 +- .../arrow/consumers/AvroStructConsumer.java | 79 +++++++ .../arrow/consumers/AvroUnionsConsumer.java | 22 +- .../vector/complex/AbstractStructVector.java | 2 +- .../arrow/vector/complex/ListVector.java | 7 - 15 files changed, 329 insertions(+), 188 deletions(-) create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java index 44c52c4d020..28fb1c5206c 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -23,6 +23,8 @@ import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -39,6 +41,7 @@ import org.apache.arrow.consumers.AvroMapConsumer; import org.apache.arrow.consumers.AvroNullConsumer; import org.apache.arrow.consumers.AvroStringConsumer; +import org.apache.arrow.consumers.AvroStructConsumer; import org.apache.arrow.consumers.AvroUnionsConsumer; import org.apache.arrow.consumers.CompositeAvroConsumer; import org.apache.arrow.consumers.Consumer; @@ -89,19 +92,37 @@ public class AvroToArrowUtils { *
  • DOUBLE --> ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
  • *
  • BOOLEAN --> ArrowType.Bool
  • *
  • BYTES --> ArrowType.Binary
  • + *
  • ARRAY --> ArrowType.List
  • + *
  • MAP --> ArrowType.Map
  • + *
  • FIXED --> ArrowType.FixedSizeBinary
  • * */ + private static Consumer createConsumer(Schema schema, String name, BufferAllocator allocator) { - return createConsumer(schema, name, false, INVALID_NULL_INDEX, allocator); + return createConsumer(schema, name, false, INVALID_NULL_INDEX, allocator, null); + } + + private static Consumer createConsumer(Schema schema, String name, BufferAllocator allocator, FieldVector vector) { + return createConsumer(schema, name, false, INVALID_NULL_INDEX, allocator, vector); } + /** + * Create a consumer with the given avro schema + * @param schema avro schema + * @param name arrow field name + * @param v vector to keep in consumer, if v == null, will create a new vector via field. + * @return consumer + */ private static Consumer createConsumer( Schema schema, String name, boolean nullable, int nullIndex, - BufferAllocator allocator) { + BufferAllocator allocator, + FieldVector v) { + Preconditions.checkNotNull(schema, "Avro schema object can't be null"); + Preconditions.checkNotNull(allocator, "allocator can't be null"); Type type = schema.getType(); @@ -112,63 +133,71 @@ private static Consumer createConsumer( switch (type) { case UNION: - return createUnionConsumer(schema, name, allocator); + consumer = createUnionConsumer(schema, name, allocator, v); + break; case ARRAY: - return createArrayConsumer(schema, name, allocator); + consumer = createArrayConsumer(schema, name, allocator, v); + break; case MAP: - return createMapConsumer(schema, name, allocator); + consumer = createMapConsumer(schema, name, allocator, v); + break; + //TODO implement enum and nested record type + case RECORD: + throw new UnsupportedOperationException(); + case ENUM: + throw new UnsupportedOperationException(); case STRING: arrowType = new ArrowType.Utf8(); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = fieldType.createNewSingleVector(name, allocator, null); + vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); consumer = new AvroStringConsumer((VarCharVector) vector); break; case FIXED: arrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize()); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = fieldType.createNewSingleVector(name, allocator, null); + vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); consumer = new AvroFixedConsumer((FixedSizeBinaryVector) vector, schema.getFixedSize()); break; case INT: arrowType = new ArrowType.Int(32, /*signed=*/true); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = fieldType.createNewSingleVector(name, allocator, null); + vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); consumer = new AvroIntConsumer((IntVector) vector); break; case BOOLEAN: arrowType = new ArrowType.Bool(); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = fieldType.createNewSingleVector(name, allocator, null); + vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); consumer = new AvroBooleanConsumer((BitVector) vector); break; case LONG: arrowType = new ArrowType.Int(64, /*signed=*/true); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = fieldType.createNewSingleVector(name, allocator, null); + vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); consumer = new AvroLongConsumer((BigIntVector) vector); break; case FLOAT: arrowType = new ArrowType.FloatingPoint(SINGLE); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = fieldType.createNewSingleVector(name, allocator, null); + vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); consumer = new AvroFloatConsumer((Float4Vector) vector); break; case DOUBLE: arrowType = new ArrowType.FloatingPoint(DOUBLE); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = fieldType.createNewSingleVector(name, allocator, null); + vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); consumer = new AvroDoubleConsumer((Float8Vector) vector); break; case BYTES: arrowType = new ArrowType.Binary(); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = fieldType.createNewSingleVector(name, allocator, null); + vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); consumer = new AvroBytesConsumer((VarBinaryVector) vector); break; case NULL: arrowType = new ArrowType.Null(); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = fieldType.createNewSingleVector(name, allocator, null); + vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); consumer = new AvroNullConsumer((ZeroVector) vector); break; default: @@ -182,45 +211,139 @@ private static Consumer createConsumer( return consumer; } - private static Consumer createArrayConsumer(Schema schema, String name, BufferAllocator allocator) { + private static String getDefaultFieldName(ArrowType type) { + Types.MinorType minorType = Types.getMinorTypeForArrowType(type); + return minorType.name().toLowerCase(); + } - Consumer delegate = createConsumer(schema.getElementType(), schema.getElementType().getName(), allocator); - ListVector listVector = ListVector.empty(name, allocator); - listVector.setDataVector(delegate.getVector()); + private static Field avroSchemaToField(Schema schema, String name) { + final Type type = schema.getType(); + final ArrowType arrowType; + + switch (type) { + case UNION: + List children = new ArrayList<>(); + for (int i = 0; i < schema.getTypes().size(); i++) { + Schema childSchema = schema.getTypes().get(i); + // Union child vector should use default name + children.add(avroSchemaToField(childSchema, null)); + } + arrowType = new ArrowType.Union(UnionMode.Sparse, null); + if (name == null) { + name = getDefaultFieldName(arrowType); + } + return new Field(name, FieldType.nullable(arrowType), children); + case ARRAY: + Schema elementSchema = schema.getElementType(); + arrowType = new ArrowType.List(); + if (name == null) { + name = getDefaultFieldName(arrowType); + } + return new Field(name, FieldType.nullable(arrowType), + Collections.singletonList(avroSchemaToField(elementSchema, elementSchema.getName()))); + case MAP: + // MapVector internal struct field and key field should be non-nullable + FieldType keyFieldType = new FieldType(/*nullable=*/false, new ArrowType.Utf8(), /*dictionary=*/null); + Field keyField = new Field("key", keyFieldType, /*children=*/null); + Field valueField = avroSchemaToField(schema.getValueType(), "value"); + + FieldType structFieldType = new FieldType(false, new ArrowType.Struct(), /*dictionary=*/null); + Field structField = new Field("internal", structFieldType, Arrays.asList(keyField, valueField)); + arrowType = new ArrowType.Map(/*keySorted=*/false); + if (name == null) { + name = getDefaultFieldName(arrowType); + } + return new Field(name, FieldType.nullable(arrowType), Collections.singletonList(structField)); + case STRING: + arrowType = new ArrowType.Utf8(); + break; + case FIXED: + arrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize()); + break; + case INT: + arrowType = new ArrowType.Int(32, /*signed=*/true); + break; + case BOOLEAN: + arrowType = new ArrowType.Bool(); + break; + case LONG: + arrowType = new ArrowType.Int(64, /*signed=*/true); + break; + case FLOAT: + arrowType = new ArrowType.FloatingPoint(SINGLE); + break; + case DOUBLE: + arrowType = new ArrowType.FloatingPoint(DOUBLE); + break; + case BYTES: + arrowType = new ArrowType.Binary(); + break; + case NULL: + arrowType = new ArrowType.Null(); + break; + default: + // no-op, shouldn't get here + throw new UnsupportedOperationException(); + } + + if (name == null) { + name = getDefaultFieldName(arrowType); + } + return Field.nullable(name, arrowType); + } + + private static Consumer createArrayConsumer(Schema schema, String name, BufferAllocator allocator, FieldVector v) { + + ListVector listVector; + if (v == null) { + final Field field = avroSchemaToField(schema, name); + listVector = (ListVector) field.createVector(allocator); + } else { + listVector = (ListVector) v; + } + + FieldVector dataVector = listVector.getDataVector(); + + // create delegate + Schema childSchema = schema.getElementType(); + Consumer delegate = createConsumer(childSchema, childSchema.getName(), allocator, dataVector); return new AvroArraysConsumer(listVector, delegate); } - private static Consumer createMapConsumer(Schema schema, String name, BufferAllocator allocator) { + private static Consumer createMapConsumer(Schema schema, String name, BufferAllocator allocator, FieldVector v) { - // create struct vector as underlying vector of the map vector, its children vectors are from delegates. - StructVector structVector = - new StructVector(name, allocator, FieldType.nullable(ArrowType.Struct.INSTANCE), /*callBack=*/null); + MapVector mapVector; + if (v == null) { + final Field field = avroSchemaToField(schema, name); + mapVector = (MapVector) field.createVector(allocator); + } else { + mapVector = (MapVector) v; + } - // crate keyConsumer and valueConsumer to consume key/value respectively. - Consumer keyConsumer = new AvroStringConsumer(new VarCharVector("key", allocator)); - Consumer valueConsumer = createConsumer(schema.getValueType(), schema.getValueType().getName(), allocator); + // create delegate struct consumer + StructVector structVector = (StructVector) mapVector.getDataVector(); - // directly put delegates vectors to struct vector. - structVector.putChild(keyConsumer.getVector().getField().getName(), keyConsumer.getVector()); - structVector.putChild(valueConsumer.getVector().getField().getName(), valueConsumer.getVector()); - structVector.allocateNewSafe(); + // keys in avro map are always assumed to be strings. + Consumer keyConsumer = new AvroStringConsumer( + (VarCharVector) structVector.getChildrenFromFields().get(0)); + Consumer valueConsumer = createConsumer(schema.getValueType(), schema.getValueType().getName(), + allocator, structVector.getChildrenFromFields().get(1)); - // crate map vector and set the struct vector as its underlying vector. - MapVector mapVector = MapVector.empty(name, allocator, /*keySorted=*/false); - mapVector.setDataVector(structVector); + AvroStructConsumer internalConsumer = + new AvroStructConsumer(structVector, new Consumer[] {keyConsumer, valueConsumer}); - return new AvroMapConsumer(mapVector, keyConsumer, valueConsumer); + return new AvroMapConsumer(mapVector, internalConsumer); } - private static Consumer createUnionConsumer(Schema schema, String name, BufferAllocator allocator) { + private static Consumer createUnionConsumer(Schema schema, String name, BufferAllocator allocator, FieldVector v) { int size = schema.getTypes().size(); long nullCount = schema.getTypes().stream().filter(s -> s.getType() == Type.NULL).count(); // union only has one type, convert to primitive type if (size == 1) { Schema subSchema = schema.getTypes().get(0); - return createConsumer(subSchema, name, allocator); + return createConsumer(subSchema, name, allocator, v); // size == 2 and has null type, convert to nullable primitive type } else if (size == 2 && nullCount == 1) { @@ -228,25 +351,30 @@ private static Consumer createUnionConsumer(Schema schema, String name, BufferAl int nullIndex = schema.getTypes().indexOf(nullSchema); Schema subSchema = schema.getTypes().stream().filter(s -> s.getType() != Type.NULL).findFirst().get(); Preconditions.checkNotNull(subSchema, "schema should not be null."); - return createConsumer(subSchema, name, true, nullIndex, allocator); + return createConsumer(subSchema, name, true, nullIndex, allocator, v); // real union type } else { - final FieldType fieldType = new FieldType(/*nullable=*/true, - new ArrowType.Union(UnionMode.Sparse, null), /*dictionary=*/null, getMetaData(schema)); - UnionVector unionVector = - (UnionVector) fieldType.createNewSingleVector(name, allocator, null); + UnionVector unionVector; + if (v == null) { + final Field field = avroSchemaToField(schema, name); + unionVector = (UnionVector) field.createVector(allocator); + } else { + unionVector = (UnionVector) v; + } + + List childVectors = unionVector.getChildrenFromFields(); Consumer[] delegates = new Consumer[size]; Types.MinorType[] types = new Types.MinorType[size]; for (int i = 0; i < size; i++) { + FieldVector child = childVectors.get(i); Schema subSchema = schema.getTypes().get(i); - Consumer delegate = createConsumer(subSchema, subSchema.getName(), allocator); - unionVector.directAddVector(delegate.getVector()); + Consumer delegate = createConsumer(subSchema, subSchema.getName(), allocator, child); delegates[i] = delegate; - types[i] = delegate.getVector().getMinorType(); + types[i] = child.getMinorType(); } return new AvroUnionsConsumer(unionVector, delegates, types); } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java index 8be627e1c6a..de54be6b32b 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java @@ -19,7 +19,6 @@ import java.io.IOException; -import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.complex.ListVector; import org.apache.avro.io.Decoder; @@ -31,9 +30,10 @@ public class AvroArraysConsumer implements Consumer { private final ListVector vector; - private final Consumer delegate; + private int currentIndex = 0; + /** * Instantiate a ArrayConsumer. */ @@ -45,8 +45,7 @@ public AvroArraysConsumer(ListVector vector, Consumer delegate) { @Override public void consume(Decoder decoder) throws IOException { - int idx = vector.getValueCount(); - vector.startNewValue(idx); + vector.startNewValue(currentIndex); long totalCount = 0; for (long count = decoder.readArrayStart(); count != 0; count = decoder.arrayNext()) { totalCount += count; @@ -54,22 +53,18 @@ public void consume(Decoder decoder) throws IOException { delegate.consume(decoder); } } - - int end = (int) (vector.getOffsetBuffer().getInt(idx * 4) + totalCount); - vector.getOffsetBuffer().setInt((idx + 1) * 4, end); - BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), vector.getValueCount()); - - vector.setValueCount(idx + 1); + vector.endValue(currentIndex, (int) totalCount); + currentIndex++; } @Override public void addNull() { - vector.setValueCount(vector.getValueCount() + 1); + currentIndex++; } @Override public void setPosition(int index) { - vector.startNewValue(index); + currentIndex = index; } @Override diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java index c2876f1d39d..73c5e72134f 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java @@ -21,8 +21,6 @@ import org.apache.arrow.vector.BitVector; import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.complex.impl.BitWriterImpl; -import org.apache.arrow.vector.complex.writer.BitWriter; import org.apache.avro.io.Decoder; /** @@ -31,31 +29,30 @@ */ public class AvroBooleanConsumer implements Consumer { - private final BitWriter writer; private final BitVector vector; + private int currentIndex = 0; /** * Instantiate a AvroBooleanConsumer. */ public AvroBooleanConsumer(BitVector vector) { this.vector = vector; - this.writer = new BitWriterImpl(vector); } @Override public void consume(Decoder decoder) throws IOException { - writer.writeBit(decoder.readBoolean() ? 1 : 0); - writer.setPosition(writer.getPosition() + 1); + vector.setSafe(currentIndex, decoder.readBoolean() ? 1 : 0); + currentIndex++; } @Override public void addNull() { - writer.setPosition(writer.getPosition() + 1); + currentIndex++; } @Override public void setPosition(int index) { - writer.setPosition(index); + currentIndex = index; } @Override @@ -65,7 +62,7 @@ public FieldVector getVector() { @Override public void close() throws Exception { - writer.close(); + vector.close(); } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java index ea1e8315365..d179c090d79 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java @@ -22,8 +22,6 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VarBinaryVector; -import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl; -import org.apache.arrow.vector.complex.writer.VarBinaryWriter; import org.apache.avro.io.Decoder; /** @@ -32,40 +30,35 @@ */ public class AvroBytesConsumer implements Consumer { - private final VarBinaryWriter writer; private final VarBinaryVector vector; private ByteBuffer cacheBuffer; + private int currentIndex; + /** * Instantiate a AvroBytesConsumer. */ public AvroBytesConsumer(VarBinaryVector vector) { this.vector = vector; - this.writer = new VarBinaryWriterImpl(vector); } @Override public void consume(Decoder decoder) throws IOException { - writeValue(decoder); - writer.setPosition(writer.getPosition() + 1); + // cacheBuffer is initialized null and create in the first consume, + // if its capacity < size to read, decoder will create a new one with new capacity. + cacheBuffer = decoder.readBytes(cacheBuffer); + vector.setSafe(currentIndex, cacheBuffer, 0, cacheBuffer.limit()); + currentIndex++; } @Override public void addNull() { - writer.setPosition(writer.getPosition() + 1); - } - - private void writeValue(Decoder decoder) throws IOException { - - // cacheBuffer is initialized null and create in the first consume, - // if its capacity < size to read, decoder will create a new one with new capacity. - cacheBuffer = decoder.readBytes(cacheBuffer); - vector.setSafe(writer.getPosition(), cacheBuffer, 0, cacheBuffer.limit()); + currentIndex++; } @Override public void setPosition(int index) { - writer.setPosition(index); + currentIndex = index; } @Override @@ -75,6 +68,6 @@ public FieldVector getVector() { @Override public void close() throws Exception { - writer.close(); + vector.close(); } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java index 65388314fff..f92f325cfac 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java @@ -21,8 +21,6 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.Float8Vector; -import org.apache.arrow.vector.complex.impl.Float8WriterImpl; -import org.apache.arrow.vector.complex.writer.Float8Writer; import org.apache.avro.io.Decoder; /** @@ -31,31 +29,30 @@ */ public class AvroDoubleConsumer implements Consumer { - private final Float8Writer writer; private final Float8Vector vector; + private int currentIndex; + /** * Instantiate a AvroDoubleConsumer. */ public AvroDoubleConsumer(Float8Vector vector) { this.vector = vector; - this.writer = new Float8WriterImpl(vector); } @Override public void consume(Decoder decoder) throws IOException { - writer.writeFloat8(decoder.readDouble()); - writer.setPosition(writer.getPosition() + 1); + vector.setSafe(currentIndex++, decoder.readDouble()); } @Override public void addNull() { - writer.setPosition(writer.getPosition() + 1); + currentIndex++; } @Override public void setPosition(int index) { - writer.setPosition(index); + currentIndex = index; } @Override @@ -65,6 +62,6 @@ public FieldVector getVector() { @Override public void close() throws Exception { - writer.close(); + vector.close(); } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java index 77edefc7087..09f34fcb5df 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java @@ -21,8 +21,6 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.FixedSizeBinaryVector; -import org.apache.arrow.vector.complex.impl.FixedSizeBinaryWriterImpl; -import org.apache.arrow.vector.complex.writer.FixedSizeBinaryWriter; import org.apache.avro.io.Decoder; /** @@ -31,35 +29,33 @@ */ public class AvroFixedConsumer implements Consumer { - private final FixedSizeBinaryWriter writer; private final FixedSizeBinaryVector vector; - private final byte[] reuseBytes; + private int currentIndex; + /** * Instantiate a AvroFixedConsumer. */ public AvroFixedConsumer(FixedSizeBinaryVector vector, int size) { this.vector = vector; - this.writer = new FixedSizeBinaryWriterImpl(vector); reuseBytes = new byte[size]; } @Override public void consume(Decoder decoder) throws IOException { decoder.readFixed(reuseBytes); - vector.setSafe(writer.getPosition(), reuseBytes); - writer.setPosition(writer.getPosition() + 1); + vector.setSafe(currentIndex++, reuseBytes); } @Override public void addNull() { - writer.setPosition(writer.getPosition() + 1); + currentIndex++; } @Override public void setPosition(int index) { - writer.setPosition(index); + currentIndex = index; } @Override @@ -69,6 +65,6 @@ public FieldVector getVector() { @Override public void close() throws Exception { - writer.close(); + vector.close(); } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java index 6256a9a2cf0..b27289a45ea 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java @@ -21,8 +21,6 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.Float4Vector; -import org.apache.arrow.vector.complex.impl.Float4WriterImpl; -import org.apache.arrow.vector.complex.writer.Float4Writer; import org.apache.avro.io.Decoder; /** @@ -31,31 +29,30 @@ */ public class AvroFloatConsumer implements Consumer { - private final Float4Writer writer; private final Float4Vector vector; + private int currentIndex; + /** * Instantiate a AvroFloatConsumer. */ public AvroFloatConsumer(Float4Vector vector) { this.vector = vector; - this.writer = new Float4WriterImpl(vector); } @Override public void consume(Decoder decoder) throws IOException { - writer.writeFloat4(decoder.readFloat()); - writer.setPosition(writer.getPosition() + 1); + vector.setSafe(currentIndex++, decoder.readFloat()); } @Override public void addNull() { - writer.setPosition(writer.getPosition() + 1); + currentIndex++; } @Override public void setPosition(int index) { - writer.setPosition(index); + currentIndex = index; } @Override @@ -65,6 +62,6 @@ public FieldVector getVector() { @Override public void close() throws Exception { - writer.close(); + vector.close(); } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java index 854c8d07f40..b67fa46efed 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java @@ -21,8 +21,6 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.IntVector; -import org.apache.arrow.vector.complex.impl.IntWriterImpl; -import org.apache.arrow.vector.complex.writer.IntWriter; import org.apache.avro.io.Decoder; /** @@ -31,31 +29,30 @@ */ public class AvroIntConsumer implements Consumer { - private final IntWriter writer; private final IntVector vector; + private int currentIndex; + /** * Instantiate a AvroIntConsumer. */ public AvroIntConsumer(IntVector vector) { this.vector = vector; - this.writer = new IntWriterImpl(vector); } @Override public void consume(Decoder decoder) throws IOException { - writer.writeInt(decoder.readInt()); - writer.setPosition(writer.getPosition() + 1); + vector.setSafe(currentIndex++, decoder.readInt()); } @Override public void addNull() { - writer.setPosition(writer.getPosition() + 1); + currentIndex++; } @Override public void setPosition(int index) { - writer.setPosition(index); + currentIndex = index; } @Override @@ -65,6 +62,6 @@ public FieldVector getVector() { @Override public void close() throws Exception { - writer.close(); + vector.close(); } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java index e0095cc53d4..231d61613d3 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java @@ -21,8 +21,6 @@ import org.apache.arrow.vector.BigIntVector; import org.apache.arrow.vector.FieldVector; -import org.apache.arrow.vector.complex.impl.BigIntWriterImpl; -import org.apache.arrow.vector.complex.writer.BigIntWriter; import org.apache.avro.io.Decoder; /** @@ -31,31 +29,30 @@ */ public class AvroLongConsumer implements Consumer { - private final BigIntWriter writer; private final BigIntVector vector; + private int currentIndex; + /** * Instantiate a AvroLongConsumer. */ public AvroLongConsumer(BigIntVector vector) { this.vector = vector; - this.writer = new BigIntWriterImpl(vector); } @Override public void consume(Decoder decoder) throws IOException { - writer.writeBigInt(decoder.readLong()); - writer.setPosition(writer.getPosition() + 1); + vector.setSafe(currentIndex++, decoder.readLong()); } @Override public void addNull() { - writer.setPosition(writer.getPosition() + 1); + currentIndex++; } @Override public void setPosition(int index) { - writer.setPosition(index); + currentIndex = index; } @Override @@ -65,6 +62,6 @@ public FieldVector getVector() { @Override public void close() throws Exception { - writer.close(); + vector.close(); } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java index c26a9862dab..e48de27a4a7 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java @@ -19,7 +19,6 @@ import java.io.IOException; -import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.complex.MapVector; import org.apache.avro.io.Decoder; @@ -31,17 +30,14 @@ public class AvroMapConsumer implements Consumer { private final MapVector vector; - - private final Consumer keyDelegate; - private final Consumer valueDelegate; + private final Consumer delegate; /** * Instantiate a AvroMapConsumer. */ - public AvroMapConsumer(MapVector vector, Consumer keyDelegate, Consumer valueDelegate) { + public AvroMapConsumer(MapVector vector, Consumer delegate) { this.vector = vector; - this.keyDelegate = keyDelegate; - this.valueDelegate = valueDelegate; + this.delegate = delegate; } @Override @@ -50,26 +46,16 @@ public void consume(Decoder decoder) throws IOException { int idx = vector.getValueCount(); vector.startNewValue(idx); long totalCount = 0; - for (long count = decoder.readArrayStart(); count != 0; count = decoder.arrayNext()) { + for (long count = decoder.readMapStart(); count != 0; count = decoder.mapNext()) { totalCount += count; for (int element = 0; element < count; element++) { - keyDelegate.consume(decoder); - valueDelegate.consume(decoder); + delegate.consume(decoder); } } + vector.endValue(idx, (int) totalCount); - int end = (int) (vector.getOffsetBuffer().getInt(idx * 4) + totalCount); - vector.getOffsetBuffer().setInt((idx + 1) * 4, end); - - int dataValueCount = vector.getDataVector().getValueCount(); - for (int i = dataValueCount; i < end; i++) { - BitVectorHelper.setValidityBitToOne(vector.getDataVector().getValidityBuffer(), i); - } - - BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), vector.getValueCount()); vector.setValueCount(idx + 1); - // set data vector valueCount - vector.getDataVector().setValueCount(end); + } @Override @@ -90,7 +76,6 @@ public FieldVector getVector() { @Override public void close() throws Exception { vector.close(); - keyDelegate.close(); - valueDelegate.close(); + delegate.close(); } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java index baca6f06e94..8a1cc4c7e6e 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java @@ -22,8 +22,6 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VarCharVector; -import org.apache.arrow.vector.complex.impl.VarCharWriterImpl; -import org.apache.arrow.vector.complex.writer.VarCharWriter; import org.apache.avro.io.Decoder; /** @@ -33,39 +31,32 @@ public class AvroStringConsumer implements Consumer { private final VarCharVector vector; - private final VarCharWriter writer; private ByteBuffer cacheBuffer; + private int currentIndex; /** * Instantiate a AvroStringConsumer. */ public AvroStringConsumer(VarCharVector vector) { this.vector = vector; - this.writer = new VarCharWriterImpl(vector); } @Override public void consume(Decoder decoder) throws IOException { - writeValue(decoder); - writer.setPosition(writer.getPosition() + 1); + // cacheBuffer is initialized null and create in the first consume, + // if its capacity < size to read, decoder will create a new one with new capacity. + cacheBuffer = decoder.readBytes(cacheBuffer); + vector.setSafe(currentIndex++, cacheBuffer, 0, cacheBuffer.limit()); } @Override public void addNull() { - writer.setPosition(writer.getPosition() + 1); - } - - private void writeValue(Decoder decoder) throws IOException { - - // cacheBuffer is initialized null and create in the first consume, - // if its capacity < size to read, decoder will create a new one with new capacity. - cacheBuffer = decoder.readBytes(cacheBuffer); - vector.setSafe(writer.getPosition(), cacheBuffer, 0, cacheBuffer.limit()); + currentIndex++; } @Override public void setPosition(int index) { - writer.setPosition(index); + currentIndex = index; } @Override @@ -75,6 +66,6 @@ public FieldVector getVector() { @Override public void close() throws Exception { - writer.close(); + vector.close(); } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java new file mode 100644 index 00000000000..5eee4687946 --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.complex.StructVector; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume nested record type values from avro decoder. + * Write the data to {@link org.apache.arrow.vector.complex.StructVector}. + */ +public class AvroStructConsumer implements Consumer { + + private final Consumer[] delegates; + private StructVector vector; + + private int currentIndex; + + + /** + * Instantiate a AvroUnionConsumer. + */ + public AvroStructConsumer(StructVector vector, Consumer[] delegates) { + this.vector = vector; + this.delegates = delegates; + } + + @Override + public void consume(Decoder decoder) throws IOException { + + for (int i = 0; i < delegates.length; i++) { + delegates[i].consume(decoder); + } + currentIndex++; + + } + + @Override + public void addNull() { + currentIndex++; + } + + @Override + public void setPosition(int index) { + currentIndex = index; + } + + @Override + public FieldVector getVector() { + vector.setValueCount(currentIndex); + return this.vector; + } + + @Override + public void close() throws Exception { + vector.close(); + for (Consumer delegate: delegates) { + delegate.close(); + } + } +} diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java index b927a5ba5cc..5c61688d605 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java @@ -21,7 +21,6 @@ import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.complex.UnionVector; -import org.apache.arrow.vector.complex.impl.UnionWriter; import org.apache.arrow.vector.types.Types; import org.apache.avro.io.Decoder; @@ -34,15 +33,14 @@ public class AvroUnionsConsumer implements Consumer { private Consumer[] delegates; private Types.MinorType[] types; - private UnionWriter writer; private UnionVector vector; + private int currentIndex; /** - * Instantiate a AvroUnionConsumer. + * Instantiate an AvroUnionConsumer. */ public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorType[] types) { - this.writer = new UnionWriter(vector); this.vector = vector; this.delegates = delegates; this.types = types; @@ -51,39 +49,37 @@ public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorT @Override public void consume(Decoder decoder) throws IOException { int fieldIndex = decoder.readInt(); - int position = writer.getPosition(); Consumer delegate = delegates[fieldIndex]; - vector.setType(position, types[fieldIndex]); + vector.setType(currentIndex, types[fieldIndex]); // In UnionVector we need to set sub vector writer position before consume a value // because in the previous iterations we might not have written to the specific union sub vector. - delegate.setPosition(position); + delegate.setPosition(currentIndex); delegate.consume(decoder); - writer.setPosition(position + 1); - + currentIndex++; } @Override public void addNull() { - writer.setPosition(writer.getPosition() + 1); + currentIndex++; } @Override public void setPosition(int index) { - writer.setPosition(index); + currentIndex = index; } @Override public FieldVector getVector() { - vector.setValueCount(writer.getPosition()); + vector.setValueCount(currentIndex); return this.vector; } @Override public void close() throws Exception { - writer.close(); + vector.close(); for (Consumer delegate: delegates) { delegate.close(); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java index 3295e4e97c2..25762fda88c 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/AbstractStructVector.java @@ -192,7 +192,7 @@ protected ValueVector add(String childName, FieldType fieldType) { * @param name the name of the child to add * @param vector the vector to add as a child */ - public void putChild(String name, FieldVector vector) { + protected void putChild(String name, FieldVector vector) { putVector(name, vector); } diff --git a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java index ccc0305cee4..81dc8803311 100644 --- a/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java +++ b/java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java @@ -372,13 +372,6 @@ public FieldVector getDataVector() { return vector; } - /** - * Directly set the underlying vector. - */ - public void setDataVector(FieldVector vector) { - this.vector = vector; - } - @Override public TransferPair getTransferPair(String ref, BufferAllocator allocator) { return getTransferPair(ref, allocator, null); From 788fef1a3b532bb29f7268f1716f60ce5188b42d Mon Sep 17 00:00:00 2001 From: tianchen Date: Wed, 28 Aug 2019 16:44:41 +0800 Subject: [PATCH 09/11] fix --- .../org/apache/arrow/AvroToArrowUtils.java | 22 ++++++----- .../arrow/consumers/AvroMapConsumer.java | 13 +++---- .../arrow/consumers/AvroStructConsumer.java | 7 ++-- .../org/apache/arrow/AvroToArrowTest.java | 38 ++++++++++++++----- 4 files changed, 51 insertions(+), 29 deletions(-) diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java index 28fb1c5206c..4df281f2d3a 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -149,55 +149,55 @@ private static Consumer createConsumer( case STRING: arrowType = new ArrowType.Utf8(); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); + vector = createVector(v, fieldType, name, allocator); consumer = new AvroStringConsumer((VarCharVector) vector); break; case FIXED: arrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize()); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); + vector = createVector(v, fieldType, name, allocator); consumer = new AvroFixedConsumer((FixedSizeBinaryVector) vector, schema.getFixedSize()); break; case INT: arrowType = new ArrowType.Int(32, /*signed=*/true); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); + vector = createVector(v, fieldType, name, allocator); consumer = new AvroIntConsumer((IntVector) vector); break; case BOOLEAN: arrowType = new ArrowType.Bool(); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); + vector = createVector(v, fieldType, name, allocator); consumer = new AvroBooleanConsumer((BitVector) vector); break; case LONG: arrowType = new ArrowType.Int(64, /*signed=*/true); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); + vector = createVector(v, fieldType, name, allocator); consumer = new AvroLongConsumer((BigIntVector) vector); break; case FLOAT: arrowType = new ArrowType.FloatingPoint(SINGLE); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); + vector = createVector(v, fieldType, name, allocator); consumer = new AvroFloatConsumer((Float4Vector) vector); break; case DOUBLE: arrowType = new ArrowType.FloatingPoint(DOUBLE); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); + vector = createVector(v, fieldType, name, allocator); consumer = new AvroDoubleConsumer((Float8Vector) vector); break; case BYTES: arrowType = new ArrowType.Binary(); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); + vector = createVector(v, fieldType, name, allocator); consumer = new AvroBytesConsumer((VarBinaryVector) vector); break; case NULL: arrowType = new ArrowType.Null(); fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema)); - vector = v != null ? v : fieldType.createNewSingleVector(name, allocator, null); + vector = createVector(v, fieldType, name, allocator); consumer = new AvroNullConsumer((ZeroVector) vector); break; default: @@ -211,6 +211,10 @@ private static Consumer createConsumer( return consumer; } + private static FieldVector createVector(FieldVector v, FieldType fieldType, String name, BufferAllocator allocator) { + return v != null ? v : fieldType.createNewSingleVector(name, allocator, null); + } + private static String getDefaultFieldName(ArrowType type) { Types.MinorType minorType = Types.getMinorTypeForArrowType(type); return minorType.name().toLowerCase(); diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java index e48de27a4a7..6a09b586635 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java @@ -32,6 +32,8 @@ public class AvroMapConsumer implements Consumer { private final MapVector vector; private final Consumer delegate; + private int currentIndex; + /** * Instantiate a AvroMapConsumer. */ @@ -43,8 +45,7 @@ public AvroMapConsumer(MapVector vector, Consumer delegate) { @Override public void consume(Decoder decoder) throws IOException { - int idx = vector.getValueCount(); - vector.startNewValue(idx); + vector.startNewValue(currentIndex); long totalCount = 0; for (long count = decoder.readMapStart(); count != 0; count = decoder.mapNext()) { totalCount += count; @@ -52,10 +53,8 @@ public void consume(Decoder decoder) throws IOException { delegate.consume(decoder); } } - vector.endValue(idx, (int) totalCount); - - vector.setValueCount(idx + 1); - + vector.endValue(currentIndex, (int) totalCount); + currentIndex++; } @Override @@ -65,7 +64,7 @@ public void addNull() { @Override public void setPosition(int index) { - vector.startNewValue(index); + this.currentIndex = index; } @Override diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java index 5eee4687946..e30af894bf1 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java @@ -19,6 +19,7 @@ import java.io.IOException; +import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.avro.io.Decoder; @@ -36,7 +37,7 @@ public class AvroStructConsumer implements Consumer { /** - * Instantiate a AvroUnionConsumer. + * Instantiate a AvroStructConsumer. */ public AvroStructConsumer(StructVector vector, Consumer[] delegates) { this.vector = vector; @@ -72,8 +73,6 @@ public FieldVector getVector() { @Override public void close() throws Exception { vector.close(); - for (Consumer delegate: delegates) { - delegate.close(); - } + AutoCloseables.close(delegates); } } diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java index bd5383523ce..5afb13558f8 100644 --- a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java +++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java @@ -332,8 +332,14 @@ public void testMapType() throws Exception { VectorSchemaRoot root = writeAndRead(schema, data); MapVector vector = (MapVector) root.getFieldVectors().get(0); - checkPrimitiveResult(keys, vector.getDataVector().getChildrenFromFields().get(0)); - checkPrimitiveResult(vals, vector.getDataVector().getChildrenFromFields().get(1)); + checkPrimitiveResult(keys, vector.getDataVector().getChildrenFromFields().get(0), 0, 2); + checkPrimitiveResult(vals, vector.getDataVector().getChildrenFromFields().get(1), 0, 2); + + checkPrimitiveResult(keys, vector.getDataVector().getChildrenFromFields().get(0), 2, 4); + checkPrimitiveResult(vals, vector.getDataVector().getChildrenFromFields().get(1), 2, 4); + + checkPrimitiveResult(keys, vector.getDataVector().getChildrenFromFields().get(0), 4, 6); + checkPrimitiveResult(vals, vector.getDataVector().getChildrenFromFields().get(1), 4, 6); } @Test @@ -409,13 +415,6 @@ private void checkArrayResult(List expected, ListVector vector) { } } - private void checkMapResult(List expected, MapVector vector) { - assertEquals(expected.size(), vector.getValueCount()); - for (int i = 0; i < expected.size(); i++) { - assertEquals(expected.get(i), vector.getObject(i).toString()); - } - } - private void checkPrimitiveResult(List expected, List actual) { assertEquals(expected.size(), actual.size()); for (int i = 0; i < expected.size(); i++) { @@ -455,6 +454,27 @@ private void checkPrimitiveResult(List data, FieldVector vector) { } } + private void checkPrimitiveResult(List data, FieldVector vector, int start, int end) { + assertEquals(data.size(), vector.getValueCount()); + for (int i = start; i < end; i++) { + Object value1 = data.get(i); + Object value2 = vector.getObject(i); + if (value1 == null) { + assertTrue(value2 == null); + continue; + } + if (value2 instanceof byte[]) { + value2 = ByteBuffer.wrap((byte[]) value2); + if (value1 instanceof byte[]) { + value1 = ByteBuffer.wrap((byte[]) value1); + } + } else if (value2 instanceof Text) { + value2 = value2.toString(); + } + assertTrue(Objects.equals(value1, value2)); + } + } + private void checkRecordResult(Schema schema, ArrayList data, VectorSchemaRoot root) { assertEquals(data.size(), root.getRowCount()); assertEquals(schema.getFields().size(), root.getFieldVectors().size()); From ac9540d6172ff950456c200b2bf28f20e936f6f1 Mon Sep 17 00:00:00 2001 From: tianchen Date: Wed, 28 Aug 2019 17:05:34 +0800 Subject: [PATCH 10/11] fix test --- .../arrow/consumers/AvroStructConsumer.java | 3 ++ .../org/apache/arrow/AvroToArrowTest.java | 35 ++++--------------- 2 files changed, 9 insertions(+), 29 deletions(-) diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java index e30af894bf1..e73c7c5a1bf 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java @@ -20,6 +20,7 @@ import java.io.IOException; import org.apache.arrow.util.AutoCloseables; +import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.avro.io.Decoder; @@ -50,6 +51,8 @@ public void consume(Decoder decoder) throws IOException { for (int i = 0; i < delegates.length; i++) { delegates[i].consume(decoder); } + vector.setIndexDefined(currentIndex); + BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), currentIndex); currentIndex++; } diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java index 5afb13558f8..22eec9cd724 100644 --- a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java +++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java @@ -332,14 +332,12 @@ public void testMapType() throws Exception { VectorSchemaRoot root = writeAndRead(schema, data); MapVector vector = (MapVector) root.getFieldVectors().get(0); - checkPrimitiveResult(keys, vector.getDataVector().getChildrenFromFields().get(0), 0, 2); - checkPrimitiveResult(vals, vector.getDataVector().getChildrenFromFields().get(1), 0, 2); - - checkPrimitiveResult(keys, vector.getDataVector().getChildrenFromFields().get(0), 2, 4); - checkPrimitiveResult(vals, vector.getDataVector().getChildrenFromFields().get(1), 2, 4); - - checkPrimitiveResult(keys, vector.getDataVector().getChildrenFromFields().get(0), 4, 6); - checkPrimitiveResult(vals, vector.getDataVector().getChildrenFromFields().get(1), 4, 6); + checkPrimitiveResult(keys, vector.getDataVector().getChildrenFromFields().get(0)); + checkPrimitiveResult(vals, vector.getDataVector().getChildrenFromFields().get(1)); + assertEquals(0, vector.getOffsetBuffer().getInt(0)); + assertEquals(2, vector.getOffsetBuffer().getInt(1 * 4)); + assertEquals(4, vector.getOffsetBuffer().getInt(2 * 4)); + assertEquals(6, vector.getOffsetBuffer().getInt(3 * 4)); } @Test @@ -454,27 +452,6 @@ private void checkPrimitiveResult(List data, FieldVector vector) { } } - private void checkPrimitiveResult(List data, FieldVector vector, int start, int end) { - assertEquals(data.size(), vector.getValueCount()); - for (int i = start; i < end; i++) { - Object value1 = data.get(i); - Object value2 = vector.getObject(i); - if (value1 == null) { - assertTrue(value2 == null); - continue; - } - if (value2 instanceof byte[]) { - value2 = ByteBuffer.wrap((byte[]) value2); - if (value1 instanceof byte[]) { - value1 = ByteBuffer.wrap((byte[]) value1); - } - } else if (value2 instanceof Text) { - value2 = value2.toString(); - } - assertTrue(Objects.equals(value1, value2)); - } - } - private void checkRecordResult(Schema schema, ArrayList data, VectorSchemaRoot root) { assertEquals(data.size(), root.getRowCount()); assertEquals(schema.getFields().size(), root.getFieldVectors().size()); From 456a5325c24624d7cdb8c823252906fffeb487c0 Mon Sep 17 00:00:00 2001 From: tianchen Date: Thu, 29 Aug 2019 14:47:35 +0800 Subject: [PATCH 11/11] fix --- .../java/org/apache/arrow/consumers/AvroStructConsumer.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java index e73c7c5a1bf..a6bec289c55 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java @@ -20,7 +20,6 @@ import java.io.IOException; import org.apache.arrow.util.AutoCloseables; -import org.apache.arrow.vector.BitVectorHelper; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.complex.StructVector; import org.apache.avro.io.Decoder; @@ -52,7 +51,6 @@ public void consume(Decoder decoder) throws IOException { delegates[i].consume(decoder); } vector.setIndexDefined(currentIndex); - BitVectorHelper.setValidityBitToOne(vector.getValidityBuffer(), currentIndex); currentIndex++; }