diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
index 77f34df1479..4df281f2d3a 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
@@ -20,21 +20,28 @@
import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE;
import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE;
+import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import org.apache.arrow.consumers.AvroArraysConsumer;
import org.apache.arrow.consumers.AvroBooleanConsumer;
import org.apache.arrow.consumers.AvroBytesConsumer;
import org.apache.arrow.consumers.AvroDoubleConsumer;
+import org.apache.arrow.consumers.AvroFixedConsumer;
import org.apache.arrow.consumers.AvroFloatConsumer;
import org.apache.arrow.consumers.AvroIntConsumer;
import org.apache.arrow.consumers.AvroLongConsumer;
+import org.apache.arrow.consumers.AvroMapConsumer;
import org.apache.arrow.consumers.AvroNullConsumer;
import org.apache.arrow.consumers.AvroStringConsumer;
+import org.apache.arrow.consumers.AvroStructConsumer;
import org.apache.arrow.consumers.AvroUnionsConsumer;
import org.apache.arrow.consumers.CompositeAvroConsumer;
import org.apache.arrow.consumers.Consumer;
@@ -44,6 +51,7 @@
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
@@ -51,6 +59,9 @@
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ZeroVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.complex.StructVector;
import org.apache.arrow.vector.complex.UnionVector;
import org.apache.arrow.vector.types.Types;
import org.apache.arrow.vector.types.UnionMode;
@@ -81,19 +92,37 @@ public class AvroToArrowUtils {
*
DOUBLE --> ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)
* BOOLEAN --> ArrowType.Bool
* BYTES --> ArrowType.Binary
+ * ARRAY --> ArrowType.List
+ * MAP --> ArrowType.Map
+ * FIXED --> ArrowType.FixedSizeBinary
*
*/
+
private static Consumer createConsumer(Schema schema, String name, BufferAllocator allocator) {
- return createConsumer(schema, name, false, INVALID_NULL_INDEX, allocator);
+ return createConsumer(schema, name, false, INVALID_NULL_INDEX, allocator, null);
+ }
+
+ private static Consumer createConsumer(Schema schema, String name, BufferAllocator allocator, FieldVector vector) {
+ return createConsumer(schema, name, false, INVALID_NULL_INDEX, allocator, vector);
}
+ /**
+ * Create a consumer with the given avro schema
+ * @param schema avro schema
+ * @param name arrow field name
+ * @param v vector to keep in consumer, if v == null, will create a new vector via field.
+ * @return consumer
+ */
private static Consumer createConsumer(
Schema schema,
String name,
boolean nullable,
int nullIndex,
- BufferAllocator allocator) {
+ BufferAllocator allocator,
+ FieldVector v) {
+
Preconditions.checkNotNull(schema, "Avro schema object can't be null");
+ Preconditions.checkNotNull(allocator, "allocator can't be null");
Type type = schema.getType();
@@ -104,53 +133,71 @@ private static Consumer createConsumer(
switch (type) {
case UNION:
- return createUnionConsumer(schema, name, allocator);
+ consumer = createUnionConsumer(schema, name, allocator, v);
+ break;
+ case ARRAY:
+ consumer = createArrayConsumer(schema, name, allocator, v);
+ break;
+ case MAP:
+ consumer = createMapConsumer(schema, name, allocator, v);
+ break;
+ //TODO implement enum and nested record type
+ case RECORD:
+ throw new UnsupportedOperationException();
+ case ENUM:
+ throw new UnsupportedOperationException();
case STRING:
arrowType = new ArrowType.Utf8();
fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
- vector = fieldType.createNewSingleVector(name, allocator, null);
+ vector = createVector(v, fieldType, name, allocator);
consumer = new AvroStringConsumer((VarCharVector) vector);
break;
+ case FIXED:
+ arrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize());
+ fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
+ vector = createVector(v, fieldType, name, allocator);
+ consumer = new AvroFixedConsumer((FixedSizeBinaryVector) vector, schema.getFixedSize());
+ break;
case INT:
arrowType = new ArrowType.Int(32, /*signed=*/true);
fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
- vector = fieldType.createNewSingleVector(name, allocator, null);
+ vector = createVector(v, fieldType, name, allocator);
consumer = new AvroIntConsumer((IntVector) vector);
break;
case BOOLEAN:
arrowType = new ArrowType.Bool();
fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
- vector = fieldType.createNewSingleVector(name, allocator, null);
+ vector = createVector(v, fieldType, name, allocator);
consumer = new AvroBooleanConsumer((BitVector) vector);
break;
case LONG:
arrowType = new ArrowType.Int(64, /*signed=*/true);
fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
- vector = fieldType.createNewSingleVector(name, allocator, null);
+ vector = createVector(v, fieldType, name, allocator);
consumer = new AvroLongConsumer((BigIntVector) vector);
break;
case FLOAT:
arrowType = new ArrowType.FloatingPoint(SINGLE);
fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
- vector = fieldType.createNewSingleVector(name, allocator, null);
+ vector = createVector(v, fieldType, name, allocator);
consumer = new AvroFloatConsumer((Float4Vector) vector);
break;
case DOUBLE:
arrowType = new ArrowType.FloatingPoint(DOUBLE);
fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
- vector = fieldType.createNewSingleVector(name, allocator, null);
+ vector = createVector(v, fieldType, name, allocator);
consumer = new AvroDoubleConsumer((Float8Vector) vector);
break;
case BYTES:
arrowType = new ArrowType.Binary();
fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
- vector = fieldType.createNewSingleVector(name, allocator, null);
+ vector = createVector(v, fieldType, name, allocator);
consumer = new AvroBytesConsumer((VarBinaryVector) vector);
break;
case NULL:
arrowType = new ArrowType.Null();
fieldType = new FieldType(nullable, arrowType, /*dictionary=*/null, getMetaData(schema));
- vector = fieldType.createNewSingleVector(name, allocator, null);
+ vector = createVector(v, fieldType, name, allocator);
consumer = new AvroNullConsumer((ZeroVector) vector);
break;
default:
@@ -164,14 +211,143 @@ private static Consumer createConsumer(
return consumer;
}
- private static Consumer createUnionConsumer(Schema schema, String name, BufferAllocator allocator) {
+ private static FieldVector createVector(FieldVector v, FieldType fieldType, String name, BufferAllocator allocator) {
+ return v != null ? v : fieldType.createNewSingleVector(name, allocator, null);
+ }
+
+ private static String getDefaultFieldName(ArrowType type) {
+ Types.MinorType minorType = Types.getMinorTypeForArrowType(type);
+ return minorType.name().toLowerCase();
+ }
+
+ private static Field avroSchemaToField(Schema schema, String name) {
+ final Type type = schema.getType();
+ final ArrowType arrowType;
+
+ switch (type) {
+ case UNION:
+ List children = new ArrayList<>();
+ for (int i = 0; i < schema.getTypes().size(); i++) {
+ Schema childSchema = schema.getTypes().get(i);
+ // Union child vector should use default name
+ children.add(avroSchemaToField(childSchema, null));
+ }
+ arrowType = new ArrowType.Union(UnionMode.Sparse, null);
+ if (name == null) {
+ name = getDefaultFieldName(arrowType);
+ }
+ return new Field(name, FieldType.nullable(arrowType), children);
+ case ARRAY:
+ Schema elementSchema = schema.getElementType();
+ arrowType = new ArrowType.List();
+ if (name == null) {
+ name = getDefaultFieldName(arrowType);
+ }
+ return new Field(name, FieldType.nullable(arrowType),
+ Collections.singletonList(avroSchemaToField(elementSchema, elementSchema.getName())));
+ case MAP:
+ // MapVector internal struct field and key field should be non-nullable
+ FieldType keyFieldType = new FieldType(/*nullable=*/false, new ArrowType.Utf8(), /*dictionary=*/null);
+ Field keyField = new Field("key", keyFieldType, /*children=*/null);
+ Field valueField = avroSchemaToField(schema.getValueType(), "value");
+
+ FieldType structFieldType = new FieldType(false, new ArrowType.Struct(), /*dictionary=*/null);
+ Field structField = new Field("internal", structFieldType, Arrays.asList(keyField, valueField));
+ arrowType = new ArrowType.Map(/*keySorted=*/false);
+ if (name == null) {
+ name = getDefaultFieldName(arrowType);
+ }
+ return new Field(name, FieldType.nullable(arrowType), Collections.singletonList(structField));
+ case STRING:
+ arrowType = new ArrowType.Utf8();
+ break;
+ case FIXED:
+ arrowType = new ArrowType.FixedSizeBinary(schema.getFixedSize());
+ break;
+ case INT:
+ arrowType = new ArrowType.Int(32, /*signed=*/true);
+ break;
+ case BOOLEAN:
+ arrowType = new ArrowType.Bool();
+ break;
+ case LONG:
+ arrowType = new ArrowType.Int(64, /*signed=*/true);
+ break;
+ case FLOAT:
+ arrowType = new ArrowType.FloatingPoint(SINGLE);
+ break;
+ case DOUBLE:
+ arrowType = new ArrowType.FloatingPoint(DOUBLE);
+ break;
+ case BYTES:
+ arrowType = new ArrowType.Binary();
+ break;
+ case NULL:
+ arrowType = new ArrowType.Null();
+ break;
+ default:
+ // no-op, shouldn't get here
+ throw new UnsupportedOperationException();
+ }
+
+ if (name == null) {
+ name = getDefaultFieldName(arrowType);
+ }
+ return Field.nullable(name, arrowType);
+ }
+
+ private static Consumer createArrayConsumer(Schema schema, String name, BufferAllocator allocator, FieldVector v) {
+
+ ListVector listVector;
+ if (v == null) {
+ final Field field = avroSchemaToField(schema, name);
+ listVector = (ListVector) field.createVector(allocator);
+ } else {
+ listVector = (ListVector) v;
+ }
+
+ FieldVector dataVector = listVector.getDataVector();
+
+ // create delegate
+ Schema childSchema = schema.getElementType();
+ Consumer delegate = createConsumer(childSchema, childSchema.getName(), allocator, dataVector);
+
+ return new AvroArraysConsumer(listVector, delegate);
+ }
+
+ private static Consumer createMapConsumer(Schema schema, String name, BufferAllocator allocator, FieldVector v) {
+
+ MapVector mapVector;
+ if (v == null) {
+ final Field field = avroSchemaToField(schema, name);
+ mapVector = (MapVector) field.createVector(allocator);
+ } else {
+ mapVector = (MapVector) v;
+ }
+
+ // create delegate struct consumer
+ StructVector structVector = (StructVector) mapVector.getDataVector();
+
+ // keys in avro map are always assumed to be strings.
+ Consumer keyConsumer = new AvroStringConsumer(
+ (VarCharVector) structVector.getChildrenFromFields().get(0));
+ Consumer valueConsumer = createConsumer(schema.getValueType(), schema.getValueType().getName(),
+ allocator, structVector.getChildrenFromFields().get(1));
+
+ AvroStructConsumer internalConsumer =
+ new AvroStructConsumer(structVector, new Consumer[] {keyConsumer, valueConsumer});
+
+ return new AvroMapConsumer(mapVector, internalConsumer);
+ }
+
+ private static Consumer createUnionConsumer(Schema schema, String name, BufferAllocator allocator, FieldVector v) {
int size = schema.getTypes().size();
long nullCount = schema.getTypes().stream().filter(s -> s.getType() == Type.NULL).count();
// union only has one type, convert to primitive type
if (size == 1) {
Schema subSchema = schema.getTypes().get(0);
- return createConsumer(subSchema, name, allocator);
+ return createConsumer(subSchema, name, allocator, v);
// size == 2 and has null type, convert to nullable primitive type
} else if (size == 2 && nullCount == 1) {
@@ -179,25 +355,30 @@ private static Consumer createUnionConsumer(Schema schema, String name, BufferAl
int nullIndex = schema.getTypes().indexOf(nullSchema);
Schema subSchema = schema.getTypes().stream().filter(s -> s.getType() != Type.NULL).findFirst().get();
Preconditions.checkNotNull(subSchema, "schema should not be null.");
- return createConsumer(subSchema, name, true, nullIndex, allocator);
+ return createConsumer(subSchema, name, true, nullIndex, allocator, v);
// real union type
} else {
- final FieldType fieldType = new FieldType(/*nullable=*/true,
- new ArrowType.Union(UnionMode.Sparse, null), /*dictionary=*/null, getMetaData(schema));
- UnionVector unionVector =
- (UnionVector) fieldType.createNewSingleVector(name, allocator, null);
+ UnionVector unionVector;
+ if (v == null) {
+ final Field field = avroSchemaToField(schema, name);
+ unionVector = (UnionVector) field.createVector(allocator);
+ } else {
+ unionVector = (UnionVector) v;
+ }
+
+ List childVectors = unionVector.getChildrenFromFields();
Consumer[] delegates = new Consumer[size];
Types.MinorType[] types = new Types.MinorType[size];
for (int i = 0; i < size; i++) {
+ FieldVector child = childVectors.get(i);
Schema subSchema = schema.getTypes().get(i);
- Consumer delegate = createConsumer(subSchema, subSchema.getName(), allocator);
- unionVector.directAddVector(delegate.getVector());
+ Consumer delegate = createConsumer(subSchema, subSchema.getName(), allocator, child);
delegates[i] = delegate;
- types[i] = delegate.getVector().getMinorType();
+ types[i] = child.getMinorType();
}
return new AvroUnionsConsumer(unionVector, delegates, types);
}
@@ -227,10 +408,6 @@ public static VectorSchemaRoot avroToArrowVectors(Schema schema, Decoder decoder
consumers.add(consumer);
vectors.add(consumer.getVector());
}
- } else if (type == Type.MAP) {
- throw new UnsupportedOperationException();
- } else if (type == Type.ARRAY) {
- throw new UnsupportedOperationException();
} else if (type == Type.ENUM) {
throw new UnsupportedOperationException();
} else {
@@ -246,10 +423,17 @@ public static VectorSchemaRoot avroToArrowVectors(Schema schema, Decoder decoder
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors, 0);
- CompositeAvroConsumer compositeConsumer = null;
+ CompositeAvroConsumer compositeConsumer = new CompositeAvroConsumer(consumers);
+
+ int valueCount = 0;
try {
- compositeConsumer = new CompositeAvroConsumer(consumers);
- compositeConsumer.consume(decoder, root);
+ while (true) {
+ compositeConsumer.consume(decoder, root);
+ valueCount++;
+ }
+ } catch (EOFException eof) {
+ // reach the end of encoder stream.
+ root.setRowCount(valueCount);
} catch (Exception e) {
compositeConsumer.close();
throw new RuntimeException("Error occurs while consume process.", e);
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java
new file mode 100644
index 00000000000..de54be6b32b
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroArraysConsumer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume array type values from avro decoder.
+ * Write the data to {@link ListVector}.
+ */
+public class AvroArraysConsumer implements Consumer {
+
+ private final ListVector vector;
+ private final Consumer delegate;
+
+ private int currentIndex = 0;
+
+ /**
+ * Instantiate a ArrayConsumer.
+ */
+ public AvroArraysConsumer(ListVector vector, Consumer delegate) {
+ this.vector = vector;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+
+ vector.startNewValue(currentIndex);
+ long totalCount = 0;
+ for (long count = decoder.readArrayStart(); count != 0; count = decoder.arrayNext()) {
+ totalCount += count;
+ for (int element = 0; element < count; element++) {
+ delegate.consume(decoder);
+ }
+ }
+ vector.endValue(currentIndex, (int) totalCount);
+ currentIndex++;
+ }
+
+ @Override
+ public void addNull() {
+ currentIndex++;
+ }
+
+ @Override
+ public void setPosition(int index) {
+ currentIndex = index;
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return this.vector;
+ }
+
+ @Override
+ public void close() throws Exception {
+ vector.close();
+ delegate.close();
+ }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
index c2876f1d39d..73c5e72134f 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
@@ -21,8 +21,6 @@
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.complex.impl.BitWriterImpl;
-import org.apache.arrow.vector.complex.writer.BitWriter;
import org.apache.avro.io.Decoder;
/**
@@ -31,31 +29,30 @@
*/
public class AvroBooleanConsumer implements Consumer {
- private final BitWriter writer;
private final BitVector vector;
+ private int currentIndex = 0;
/**
* Instantiate a AvroBooleanConsumer.
*/
public AvroBooleanConsumer(BitVector vector) {
this.vector = vector;
- this.writer = new BitWriterImpl(vector);
}
@Override
public void consume(Decoder decoder) throws IOException {
- writer.writeBit(decoder.readBoolean() ? 1 : 0);
- writer.setPosition(writer.getPosition() + 1);
+ vector.setSafe(currentIndex, decoder.readBoolean() ? 1 : 0);
+ currentIndex++;
}
@Override
public void addNull() {
- writer.setPosition(writer.getPosition() + 1);
+ currentIndex++;
}
@Override
public void setPosition(int index) {
- writer.setPosition(index);
+ currentIndex = index;
}
@Override
@@ -65,7 +62,7 @@ public FieldVector getVector() {
@Override
public void close() throws Exception {
- writer.close();
+ vector.close();
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
index c0cfaec5c9b..d179c090d79 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
@@ -22,9 +22,6 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarBinaryVector;
-import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl;
-import org.apache.arrow.vector.complex.writer.VarBinaryWriter;
-import org.apache.arrow.vector.holders.VarBinaryHolder;
import org.apache.avro.io.Decoder;
/**
@@ -33,46 +30,35 @@
*/
public class AvroBytesConsumer implements Consumer {
- private final VarBinaryWriter writer;
private final VarBinaryVector vector;
private ByteBuffer cacheBuffer;
+ private int currentIndex;
+
/**
* Instantiate a AvroBytesConsumer.
*/
public AvroBytesConsumer(VarBinaryVector vector) {
this.vector = vector;
- this.writer = new VarBinaryWriterImpl(vector);
}
@Override
public void consume(Decoder decoder) throws IOException {
- writeValue(decoder);
- writer.setPosition(writer.getPosition() + 1);
- }
-
- @Override
- public void addNull() {
- writer.setPosition(writer.getPosition() + 1);
- }
-
- private void writeValue(Decoder decoder) throws IOException {
- VarBinaryHolder holder = new VarBinaryHolder();
-
// cacheBuffer is initialized null and create in the first consume,
// if its capacity < size to read, decoder will create a new one with new capacity.
cacheBuffer = decoder.readBytes(cacheBuffer);
+ vector.setSafe(currentIndex, cacheBuffer, 0, cacheBuffer.limit());
+ currentIndex++;
+ }
- holder.start = 0;
- holder.end = cacheBuffer.limit();
- holder.buffer = vector.getAllocator().buffer(cacheBuffer.limit());
- holder.buffer.setBytes(0, cacheBuffer, 0, cacheBuffer.limit());
- writer.write(holder);
+ @Override
+ public void addNull() {
+ currentIndex++;
}
@Override
public void setPosition(int index) {
- writer.setPosition(index);
+ currentIndex = index;
}
@Override
@@ -82,6 +68,6 @@ public FieldVector getVector() {
@Override
public void close() throws Exception {
- writer.close();
+ vector.close();
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
index 65388314fff..f92f325cfac 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
@@ -21,8 +21,6 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float8Vector;
-import org.apache.arrow.vector.complex.impl.Float8WriterImpl;
-import org.apache.arrow.vector.complex.writer.Float8Writer;
import org.apache.avro.io.Decoder;
/**
@@ -31,31 +29,30 @@
*/
public class AvroDoubleConsumer implements Consumer {
- private final Float8Writer writer;
private final Float8Vector vector;
+ private int currentIndex;
+
/**
* Instantiate a AvroDoubleConsumer.
*/
public AvroDoubleConsumer(Float8Vector vector) {
this.vector = vector;
- this.writer = new Float8WriterImpl(vector);
}
@Override
public void consume(Decoder decoder) throws IOException {
- writer.writeFloat8(decoder.readDouble());
- writer.setPosition(writer.getPosition() + 1);
+ vector.setSafe(currentIndex++, decoder.readDouble());
}
@Override
public void addNull() {
- writer.setPosition(writer.getPosition() + 1);
+ currentIndex++;
}
@Override
public void setPosition(int index) {
- writer.setPosition(index);
+ currentIndex = index;
}
@Override
@@ -65,6 +62,6 @@ public FieldVector getVector() {
@Override
public void close() throws Exception {
- writer.close();
+ vector.close();
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java
new file mode 100644
index 00000000000..09f34fcb5df
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFixedConsumer.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume fixed type values from avro decoder.
+ * Write the data to {@link org.apache.arrow.vector.FixedSizeBinaryVector}.
+ */
+public class AvroFixedConsumer implements Consumer {
+
+ private final FixedSizeBinaryVector vector;
+ private final byte[] reuseBytes;
+
+ private int currentIndex;
+
+ /**
+ * Instantiate a AvroFixedConsumer.
+ */
+ public AvroFixedConsumer(FixedSizeBinaryVector vector, int size) {
+ this.vector = vector;
+ reuseBytes = new byte[size];
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+ decoder.readFixed(reuseBytes);
+ vector.setSafe(currentIndex++, reuseBytes);
+ }
+
+ @Override
+ public void addNull() {
+ currentIndex++;
+ }
+
+ @Override
+ public void setPosition(int index) {
+ currentIndex = index;
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return vector;
+ }
+
+ @Override
+ public void close() throws Exception {
+ vector.close();
+ }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
index 6256a9a2cf0..b27289a45ea 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
@@ -21,8 +21,6 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
-import org.apache.arrow.vector.complex.impl.Float4WriterImpl;
-import org.apache.arrow.vector.complex.writer.Float4Writer;
import org.apache.avro.io.Decoder;
/**
@@ -31,31 +29,30 @@
*/
public class AvroFloatConsumer implements Consumer {
- private final Float4Writer writer;
private final Float4Vector vector;
+ private int currentIndex;
+
/**
* Instantiate a AvroFloatConsumer.
*/
public AvroFloatConsumer(Float4Vector vector) {
this.vector = vector;
- this.writer = new Float4WriterImpl(vector);
}
@Override
public void consume(Decoder decoder) throws IOException {
- writer.writeFloat4(decoder.readFloat());
- writer.setPosition(writer.getPosition() + 1);
+ vector.setSafe(currentIndex++, decoder.readFloat());
}
@Override
public void addNull() {
- writer.setPosition(writer.getPosition() + 1);
+ currentIndex++;
}
@Override
public void setPosition(int index) {
- writer.setPosition(index);
+ currentIndex = index;
}
@Override
@@ -65,6 +62,6 @@ public FieldVector getVector() {
@Override
public void close() throws Exception {
- writer.close();
+ vector.close();
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
index 854c8d07f40..b67fa46efed 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
@@ -21,8 +21,6 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
-import org.apache.arrow.vector.complex.impl.IntWriterImpl;
-import org.apache.arrow.vector.complex.writer.IntWriter;
import org.apache.avro.io.Decoder;
/**
@@ -31,31 +29,30 @@
*/
public class AvroIntConsumer implements Consumer {
- private final IntWriter writer;
private final IntVector vector;
+ private int currentIndex;
+
/**
* Instantiate a AvroIntConsumer.
*/
public AvroIntConsumer(IntVector vector) {
this.vector = vector;
- this.writer = new IntWriterImpl(vector);
}
@Override
public void consume(Decoder decoder) throws IOException {
- writer.writeInt(decoder.readInt());
- writer.setPosition(writer.getPosition() + 1);
+ vector.setSafe(currentIndex++, decoder.readInt());
}
@Override
public void addNull() {
- writer.setPosition(writer.getPosition() + 1);
+ currentIndex++;
}
@Override
public void setPosition(int index) {
- writer.setPosition(index);
+ currentIndex = index;
}
@Override
@@ -65,6 +62,6 @@ public FieldVector getVector() {
@Override
public void close() throws Exception {
- writer.close();
+ vector.close();
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
index e0095cc53d4..231d61613d3 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
@@ -21,8 +21,6 @@
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.FieldVector;
-import org.apache.arrow.vector.complex.impl.BigIntWriterImpl;
-import org.apache.arrow.vector.complex.writer.BigIntWriter;
import org.apache.avro.io.Decoder;
/**
@@ -31,31 +29,30 @@
*/
public class AvroLongConsumer implements Consumer {
- private final BigIntWriter writer;
private final BigIntVector vector;
+ private int currentIndex;
+
/**
* Instantiate a AvroLongConsumer.
*/
public AvroLongConsumer(BigIntVector vector) {
this.vector = vector;
- this.writer = new BigIntWriterImpl(vector);
}
@Override
public void consume(Decoder decoder) throws IOException {
- writer.writeBigInt(decoder.readLong());
- writer.setPosition(writer.getPosition() + 1);
+ vector.setSafe(currentIndex++, decoder.readLong());
}
@Override
public void addNull() {
- writer.setPosition(writer.getPosition() + 1);
+ currentIndex++;
}
@Override
public void setPosition(int index) {
- writer.setPosition(index);
+ currentIndex = index;
}
@Override
@@ -65,6 +62,6 @@ public FieldVector getVector() {
@Override
public void close() throws Exception {
- writer.close();
+ vector.close();
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java
new file mode 100644
index 00000000000..6a09b586635
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroMapConsumer.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume map type values from avro decoder.
+ * Write the data to {@link MapVector}.
+ */
+public class AvroMapConsumer implements Consumer {
+
+ private final MapVector vector;
+ private final Consumer delegate;
+
+ private int currentIndex;
+
+ /**
+ * Instantiate a AvroMapConsumer.
+ */
+ public AvroMapConsumer(MapVector vector, Consumer delegate) {
+ this.vector = vector;
+ this.delegate = delegate;
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+
+ vector.startNewValue(currentIndex);
+ long totalCount = 0;
+ for (long count = decoder.readMapStart(); count != 0; count = decoder.mapNext()) {
+ totalCount += count;
+ for (int element = 0; element < count; element++) {
+ delegate.consume(decoder);
+ }
+ }
+ vector.endValue(currentIndex, (int) totalCount);
+ currentIndex++;
+ }
+
+ @Override
+ public void addNull() {
+ vector.setValueCount(vector.getValueCount() + 1);
+ }
+
+ @Override
+ public void setPosition(int index) {
+ this.currentIndex = index;
+ }
+
+ @Override
+ public FieldVector getVector() {
+ return this.vector;
+ }
+
+ @Override
+ public void close() throws Exception {
+ vector.close();
+ delegate.close();
+ }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
index 850d699607c..8a1cc4c7e6e 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
@@ -22,9 +22,6 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarCharVector;
-import org.apache.arrow.vector.complex.impl.VarCharWriterImpl;
-import org.apache.arrow.vector.complex.writer.VarCharWriter;
-import org.apache.arrow.vector.holders.VarCharHolder;
import org.apache.avro.io.Decoder;
/**
@@ -34,46 +31,32 @@
public class AvroStringConsumer implements Consumer {
private final VarCharVector vector;
- private final VarCharWriter writer;
private ByteBuffer cacheBuffer;
+ private int currentIndex;
/**
* Instantiate a AvroStringConsumer.
*/
public AvroStringConsumer(VarCharVector vector) {
this.vector = vector;
- this.writer = new VarCharWriterImpl(vector);
}
@Override
public void consume(Decoder decoder) throws IOException {
- writeValue(decoder);
- writer.setPosition(writer.getPosition() + 1);
- }
-
- @Override
- public void addNull() {
- writer.setPosition(writer.getPosition() + 1);
- }
-
- private void writeValue(Decoder decoder) throws IOException {
- VarCharHolder holder = new VarCharHolder();
-
// cacheBuffer is initialized null and create in the first consume,
// if its capacity < size to read, decoder will create a new one with new capacity.
cacheBuffer = decoder.readBytes(cacheBuffer);
+ vector.setSafe(currentIndex++, cacheBuffer, 0, cacheBuffer.limit());
+ }
- holder.start = 0;
- holder.end = cacheBuffer.limit();
- holder.buffer = vector.getAllocator().buffer(cacheBuffer.limit());
- holder.buffer.setBytes(0, cacheBuffer, 0, cacheBuffer.limit());
-
- writer.write(holder);
+ @Override
+ public void addNull() {
+ currentIndex++;
}
@Override
public void setPosition(int index) {
- writer.setPosition(index);
+ currentIndex = index;
}
@Override
@@ -83,6 +66,6 @@ public FieldVector getVector() {
@Override
public void close() throws Exception {
- writer.close();
+ vector.close();
}
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java
new file mode 100644
index 00000000000..a6bec289c55
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStructConsumer.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.util.AutoCloseables;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.complex.StructVector;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume nested record type values from avro decoder.
+ * Write the data to {@link org.apache.arrow.vector.complex.StructVector}.
+ */
+public class AvroStructConsumer implements Consumer {
+
+ private final Consumer[] delegates;
+ private StructVector vector;
+
+ private int currentIndex;
+
+
+ /**
+ * Instantiate a AvroStructConsumer.
+ */
+ public AvroStructConsumer(StructVector vector, Consumer[] delegates) {
+ this.vector = vector;
+ this.delegates = delegates;
+ }
+
+ @Override
+ public void consume(Decoder decoder) throws IOException {
+
+ for (int i = 0; i < delegates.length; i++) {
+ delegates[i].consume(decoder);
+ }
+ vector.setIndexDefined(currentIndex);
+ currentIndex++;
+
+ }
+
+ @Override
+ public void addNull() {
+ currentIndex++;
+ }
+
+ @Override
+ public void setPosition(int index) {
+ currentIndex = index;
+ }
+
+ @Override
+ public FieldVector getVector() {
+ vector.setValueCount(currentIndex);
+ return this.vector;
+ }
+
+ @Override
+ public void close() throws Exception {
+ vector.close();
+ AutoCloseables.close(delegates);
+ }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
index b927a5ba5cc..5c61688d605 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroUnionsConsumer.java
@@ -21,7 +21,6 @@
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.complex.UnionVector;
-import org.apache.arrow.vector.complex.impl.UnionWriter;
import org.apache.arrow.vector.types.Types;
import org.apache.avro.io.Decoder;
@@ -34,15 +33,14 @@ public class AvroUnionsConsumer implements Consumer {
private Consumer[] delegates;
private Types.MinorType[] types;
- private UnionWriter writer;
private UnionVector vector;
+ private int currentIndex;
/**
- * Instantiate a AvroUnionConsumer.
+ * Instantiate an AvroUnionConsumer.
*/
public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorType[] types) {
- this.writer = new UnionWriter(vector);
this.vector = vector;
this.delegates = delegates;
this.types = types;
@@ -51,39 +49,37 @@ public AvroUnionsConsumer(UnionVector vector, Consumer[] delegates, Types.MinorT
@Override
public void consume(Decoder decoder) throws IOException {
int fieldIndex = decoder.readInt();
- int position = writer.getPosition();
Consumer delegate = delegates[fieldIndex];
- vector.setType(position, types[fieldIndex]);
+ vector.setType(currentIndex, types[fieldIndex]);
// In UnionVector we need to set sub vector writer position before consume a value
// because in the previous iterations we might not have written to the specific union sub vector.
- delegate.setPosition(position);
+ delegate.setPosition(currentIndex);
delegate.consume(decoder);
- writer.setPosition(position + 1);
-
+ currentIndex++;
}
@Override
public void addNull() {
- writer.setPosition(writer.getPosition() + 1);
+ currentIndex++;
}
@Override
public void setPosition(int index) {
- writer.setPosition(index);
+ currentIndex = index;
}
@Override
public FieldVector getVector() {
- vector.setValueCount(writer.getPosition());
+ vector.setValueCount(currentIndex);
return this.vector;
}
@Override
public void close() throws Exception {
- writer.close();
+ vector.close();
for (Consumer delegate: delegates) {
delegate.close();
}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java
index 0f4b3c3afd9..b334a83ad6a 100644
--- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/CompositeAvroConsumer.java
@@ -17,7 +17,6 @@
package org.apache.arrow.consumers;
-import java.io.EOFException;
import java.io.IOException;
import java.util.List;
@@ -40,18 +39,8 @@ public CompositeAvroConsumer(List consumers) {
* Consume decoder data and write into {@link VectorSchemaRoot}.
*/
public void consume(Decoder decoder, VectorSchemaRoot root) throws IOException {
- int valueCount = 0;
- while (true) {
- try {
- for (Consumer consumer : consumers) {
- consumer.consume(decoder);
- }
- valueCount++;
- //reach end will throw EOFException.
- } catch (EOFException eofException) {
- root.setRowCount(valueCount);
- break;
- }
+ for (Consumer consumer : consumers) {
+ consumer.consume(decoder);
}
}
diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
index 47840d61f65..22eec9cd724 100644
--- a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
+++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java
@@ -29,6 +29,7 @@
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Objects;
@@ -36,6 +37,9 @@
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.complex.ListVector;
+import org.apache.arrow.vector.complex.MapVector;
+import org.apache.arrow.vector.util.JsonStringArrayList;
import org.apache.arrow.vector.util.Text;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
@@ -286,6 +290,76 @@ public void testNullableBooleanType() throws Exception {
checkRecordResult(schema, data, root);
}
+ @Test
+ public void testArrayType() throws Exception {
+ Schema schema = getSchema("test_array.avsc");
+ List data = new ArrayList(Arrays.asList(
+ Arrays.asList("11", "222", "999"),
+ Arrays.asList("12222", "2333", "1000"),
+ Arrays.asList("1rrr", "2ggg"),
+ Arrays.asList("1vvv", "2bbb"),
+ Arrays.asList("1fff", "2")));
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkArrayResult(data, (ListVector) vector);
+ }
+
+ @Test
+ public void testMapType() throws Exception {
+ Schema schema = getSchema("test_map.avsc");
+
+ List keys = Arrays.asList("key1", "key2", "key3", "key4", "key5", "key6");
+ List vals = Arrays.asList("val1", "val2", "val3", "val4", "val5", "val6");
+
+ List data = new ArrayList<>();
+ LinkedHashMap map1 = new LinkedHashMap();
+ map1.put(keys.get(0), vals.get(0));
+ map1.put(keys.get(1), vals.get(1));
+ data.add(map1);
+
+ LinkedHashMap map2 = new LinkedHashMap();
+ map2.put(keys.get(2), vals.get(2));
+ map2.put(keys.get(3), vals.get(3));
+ data.add(map2);
+
+ LinkedHashMap map3 = new LinkedHashMap();
+ map3.put(keys.get(4), vals.get(4));
+ map3.put(keys.get(5), vals.get(5));
+ data.add(map3);
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ MapVector vector = (MapVector) root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(keys, vector.getDataVector().getChildrenFromFields().get(0));
+ checkPrimitiveResult(vals, vector.getDataVector().getChildrenFromFields().get(1));
+ assertEquals(0, vector.getOffsetBuffer().getInt(0));
+ assertEquals(2, vector.getOffsetBuffer().getInt(1 * 4));
+ assertEquals(4, vector.getOffsetBuffer().getInt(2 * 4));
+ assertEquals(6, vector.getOffsetBuffer().getInt(3 * 4));
+ }
+
+ @Test
+ public void testFixedType() throws Exception {
+ Schema schema = getSchema("test_fixed.avsc");
+
+ List data = new ArrayList<>();
+ List expected = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ byte[] value = ("value" + i).getBytes(StandardCharsets.UTF_8);
+ expected.add(value);
+ GenericData.Fixed fixed = new GenericData.Fixed(schema);
+ fixed.bytes(value);
+ data.add(fixed);
+ }
+
+ VectorSchemaRoot root = writeAndRead(schema, data);
+ FieldVector vector = root.getFieldVectors().get(0);
+
+ checkPrimitiveResult(expected, vector);
+ }
+
@Test
public void testUnionType() throws Exception {
Schema schema = getSchema("test_union.avsc");
@@ -332,7 +406,32 @@ public void testNullableUnionType() throws Exception {
checkPrimitiveResult(expected, vector);
}
- private void checkPrimitiveResult(ArrayList data, FieldVector vector) {
+ private void checkArrayResult(List expected, ListVector vector) {
+ assertEquals(expected.size(), vector.getValueCount());
+ for (int i = 0; i < expected.size(); i++) {
+ checkPrimitiveResult(expected.get(i), (JsonStringArrayList) vector.getObject(i));
+ }
+ }
+
+ private void checkPrimitiveResult(List expected, List actual) {
+ assertEquals(expected.size(), actual.size());
+ for (int i = 0; i < expected.size(); i++) {
+ Object value1 = expected.get(i);
+ Object value2 = actual.get(i);
+ if (value1 == null) {
+ assertTrue(value2 == null);
+ continue;
+ }
+ if (value2 instanceof byte[]) {
+ value2 = ByteBuffer.wrap((byte[]) value2);
+ } else if (value2 instanceof Text) {
+ value2 = value2.toString();
+ }
+ assertTrue(Objects.equals(value1, value2));
+ }
+ }
+
+ private void checkPrimitiveResult(List data, FieldVector vector) {
assertEquals(data.size(), vector.getValueCount());
for (int i = 0; i < data.size(); i++) {
Object value1 = data.get(i);
@@ -343,6 +442,9 @@ private void checkPrimitiveResult(ArrayList data, FieldVector vector) {
}
if (value2 instanceof byte[]) {
value2 = ByteBuffer.wrap((byte[]) value2);
+ if (value1 instanceof byte[]) {
+ value1 = ByteBuffer.wrap((byte[]) value1);
+ }
} else if (value2 instanceof Text) {
value2 = value2.toString();
}
diff --git a/java/adapter/avro/src/test/resources/schema/test_array.avsc b/java/adapter/avro/src/test/resources/schema/test_array.avsc
new file mode 100644
index 00000000000..5b75a4031d8
--- /dev/null
+++ b/java/adapter/avro/src/test/resources/schema/test_array.avsc
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "array",
+ "items": "string",
+ "name": "testArray"
+}
diff --git a/java/adapter/avro/src/test/resources/schema/test_fixed.avsc b/java/adapter/avro/src/test/resources/schema/test_fixed.avsc
new file mode 100644
index 00000000000..a4d96e9ab55
--- /dev/null
+++ b/java/adapter/avro/src/test/resources/schema/test_fixed.avsc
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "fixed",
+ "size": 6,
+ "name": "testFixed"
+}
diff --git a/java/adapter/avro/src/test/resources/schema/test_map.avsc b/java/adapter/avro/src/test/resources/schema/test_map.avsc
new file mode 100644
index 00000000000..0dfa3a595bb
--- /dev/null
+++ b/java/adapter/avro/src/test/resources/schema/test_map.avsc
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+{
+ "namespace": "org.apache.arrow.avro",
+ "type": "map",
+ "values": "string",
+ "name": "testMap"
+}