diff --git a/java/adapter/avro/pom.xml b/java/adapter/avro/pom.xml index 4e13d57fb34..8ef7548d38e 100644 --- a/java/adapter/avro/pom.xml +++ b/java/adapter/avro/pom.xml @@ -28,6 +28,20 @@ + + + org.apache.arrow + arrow-memory + ${project.version} + + + + + org.apache.arrow + arrow-vector + ${project.version} + + org.apache.avro avro diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java new file mode 100644 index 00000000000..4801d690125 --- /dev/null +++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrow.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow; + +import java.io.IOException; + +import org.apache.arrow.memory.BaseAllocator; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.Decoder; + +/** + * Utility class to convert Avro objects to columnar Arrow format objects. + */ +public class AvroToArrow { + + /** + * Fetch the data from {@link GenericDatumReader} and convert it to Arrow objects. 
+   * @param schema avro schema.
+   * @param decoder avro decoder that supplies the encoded values.
+   * @param allocator Memory allocator to use.
+   * @return Arrow Data Objects {@link VectorSchemaRoot}
+   * @throws IOException if reading from the decoder fails for a reason other than reaching the end of input.
+   */
+  public static VectorSchemaRoot avroToArrow(Schema schema, Decoder decoder, BaseAllocator allocator)
+      throws IOException {
+    Preconditions.checkNotNull(schema, "Avro schema object can not be null");
+    // Reject a missing decoder before any vector memory is reserved.
+    Preconditions.checkNotNull(decoder, "Avro decoder object can't be null");
+
+    VectorSchemaRoot root = VectorSchemaRoot.create(
+        AvroToArrowUtils.avroToArrowSchema(schema), allocator);
+    try {
+      AvroToArrowUtils.avroToArrowVectors(decoder, root);
+    } catch (IOException | RuntimeException e) {
+      // On failure the root is never handed to the caller, so release it here
+      // instead of leaving its buffers allocated.
+      root.close();
+      throw e;
+    }
+    return root;
+  }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
new file mode 100644
index 00000000000..c142689ddda
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/AvroToArrowUtils.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.arrow; + +import static org.apache.arrow.vector.types.FloatingPointPrecision.DOUBLE; +import static org.apache.arrow.vector.types.FloatingPointPrecision.SINGLE; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.arrow.consumers.AvroBooleanConsumer; +import org.apache.arrow.consumers.AvroBytesConsumer; +import org.apache.arrow.consumers.AvroDoubleConsumer; +import org.apache.arrow.consumers.AvroFloatConsumer; +import org.apache.arrow.consumers.AvroIntConsumer; +import org.apache.arrow.consumers.AvroLongConsumer; +import org.apache.arrow.consumers.AvroStringConsumer; +import org.apache.arrow.consumers.Consumer; +import org.apache.arrow.util.Preconditions; +import org.apache.arrow.vector.BaseFixedWidthVector; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.BitVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.Float4Vector; +import org.apache.arrow.vector.Float8Vector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.VarBinaryVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Type; +import org.apache.avro.io.Decoder; + +/** + * Class that does most of the work to convert Avro data into Arrow columnar format Vector objects. + */ +public class AvroToArrowUtils { + + private static final int DEFAULT_BUFFER_SIZE = 256; + + /** + * Creates an {@link org.apache.arrow.vector.types.pojo.ArrowType} from the {@link Schema.Field} + * +

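<p>This method currently performs the following type mapping from Avro data types to the corresponding Arrow data types.
+   *
+   * <ul>
+   *   <li>STRING maps to ArrowType.Utf8</li>
+   *   <li>INT maps to ArrowType.Int(32, signed)</li>
+   *   <li>BOOLEAN maps to ArrowType.Bool</li>
+   *   <li>LONG maps to ArrowType.Int(64, signed)</li>
+   *   <li>FLOAT maps to ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE)</li>
+   *   <li>DOUBLE maps to ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE)</li>
+   *   <li>BYTES maps to ArrowType.Binary</li>
+   * </ul>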
+   *
+   * @param type the Avro type to map; must not be null.
+   * @return the Arrow type selected for {@code type} by the switch in this method.
+   * @throws RuntimeException if {@code type} has no case in the switch.
+   */
+  private static ArrowType getArrowType(Type type) {
+
+    Preconditions.checkNotNull(type, "Avro type object can't be null");
+
+    switch (type) {
+      case STRING:
+        return new ArrowType.Utf8();
+      case INT:
+        return new ArrowType.Int(32, /*signed=*/true);
+      case BOOLEAN:
+        return new ArrowType.Bool();
+      case LONG:
+        return new ArrowType.Int(64, /*signed=*/true);
+      case FLOAT:
+        return new ArrowType.FloatingPoint(SINGLE);
+      case DOUBLE:
+        return new ArrowType.FloatingPoint(DOUBLE);
+      case BYTES:
+        return new ArrowType.Binary();
+      default:
+        // Reached for every Avro type without a case above. The type name is substituted into
+        // the message rather than appended after a literal "%s".
+        throw new RuntimeException(
+            String.format("Can't convert avro type %s to arrow type.", type.getName()));
+    }
+  }
+
+  /**
+   * Create Arrow {@link org.apache.arrow.vector.types.pojo.Schema} object for the given Avro {@link Schema}.
+   *
+   * <p>The result holds a single field with an empty name whose type comes from {@code getArrowType};
+   * the Avro schema's object properties are copied, as strings, into the Arrow schema metadata.
+   *
+   * @param schema the Avro schema to convert; must not be null.
+   * @return the Arrow schema built from {@code schema}.
+   * @throws UnsupportedOperationException if the schema type is RECORD, MAP, UNION, ARRAY, ENUM or NULL.
+   * @throws RuntimeException if the schema type has no mapping in {@code getArrowType}.
+   */
+  public static org.apache.arrow.vector.types.pojo.Schema avroToArrowSchema(Schema schema) {
+
+    Preconditions.checkNotNull(schema, "Avro Schema object can't be null");
+
+    final Schema.Type type = schema.getType();
+    switch (type) {
+      case RECORD:
+      case MAP:
+      case UNION:
+      case ARRAY:
+      case ENUM:
+      case NULL:
+        throw new UnsupportedOperationException(
+            String.format("Avro type %s is not supported.", type.getName()));
+      default:
+        // Every other type is resolved (or rejected) by getArrowType below.
+        break;
+    }
+
+    final Map<String, String> metadata = new HashMap<>();
+    schema.getObjectProps().forEach((k, v) -> metadata.put(k, v.toString()));
+
+    final List<Field> arrowFields = new ArrayList<>();
+    final FieldType fieldType = new FieldType(true, getArrowType(type), null, null);
+    arrowFields.add(new Field("", fieldType, null));
+
+    return new org.apache.arrow.vector.types.pojo.Schema(arrowFields, /*metadata=*/ metadata);
+  }
+
+  /**
+   * Create consumers to consume avro values from decoder, will reduce boxing/unboxing operations.
+   *
+   * @param root the root whose field vectors the consumers will write to.
+   * @return one consumer per field vector, in the order of {@code root.getFieldVectors()}.
+   * @throws RuntimeException if a vector's minor type has no matching consumer.
+   */
+  public static Consumer[] createAvroConsumers(VectorSchemaRoot root) {
+
+    final List<FieldVector> vectors = root.getFieldVectors();
+    final Consumer[] consumers = new Consumer[vectors.size()];
+    for (int i = 0; i < consumers.length; i++) {
+      final FieldVector vector = vectors.get(i);
+      final Consumer consumer;
+      switch (vector.getMinorType()) {
+        case INT:
+          consumer = new AvroIntConsumer((IntVector) vector);
+          break;
+        case VARBINARY:
+          consumer = new AvroBytesConsumer((VarBinaryVector) vector);
+          break;
+        case VARCHAR:
+          consumer = new AvroStringConsumer((VarCharVector) vector);
+          break;
+        case BIGINT:
+          consumer = new AvroLongConsumer((BigIntVector) vector);
+          break;
+        case FLOAT4:
+          consumer = new AvroFloatConsumer((Float4Vector) vector);
+          break;
+        case FLOAT8:
+          consumer = new AvroDoubleConsumer((Float8Vector) vector);
+          break;
+        case BIT:
+          consumer = new AvroBooleanConsumer((BitVector) vector);
+          break;
+        default:
+          throw new RuntimeException("could not get consumer from type:" + vector.getMinorType());
+      }
+      consumers[i] = consumer;
+    }
+    return consumers;
+  }
+
+  /**
+   * Iterate the given Avro {@link Decoder} object to fetch the data and transpose it to populate
+   * the given Arrow Vector objects. Reading continues until a consumer reports the end of the
+   * input with an {@link EOFException}.
+   *
+   * @param decoder avro decoder to read data.
+   * @param root Arrow {@link VectorSchemaRoot} object to populate
+   * @throws IOException if a consumer fails with an I/O error other than {@link EOFException}.
+   */
+  public static void avroToArrowVectors(Decoder decoder, VectorSchemaRoot root) throws IOException {
+
+    Preconditions.checkNotNull(decoder, "Avro decoder object can't be null");
+    Preconditions.checkNotNull(root, "VectorSchemaRoot object can't be null");
+
+    allocateVectors(root, DEFAULT_BUFFER_SIZE);
+    final Consumer[] consumers = createAvroConsumers(root);
+    try {
+      while (true) {
+        for (Consumer consumer : consumers) {
+          consumer.consume(decoder);
+        }
+      }
+    } catch (EOFException endOfInput) {
+      // Reaching the end of the input surfaces as an EOFException; that is the
+      // expected way out of the loop, not an error.
+    }
+  }
+
+  /**
+   * Allocates memory for every field vector of the root.
+   *
+   * @param root the root whose field vectors are allocated.
+   * @param size the number of values each vector is initially sized for.
+   */
+  private static void allocateVectors(VectorSchemaRoot root, int size) {
+    final List<FieldVector> vectors = root.getFieldVectors();
+    for (FieldVector fieldVector : vectors) {
+      // Set the capacity first so that the no-argument allocateNew() below can take it into account.
+      fieldVector.setInitialCapacity(size);
+      if (fieldVector instanceof BaseFixedWidthVector) {
+        ((BaseFixedWidthVector) fieldVector).allocateNew(size);
+      } else {
+        fieldVector.allocateNew();
+      }
+    }
+  }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
new file mode 100644
index 00000000000..7bbfac1a230
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBooleanConsumer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.complex.impl.BitWriterImpl;
+import org.apache.arrow.vector.complex.writer.BitWriter;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume boolean type values from avro decoder.
+ * Write the data to {@link BitVector}.
+ */
+public class AvroBooleanConsumer implements Consumer {
+
+  private final BitWriter writer;
+
+  /**
+   * Creates a consumer that writes through a {@link BitWriterImpl} wrapping the given vector.
+   *
+   * @param vector the vector that receives the decoded values.
+   */
+  public AvroBooleanConsumer(BitVector vector) {
+    this.writer = new BitWriterImpl(vector);
+  }
+
+  /**
+   * Reads one boolean from the decoder, writes it as 1 (true) or 0 (false) at the writer's
+   * current position and then moves the writer one position forward.
+   */
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    final int bit;
+    if (decoder.readBoolean()) {
+      bit = 1;
+    } else {
+      bit = 0;
+    }
+    writer.writeBit(bit);
+
+    final int nextPosition = writer.getPosition() + 1;
+    writer.setPosition(nextPosition);
+  }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
new file mode 100644
index 00000000000..9c3eff70d73
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroBytesConsumer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.arrow.vector.complex.impl.VarBinaryWriterImpl;
+import org.apache.arrow.vector.complex.writer.VarBinaryWriter;
+import org.apache.arrow.vector.holders.VarBinaryHolder;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consume bytes type values from avro decoder.
+ * Write the data to {@link VarBinaryVector}.
+ */
+public class AvroBytesConsumer implements Consumer {
+
+  private final VarBinaryWriter writer;
+  private final VarBinaryVector vector;
+
+  /**
+   * Creates a consumer that writes through a {@link VarBinaryWriterImpl} wrapping the given vector.
+   *
+   * @param vector the vector that receives the decoded values; its allocator also supplies
+   *     the scratch buffer used for each value.
+   */
+  public AvroBytesConsumer(VarBinaryVector vector) {
+    this.vector = vector;
+    this.writer = new VarBinaryWriterImpl(vector);
+  }
+
+  /**
+   * Reads one bytes value from the decoder, writes it at the writer's current position and then
+   * moves the writer one position forward.
+   */
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    ByteBuffer byteBuffer = decoder.readBytes(null);
+    // Use the readable length, not the capacity of the backing storage.
+    final int length = byteBuffer.remaining();
+
+    VarBinaryHolder holder = new VarBinaryHolder();
+    holder.start = 0;
+    holder.end = length;
+    holder.buffer = vector.getAllocator().buffer(length);
+    try {
+      holder.buffer.setBytes(0, byteBuffer);
+      writer.write(holder);
+    } finally {
+      // The scratch buffer is not referenced again after this call, so it is released here.
+      // NOTE(review): relies on write(holder) copying the bytes into the vector - confirm.
+      holder.buffer.close();
+    }
+
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
new file mode 100644
index 00000000000..62dc315084f
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroDoubleConsumer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.Float8Vector;
+import org.apache.arrow.vector.complex.impl.Float8WriterImpl;
+import org.apache.arrow.vector.complex.writer.Float8Writer;
+import org.apache.avro.io.Decoder;
+
+/**
+ * {@link Consumer} that reads double values from an avro decoder
+ * and writes them to a {@link Float8Vector}.
+ */
+public class AvroDoubleConsumer implements Consumer {
+
+  private final Float8Writer writer;
+
+  /**
+   * Creates a consumer that writes through a {@link Float8WriterImpl} wrapping the given vector.
+   *
+   * @param vector the vector that receives the decoded values.
+   */
+  public AvroDoubleConsumer(Float8Vector vector) {
+    this.writer = new Float8WriterImpl(vector);
+  }
+
+  /**
+   * Reads one double from the decoder, writes it at the writer's current position
+   * and then moves the writer one position forward.
+   */
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    final double value = decoder.readDouble();
+    writer.writeFloat8(value);
+
+    final int nextPosition = writer.getPosition() + 1;
+    writer.setPosition(nextPosition);
+  }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
new file mode 100644
index 00000000000..2bec2b2d090
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroFloatConsumer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.Float4Vector;
+import org.apache.arrow.vector.complex.impl.Float4WriterImpl;
+import org.apache.arrow.vector.complex.writer.Float4Writer;
+import org.apache.avro.io.Decoder;
+
+/**
+ * {@link Consumer} that reads float values from an avro decoder
+ * and writes them to a {@link Float4Vector}.
+ */
+public class AvroFloatConsumer implements Consumer {
+
+  private final Float4Writer writer;
+
+  /**
+   * Creates a consumer that writes through a {@link Float4WriterImpl} wrapping the given vector.
+   *
+   * @param vector the vector that receives the decoded values.
+   */
+  public AvroFloatConsumer(Float4Vector vector) {
+    this.writer = new Float4WriterImpl(vector);
+  }
+
+  /**
+   * Reads one float from the decoder, writes it at the writer's current position
+   * and then moves the writer one position forward.
+   */
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    final float value = decoder.readFloat();
+    writer.writeFloat4(value);
+
+    final int nextPosition = writer.getPosition() + 1;
+    writer.setPosition(nextPosition);
+  }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
new file mode 100644
index 00000000000..60285f06af4
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroIntConsumer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.complex.impl.IntWriterImpl;
+import org.apache.arrow.vector.complex.writer.IntWriter;
+import org.apache.avro.io.Decoder;
+
+/**
+ * {@link Consumer} that reads int values from an avro decoder
+ * and writes them to an {@link IntVector}.
+ */
+public class AvroIntConsumer implements Consumer {
+
+  private final IntWriter writer;
+
+  /**
+   * Creates a consumer that writes through an {@link IntWriterImpl} wrapping the given vector.
+   *
+   * @param vector the vector that receives the decoded values.
+   */
+  public AvroIntConsumer(IntVector vector) {
+    this.writer = new IntWriterImpl(vector);
+  }
+
+  /**
+   * Reads one int from the decoder, writes it at the writer's current position
+   * and then moves the writer one position forward.
+   */
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    final int value = decoder.readInt();
+    writer.writeInt(value);
+
+    final int nextPosition = writer.getPosition() + 1;
+    writer.setPosition(nextPosition);
+  }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
new file mode 100644
index 00000000000..15756afd69f
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroLongConsumer.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.complex.impl.BigIntWriterImpl;
+import org.apache.arrow.vector.complex.writer.BigIntWriter;
+import org.apache.avro.io.Decoder;
+
+/**
+ * {@link Consumer} that reads long values from an avro decoder
+ * and writes them to a {@link BigIntVector}.
+ */
+public class AvroLongConsumer implements Consumer {
+
+  private final BigIntWriter writer;
+
+  /**
+   * Creates a consumer that writes through a {@link BigIntWriterImpl} wrapping the given vector.
+   *
+   * @param vector the vector that receives the decoded values.
+   */
+  public AvroLongConsumer(BigIntVector vector) {
+    this.writer = new BigIntWriterImpl(vector);
+  }
+
+  /**
+   * Reads one long from the decoder, writes it at the writer's current position
+   * and then moves the writer one position forward.
+   */
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    final long value = decoder.readLong();
+    writer.writeBigInt(value);
+
+    final int nextPosition = writer.getPosition() + 1;
+    writer.setPosition(nextPosition);
+  }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
new file mode 100644
index 00000000000..db438f96e91
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/AvroStringConsumer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.arrow.consumers;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.complex.impl.VarCharWriterImpl;
+import org.apache.arrow.vector.complex.writer.VarCharWriter;
+import org.apache.arrow.vector.holders.VarCharHolder;
+import org.apache.avro.io.Decoder;
+
+/**
+ * Consumer which consumes string type values from avro decoder.
+ * Write the data to {@link VarCharVector}.
+ */
+public class AvroStringConsumer implements Consumer {
+
+  private final VarCharVector vector;
+  private final VarCharWriter writer;
+
+  /**
+   * Creates a consumer that writes through a {@link VarCharWriterImpl} wrapping the given vector.
+   *
+   * @param vector the vector that receives the decoded values; its allocator also supplies
+   *     the scratch buffer used for each value.
+   */
+  public AvroStringConsumer(VarCharVector vector) {
+    this.vector = vector;
+    this.writer = new VarCharWriterImpl(vector);
+  }
+
+  /**
+   * Reads one string value from the decoder as raw bytes, writes it at the writer's current
+   * position and then moves the writer one position forward.
+   */
+  @Override
+  public void consume(Decoder decoder) throws IOException {
+    ByteBuffer byteBuffer = decoder.readBytes(null);
+    // Use the readable length, not the capacity of the backing storage.
+    final int length = byteBuffer.remaining();
+
+    VarCharHolder holder = new VarCharHolder();
+    holder.start = 0;
+    holder.end = length;
+    holder.buffer = vector.getAllocator().buffer(length);
+    try {
+      holder.buffer.setBytes(0, byteBuffer);
+      writer.write(holder);
+    } finally {
+      // The scratch buffer is not referenced again after this call, so it is released here.
+      // NOTE(review): relies on write(holder) copying the bytes into the vector - confirm.
+      holder.buffer.close();
+    }
+
+    writer.setPosition(writer.getPosition() + 1);
+  }
+}
diff --git a/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
new file mode 100644
index 00000000000..b3c52818491
--- /dev/null
+++ b/java/adapter/avro/src/main/java/org/apache/arrow/consumers/Consumer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.consumers; + +import java.io.IOException; + +import org.apache.avro.io.Decoder; + +/** + * An abstraction that is used to consume values from avro decoder. + */ +public interface Consumer { + + /** + * Consumes data from the given decoder; the implementations in this package read one value per call. + * + * @param decoder avro decoder to read from. + * @throws IOException if reading from the decoder fails. + */ + void consume(Decoder decoder) throws IOException; +} diff --git a/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java new file mode 100644 index 00000000000..d880639acc4 --- /dev/null +++ b/java/adapter/avro/src/test/java/org/apache/arrow/AvroToArrowTest.java @@ -0,0 +1,178 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */
+
+package org.apache.arrow;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+
+import org.apache.arrow.memory.BaseAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Round-trip tests: primitive values are written to a file with an Avro encoder,
+ * read back through {@link AvroToArrow} and compared with the resulting vector.
+ */
+public class AvroToArrowTest {
+
+  @ClassRule
+  public static final TemporaryFolder TMP = new TemporaryFolder();
+
+  private BaseAllocator allocator;
+
+  @Before
+  public void init() {
+    allocator = new RootAllocator(Long.MAX_VALUE);
+  }
+
+  /** Parses the named schema file from the "schema" directory of the test resources. */
+  private Schema getSchema(String schemaName) throws Exception {
+    Path schemaPath = Paths.get(TestWriteReadAvroRecord.class.getResource("/").getPath(),
+        "schema", schemaName);
+    return new Schema.Parser().parse(schemaPath.toFile());
+  }
+
+  /**
+   * Writes the values to a temporary file with the given schema and converts the file content
+   * to Arrow. Both file streams are closed before this method returns.
+   */
+  private VectorSchemaRoot writeAndReadPrimitive(Schema schema, List<?> data) throws Exception {
+    File dataFile = TMP.newFile();
+
+    try (FileOutputStream output = new FileOutputStream(dataFile)) {
+      BinaryEncoder encoder = new EncoderFactory().directBinaryEncoder(output, null);
+      DatumWriter<Object> writer = new GenericDatumWriter<>(schema);
+      for (Object value : data) {
+        writer.write(value, encoder);
+      }
+      encoder.flush();
+    }
+
+    try (FileInputStream input = new FileInputStream(dataFile)) {
+      BinaryDecoder decoder = new DecoderFactory().directBinaryDecoder(input, null);
+      return AvroToArrow.avroToArrow(schema, decoder, allocator);
+    }
+  }
+
+  @Test
+  public void testStringType() throws Exception {
+    Schema schema = getSchema("test_primitive_string.avsc");
+    List<String> data = new ArrayList<>(Arrays.asList("v1", "v2", "v3", "v4", "v5"));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  @Test
+  public void testIntType() throws Exception {
+    Schema schema = getSchema("test_primitive_int.avsc");
+    List<Integer> data = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  @Test
+  public void testLongType() throws Exception {
+    Schema schema = getSchema("test_primitive_long.avsc");
+    List<Long> data = new ArrayList<>(Arrays.asList(1L, 2L, 3L, 4L, 5L));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  @Test
+  public void testFloatType() throws Exception {
+    Schema schema = getSchema("test_primitive_float.avsc");
+    List<Float> data = new ArrayList<>(Arrays.asList(1.1f, 2.2f, 3.3f, 4.4f, 5.5f));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  @Test
+  public void testDoubleType() throws Exception {
+    Schema schema = getSchema("test_primitive_double.avsc");
+    List<Double> data = new ArrayList<>(Arrays.asList(1.1, 2.2, 3.3, 4.4, 5.5));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  @Test
+  public void testBytesType() throws Exception {
+    Schema schema = getSchema("test_primitive_bytes.avsc");
+    List<ByteBuffer> data = new ArrayList<>(Arrays.asList(
+        ByteBuffer.wrap("value1".getBytes(StandardCharsets.UTF_8)),
+        ByteBuffer.wrap("value2".getBytes(StandardCharsets.UTF_8)),
+        ByteBuffer.wrap("value3".getBytes(StandardCharsets.UTF_8)),
+        ByteBuffer.wrap("value4".getBytes(StandardCharsets.UTF_8)),
+        ByteBuffer.wrap("value5".getBytes(StandardCharsets.UTF_8))));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  @Test
+  public void testBooleanType() throws Exception {
+    Schema schema = getSchema("test_primitive_boolean.avsc");
+    List<Boolean> data = new ArrayList<>(Arrays.asList(true, false, true, false, true));
+
+    VectorSchemaRoot root = writeAndReadPrimitive(schema, data);
+    FieldVector vector = root.getFieldVectors().get(0);
+
+    checkPrimitiveResult(schema, data, vector);
+  }
+
+  /**
+   * Asserts that the vector holds exactly the expected values, in order. Vector values are
+   * converted before comparison: BYTES results are wrapped in a {@link ByteBuffer} and STRING
+   * results are turned into {@link String}.
+   */
+  private void checkPrimitiveResult(Schema schema, List<?> data, FieldVector vector) {
+    assertEquals(data.size(), vector.getValueCount());
+    for (int i = 0; i < data.size(); i++) {
+      Object value1 = data.get(i);
+      Object value2 = vector.getObject(i);
+      if (schema.getType() == Schema.Type.BYTES) {
+        value2 = ByteBuffer.wrap((byte[]) value2);
+      } else if (schema.getType() == Schema.Type.STRING) {
+        value2 = value2.toString();
+      }
+      assertTrue(Objects.equals(value1, value2));
+    }
+  }
+}
diff --git a/java/adapter/avro/src/test/resources/schema/test.avsc b/java/adapter/avro/src/test/resources/schema/test.avsc
index 15fdd76361b..92c0873de1d 100644
--- a/java/adapter/avro/src/test/resources/schema/test.avsc
+++ b/java/adapter/avro/src/test/resources/schema/test.avsc
@@ -15,7 +15,8 @@
  * limitations under the License.
*/ -{"namespace": "org.apache.arrow.avro", +{ + "namespace": "org.apache.arrow.avro", "type": "record", "name": "User", "fields": [ diff --git a/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc new file mode 100644 index 00000000000..7652ce72385 --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_boolean.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "boolean", + "name": "TestBoolean" +} diff --git a/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc new file mode 100644 index 00000000000..5102430b65a --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_bytes.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "bytes", + "name": "TestBytes" +} diff --git a/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc new file mode 100644 index 00000000000..d1ae0b605a9 --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_double.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "double", + "name": "TestDouble" +} diff --git a/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc new file mode 100644 index 00000000000..675d1090d86 --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_float.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "float", + "name": "TestFloat" +} diff --git a/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc new file mode 100644 index 00000000000..8fc8488281a --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_int.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "int", + "name": "TestInt" +} diff --git a/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc new file mode 100644 index 00000000000..b9706107c09 --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_long.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "long", + "name": "TestLong" +} diff --git a/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc b/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc new file mode 100644 index 00000000000..b4a89a7f62c --- /dev/null +++ b/java/adapter/avro/src/test/resources/schema/test_primitive_string.avsc @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +{ + "namespace": "org.apache.arrow.avro", + "type": "string", + "name": "TestString" +}