diff --git a/.baseline/checkstyle/.checkstyle.xml.swp b/.baseline/checkstyle/.checkstyle.xml.swp deleted file mode 100644 index dac6cf2a8121..000000000000 Binary files a/.baseline/checkstyle/.checkstyle.xml.swp and /dev/null differ diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/reader/ArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/reader/ArrowReader.java new file mode 100644 index 000000000000..f7e1f6d79e8c --- /dev/null +++ b/arrow/src/main/java/org/apache/iceberg/arrow/reader/ArrowReader.java @@ -0,0 +1,242 @@ +package org.apache.iceberg.arrow.reader; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import javax.annotation.concurrent.NotThreadSafe; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VectorLoader; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.VectorUnloader; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.spark.TaskContext; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.execution.arrow.ArrowUtils; +import org.apache.spark.sql.execution.arrow.ArrowWriter; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ArrowColumnVector; +import org.apache.spark.sql.vectorized.ColumnVector; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.apache.spark.util.TaskCompletionListener; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/*** + * This is a helper class for Arrow reading. It provides two main converter methods. + * These converter methods are currently used to first convert a Parquet FileIterator + * into Iterator over ArrowRecordBatches. Second, the ArrowRecordBatch is made + * into Columnar Batch and exposed as an Iterator over InternalRow. 
The second step is to + * done to conform to Spark's current interface. When Spark adds Arrow support we will + * take the second iterator out and just return the first one. + */ +public class ArrowReader { + + private static final Logger LOG = LoggerFactory.getLogger(ArrowReader.class); + + /*** + * Accepts an iterator over ArrowRecordBatches and copies into ColumnarBatches. + * Since Spark uses Iterator over InternalRow we return this over ColumarBatch. + * @param arrowBatchIter + * @param sparkSchema + * @param timeZoneId + * @return + */ + public static InternalRowOverArrowBatchIterator fromBatchIterator( + Iterator arrowBatchIter, + StructType sparkSchema, + String timeZoneId) { + + // timeZoneId required for TimestampType in StructType + Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, timeZoneId); + BufferAllocator allocator = + ArrowUtils.rootAllocator().newChildAllocator("fromBatchIterator", 0, Long.MAX_VALUE); + + VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator); + + return new InternalRowOverArrowBatchIterator(arrowBatchIter, allocator, root); + } + + @NotThreadSafe + public static class InternalRowOverArrowBatchIterator implements Iterator, Closeable { + + private final Iterator arrowBatchIter; + private final BufferAllocator allocator; + private final VectorSchemaRoot root; + + private Iterator rowIter; + + InternalRowOverArrowBatchIterator(Iterator arrowBatchIter, + BufferAllocator allocator, + VectorSchemaRoot root) { + + this.arrowBatchIter = arrowBatchIter; + this.allocator = allocator; + this.root = root; + + } + + + + @Override + public boolean hasNext() { + if (rowIter != null && rowIter.hasNext()) { + return true; + } + if (arrowBatchIter.hasNext()) { + rowIter = nextBatch(); + return true; + } else { + try { + close(); + } catch (IOException ioe) { + throw new RuntimeException("Encountered an error while closing iterator. 
"+ioe.getMessage(), ioe); + } + return false; + } + } + + @Override + public InternalRow next() { + return rowIter.next(); + } + + private Iterator nextBatch() { + ArrowRecordBatch arrowRecordBatch = arrowBatchIter.next(); + long start = System.currentTimeMillis(); + root.setRowCount(0); + VectorLoader vectorLoader = new VectorLoader(root); + vectorLoader.load(arrowRecordBatch); + arrowRecordBatch.close(); + + List fieldVectors = root.getFieldVectors(); + ColumnVector[] columns = new ColumnVector[fieldVectors.size()]; + for(int i=0; i Created Columnar Batch with "+root.getRowCount()+ " rows" + + ". Took " + (System.currentTimeMillis() - start) + " milliseconds."); + return batch.rowIterator(); + } + + + @Override + public void close() throws IOException { + // arrowWriter.finish(); + root.close(); + allocator.close(); + } + + } + + /** + * Acceepts Iterator over InternalRow coming in from ParqeutReader's FileIterator + * and creates ArrowRecordBatches over that by collecting rows from the input iter. + * Each next() call over this iterator will collect up to maxRecordsPerBatch rows + * at a time and create an Arrow batch with it and returns an iterator over that. 
+ * @param rowIter + * @param sparkSchema + * @param maxRecordsPerBatch + * @param timezonId + * @return + */ + public static ArrowRecordBatchIterator toBatchIterator( + Iterator rowIter, + StructType sparkSchema, int maxRecordsPerBatch, + String timezonId) { + + // StructType sparkSchema = SparkSchemaUtil.convert(icebergSchema); + TaskContext context = TaskContext.get(); + + Schema arrowSchema = ArrowUtils.toArrowSchema(sparkSchema, timezonId); + BufferAllocator allocator = ArrowUtils.rootAllocator().newChildAllocator( + "toBatchIterator", + 0, + Long.MAX_VALUE); + VectorSchemaRoot root = VectorSchemaRoot.create(arrowSchema, allocator); + + if (context!=null) { + context.addTaskCompletionListener(new TaskCompletionListener() { + @Override + public void onTaskCompletion(TaskContext context) { + root.close(); + allocator.close(); + } + }); + } + + return new ArrowRecordBatchIterator(rowIter, root, allocator, maxRecordsPerBatch); + } + + + public static class ArrowRecordBatchIterator implements Iterator, Closeable { + + final Iterator rowIterator; + final VectorSchemaRoot root; + final BufferAllocator allocator; + final int maxRecordsPerBatch; + final ArrowWriter arrowWriter; + final VectorUnloader unloader; + + ArrowRecordBatchIterator(Iterator rowIterator, + VectorSchemaRoot root, + BufferAllocator allocator, + int maxRecordsPerBatch) { + + this.unloader = new VectorUnloader(root); + this.arrowWriter = ArrowWriter.create(root); + this.rowIterator = rowIterator; + this.root = root; + this.allocator = allocator; + this.maxRecordsPerBatch = maxRecordsPerBatch; + } + + @Override + public boolean hasNext() { + + if (!rowIterator.hasNext()) { + + try { + close(); + } catch (IOException ioe) { + throw new RuntimeException("Encountered an error while closing iterator. 
"+ioe.getMessage(), ioe); + } + return false; + } + + return true; + } + + @Override + public ArrowRecordBatch next() { + + int rowCount = 0; + + long start = System.currentTimeMillis(); + while (rowIterator.hasNext() && (maxRecordsPerBatch <= 0 || rowCount < maxRecordsPerBatch)) { + InternalRow row = rowIterator.next(); + arrowWriter.write(row); + rowCount += 1; + } + arrowWriter.finish(); + LOG.info("[ArrowRecordBatchIterator] => Created batch with "+rowCount+ " rows. " + + "Took "+(System.currentTimeMillis() - start) + " milliseconds."); + ArrowRecordBatch batch = unloader.getRecordBatch(); + return batch; + } + + @Override + public void close() throws IOException { + // arrowWriter.finish(); + root.close(); + allocator.close(); + } + } +} diff --git a/build.gradle b/build.gradle index 282ce61d735f..51c001ca80e4 100644 --- a/build.gradle +++ b/build.gradle @@ -222,6 +222,14 @@ project(':iceberg-core') { compile("org.apache.avro:avro") { exclude group: 'org.tukaani' // xz compression is not supported } + compile("org.apache.arrow:arrow-vector") { + exclude group: 'io.netty', module: 'netty-buffer' + exclude group: 'io.netty', module: 'netty-common' + } + compileOnly("org.apache.spark:spark-hive_2.11") { + exclude group: 'org.apache.avro', module: 'avro' + } + compile "com.fasterxml.jackson.core:jackson-databind" compile "com.fasterxml.jackson.core:jackson-core" @@ -237,7 +245,11 @@ project(':iceberg-data') { dependencies { compile project(':iceberg-api') compile project(':iceberg-core') + compileOnly project(':iceberg-spark') compileOnly project(':iceberg-parquet') + compileOnly("org.apache.spark:spark-hive_2.11") { + exclude group: 'org.apache.avro', module: 'avro' + } testCompile("org.apache.hadoop:hadoop-client") { exclude group: 'org.apache.avro', module: 'avro' @@ -323,14 +335,12 @@ project(':iceberg-parquet') { dependencies { compile project(':iceberg-api') compile project(':iceberg-core') + compile project(':iceberg-arrow') - 
compile("org.apache.parquet:parquet-avro") { + compileOnly("org.apache.spark:spark-hive_2.11") { exclude group: 'org.apache.avro', module: 'avro' - // already shaded by Parquet - exclude group: 'it.unimi.dsi' - exclude group: 'org.codehaus.jackson' } - + compile "org.apache.parquet:parquet-avro" compileOnly "org.apache.avro:avro" compileOnly("org.apache.hadoop:hadoop-client") { exclude group: 'org.apache.avro', module: 'avro' @@ -363,6 +373,17 @@ project(':iceberg-spark') { } } +project(':iceberg-arrow') { + dependencies { +// compile project(':iceberg-spark') + compile project(':iceberg-api') + + compileOnly("org.apache.spark:spark-hive_2.11") { + exclude group: 'org.apache.avro', module: 'avro' + } + } +} + project(':iceberg-pig') { dependencies { compile project(':iceberg-api') diff --git a/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java b/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java new file mode 100644 index 000000000000..492af6180912 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/arrow/ArrowSchemaUtil.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.arrow; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.util.List; +import java.util.Map; +import org.apache.arrow.vector.types.DateUnit; +import org.apache.arrow.vector.types.FloatingPointPrecision; +import org.apache.arrow.vector.types.TimeUnit; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.FieldType; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StructType; + +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; + + +public class ArrowSchemaUtil { + static final String ORIGINAL_TYPE = "originalType"; + static final String MAP_TYPE = "mapType"; + static final String MAP_KEY = "key"; + static final String MAP_VALUE = "value"; + + private ArrowSchemaUtil() { } + + /** + * Convert Iceberg schema to Arrow Schema. 
+ * + * @param schema iceberg schema + * @return arrow schema + */ + public static Schema convert(final org.apache.iceberg.Schema schema) { + final ImmutableList.Builder fields = ImmutableList.builder(); + + for (NestedField f : schema.columns()) { + fields.add(convert(f)); + } + + return new Schema(fields.build()); + } + + public static Field convert(final NestedField field) { + final ArrowType arrowType; + + final List children = Lists.newArrayList(); + Map metadata = null; + + switch (field.type().typeId()) { + case BINARY: + arrowType = ArrowType.Binary.INSTANCE; + break; + case FIXED: + arrowType = new ArrowType.FixedSizeBinary(((Types.FixedType) field.type()).length()); + break; + case BOOLEAN: + arrowType = ArrowType.Bool.INSTANCE; + break; + case INTEGER: + arrowType = new ArrowType.Int(Integer.SIZE, true); + break; + case LONG: + arrowType = new ArrowType.Int(Long.SIZE, true); + break; + case FLOAT: + arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.SINGLE); + break; + case DOUBLE: + arrowType = new ArrowType.FloatingPoint(FloatingPointPrecision.DOUBLE); + break; + case DECIMAL: + final Types.DecimalType decimalType = (Types.DecimalType) field.type(); + arrowType = new ArrowType.Decimal(decimalType.precision(), decimalType.scale()); + break; + case STRING: + arrowType = ArrowType.Utf8.INSTANCE; + break; + case TIME: + arrowType = new ArrowType.Time(TimeUnit.MICROSECOND, Long.SIZE); + break; + case TIMESTAMP: + arrowType = new ArrowType.Timestamp(TimeUnit.MICROSECOND, "UTC"); + break; + case DATE: + arrowType = new ArrowType.Date(DateUnit.DAY); + break; + case STRUCT: + final StructType struct = field.type().asStructType(); + arrowType = ArrowType.Struct.INSTANCE; + + for (NestedField nested : struct.fields()) { + children.add(convert(nested)); + } + break; + case LIST: + final ListType listType = field.type().asListType(); + arrowType = ArrowType.List.INSTANCE; + + for (NestedField nested : listType.fields()) { + 
children.add(convert(nested)); + } + break; + case MAP: + //Maps are represented as List> + metadata = ImmutableMap.of(ORIGINAL_TYPE, MAP_TYPE); + final MapType mapType = field.type().asMapType(); + arrowType = ArrowType.List.INSTANCE; + + final List entryFields = Lists.newArrayList( + convert(required(0, MAP_KEY, mapType.keyType())), + convert(optional(0, MAP_VALUE, mapType.valueType())) + ); + + final Field entry = new Field("", + new FieldType(true, new ArrowType.Struct(), null), entryFields); + children.add(entry); + break; + default: throw new UnsupportedOperationException("Unsupported field type: " + field); + } + + return new Field(field.name(), new FieldType(field.isOptional(), arrowType, null, metadata), children); + } +} diff --git a/core/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java b/core/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java new file mode 100644 index 000000000000..f42ebde64dfa --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/arrow/ArrowSchemaUtilTest.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg.arrow; + +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.iceberg.Schema; +import org.apache.iceberg.types.Type; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.BooleanType; +import org.apache.iceberg.types.Types.DateType; +import org.apache.iceberg.types.Types.DoubleType; +import org.apache.iceberg.types.Types.ListType; +import org.apache.iceberg.types.Types.LongType; +import org.apache.iceberg.types.Types.MapType; +import org.apache.iceberg.types.Types.StringType; +import org.apache.iceberg.types.Types.TimestampType; +import org.junit.Test; + +import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.Bool; +import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.Date; +import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.FloatingPoint; +import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.Int; +import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.List; +import static org.apache.arrow.vector.types.pojo.ArrowType.ArrowTypeID.Timestamp; +import static org.apache.iceberg.types.Types.NestedField.optional; +import static org.apache.iceberg.types.Types.NestedField.required; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + + +public class ArrowSchemaUtilTest { + + @Test + public void convertPrimitive() { + Schema iceberg = new Schema( + optional(0, "i", Types.IntegerType.get()), + optional(1, "b", BooleanType.get()), + required(2, "d", DoubleType.get()), + required(3, "s", StringType.get()), + optional(4, "d2", DateType.get()), + optional(5, "ts", TimestampType.withoutZone()) + ); + + org.apache.arrow.vector.types.pojo.Schema arrow = ArrowSchemaUtil.convert(iceberg); + + System.out.println(iceberg); + System.out.println(arrow); + + validate(iceberg, arrow); + } + + @Test + public void convertComplex() { + 
Schema iceberg = new Schema( + optional(0, "m", MapType.ofOptional( + 1, 2, StringType.get(), + LongType.get()) + ), + required(3, "m2", MapType.ofOptional( + 4, 5, StringType.get(), + ListType.ofOptional(6, TimestampType.withoutZone())) + ) + ); + + org.apache.arrow.vector.types.pojo.Schema arrow = ArrowSchemaUtil.convert(iceberg); + + System.out.println(iceberg); + System.out.println(arrow); + + assertEquals(iceberg.columns().size(), arrow.getFields().size()); + } + + private void validate(Schema iceberg, org.apache.arrow.vector.types.pojo.Schema arrow) { + assertEquals(iceberg.columns().size(), arrow.getFields().size()); + + for (Types.NestedField nf : iceberg.columns()) { + Field field = arrow.findField(nf.name()); + assertNotNull("Missing filed: " + nf, field); + + validate(nf.type(), field.getType()); + } + } + + private void validate(Type iceberg, ArrowType arrow) { + switch (iceberg.typeId()) { + case BOOLEAN: assertEquals(Bool, arrow.getTypeID()); + break; + case INTEGER: assertEquals(Int, arrow.getTypeID()); + break; + case LONG: assertEquals(Int, arrow.getTypeID()); + break; + case DOUBLE: assertEquals(FloatingPoint, arrow.getTypeID()); + break; + case STRING: assertEquals(ArrowType.Utf8.INSTANCE.getTypeID(), arrow.getTypeID()); + break; + case DATE: assertEquals(Date, arrow.getTypeID()); + break; + case TIMESTAMP: assertEquals(Timestamp, arrow.getTypeID()); + break; + case MAP: assertEquals(List, arrow.getTypeID()); + break; + default: throw new UnsupportedOperationException("Check not implemented for type: " + iceberg); + } + } +} diff --git a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java index 27a625db4272..b8d59c96eab7 100644 --- a/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java +++ b/data/src/main/java/org/apache/iceberg/data/TableScanIterable.java @@ -43,6 +43,7 @@ import org.apache.iceberg.io.CloseableIterable; import 
org.apache.iceberg.io.InputFile; import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.spark.SparkSchemaUtil; class TableScanIterable extends CloseableGroup implements CloseableIterable { private final TableOperations ops; @@ -89,7 +90,7 @@ private CloseableIterable open(FileScanTask task) { case PARQUET: Parquet.ReadBuilder parquet = Parquet.read(input) - .project(projection) + .project(projection, SparkSchemaUtil.convert(projection)) .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(projection, fileSchema)) .split(task.start(), task.length()); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 494f2c57885e..9ce8dffa2661 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -55,6 +55,7 @@ import org.apache.parquet.hadoop.api.WriteSupport; import org.apache.parquet.hadoop.metadata.CompressionCodecName; import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.types.StructType; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION; import static org.apache.iceberg.TableProperties.PARQUET_COMPRESSION_DEFAULT; @@ -273,6 +274,7 @@ public static class ReadBuilder { private Long start = null; private Long length = null; private Schema schema = null; + private StructType sparkSchema = null; private Expression filter = null; private ReadSupport readSupport = null; private Function> readerFunc = null; @@ -281,6 +283,7 @@ public static class ReadBuilder { private Map properties = Maps.newHashMap(); private boolean callInit = false; private boolean reuseContainers = false; + private int maxRecordsPerBatch = 1000; private ReadBuilder(InputFile file) { this.file = file; @@ -299,6 +302,12 @@ public ReadBuilder split(long start, long length) { return this; } + public ReadBuilder project(Schema schema, StructType sparkSchema) { + 
this.schema = schema; + this.sparkSchema = sparkSchema; + return this; + } + public ReadBuilder project(Schema schema) { this.schema = schema; return this; @@ -348,6 +357,12 @@ public ReadBuilder reuseContainers() { return this; } + public ReadBuilder recordsPerBatch(int numRowsPerBatch) { + + this.maxRecordsPerBatch = numRowsPerBatch; + return this; + } + @SuppressWarnings("unchecked") public CloseableIterable build() { if (readerFunc != null) { @@ -374,7 +389,7 @@ public CloseableIterable build() { ParquetReadOptions options = optionsBuilder.build(); return new org.apache.iceberg.parquet.ParquetReader<>( - file, schema, options, readerFunc, filter, reuseContainers, caseSensitive); + file, schema, options, readerFunc, filter, reuseContainers, caseSensitive, sparkSchema, maxRecordsPerBatch); } ParquetReadBuilder builder = new ParquetReadBuilder<>(ParquetIO.file(file)); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java index 4c315c323400..afcfb7828f45 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetAvro.java @@ -36,8 +36,8 @@ import org.apache.iceberg.avro.UUIDConversion; import org.apache.iceberg.types.TypeUtil; -class ParquetAvro { - static Schema parquetAvroSchema(Schema avroSchema) { +public class ParquetAvro { + public static Schema parquetAvroSchema(Schema avroSchema) { return AvroSchemaVisitor.visit(avroSchema, new ParquetDecimalSchemaConverter()); } @@ -173,7 +173,7 @@ public GenericFixed toFixed(BigDecimal value, Schema schema, LogicalType type) { } } - static GenericData DEFAULT_MODEL = new SpecificData() { + public static GenericData DEFAULT_MODEL = new SpecificData() { private final Conversion fixedDecimalConversion = new FixedDecimalConversion(); private final Conversion intDecimalConversion = new IntDecimalConversion(); private final Conversion longDecimalConversion 
= new LongDecimalConversion(); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java index b4a675e04a3d..cbf2d1c8ae31 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetFilters.java @@ -37,9 +37,9 @@ import static org.apache.iceberg.expressions.ExpressionVisitors.visit; -class ParquetFilters { +public class ParquetFilters { - static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) { + public static FilterCompat.Filter convert(Schema schema, Expression expr, boolean caseSensitive) { FilterPredicate pred = visit(expr, new ConvertFilterToParquet(schema, caseSensitive)); // TODO: handle AlwaysFalse.INSTANCE if (pred != null && pred != AlwaysTrue.INSTANCE) { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java index 360a05503ce6..6432483fab31 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIO.java @@ -44,11 +44,11 @@ /** * Methods in this class translate from the IO API to Parquet's IO API. 
*/ -class ParquetIO { +public class ParquetIO { private ParquetIO() { } - static InputFile file(org.apache.iceberg.io.InputFile file) { + public static InputFile file(org.apache.iceberg.io.InputFile file) { // TODO: use reflection to avoid depending on classes from iceberg-hadoop // TODO: use reflection to avoid depending on classes from hadoop if (file instanceof HadoopInputFile) { @@ -62,7 +62,7 @@ static InputFile file(org.apache.iceberg.io.InputFile file) { return new ParquetInputFile(file); } - static OutputFile file(org.apache.iceberg.io.OutputFile file) { + public static OutputFile file(org.apache.iceberg.io.OutputFile file) { if (file instanceof HadoopOutputFile) { HadoopOutputFile hfile = (HadoopOutputFile) file; try { diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java index bc4344872430..7d6d72c4f11b 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetIterable.java @@ -31,7 +31,7 @@ public class ParquetIterable extends CloseableGroup implements CloseableIterable { private final ParquetReader.Builder builder; - ParquetIterable(ParquetReader.Builder builder) { + public ParquetIterable(ParquetReader.Builder builder) { this.builder = builder; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java index 8a0b44c720b4..b0c8a31be283 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReadSupport.java @@ -41,12 +41,12 @@ * * @param Java type produced by this read support instance */ -class ParquetReadSupport extends ReadSupport { +public class ParquetReadSupport extends ReadSupport { private final Schema expectedSchema; private final ReadSupport wrapped; private final 
boolean callInit; - ParquetReadSupport(Schema expectedSchema, ReadSupport readSupport, boolean callInit) { + public ParquetReadSupport(Schema expectedSchema, ReadSupport readSupport, boolean callInit) { this.expectedSchema = expectedSchema; this.wrapped = readSupport; this.callInit = callInit; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java index 2ff2bdb441e8..9f1e5fbdc648 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java @@ -23,8 +23,10 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; +import java.util.TimeZone; import java.util.function.Function; import org.apache.iceberg.Schema; +import org.apache.iceberg.arrow.reader.ArrowReader; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; @@ -36,6 +38,11 @@ import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.BlockMetaData; import org.apache.parquet.schema.MessageType; +import org.apache.spark.sql.catalyst.InternalRow; +import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.iceberg.parquet.ParquetSchemaUtil.addFallbackIds; import static org.apache.iceberg.parquet.ParquetSchemaUtil.hasIds; @@ -50,18 +57,24 @@ public class ParquetReader extends CloseableGroup implements CloseableIterabl private final Expression filter; private final boolean reuseContainers; private final boolean caseSensitive; + private final StructType sparkSchema; + private final int maxRecordsPerBatch; + private static final Logger LOG = LoggerFactory.getLogger(ParquetReader.class); public ParquetReader(InputFile input, Schema expectedSchema, 
ParquetReadOptions options, Function> readerFunc, - Expression filter, boolean reuseContainers, boolean caseSensitive) { + Expression filter, boolean reuseContainers, boolean caseSensitive, + StructType sparkSchema, int maxRecordsPerBatch) { this.input = input; this.expectedSchema = expectedSchema; + this.sparkSchema = sparkSchema; this.options = options; this.readerFunc = readerFunc; // replace alwaysTrue with null to avoid extra work evaluating a trivial filter this.filter = filter == Expressions.alwaysTrue() ? null : filter; this.reuseContainers = reuseContainers; this.caseSensitive = caseSensitive; + this.maxRecordsPerBatch = maxRecordsPerBatch; } private static class ReadConf { @@ -185,11 +198,31 @@ private ReadConf init() { @Override public Iterator iterator() { + // create iterator over file FileIterator iter = new FileIterator<>(init()); addCloseable(iter); + return iter; } + private Iterator arrowBatchAsInternalRow(Iterator iter) { + // Convert InterRow iterator to ArrowRecordBatch Iterator + Iterator rowIterator = iter; + ArrowReader.ArrowRecordBatchIterator arrowBatchIter = ArrowReader.toBatchIterator(rowIterator, + sparkSchema, maxRecordsPerBatch, + TimeZone.getDefault().getID()); + addCloseable(arrowBatchIter); + + // Overlay InternalRow iterator over ArrowRecordbatches + ArrowReader.InternalRowOverArrowBatchIterator + rowOverbatchIter = ArrowReader.fromBatchIterator(arrowBatchIter, + sparkSchema, TimeZone.getDefault().getID()); + + addCloseable(rowOverbatchIter); + + return (Iterator)rowOverbatchIter; + } + private static class FileIterator implements Iterator, Closeable { private final ParquetFileReader reader; private final boolean[] shouldSkip; @@ -226,7 +259,11 @@ public T next() { } else { this.last = model.read(null); } - valuesRead += 1; + if (last instanceof ColumnarBatch) { + valuesRead += ((ColumnarBatch)last).numRows(); + } else { + valuesRead += 1; + } return last; } diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java index ac61983b2c29..45293674f20b 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetValueReaders.java @@ -26,13 +26,18 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.arrow.vector.FieldVector; +import org.apache.iceberg.types.Types; import org.apache.parquet.column.ColumnDescriptor; import org.apache.parquet.column.page.PageReadStore; import org.apache.parquet.io.api.Binary; import org.apache.parquet.schema.Type; +import org.apache.spark.sql.vectorized.ArrowColumnVector; +import org.apache.spark.sql.vectorized.ColumnarBatch; import static java.util.Collections.emptyIterator; @@ -576,6 +581,85 @@ public V setValue(V value) { } } + + public static class ColumnarBatchReader implements ParquetValueReader { + + private final int numFields; + private final Types.StructType iceExpectedFields; + private final ParquetValueReader[] readers; + private final TripleIterator column; + private final TripleIterator[] columns; + private final List> children; + + @SuppressWarnings("unchecked") + public ColumnarBatchReader(List types, + Types.StructType icebergExpectedFields, + List> readers) { + + this.numFields = readers.size(); + this.iceExpectedFields = icebergExpectedFields; + this.readers = (ParquetValueReader[]) Array.newInstance( + ParquetValueReader.class, readers.size()); + this.columns = (TripleIterator[]) Array.newInstance(TripleIterator.class, readers.size()); + + + ImmutableList.Builder> columnsBuilder = ImmutableList.builder(); + for (int i = 0; i < readers.size(); i += 1) { + ParquetValueReader reader = readers.get(i); + this.readers[i] = readers.get(i); + 
this.columns[i] = reader.column(); + columnsBuilder.addAll(reader.columns()); + } + + this.children = columnsBuilder.build(); + if (children.size() > 0) { + this.column = children.get(0); + } else { + this.column = NullReader.NULL_COLUMN; + } + + } + + @Override + public final void setPageSource(PageReadStore pageStore) { + for (int i = 0; i < readers.length; i += 1) { + readers[i].setPageSource(pageStore); + } + } + + @Override + public final TripleIterator column() { + return column; + } + + @Override + public List> columns() { + return children; + } + + + @Override + public final ColumnarBatch read(ColumnarBatch ignore) { + + ArrowColumnVector[] arrowVectorArr = (ArrowColumnVector[])Array.newInstance(ArrowColumnVector.class, + readers.length); + + int numRows=0; + for (int i = 0; i < readers.length; i += 1) { + + FieldVector vec = readers[i].read(null); + arrowVectorArr[i] = new ArrowColumnVector(vec); + numRows = vec.getValueCount(); + } + + ColumnarBatch batch = new ColumnarBatch(arrowVectorArr); + batch.setNumRows(numRows); + + return batch; + } + + } + public abstract static class StructReader implements ParquetValueReader { private interface Setter { void set(R record, int pos, Object reuse); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java index 633f9f80cecc..d097c67d9ab6 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriteSupport.java @@ -26,12 +26,12 @@ import org.apache.parquet.io.api.RecordConsumer; import org.apache.parquet.schema.MessageType; -class ParquetWriteSupport extends WriteSupport { +public class ParquetWriteSupport extends WriteSupport { private final MessageType type; private final Map keyValueMetadata; private final WriteSupport wrapped; - ParquetWriteSupport(MessageType type, Map keyValueMetadata, WriteSupport writeSupport) { 
+ public ParquetWriteSupport(MessageType type, Map keyValueMetadata, WriteSupport writeSupport) { this.type = type; this.keyValueMetadata = keyValueMetadata; this.wrapped = writeSupport; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java index c0956a11684e..e8f95ea0cb4d 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetWriter.java @@ -47,7 +47,7 @@ import static java.lang.Math.min; import static org.apache.iceberg.parquet.ParquetSchemaUtil.convert; -class ParquetWriter implements FileAppender, Closeable { +public class ParquetWriter implements FileAppender, Closeable { private static final DynConstructors.Ctor pageStoreCtor = DynConstructors .builder(PageWriteStore.class) .hiddenImpl("org.apache.parquet.hadoop.ColumnChunkPageWriteStore", diff --git a/settings.gradle b/settings.gradle index 4145cfe01fa4..e5df47b97282 100644 --- a/settings.gradle +++ b/settings.gradle @@ -25,6 +25,7 @@ include 'data' include 'orc' include 'parquet' include 'spark' +include 'arrow' include 'pig' include 'runtime' include 'hive' @@ -36,6 +37,7 @@ project(':data').name = 'iceberg-data' project(':orc').name = 'iceberg-orc' project(':parquet').name = 'iceberg-parquet' project(':spark').name = 'iceberg-spark' +project(':arrow').name = 'iceberg-arrow' project(':pig').name = 'iceberg-pig' project(':runtime').name = 'iceberg-spark-runtime' project(':hive').name = 'iceberg-hive' diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 9a36266ffdf2..bb9330577935 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -59,6 +59,7 @@ import 
org.apache.spark.sql.catalyst.util.MapData; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; +import org.apache.spark.sql.vectorized.ColumnarBatch; import org.apache.spark.unsafe.types.CalendarInterval; import org.apache.spark.unsafe.types.UTF8String; @@ -112,7 +113,7 @@ public ParquetValueReader struct(Types.StructType ignored, GroupType struct, } private static class ReadBuilder extends TypeWithSchemaVisitor> { - private final MessageType type; + protected final MessageType type; ReadBuilder(MessageType type) { this.type = type; @@ -360,7 +361,7 @@ public long readLong() { } } - private static class StringReader extends PrimitiveReader { + protected static class StringReader extends PrimitiveReader { StringReader(ColumnDescriptor desc) { super(desc); } diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java new file mode 100644 index 000000000000..ad08b98cffe7 --- /dev/null +++ b/spark/src/main/java/org/apache/iceberg/spark/data/vector/VectorizedParquetValueReaders.java @@ -0,0 +1,411 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.iceberg.spark.data.vector;

import java.math.BigDecimal;
import java.math.BigInteger;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.DateDayVector;
import org.apache.arrow.vector.DecimalVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.Float4Vector;
import org.apache.arrow.vector.Float8Vector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.TimeStampMicroTZVector;
import org.apache.arrow.vector.VarBinaryVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.iceberg.arrow.ArrowSchemaUtil;
import org.apache.iceberg.parquet.ParquetValueReaders;
import org.apache.iceberg.types.Types;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.Type;
import org.apache.spark.sql.types.Decimal;

/**
 * Parquet value reader implementations for vectorized reading.
 *
 * <p>Contains type-wise readers that read Parquet column chunks as Arrow vectors:
 * <ul>
 *   <li>Each reader returns an Arrow {@link FieldVector} per row group, so a batch has
 *       as many rows as the underlying row group.</li>
 *   <li>Null values are handled explicitly via definition levels.</li>
 *   <li>Iceberg-to-Arrow type mapping is done by {@code ArrowSchemaUtil.convert()}:
 *       LONG → BigIntVector, STRING → VarCharVector, BOOLEAN → BitVector,
 *       INTEGER → IntVector, FLOAT → Float4Vector, DOUBLE → Float8Vector,
 *       DATE → DateDayVector, TIMESTAMP → TimeStampMicroTZVector,
 *       BINARY → VarBinaryVector, DECIMAL → DecimalVector.</li>
 * </ul>
 */
public class VectorizedParquetValueReaders {

  private VectorizedParquetValueReaders() {
  }

  /**
   * Base reader: allocates the Arrow vector for an Iceberg field and drains the
   * current column chunk into it, one value (or null) per row.
   */
  public abstract static class VectorReader extends ParquetValueReaders.PrimitiveReader<FieldVector> {

    protected final FieldVector vec;
    protected final boolean isOptional;

    VectorReader(ColumnDescriptor desc,
                 Types.NestedField icebergField,
                 RootAllocator rootAlloc) {
      super(desc);
      this.vec = ArrowSchemaUtil.convert(icebergField).createVector(rootAlloc);
      this.isOptional = desc.getPrimitiveType().isRepetition(Type.Repetition.OPTIONAL);
    }

    @Override
    public FieldVector read(FieldVector ignore) {
      vec.reset();
      int rowIndex = 0;
      while (column.hasNext()) {
        // TODO: definitionLevel == 0 detects nulls for flat schemas only; nested
        // schemas need a comparison against the column's max definition level.
        if (isOptional && column.currentDefinitionLevel() == 0) {
          column.nextNull();
          nextNullAt(rowIndex);
        } else {
          nextValueAt(rowIndex);
        }
        rowIndex += 1;
      }
      vec.setValueCount(rowIndex);
      return vec;
    }

    /** Number of values in the most recently produced vector. */
    public int getRowCount() {
      return vec.getValueCount();
    }

    /** Records a null at row {@code i}. */
    protected abstract void nextNullAt(int i);

    /** Reads the next value from the column and stores it at row {@code i}. */
    protected abstract void nextValueAt(int i);
  }

  protected static class StringReader extends VectorReader {

    StringReader(ColumnDescriptor desc, Types.NestedField icebergField, RootAllocator rootAlloc) {
      super(desc, icebergField, rootAlloc);
    }

    @Override
    protected void nextNullAt(int i) {
      ((VarCharVector) vec).setNull(i);
    }

    @Override
    protected void nextValueAt(int i) {
      Binary binary = column.nextBinary();
      if (binary == null) {
        ((VarCharVector) vec).setNull(i);
      } else {
        // Parquet UTF8 binaries already hold UTF-8 bytes, which is exactly what
        // VarCharVector stores; decoding to String and calling getBytes() would
        // re-encode with the platform default charset (a correctness bug) and copy twice.
        ((VarCharVector) vec).setSafe(i, binary.getBytes());
      }
    }
  }

  protected static class IntegerReader extends VectorReader {

    IntegerReader(ColumnDescriptor desc, Types.NestedField icebergField, RootAllocator rootAlloc) {
      super(desc, icebergField, rootAlloc);
    }

    @Override
    protected void nextNullAt(int i) {
      ((IntVector) vec).setNull(i);
    }

    @Override
    protected void nextValueAt(int i) {
      ((IntVector) vec).setSafe(i, column.nextInteger());
    }
  }

  protected static class LongReader extends VectorReader {

    LongReader(ColumnDescriptor desc, Types.NestedField icebergField, RootAllocator rootAlloc) {
      super(desc, icebergField, rootAlloc);
    }

    @Override
    protected void nextNullAt(int i) {
      ((BigIntVector) vec).setNull(i);
    }

    @Override
    protected void nextValueAt(int i) {
      ((BigIntVector) vec).setSafe(i, column.nextLong());
    }
  }

  protected static class TimestampMicroReader extends VectorReader {

    TimestampMicroReader(ColumnDescriptor desc,
                         Types.NestedField icebergField,
                         RootAllocator rootAlloc) {
      super(desc, icebergField, rootAlloc);
    }

    @Override
    protected void nextNullAt(int i) {
      ((TimeStampMicroTZVector) vec).setNull(i);
    }

    @Override
    protected void nextValueAt(int i) {
      ((TimeStampMicroTZVector) vec).setSafe(i, column.nextLong());
    }
  }

  /**
   * Reads TIMESTAMP_MILLIS columns, scaling to microseconds.
   *
   * <p>Extends {@link TimestampMicroReader} because the vector allocated for an
   * Iceberg TIMESTAMP field is a {@link TimeStampMicroTZVector}; the previous
   * implementation extended LongReader and its BigIntVector cast failed at runtime.
   */
  protected static class TimestampMillisReader extends TimestampMicroReader {

    TimestampMillisReader(ColumnDescriptor desc,
                          Types.NestedField icebergField,
                          RootAllocator rootAlloc) {
      super(desc, icebergField, rootAlloc);
    }

    @Override
    protected void nextValueAt(int i) {
      // millis -> micros
      ((TimeStampMicroTZVector) vec).setSafe(i, column.nextLong() * 1000L);
    }
  }

  protected static class BooleanReader extends VectorReader {

    BooleanReader(ColumnDescriptor desc, Types.NestedField icebergField, RootAllocator rootAlloc) {
      super(desc, icebergField, rootAlloc);
    }

    @Override
    protected void nextNullAt(int i) {
      ((BitVector) vec).setNull(i);
    }

    @Override
    protected void nextValueAt(int i) {
      ((BitVector) vec).setSafe(i, column.nextBoolean() ? 1 : 0);
    }
  }

  protected static class FloatReader extends VectorReader {

    FloatReader(ColumnDescriptor desc, Types.NestedField icebergField, RootAllocator rootAlloc) {
      super(desc, icebergField, rootAlloc);
    }

    @Override
    protected void nextNullAt(int i) {
      ((Float4Vector) vec).setNull(i);
    }

    @Override
    protected void nextValueAt(int i) {
      ((Float4Vector) vec).setSafe(i, column.nextFloat());
    }
  }

  protected static class DoubleReader extends VectorReader {

    DoubleReader(ColumnDescriptor desc, Types.NestedField icebergField, RootAllocator rootAlloc) {
      super(desc, icebergField, rootAlloc);
    }

    @Override
    protected void nextNullAt(int i) {
      ((Float8Vector) vec).setNull(i);
    }

    @Override
    protected void nextValueAt(int i) {
      ((Float8Vector) vec).setSafe(i, column.nextDouble());
    }
  }

  protected static class BinaryReader extends VectorReader {

    BinaryReader(ColumnDescriptor desc, Types.NestedField icebergField, RootAllocator rootAlloc) {
      super(desc, icebergField, rootAlloc);
    }

    @Override
    protected void nextNullAt(int i) {
      ((VarBinaryVector) vec).setNull(i);
    }

    @Override
    protected void nextValueAt(int i) {
      ((VarBinaryVector) vec).setSafe(i, column.nextBinary().getBytes());
    }
  }

  protected static class DateReader extends VectorReader {

    DateReader(ColumnDescriptor desc, Types.NestedField icebergField, RootAllocator rootAlloc) {
      super(desc, icebergField, rootAlloc);
    }

    @Override
    protected void nextNullAt(int i) {
      ((DateDayVector) vec).setNull(i);
    }

    @Override
    protected void nextValueAt(int i) {
      // Iceberg dates are days from epoch, matching DateDayVector's representation.
      ((DateDayVector) vec).setSafe(i, column.nextInteger());
    }
  }

  /** Reads INT32-backed decimals; the int is the unscaled value. */
  protected static class IntegerDecimalReader extends VectorReader {
    private final int precision;
    private final int scale;

    IntegerDecimalReader(ColumnDescriptor desc,
                         Types.NestedField icebergField,
                         RootAllocator rootAlloc,
                         int precision, int scale) {
      super(desc, icebergField, rootAlloc);
      this.precision = precision;
      this.scale = scale;
    }

    @Override
    protected void nextNullAt(int i) {
      ((DecimalVector) vec).setNull(i);
    }

    @Override
    protected void nextValueAt(int i) {
      // Decimal.apply also validates the value against the declared precision.
      Decimal decimalValue = Decimal.apply(column.nextInteger(), precision, scale);
      ((DecimalVector) vec).setSafe(i, decimalValue.toJavaBigDecimal());
    }
  }

  /** Reads INT64-backed decimals; the long is the unscaled value. */
  protected static class LongDecimalReader extends VectorReader {
    private final int precision;
    private final int scale;

    LongDecimalReader(ColumnDescriptor desc,
                      Types.NestedField icebergField,
                      RootAllocator rootAlloc,
                      int precision, int scale) {
      super(desc, icebergField, rootAlloc);
      this.precision = precision;
      this.scale = scale;
    }

    @Override
    protected void nextNullAt(int i) {
      ((DecimalVector) vec).setNull(i);
    }

    @Override
    protected void nextValueAt(int i) {
      // Decimal.apply also validates the value against the declared precision.
      Decimal decimalValue = Decimal.apply(column.nextLong(), precision, scale);
      ((DecimalVector) vec).setSafe(i, decimalValue.toJavaBigDecimal());
    }
  }

  /** Reads BINARY / FIXED_LEN_BYTE_ARRAY decimals; bytes are a big-endian unscaled value. */
  protected static class BinaryDecimalReader extends VectorReader {
    private final int precision; // retained for interface parity; scale fully determines the value
    private final int scale;

    BinaryDecimalReader(ColumnDescriptor desc,
                        Types.NestedField icebergField,
                        RootAllocator rootAlloc,
                        int precision, int scale) {
      super(desc, icebergField, rootAlloc);
      this.precision = precision;
      this.scale = scale;
    }

    @Override
    protected void nextNullAt(int i) {
      ((DecimalVector) vec).setNull(i);
    }

    @Override
    protected void nextValueAt(int i) {
      Binary binaryValue = column.nextBinary();
      // Build the BigDecimal directly; the old round trip through Spark's Decimal
      // (fromDecimal(...).toJavaBigDecimal()) produced the same value with extra work.
      BigDecimal decimalValue = new BigDecimal(new BigInteger(binaryValue.getBytes()), scale);
      ((DecimalVector) vec).setSafe(i, decimalValue);
    }
  }
}
+ */ + +package org.apache.iceberg.spark.data.vector; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.FieldVector; +import org.apache.iceberg.Schema; +import org.apache.iceberg.arrow.ArrowSchemaUtil; +import org.apache.iceberg.parquet.ParquetValueReader; +import org.apache.iceberg.parquet.ParquetValueReaders; +import org.apache.iceberg.parquet.TypeWithSchemaVisitor; +import org.apache.iceberg.types.Types; +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.schema.DecimalMetadata; +import org.apache.parquet.schema.GroupType; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; +import org.apache.parquet.schema.Type; +import org.apache.spark.sql.vectorized.ColumnarBatch; + +public class VectorizedSparkParquetReaders { + + @SuppressWarnings("unchecked") + public static ParquetValueReader buildReader( + Schema tableSchema, + Schema expectedSchema, + MessageType fileSchema) { + + return (ParquetValueReader) + TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, + new ReadBuilder(tableSchema, expectedSchema, fileSchema)); + } + + private static class ReadBuilder extends TypeWithSchemaVisitor> { + protected final MessageType parquetSchema; + protected final Schema projectedIcebergSchema; + protected final Schema tableIcebergSchema; + protected final org.apache.arrow.vector.types.pojo.Schema arrowSchema; + protected final RootAllocator rootAllocator; + + ReadBuilder(Schema tableSchema, Schema projectedIcebergSchema, MessageType parquetSchema) { + this.parquetSchema = parquetSchema; + this.tableIcebergSchema = tableSchema; + this.projectedIcebergSchema = projectedIcebergSchema; + this.arrowSchema = ArrowSchemaUtil.convert(projectedIcebergSchema); + 
this.rootAllocator = new RootAllocator(Long.MAX_VALUE); + } + + @Override + public ParquetValueReader message(Types.StructType expected, MessageType message, + List> fieldReaders) { + return struct(expected, message.asGroupType(), fieldReaders); + } + + @Override + public ParquetValueReader struct(Types.StructType expected, GroupType struct, + List> fieldReaders) { + + // this works on struct fields and the root iceberg schema which itself is a struct. + + // match the expected struct's order + Map> readersById = Maps.newHashMap(); + Map typesById = Maps.newHashMap(); + List fields = struct.getFields(); + + for (int i = 0; i < fields.size(); i += 1) { + Type fieldType = fields.get(i); + int fieldD = parquetSchema.getMaxDefinitionLevel(path(fieldType.getName())) - 1; + int id = fieldType.getId().intValue(); + // Todo: figure out optional vield reading for vectorized reading + // readersById.put(id, (ParquetValueReader)ParquetValueReaders. + // option(fieldType, fieldD, fieldReaders.get(i))); + + readersById.put(id, (ParquetValueReader)fieldReaders.get(i)); + typesById.put(id, fieldType); + } + + List icebergFields = expected != null ? 
+ expected.fields() : ImmutableList.of(); + + List> reorderedFields = Lists.newArrayListWithExpectedSize( + icebergFields.size()); + + List types = Lists.newArrayListWithExpectedSize(icebergFields.size()); + + for (Types.NestedField field : icebergFields) { + int id = field.fieldId(); + ParquetValueReader reader = readersById.get(id); + if (reader != null) { + reorderedFields.add(reader); + types.add(typesById.get(id)); + } else { + reorderedFields.add(ParquetValueReaders.nulls()); + types.add(null); + } + } + + return new ParquetValueReaders.ColumnarBatchReader(types, expected, reorderedFields); + } + + + @Override + public ParquetValueReader primitive(org.apache.iceberg.types.Type.PrimitiveType expected, + PrimitiveType primitive) { + + // Create arrow vector for this field + int parquetFieldId = primitive.getId().intValue(); + ColumnDescriptor desc = parquetSchema.getColumnDescription(currentPath()); + Types.NestedField icebergField = tableIcebergSchema.findField(parquetFieldId); + // int fieldD = parquetSchema.getMaxDefinitionLevel(path(primitive.getName())) - 1; + // Field field = ArrowSchemaUtil.convert(projectedIcebergSchema.findField(parquetFieldId)); + // FieldVector vec = field.createVector(rootAllocator); + + if (primitive.getOriginalType() != null) { + switch (primitive.getOriginalType()) { + case ENUM: + case JSON: + case UTF8: + return new VectorizedParquetValueReaders.StringReader(desc, icebergField, rootAllocator); + case INT_8: + case INT_16: + case INT_32: + return new VectorizedParquetValueReaders.IntegerReader(desc, icebergField, rootAllocator); + // if (expected != null && expected.typeId() == Types.LongType.get().typeId()) { + // return new ParquetValueReaders.IntAsLongReader(desc); + // } else { + // return new ParquetValueReaders.UnboxedReader(desc); + // } + case DATE: + return new VectorizedParquetValueReaders.DateReader(desc, icebergField, rootAllocator); + case INT_64: + return new VectorizedParquetValueReaders.LongReader(desc, 
icebergField, rootAllocator); + case TIMESTAMP_MICROS: + return new VectorizedParquetValueReaders.TimestampMicroReader(desc, icebergField, rootAllocator); + case TIMESTAMP_MILLIS: + return new VectorizedParquetValueReaders.TimestampMillisReader(desc, icebergField, rootAllocator); + case DECIMAL: + DecimalMetadata decimal = primitive.getDecimalMetadata(); + switch (primitive.getPrimitiveTypeName()) { + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + return new VectorizedParquetValueReaders.BinaryDecimalReader(desc, icebergField, rootAllocator, + decimal.getPrecision(), + decimal.getScale()); + case INT64: + return new VectorizedParquetValueReaders.LongDecimalReader(desc, icebergField, rootAllocator, + decimal.getPrecision(), + decimal.getScale()); + case INT32: + return new VectorizedParquetValueReaders.IntegerDecimalReader(desc, icebergField, rootAllocator, + decimal.getPrecision(), + decimal.getScale()); + default: + throw new UnsupportedOperationException( + "Unsupported base type for decimal: " + primitive.getPrimitiveTypeName()); + } + case BSON: + return new VectorizedParquetValueReaders.BinaryReader(desc, icebergField, rootAllocator); + default: + throw new UnsupportedOperationException( + "Unsupported logical type: " + primitive.getOriginalType()); + } + } + + switch (primitive.getPrimitiveTypeName()) { + case FIXED_LEN_BYTE_ARRAY: + case BINARY: + return new VectorizedParquetValueReaders.BinaryReader(desc, icebergField, rootAllocator); + case INT32: + return new VectorizedParquetValueReaders.IntegerReader(desc, icebergField, rootAllocator); + case FLOAT: + return new VectorizedParquetValueReaders.FloatReader(desc, icebergField, rootAllocator); + // if (expected != null && expected.typeId() == org.apache.iceberg.types.Type.TypeID.DOUBLE) { + // return new ParquetValueReaders.FloatAsDoubleReader(desc); + // } else { + // return new ParquetValueReaders.UnboxedReader<>(desc); + // } + case BOOLEAN: + return new VectorizedParquetValueReaders.BooleanReader(desc, 
icebergField, rootAllocator); + case INT64: + return new VectorizedParquetValueReaders.LongReader(desc, icebergField, rootAllocator); + case DOUBLE: + return new VectorizedParquetValueReaders.DoubleReader(desc, icebergField, rootAllocator); + default: + throw new UnsupportedOperationException("Unsupported type: " + primitive); + } + } + + private String[] currentPath() { + String[] path = new String[fieldNames.size()]; + if (!fieldNames.isEmpty()) { + Iterator iter = fieldNames.descendingIterator(); + for (int i = 0; iter.hasNext(); i += 1) { + path[i] = iter.next(); + } + } + + return path; + } + + protected MessageType type() { + return parquetSchema; + } + + protected String[] path(String name) { + String[] path = new String[fieldNames.size() + 1]; + path[fieldNames.size()] = name; + + if (!fieldNames.isEmpty()) { + Iterator iter = fieldNames.descendingIterator(); + for (int i = 0; iter.hasNext(); i += 1) { + path[i] = iter.next(); + } + } + + return path; + } + } + + + + +} diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java index b6dfade9a09d..b3edfc2827eb 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/IcebergSource.java @@ -46,11 +46,14 @@ import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter; import org.apache.spark.sql.streaming.OutputMode; import org.apache.spark.sql.types.StructType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class IcebergSource implements DataSourceV2, ReadSupport, WriteSupport, DataSourceRegister, StreamWriteSupport { private SparkSession lazySpark = null; private Configuration lazyConf = null; + private static final Logger LOG = LoggerFactory.getLogger(IcebergSource.class); @Override public String shortName() { @@ -98,31 +101,31 @@ protected Table findTable(DataSourceOptions options, Configuration 
conf) { Optional path = options.get("path"); Preconditions.checkArgument(path.isPresent(), "Cannot open table: path is not set"); - if (path.get().contains("/")) { - HadoopTables tables = new HadoopTables(conf); - return tables.load(path.get()); - } else { - HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf); - TableIdentifier tableIdentifier = TableIdentifier.parse(path.get()); - return hiveCatalog.loadTable(tableIdentifier); - } + // if (path.get().contains("/")) { + HadoopTables tables = new HadoopTables(conf); + return tables.load(path.get()); + // } else { + // HiveCatalog hiveCatalog = HiveCatalogs.loadCatalog(conf); + // TableIdentifier tableIdentifier = TableIdentifier.parse(path.get()); + // return hiveCatalog.loadTable(tableIdentifier); + // } } - private SparkSession lazySparkSession() { + protected SparkSession lazySparkSession() { if (lazySpark == null) { this.lazySpark = SparkSession.builder().getOrCreate(); } return lazySpark; } - private Configuration lazyBaseConf() { + protected Configuration lazyBaseConf() { if (lazyConf == null) { this.lazyConf = lazySparkSession().sparkContext().hadoopConfiguration(); } return lazyConf; } - private Table getTableAndResolveHadoopConfiguration( + protected Table getTableAndResolveHadoopConfiguration( DataSourceOptions options, Configuration conf) { // Overwrite configurations from the Spark Context with configurations from the options. 
mergeIcebergHadoopConfs(conf, options.asMap()); @@ -134,7 +137,7 @@ private Table getTableAndResolveHadoopConfiguration( return table; } - private static void mergeIcebergHadoopConfs( + protected static void mergeIcebergHadoopConfs( Configuration baseConf, Map options) { options.keySet().stream() .filter(key -> key.startsWith("hadoop.")) diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java index 91359ffd6800..75d72f1c465e 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -22,7 +22,6 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import java.io.Closeable; import java.io.IOException; @@ -31,6 +30,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.function.Function; import org.apache.iceberg.CombinedScanTask; @@ -59,15 +59,13 @@ import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.data.SparkAvroReader; import org.apache.iceberg.spark.data.SparkOrcReader; -import org.apache.iceberg.spark.data.SparkParquetReaders; -import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.spark.data.vector.VectorizedSparkParquetReaders; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ByteBuffers; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.sql.catalyst.expressions.AttributeReference; import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.sql.catalyst.expressions.JoinedRow; import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; import 
org.apache.spark.sql.sources.Filter; import org.apache.spark.sql.sources.v2.DataSourceOptions; @@ -78,6 +76,7 @@ import org.apache.spark.sql.sources.v2.reader.SupportsPushDownFilters; import org.apache.spark.sql.sources.v2.reader.SupportsPushDownRequiredColumns; import org.apache.spark.sql.sources.v2.reader.SupportsReportStatistics; +import org.apache.spark.sql.sources.v2.reader.SupportsScanColumnarBatch; import org.apache.spark.sql.types.BinaryType; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.Decimal; @@ -85,14 +84,15 @@ import org.apache.spark.sql.types.StringType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; import org.apache.spark.unsafe.types.UTF8String; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import scala.collection.JavaConverters; -class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushDownRequiredColumns, +class Reader implements DataSourceReader, + SupportsScanColumnarBatch, + SupportsPushDownFilters, + SupportsPushDownRequiredColumns, SupportsReportStatistics { - private static final Logger LOG = LoggerFactory.getLogger(Reader.class); private static final Filter[] NO_FILTERS = new Filter[0]; @@ -102,23 +102,33 @@ class Reader implements DataSourceReader, SupportsPushDownFilters, SupportsPushD private final FileIO fileIo; private final EncryptionManager encryptionManager; private final boolean caseSensitive; + private int numRecordsPerBatch; private StructType requestedSchema = null; private List filterExpressions = null; private Filter[] pushedFilters = NO_FILTERS; // lazy variables - private Schema schema = null; + private Schema schema; private StructType type = null; // cached because Spark accesses it multiple times private List tasks = null; // lazy cache of tasks + private static final int DEFAULT_NUM_RECORDS_PER_BATCH = 1000; Reader(Table table, boolean caseSensitive, 
DataSourceOptions options) { this.table = table; this.snapshotId = options.get("snapshot-id").map(Long::parseLong).orElse(null); this.asOfTimestamp = options.get("as-of-timestamp").map(Long::parseLong).orElse(null); + Optional numRecordsPerBatchOpt = options.get("iceberg.read.numrecordsperbatch"); + this.numRecordsPerBatch = DEFAULT_NUM_RECORDS_PER_BATCH; + if (numRecordsPerBatchOpt.isPresent()) { + this.numRecordsPerBatch = Integer.parseInt(numRecordsPerBatchOpt.get()); + } + // LOG.info("[IcebergSource] => Reading numRecordsPerBatch = "+numRecordsPerBatch); + if (snapshotId != null && asOfTimestamp != null) { throw new IllegalArgumentException( "Cannot scan using both snapshot-id and as-of-timestamp to select the table snapshot"); } + this.schema = table.schema(); this.fileIo = table.io(); this.encryptionManager = table.encryption(); @@ -149,19 +159,36 @@ public StructType readSchema() { } @Override - public List> planInputPartitions() { + public List> planBatchInputPartitions() { String tableSchemaString = SchemaParser.toJson(table.schema()); String expectedSchemaString = SchemaParser.toJson(lazySchema()); - List> readTasks = Lists.newArrayList(); + List> readTasks = Lists.newArrayList(); for (CombinedScanTask task : tasks()) { readTasks.add( - new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo, encryptionManager, caseSensitive)); + new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo, encryptionManager, + caseSensitive, numRecordsPerBatch)); } return readTasks; } + @Override + public List> planInputPartitions() { + return null; + // String tableSchemaString = SchemaParser.toJson(table.schema()); + // String expectedSchemaString = SchemaParser.toJson(lazySchema()); + // + // List> readTasks = Lists.newArrayList(); + // for (CombinedScanTask task : tasks()) { + // readTasks.add( + // new ReadTask(task, tableSchemaString, expectedSchemaString, fileIo, encryptionManager, + // caseSensitive, numRecordsPerBatch)); + // } + // + // return 
readTasks; + } + @Override public Filter[] pushFilters(Filter[] filters) { this.tasks = null; // invalidate cached tasks, if present @@ -256,32 +283,34 @@ public String toString() { table, lazySchema().asStruct(), filterExpressions, caseSensitive); } - private static class ReadTask implements InputPartition, Serializable { + private static class ReadTask implements InputPartition, Serializable { private final CombinedScanTask task; private final String tableSchemaString; private final String expectedSchemaString; private final FileIO fileIo; private final EncryptionManager encryptionManager; private final boolean caseSensitive; + private final int numRecordsPerBatch; private transient Schema tableSchema = null; private transient Schema expectedSchema = null; private ReadTask( CombinedScanTask task, String tableSchemaString, String expectedSchemaString, FileIO fileIo, - EncryptionManager encryptionManager, boolean caseSensitive) { + EncryptionManager encryptionManager, boolean caseSensitive, int numRecordsPerBatch) { this.task = task; this.tableSchemaString = tableSchemaString; this.expectedSchemaString = expectedSchemaString; this.fileIo = fileIo; this.encryptionManager = encryptionManager; this.caseSensitive = caseSensitive; + this.numRecordsPerBatch = numRecordsPerBatch; } @Override - public InputPartitionReader createPartitionReader() { + public InputPartitionReader createPartitionReader() { return new TaskDataReader(task, lazyTableSchema(), lazyExpectedSchema(), fileIo, - encryptionManager, caseSensitive); + encryptionManager, caseSensitive, numRecordsPerBatch); } private Schema lazyTableSchema() { @@ -299,7 +328,7 @@ private Schema lazyExpectedSchema() { } } - private static class TaskDataReader implements InputPartitionReader { + private static class TaskDataReader implements InputPartitionReader { // for some reason, the apply method can't be called from Java without reflection private static final DynMethods.UnboundMethod APPLY_PROJECTION = 
DynMethods.builder("apply") .impl(UnsafeProjection.class, InternalRow.class) @@ -311,13 +340,14 @@ private static class TaskDataReader implements InputPartitionReader private final FileIO fileIo; private final Map inputFiles; private final boolean caseSensitive; + private final int numRecordsPerBatch; - private Iterator currentIterator = null; + private Iterator currentIterator; private Closeable currentCloseable = null; - private InternalRow current = null; + private ColumnarBatch current = null; TaskDataReader(CombinedScanTask task, Schema tableSchema, Schema expectedSchema, FileIO fileIo, - EncryptionManager encryptionManager, boolean caseSensitive) { + EncryptionManager encryptionManager, boolean caseSensitive, int numRecordsPerBatch) { this.fileIo = fileIo; this.tasks = task.files().iterator(); this.tableSchema = tableSchema; @@ -333,6 +363,7 @@ private static class TaskDataReader implements InputPartitionReader // open last because the schemas and fileIo must be set this.currentIterator = open(tasks.next()); this.caseSensitive = caseSensitive; + this.numRecordsPerBatch = numRecordsPerBatch; } @Override @@ -353,7 +384,7 @@ public boolean next() throws IOException { } @Override - public InternalRow get() { + public ColumnarBatch get() { return current; } @@ -368,7 +399,7 @@ public void close() throws IOException { } } - private Iterator open(FileScanTask task) { + private Iterator open(FileScanTask task) { DataFile file = task.file(); // schema or rows returned by readers @@ -383,23 +414,24 @@ private Iterator open(FileScanTask task) { boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size(); Schema iterSchema; - Iterator iter; - - if (hasJoinedPartitionColumns) { - // schema used to read data files - Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns); - Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns); - PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, 
spec); - JoinedRow joined = new JoinedRow(); - - InternalRow partition = convertToRow.apply(file.partition()); - joined.withRight(partition); - - // create joined rows and project from the joined schema to the final schema - iterSchema = TypeUtil.join(readSchema, partitionSchema); - iter = Iterators.transform(open(task, readSchema), joined::withLeft); - - } else if (hasExtraFilterColumns) { + Iterator iter; + + // if (hasJoinedPartitionColumns) { + // // schema used to read data files + // Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns); + // Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns); + // PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec); + // JoinedRow joined = new JoinedRow(); + // + // InternalRow partition = convertToRow.apply(file.partition()); + // joined.withRight(partition); + // + // // create joined rows and project from the joined schema to the final schema + // iterSchema = TypeUtil.join(readSchema, partitionSchema); + // iter = Iterators.transform(open(task, readSchema), joined::withLeft); + // + // } else if (hasExtraFilterColumns) { + if (hasExtraFilterColumns) { // add projection to the final schema iterSchema = requiredSchema; iter = open(task, requiredSchema); @@ -411,36 +443,37 @@ private Iterator open(FileScanTask task) { } // TODO: remove the projection by reporting the iterator's schema back to Spark - return Iterators.transform(iter, - APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke); + // return Iterators.transform(iter, + // APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke); + return iter; } - private Iterator open(FileScanTask task, Schema readSchema) { - CloseableIterable iter; - if (task.isDataTask()) { - iter = newDataIterable(task.asDataTask(), readSchema); - - } else { - InputFile location = inputFiles.get(task.file().path().toString()); - Preconditions.checkNotNull(location, "Could not find InputFile associated with 
FileScanTask"); - - switch (task.file().format()) { - case PARQUET: - iter = newParquetIterable(location, task, readSchema); - break; - - case AVRO: - iter = newAvroIterable(location, task, readSchema); - break; - - case ORC: - iter = newOrcIterable(location, task, readSchema); - break; - - default: - throw new UnsupportedOperationException( - "Cannot read unknown format: " + task.file().format()); - } + private Iterator open(FileScanTask task, Schema readSchema) { + CloseableIterable iter; + // if (task.isDataTask()) { + // iter = newDataIterable(task.asDataTask(), readSchema); + // + // } else { + InputFile location = inputFiles.get(task.file().path().toString()); + Preconditions.checkNotNull(location, "Could not find InputFile associated with FileScanTask"); + + switch (task.file().format()) { + case PARQUET: + iter = newParquetIterable(location, task, readSchema); + break; + // + // case AVRO: + // iter = newAvroIterable(location, task, readSchema); + // break; + // + // case ORC: + // iter = newOrcIterable(location, task, readSchema); + // break; + // + default: + throw new UnsupportedOperationException( + "Cannot read unknown format: " + task.file().format()); + // } } this.currentCloseable = iter; @@ -481,15 +514,17 @@ private CloseableIterable newAvroIterable(InputFile location, .build(); } - private CloseableIterable newParquetIterable(InputFile location, + private CloseableIterable newParquetIterable(InputFile location, FileScanTask task, Schema readSchema) { return Parquet.read(location) - .project(readSchema) + .project(readSchema, SparkSchemaUtil.convert(readSchema)) .split(task.start(), task.length()) - .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema)) + .createReaderFunc(fileSchema -> VectorizedSparkParquetReaders.buildReader(tableSchema, readSchema, + fileSchema)) .filter(task.residual()) .caseSensitive(caseSensitive) + .recordsPerBatch(numRecordsPerBatch) .build(); } diff --git 
a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java index 4b8952698bf8..080c0d1066d9 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/RandomData.java @@ -55,7 +55,9 @@ public static List generateList(Schema schema, int numRecords, long seed RandomDataGenerator generator = new RandomDataGenerator(schema, seed); List records = Lists.newArrayListWithExpectedSize(numRecords); for (int i = 0; i < numRecords; i += 1) { - records.add((Record) TypeUtil.visit(schema, generator)); + Record rec = (Record) TypeUtil.visit(schema, generator); + // System.out.println("Add record "+rec); + records.add(rec); } return records; diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java index c20bc67a69e1..335f8a840790 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java @@ -53,6 +53,7 @@ import org.apache.spark.sql.types.MapType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import org.apache.spark.sql.vectorized.ColumnarBatch; import org.apache.spark.unsafe.types.UTF8String; import org.junit.Assert; import scala.collection.Seq; @@ -205,6 +206,33 @@ public static void assertEqualsUnsafe(Types.StructType struct, Record rec, Inter } } + public static void assertEqualsUnsafe(Types.StructType struct, List expected, ColumnarBatch batch) { + List fields = struct.fields(); + for (int r=0; r Checking Row "+r+", field #"+i + // + " , Field:"+ fields.get(i).name() + // + " , optional:"+fields.get(i).isOptional() + // + " , type:"+fieldType.typeId() + // + " , expected:"+expectedValue); + if (actualRow.isNullAt(i)) { + + Assert.assertTrue("Expect null", (expectedValue == null)); + } else { + 
Object actualValue = actualRow.get(i, convert(fieldType)); + assertEqualsUnsafe(fieldType, expectedValue, actualValue); + } + } + } + } + private static void assertEqualsUnsafe(Types.ListType list, Collection expected, ArrayData actual) { Type elementType = list.elementType(); List expectedElements = Lists.newArrayList(expected); diff --git a/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java new file mode 100644 index 000000000000..fed41b499f59 --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/spark/data/TestSparkParquetVectorizedReader.java @@ -0,0 +1,116 @@ +package org.apache.iceberg.spark.data; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import org.apache.avro.generic.GenericData; +import org.apache.iceberg.Files; +import org.apache.iceberg.Schema; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.parquet.Parquet; +import org.apache.iceberg.spark.data.vector.VectorizedSparkParquetReaders; +import org.apache.iceberg.types.TypeUtil; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.vectorized.ColumnarBatch; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Test; + +import static org.apache.iceberg.spark.data.TestHelpers.assertEqualsUnsafe; + +public class TestSparkParquetVectorizedReader extends AvroDataTest { + + @Override + protected void writeAndValidate(Schema schema) throws IOException { + + // Write test data + Assume.assumeTrue("Parquet Avro cannot write non-string map keys", null == TypeUtil.find(schema, + type -> type.isMapType() && type.asMapType().keyType() != Types.StringType.get())); + + List expected = RandomData.generateList(schema, 100, 0L); + + // write a test parquet file using iceberg writer + File testFile = 
temp.newFile(); + Assert.assertTrue("Delete should succeed", testFile.delete()); + + try (FileAppender writer = Parquet.write(Files.localOutput(testFile)) + .schema(schema) + .named("test") + .build()) { + writer.addAll(expected); + } + + + try(CloseableIterable batchReader = Parquet.read(Files.localInput(testFile)) + .project(schema) + .createReaderFunc(type -> VectorizedSparkParquetReaders.buildReader(schema, schema, type)) + .build()) { + + Iterator batches = batchReader.iterator(); + int numRowsRead = 0; + int numExpectedRead = 0; + while(batches.hasNext()) { + + ColumnarBatch batch = batches.next(); + numRowsRead += batch.numRows(); + + List expectedBatch = new ArrayList<>(batch.numRows()); + for(int i = numExpectedRead; i < numExpectedRead+batch.numRows(); i++) { + expectedBatch.add(expected.get(i)); + } + + // System.out.println("-> Check "+numExpectedRead+" - "+ (numExpectedRead+batch.numRows())); + assertEqualsUnsafe(schema.asStruct(), expectedBatch, batch); + + System.out.println("Batch read with "+batch.numRows()+" rows. Read "+numRowsRead+" till now. 
" + + "Expected batch "+expectedBatch.size()); + + numExpectedRead += batch.numRows(); + } + + Assert.assertEquals(expected.size(), numRowsRead); + + } + } + + + + + @Test + public void testArray() throws IOException { + System.out.println("Not Supported"); + } + + @Test + public void testArrayOfStructs() throws IOException { + System.out.println("Not Supported"); + } + + @Test + public void testMap() throws IOException { + System.out.println("Not Supported"); + } + + @Test + public void testNumericMapKey() throws IOException { + System.out.println("Not Supported"); + } + + @Test + public void testComplexMapKey() throws IOException { + System.out.println("Not Supported"); + } + + @Test + public void testMapOfStructs() throws IOException { + System.out.println("Not Supported"); + } + + @Test + public void testMixedTypes() throws IOException { + System.out.println("Not Supported"); + } +} diff --git a/versions.lock b/versions.lock index b7e574d1bc60..2f314a21729b 100644 --- a/versions.lock +++ b/versions.lock @@ -7,16 +7,17 @@ com.carrotsearch:hppc:0.7.2 (1 constraints: f70cda14) com.clearspring.analytics:stream:2.7.0 (1 constraints: 1a0dd136) com.esotericsoftware:kryo-shaded:4.0.2 (2 constraints: b71345a6) com.esotericsoftware:minlog:1.3.0 (1 constraints: 670e7c4f) -com.fasterxml.jackson.core:jackson-annotations:2.7.9 (4 constraints: f24786bf) +com.fasterxml.jackson.core:jackson-annotations:2.7.9 (5 constraints: f154e19f) com.fasterxml.jackson.core:jackson-core:2.7.9 (5 constraints: d748db55) -com.fasterxml.jackson.core:jackson-databind:2.7.9 (8 constraints: a77bca51) +com.fasterxml.jackson.core:jackson-databind:2.7.9 (9 constraints: a688bc53) com.fasterxml.jackson.module:jackson-module-paranamer:2.7.9 (1 constraints: e0154200) com.fasterxml.jackson.module:jackson-module-scala_2.11:2.7.9 (1 constraints: 7f0da251) com.github.ben-manes.caffeine:caffeine:2.7.0 (1 constraints: 0b050a36) com.github.luben:zstd-jni:1.3.2-2 (1 constraints: 760d7c51) 
-com.google.code.findbugs:jsr305:3.0.2 (10 constraints: c483db75) +com.google.code.findbugs:jsr305:3.0.2 (9 constraints: d276cf3c) com.google.code.gson:gson:2.2.4 (1 constraints: 8c0d3f2f) com.google.errorprone:error_prone_annotations:2.3.3 (2 constraints: 161a2544) +com.google.flatbuffers:flatbuffers-java:1.9.0 (2 constraints: e5199714) com.google.guava:failureaccess:1.0.1 (1 constraints: 140ae1b4) com.google.guava:guava:28.0-jre (23 constraints: cc5c2ea0) com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava (1 constraints: bd17c918) @@ -40,7 +41,6 @@ com.twitter:chill-java:0.9.3 (2 constraints: a716716f) com.twitter:chill_2.11:0.9.3 (2 constraints: 121b92c3) com.twitter:parquet-hadoop-bundle:1.6.0 (3 constraints: 7c262424) com.univocity:univocity-parsers:2.7.3 (1 constraints: c40ccb27) -com.vlkan:flatbuffers:1.2.0-3f79e055 (2 constraints: 411e1dee) commons-beanutils:commons-beanutils:1.7.0 (1 constraints: da0e635f) commons-beanutils:commons-beanutils-core:1.8.0 (1 constraints: 1d134124) commons-cli:commons-cli:1.2 (8 constraints: 9467c282) @@ -66,6 +66,7 @@ io.dropwizard.metrics:metrics-json:3.1.5 (1 constraints: 1a0dc936) io.dropwizard.metrics:metrics-jvm:3.1.5 (1 constraints: 1a0dc936) io.netty:netty:3.9.9.Final (9 constraints: 9eb0396d) io.netty:netty-all:4.1.17.Final (3 constraints: d2312526) +it.unimi.dsi:fastutil:7.0.13 (1 constraints: fc0d4043) javax.activation:activation:1.1.1 (1 constraints: 140dbb36) javax.annotation:javax.annotation-api:1.2 (2 constraints: 2d21193d) javax.inject:javax.inject:1 (4 constraints: 852d0c1a) @@ -94,10 +95,10 @@ org.antlr:antlr4-runtime:4.7 (1 constraints: 7a0e125f) org.antlr:stringtemplate:3.2.1 (1 constraints: c10a3bc6) org.apache.ant:ant:1.9.1 (3 constraints: a721ed14) org.apache.ant:ant-launcher:1.9.1 (1 constraints: 69082485) -org.apache.arrow:arrow-format:0.10.0 (1 constraints: 1f0de721) -org.apache.arrow:arrow-memory:0.10.0 (1 constraints: 1f0de721) -org.apache.arrow:arrow-vector:0.10.0 (1 
constraints: e90c9734) -org.apache.avro:avro:1.8.2 (4 constraints: 3d2eebf3) +org.apache.arrow:arrow-format:0.12.0 (1 constraints: 210ded21) +org.apache.arrow:arrow-memory:0.12.0 (1 constraints: 210ded21) +org.apache.arrow:arrow-vector:0.12.0 (2 constraints: 1d122345) +org.apache.avro:avro:1.8.2 (5 constraints: 083cf387) org.apache.avro:avro-ipc:1.8.2 (1 constraints: f90b5bf4) org.apache.avro:avro-mapred:1.8.2 (2 constraints: 3a1a4787) org.apache.calcite:calcite-avatica:1.2.0-incubating (4 constraints: a044b922) diff --git a/versions.props b/versions.props index 7f718424c56a..80c334e695f8 100644 --- a/versions.props +++ b/versions.props @@ -1,6 +1,7 @@ org.slf4j:slf4j-api = 1.7.5 com.google.guava:guava = 28.0-jre org.apache.avro:avro = 1.8.2 +org.apache.arrow:arrow-vector = 0.12.0 org.apache.hadoop:* = 2.7.3 org.apache.hive:hive-standalone-metastore = 1.2.1 org.apache.orc:orc-core = 1.5.5