2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/avro/Avro.java
@@ -235,7 +235,7 @@ public ReadBuilder rename(String fullName, String newName) {
return this;
}

public ReadBuilder nameMapping(NameMapping newNameMapping) {
public ReadBuilder withNameMapping(NameMapping newNameMapping) {
Contributor:

@rdsr, just want to confirm that you're okay with this change? If it is going to cause you problems because you've already deployed it, we can go with the original.

this.nameMapping = newNameMapping;
return this;
}
@@ -315,7 +315,7 @@ private Record writeAndRead(Schema writeSchema,

Iterable<GenericData.Record> records = Avro.read(Files.localInput(file))
.project(readSchema)
.nameMapping(nameMapping)
.withNameMapping(nameMapping)
.build();

return Iterables.getOnlyElement(records);
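For context, a minimal usage sketch of the renamed builder method (not part of this diff). It assumes MappingUtil.create from org.apache.iceberg.mapping to derive a mapping from the table schema; the builder calls mirror the test above.

import java.io.File;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.mapping.MappingUtil;
import org.apache.iceberg.mapping.NameMapping;

class AvroNameMappingExample {
  // Read Avro records, resolving columns through the name mapping when the
  // file was written without Iceberg field IDs.
  static Iterable<GenericData.Record> read(File file, Schema tableSchema, Schema readSchema) {
    NameMapping nameMapping = MappingUtil.create(tableSchema); // assumed helper
    return Avro.read(Files.localInput(file))
        .project(readSchema)
        .withNameMapping(nameMapping)
        .build();
  }
}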
110 changes: 110 additions & 0 deletions parquet/src/main/java/org/apache/iceberg/parquet/ApplyNameMapping.java
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.iceberg.parquet;

import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.iceberg.mapping.MappedField;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.parquet.Preconditions;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;

class ApplyNameMapping extends ParquetTypeVisitor<Type> {
private final NameMapping nameMapping;

ApplyNameMapping(NameMapping nameMapping) {
this.nameMapping = nameMapping;
}

@Override
public Type message(MessageType message, List<Type> fields) {
Types.MessageTypeBuilder builder = Types.buildMessage();
fields.stream().filter(Objects::nonNull).forEach(builder::addField);

return builder.named(message.getName());
}

@Override
public Type struct(GroupType struct, List<Type> types) {
MappedField field = nameMapping.find(currentPath());
List<Type> actualTypes = types.stream().filter(Objects::nonNull).collect(Collectors.toList());
Type structType = struct.withNewFields(actualTypes);

return field == null ? structType : structType.withId(field.id());
}

@Override
public Type list(GroupType list, Type elementType) {
Preconditions.checkArgument(elementType != null,
"List type must have element field");

MappedField field = nameMapping.find(currentPath());
Type listType = Types.list(list.getRepetition())
.element(elementType)
.named(list.getName());

return field == null ? listType : listType.withId(field.id());
}

@Override
public Type map(GroupType map, Type keyType, Type valueType) {
Preconditions.checkArgument(keyType != null && valueType != null,
"Map type must have both key field and value field");

MappedField field = nameMapping.find(currentPath());
Type mapType = Types.map(map.getRepetition())
.key(keyType)
.value(valueType)
.named(map.getName());

return field == null ? mapType : mapType.withId(field.id());
}

@Override
public Type primitive(PrimitiveType primitive) {
MappedField field = nameMapping.find(currentPath());
return field == null ? primitive : primitive.withId(field.id());
}

@Override
public void beforeRepeatedElement(Type element) {
// do not add the repeated element's name: name-mapping paths skip the repeated layer
}
Contributor:

Can you add comments to these methods to explain why they are here?

Collaborator (author):

OK, let me add the comments back.


@Override
public void afterRepeatedElement(Type element) {
// do not remove the repeated element's name, since it was never added
}

@Override
public void beforeRepeatedKeyValue(Type keyValue) {
// do not add the repeated key-value group's name: name-mapping paths skip the repeated layer
}

@Override
public void afterRepeatedKeyValue(Type keyValue) {
// do not remove the repeated key-value group's name, since it was never added
}
}
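For orientation, a minimal sketch (not part of this diff) of how this visitor is driven: the ParquetSchemaUtil.applyNameMapping entry point added later in this PR walks a file schema and returns a copy with IDs attached wherever the mapping matches a field path. NameMappingParser.fromJson is assumed from org.apache.iceberg.mapping; the mapping JSON would typically come from a table property.

import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.parquet.schema.MessageType;

class ApplyNameMappingExample {
  // Attach Iceberg field IDs to a Parquet schema written without them,
  // using a JSON name mapping (e.g. from schema.name-mapping.default).
  static MessageType withIds(MessageType fileSchema, String mappingJson) {
    NameMapping mapping = NameMappingParser.fromJson(mappingJson); // assumed parser
    return ParquetSchemaUtil.applyNameMapping(fileSchema, mapping);
  }
}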
25 changes: 21 additions & 4 deletions parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java
@@ -38,6 +38,7 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
@@ -312,6 +313,7 @@ public static class ReadBuilder {
private boolean callInit = false;
private boolean reuseContainers = false;
private int maxRecordsPerBatch = 10000;
private NameMapping nameMapping = null;

private ReadBuilder(InputFile file) {
this.file = file;
@@ -393,6 +395,11 @@ public ReadBuilder recordsPerBatch(int numRowsPerBatch) {
return this;
}

public ReadBuilder withNameMapping(NameMapping newNameMapping) {
this.nameMapping = newNameMapping;
return this;
}

@SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"})
public <D> CloseableIterable<D> build() {
if (readerFunc != null || batchedReaderFunc != null) {
@@ -419,11 +426,11 @@ public <D> CloseableIterable<D> build() {
ParquetReadOptions options = optionsBuilder.build();

if (batchedReaderFunc != null) {
return new VectorizedParquetReader(file, schema, options, batchedReaderFunc, filter, reuseContainers,
caseSensitive, maxRecordsPerBatch);
return new VectorizedParquetReader(file, schema, options, batchedReaderFunc, nameMapping, filter,
reuseContainers, caseSensitive, maxRecordsPerBatch);
} else {
return new org.apache.iceberg.parquet.ParquetReader<>(
file, schema, options, readerFunc, filter, reuseContainers, caseSensitive);
file, schema, options, readerFunc, nameMapping, filter, reuseContainers, caseSensitive);
}
}

@@ -475,6 +482,10 @@ public <D> CloseableIterable<D> build() {
builder.withFileRange(start, start + length);
}

if (nameMapping != null) {
builder.withNameMapping(nameMapping);
}

return new ParquetIterable<>(builder);
}
}
@@ -483,6 +494,7 @@ private static class ParquetReadBuilder<T> extends ParquetReader.Builder<T> {
private Schema schema = null;
private ReadSupport<T> readSupport = null;
private boolean callInit = false;
private NameMapping nameMapping = null;

private ParquetReadBuilder(org.apache.parquet.io.InputFile file) {
super(file);
@@ -493,6 +505,11 @@ public ParquetReadBuilder<T> project(Schema newSchema) {
return this;
}

public ParquetReadBuilder<T> withNameMapping(NameMapping newNameMapping) {
this.nameMapping = newNameMapping;
return this;
}

public ParquetReadBuilder<T> readSupport(ReadSupport<T> newReadSupport) {
this.readSupport = newReadSupport;
return this;
@@ -505,7 +522,7 @@ public ParquetReadBuilder<T> callInit() {

@Override
protected ReadSupport<T> getReadSupport() {
return new ParquetReadSupport<>(schema, readSupport, callInit);
return new ParquetReadSupport<>(schema, readSupport, callInit, nameMapping);
}
}
}
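A minimal sketch of the new Parquet read path (not part of this diff), assuming the generic record readers from the iceberg-data module; any reader function of the same shape would work.

import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.parquet.Parquet;

class ParquetNameMappingExample {
  // Read Parquet rows; when the file lacks field IDs, the mapping supplies
  // them before column pruning instead of the position-based fallback.
  static CloseableIterable<Record> read(InputFile file, Schema schema, NameMapping mapping) {
    return Parquet.read(file)
        .project(schema)
        .withNameMapping(mapping)
        .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader(schema, fileSchema))
        .build();
  }
}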
@@ -24,6 +24,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.Schema;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.parquet.avro.AvroReadSupport;
@@ -41,11 +42,13 @@ class ParquetReadSupport<T> extends ReadSupport<T> {
private final Schema expectedSchema;
private final ReadSupport<T> wrapped;
private final boolean callInit;
private final NameMapping nameMapping;

ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport, boolean callInit) {
ParquetReadSupport(Schema expectedSchema, ReadSupport<T> readSupport, boolean callInit, NameMapping nameMapping) {
this.expectedSchema = expectedSchema;
this.wrapped = readSupport;
this.callInit = callInit;
this.nameMapping = nameMapping;
}

@Override
@@ -55,9 +58,15 @@ public ReadContext init(Configuration configuration, Map<String, String> keyValu
// matching to the file's columns by full path, so this must select columns by using the path
// in the file's schema.

MessageType projection = ParquetSchemaUtil.hasIds(fileSchema) ?
ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema) :
ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
MessageType projection;
if (ParquetSchemaUtil.hasIds(fileSchema)) {
projection = ParquetSchemaUtil.pruneColumns(fileSchema, expectedSchema);
} else if (nameMapping != null) {
MessageType typeWithIds = ParquetSchemaUtil.applyNameMapping(fileSchema, nameMapping);
projection = ParquetSchemaUtil.pruneColumns(typeWithIds, expectedSchema);
} else {
projection = ParquetSchemaUtil.pruneColumnsFallback(fileSchema, expectedSchema);
}

// override some known backward-compatibility options
configuration.set("parquet.strict.typing", "false");
@@ -29,6 +29,7 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.hadoop.ParquetFileReader;
@@ -42,9 +43,10 @@ public class ParquetReader<T> extends CloseableGroup implements CloseableIterabl
private final Expression filter;
private final boolean reuseContainers;
private final boolean caseSensitive;
private final NameMapping nameMapping;

public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions options,
Function<MessageType, ParquetValueReader<?>> readerFunc,
Function<MessageType, ParquetValueReader<?>> readerFunc, NameMapping nameMapping,
Expression filter, boolean reuseContainers, boolean caseSensitive) {
this.input = input;
this.expectedSchema = expectedSchema;
@@ -54,14 +56,16 @@ public ParquetReader(InputFile input, Schema expectedSchema, ParquetReadOptions
this.filter = filter == Expressions.alwaysTrue() ? null : filter;
this.reuseContainers = reuseContainers;
this.caseSensitive = caseSensitive;
this.nameMapping = nameMapping;
}

private ReadConf<T> conf = null;

private ReadConf<T> init() {
if (conf == null) {
ReadConf<T> readConf = new ReadConf<>(
input, options, expectedSchema, filter, readerFunc, null, reuseContainers, caseSensitive, null);
input, options, expectedSchema, filter, readerFunc, null, nameMapping, reuseContainers,
caseSensitive, null);
this.conf = readConf.copy();
return readConf;
}
@@ -19,12 +19,16 @@

package org.apache.iceberg.parquet;

import java.util.List;
import java.util.Set;
import org.apache.iceberg.Schema;
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.TypeUtil;
import org.apache.iceberg.types.Types;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types.MessageTypeBuilder;

@@ -83,22 +87,7 @@ public static MessageType pruneColumnsFallback(MessageType fileSchema, Schema ex
}

public static boolean hasIds(MessageType fileSchema) {
try {
// Try to convert the type to Iceberg. If an ID assignment is needed, return false.
ParquetTypeVisitor.visit(fileSchema, new MessageTypeToType(fileSchema) {
@Override
protected int nextId() {
throw new IllegalStateException("Needed to assign ID");
}
});

// no assignment was needed
return true;

} catch (IllegalStateException e) {
// at least one field was missing an id.
return false;
}
return ParquetTypeVisitor.visit(fileSchema, new HasIds());
}

public static MessageType addFallbackIds(MessageType fileSchema) {
@@ -112,4 +101,41 @@ public static MessageType addFallbackIds(MessageType fileSchema) {

return builder.named(fileSchema.getName());
}

public static MessageType applyNameMapping(MessageType fileSchema, NameMapping nameMapping) {
return (MessageType) ParquetTypeVisitor.visit(fileSchema, new ApplyNameMapping(nameMapping));
}

public static class HasIds extends ParquetTypeVisitor<Boolean> {
Contributor:

This looks good.

@Override
public Boolean message(MessageType message, List<Boolean> fields) {
return struct(message, fields);
}

@Override
public Boolean struct(GroupType struct, List<Boolean> hasIds) {
for (Boolean hasId : hasIds) {
if (hasId) {
return true;
}
}
return struct.getId() != null;
}

@Override
public Boolean list(GroupType array, Boolean hasId) {
return hasId || array.getId() != null;
}

@Override
public Boolean map(GroupType map, Boolean keyHasId, Boolean valueHasId) {
return keyHasId || valueHasId || map.getId() != null;
}

@Override
public Boolean primitive(PrimitiveType primitive) {
return primitive.getId() != null;
}
}

}
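To illustrate the HasIds semantics, a small sketch (not part of this diff): one ID anywhere in the tree is enough for hasIds to return true, which routes the reader to pruneColumns instead of name mapping or the position-based fallback.

import org.apache.iceberg.parquet.ParquetSchemaUtil;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

class HasIdsExample {
  static void demo() {
    // No field carries an ID: callers fall back to name mapping (if set)
    // or position-based ID assignment.
    MessageType withoutIds = Types.buildMessage()
        .optional(PrimitiveTypeName.INT64).named("data")
        .named("table");
    System.out.println(ParquetSchemaUtil.hasIds(withoutIds)); // false

    // One ID on any field marks the file as ID-labeled.
    MessageType withIds = Types.buildMessage()
        .optional(PrimitiveTypeName.INT64).id(1).named("data")
        .named("table");
    System.out.println(ParquetSchemaUtil.hasIds(withIds)); // true
  }
}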