Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 42 additions & 9 deletions api/src/main/java/org/apache/iceberg/types/Types.java
Original file line number Diff line number Diff line change
Expand Up @@ -415,43 +415,64 @@ public int hashCode() {

public static class NestedField implements Serializable {
/**
 * Creates an optional field with no doc and no default values.
 */
public static NestedField optional(int id, String name, Type type) {
  return new NestedField(true, id, name, type, null, null, null);
}

/**
 * Creates an optional field with documentation but no default values.
 */
public static NestedField optional(int id, String name, Type type, String doc) {
  return new NestedField(true, id, name, type, doc, null, null);
}

/**
 * Creates an optional field with documentation and default values.
 *
 * @param initialDefault value used when reading rows written before this field existed
 * @param writeDefault value used when writing rows that do not supply this field
 */
public static NestedField optional(int id, String name, Type type, String doc,
Object initialDefault, Object writeDefault) {
return new NestedField(true, id, name, type, doc, initialDefault, writeDefault);
}

/**
 * Creates a required field with no doc and no default values.
 */
public static NestedField required(int id, String name, Type type) {
  return new NestedField(false, id, name, type, null, null, null);
}

/**
 * Creates a required field with documentation but no default values.
 */
public static NestedField required(int id, String name, Type type, String doc) {
  return new NestedField(false, id, name, type, doc, null, null);
}

/**
 * Creates a required field with documentation and default values.
 *
 * @param initialDefault value used when reading rows written before this field existed
 * @param writeDefault value used when writing rows that do not supply this field
 */
public static NestedField required(int id, String name, Type type, String doc,
Object initialDefault, Object writeDefault) {
return new NestedField(false, id, name, type, doc, initialDefault, writeDefault);
}

/**
 * Creates a field with explicit optionality, no doc, and no default values.
 */
public static NestedField of(int id, boolean isOptional, String name, Type type) {
  return new NestedField(isOptional, id, name, type, null, null, null);
}

/**
 * Creates a field with explicit optionality and documentation, but no default values.
 */
public static NestedField of(int id, boolean isOptional, String name, Type type, String doc) {
  return new NestedField(isOptional, id, name, type, doc, null, null);
}

/**
 * Creates a field with explicit optionality, documentation, and default values.
 *
 * @param initialDefault value used when reading rows written before this field existed
 * @param writeDefault value used when writing rows that do not supply this field
 */
public static NestedField of(int id, boolean isOptional, String name, Type type, String doc,
Object initialDefault, Object writeDefault) {
return new NestedField(isOptional, id, name, type, doc, initialDefault, writeDefault);
}

private final boolean isOptional;
private final int id;
private final String name;
private final Type type;
private final String doc;
private final Object initialDefault;
private final Object writeDefault;


/**
 * Canonical constructor; all factory methods delegate here.
 *
 * @param doc optional human-readable documentation, may be null
 * @param initialDefault default returned when reading rows written before this field
 *                       existed, may be null
 * @param writeDefault default applied when writing rows that omit this field, may be null
 * @throws NullPointerException if name or type is null
 */
private NestedField(boolean isOptional, int id, String name, Type type, String doc,
    Object initialDefault, Object writeDefault) {
  Preconditions.checkNotNull(name, "Name cannot be null");
  Preconditions.checkNotNull(type, "Type cannot be null");
  this.isOptional = isOptional;
  this.id = id;
  this.name = name;
  this.type = type;
  this.doc = doc;
  this.initialDefault = initialDefault;
  this.writeDefault = writeDefault;
}

public boolean isOptional() {
Expand All @@ -462,7 +483,7 @@ public NestedField asOptional() {
if (isOptional) {
return this;
}
return new NestedField(true, id, name, type, doc);
return new NestedField(true, id, name, type, doc, initialDefault, writeDefault);
}

public boolean isRequired() {
Expand All @@ -473,7 +494,11 @@ public NestedField asRequired() {
if (!isOptional) {
return this;
}
return new NestedField(false, id, name, type, doc);
return new NestedField(false, id, name, type, doc, initialDefault, writeDefault);
}

/**
 * Returns a copy of this field with the write default replaced; all other
 * attributes (including the initial default) are preserved.
 */
public NestedField updateWriteDefault(Object newWriteDefault) {
return new NestedField(isOptional, id, name, type, doc, initialDefault, newWriteDefault);
}

public int fieldId() {
Expand All @@ -492,6 +517,14 @@ public String doc() {
return doc;
}

/**
 * Returns the default used when reading rows written before this field existed, or null.
 */
public Object initialDefaultValue() {
return initialDefault;
}

public Object writeDefaultValue() {
return writeDefault;
}

@Override
public String toString() {
return String.format("%d: %s: %s %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ private AvroSchemaUtil() {
public static final String ELEMENT_ID_PROP = "element-id";
public static final String ADJUST_TO_UTC_PROP = "adjust-to-utc";

public static final String SHOULD_USE_INIT_DEFAULT = "_default_exist";

private static final Schema NULL = Schema.create(Schema.Type.NULL);
private static final Schema.Type MAP = Schema.Type.MAP;
private static final Schema.Type ARRAY = Schema.Type.ARRAY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static <T> T visit(org.apache.iceberg.Schema iSchema, Schema schema, Avro
public static <T> T visit(Type iType, Schema schema, AvroSchemaWithTypeVisitor<T> visitor) {
switch (schema.getType()) {
case RECORD:
return visitRecord(iType != null ? iType.asStructType() : null, schema, visitor);
return visitor.visitRecord(iType != null ? iType.asStructType() : null, schema);

case UNION:
return visitUnion(iType, schema, visitor);
Expand All @@ -53,13 +53,13 @@ public static <T> T visit(Type iType, Schema schema, AvroSchemaWithTypeVisitor<T
}
}

private static <T> T visitRecord(Types.StructType struct, Schema record, AvroSchemaWithTypeVisitor<T> visitor) {
protected T visitRecord(Types.StructType struct, Schema record) {
// check to make sure this hasn't been visited before
String name = record.getFullName();
Preconditions.checkState(!visitor.recordLevels.contains(name),
Preconditions.checkState(!recordLevels.contains(name),
"Cannot process recursive Avro record %s", name);

visitor.recordLevels.push(name);
recordLevels.push(name);

List<Schema.Field> fields = record.getFields();
List<String> names = Lists.newArrayListWithExpectedSize(fields.size());
Expand All @@ -68,12 +68,12 @@ private static <T> T visitRecord(Types.StructType struct, Schema record, AvroSch
int fieldId = AvroSchemaUtil.getFieldId(field);
Types.NestedField iField = struct != null ? struct.field(fieldId) : null;
names.add(field.name());
results.add(visit(iField != null ? iField.type() : null, field.schema(), visitor));
results.add(visit(iField != null ? iField.type() : null, field.schema(), this));
}

visitor.recordLevels.pop();
recordLevels.pop();

return visitor.record(struct, record, names, results);
return record(struct, record, names, results);
}

private static <T> T visitUnion(Type type, Schema union, AvroSchemaWithTypeVisitor<T> visitor) {
Expand Down Expand Up @@ -107,7 +107,7 @@ private static <T> T visitArray(Type type, Schema array, AvroSchemaWithTypeVisit
}
}

private Deque<String> recordLevels = Lists.newLinkedList();
protected Deque<String> recordLevels = Lists.newLinkedList();

public T record(Types.StructType iStruct, Schema record, List<String> names, List<T> fields) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,16 +99,25 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s

if (avroField != null) {
updatedFields.add(avroField);

} else {
Preconditions.checkArgument(
field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()),
(field.isRequired() && field.initialDefaultValue() != null) ||
field.isOptional() || MetadataColumns.metadataFieldIds().contains(field.fieldId()),
"Missing required field: %s", field.name());
// Create a field that will be defaulted to null. We assign a unique suffix to the field
// to make sure that even if records in the file have the field it is not projected.
Schema.Field newField = new Schema.Field(
field.name() + "_r" + field.fieldId(),
AvroSchemaUtil.toOption(AvroSchemaUtil.convert(field.type())), null, JsonProperties.NULL_VALUE);
// If the field from Iceberg schema has initial default value, we give a special
// mark to this created avro field, so that in the later stage, the reader can identify
// this field and use a constant reader to read the field, rather than returning null
// as delegated from avro file reader
if (field.initialDefaultValue() != null) {
newField.addProp(AvroSchemaUtil.SHOULD_USE_INIT_DEFAULT, true);
} else {
newField.addProp(AvroSchemaUtil.SHOULD_USE_INIT_DEFAULT, false);
}
newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, field.fieldId());
updatedFields.add(newField);
hasChange = true;
Expand Down Expand Up @@ -146,7 +155,6 @@ public Schema.Field field(Schema.Field field, Supplier<Schema> fieldResult) {
// always copy because fields can't be reused
return AvroSchemaUtil.copyField(field, field.schema(), field.name());
}

} finally {
this.current = struct;
}
Expand Down Expand Up @@ -191,11 +199,9 @@ public Schema array(Schema array, Supplier<Schema> element) {
}

return array;

} finally {
this.current = asMapType;
}

} else {
Preconditions.checkArgument(current.isListType(),
"Incompatible projected type: %s", current);
Expand All @@ -210,7 +216,6 @@ public Schema array(Schema array, Supplier<Schema> element) {
}

return array;

} finally {
this.current = list;
}
Expand All @@ -234,7 +239,6 @@ public Schema map(Schema map, Supplier<Schema> value) {
}

return map;

} finally {
this.current = asMapType;
}
Expand All @@ -260,5 +264,4 @@ public Schema primitive(Schema primitive) {
return primitive;
}
}

}
6 changes: 6 additions & 0 deletions orc/src/main/java/org/apache/iceberg/orc/ORCSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,12 @@ private static TypeDescription buildOrcProjection(Integer fieldId, Type type, bo
case STRUCT:
orcType = TypeDescription.createStruct();
for (Types.NestedField nestedField : type.asStructType().fields()) {
// When we have a new evolved field in Iceberg schema which has an initial default value,
// but the underlying orc file lacks that field, we ignore projecting this field to the orc
// file reader schema, but instead populate this field inside Iceberg using a ConstantReader
if (mapping.get(nestedField.fieldId()) == null && nestedField.initialDefaultValue() != null) {
continue;
}
// Using suffix _r to avoid potential underlying issues in ORC reader
// with reused column names between ORC and Iceberg;
// e.g. renaming column c -> d and adding new column d
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public static <T> T visit(
public static <T> T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeVisitor<T> visitor) {
switch (schema.getCategory()) {
case STRUCT:
return visitRecord(iType != null ? iType.asStructType() : null, schema, visitor);
return visitor.visitRecord(iType != null ? iType.asStructType() : null, schema, visitor);

case UNION:
throw new UnsupportedOperationException("Cannot handle " + schema);
Expand All @@ -58,7 +58,7 @@ public static <T> T visit(Type iType, TypeDescription schema, OrcSchemaWithTypeV
}
}

private static <T> T visitRecord(
public T visitRecord(
Types.StructType struct, TypeDescription record, OrcSchemaWithTypeVisitor<T> visitor) {
List<TypeDescription> fields = record.getChildren();
List<String> names = record.getFieldNames();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,16 @@ public void setRowPositionSupplier(Supplier<Long> posSupplier) {
}
}

private static class ReadBuilder extends AvroSchemaWithTypeVisitor<ValueReader<?>> {
private final Map<Integer, ?> idToConstant;
private static class ReadBuilder extends SparkDefaultValueAwareAvroSchemaWithTypeVisitor<ValueReader<?>> {

private ReadBuilder(Map<Integer, ?> idToConstant) {
  // The defaults-aware parent visitor owns (and defensively copies) the constant map.
  super(idToConstant);
}

@Override
public ValueReader<?> record(Types.StructType expected, Schema record, List<String> names,
    List<ValueReader<?>> fields) {
  // Use the parent's map, which may have been augmented with initial-default constants
  // while visiting the schema.
  return SparkValueReaders.struct(fields, expected, getIdToConstant());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.spark.data;

import java.util.List;
import java.util.Map;
import org.apache.avro.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.avro.AvroSchemaWithTypeVisitor;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.spark.source.BaseDataReader;
import org.apache.iceberg.types.Types;

/**
 * An {@link AvroSchemaWithTypeVisitor} that recognizes projected fields flagged with
 * {@link AvroSchemaUtil#SHOULD_USE_INIT_DEFAULT} and registers the corresponding Iceberg
 * initial default values as constants, so the reader can populate such fields from the
 * schema default instead of the (absent) file data.
 */
public abstract class SparkDefaultValueAwareAvroSchemaWithTypeVisitor<T> extends AvroSchemaWithTypeVisitor<T> {

  // Defensive copy: visitRecord() mutates this map while visiting, so the caller's
  // map must not be aliased.
  private final Map<Integer, Object> idToConstant;

  protected SparkDefaultValueAwareAvroSchemaWithTypeVisitor(Map<Integer, ?> idToConstant) {
    this.idToConstant = Maps.newHashMap();
    this.idToConstant.putAll(idToConstant);
  }

  /**
   * Returns the field-id to constant-value map, including any initial defaults
   * collected during the visit.
   */
  protected Map<Integer, Object> getIdToConstant() {
    return idToConstant;
  }

  @Override
  public T visitRecord(Types.StructType struct, Schema record) {
    // check to make sure this hasn't been visited before
    String name = record.getFullName();
    Preconditions.checkState(!recordLevels.contains(name),
        "Cannot process recursive Avro record %s", name);

    recordLevels.push(name);

    List<Schema.Field> fields = record.getFields();
    List<String> names = Lists.newArrayListWithExpectedSize(fields.size());
    List<T> results = Lists.newArrayListWithExpectedSize(fields.size());
    for (Schema.Field field : fields) {
      int fieldId = AvroSchemaUtil.getFieldId(field);
      Types.NestedField iField = struct != null ? struct.field(fieldId) : null;
      // Fields created by the projection with this flag set must be read as constants
      // (the Iceberg initial default) rather than delegated to the Avro file reader.
      // Boolean.TRUE.equals avoids a ClassCastException if the prop is not a Boolean.
      Object shouldUseDefaultFlag = field.getObjectProp(AvroSchemaUtil.SHOULD_USE_INIT_DEFAULT);
      if (iField != null && Boolean.TRUE.equals(shouldUseDefaultFlag)) {
        idToConstant.put(
            fieldId,
            BaseDataReader.convertConstant(iField.type(), iField.initialDefaultValue()));
      }
      names.add(field.name());
      results.add(visit(iField != null ? iField.type() : null, field.schema(), this));
    }

    recordLevels.pop();

    return record(struct, record, names, results);
  }
}
Loading