From 8f371bc8e6339cfc7dd7c7bee6d583d2b41e93ea Mon Sep 17 00:00:00 2001 From: Andrei Ionescu Date: Mon, 28 Oct 2019 16:24:15 +0200 Subject: [PATCH 1/7] Fix Iceberg Reader for nested partitions --- .../iceberg/avro/BuildAvroProjection.java | 7 ++- .../apache/iceberg/spark/source/Reader.java | 11 +++- .../spark/source/TestParquetWrite.java | 62 +++++++++++++++++++ 3 files changed, 76 insertions(+), 4 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java index 35b8981ca0fe..5861d3df9e36 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -81,13 +81,14 @@ public Schema record(Schema record, List names, Iterable s List expectedFields = struct.fields(); for (int i = 0; i < expectedFields.size(); i += 1) { Types.NestedField field = expectedFields.get(i); + String sanitizedFieldName = AvroSchemaUtil.sanitize(field.name()); // detect reordering - if (i < fields.size() && !field.name().equals(fields.get(i).name())) { + if (i < fields.size() && !sanitizedFieldName.equals(fields.get(i).name())) { hasChange = true; } - Schema.Field avroField = updateMap.get(AvroSchemaUtil.makeCompatibleName(field.name())); + Schema.Field avroField = updateMap.get(sanitizedFieldName); if (avroField != null) { updatedFields.add(avroField); @@ -123,7 +124,7 @@ public Schema.Field field(Schema.Field field, Supplier fieldResult) { return null; } - String expectedName = expectedField.name(); + String expectedName = AvroSchemaUtil.sanitize(expectedField.name()); this.current = expectedField.type(); try { diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java index 43c966af0a1e..78993c775946 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -24,6 +24,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.io.Closeable; import java.io.IOException; import java.io.Serializable; @@ -396,7 +397,15 @@ private Iterator open(FileScanTask task) { // schema or rows returned by readers Schema finalSchema = expectedSchema; PartitionSpec spec = task.spec(); - Set idColumns = spec.identitySourceIds(); + + Set idColumns = Sets.newHashSet(); + for (Integer i : spec.identitySourceIds()) { + if (spec.schema().columns().stream() + .noneMatch(j -> j.type().isStructType() && j.type().asStructType().field(i) != null) + ) { + idColumns.add(i); + } + } // schema needed for the projection and filtering StructType sparkType = SparkSchemaUtil.convert(finalSchema); diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java index 5357187393aa..9b47aed1a38a 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java @@ -32,6 +32,7 @@ import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; @@ -409,4 
+410,65 @@ public void testWriteProjectionWithMiddle() throws IOException { Assert.assertEquals("Number of rows should match", expected.size(), actual.size()); Assert.assertEquals("Result rows should match", expected, actual); } + + @Test + public void testNestedPartitioning() throws IOException { + Schema nestedSchema = new Schema( + optional(1, "id", Types.IntegerType.get()), + optional(2, "data", Types.StringType.get()), + optional(3, "nestedData", Types.StructType.of( + optional(4, "id", Types.IntegerType.get()), + optional(5, "moreData", Types.StringType.get()))) + ); + + File parent = temp.newFolder("parquet"); + File location = new File(parent, "test"); + + HadoopTables tables = new HadoopTables(new Configuration()); + PartitionSpec spec = PartitionSpec.builderFor(nestedSchema) + .identity("id") + .identity("nestedData.moreData") + .build(); + Table table = tables.create(nestedSchema, spec, location.toString()); + + List jsons = Lists.newArrayList( + "{ \"id\": 1, \"data\": \"a\", \"nestedData\": { \"id\": 100, \"moreData\": \"p1\"} }", + "{ \"id\": 2, \"data\": \"b\", \"nestedData\": { \"id\": 200, \"moreData\": \"p1\"} }", + "{ \"id\": 3, \"data\": \"c\", \"nestedData\": { \"id\": 300, \"moreData\": \"p2\"} }", + "{ \"id\": 4, \"data\": \"d\", \"nestedData\": { \"id\": 400, \"moreData\": \"p2\"} }" + ); + Dataset df = spark.read().schema(SparkSchemaUtil.convert(nestedSchema)) + .json(spark.createDataset(jsons, Encoders.STRING())); + + // TODO: incoming columns must be ordered according to the table's schema + df.select("id", "data", "nestedData").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + + table.refresh(); + + Dataset result = spark.read() + .format("iceberg") + .load(location.toString()); + + List actual = result.orderBy("id").collectAsList(); + Assert.assertEquals("Number of rows should match", jsons.size(), actual.size()); + Assert.assertEquals("Row 1 col 1 is 1", 1, actual.get(0).getInt(0)); + Assert.assertEquals("Row 1 col 2 is a", "a", actual.get(0).getString(1)); + Assert.assertEquals("Row 1 col 3,1 is 100", 100, actual.get(0).getStruct(2).getInt(0)); + Assert.assertEquals("Row 1 col 3,2 is p1", "p1", actual.get(0).getStruct(2).getString(1)); + Assert.assertEquals("Row 2 col 1 is 2", 2, actual.get(1).getInt(0)); + Assert.assertEquals("Row 2 col 2 is b", "b", actual.get(1).getString(1)); + Assert.assertEquals("Row 2 col 3,1 is 200", 200, actual.get(1).getStruct(2).getInt(0)); + Assert.assertEquals("Row 2 col 3,2 is p1", "p1", actual.get(1).getStruct(2).getString(1)); + Assert.assertEquals("Row 3 col 1 is 3", 3, actual.get(2).getInt(0)); + Assert.assertEquals("Row 3 col 2 is c", "c", actual.get(2).getString(1)); + Assert.assertEquals("Row 3 col 3,1 is 300", 300, actual.get(2).getStruct(2).getInt(0)); + Assert.assertEquals("Row 3 col 3,2 is p2", "p2", actual.get(2).getStruct(2).getString(1)); + Assert.assertEquals("Row 4 col 1 is 4", 4, actual.get(3).getInt(0)); + Assert.assertEquals("Row 4 col 2 is d", "d", actual.get(3).getString(1)); + Assert.assertEquals("Row 4 col 3,1 is 400", 400, actual.get(3).getStruct(2).getInt(0)); + Assert.assertEquals("Row 4 col 3,2 is p2", "p2", actual.get(3).getStruct(2).getString(1)); + } } From 69fb017cf14f08f4ff6782d6e95bb8d11463e935 Mon Sep 17 00:00:00 2001 From: Andrei Ionescu Date: Wed, 13 Nov 2019 16:20:32 +0200 Subject: [PATCH 2/7] Use build reader to read datasets --- .../spark/data/SparkParquetReaders.java | 38 +++-- .../apache/iceberg/spark/source/Reader.java | 136 +++++++----------- 2 files 
changed, 78 insertions(+), 96 deletions(-) diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 9a36266ffdf2..15b0cdc8247a 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -26,6 +26,7 @@ import java.math.BigInteger; import java.nio.ByteBuffer; import java.util.Arrays; +import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -68,21 +69,27 @@ private SparkParquetReaders() { @SuppressWarnings("unchecked") public static ParquetValueReader buildReader(Schema expectedSchema, - MessageType fileSchema) { + MessageType fileSchema, + Map partitionValues) { if (ParquetSchemaUtil.hasIds(fileSchema)) { return (ParquetValueReader) TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, - new ReadBuilder(fileSchema)); + new ReadBuilder(fileSchema, partitionValues)); } else { return (ParquetValueReader) TypeWithSchemaVisitor.visit(expectedSchema.asStruct(), fileSchema, - new FallbackReadBuilder(fileSchema)); + new FallbackReadBuilder(fileSchema, partitionValues)); } } + public static ParquetValueReader buildReader(Schema expectedSchema, + MessageType fileSchema) { + return SparkParquetReaders.buildReader(expectedSchema, fileSchema, Collections.emptyMap()); + } + private static class FallbackReadBuilder extends ReadBuilder { - FallbackReadBuilder(MessageType type) { - super(type); + FallbackReadBuilder(MessageType type, Map partitionValues) { + super(type, partitionValues); } @Override @@ -113,9 +120,11 @@ public ParquetValueReader struct(Types.StructType ignored, GroupType struct, private static class ReadBuilder extends TypeWithSchemaVisitor> { private final MessageType type; + private final Map partitionValues; - ReadBuilder(MessageType type) { + ReadBuilder(MessageType type, Map partitionValues) { this.type = type; + this.partitionValues = partitionValues; } @Override @@ -146,13 +155,18 @@ public ParquetValueReader struct(Types.StructType expected, GroupType struct, List types = Lists.newArrayListWithExpectedSize(expectedFields.size()); for (Types.NestedField field : expectedFields) { int id = field.fieldId(); - ParquetValueReader reader = readersById.get(id); - if (reader != null) { - reorderedFields.add(reader); - types.add(typesById.get(id)); - } else { - reorderedFields.add(ParquetValueReaders.nulls()); + if (partitionValues.containsKey(id)) { + reorderedFields.add(ParquetValueReaders.constant(partitionValues.get(id))); types.add(null); + } else { + ParquetValueReader reader = readersById.get(id); + if (reader != null) { + reorderedFields.add(reader); + types.add(typesById.get(id)); + } else { + reorderedFields.add(ParquetValueReaders.nulls()); + types.add(null); + } } } diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java index 78993c775946..efb959e305a5 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -24,21 +24,23 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import java.io.Closeable; import java.io.IOException; import java.io.Serializable; import 
java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.function.Function; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataTask; import org.apache.iceberg.FileScanTask; -import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; @@ -68,8 +70,6 @@ import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.catalyst.expressions.Attribute; import org.apache.spark.sql.catalyst.expressions.AttributeReference; -import org.apache.spark.sql.catalyst.expressions.GenericInternalRow; -import org.apache.spark.sql.catalyst.expressions.JoinedRow; import org.apache.spark.sql.catalyst.expressions.UnsafeProjection; import org.apache.spark.sql.sources.Filter; import org.apache.spark.sql.sources.v2.DataSourceOptions; @@ -397,7 +397,6 @@ private Iterator open(FileScanTask task) { // schema or rows returned by readers Schema finalSchema = expectedSchema; PartitionSpec spec = task.spec(); - Set idColumns = Sets.newHashSet(); for (Integer i : spec.identitySourceIds()) { if (spec.schema().columns().stream() @@ -407,9 +406,8 @@ private Iterator open(FileScanTask task) { } } - // schema needed for the projection and filtering - StructType sparkType = SparkSchemaUtil.convert(finalSchema); - Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive); + Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, + SparkSchemaUtil.convert(finalSchema), task.residual(), caseSensitive); boolean hasJoinedPartitionColumns = !idColumns.isEmpty(); boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size(); @@ -417,36 +415,46 @@ private Iterator open(FileScanTask task) { Iterator iter; if (hasJoinedPartitionColumns) { - // schema used to read data files Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns); Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns); - PartitionRowConverter convertToRow = new PartitionRowConverter(partitionSchema, spec); - JoinedRow joined = new JoinedRow(); - InternalRow partition = convertToRow.apply(file.partition()); - joined.withRight(partition); + Map partitionValueMap = Maps.newHashMap(); + Map partitionSpecFieldIndexMap = Maps.newHashMap(); + Map> partitionSpecJavaTypeMap = Maps.newHashMap(); + Map partitionSpecDataTypeMap = Maps.newHashMap(); + for (int i = 0; i < spec.fields().size(); i++) { + String fieldName = spec.fields().get(i).name(); + partitionSpecFieldIndexMap.put(fieldName, i); + partitionSpecJavaTypeMap.put(fieldName, spec.javaClasses()[i]); + partitionSpecDataTypeMap.put(fieldName, getPartitionType(partitionSchema, fieldName)); + } - // create joined rows and project from the joined schema to the final schema - iterSchema = TypeUtil.join(readSchema, partitionSchema); - iter = Iterators.transform(open(task, readSchema), joined::withLeft); + List columns = new ArrayList<>(readSchema.columns()); + for (Types.NestedField field : partitionSchema.columns()) { + int partitionIndex = partitionSpecFieldIndexMap.get(field.name()); + DataType dataType = partitionSpecDataTypeMap.get(field.name()); + Class javaType = partitionSpecJavaTypeMap.get(field.name()); + Object convertedValue = getPartitionValue(file.partition(), partitionIndex, dataType, 
javaType); + partitionValueMap.put(field.fieldId(), convertedValue); + columns.add(field); + } + columns.sort(Comparator.comparingInt(Types.NestedField::fieldId)); + iterSchema = new Schema(columns); + iter = open(task, finalSchema, partitionValueMap); } else if (hasExtraFilterColumns) { - // add projection to the final schema iterSchema = requiredSchema; - iter = open(task, requiredSchema); - + iter = open(task, requiredSchema, Collections.emptyMap()); } else { - // return the base iterator iterSchema = finalSchema; - iter = open(task, finalSchema); + iter = open(task, finalSchema, Collections.emptyMap()); } - // TODO: remove the projection by reporting the iterator's schema back to Spark return Iterators.transform(iter, APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke); } - private Iterator open(FileScanTask task, Schema readSchema) { + private Iterator open(FileScanTask task, Schema readSchema, Map partitionValues) { CloseableIterable iter; if (task.isDataTask()) { iter = newDataIterable(task.asDataTask(), readSchema); @@ -457,7 +465,7 @@ private Iterator open(FileScanTask task, Schema readSchema) { switch (task.file().format()) { case PARQUET: - iter = newParquetIterable(location, task, readSchema); + iter = newParquetIterable(location, task, readSchema, partitionValues); break; case AVRO: @@ -513,12 +521,14 @@ private CloseableIterable newAvroIterable(InputFile location, } private CloseableIterable newParquetIterable(InputFile location, - FileScanTask task, - Schema readSchema) { + FileScanTask task, + Schema readSchema, + Map partitionValues) { + return Parquet.read(location) .project(readSchema) .split(task.start(), task.length()) - .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema)) + .createReaderFunc(fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, partitionValues)) .filter(task.residual()) .caseSensitive(caseSensitive) .build(); @@ -542,69 +552,27 @@ private CloseableIterable newDataIterable(DataTask task, Schema rea return CloseableIterable.transform( asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke); } - } - private static class PartitionRowConverter implements Function { - private final DataType[] types; - private final int[] positions; - private final Class[] javaTypes; - private final GenericInternalRow reusedRow; - - PartitionRowConverter(Schema partitionSchema, PartitionSpec spec) { - StructType partitionType = SparkSchemaUtil.convert(partitionSchema); - StructField[] fields = partitionType.fields(); - - this.types = new DataType[fields.length]; - this.positions = new int[types.length]; - this.javaTypes = new Class[types.length]; - this.reusedRow = new GenericInternalRow(types.length); - - List partitionFields = spec.fields(); - for (int rowIndex = 0; rowIndex < fields.length; rowIndex += 1) { - this.types[rowIndex] = fields[rowIndex].dataType(); - - int sourceId = partitionSchema.columns().get(rowIndex).fieldId(); - for (int specIndex = 0; specIndex < partitionFields.size(); specIndex += 1) { - PartitionField field = spec.fields().get(specIndex); - if (field.sourceId() == sourceId && "identity".equals(field.transform().toString())) { - positions[rowIndex] = specIndex; - javaTypes[rowIndex] = spec.javaClasses()[specIndex]; - break; - } - } + private DataType getPartitionType(Schema partitionSchema, String fieldName) { + if (partitionSchema.findField(fieldName) != null) { + return SparkSchemaUtil.convert(partitionSchema.findField(fieldName).type()); + } else { + 
return SparkSchemaUtil.convert(tableSchema.findField(fieldName).type()); } } - @Override - public InternalRow apply(StructLike tuple) { - for (int i = 0; i < types.length; i += 1) { - Object value = tuple.get(positions[i], javaTypes[i]); - if (value != null) { - reusedRow.update(i, convert(value, types[i])); - } else { - reusedRow.setNullAt(i); + private Object getPartitionValue(StructLike schema, int index, DataType sparkType, Class javaType) { + Object convertedValue = schema.get(index, javaType); + if (convertedValue != null) { + if (sparkType instanceof StringType) { + convertedValue = UTF8String.fromString(convertedValue.toString()); + } else if (sparkType instanceof BinaryType) { + convertedValue = ByteBuffers.toByteArray((ByteBuffer) convertedValue); + } else if (sparkType instanceof DecimalType) { + convertedValue = Decimal.fromDecimal(convertedValue); } } - - return reusedRow; - } - - /** - * Converts the objects into instances used by Spark's InternalRow. - * - * @param value a data value - * @param type the Spark data type - * @return the value converted to the representation expected by Spark's InternalRow. - */ - private static Object convert(Object value, DataType type) { - if (type instanceof StringType) { - return UTF8String.fromString(value.toString()); - } else if (type instanceof BinaryType) { - return ByteBuffers.toByteArray((ByteBuffer) value); - } else if (type instanceof DecimalType) { - return Decimal.fromDecimal(value); - } - return value; + return convertedValue; } } From f508e2946b232af36811457d1afac8f2df98fc76 Mon Sep 17 00:00:00 2001 From: Andrei Ionescu Date: Mon, 25 Nov 2019 16:05:11 +0200 Subject: [PATCH 3/7] Simplify the reader implementation and address feedback --- .../apache/iceberg/spark/source/Reader.java | 92 ++++++++++--------- 1 file changed, 47 insertions(+), 45 deletions(-) diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java index efb959e305a5..6a656afbff12 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -25,14 +25,11 @@ import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import java.io.Closeable; import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -397,17 +394,10 @@ private Iterator open(FileScanTask task) { // schema or rows returned by readers Schema finalSchema = expectedSchema; PartitionSpec spec = task.spec(); - Set idColumns = Sets.newHashSet(); - for (Integer i : spec.identitySourceIds()) { - if (spec.schema().columns().stream() - .noneMatch(j -> j.type().isStructType() && j.type().asStructType().field(i) != null) - ) { - idColumns.add(i); - } - } + Set idColumns = spec.identitySourceIds(); - Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, - SparkSchemaUtil.convert(finalSchema), task.residual(), caseSensitive); + StructType sparkType = SparkSchemaUtil.convert(finalSchema); + Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive); boolean hasJoinedPartitionColumns = !idColumns.isEmpty(); boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size(); @@ 
-415,32 +405,28 @@ private Iterator open(FileScanTask task) { Iterator iter; if (hasJoinedPartitionColumns) { - Schema readSchema = TypeUtil.selectNot(requiredSchema, idColumns); Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns); Map partitionValueMap = Maps.newHashMap(); - Map partitionSpecFieldIndexMap = Maps.newHashMap(); - Map> partitionSpecJavaTypeMap = Maps.newHashMap(); - Map partitionSpecDataTypeMap = Maps.newHashMap(); + Map partitionSpecFieldIndexMap = Maps.newHashMap(); + Map partitionSpecDataTypeMap = Maps.newHashMap(); for (int i = 0; i < spec.fields().size(); i++) { - String fieldName = spec.fields().get(i).name(); - partitionSpecFieldIndexMap.put(fieldName, i); - partitionSpecJavaTypeMap.put(fieldName, spec.javaClasses()[i]); - partitionSpecDataTypeMap.put(fieldName, getPartitionType(partitionSchema, fieldName)); + Integer sourceId = spec.fields().get(i).sourceId(); + partitionSpecFieldIndexMap.put(sourceId, i); + partitionSpecDataTypeMap.put(sourceId, getPartitionType(partitionSchema, sourceId)); } - List columns = new ArrayList<>(readSchema.columns()); - for (Types.NestedField field : partitionSchema.columns()) { - int partitionIndex = partitionSpecFieldIndexMap.get(field.name()); - DataType dataType = partitionSpecDataTypeMap.get(field.name()); - Class javaType = partitionSpecJavaTypeMap.get(field.name()); - Object convertedValue = getPartitionValue(file.partition(), partitionIndex, dataType, javaType); - partitionValueMap.put(field.fieldId(), convertedValue); - columns.add(field); + Set projectedIds = TypeUtil.getProjectedIds(partitionSchema); + for (Integer sourceId : projectedIds) { + if (partitionSpecFieldIndexMap.containsKey(sourceId)) { + int partitionIndex = partitionSpecFieldIndexMap.get(sourceId); + DataType dataType = partitionSpecDataTypeMap.get(sourceId); + Object partitionValue = convert(file.partition().get(partitionIndex, Object.class), dataType); + partitionValueMap.put(sourceId, partitionValue); + } } - columns.sort(Comparator.comparingInt(Types.NestedField::fieldId)); - iterSchema = new Schema(columns); + iterSchema = requiredSchema; iter = open(task, finalSchema, partitionValueMap); } else if (hasExtraFilterColumns) { iterSchema = requiredSchema; @@ -553,27 +539,43 @@ private CloseableIterable newDataIterable(DataTask task, Schema rea asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke); } - private DataType getPartitionType(Schema partitionSchema, String fieldName) { - if (partitionSchema.findField(fieldName) != null) { - return SparkSchemaUtil.convert(partitionSchema.findField(fieldName).type()); + /** + * Returns the Spark data type for a partition. + * + * If the field is not found in the partition schema we try to retrieve it from the full table schema. 
+ * + * @param partitionSchema the Iceberg schema for partitions + * @param fieldId the id of the field + * @return the Spark data type of the field + */ + private DataType getPartitionType(Schema partitionSchema, Integer fieldId) { + if (partitionSchema.findField(fieldId) != null) { + return SparkSchemaUtil.convert(partitionSchema.findField(fieldId).type()); } else { - return SparkSchemaUtil.convert(tableSchema.findField(fieldName).type()); + return SparkSchemaUtil.convert(tableSchema.findField(fieldId).type()); } } - private Object getPartitionValue(StructLike schema, int index, DataType sparkType, Class javaType) { - Object convertedValue = schema.get(index, javaType); - if (convertedValue != null) { - if (sparkType instanceof StringType) { - convertedValue = UTF8String.fromString(convertedValue.toString()); - } else if (sparkType instanceof BinaryType) { - convertedValue = ByteBuffers.toByteArray((ByteBuffer) convertedValue); - } else if (sparkType instanceof DecimalType) { - convertedValue = Decimal.fromDecimal(convertedValue); + /** + * Converts the objects into instances used by Spark's InternalRow. + * + * @param value a data value + * @param type the Spark data type + * @return the value converted to the representation expected by Spark's InternalRow. + */ + private static Object convert(Object value, DataType type) { + if (value != null) { + if (type instanceof StringType) { + return UTF8String.fromString(value.toString()); + } else if (type instanceof BinaryType) { + return ByteBuffers.toByteArray((ByteBuffer) value); + } else if (type instanceof DecimalType) { + return Decimal.fromDecimal(value); } } - return convertedValue; + return value; } + } private static class StructLikeInternalRow implements StructLike { From e93f5e26d3eafa5b542ab9f3156316dae3787dc4 Mon Sep 17 00:00:00 2001 From: Andrei Ionescu Date: Tue, 26 Nov 2019 11:53:40 +0200 Subject: [PATCH 4/7] One more simplification round --- .../apache/iceberg/spark/source/Reader.java | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java index 6a656afbff12..784c52b657c3 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -38,6 +38,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DataTask; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; @@ -61,7 +62,6 @@ import org.apache.iceberg.spark.data.SparkAvroReader; import org.apache.iceberg.spark.data.SparkOrcReader; import org.apache.iceberg.spark.data.SparkParquetReaders; -import org.apache.iceberg.types.TypeUtil; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.ByteBuffers; import org.apache.spark.sql.catalyst.InternalRow; @@ -396,6 +396,7 @@ private Iterator open(FileScanTask task) { PartitionSpec spec = task.spec(); Set idColumns = spec.identitySourceIds(); + // schema needed for the projection and filtering StructType sparkType = SparkSchemaUtil.convert(finalSchema); Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive); boolean hasJoinedPartitionColumns = !idColumns.isEmpty(); @@ -405,25 +406,22 @@ private Iterator open(FileScanTask task) { Iterator iter; if (hasJoinedPartitionColumns) { 
- Schema partitionSchema = TypeUtil.select(requiredSchema, idColumns); - Map partitionValueMap = Maps.newHashMap(); Map partitionSpecFieldIndexMap = Maps.newHashMap(); Map partitionSpecDataTypeMap = Maps.newHashMap(); + for (int i = 0; i < spec.fields().size(); i++) { - Integer sourceId = spec.fields().get(i).sourceId(); + PartitionField partitionField = spec.fields().get(i); + Integer sourceId = partitionField.sourceId(); partitionSpecFieldIndexMap.put(sourceId, i); - partitionSpecDataTypeMap.put(sourceId, getPartitionType(partitionSchema, sourceId)); + DataType partitionType = SparkSchemaUtil.convert(spec.partitionType().field(partitionField.name()).type()); + partitionSpecDataTypeMap.put(sourceId, partitionType); } - Set projectedIds = TypeUtil.getProjectedIds(partitionSchema); - for (Integer sourceId : projectedIds) { - if (partitionSpecFieldIndexMap.containsKey(sourceId)) { - int partitionIndex = partitionSpecFieldIndexMap.get(sourceId); - DataType dataType = partitionSpecDataTypeMap.get(sourceId); - Object partitionValue = convert(file.partition().get(partitionIndex, Object.class), dataType); - partitionValueMap.put(sourceId, partitionValue); - } + for (Map.Entry entry : partitionSpecFieldIndexMap.entrySet()) { + Object partitionValue = convert(file.partition().get(entry.getValue(), Object.class), + partitionSpecDataTypeMap.get(entry.getKey())); + partitionValueMap.put(entry.getKey(), partitionValue); } iterSchema = requiredSchema; @@ -575,7 +573,6 @@ private static Object convert(Object value, DataType type) { } return value; } - } private static class StructLikeInternalRow implements StructLike { From 123830e3c564b4f1a6e90ebd461c8726f6f3ffba Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 4 Dec 2019 17:14:21 -0800 Subject: [PATCH 5/7] Fix Spark Parquet read path. 
--- .../iceberg/avro/BuildAvroProjection.java | 4 +- .../spark/data/SparkParquetReaders.java | 10 +- .../apache/iceberg/spark/source/Reader.java | 109 +++++------------- 3 files changed, 33 insertions(+), 90 deletions(-) diff --git a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java index 5861d3df9e36..0687ecf97a89 100644 --- a/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java +++ b/core/src/main/java/org/apache/iceberg/avro/BuildAvroProjection.java @@ -81,7 +81,7 @@ public Schema record(Schema record, List names, Iterable s List expectedFields = struct.fields(); for (int i = 0; i < expectedFields.size(); i += 1) { Types.NestedField field = expectedFields.get(i); - String sanitizedFieldName = AvroSchemaUtil.sanitize(field.name()); + String sanitizedFieldName = AvroSchemaUtil.makeCompatibleName(field.name()); // detect reordering if (i < fields.size() && !sanitizedFieldName.equals(fields.get(i).name())) { @@ -124,7 +124,7 @@ public Schema.Field field(Schema.Field field, Supplier fieldResult) { return null; } - String expectedName = AvroSchemaUtil.sanitize(expectedField.name()); + String expectedName = AvroSchemaUtil.makeCompatibleName(expectedField.name()); this.current = expectedField.type(); try { diff --git a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java index 15b0cdc8247a..ea8f725f3afc 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java +++ b/spark/src/main/java/org/apache/iceberg/spark/data/SparkParquetReaders.java @@ -67,6 +67,11 @@ public class SparkParquetReaders { private SparkParquetReaders() { } + public static ParquetValueReader buildReader(Schema expectedSchema, + MessageType fileSchema) { + return SparkParquetReaders.buildReader(expectedSchema, fileSchema, Collections.emptyMap()); + } + @SuppressWarnings("unchecked") public static ParquetValueReader buildReader(Schema expectedSchema, MessageType fileSchema, @@ -82,11 +87,6 @@ public static ParquetValueReader buildReader(Schema expectedSchema, } } - public static ParquetValueReader buildReader(Schema expectedSchema, - MessageType fileSchema) { - return SparkParquetReaders.buildReader(expectedSchema, fileSchema, Collections.emptyMap()); - } - private static class FallbackReadBuilder extends ReadBuilder { FallbackReadBuilder(MessageType type, Map partitionValues) { super(type, partitionValues); diff --git a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java index 784c52b657c3..bb71d8069592 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java +++ b/spark/src/main/java/org/apache/iceberg/spark/source/Reader.java @@ -29,11 +29,9 @@ import java.io.IOException; import java.io.Serializable; import java.nio.ByteBuffer; -import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.iceberg.CombinedScanTask; import org.apache.iceberg.DataFile; import org.apache.iceberg.DataTask; @@ -82,7 +80,6 @@ import org.apache.spark.sql.types.Decimal; import org.apache.spark.sql.types.DecimalType; import org.apache.spark.sql.types.StringType; -import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; import org.slf4j.Logger; @@ -394,48 +391,22 
@@ private Iterator open(FileScanTask task) { // schema or rows returned by readers Schema finalSchema = expectedSchema; PartitionSpec spec = task.spec(); - Set idColumns = spec.identitySourceIds(); // schema needed for the projection and filtering StructType sparkType = SparkSchemaUtil.convert(finalSchema); Schema requiredSchema = SparkSchemaUtil.prune(tableSchema, sparkType, task.residual(), caseSensitive); - boolean hasJoinedPartitionColumns = !idColumns.isEmpty(); boolean hasExtraFilterColumns = requiredSchema.columns().size() != finalSchema.columns().size(); - Schema iterSchema; - Iterator iter; + // build a map of partition values for reconstructing records + Map partitionValues = partitionMap(spec, file.partition()); - if (hasJoinedPartitionColumns) { - Map partitionValueMap = Maps.newHashMap(); - Map partitionSpecFieldIndexMap = Maps.newHashMap(); - Map partitionSpecDataTypeMap = Maps.newHashMap(); - - for (int i = 0; i < spec.fields().size(); i++) { - PartitionField partitionField = spec.fields().get(i); - Integer sourceId = partitionField.sourceId(); - partitionSpecFieldIndexMap.put(sourceId, i); - DataType partitionType = SparkSchemaUtil.convert(spec.partitionType().field(partitionField.name()).type()); - partitionSpecDataTypeMap.put(sourceId, partitionType); - } - - for (Map.Entry entry : partitionSpecFieldIndexMap.entrySet()) { - Object partitionValue = convert(file.partition().get(entry.getValue(), Object.class), - partitionSpecDataTypeMap.get(entry.getKey())); - partitionValueMap.put(entry.getKey(), partitionValue); - } - - iterSchema = requiredSchema; - iter = open(task, finalSchema, partitionValueMap); - } else if (hasExtraFilterColumns) { - iterSchema = requiredSchema; - iter = open(task, requiredSchema, Collections.emptyMap()); + if (hasExtraFilterColumns) { + return Iterators.transform( + open(task, requiredSchema, partitionValues), + APPLY_PROJECTION.bind(projection(finalSchema, requiredSchema))::invoke); } else { - iterSchema = finalSchema; - iter = open(task, finalSchema, Collections.emptyMap()); + return open(task, finalSchema, partitionValues); } - - return Iterators.transform(iter, - APPLY_PROJECTION.bind(projection(finalSchema, iterSchema))::invoke); } private Iterator open(FileScanTask task, Schema readSchema, Map partitionValues) { @@ -537,23 +508,6 @@ private CloseableIterable newDataIterable(DataTask task, Schema rea asSparkRows, APPLY_PROJECTION.bind(projection(readSchema, tableSchema))::invoke); } - /** - * Returns the Spark data type for a partition. - * - * If the field is not found in the partition schema we try to retrieve it from the full table schema. - * - * @param partitionSchema the Iceberg schema for partitions - * @param fieldId the id of the field - * @return the Spark data type of the field - */ - private DataType getPartitionType(Schema partitionSchema, Integer fieldId) { - if (partitionSchema.findField(fieldId) != null) { - return SparkSchemaUtil.convert(partitionSchema.findField(fieldId).type()); - } else { - return SparkSchemaUtil.convert(tableSchema.findField(fieldId).type()); - } - } - /** * Converts the objects into instances used by Spark's InternalRow. 
* @@ -573,39 +527,28 @@ private static Object convert(Object value, DataType type) { } return value; } - } - private static class StructLikeInternalRow implements StructLike { - private final DataType[] types; - private InternalRow row = null; - - StructLikeInternalRow(StructType struct) { - this.types = new DataType[struct.size()]; - StructField[] fields = struct.fields(); - for (int i = 0; i < fields.length; i += 1) { - types[i] = fields[i].dataType(); + /** + * Creates a map from field ID to Spark value for a partition tuple. + * + * @param spec a partition spec + * @param partition a partition tuple + * @return a map from field ID to Spark value + */ + private static Map partitionMap(PartitionSpec spec, StructLike partition) { + Map partitionValues = Maps.newHashMap(); + + List fields = spec.fields(); + for (int i = 0; i < fields.size(); i += 1) { + PartitionField field = fields.get(i); + if ("identity".equals(field.transform().toString())) { + partitionValues.put(field.sourceId(), convert( + partition.get(i, spec.javaClasses()[i]), + SparkSchemaUtil.convert(spec.partitionType().field(field.name()).type()))); + } } - } - - public StructLikeInternalRow setRow(InternalRow row) { - this.row = row; - return this; - } - @Override - public int size() { - return types.length; - } - - @Override - @SuppressWarnings("unchecked") - public T get(int pos, Class javaClass) { - return javaClass.cast(row.get(pos, types[pos])); - } - - @Override - public void set(int pos, T value) { - throw new UnsupportedOperationException("Not implemented: set"); + return partitionValues; } } } From e43ceae2af61e053004d1f802cb71164eafbb8f8 Mon Sep 17 00:00:00 2001 From: Andrei Ionescu Date: Thu, 5 Dec 2019 15:10:56 +0200 Subject: [PATCH 6/7] Add day partition in nested partition unit test --- .../spark/source/TestParquetWrite.java | 28 +++++++++++++++---- 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java index 9b47aed1a38a..305b4628dc0b 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java @@ -22,6 +22,8 @@ import com.google.common.collect.Lists; import java.io.File; import java.io.IOException; +import java.sql.Timestamp; +import java.time.Instant; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.DataFile; @@ -418,7 +420,8 @@ public void testNestedPartitioning() throws IOException { optional(2, "data", Types.StringType.get()), optional(3, "nestedData", Types.StructType.of( optional(4, "id", Types.IntegerType.get()), - optional(5, "moreData", Types.StringType.get()))) + optional(5, "moreData", Types.StringType.get()))), + optional(6, "timestamp", Types.TimestampType.withZone()) ); File parent = temp.newFolder("parquet"); @@ -427,21 +430,26 @@ public void testNestedPartitioning() throws IOException { HadoopTables tables = new HadoopTables(new Configuration()); PartitionSpec spec = PartitionSpec.builderFor(nestedSchema) .identity("id") + .day("timestamp") .identity("nestedData.moreData") .build(); Table table = tables.create(nestedSchema, spec, location.toString()); List jsons = Lists.newArrayList( - "{ \"id\": 1, \"data\": \"a\", \"nestedData\": { \"id\": 100, \"moreData\": \"p1\"} }", - "{ \"id\": 2, \"data\": \"b\", \"nestedData\": { \"id\": 200, \"moreData\": \"p1\"} }", - "{ \"id\": 3, \"data\": 
\"c\", \"nestedData\": { \"id\": 300, \"moreData\": \"p2\"} }", - "{ \"id\": 4, \"data\": \"d\", \"nestedData\": { \"id\": 400, \"moreData\": \"p2\"} }" + "{ \"id\": 1, \"data\": \"a\", \"nestedData\": { \"id\": 100, \"moreData\": \"p1\"}, " + + "\"timestamp\": \"2017-12-01T10:12:55.034Z\" }", + "{ \"id\": 2, \"data\": \"b\", \"nestedData\": { \"id\": 200, \"moreData\": \"p1\"}, " + + "\"timestamp\": \"2017-12-02T10:12:55.034Z\" }", + "{ \"id\": 3, \"data\": \"c\", \"nestedData\": { \"id\": 300, \"moreData\": \"p2\"}, " + + "\"timestamp\": \"2017-12-03T10:12:55.034Z\" }", + "{ \"id\": 4, \"data\": \"d\", \"nestedData\": { \"id\": 400, \"moreData\": \"p2\"}, " + + "\"timestamp\": \"2017-12-04T10:12:55.034Z\" }" ); Dataset df = spark.read().schema(SparkSchemaUtil.convert(nestedSchema)) .json(spark.createDataset(jsons, Encoders.STRING())); // TODO: incoming columns must be ordered according to the table's schema - df.select("id", "data", "nestedData").write() + df.select("id", "data", "nestedData", "timestamp").write() .format("iceberg") .mode("append") .save(location.toString()); @@ -458,17 +466,25 @@ public void testNestedPartitioning() throws IOException { Assert.assertEquals("Row 1 col 2 is a", "a", actual.get(0).getString(1)); Assert.assertEquals("Row 1 col 3,1 is 100", 100, actual.get(0).getStruct(2).getInt(0)); Assert.assertEquals("Row 1 col 3,2 is p1", "p1", actual.get(0).getStruct(2).getString(1)); + Assert.assertEquals("Row 1 col 4 is 2017-12-01T10:12:55.034+00:00", + 0, actual.get(0).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-01T10:12:55.034Z")))); Assert.assertEquals("Row 2 col 1 is 2", 2, actual.get(1).getInt(0)); Assert.assertEquals("Row 2 col 2 is b", "b", actual.get(1).getString(1)); Assert.assertEquals("Row 2 col 3,1 is 200", 200, actual.get(1).getStruct(2).getInt(0)); Assert.assertEquals("Row 2 col 3,2 is p1", "p1", actual.get(1).getStruct(2).getString(1)); + Assert.assertEquals("Row 2 col 4 is 2017-12-02 12:12:55.034", + 0, actual.get(1).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-02T10:12:55.034Z")))); Assert.assertEquals("Row 3 col 1 is 3", 3, actual.get(2).getInt(0)); Assert.assertEquals("Row 3 col 2 is c", "c", actual.get(2).getString(1)); Assert.assertEquals("Row 3 col 3,1 is 300", 300, actual.get(2).getStruct(2).getInt(0)); Assert.assertEquals("Row 3 col 3,2 is p2", "p2", actual.get(2).getStruct(2).getString(1)); + Assert.assertEquals("Row 3 col 4 is 2017-12-03 12:12:55.034", + 0, actual.get(2).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-03T10:12:55.034Z")))); Assert.assertEquals("Row 4 col 1 is 4", 4, actual.get(3).getInt(0)); Assert.assertEquals("Row 4 col 2 is d", "d", actual.get(3).getString(1)); Assert.assertEquals("Row 4 col 3,1 is 400", 400, actual.get(3).getStruct(2).getInt(0)); Assert.assertEquals("Row 4 col 3,2 is p2", "p2", actual.get(3).getStruct(2).getString(1)); + Assert.assertEquals("Row 4 col 4 is 2017-12-04 12:12:55.034", + 0, actual.get(3).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-04T10:12:55.034Z")))); } } From 1b3e39f3ad5acfb15e54e7e857886836803fab09 Mon Sep 17 00:00:00 2001 From: Andrei Ionescu Date: Fri, 6 Dec 2019 13:23:38 +0200 Subject: [PATCH 7/7] Add unit test for Avro with nested partitions --- .../iceberg/spark/source/TestAvroWrite.java | 143 ++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 spark/src/test/java/org/apache/iceberg/spark/source/TestAvroWrite.java diff --git 
a/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroWrite.java new file mode 100644 index 000000000000..1a79a8e43d26 --- /dev/null +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestAvroWrite.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg.spark.source; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import java.io.File; +import java.io.IOException; +import java.sql.Timestamp; +import java.time.Instant; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.hadoop.HadoopTables; +import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import static org.apache.iceberg.types.Types.NestedField.optional; + +public class TestAvroWrite { + + @Rule + public TemporaryFolder temp = new TemporaryFolder(); + + private static SparkSession spark = null; + + @BeforeClass + public static void startSpark() { + TestAvroWrite.spark = SparkSession.builder().master("local[2]").getOrCreate(); + } + + @AfterClass + public static void stopSpark() { + SparkSession currentSpark = TestAvroWrite.spark; + TestAvroWrite.spark = null; + currentSpark.stop(); + } + + @Test + public void testNestedPartitioning() throws IOException { + Schema nestedSchema = new Schema( + optional(1, "id", Types.IntegerType.get()), + optional(2, "data", Types.StringType.get()), + optional(3, "nestedData", Types.StructType.of( + optional(4, "id", Types.IntegerType.get()), + optional(5, "moreData", Types.StringType.get()))), + optional(6, "timestamp", Types.TimestampType.withZone()) + ); + + File parent = temp.newFolder("parquet"); + File location = new File(parent, "test/iceberg"); + + HadoopTables tables = new HadoopTables(new Configuration()); + PartitionSpec spec = PartitionSpec.builderFor(nestedSchema) + .identity("id") + .day("timestamp") + .identity("nestedData.moreData") + .build(); + Table table = tables.create(nestedSchema, spec, ImmutableMap.of("write.format.default", "avro"), + location.toString()); + + List jsons = Lists.newArrayList( + "{ \"id\": 1, \"data\": \"a\", \"nestedData\": { \"id\": 100, \"moreData\": \"p1\"}, " + + "\"timestamp\": \"2017-12-01T10:12:55.034Z\" }", + 
"{ \"id\": 2, \"data\": \"b\", \"nestedData\": { \"id\": 200, \"moreData\": \"p1\"}, " + + "\"timestamp\": \"2017-12-02T10:12:55.034Z\" }", + "{ \"id\": 3, \"data\": \"c\", \"nestedData\": { \"id\": 300, \"moreData\": \"p2\"}, " + + "\"timestamp\": \"2017-12-03T10:12:55.034Z\" }", + "{ \"id\": 4, \"data\": \"d\", \"nestedData\": { \"id\": 400, \"moreData\": \"p2\"}, " + + "\"timestamp\": \"2017-12-04T10:12:55.034Z\" }" + ); + Dataset df = spark.read().schema(SparkSchemaUtil.convert(nestedSchema)) + .json(spark.createDataset(jsons, Encoders.STRING())); + + // TODO: incoming columns must be ordered according to the table's schema + df.select("id", "data", "nestedData", "timestamp").write() + .format("iceberg") + .mode("append") + .save(location.toString()); + + table.refresh(); + + Dataset result = spark.read() + .format("iceberg") + .load(location.toString()); + + List actual = result.orderBy("id").collectAsList(); + Assert.assertEquals("Number of rows should match", jsons.size(), actual.size()); + Assert.assertEquals("Row 1 col 1 is 1", 1, actual.get(0).getInt(0)); + Assert.assertEquals("Row 1 col 2 is a", "a", actual.get(0).getString(1)); + Assert.assertEquals("Row 1 col 3,1 is 100", 100, actual.get(0).getStruct(2).getInt(0)); + Assert.assertEquals("Row 1 col 3,2 is p1", "p1", actual.get(0).getStruct(2).getString(1)); + Assert.assertEquals("Row 1 col 4 is 2017-12-01T10:12:55.034+00:00", + 0, actual.get(0).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-01T10:12:55.034Z")))); + Assert.assertEquals("Row 2 col 1 is 2", 2, actual.get(1).getInt(0)); + Assert.assertEquals("Row 2 col 2 is b", "b", actual.get(1).getString(1)); + Assert.assertEquals("Row 2 col 3,1 is 200", 200, actual.get(1).getStruct(2).getInt(0)); + Assert.assertEquals("Row 2 col 3,2 is p1", "p1", actual.get(1).getStruct(2).getString(1)); + Assert.assertEquals("Row 2 col 4 is 2017-12-02 12:12:55.034", + 0, actual.get(1).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-02T10:12:55.034Z")))); + Assert.assertEquals("Row 3 col 1 is 3", 3, actual.get(2).getInt(0)); + Assert.assertEquals("Row 3 col 2 is c", "c", actual.get(2).getString(1)); + Assert.assertEquals("Row 3 col 3,1 is 300", 300, actual.get(2).getStruct(2).getInt(0)); + Assert.assertEquals("Row 3 col 3,2 is p2", "p2", actual.get(2).getStruct(2).getString(1)); + Assert.assertEquals("Row 3 col 4 is 2017-12-03 12:12:55.034", + 0, actual.get(2).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-03T10:12:55.034Z")))); + Assert.assertEquals("Row 4 col 1 is 4", 4, actual.get(3).getInt(0)); + Assert.assertEquals("Row 4 col 2 is d", "d", actual.get(3).getString(1)); + Assert.assertEquals("Row 4 col 3,1 is 400", 400, actual.get(3).getStruct(2).getInt(0)); + Assert.assertEquals("Row 4 col 3,2 is p2", "p2", actual.get(3).getStruct(2).getString(1)); + Assert.assertEquals("Row 4 col 4 is 2017-12-04 12:12:55.034", + 0, actual.get(3).getTimestamp(3).compareTo(Timestamp.from(Instant.parse("2017-12-04T10:12:55.034Z")))); + } +}