Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion core/src/main/java/org/apache/iceberg/avro/Avro.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mapping.NameMapping;

import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_DEFAULT;
Expand Down Expand Up @@ -170,6 +171,7 @@ public static class ReadBuilder {
private final ClassLoader defaultLoader = Thread.currentThread().getContextClassLoader();
private final InputFile file;
private final Map<String, String> renames = Maps.newLinkedHashMap();
private NameMapping nameMapping;
Comment thread
rdsr marked this conversation as resolved.
private boolean reuseContainers = false;
private org.apache.iceberg.Schema schema = null;
private Function<Schema, DatumReader<?>> createReaderFunc = readSchema -> {
Expand Down Expand Up @@ -223,10 +225,15 @@ public ReadBuilder rename(String fullName, String newName) {
return this;
}

/**
 * Sets a {@link NameMapping} used as a fallback to resolve field ids for files
 * whose Avro schema carries no field-id properties (see ProjectionDatumReader).
 *
 * @param newNameMapping the name-to-id mapping to apply when reading; may be null
 * @return this builder for method chaining
 */
public ReadBuilder nameMapping(NameMapping newNameMapping) {
this.nameMapping = newNameMapping;
return this;
}

/**
 * Builds an {@link AvroIterable} that reads records projected to the requested schema.
 *
 * @param <D> the record type produced by the configured reader function
 * @return an iterable over the projected records
 * @throws NullPointerException if no schema was set on this builder
 */
public <D> AvroIterable<D> build() {
  Preconditions.checkNotNull(schema, "Schema is required");
  return new AvroIterable<>(file,
      // nameMapping supplies field ids when the file schema has no id properties
      new ProjectionDatumReader<>(createReaderFunc, schema, renames, nameMapping),
      start, length, reuseContainers);
}
}
Expand Down
82 changes: 74 additions & 8 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
Expand Down Expand Up @@ -83,8 +86,8 @@ public static Map<Type, Schema> convertTypes(Types.StructType type, String name)
return ImmutableMap.copyOf(converter.getConversionMap());
}

