From 54479c8c087e053db1ec669d42d52e5fb27e87b3 Mon Sep 17 00:00:00 2001 From: tianchen Date: Mon, 8 Jul 2019 12:24:46 +0800 Subject: [PATCH 1/5] Initial implement to convert Avro record with primitive types --- java/adapter/avro/pom.xml | 14 + .../java/org/apache/arrow/AvroToArrow.java | 71 ++++ .../org/apache/arrow/AvroToArrowUtils.java | 273 ++++++++++++++ .../org/apache/arrow/AvroToArrowTest.java | 334 ++++++++++++++++++ .../test/resources/schema/test_boolean.avsc | 24 ++ .../src/test/resources/schema/test_bytes.avsc | 24 ++ .../test/resources/schema/test_double.avsc | 24 ++ .../src/test/resources/schema/test_float.avsc | 24 ++ .../src/test/resources/schema/test_int.avsc | 24 ++ .../src/test/resources/schema/test_long.avsc | 24 ++ .../schema/test_mixed_primitive.avsc | 26 ++ .../test/resources/schema/test_string.avsc | 24 ++ 12 files changed, 886 insertions(+) create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java create mode 100644 java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java create mode 100644 java/adapter/avro/src/test/resources/schema/test_boolean.avsc create mode 100644 java/adapter/avro/src/test/resources/schema/test_bytes.avsc create mode 100644 java/adapter/avro/src/test/resources/schema/test_double.avsc create mode 100644 java/adapter/avro/src/test/resources/schema/test_float.avsc create mode 100644 java/adapter/avro/src/test/resources/schema/test_int.avsc create mode 100644 java/adapter/avro/src/test/resources/schema/test_long.avsc create mode 100644 java/adapter/avro/src/test/resources/schema/test_mixed_primitive.avsc create mode 100644 java/adapter/avro/src/test/resources/schema/test_string.avsc diff --git a/java/adapter/avro/pom.xml b/java/adapter/avro/pom.xml index 4e13d57fb34..8ef7548d38e 100644 --- a/java/adapter/avro/pom.xml +++ b/java/adapter/avro/pom.xml @@ -28,6 +28,20 @@ + + + org.apache.arrow + 
arrow-memory + ${project.version} + + + + + org.apache.arrow + arrow-vector + ${project.version} + + org.apache.avro avro diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java new file mode 100644 index 00000000000..6005982814e --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import java.io.File; +import java.io.IOException; + +import org.apache.arrow.memory.BaseAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; + +/** + * Utility class to convert Avro objects to columnar Arrow format objects. + */ +public class AvroToArrow { + + /** + * Fetch the data from {@link DataFileReader} and convert it to Arrow objects. + * @param reader avro data file reader. + * @param allocator Memory allocator to use. 
+ * @return Arrow Data Objects {@link VectorSchemaRoot} + */ + public static VectorSchemaRoot readToArrow(DataFileReader reader, BaseAllocator allocator) { + Preconditions.checkNotNull(reader, "Avro DataFileReader object can not be null"); + + VectorSchemaRoot root = VectorSchemaRoot.create( + AvroToArrowUtils.avroToArrowSchema(reader.getSchema()), allocator); + AvroToArrowUtils.avroToArrowVectors(reader, root); + return root; + } + + /** + * Fetch the data with given avro schema file and dataFile, convert it to Arrow objects. + * @param schemaFile avro schema file. + * @param dataFile avro data file. + * @param allocator Memory allocator to use. + * @return Arrow Data Objects {@link VectorSchemaRoot} + */ + public static VectorSchemaRoot readToArrow(File schemaFile, File dataFile, BaseAllocator allocator) + throws IOException { + + Schema schema = new Schema.Parser().parse(schemaFile); + DatumReader datumReader = new GenericDatumReader(schema); + DataFileReader dataFileReader = new DataFileReader(dataFile, datumReader); + + return readToArrow(dataFileReader, allocator); + } + +} + + diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java new file mode 100644 index 00000000000..31066523033 --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE; +import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.BaseFixedWidthVector; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl; +import org.apache.arrow.vector.holders.BigIntHolder; +import org.apache.arrow.vector.holders.BitHolder; +import org.apache.arrow.vector.holders.Float4Holder; +import org.apache.arrow.vector.holders.Float8Holder; +import org.apache.arrow.vector.holders.IntHolder; +import org.apache.arrow.vector.holders.VarBinaryHolder; +import org.apache.arrow.vector.holders.VarCharHolder; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericRecord; +import 
org.apache.avro.util.Utf8; + +/** + * Class that does most of the work to convert Avro data into Arrow columnar format Vector objects. + */ +public class AvroToArrowUtils { + + private static final int DEFAULT_BUFFER_SIZE = 256; + + /** + * Creates an {@link org.apache.arrow.vector.types.pojo.ArrowType} from the {@link Schema.Field} + * +

<p>This method currently performs the following type mapping for Avro data types to corresponding Arrow data types.
+   *
+   * <ul>
+   *   <li>STRING --> ArrowType.Utf8</li>
+   *   <li>INT --> ArrowType.Int(32, signed)</li>
+   *   <li>LONG --> ArrowType.Int(64, signed)</li>
+   *   <li>FLOAT --> ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)</li>
+   *   <li>DOUBLE --> ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)</li>
+   *   <li>BOOLEAN --> ArrowType.Bool</li>
+   *   <li>BYTES --> ArrowType.Binary</li>
+   * </ul>
+ */ + private static ArrowType getArrowTypeForAvroField(Schema.Field field) { + + Preconditions.checkNotNull(field, "Avro Field object can't be null"); + + Type avroType = field.schema().getType(); + final ArrowType arrowType; + + switch (avroType) { + case STRING: + arrowType = new ArrowType.Utf8(); + break; + case INT: + arrowType = new ArrowType.Int(32, /*signed=*/true); + break; + case BOOLEAN: + arrowType = new ArrowType.Bool(); + break; + case LONG: + arrowType = new ArrowType.Int(64, /*signed=*/true); + break; + case FLOAT: + arrowType = new ArrowType.FloatingPoint(SINGLE); + break; + case DOUBLE: + arrowType = new ArrowType.FloatingPoint(DOUBLE); + break; + case BYTES: + arrowType = new ArrowType.Binary(); + break; + default: + // no-op, shouldn't get here + throw new RuntimeException("Can't convert avro type %s to arrow type." + avroType.getName()); + } + + return arrowType; + } + + /** + * Create Arrow {@link org.apache.arrow.vector.types.pojo.Schema} object for the given Avro {@link Schema}. + */ + public static org.apache.arrow.vector.types.pojo.Schema avroToArrowSchema(Schema schema) { + Preconditions.checkNotNull(schema, "Avro Schema object can't be null"); + + List arrowFields = new ArrayList<>(); + for (Schema.Field field : schema.getFields()) { + final ArrowType arrowType = getArrowTypeForAvroField(field); + if (arrowType != null) { + final FieldType fieldType = new FieldType(true, arrowType, /* dictionary encoding */ null, null); + List children = null; + //TODO handle complex type children fields + arrowFields.add(new Field(field.name(), fieldType, children)); + } + } + return new org.apache.arrow.vector.types.pojo.Schema(arrowFields, null); + } + + /** + * Iterate the given Avro {@link DataFileReader} object to fetch the data and transpose it to populate + * the given Arrow Vector objects. + * @param reader avro reader to read data. 
+ * @param root Arrow {@link VectorSchemaRoot} object to populate + */ + public static void avroToArrowVectors(DataFileReader reader, VectorSchemaRoot root) { + + Preconditions.checkNotNull(reader, "Avro DataFileReader object can't be null"); + Preconditions.checkNotNull(root, "VectorSchemaRoot object can't be null"); + + allocateVectors(root, DEFAULT_BUFFER_SIZE); + + int rowCount = 0; + while (reader.hasNext()) { + GenericRecord record = reader.next(); + for (Schema.Field field : reader.getSchema().getFields()) { + Object value = record.get(field.name()); + avroToFieldVector( + value, + field.schema().getType(), + rowCount, + root.getVector(field.name())); + } + rowCount++; + } + root.setRowCount(rowCount); + } + + /** + * Put the value into the vector at specific position. + */ + public static void avroToFieldVector(Object value, Type avroType, int rowCount, FieldVector vector) { + switch (avroType) { + case STRING: + updateVector((VarCharVector) vector, value, rowCount); + break; + case INT: + updateVector((IntVector) vector, value, rowCount); + break; + case BOOLEAN: + updateVector((BitVector)vector, value, rowCount); + break; + case LONG: + updateVector((BigIntVector) vector, value, rowCount); + break; + case FLOAT: + updateVector((Float4Vector) vector, value, rowCount); + break; + case DOUBLE: + updateVector((Float8Vector) vector, value, rowCount); + break; + case BYTES: + updateVector((VarBinaryVector) vector, value, rowCount); + break; + default: + // no-op, shouldn't get here + throw new RuntimeException("Can't convert avro type %s to arrow type." 
+ avroType.getName()); + } + } + + private static void updateVector(VarBinaryVector varBinaryVector, Object value, int rowCount) { + VarBinaryHolder holder = new VarBinaryHolder(); + Preconditions.checkNotNull(value, "value should not be null"); + VarBinaryWriterImpl writer = new VarBinaryWriterImpl(varBinaryVector); + + byte[] bytes = ((ByteBuffer) value).array(); + holder.buffer = varBinaryVector.getAllocator().buffer(bytes.length); + holder.buffer.setBytes(0, bytes, 0, bytes.length); + holder.start = 0; + holder.end = bytes.length; + + writer.setPosition(rowCount); + writer.write(holder); + } + + private static void updateVector(Float4Vector float4Vector, Object value, int rowCount) { + Float4Holder holder = new Float4Holder(); + Preconditions.checkNotNull(value, "value should not be null"); + holder.value = (float) value; + float4Vector.setSafe(rowCount, holder); + float4Vector.setValueCount(rowCount + 1); + } + + private static void updateVector(Float8Vector float8Vector, Object value, int rowCount) { + Float8Holder holder = new Float8Holder(); + Preconditions.checkNotNull(value, "value should not be null"); + holder.value = (double) value; + float8Vector.setSafe(rowCount, holder); + float8Vector.setValueCount(rowCount + 1); + } + + private static void updateVector(BigIntVector bigIntVector, Object value, int rowCount) { + BigIntHolder holder = new BigIntHolder(); + Preconditions.checkNotNull(value, "value should not be null"); + holder.value = (long) value; + bigIntVector.setSafe(rowCount, holder); + bigIntVector.setValueCount(rowCount + 1); + } + + private static void updateVector(BitVector bitVector, Object value, int rowCount) { + BitHolder holder = new BitHolder(); + Preconditions.checkNotNull(value, "value should not be null"); + holder.value = (boolean) value ? 
1 : 0; + bitVector.setSafe(rowCount, holder); + bitVector.setValueCount(rowCount + 1); + } + + private static void updateVector(IntVector intVector, Object value, int rowCount) { + IntHolder holder = new IntHolder(); + Preconditions.checkNotNull(value, "value should not be null"); + holder.value = (int) value; + intVector.setSafe(rowCount, holder); + intVector.setValueCount(rowCount + 1); + } + + private static void updateVector(VarCharVector varcharVector, Object value, int rowCount) { + VarCharHolder holder = new VarCharHolder(); + Preconditions.checkNotNull(value, "value should not be null"); + varcharVector.setIndexDefined(rowCount); + byte[] bytes = ((Utf8)value).getBytes(); + holder.buffer = varcharVector.getAllocator().buffer(bytes.length); + holder.buffer.setBytes(0, bytes, 0, bytes.length); + holder.start = 0; + holder.end = bytes.length; + varcharVector.setSafe(rowCount, holder); + varcharVector.setValueCount(rowCount + 1); + } + + private static void allocateVectors(VectorSchemaRoot root, int size) { + List vectors = root.getFieldVectors(); + for (FieldVector fieldVector : vectors) { + if (fieldVector instanceof BaseFixedWidthVector) { + ((BaseFixedWidthVector) fieldVector).allocateNew(size); + } else { + fieldVector.allocateNew(); + } + fieldVector.setInitialCapacity(size); + } + } +} diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java new file mode 100644 index 00000000000..683e2c92eb4 --- /dev/null +++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; + +import org.apache.arrow.memory.BaseAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DatumWriter; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +public class AvroToArrowTest { + + @ClassRule + public static final TemporaryFolder TMP = new TemporaryFolder(); + + private BaseAllocator allocator; + + @Before + public void init() { + allocator = new RootAllocator(Long.MAX_VALUE); + } + + @Test + public void testStringType() throws Exception { + File dataFile = TMP.newFile(); + Path schemaPath = Paths + 
.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_string.avsc"); + Schema schema = new Schema.Parser().parse(schemaPath.toFile()); + + //write data to disk + DatumWriter datumWriter = new GenericDatumWriter(schema); + DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); + dataFileWriter.create(schema, dataFile); + + List data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put("f0", "value" + i); + dataFileWriter.append(record); + data.add(record); + } + dataFileWriter.close(); + + DatumReader datumReader = new GenericDatumReader(schema); + DataFileReader + dataFileReader = new DataFileReader(dataFile, datumReader); + + VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + + checkResult(schema, data, root); + } + + @Test + public void testIntType() throws Exception { + File dataFile = TMP.newFile(); + Path schemaPath = Paths + .get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_int.avsc"); + Schema schema = new Schema.Parser().parse(schemaPath.toFile()); + + //write data to disk + DatumWriter datumWriter = new GenericDatumWriter(schema); + DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); + dataFileWriter.create(schema, dataFile); + + List data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put("f0", i); + dataFileWriter.append(record); + data.add(record); + } + dataFileWriter.close(); + + DatumReader datumReader = new GenericDatumReader(schema); + DataFileReader + dataFileReader = new DataFileReader(dataFile, datumReader); + + VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + + checkResult(schema, data, root); + } + + @Test + public void testBooleanType() throws Exception { + File dataFile = TMP.newFile(); + Path schemaPath = Paths + 
.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_boolean.avsc"); + Schema schema = new Schema.Parser().parse(schemaPath.toFile()); + + //write data to disk + DatumWriter datumWriter = new GenericDatumWriter(schema); + DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); + dataFileWriter.create(schema, dataFile); + + List data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put("f0", true); + dataFileWriter.append(record); + data.add(record); + } + dataFileWriter.close(); + + DatumReader datumReader = new GenericDatumReader(schema); + DataFileReader + dataFileReader = new DataFileReader(dataFile, datumReader); + + VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + + checkResult(schema, data, root); + } + + @Test + public void testLongType() throws Exception { + File dataFile = TMP.newFile(); + Path schemaPath = Paths + .get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_long.avsc"); + Schema schema = new Schema.Parser().parse(schemaPath.toFile()); + + //write data to disk + DatumWriter datumWriter = new GenericDatumWriter(schema); + DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); + dataFileWriter.create(schema, dataFile); + + List data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put("f0", (long) i); + dataFileWriter.append(record); + data.add(record); + } + dataFileWriter.close(); + + DatumReader datumReader = new GenericDatumReader(schema); + DataFileReader + dataFileReader = new DataFileReader(dataFile, datumReader); + + VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + + checkResult(schema, data, root); + } + + @Test + public void testFloatType() throws Exception { + File dataFile = TMP.newFile(); + Path schemaPath = Paths + 
.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_float.avsc"); + Schema schema = new Schema.Parser().parse(schemaPath.toFile()); + + //write data to disk + DatumWriter datumWriter = new GenericDatumWriter(schema); + DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); + dataFileWriter.create(schema, dataFile); + + float suffix = 0.33f; + List data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put("f0", i + suffix); + dataFileWriter.append(record); + data.add(record); + } + dataFileWriter.close(); + + DatumReader datumReader = new GenericDatumReader(schema); + DataFileReader + dataFileReader = new DataFileReader(dataFile, datumReader); + + VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + + checkResult(schema, data, root); + } + + @Test + public void testDoubleType() throws Exception { + File dataFile = TMP.newFile(); + Path schemaPath = Paths + .get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_double.avsc"); + Schema schema = new Schema.Parser().parse(schemaPath.toFile()); + + //write data to disk + DatumWriter datumWriter = new GenericDatumWriter(schema); + DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); + dataFileWriter.create(schema, dataFile); + + double suffix = 0.33; + List data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put("f0", i + suffix); + dataFileWriter.append(record); + data.add(record); + } + dataFileWriter.close(); + + DatumReader datumReader = new GenericDatumReader(schema); + DataFileReader + dataFileReader = new DataFileReader(dataFile, datumReader); + + VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + + checkResult(schema, data, root); + } + + @Test + public void testMixedType() throws Exception { + File dataFile = TMP.newFile(); + Path schemaPath = Paths 
+ .get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_mixed_primitive.avsc"); + Schema schema = new Schema.Parser().parse(schemaPath.toFile()); + + //write data to disk + DatumWriter datumWriter = new GenericDatumWriter(schema); + DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); + dataFileWriter.create(schema, dataFile); + + double suffix = 0.33; + List data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + GenericRecord record = new GenericData.Record(schema); + record.put("f0", "test" + i); + record.put("f1", i); + record.put("f2", i + suffix); + dataFileWriter.append(record); + data.add(record); + } + dataFileWriter.close(); + + DatumReader datumReader = new GenericDatumReader(schema); + DataFileReader + dataFileReader = new DataFileReader(dataFile, datumReader); + + VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + + checkResult(schema, data, root); + } + + @Test + public void testBytesType() throws Exception { + File dataFile = TMP.newFile(); + Path schemaPath = Paths + .get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_bytes.avsc"); + Schema schema = new Schema.Parser().parse(schemaPath.toFile()); + + //write data to disk + DatumWriter datumWriter = new GenericDatumWriter(schema); + DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); + dataFileWriter.create(schema, dataFile); + + List data = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + GenericRecord record = new GenericData.Record(schema); + ByteBuffer value = ByteBuffer.wrap(("test" + i).getBytes(StandardCharsets.UTF_8)); + record.put("f0", value); + dataFileWriter.append(record); + data.add(record); + } + dataFileWriter.close(); + + DatumReader datumReader = new GenericDatumReader(schema); + DataFileReader + dataFileReader = new DataFileReader(dataFile, datumReader); + + VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + + checkResult(schema, data, root); 
+ } + + private void checkResult(Schema schema, List rawRecords, VectorSchemaRoot vectorSchemaRoot) { + List fields = schema.getFields(); + + for (int i = 0; i < rawRecords.size(); i++) { + GenericRecord record = rawRecords.get(i); + for (Schema.Field field: fields) { + Object value = record.get(field.name()); + FieldVector vector = vectorSchemaRoot.getVector(field.name()); + if (value == null) { + assertTrue(vector.isNull(i)); + } else { + Object value1 = value; + Object value2 = vector.getObject(i); + if (field.schema().getType() == Schema.Type.STRING) { + value1 = value1.toString(); + value2 = value2.toString(); + } else if (field.schema().getType() == Schema.Type.BYTES) { + //convert to ByteBuffer for equals check + value2 = ByteBuffer.wrap((byte[]) value2); + } + assertEquals(value1, value2); + } + } + } + + } + +} diff --git a/java/adapter/avro/src/test/resources/schema/test_boolean.avsc b/java/adapter/avro/src/test/resources/schema/test_boolean.avsc new file mode 100644 index 00000000000..604a790e73d --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_boolean.avsc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"namespace": "org.apache.arrow.avro", + "type": "record", + "name": "TestBoolean", + "fields": [ + {"name": "f0", "type": "boolean"} + ] +} diff --git a/java/adapter/avro/src/test/resources/schema/test_bytes.avsc b/java/adapter/avro/src/test/resources/schema/test_bytes.avsc new file mode 100644 index 00000000000..bd699926714 --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_bytes.avsc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"namespace": "org.apache.arrow.avro", + "type": "record", + "name": "TestBytes", + "fields": [ + {"name": "f0", "type": "bytes"} + ] +} diff --git a/java/adapter/avro/src/test/resources/schema/test_double.avsc b/java/adapter/avro/src/test/resources/schema/test_double.avsc new file mode 100644 index 00000000000..00cbf533165 --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_double.avsc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"namespace": "org.apache.arrow.avro", + "type": "record", + "name": "TestDouble", + "fields": [ + {"name": "f0", "type": "double"} + ] +} diff --git a/java/adapter/avro/src/test/resources/schema/test_float.avsc b/java/adapter/avro/src/test/resources/schema/test_float.avsc new file mode 100644 index 00000000000..9e8ffe0a4d3 --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_float.avsc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"namespace": "org.apache.arrow.avro", + "type": "record", + "name": "TestFloat", + "fields": [ + {"name": "f0", "type": "float"} + ] +} diff --git a/java/adapter/avro/src/test/resources/schema/test_int.avsc b/java/adapter/avro/src/test/resources/schema/test_int.avsc new file mode 100644 index 00000000000..4ffe8f50cfb --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_int.avsc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"namespace": "org.apache.arrow.avro", + "type": "record", + "name": "TestInt", + "fields": [ + {"name": "f0", "type": "int"} + ] +} diff --git a/java/adapter/avro/src/test/resources/schema/test_long.avsc b/java/adapter/avro/src/test/resources/schema/test_long.avsc new file mode 100644 index 00000000000..de3e4be137f --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_long.avsc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{"namespace": "org.apache.arrow.avro", + "type": "record", + "name": "TestLong", + "fields": [ + {"name": "f0", "type": "long"} + ] +} diff --git a/java/adapter/avro/src/test/resources/schema/test_mixed_primitive.avsc b/java/adapter/avro/src/test/resources/schema/test_mixed_primitive.avsc new file mode 100644 index 00000000000..7b375e3bc76 --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_mixed_primitive.avsc @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"namespace": "org.apache.arrow.avro", + "type": "record", + "name": "TestMixed", + "fields": [ + {"name": "f0", "type": "string"}, + {"name": "f1", "type": "int"}, + {"name": "f2", "type": "double"} + ] +} diff --git a/java/adapter/avro/src/test/resources/schema/test_string.avsc b/java/adapter/avro/src/test/resources/schema/test_string.avsc new file mode 100644 index 00000000000..52be1475753 --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_string.avsc @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{"namespace": "org.apache.arrow.avro", + "type": "record", + "name": "TestString", + "fields": [ + {"name": "f0", "type": "string"} + ] +} From 61d2dac9d7bb873b8fb6906fd4c48d7ed8293df4 Mon Sep 17 00:00:00 2001 From: tianchen Date: Wed, 10 Jul 2019 11:04:30 +0800 Subject: [PATCH 2/5] fix style --- .../avro/src/main/java/org/apache/arrow/AvroToArrow.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java index 6005982814e..d6125a26054 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java @@ -67,5 +67,3 @@ public static VectorSchemaRoot readToArrow(File schemaFile, File dataFile, BaseA } } - - From 7c3a7305448b96f0c1a58215f79e09329a50bcef Mon Sep 17 00:00:00 2001 From: tianchen Date: Sat, 13 Jul 2019 11:20:18 +0800 Subject: [PATCH 3/5] add consumers and use GenericDatumReader --- .../java/org/apache/arrow/AvroToArrow.java | 36 +- .../org/apache/arrow/AvroToArrowUtils.java | 230 +++++------- .../arrow/consumers/AvroBooleanConsumer.java | 44 +++ .../arrow/consumers/AvroBytesConsumer.java | 56 +++ .../arrow/consumers/AvroDoubleConsumer.java | 44 +++ .../arrow/consumers/AvroFloatConsumer.java | 44 +++ .../arrow/consumers/AvroIntConsumer.java} | 32 +- .../arrow/consumers/AvroLongConsumer.java | 44 +++ .../arrow/consumers/AvroStringConsumer.java | 56 +++ .../org/apache/arrow/consumers/Consumer.java} | 18 +- .../org/apache/arrow/AvroToArrowTest.java | 328 +++++++----------- .../schema/test_mixed_primitive.avsc | 26 -- ...t_int.avsc => test_primitive_boolean.avsc} | 10 +- ...st_long.avsc => test_primitive_bytes.avsc} | 10 +- ..._bytes.avsc => test_primitive_double.avsc} | 10 +- ...t_float.avsc => test_primitive_float.avsc} | 10 +- .../resources/schema/test_primitive_int.avsc | 22 ++ .../resources/schema/test_primitive_long.avsc | 22 ++ 
.../schema/test_primitive_string.avsc | 22 ++ .../test/resources/schema/test_string.avsc | 24 -- 20 files changed, 623 insertions(+), 465 deletions(-) create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java rename java/adapter/avro/src/{test/resources/schema/test_double.avsc => main/java/org/apache/arrow/consumers/AvroIntConsumer.java} (52%) create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java create mode 100644 java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java rename java/adapter/avro/src/{test/resources/schema/test_boolean.avsc => main/java/org/apache/arrow/consumers/Consumer.java} (75%) delete mode 100644 java/adapter/avro/src/test/resources/schema/test_mixed_primitive.avsc rename java/adapter/avro/src/test/resources/schema/{test_int.avsc => test_primitive_boolean.avsc} (86%) rename java/adapter/avro/src/test/resources/schema/{test_long.avsc => test_primitive_bytes.avsc} (85%) rename java/adapter/avro/src/test/resources/schema/{test_bytes.avsc => test_primitive_double.avsc} (85%) rename java/adapter/avro/src/test/resources/schema/{test_float.avsc => test_primitive_float.avsc} (85%) create mode 100644 java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc create mode 100644 java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc create mode 100644 java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc delete mode 100644 java/adapter/avro/src/test/resources/schema/test_string.avsc diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java 
b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java index d6125a26054..fc49b073e5a 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java @@ -17,17 +17,11 @@ package org.apache.arrow; -import java.io.File; -import java.io.IOException; - import org.apache.arrow.memory.BaseAllocator; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; +import org.apache.avro.io.Decoder; /** * Utility class to convert Avro objects to columnar Arrow format objects. @@ -35,35 +29,17 @@ public class AvroToArrow { /** - * Fetch the data from {@link DataFileReader} and convert it to Arrow objects. - * @param reader avro data file reader. + * Fetch the data from {@link GenericDatumReader} and convert it to Arrow objects. + * @param reader avro datum reader. * @param allocator Memory allocator to use. * @return Arrow Data Objects {@link VectorSchemaRoot} */ - public static VectorSchemaRoot readToArrow(DataFileReader reader, BaseAllocator allocator) { - Preconditions.checkNotNull(reader, "Avro DataFileReader object can not be null"); + public static VectorSchemaRoot avroToArrow(GenericDatumReader reader, Decoder decoder, BaseAllocator allocator) { + Preconditions.checkNotNull(reader, "Avro reader object can not be null"); VectorSchemaRoot root = VectorSchemaRoot.create( AvroToArrowUtils.avroToArrowSchema(reader.getSchema()), allocator); - AvroToArrowUtils.avroToArrowVectors(reader, root); + AvroToArrowUtils.avroToArrowVectors(decoder, root); return root; } - - /** - * Fetch the data with given avro schema file and dataFile, convert it to Arrow objects. - * @param schemaFile avro schema file. - * @param dataFile avro data file. 
- * @param allocator Memory allocator to use. - * @return Arrow Data Objects {@link VectorSchemaRoot} - */ - public static VectorSchemaRoot readToArrow(File schemaFile, File dataFile, BaseAllocator allocator) - throws IOException { - - Schema schema = new Schema.Parser().parse(schemaFile); - DatumReader datumReader = new GenericDatumReader(schema); - DataFileReader dataFileReader = new DataFileReader(dataFile, datumReader); - - return readToArrow(dataFileReader, allocator); - } - } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java index 31066523033..ef62c77da96 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -20,10 +20,18 @@ import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE; import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE; -import java.nio.ByteBuffer; +import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.arrow.consumers.AvroBooleanConsumer; +import org.apache.arrow.consumers.AvroBytesConsumer; +import org.apache.arrow.consumers.AvroDoubleConsumer; +import org.apache.arrow.consumers.AvroFloatConsumer; +import org.apache.arrow.consumers.AvroIntConsumer; +import org.apache.arrow.consumers.AvroLongConsumer; +import org.apache.arrow.consumers.AvroStringConsumer; +import org.apache.arrow.consumers.Consumer; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.BaseFixedWidthVector; import org.apache.arrow.vector.BigIntVector; @@ -32,25 +40,18 @@ import org.apache.arrow.vector.Float4Vector; import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import 
org.apache.arrow.vector.VectorSchemaRoot; -import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl; -import org.apache.arrow.vector.holders.BigIntHolder; -import org.apache.arrow.vector.holders.BitHolder; -import org.apache.arrow.vector.holders.Float4Holder; -import org.apache.arrow.vector.holders.Float8Holder; -import org.apache.arrow.vector.holders.IntHolder; -import org.apache.arrow.vector.holders.VarBinaryHolder; -import org.apache.arrow.vector.holders.VarCharHolder; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.avro.Schema; import org.apache.avro.Schema.Type; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.util.Utf8; +import org.apache.avro.io.Decoder; + +import sun.reflect.generics.reflectiveObjects.NotImplementedException; /** * Class that does most of the work to convert Avro data into Arrow columnar format Vector objects. @@ -74,14 +75,13 @@ public class AvroToArrowUtils { *
  <li>BYTES --> ArrowType.Binary</li>
  • * */ - private static ArrowType getArrowTypeForAvroField(Schema.Field field) { + private static ArrowType getArrowType(Type type) { - Preconditions.checkNotNull(field, "Avro Field object can't be null"); + Preconditions.checkNotNull(type, "Avro type object can't be null"); - Type avroType = field.schema().getType(); final ArrowType arrowType; - switch (avroType) { + switch (type) { case STRING: arrowType = new ArrowType.Utf8(); break; @@ -105,7 +105,7 @@ private static ArrowType getArrowTypeForAvroField(Schema.Field field) { break; default: // no-op, shouldn't get here - throw new RuntimeException("Can't convert avro type %s to arrow type." + avroType.getName()); + throw new RuntimeException("Can't convert avro type %s to arrow type." + type.getName()); } return arrowType; @@ -115,148 +115,94 @@ private static ArrowType getArrowTypeForAvroField(Schema.Field field) { * Create Arrow {@link org.apache.arrow.vector.types.pojo.Schema} object for the given Avro {@link Schema}. */ public static org.apache.arrow.vector.types.pojo.Schema avroToArrowSchema(Schema schema) { - Preconditions.checkNotNull(schema, "Avro Schema object can't be null"); + Preconditions.checkNotNull(schema, "Avro Schema object can't be null"); List arrowFields = new ArrayList<>(); - for (Schema.Field field : schema.getFields()) { - final ArrowType arrowType = getArrowTypeForAvroField(field); - if (arrowType != null) { - final FieldType fieldType = new FieldType(true, arrowType, /* dictionary encoding */ null, null); - List children = null; - //TODO handle complex type children fields - arrowFields.add(new Field(field.name(), fieldType, children)); - } + + Schema.Type type = schema.getType(); + + if (type == Type.RECORD) { + throw new NotImplementedException(); + } else if (type == Type.MAP) { + throw new NotImplementedException(); + } else if (type == Type.UNION) { + throw new NotImplementedException(); + } else if (type == Type.ARRAY) { + throw new NotImplementedException(); + } else if (type == 
Type.ENUM) { + throw new NotImplementedException(); + } else if (type == Type.NULL) { + throw new NotImplementedException(); + } else { + final FieldType fieldType = new FieldType(true, getArrowType(type), null, null); + arrowFields.add(new Field("f0", fieldType, null)); } + return new org.apache.arrow.vector.types.pojo.Schema(arrowFields, null); } /** - * Iterate the given Avro {@link DataFileReader} object to fetch the data and transpose it to populate - * the given Arrow Vector objects. - * @param reader avro reader to read data. - * @param root Arrow {@link VectorSchemaRoot} object to populate + * Create consumers to consume avro values from decoder, will reduce boxing/unboxing operations. */ - public static void avroToArrowVectors(DataFileReader reader, VectorSchemaRoot root) { - - Preconditions.checkNotNull(reader, "Avro DataFileReader object can't be null"); - Preconditions.checkNotNull(root, "VectorSchemaRoot object can't be null"); - - allocateVectors(root, DEFAULT_BUFFER_SIZE); - - int rowCount = 0; - while (reader.hasNext()) { - GenericRecord record = reader.next(); - for (Schema.Field field : reader.getSchema().getFields()) { - Object value = record.get(field.name()); - avroToFieldVector( - value, - field.schema().getType(), - rowCount, - root.getVector(field.name())); + public static List createAvroConsumers(VectorSchemaRoot root) { + List consumers = new ArrayList<>(); + + for (ValueVector vector : root.getFieldVectors()) { + Consumer consumer; + switch (vector.getMinorType()) { + case INT: + consumer = new AvroIntConsumer((IntVector) vector); + break; + case VARBINARY: + consumer = new AvroBytesConsumer((VarBinaryVector) vector); + break; + case VARCHAR: + consumer = new AvroStringConsumer((VarCharVector) vector); + break; + case BIGINT: + consumer = new AvroLongConsumer((BigIntVector) vector); + break; + case FLOAT4: + consumer = new AvroFloatConsumer((Float4Vector) vector); + break; + case FLOAT8: + consumer = new AvroDoubleConsumer((Float8Vector) 
vector); + break; + case BIT: + consumer = new AvroBooleanConsumer((BitVector) vector); + break; + default: + throw new RuntimeException("could not get consumer from type:" + vector.getMinorType()); } - rowCount++; + consumers.add(consumer); } - root.setRowCount(rowCount); + return consumers; } /** - * Put the value into the vector at specific position. + * Iterate the given Avro {@link Decoder} object to fetch the data and transpose it to populate + * the given Arrow Vector objects. + * @param decoder avro decoder to read data. + * @param root Arrow {@link VectorSchemaRoot} object to populate */ - public static void avroToFieldVector(Object value, Type avroType, int rowCount, FieldVector vector) { - switch (avroType) { - case STRING: - updateVector((VarCharVector) vector, value, rowCount); - break; - case INT: - updateVector((IntVector) vector, value, rowCount); - break; - case BOOLEAN: - updateVector((BitVector)vector, value, rowCount); - break; - case LONG: - updateVector((BigIntVector) vector, value, rowCount); - break; - case FLOAT: - updateVector((Float4Vector) vector, value, rowCount); - break; - case DOUBLE: - updateVector((Float8Vector) vector, value, rowCount); - break; - case BYTES: - updateVector((VarBinaryVector) vector, value, rowCount); - break; - default: - // no-op, shouldn't get here - throw new RuntimeException("Can't convert avro type %s to arrow type." 
+ avroType.getName()); - } - } - - private static void updateVector(VarBinaryVector varBinaryVector, Object value, int rowCount) { - VarBinaryHolder holder = new VarBinaryHolder(); - Preconditions.checkNotNull(value, "value should not be null"); - VarBinaryWriterImpl writer = new VarBinaryWriterImpl(varBinaryVector); - - byte[] bytes = ((ByteBuffer) value).array(); - holder.buffer = varBinaryVector.getAllocator().buffer(bytes.length); - holder.buffer.setBytes(0, bytes, 0, bytes.length); - holder.start = 0; - holder.end = bytes.length; + public static void avroToArrowVectors(Decoder decoder, VectorSchemaRoot root) { - writer.setPosition(rowCount); - writer.write(holder); - } - - private static void updateVector(Float4Vector float4Vector, Object value, int rowCount) { - Float4Holder holder = new Float4Holder(); - Preconditions.checkNotNull(value, "value should not be null"); - holder.value = (float) value; - float4Vector.setSafe(rowCount, holder); - float4Vector.setValueCount(rowCount + 1); - } - - private static void updateVector(Float8Vector float8Vector, Object value, int rowCount) { - Float8Holder holder = new Float8Holder(); - Preconditions.checkNotNull(value, "value should not be null"); - holder.value = (double) value; - float8Vector.setSafe(rowCount, holder); - float8Vector.setValueCount(rowCount + 1); - } - - private static void updateVector(BigIntVector bigIntVector, Object value, int rowCount) { - BigIntHolder holder = new BigIntHolder(); - Preconditions.checkNotNull(value, "value should not be null"); - holder.value = (long) value; - bigIntVector.setSafe(rowCount, holder); - bigIntVector.setValueCount(rowCount + 1); - } - - private static void updateVector(BitVector bitVector, Object value, int rowCount) { - BitHolder holder = new BitHolder(); - Preconditions.checkNotNull(value, "value should not be null"); - holder.value = (boolean) value ? 
1 : 0; - bitVector.setSafe(rowCount, holder); - bitVector.setValueCount(rowCount + 1); - } + Preconditions.checkNotNull(decoder, "Avro decoder object can't be null"); + Preconditions.checkNotNull(root, "VectorSchemaRoot object can't be null"); - private static void updateVector(IntVector intVector, Object value, int rowCount) { - IntHolder holder = new IntHolder(); - Preconditions.checkNotNull(value, "value should not be null"); - holder.value = (int) value; - intVector.setSafe(rowCount, holder); - intVector.setValueCount(rowCount + 1); - } + allocateVectors(root, DEFAULT_BUFFER_SIZE); - private static void updateVector(VarCharVector varcharVector, Object value, int rowCount) { - VarCharHolder holder = new VarCharHolder(); - Preconditions.checkNotNull(value, "value should not be null"); - varcharVector.setIndexDefined(rowCount); - byte[] bytes = ((Utf8)value).getBytes(); - holder.buffer = varcharVector.getAllocator().buffer(bytes.length); - holder.buffer.setBytes(0, bytes, 0, bytes.length); - holder.start = 0; - holder.end = bytes.length; - varcharVector.setSafe(rowCount, holder); - varcharVector.setValueCount(rowCount + 1); + List consumers = createAvroConsumers(root); + while (true) { + try { + for (Consumer consumer : consumers) { + consumer.consume(decoder); + } + //reach end will throw EOFException. + } catch (IOException eofException) { + break; + } + } } private static void allocateVectors(VectorSchemaRoot root, int size) { diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java new file mode 100644 index 00000000000..7bbfac1a230 --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.complex.impl.BitWriterImpl; +import org.apache.arrow.vector.complex.writer.BitWriter; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume boolean type values from avro decoder. + * Write the data to {@link BitVector}. + */ +public class AvroBooleanConsumer implements Consumer { + + private final BitWriter writer; + + public AvroBooleanConsumer(BitVector vector) { + this.writer = new BitWriterImpl(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + writer.writeBit(decoder.readBoolean() ? 1 : 0); + writer.setPosition(writer.getPosition() + 1); + } +} diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java new file mode 100644 index 00000000000..9c3eff70d73 --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl; +import org.apache.arrow.vector.complex.writer.VarBinaryWriter; +import org.apache.arrow.vector.holders.VarBinaryHolder; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume bytes type values from avro decoder. + * Write the data to {@link VarBinaryVector}. 
+ */ +public class AvroBytesConsumer implements Consumer { + + private final VarBinaryWriter writer; + private final VarBinaryVector vector; + + public AvroBytesConsumer(VarBinaryVector vector) { + this.vector = vector; + this.writer = new VarBinaryWriterImpl(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + VarBinaryHolder holder = new VarBinaryHolder(); + ByteBuffer byteBuffer = decoder.readBytes(null); + + holder.start = 0; + holder.end = byteBuffer.capacity(); + holder.buffer = vector.getAllocator().buffer(byteBuffer.capacity()); + holder.buffer.setBytes(0, byteBuffer); + + writer.write(holder); + writer.setPosition(writer.getPosition() + 1); + } +} diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java new file mode 100644 index 00000000000..62dc315084f --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.complex.impl.Float8WriterImpl; +import org.apache.arrow.vector.complex.writer.Float8Writer; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume double type values from avro decoder. + * Write the data to {@link Float8Vector}. + */ +public class AvroDoubleConsumer implements Consumer { + + private final Float8Writer writer; + + public AvroDoubleConsumer(Float8Vector vector) { + this.writer = new Float8WriterImpl(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + writer.writeFloat8(decoder.readDouble()); + writer.setPosition(writer.getPosition() + 1); + } +} diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java new file mode 100644 index 00000000000..2bec2b2d090 --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.complex.impl.Float4WriterImpl; +import org.apache.arrow.vector.complex.writer.Float4Writer; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume float type values from avro decoder. + * Write the data to {@link Float4Vector}. + */ +public class AvroFloatConsumer implements Consumer { + + private final Float4Writer writer; + + public AvroFloatConsumer(Float4Vector vector) { + this.writer = new Float4WriterImpl(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + writer.writeFloat4(decoder.readFloat()); + writer.setPosition(writer.getPosition() + 1); + } +} diff --git a/java/adapter/avro/src/test/resources/schema/test_double.avsc b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java similarity index 52% rename from java/adapter/avro/src/test/resources/schema/test_double.avsc rename to java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java index 00cbf533165..60285f06af4 100644 --- a/java/adapter/avro/src/test/resources/schema/test_double.avsc +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java @@ -15,10 +15,30 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "TestDouble", - "fields": [ - {"name": "f0", "type": "double"} - ] +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.complex.impl.IntWriterImpl; +import org.apache.arrow.vector.complex.writer.IntWriter; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume int type values from avro decoder. + * Write the data to {@link IntVector}. 
+ */ +public class AvroIntConsumer implements Consumer { + + private final IntWriter writer; + + public AvroIntConsumer(IntVector vector) { + this.writer = new IntWriterImpl(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + writer.writeInt(decoder.readInt()); + writer.setPosition(writer.getPosition() + 1); + } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java new file mode 100644 index 00000000000..15756afd69f --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.complex.impl.BigIntWriterImpl; +import org.apache.arrow.vector.complex.writer.BigIntWriter; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume long type values from avro decoder. + * Write the data to {@link BigIntVector}. 
+ */ +public class AvroLongConsumer implements Consumer { + + private final BigIntWriter writer; + + public AvroLongConsumer(BigIntVector vector) { + this.writer = new BigIntWriterImpl(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + writer.writeBigInt(decoder.readLong()); + writer.setPosition(writer.getPosition() + 1); + } +} diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java new file mode 100644 index 00000000000..154e47b2c3d --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.complex.impl.VarCharWriterImpl; +import org.apache.arrow.vector.complex.writer.VarCharWriter; +import org.apache.arrow.vector.holders.VarCharHolder; +import org.apache.avro.io.Decoder; + +/** + * Consumer which consume string type values from avro decoder. 
+ * Write the data to {@link VarCharVector}. + */ +public class AvroStringConsumer implements Consumer { + + private final VarCharVector vector; + private final VarCharWriter writer; + + public AvroStringConsumer(VarCharVector vector) { + this.vector = vector; + this.writer = new VarCharWriterImpl(vector); + } + + @Override + public void consume(Decoder decoder) throws IOException { + VarCharHolder holder = new VarCharHolder(); + byte[] value = decoder.readString().getBytes(StandardCharsets.UTF_8); + + holder.start = 0; + holder.end = value.length; + holder.buffer = vector.getAllocator().buffer(value.length); + holder.buffer.setBytes(0, value); + + writer.write(holder); + writer.setPosition(writer.getPosition() + 1); + } +} diff --git a/java/adapter/avro/src/test/resources/schema/test_boolean.avsc b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java similarity index 75% rename from java/adapter/avro/src/test/resources/schema/test_boolean.avsc rename to java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java index 604a790e73d..b3c52818491 100644 --- a/java/adapter/avro/src/test/resources/schema/test_boolean.avsc +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java @@ -15,10 +15,16 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "TestBoolean", - "fields": [ - {"name": "f0", "type": "boolean"} - ] +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.avro.io.Decoder; + +/** + * An abstraction that is used to consume values from avro decoder. 
+ */ +public interface Consumer { + + void consume(Decoder decoder) throws IOException; } diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java index 683e2c92eb4..c88fa8e2ba1 100644 --- a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java +++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java @@ -21,26 +21,28 @@ import static org.junit.Assert.assertTrue; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.nio.file.Paths; import java.util.ArrayList; -import java.util.List; +import java.util.Arrays; +import java.util.Objects; import org.apache.arrow.memory.BaseAllocator; import org.apache.arrow.memory.RootAllocator; import org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; import org.junit.Before; import org.junit.ClassRule; import org.junit.Test; @@ -61,274 +63,186 @@ public void init() { @Test public void testStringType() throws Exception { File dataFile = TMP.newFile(); - Path schemaPath = Paths - .get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_string.avsc"); + Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), + "schema", 
"test_primitive_string.avsc"); Schema schema = new Schema.Parser().parse(schemaPath.toFile()); - //write data to disk - DatumWriter datumWriter = new GenericDatumWriter(schema); - DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); - dataFileWriter.create(schema, dataFile); - - List data = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - GenericRecord record = new GenericData.Record(schema); - record.put("f0", "value" + i); - dataFileWriter.append(record); - data.add(record); - } - dataFileWriter.close(); + BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); + DatumWriter writer = new GenericDatumWriter(schema); + BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); + GenericDatumReader reader = new GenericDatumReader(schema); - DatumReader datumReader = new GenericDatumReader(schema); - DataFileReader - dataFileReader = new DataFileReader(dataFile, datumReader); + ArrayList data = new ArrayList(Arrays.asList("v1", "v2", "v3", "v4", "v5")); + for (String value : data) { + writer.write(value, encoder); + } - VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, decoder, allocator); + FieldVector vector = root.getFieldVectors().get(0); - checkResult(schema, data, root); + checkPrimitiveResult(schema, data, vector); } @Test public void testIntType() throws Exception { - File dataFile = TMP.newFile(); - Path schemaPath = Paths - .get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_int.avsc"); - Schema schema = new Schema.Parser().parse(schemaPath.toFile()); - - //write data to disk - DatumWriter datumWriter = new GenericDatumWriter(schema); - DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); - dataFileWriter.create(schema, dataFile); - - List data = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - GenericRecord record = new 
GenericData.Record(schema); - record.put("f0", i); - dataFileWriter.append(record); - data.add(record); - } - dataFileWriter.close(); - - DatumReader datumReader = new GenericDatumReader(schema); - DataFileReader - dataFileReader = new DataFileReader(dataFile, datumReader); - - VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); - checkResult(schema, data, root); - } - - @Test - public void testBooleanType() throws Exception { File dataFile = TMP.newFile(); - Path schemaPath = Paths - .get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_boolean.avsc"); + Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), + "schema", "test_primitive_int.avsc"); Schema schema = new Schema.Parser().parse(schemaPath.toFile()); - //write data to disk - DatumWriter datumWriter = new GenericDatumWriter(schema); - DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); - dataFileWriter.create(schema, dataFile); - - List data = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - GenericRecord record = new GenericData.Record(schema); - record.put("f0", true); - dataFileWriter.append(record); - data.add(record); - } - dataFileWriter.close(); + BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); + DatumWriter writer = new GenericDatumWriter(schema); + BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); + GenericDatumReader reader = new GenericDatumReader(schema); - DatumReader datumReader = new GenericDatumReader(schema); - DataFileReader - dataFileReader = new DataFileReader(dataFile, datumReader); + ArrayList data = new ArrayList(Arrays.asList(1, 2, 3, 4, 5)); + for (int value : data) { + writer.write(value, encoder); + } - VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, decoder, allocator); + FieldVector vector 
= root.getFieldVectors().get(0); - checkResult(schema, data, root); + checkPrimitiveResult(schema, data, vector); } @Test public void testLongType() throws Exception { + File dataFile = TMP.newFile(); - Path schemaPath = Paths - .get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_long.avsc"); + Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), + "schema", "test_primitive_long.avsc"); Schema schema = new Schema.Parser().parse(schemaPath.toFile()); - //write data to disk - DatumWriter datumWriter = new GenericDatumWriter(schema); - DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); - dataFileWriter.create(schema, dataFile); - - List data = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - GenericRecord record = new GenericData.Record(schema); - record.put("f0", (long) i); - dataFileWriter.append(record); - data.add(record); - } - dataFileWriter.close(); + BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); + DatumWriter writer = new GenericDatumWriter(schema); + BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); + GenericDatumReader reader = new GenericDatumReader(schema); - DatumReader datumReader = new GenericDatumReader(schema); - DataFileReader - dataFileReader = new DataFileReader(dataFile, datumReader); + ArrayList data = new ArrayList(Arrays.asList(1L, 2L, 3L, 4L, 5L)); + for (long value : data) { + writer.write(value, encoder); + } - VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, decoder, allocator); + FieldVector vector = root.getFieldVectors().get(0); - checkResult(schema, data, root); + checkPrimitiveResult(schema, data, vector); } @Test public void testFloatType() throws Exception { + File dataFile = TMP.newFile(); - Path schemaPath = Paths - 
.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_float.avsc"); + Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), + "schema", "test_primitive_float.avsc"); Schema schema = new Schema.Parser().parse(schemaPath.toFile()); - //write data to disk - DatumWriter datumWriter = new GenericDatumWriter(schema); - DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); - dataFileWriter.create(schema, dataFile); - - float suffix = 0.33f; - List data = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - GenericRecord record = new GenericData.Record(schema); - record.put("f0", i + suffix); - dataFileWriter.append(record); - data.add(record); - } - dataFileWriter.close(); + BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); + DatumWriter writer = new GenericDatumWriter(schema); + BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); + GenericDatumReader reader = new GenericDatumReader(schema); - DatumReader datumReader = new GenericDatumReader(schema); - DataFileReader - dataFileReader = new DataFileReader(dataFile, datumReader); + ArrayList data = new ArrayList(Arrays.asList(1.1f, 2.2f, 3.3f, 4.4f, 5.5f)); + for (float value : data) { + writer.write(value, encoder); + } - VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, decoder, allocator); + FieldVector vector = root.getFieldVectors().get(0); - checkResult(schema, data, root); + checkPrimitiveResult(schema, data, vector); } @Test public void testDoubleType() throws Exception { + File dataFile = TMP.newFile(); - Path schemaPath = Paths - .get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_double.avsc"); + Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), + "schema", "test_primitive_double.avsc"); Schema 
schema = new Schema.Parser().parse(schemaPath.toFile()); - //write data to disk - DatumWriter datumWriter = new GenericDatumWriter(schema); - DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); - dataFileWriter.create(schema, dataFile); - - double suffix = 0.33; - List data = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - GenericRecord record = new GenericData.Record(schema); - record.put("f0", i + suffix); - dataFileWriter.append(record); - data.add(record); - } - dataFileWriter.close(); + BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); + DatumWriter writer = new GenericDatumWriter(schema); + BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); + GenericDatumReader reader = new GenericDatumReader(schema); - DatumReader datumReader = new GenericDatumReader(schema); - DataFileReader - dataFileReader = new DataFileReader(dataFile, datumReader); + ArrayList data = new ArrayList(Arrays.asList(1.1, 2.2, 3.3, 4.4, 5.5)); + for (double value : data) { + writer.write(value, encoder); + } - VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, decoder, allocator); + FieldVector vector = root.getFieldVectors().get(0); - checkResult(schema, data, root); + checkPrimitiveResult(schema, data, vector); } @Test - public void testMixedType() throws Exception { + public void testBytesType() throws Exception { File dataFile = TMP.newFile(); - Path schemaPath = Paths - .get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_mixed_primitive.avsc"); + Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), + "schema", "test_primitive_bytes.avsc"); Schema schema = new Schema.Parser().parse(schemaPath.toFile()); - //write data to disk - DatumWriter datumWriter = new GenericDatumWriter(schema); - DataFileWriter dataFileWriter = new 
DataFileWriter(datumWriter); - dataFileWriter.create(schema, dataFile); - - double suffix = 0.33; - List data = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - GenericRecord record = new GenericData.Record(schema); - record.put("f0", "test" + i); - record.put("f1", i); - record.put("f2", i + suffix); - dataFileWriter.append(record); - data.add(record); + BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); + DatumWriter writer = new GenericDatumWriter(schema); + BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); + GenericDatumReader reader = new GenericDatumReader(schema); + + ArrayList data = new ArrayList(Arrays.asList( + "value1".getBytes(StandardCharsets.UTF_8), + "value2".getBytes(StandardCharsets.UTF_8), + "value3".getBytes(StandardCharsets.UTF_8), + "value4".getBytes(StandardCharsets.UTF_8), + "value5".getBytes(StandardCharsets.UTF_8))); + for (byte[] value : data) { + writer.write(ByteBuffer.wrap(value), encoder); } - dataFileWriter.close(); - DatumReader datumReader = new GenericDatumReader(schema); - DataFileReader - dataFileReader = new DataFileReader(dataFile, datumReader); + VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, decoder, allocator); + FieldVector vector = root.getFieldVectors().get(0); - VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); - - checkResult(schema, data, root); + checkPrimitiveResult(schema, data, vector); } @Test - public void testBytesType() throws Exception { + public void testBooleanType() throws Exception { + File dataFile = TMP.newFile(); - Path schemaPath = Paths - .get(TestWriteReadAvroRecord.class.getResource("/").getPath(), "schema", "test_bytes.avsc"); + Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), + "schema", "test_primitive_boolean.avsc"); Schema schema = new Schema.Parser().parse(schemaPath.toFile()); - //write data to disk - 
DatumWriter datumWriter = new GenericDatumWriter(schema); - DataFileWriter dataFileWriter = new DataFileWriter(datumWriter); - dataFileWriter.create(schema, dataFile); - - List data = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - GenericRecord record = new GenericData.Record(schema); - ByteBuffer value = ByteBuffer.wrap(("test" + i).getBytes(StandardCharsets.UTF_8)); - record.put("f0", value); - dataFileWriter.append(record); - data.add(record); - } - dataFileWriter.close(); + BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); + DatumWriter writer = new GenericDatumWriter(schema); + BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); + GenericDatumReader reader = new GenericDatumReader(schema); - DatumReader datumReader = new GenericDatumReader(schema); - DataFileReader - dataFileReader = new DataFileReader(dataFile, datumReader); + ArrayList data = new ArrayList(Arrays.asList(true, false, true, false, true)); + for (boolean value : data) { + writer.write(value, encoder); + } - VectorSchemaRoot root = AvroToArrow.readToArrow(dataFileReader, allocator); + VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, decoder, allocator); + FieldVector vector = root.getFieldVectors().get(0); - checkResult(schema, data, root); + checkPrimitiveResult(schema, data, vector); } - private void checkResult(Schema schema, List rawRecords, VectorSchemaRoot vectorSchemaRoot) { - List fields = schema.getFields(); - - for (int i = 0; i < rawRecords.size(); i++) { - GenericRecord record = rawRecords.get(i); - for (Schema.Field field: fields) { - Object value = record.get(field.name()); - FieldVector vector = vectorSchemaRoot.getVector(field.name()); - if (value == null) { - assertTrue(vector.isNull(i)); - } else { - Object value1 = value; - Object value2 = vector.getObject(i); - if (field.schema().getType() == Schema.Type.STRING) { - value1 = value1.toString(); - value2 = 
value2.toString(); - } else if (field.schema().getType() == Schema.Type.BYTES) { - //convert to ByteBuffer for equals check - value2 = ByteBuffer.wrap((byte[]) value2); - } - assertEquals(value1, value2); - } + private void checkPrimitiveResult(Schema schema, ArrayList data, FieldVector vector) { + assertEquals(data.size(), vector.getValueCount()); + for (int i = 0; i < data.size(); i++) { + Object value1 = data.get(i); + Object value2 = vector.getObject(i); + if (schema.getType() == Schema.Type.BYTES) { + value1 = ByteBuffer.wrap((byte[]) value1); + value2 = ByteBuffer.wrap((byte[]) value2); + } else if (schema.getType() == Schema.Type.STRING) { + value2 = value2.toString(); } + assertTrue(Objects.equals(value1, value2)); } - } - } diff --git a/java/adapter/avro/src/test/resources/schema/test_mixed_primitive.avsc b/java/adapter/avro/src/test/resources/schema/test_mixed_primitive.avsc deleted file mode 100644 index 7b375e3bc76..00000000000 --- a/java/adapter/avro/src/test/resources/schema/test_mixed_primitive.avsc +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "TestMixed", - "fields": [ - {"name": "f0", "type": "string"}, - {"name": "f1", "type": "int"}, - {"name": "f2", "type": "double"} - ] -} diff --git a/java/adapter/avro/src/test/resources/schema/test_int.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc similarity index 86% rename from java/adapter/avro/src/test/resources/schema/test_int.avsc rename to java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc index 4ffe8f50cfb..7652ce72385 100644 --- a/java/adapter/avro/src/test/resources/schema/test_int.avsc +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc @@ -15,10 +15,8 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "TestInt", - "fields": [ - {"name": "f0", "type": "int"} - ] +{ + "namespace": "org.apache.arrow.avro", + "type": "boolean", + "name": "TestBoolean" } diff --git a/java/adapter/avro/src/test/resources/schema/test_long.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc similarity index 85% rename from java/adapter/avro/src/test/resources/schema/test_long.avsc rename to java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc index de3e4be137f..5102430b65a 100644 --- a/java/adapter/avro/src/test/resources/schema/test_long.avsc +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc @@ -15,10 +15,8 @@ * limitations under the License. 
*/ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "TestLong", - "fields": [ - {"name": "f0", "type": "long"} - ] +{ + "namespace": "org.apache.arrow.avro", + "type": "bytes", + "name": "TestBytes" } diff --git a/java/adapter/avro/src/test/resources/schema/test_bytes.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc similarity index 85% rename from java/adapter/avro/src/test/resources/schema/test_bytes.avsc rename to java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc index bd699926714..d1ae0b605a9 100644 --- a/java/adapter/avro/src/test/resources/schema/test_bytes.avsc +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc @@ -15,10 +15,8 @@ * limitations under the License. */ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "TestBytes", - "fields": [ - {"name": "f0", "type": "bytes"} - ] +{ + "namespace": "org.apache.arrow.avro", + "type": "double", + "name": "TestDouble" } diff --git a/java/adapter/avro/src/test/resources/schema/test_float.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc similarity index 85% rename from java/adapter/avro/src/test/resources/schema/test_float.avsc rename to java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc index 9e8ffe0a4d3..675d1090d86 100644 --- a/java/adapter/avro/src/test/resources/schema/test_float.avsc +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc @@ -15,10 +15,8 @@ * limitations under the License. 
*/ -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "TestFloat", - "fields": [ - {"name": "f0", "type": "float"} - ] +{ + "namespace": "org.apache.arrow.avro", + "type": "float", + "name": "TestFloat" } diff --git a/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc new file mode 100644 index 00000000000..8fc8488281a --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "int", + "name": "TestInt" +} diff --git a/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc new file mode 100644 index 00000000000..b9706107c09 --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "long", + "name": "TestLong" +} diff --git a/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc new file mode 100644 index 00000000000..b4a89a7f62c --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "string", + "name": "TestString" +} diff --git a/java/adapter/avro/src/test/resources/schema/test_string.avsc b/java/adapter/avro/src/test/resources/schema/test_string.avsc deleted file mode 100644 index 52be1475753..00000000000 --- a/java/adapter/avro/src/test/resources/schema/test_string.avsc +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -{"namespace": "org.apache.arrow.avro", - "type": "record", - "name": "TestString", - "fields": [ - {"name": "f0", "type": "string"} - ] -} From fa3f39afb456a08d34c67ecf6e7eaa1601cd6174 Mon Sep 17 00:00:00 2001 From: tianchen Date: Mon, 15 Jul 2019 12:52:30 +0800 Subject: [PATCH 4/5] resolve comments --- .../java/org/apache/arrow/AvroToArrow.java | 12 +- .../org/apache/arrow/AvroToArrowUtils.java | 51 +++---- .../arrow/consumers/AvroStringConsumer.java | 10 +- .../org/apache/arrow/AvroToArrowTest.java | 144 +++++------------- .../avro/src/test/resources/schema/test.avsc | 3 +- 5 files changed, 74 insertions(+), 146 deletions(-) diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java index fc49b073e5a..4801d690125 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java @@ -17,9 +17,12 @@ package org.apache.arrow; +import java.io.IOException; + import org.apache.arrow.memory.BaseAllocator; import org.apache.arrow.util.Preconditions; import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.avro.Schema; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.io.Decoder; @@ -30,15 +33,16 @@ public class AvroToArrow { /** * Fetch the data from {@link GenericDatumReader} and convert it to Arrow objects. - * @param reader avro datum reader. + * @param schema avro schema. * @param allocator Memory allocator to use. 
* @return Arrow Data Objects {@link VectorSchemaRoot} */ - public static VectorSchemaRoot avroToArrow(GenericDatumReader reader, Decoder decoder, BaseAllocator allocator) { - Preconditions.checkNotNull(reader, "Avro reader object can not be null"); + public static VectorSchemaRoot avroToArrow(Schema schema, Decoder decoder, BaseAllocator allocator) + throws IOException { + Preconditions.checkNotNull(schema, "Avro schema object can not be null"); VectorSchemaRoot root = VectorSchemaRoot.create( - AvroToArrowUtils.avroToArrowSchema(reader.getSchema()), allocator); + AvroToArrowUtils.avroToArrowSchema(schema), allocator); AvroToArrowUtils.avroToArrowVectors(decoder, root); return root; } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java index ef62c77da96..184d09ae3ff 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -20,9 +20,12 @@ import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE; import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE; +import java.io.EOFException; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.arrow.consumers.AvroBooleanConsumer; import org.apache.arrow.consumers.AvroBytesConsumer; @@ -40,7 +43,6 @@ import org.apache.arrow.vector.Float4Vector; import org.apache.arrow.vector.Float8Vector; import org.apache.arrow.vector.IntVector; -import org.apache.arrow.vector.ValueVector; import org.apache.arrow.vector.VarBinaryVector; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.VectorSchemaRoot; @@ -79,36 +81,25 @@ private static ArrowType getArrowType(Type type) { Preconditions.checkNotNull(type, "Avro type object can't be null"); - final ArrowType arrowType; - switch 
(type) { case STRING: - arrowType = new ArrowType.Utf8(); - break; + return new ArrowType.Utf8(); case INT: - arrowType = new ArrowType.Int(32, /*signed=*/true); - break; + return new ArrowType.Int(32, /*signed=*/true); case BOOLEAN: - arrowType = new ArrowType.Bool(); - break; + return new ArrowType.Bool(); case LONG: - arrowType = new ArrowType.Int(64, /*signed=*/true); - break; + return new ArrowType.Int(64, /*signed=*/true); case FLOAT: - arrowType = new ArrowType.FloatingPoint(SINGLE); - break; + return new ArrowType.FloatingPoint(SINGLE); case DOUBLE: - arrowType = new ArrowType.FloatingPoint(DOUBLE); - break; + return new ArrowType.FloatingPoint(DOUBLE); case BYTES: - arrowType = new ArrowType.Binary(); - break; + return new ArrowType.Binary(); default: // no-op, shouldn't get here throw new RuntimeException("Can't convert avro type %s to arrow type." + type.getName()); } - - return arrowType; } /** @@ -120,6 +111,8 @@ public static org.apache.arrow.vector.types.pojo.Schema avroToArrowSchema(Schema List arrowFields = new ArrayList<>(); Schema.Type type = schema.getType(); + final Map metadata = new HashMap<>(); + schema.getObjectProps().forEach((k,v) -> metadata.put(k, v.toString())); if (type == Type.RECORD) { throw new NotImplementedException(); @@ -135,19 +128,20 @@ public static org.apache.arrow.vector.types.pojo.Schema avroToArrowSchema(Schema throw new NotImplementedException(); } else { final FieldType fieldType = new FieldType(true, getArrowType(type), null, null); - arrowFields.add(new Field("f0", fieldType, null)); + arrowFields.add(new Field("", fieldType, null)); } - return new org.apache.arrow.vector.types.pojo.Schema(arrowFields, null); + return new org.apache.arrow.vector.types.pojo.Schema(arrowFields, /*metadata=*/ metadata); } /** * Create consumers to consume avro values from decoder, will reduce boxing/unboxing operations. 
*/ - public static List createAvroConsumers(VectorSchemaRoot root) { - List consumers = new ArrayList<>(); + public static Consumer[] createAvroConsumers(VectorSchemaRoot root) { - for (ValueVector vector : root.getFieldVectors()) { + Consumer[] consumers = new Consumer[root.getFieldVectors().size()]; + for (int i = 0; i < root.getFieldVectors().size(); i++) { + FieldVector vector = root.getFieldVectors().get(i); Consumer consumer; switch (vector.getMinorType()) { case INT: @@ -174,7 +168,7 @@ public static List createAvroConsumers(VectorSchemaRoot root) { default: throw new RuntimeException("could not get consumer from type:" + vector.getMinorType()); } - consumers.add(consumer); + consumers[i] = consumer; } return consumers; } @@ -185,21 +179,20 @@ public static List createAvroConsumers(VectorSchemaRoot root) { * @param decoder avro decoder to read data. * @param root Arrow {@link VectorSchemaRoot} object to populate */ - public static void avroToArrowVectors(Decoder decoder, VectorSchemaRoot root) { + public static void avroToArrowVectors(Decoder decoder, VectorSchemaRoot root) throws IOException { Preconditions.checkNotNull(decoder, "Avro decoder object can't be null"); Preconditions.checkNotNull(root, "VectorSchemaRoot object can't be null"); allocateVectors(root, DEFAULT_BUFFER_SIZE); - - List consumers = createAvroConsumers(root); + Consumer[] consumers = createAvroConsumers(root); while (true) { try { for (Consumer consumer : consumers) { consumer.consume(decoder); } //reach end will throw EOFException. 
- } catch (IOException eofException) { + } catch (EOFException eofException) { break; } } diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java index 154e47b2c3d..db438f96e91 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java @@ -18,7 +18,7 @@ package org.apache.arrow.consumers; import java.io.IOException; -import java.nio.charset.StandardCharsets; +import java.nio.ByteBuffer; import org.apache.arrow.vector.VarCharVector; import org.apache.arrow.vector.complex.impl.VarCharWriterImpl; @@ -43,12 +43,12 @@ public AvroStringConsumer(VarCharVector vector) { @Override public void consume(Decoder decoder) throws IOException { VarCharHolder holder = new VarCharHolder(); - byte[] value = decoder.readString().getBytes(StandardCharsets.UTF_8); + ByteBuffer byteBuffer = decoder.readBytes(null); holder.start = 0; - holder.end = value.length; - holder.buffer = vector.getAllocator().buffer(value.length); - holder.buffer.setBytes(0, value); + holder.end = byteBuffer.capacity(); + holder.buffer = vector.getAllocator().buffer(byteBuffer.capacity()); + holder.buffer.setBytes(0, byteBuffer); writer.write(holder); writer.setPosition(writer.getPosition() + 1); diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java index c88fa8e2ba1..d880639acc4 100644 --- a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java +++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java @@ -29,6 +29,7 @@ import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Objects; import org.apache.arrow.memory.BaseAllocator; @@ -36,7 +37,6 @@ import 
org.apache.arrow.vector.FieldVector; import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.io.BinaryDecoder; import org.apache.avro.io.BinaryEncoder; @@ -60,24 +60,32 @@ public void init() { allocator = new RootAllocator(Long.MAX_VALUE); } - @Test - public void testStringType() throws Exception { - File dataFile = TMP.newFile(); + private Schema getSchema(String schemaName) throws Exception { Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), - "schema", "test_primitive_string.avsc"); - Schema schema = new Schema.Parser().parse(schemaPath.toFile()); + "schema", schemaName); + return new Schema.Parser().parse(schemaPath.toFile()); + } + + private VectorSchemaRoot writeAndReadPrimitive(Schema schema, List data) throws Exception { + File dataFile = TMP.newFile(); BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); DatumWriter writer = new GenericDatumWriter(schema); BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); - GenericDatumReader reader = new GenericDatumReader(schema); - ArrayList data = new ArrayList(Arrays.asList("v1", "v2", "v3", "v4", "v5")); - for (String value : data) { + for (Object value : data) { writer.write(value, encoder); } - VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, decoder, allocator); + return AvroToArrow.avroToArrow(schema, decoder, allocator); + } + + @Test + public void testStringType() throws Exception { + Schema schema = getSchema("test_primitive_string.avsc"); + ArrayList data = new ArrayList(Arrays.asList("v1", "v2", "v3", "v4", "v5")); + + VectorSchemaRoot root = writeAndReadPrimitive(schema, data); FieldVector vector = root.getFieldVectors().get(0); checkPrimitiveResult(schema, data, vector); @@ -85,23 +93,10 @@ public void 
testStringType() throws Exception { @Test public void testIntType() throws Exception { - - File dataFile = TMP.newFile(); - Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), - "schema", "test_primitive_int.avsc"); - Schema schema = new Schema.Parser().parse(schemaPath.toFile()); - - BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); - DatumWriter writer = new GenericDatumWriter(schema); - BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); - GenericDatumReader reader = new GenericDatumReader(schema); - + Schema schema = getSchema("test_primitive_int.avsc"); ArrayList data = new ArrayList(Arrays.asList(1, 2, 3, 4, 5)); - for (int value : data) { - writer.write(value, encoder); - } - VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, decoder, allocator); + VectorSchemaRoot root = writeAndReadPrimitive(schema, data); FieldVector vector = root.getFieldVectors().get(0); checkPrimitiveResult(schema, data, vector); @@ -109,23 +104,10 @@ public void testIntType() throws Exception { @Test public void testLongType() throws Exception { - - File dataFile = TMP.newFile(); - Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), - "schema", "test_primitive_long.avsc"); - Schema schema = new Schema.Parser().parse(schemaPath.toFile()); - - BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); - DatumWriter writer = new GenericDatumWriter(schema); - BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); - GenericDatumReader reader = new GenericDatumReader(schema); - + Schema schema = getSchema("test_primitive_long.avsc"); ArrayList data = new ArrayList(Arrays.asList(1L, 2L, 3L, 4L, 5L)); - for (long value : data) { - writer.write(value, encoder); - } - VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, 
decoder, allocator); + VectorSchemaRoot root = writeAndReadPrimitive(schema, data); FieldVector vector = root.getFieldVectors().get(0); checkPrimitiveResult(schema, data, vector); @@ -133,23 +115,10 @@ public void testLongType() throws Exception { @Test public void testFloatType() throws Exception { - - File dataFile = TMP.newFile(); - Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), - "schema", "test_primitive_float.avsc"); - Schema schema = new Schema.Parser().parse(schemaPath.toFile()); - - BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); - DatumWriter writer = new GenericDatumWriter(schema); - BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); - GenericDatumReader reader = new GenericDatumReader(schema); - + Schema schema = getSchema("test_primitive_float.avsc"); ArrayList data = new ArrayList(Arrays.asList(1.1f, 2.2f, 3.3f, 4.4f, 5.5f)); - for (float value : data) { - writer.write(value, encoder); - } - VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, decoder, allocator); + VectorSchemaRoot root = writeAndReadPrimitive(schema, data); FieldVector vector = root.getFieldVectors().get(0); checkPrimitiveResult(schema, data, vector); @@ -157,23 +126,10 @@ public void testFloatType() throws Exception { @Test public void testDoubleType() throws Exception { - - File dataFile = TMP.newFile(); - Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), - "schema", "test_primitive_double.avsc"); - Schema schema = new Schema.Parser().parse(schemaPath.toFile()); - - BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); - DatumWriter writer = new GenericDatumWriter(schema); - BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); - GenericDatumReader reader = new GenericDatumReader(schema); - + Schema 
schema = getSchema("test_primitive_double.avsc"); ArrayList data = new ArrayList(Arrays.asList(1.1, 2.2, 3.3, 4.4, 5.5)); - for (double value : data) { - writer.write(value, encoder); - } - VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, decoder, allocator); + VectorSchemaRoot root = writeAndReadPrimitive(schema, data); FieldVector vector = root.getFieldVectors().get(0); checkPrimitiveResult(schema, data, vector); @@ -181,27 +137,15 @@ public void testDoubleType() throws Exception { @Test public void testBytesType() throws Exception { - File dataFile = TMP.newFile(); - Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), - "schema", "test_primitive_bytes.avsc"); - Schema schema = new Schema.Parser().parse(schemaPath.toFile()); - - BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); - DatumWriter writer = new GenericDatumWriter(schema); - BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); - GenericDatumReader reader = new GenericDatumReader(schema); - - ArrayList data = new ArrayList(Arrays.asList( - "value1".getBytes(StandardCharsets.UTF_8), - "value2".getBytes(StandardCharsets.UTF_8), - "value3".getBytes(StandardCharsets.UTF_8), - "value4".getBytes(StandardCharsets.UTF_8), - "value5".getBytes(StandardCharsets.UTF_8))); - for (byte[] value : data) { - writer.write(ByteBuffer.wrap(value), encoder); - } - - VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, decoder, allocator); + Schema schema = getSchema("test_primitive_bytes.avsc"); + ArrayList data = new ArrayList(Arrays.asList( + ByteBuffer.wrap("value1".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("value2".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("value3".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("value4".getBytes(StandardCharsets.UTF_8)), + ByteBuffer.wrap("value5".getBytes(StandardCharsets.UTF_8)))); + + VectorSchemaRoot root = 
writeAndReadPrimitive(schema, data); FieldVector vector = root.getFieldVectors().get(0); checkPrimitiveResult(schema, data, vector); @@ -209,23 +153,10 @@ public void testBytesType() throws Exception { @Test public void testBooleanType() throws Exception { - - File dataFile = TMP.newFile(); - Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(), - "schema", "test_primitive_boolean.avsc"); - Schema schema = new Schema.Parser().parse(schemaPath.toFile()); - - BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(new FileOutputStream(dataFile), null); - DatumWriter writer = new GenericDatumWriter(schema); - BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(new FileInputStream(dataFile), null); - GenericDatumReader reader = new GenericDatumReader(schema); - + Schema schema = getSchema("test_primitive_boolean.avsc"); ArrayList data = new ArrayList(Arrays.asList(true, false, true, false, true)); - for (boolean value : data) { - writer.write(value, encoder); - } - VectorSchemaRoot root = AvroToArrow.avroToArrow(reader, decoder, allocator); + VectorSchemaRoot root = writeAndReadPrimitive(schema, data); FieldVector vector = root.getFieldVectors().get(0); checkPrimitiveResult(schema, data, vector); @@ -237,7 +168,6 @@ private void checkPrimitiveResult(Schema schema, ArrayList data, FieldVector vec Object value1 = data.get(i); Object value2 = vector.getObject(i); if (schema.getType() == Schema.Type.BYTES) { - value1 = ByteBuffer.wrap((byte[]) value1); value2 = ByteBuffer.wrap((byte[]) value2); } else if (schema.getType() == Schema.Type.STRING) { value2 = value2.toString(); diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test.avsc index 15fdd76361b..92c0873de1d 100644 --- a/java/adapter/avro/src/test/resources/schema/test.avsc +++ b/java/adapter/avro/src/test/resources/schema/test.avsc @@ -15,7 +15,8 @@ * limitations under the License. 
*/ -{"namespace": "org.apache.arrow.avro", +{ + "namespace": "org.apache.arrow.avro", "type": "record", "name": "User", "fields": [ From 24394780f297906ef2047b221baee1a67aecc2f2 Mon Sep 17 00:00:00 2001 From: tianchen Date: Thu, 18 Jul 2019 14:35:42 +0800 Subject: [PATCH 5/5] use UnsupportedOperationException --- .../java/org/apache/arrow/AvroToArrowUtils.java | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java index 184d09ae3ff..c142689ddda 100644 --- a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java @@ -53,8 +53,6 @@ import org.apache.avro.Schema.Type; import org.apache.avro.io.Decoder; -import sun.reflect.generics.reflectiveObjects.NotImplementedException; - /** * Class that does most of the work to convert Avro data into Arrow columnar format Vector objects. 
*/ @@ -115,17 +113,17 @@ public static org.apache.arrow.vector.types.pojo.Schema avroToArrowSchema(Schema schema.getObjectProps().forEach((k,v) -> metadata.put(k, v.toString())); if (type == Type.RECORD) { - throw new NotImplementedException(); + throw new UnsupportedOperationException(); } else if (type == Type.MAP) { - throw new NotImplementedException(); + throw new UnsupportedOperationException(); } else if (type == Type.UNION) { - throw new NotImplementedException(); + throw new UnsupportedOperationException(); } else if (type == Type.ARRAY) { - throw new NotImplementedException(); + throw new UnsupportedOperationException(); } else if (type == Type.ENUM) { - throw new NotImplementedException(); + throw new UnsupportedOperationException(); } else if (type == Type.NULL) { - throw new NotImplementedException(); + throw new UnsupportedOperationException(); } else { final FieldType fieldType = new FieldType(true, getArrowType(type), null, null); arrowFields.add(new Field("", fieldType, null));