public static Schema pruneColumns(Schema schema, Set<Integer> selectedIds) {
return new PruneColumns(selectedIds).rootSchema(schema);
public static Schema pruneColumns(Schema schema, Set<Integer> selectedIds, NameMapping nameMapping) {
return new PruneColumns(selectedIds, nameMapping).rootSchema(schema);
}

public static Schema buildAvroProjection(Schema schema, org.apache.iceberg.Schema expected,
Expand Down Expand Up @@ -196,15 +199,35 @@ static Schema createProjectionMap(String recordName,
return LogicalMap.get().addToSchema(Schema.createArray(keyValueRecord));
}

private static int getId(Schema schema, String propertyName) {
private static Integer getId(Schema schema, String propertyName) {
Integer id = getId(schema, propertyName, null, null);
Preconditions.checkNotNull(id, "Missing expected '%s' property", propertyName);
return id;
}

/**
 * Resolves a field id: first from the schema's {@code propertyName} property and,
 * when that is absent, from the supplied {@link NameMapping} using the accumulated
 * field-name path.
 *
 * @param nameMapping fallback mapping; may be null, in which case only the property is used
 * @param names full path of field names leading to this schema; only consulted when
 *              {@code nameMapping} is non-null
 * @return the resolved id, or null when neither source provides one
 */
private static Integer getId(Schema schema, String propertyName, NameMapping nameMapping, List<String> names) {
  if (schema.getType() == UNION) {
    // an optional field is an Avro union; look at the non-null branch
    return getId(fromOption(schema), propertyName, nameMapping, names);
  }

  Object id = schema.getObjectProp(propertyName);
  if (id != null) {
    return toInt(id);
  } else if (nameMapping != null) {
    MappedField mappedField = nameMapping.find(names);
    if (mappedField != null) {
      return mappedField.id();
    }
  }

  return null;
}

/** Returns whether the schema (or, for an option, its non-null branch) has the property. */
static boolean hasProperty(Schema schema, String propertyName) {
  // Avro forbids unions directly inside unions, so a single unwrap is sufficient
  Schema unwrapped = schema.getType() == UNION ? fromOption(schema) : schema;
  return unwrapped.getObjectProp(propertyName) != null;
}

public static int getKeyId(Schema schema) {
Expand All @@ -213,23 +236,66 @@ public static int getKeyId(Schema schema) {
return getId(schema, KEY_ID_PROP);
}

/**
 * Resolves the key field id of a map schema, falling back to the name mapping
 * at path {@code <parents>.key}.
 *
 * @return the key id, or null when neither the schema nor the mapping provides one
 */
static Integer getKeyId(Schema schema, NameMapping nameMapping, Iterable<String> parentFieldNames) {
  Preconditions.checkArgument(schema.getType() == MAP,
      "Cannot get map key id for non-map schema: %s", schema);
  // extend the parent path with the synthetic "key" name used by the mapping
  List<String> keyPath = Lists.newArrayList(parentFieldNames);
  keyPath.add("key");
  return getId(schema, KEY_ID_PROP, nameMapping, keyPath);
}

/**
 * Returns the value field id of a map schema from its id property.
 *
 * @throws IllegalArgumentException if the schema is not a map
 */
public static int getValueId(Schema schema) {
  Preconditions.checkArgument(schema.getType() == MAP,
      "Cannot get map value id for non-map schema: %s", schema);
  int valueId = getId(schema, VALUE_ID_PROP);
  return valueId;
}

/**
 * Resolves the value field id of a map schema, falling back to the name mapping
 * at path {@code <parents>.value}.
 *
 * @return the value id, or null when neither the schema nor the mapping provides one
 */
static Integer getValueId(Schema schema, NameMapping nameMapping, Iterable<String> parentFieldNames) {
  Preconditions.checkArgument(schema.getType() == MAP,
      "Cannot get map value id for non-map schema: %s", schema);
  // extend the parent path with the synthetic "value" name used by the mapping
  List<String> valuePath = Lists.newArrayList(parentFieldNames);
  valuePath.add("value");
  return getId(schema, VALUE_ID_PROP, nameMapping, valuePath);
}

/**
 * Returns the element field id of an array schema from its id property.
 *
 * @throws IllegalArgumentException if the schema is not an array
 */
public static int getElementId(Schema schema) {
  Preconditions.checkArgument(schema.getType() == ARRAY,
      "Cannot get array element id for non-array schema: %s", schema);
  int elementId = getId(schema, ELEMENT_ID_PROP);
  return elementId;
}

/**
 * Resolves the element field id of an array schema, falling back to the name
 * mapping at path {@code <parents>.element}.
 *
 * @return the element id, or null when neither the schema nor the mapping provides one
 */
static Integer getElementId(Schema schema, NameMapping nameMapping, Iterable<String> parentFieldNames) {
  Preconditions.checkArgument(schema.getType() == ARRAY,
      "Cannot get array element id for non-array schema: %s", schema);
  // extend the parent path with the synthetic "element" name used by the mapping
  List<String> elementPath = Lists.newArrayList(parentFieldNames);
  elementPath.add("element");
  return getId(schema, ELEMENT_ID_PROP, nameMapping, elementPath);
}

/**
 * Returns the field's id from its id property, failing when it is absent.
 *
 * @throws NullPointerException if the field carries no id property
 */
public static int getFieldId(Schema.Field field) {
  // no mapping/path: only the explicit property can supply the id
  Integer id = getFieldId(field, null, null);
  Preconditions.checkNotNull(id, "Missing expected '%s' property", FIELD_ID_PROP);
  return id;
}

/**
 * Resolves a field's id: prefers the field's explicit id property, then falls back
 * to the name mapping using the parent path plus this field's name.
 *
 * @param nameMapping fallback mapping; may be null, in which case only the property is used
 * @param parentFieldNames names of the enclosing fields. NOTE(review): only read when
 *     nameMapping is non-null, so the getFieldId(field) overload passes null for both —
 *     confirm no caller passes a non-null mapping with a null path.
 * @return the resolved id, or null when neither source provides one
 */
static Integer getFieldId(Schema.Field field, NameMapping nameMapping, Iterable<String> parentFieldNames) {
Object id = field.getObjectProp(FIELD_ID_PROP);
if (id != null) {
return toInt(id);
} else if (nameMapping != null) {
// build the full path to this field for the mapping lookup
List<String> names = Lists.newArrayList(parentFieldNames);
names.add(field.name());
MappedField mappedField = nameMapping.find(names);
if (mappedField != null) {
return mappedField.id();
}
}

return null;
}

return toInt(id);
public static boolean hasFieldId(Schema.Field field) {
return field.getObjectProp(FIELD_ID_PROP) != null;
}

private static int toInt(Object value) {
Expand Down
25 changes: 22 additions & 3 deletions core/src/main/java/org/apache/iceberg/avro/AvroSchemaVisitor.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ public static <T> T visit(Schema schema, AvroSchemaVisitor<T> visitor) {
List<T> results = Lists.newArrayListWithExpectedSize(fields.size());
for (Schema.Field field : schema.getFields()) {
names.add(field.name());
results.add(visit(field.schema(), visitor));
T result = visitWithName(field.name(), field.schema(), visitor);
results.add(result);
}

visitor.recordLevels.pop();
Expand All @@ -57,17 +58,35 @@ public static <T> T visit(Schema schema, AvroSchemaVisitor<T> visitor) {
return visitor.union(schema, options);

case ARRAY:
return visitor.array(schema, visit(schema.getElementType(), visitor));
if (schema.getLogicalType() instanceof LogicalMap || AvroSchemaUtil.isKeyValueSchema(schema.getElementType())) {
return visitor.array(schema, visit(schema.getElementType(), visitor));
} else {
return visitor.array(schema, visitWithName("element", schema.getElementType(), visitor));
}

case MAP:
return visitor.map(schema, visit(schema.getValueType(), visitor));
return visitor.map(schema, visitWithName("value", schema.getValueType(), visitor));

default:
return visitor.primitive(schema);
}
}

// Stack of record names currently being visited.
// NOTE(review): popped in visit()'s RECORD case; the matching push is outside this view — confirm.
private Deque<String> recordLevels = Lists.newLinkedList();
// Stack of field names from the root to the schema currently being visited;
// maintained by visitWithName() below.
private Deque<String> fieldNames = Lists.newLinkedList();

/** Exposes the current field-name path so subclasses can consult it during a visit. */
protected Deque<String> fieldNames() {
return fieldNames;
}

/**
 * Visits {@code schema} with {@code name} pushed onto the visitor's field-name stack,
 * guaranteeing the name is popped again even when the visit throws.
 */
private static <T> T visitWithName(String name, Schema schema, AvroSchemaVisitor<T> visitor) {
  // push before the try block so the finally only pops what this call pushed;
  // the original pushed inside the try, which would mis-pop if addLast ever threw
  visitor.fieldNames.addLast(name);
  try {
    return visit(schema, visitor);
  } finally {
    visitor.fieldNames.removeLast();
  }
}

public T record(Schema record, List<String> names, List<T> fields) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,10 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s

} else {
Preconditions.checkArgument(field.isOptional(), "Missing required field: %s", field.name());
// create a field that will be defaulted to null
// Create a field that will be defaulted to null. We assign a unique suffix to the field
// to make sure that even if records in the file have the field it is not projected.
Schema.Field newField = new Schema.Field(
field.name(),
field.name() + "_r" + field.fieldId(),
AvroSchemaUtil.toOption(AvroSchemaUtil.convert(field.type())), null, JsonProperties.NULL_VALUE);
newField.addProp(AvroSchemaUtil.FIELD_ID_PROP, field.fieldId());
updatedFields.add(newField);
Expand All @@ -115,7 +116,7 @@ public Schema record(Schema record, List<String> names, Iterable<Schema.Field> s
public Schema.Field field(Schema.Field field, Supplier<Schema> fieldResult) {
Types.StructType struct = current.asNestedType().asStructType();
int fieldId = AvroSchemaUtil.getFieldId(field);
Types.NestedField expectedField = struct.field(fieldId); // TODO: what if there are no ids?
Types.NestedField expectedField = struct.field(fieldId);
Comment thread
rdsr marked this conversation as resolved.

// if the field isn't present, it was not selected
if (expectedField == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,33 @@
import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.types.TypeUtil;

public class ProjectionDatumReader<D> implements DatumReader<D> {
// factory that creates the delegate reader for the resolved read schema
private final Function<Schema, DatumReader<?>> getReader;
// projection schema requested by the caller
private final org.apache.iceberg.Schema expectedSchema;
// full-name renames applied when building the Avro projection
private final Map<String, String> renames;
// fallback used to assign field ids when the file schema carries none; may be null
private final NameMapping nameMapping;
// computed in setSchema(): pruned and projected schema actually used for reading
private Schema readSchema = null;
// the file's schema as passed to setSchema()
private Schema fileSchema = null;
// delegate reader built for readSchema in setSchema()
private DatumReader<D> wrapped = null;

/**
 * Creates a projecting datum reader.
 *
 * @param getReader factory that creates the delegate reader for the resolved read schema
 * @param expectedSchema projection schema requested by the caller
 * @param renames map of full-name renames to apply during projection
 * @param nameMapping fallback mapping for files without field ids; may be null
 */
public ProjectionDatumReader(Function<Schema, DatumReader<?>> getReader,
                             org.apache.iceberg.Schema expectedSchema,
                             Map<String, String> renames,
                             NameMapping nameMapping) {
  this.getReader = getReader;
  this.expectedSchema = expectedSchema;
  this.renames = renames;
  this.nameMapping = nameMapping;
}

@Override
public void setSchema(Schema newFileSchema) {
this.fileSchema = newFileSchema;
Set<Integer> projectedIds = TypeUtil.getProjectedIds(expectedSchema);
Schema prunedSchema = AvroSchemaUtil.pruneColumns(newFileSchema, projectedIds);
Schema prunedSchema = AvroSchemaUtil.pruneColumns(newFileSchema, projectedIds, nameMapping);
Comment thread
rdsr marked this conversation as resolved.
this.readSchema = AvroSchemaUtil.buildAvroProjection(prunedSchema, expectedSchema, renames);
this.wrapped = newDatumReader();
}
Expand Down
Loading