From 7a59d0b392eda6ef1922e2e721e617b200f07f1d Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 22 Apr 2020 15:16:22 -0700 Subject: [PATCH 1/4] Store ManifestEntry fields in DataFile. This updates DataFile to contain metadata from ManifestEntry because the separation no longer makes sense. V1 metadata files still use ManifestEntry, but v2 will not. --- .../java/org/apache/iceberg/DataFile.java | 69 +++++-- .../java/org/apache/iceberg/FileStatus.java | 42 ++++ .../org/apache/iceberg/types/TypeUtil.java | 41 +++- .../java/org/apache/iceberg/TestHelpers.java | 15 ++ .../org/apache/iceberg/GenericDataFile.java | 186 +++++++++++++++++- .../apache/iceberg/GenericManifestEntry.java | 179 +---------------- .../org/apache/iceberg/ManifestEntry.java | 10 +- .../apache/iceberg/ManifestEntryWrapper.java | 94 +++++++++ .../org/apache/iceberg/ManifestWriter.java | 15 +- .../java/org/apache/iceberg/V1Metadata.java | 44 ++++- .../org/apache/iceberg/TableTestBase.java | 4 +- .../apache/iceberg/spark/SparkDataFile.java | 16 ++ .../iceberg/TestDataFileSerialization.java | 8 +- 13 files changed, 503 insertions(+), 220 deletions(-) create mode 100644 api/src/main/java/org/apache/iceberg/FileStatus.java create mode 100644 core/src/main/java/org/apache/iceberg/ManifestEntryWrapper.java diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index 90758a3d73d0..c481f84c66d7 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -22,6 +22,7 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; +import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.BinaryType; import org.apache.iceberg.types.Types.IntegerType; import org.apache.iceberg.types.Types.ListType; @@ -37,31 +38,61 @@ * Interface for files listed in a table manifest. */ public interface DataFile { + Types.NestedField FILE_PATH = required(100, "file_path", StringType.get()); + Types.NestedField FILE_FORMAT = required(101, "file_format", StringType.get()); + Types.NestedField RECORD_COUNT = required(103, "record_count", LongType.get()); + Types.NestedField FILE_SIZE = required(104, "file_size_in_bytes", LongType.get()); + Types.NestedField BLOCK_SIZE = required(105, "block_size_in_bytes", LongType.get()); + Types.NestedField COLUMN_SIZES = optional(108, "column_sizes", MapType.ofRequired(117, 118, + IntegerType.get(), LongType.get())); + Types.NestedField VALUE_COUNTS = optional(109, "value_counts", MapType.ofRequired(119, 120, + IntegerType.get(), LongType.get())); + Types.NestedField NULL_VALUE_COUNTS = optional(110, "null_value_counts", MapType.ofRequired(121, 122, + IntegerType.get(), LongType.get())); + Types.NestedField LOWER_BOUNDS = optional(125, "lower_bounds", MapType.ofRequired(126, 127, + IntegerType.get(), BinaryType.get())); + Types.NestedField UPPER_BOUNDS = optional(128, "upper_bounds", MapType.ofRequired(129, 130, + IntegerType.get(), BinaryType.get())); + Types.NestedField KEY_METADATA = optional(131, "key_metadata", BinaryType.get()); + Types.NestedField SPLIT_OFFSETS = optional(132, "split_offsets", ListType.ofRequired(133, LongType.get())); + int PARTITION_ID = 102; + String PARTITION_NAME = "partition"; + // NEXT ID TO ASSIGN: 134 + static StructType getType(StructType partitionType) { // IDs start at 100 to leave room for changes to ManifestEntry return StructType.of( - required(100, "file_path", StringType.get()), - required(101, "file_format", StringType.get()), - required(102, "partition", partitionType), - required(103, "record_count", LongType.get()), - required(104, "file_size_in_bytes", LongType.get()), - required(105, "block_size_in_bytes", LongType.get()), - optional(108, "column_sizes", MapType.ofRequired(117, 118, - IntegerType.get(), LongType.get())), - optional(109, "value_counts", MapType.ofRequired(119, 120, - IntegerType.get(), LongType.get())), - optional(110, "null_value_counts", MapType.ofRequired(121, 122, - IntegerType.get(), LongType.get())), - optional(125, "lower_bounds", MapType.ofRequired(126, 127, - IntegerType.get(), BinaryType.get())), - optional(128, "upper_bounds", MapType.ofRequired(129, 130, - IntegerType.get(), BinaryType.get())), - optional(131, "key_metadata", BinaryType.get()), - optional(132, "split_offsets", ListType.ofRequired(133, LongType.get())) - // NEXT ID TO ASSIGN: 134 + FILE_PATH, + FILE_FORMAT, + required(PARTITION_ID, PARTITION_NAME, partitionType), + RECORD_COUNT, + FILE_SIZE, + BLOCK_SIZE, + COLUMN_SIZES, + VALUE_COUNTS, + NULL_VALUE_COUNTS, + LOWER_BOUNDS, + UPPER_BOUNDS, + KEY_METADATA, + SPLIT_OFFSETS ); } + /** + * @return the status of the file, whether EXISTING, ADDED, or DELETED + */ + FileStatus status(); + + /** + * @return id of the snapshot in which the file was added to the table + */ + Long snapshotId(); + + /** + * @return the sequence number of the snapshot in which the file was added to the table + */ + Long sequenceNumber(); + /** * @return fully qualified path to the file, suitable for constructing a Hadoop Path */ diff --git a/api/src/main/java/org/apache/iceberg/FileStatus.java b/api/src/main/java/org/apache/iceberg/FileStatus.java new file mode 100644 index 000000000000..8378e3d962bd --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/FileStatus.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +/** + * The status of a data or delete file in a manifest: + * 0 - existing (added in a different manifest) + * 1 - added to the table in the manifest + * 2 - deleted from the table in the manifest + */ +public enum FileStatus { + EXISTING(0), + ADDED(1), + DELETED(2); + + private final int id; + + FileStatus(int id) { + this.id = id; + } + + public int id() { + return id; + } +} diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java index 82c23f1e3715..4fc15075d125 100644 --- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java +++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java @@ -40,35 +40,58 @@ private TypeUtil() {} public static Schema select(Schema schema, Set fieldIds) { Preconditions.checkNotNull(schema, "Schema cannot be null"); - Preconditions.checkNotNull(fieldIds, "Field ids cannot be null"); - Type result = visit(schema, new PruneColumns(fieldIds)); + Types.StructType result = select(schema.asStruct(), fieldIds); if (schema.asStruct() == result) { return schema; } else if (result != null) { if (schema.getAliases() != null) { - return new Schema(result.asNestedType().fields(), schema.getAliases()); + return new Schema(result.fields(), schema.getAliases()); } else { - return new Schema(result.asNestedType().fields()); + return new Schema(result.fields()); } } return new Schema(ImmutableList.of(), schema.getAliases()); } + public static Types.StructType select(Types.StructType struct, Set fieldIds) { + Preconditions.checkNotNull(struct, "Struct cannot be null"); + Preconditions.checkNotNull(fieldIds, "Field ids cannot be null"); + + Type result = visit(struct, new PruneColumns(fieldIds)); + if (struct == result) { + return struct; + } else if (result != null) { + return result.asStructType(); + } + + return Types.StructType.of(); + } + public static Set getProjectedIds(Schema schema) { - return visit(schema, new GetProjectedIds()); + return ImmutableSet.copyOf(getIdsInternal(schema.asStruct())); } - public static Set getProjectedIds(Type schema) { - if (schema.isPrimitiveType()) { + public static Set getProjectedIds(Type type) { + if (type.isPrimitiveType()) { return ImmutableSet.of(); } - return ImmutableSet.copyOf(visit(schema, new GetProjectedIds())); + return ImmutableSet.copyOf(getIdsInternal(type)); + } + + private static Set getIdsInternal(Type type) { + return visit(type, new GetProjectedIds()); + } + + public static Types.StructType selectNot(Types.StructType struct, Set fieldIds) { + Set projectedIds = getIdsInternal(struct); + projectedIds.removeAll(fieldIds); + return select(struct, projectedIds); } public static Schema selectNot(Schema schema, Set fieldIds) { - Set projectedIds = getProjectedIds(schema); + Set projectedIds = getIdsInternal(schema.asStruct()); projectedIds.removeAll(fieldIds); return select(schema, projectedIds); } diff --git a/api/src/test/java/org/apache/iceberg/TestHelpers.java b/api/src/test/java/org/apache/iceberg/TestHelpers.java index 4989281ffe96..b2ad2215076c 100644 --- a/api/src/test/java/org/apache/iceberg/TestHelpers.java +++ b/api/src/test/java/org/apache/iceberg/TestHelpers.java @@ -303,6 +303,21 @@ public TestDataFile(String path, StructLike partition, long recordCount, this.upperBounds = upperBounds; } + @Override + public FileStatus status() { + return FileStatus.ADDED; + } + + @Override + public Long snapshotId() { + return 0L; + } + + @Override + public Long sequenceNumber() { + return 0L; + } + @Override public CharSequence path() { return path; diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java index aa427cbc5e16..2402424205f7 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; import org.apache.avro.specific.SpecificData; import org.apache.iceberg.avro.AvroSchemaUtil; @@ -49,6 +50,12 @@ public PartitionData copy() { private int[] fromProjectionPos; private Types.StructType partitionType; + // ManifestEntry fields + private ManifestEntry asEntry = null; + private FileStatus status = FileStatus.ADDED; + private Long snapshotId = null; + private Long sequenceNumber = null; + private String filePath = null; private FileFormat format = null; private PartitionData partitionData = null; @@ -70,11 +77,11 @@ public PartitionData copy() { private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024; /** - * Used by Avro reflection to instantiate this class when reading manifest files. + * Used by DelegatingManifestEntry instantiate this class when reading manifest files. */ - @SuppressWarnings("checkstyle:RedundantModifier") // Must be public - public GenericDataFile(org.apache.avro.Schema avroSchema) { + private GenericDataFile(org.apache.avro.Schema avroSchema, AsManifestEntry asEntry) { this.avroSchema = avroSchema; + this.asEntry = asEntry; Types.StructType schema = AvroSchemaUtil.convert(avroSchema).asNestedType().asStructType(); @@ -112,6 +119,7 @@ public GenericDataFile(org.apache.avro.Schema avroSchema) { this.format = format; this.partitionData = EMPTY_PARTITION_DATA; this.partitionType = EMPTY_PARTITION_DATA.getPartitionType(); + this.asEntry = new AsManifestEntry(this, partitionType); this.recordCount = recordCount; this.fileSizeInBytes = fileSizeInBytes; } @@ -122,6 +130,7 @@ public GenericDataFile(org.apache.avro.Schema avroSchema) { this.format = format; this.partitionData = partition; this.partitionType = partition.getPartitionType(); + this.asEntry = new AsManifestEntry(this, partitionType); this.recordCount = recordCount; this.fileSizeInBytes = fileSizeInBytes; } @@ -139,6 +148,7 @@ public GenericDataFile(org.apache.avro.Schema avroSchema) { this.partitionData = partition; this.partitionType = partition.getPartitionType(); } + this.asEntry = new AsManifestEntry(this, partitionType); // this will throw NPE if metrics.recordCount is null this.recordCount = metrics.recordCount(); @@ -165,10 +175,14 @@ public GenericDataFile(org.apache.avro.Schema avroSchema) { * @param fullCopy whether to copy all fields or to drop column-level stats */ private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) { + this.status = toCopy.status; + this.snapshotId = toCopy.snapshotId; + this.sequenceNumber = toCopy.sequenceNumber; this.filePath = toCopy.filePath; this.format = toCopy.format; this.partitionData = toCopy.partitionData.copy(); this.partitionType = toCopy.partitionType; + this.asEntry = new AsManifestEntry(this, partitionType); this.recordCount = toCopy.recordCount; this.fileSizeInBytes = toCopy.fileSizeInBytes; if (fullCopy) { @@ -196,6 +210,25 @@ private GenericDataFile(GenericDataFile toCopy, boolean fullCopy) { GenericDataFile() { } + @Override + public FileStatus status() { + return status; + } + + @Override + public Long snapshotId() { + return snapshotId; + } + + @Override + public Long sequenceNumber() { + return sequenceNumber; + } + + public ManifestEntry asEntry() { + return asEntry; + } + @Override public CharSequence path() { return filePath; @@ -399,12 +432,12 @@ public String toString() { } @Override - public DataFile copyWithoutStats() { + public GenericDataFile copyWithoutStats() { return new GenericDataFile(this, false /* drop stats */); } @Override - public DataFile copy() { + public GenericDataFile copy() { return new GenericDataFile(this, true /* full copy */); } @@ -425,4 +458,147 @@ private static List copy(List list) { } return null; } + + /** + * An adapter that makes a DataFile appear like a ManifestEntry for v1 metadata files. + */ + static class AsManifestEntry + implements ManifestEntry, IndexedRecord, SpecificData.SchemaConstructable, StructLike, Serializable { + private transient Schema avroSchema = null; + private GenericDataFile file = null; + + /** + * Used by Avro reflection to instantiate this class when reading manifest files. + * + * @param avroSchema an Avro read schema + */ + protected AsManifestEntry(Schema avroSchema) { + this.avroSchema = avroSchema; + this.file = new GenericDataFile(avroSchema.getField("data_file").schema(), this); + } + + /** + * Used by DataFile to create a ManifestEntry adapter. + * + * @param file a GenericDataFile that contains manifest entry data + */ + protected AsManifestEntry(GenericDataFile file, Types.StructType partitionType) { + this.avroSchema = AvroSchemaUtil.convert(ManifestEntry.getSchema(partitionType), "manifest_entry"); + this.file = file; + } + + /** + * Constructor for Java serialization. + */ + AsManifestEntry() { + } + + @Override + public Schema getSchema() { + return avroSchema; + } + + @Override + public Status status() { + return ManifestEntry.Status.values()[file.status.id()]; + } + + @Override + public Long snapshotId() { + return file.snapshotId; + } + + @Override + public void setSnapshotId(long snapshotId) { + file.snapshotId = snapshotId; + } + + @Override + public Long sequenceNumber() { + return file.sequenceNumber; + } + + @Override + public void setSequenceNumber(long sequenceNumber) { + file.sequenceNumber = sequenceNumber; + } + + @Override + public DataFile file() { + return file; + } + + @Override + public ManifestEntry copy() { + return file.copy().asEntry; + } + + @Override + public ManifestEntry copyWithoutStats() { + return file.copyWithoutStats().asEntry; + } + + @Override + public void put(int pos, Object value) { + switch (pos) { + case 0: + file.status = FileStatus.values()[(Integer) value]; + return; + case 1: + file.snapshotId = (Long) value; + return; + case 2: + file.sequenceNumber = (Long) value; + return; + case 3: + if (file != value) { + throw new IllegalArgumentException("Cannot replace data file"); + } + return; + default: + // ignore the object, it must be from a newer version of the format + } + } + + @Override + public void set(int pos, T value) { + put(pos, value); + } + + @Override + public Object get(int i) { + switch (i) { + case 0: + return file.status.id(); + case 1: + return file.snapshotId; + case 2: + return file.sequenceNumber; + case 3: + return file; + default: + throw new UnsupportedOperationException("Unknown field ordinal: " + i); + } + } + + @Override + public T get(int pos, Class javaClass) { + return javaClass.cast(get(pos)); + } + + @Override + public int size() { + return avroSchema.getFields().size(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("status", file.status) + .add("snapshot_id", file.snapshotId) + .add("sequence_number", file.sequenceNumber) + .add("file", file) + .toString(); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java index 9fc29a62bbff..49a812cc8239 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestEntry.java @@ -19,182 +19,21 @@ package org.apache.iceberg; -import com.google.common.base.MoreObjects; -import org.apache.avro.generic.IndexedRecord; -import org.apache.avro.specific.SpecificData; -import org.apache.iceberg.avro.AvroSchemaUtil; -import org.apache.iceberg.types.Types; - -class GenericManifestEntry implements ManifestEntry, IndexedRecord, SpecificData.SchemaConstructable, StructLike { - private final org.apache.avro.Schema schema; - private final V1Metadata.IndexedDataFile fileWrapper; - private Status status = Status.EXISTING; - private Long snapshotId = null; - private Long sequenceNumber = null; - private DataFile file = null; - - GenericManifestEntry(org.apache.avro.Schema schema) { - this.schema = schema; - this.fileWrapper = null; // do not use the file wrapper to read - } - - GenericManifestEntry(Types.StructType partitionType) { - this.schema = AvroSchemaUtil.convert(V1Metadata.entrySchema(partitionType), "manifest_entry"); - this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema()); - } - - private GenericManifestEntry(GenericManifestEntry toCopy, boolean fullCopy) { - this.schema = toCopy.schema; - this.fileWrapper = new V1Metadata.IndexedDataFile(schema.getField("data_file").schema()); - this.status = toCopy.status; - this.snapshotId = toCopy.snapshotId; - if (fullCopy) { - this.file = toCopy.file().copy(); - } else { - this.file = toCopy.file().copyWithoutStats(); - } - } - - ManifestEntry wrapExisting(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) { - this.status = Status.EXISTING; - this.snapshotId = newSnapshotId; - this.sequenceNumber = newSequenceNumber; - this.file = newFile; - return this; - } - - ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) { - this.status = Status.ADDED; - this.snapshotId = newSnapshotId; - this.sequenceNumber = null; - this.file = newFile; - return this; - } - - ManifestEntry wrapDelete(Long newSnapshotId, DataFile newFile) { - this.status = Status.DELETED; - this.snapshotId = newSnapshotId; - this.sequenceNumber = null; - this.file = newFile; - return this; - } - - /** - * @return the status of the file, whether EXISTING, ADDED, or DELETED - */ - @Override - public Status status() { - return status; - } +import org.apache.avro.Schema; +class GenericManifestEntry extends GenericDataFile.AsManifestEntry { /** - * @return id of the snapshot in which the file was added to the table + * Used by Avro reflection to instantiate this class when reading manifest files. + * + * @param avroSchema an Avro read schema */ - @Override - public Long snapshotId() { - return snapshotId; - } - - @Override - public Long sequenceNumber() { - return sequenceNumber; + GenericManifestEntry(Schema avroSchema) { + super(avroSchema); } /** - * @return a file + * Constructor for Java serialization. */ - @Override - public DataFile file() { - return file; - } - - @Override - public ManifestEntry copy() { - return new GenericManifestEntry(this, true /* full copy */); - } - - @Override - public ManifestEntry copyWithoutStats() { - return new GenericManifestEntry(this, false /* drop stats */); - } - - @Override - public void setSnapshotId(long newSnapshotId) { - this.snapshotId = newSnapshotId; - } - - @Override - public void setSequenceNumber(long newSequenceNumber) { - this.sequenceNumber = newSequenceNumber; - } - - @Override - public void put(int i, Object v) { - switch (i) { - case 0: - this.status = Status.values()[(Integer) v]; - return; - case 1: - this.snapshotId = (Long) v; - return; - case 2: - this.sequenceNumber = (Long) v; - return; - case 3: - this.file = (DataFile) v; - return; - default: - // ignore the object, it must be from a newer version of the format - } - } - - @Override - public void set(int pos, T value) { - put(pos, value); - } - - @Override - public Object get(int i) { - switch (i) { - case 0: - return status.id(); - case 1: - return snapshotId; - case 2: - return sequenceNumber; - case 3: - if (fileWrapper == null || file instanceof GenericDataFile) { - return file; - } else { - return fileWrapper.wrap(file); - } - default: - throw new UnsupportedOperationException("Unknown field ordinal: " + i); - } - } - - @Override - public T get(int pos, Class javaClass) { - return javaClass.cast(get(pos)); - } - - @Override - public org.apache.avro.Schema getSchema() { - return schema; - } - - @Override - public int size() { - return 4; - } - - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("status", status) - .add("snapshot_id", snapshotId) - .add("sequence_number", sequenceNumber) - .add("file", file) - .toString(); + GenericManifestEntry() { } } diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java index bc03850c3633..77f170040f56 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java +++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java @@ -19,6 +19,9 @@ package org.apache.iceberg; +import com.google.common.collect.Sets; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.StructType; @@ -54,7 +57,12 @@ static Schema getSchema(StructType partitionType) { } static Schema wrapFileSchema(StructType fileType) { - return new Schema(STATUS, SNAPSHOT_ID, SEQUENCE_NUMBER, required(DATA_FILE_ID, "data_file", fileType)); + // remove ManifestEntry fields from the file type when wrapping to avoid duplication + Set toRemove = Sets.newHashSet(STATUS.fieldId(), SNAPSHOT_ID.fieldId(), SEQUENCE_NUMBER.fieldId()); + StructType v1FileType = StructType.of(fileType.fields().stream() + .filter(field -> !toRemove.contains(field.fieldId())) + .collect(Collectors.toList())); + return new Schema(STATUS, SNAPSHOT_ID, SEQUENCE_NUMBER, required(DATA_FILE_ID, "data_file", v1FileType)); } /** diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntryWrapper.java b/core/src/main/java/org/apache/iceberg/ManifestEntryWrapper.java new file mode 100644 index 000000000000..ea384254f454 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/ManifestEntryWrapper.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +/** + * Wrapper used to replace ManifestEntry values when writing entries to a manifest. + */ +class ManifestEntryWrapper implements ManifestEntry { + private DataFile wrapped = null; + private Status status = null; + private Long snapshotId = null; + private Long sequenceNumber = null; + + ManifestEntry wrapExisting(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) { + this.status = Status.EXISTING; + this.snapshotId = newSnapshotId; + this.sequenceNumber = newSequenceNumber; + this.wrapped = newFile; + return this; + } + + ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) { + this.status = Status.ADDED; + this.snapshotId = newSnapshotId; + this.sequenceNumber = null; + this.wrapped = newFile; + return this; + } + + ManifestEntry wrapDelete(Long newSnapshotId, Long newSequenceNumber, DataFile newFile) { + this.status = Status.DELETED; + this.snapshotId = newSnapshotId; + this.sequenceNumber = newSequenceNumber; + this.wrapped = newFile; + return this; + } + + @Override + public Status status() { + return status; + } + + @Override + public Long snapshotId() { + return snapshotId; + } + + @Override + public Long sequenceNumber() { + return sequenceNumber; + } + + @Override + public DataFile file() { + return wrapped; + } + + @Override + public void setSnapshotId(long snapshotId) { + this.snapshotId = snapshotId; + } + + @Override + public void setSequenceNumber(long sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + @Override + public ManifestEntry copy() { + throw new UnsupportedOperationException("Cannot copy a ManifestEntryWrapper"); + } + + @Override + public ManifestEntry copyWithoutStats() { + throw new UnsupportedOperationException("Cannot copy a ManifestEntryWrapper"); + } +} diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index ddd74c8a3e1d..7ba87ae438fd 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -54,7 +54,7 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { private final int specId; private final FileAppender writer; private final Long snapshotId; - private final GenericManifestEntry reused; + private final ManifestEntryWrapper reused; private final PartitionSummary stats; private boolean closed = false; @@ -71,7 +71,7 @@ private ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { this.specId = spec.specId(); this.writer = newAppender(spec, file); this.snapshotId = snapshotId; - this.reused = new GenericManifestEntry(spec.partitionType()); + this.reused = new ManifestEntryWrapper(); this.stats = new PartitionSummary(spec); } @@ -121,11 +121,9 @@ void add(ManifestEntry entry) { * Add an existing entry for a data file. * * @param existingFile a data file - * @param fileSnapshotId snapshot ID when the data file was added to the table - * @param sequenceNumber sequence number for the data file */ - public void existing(DataFile existingFile, long fileSnapshotId, long sequenceNumber) { - addEntry(reused.wrapExisting(fileSnapshotId, sequenceNumber, existingFile)); + public void existing(DataFile existingFile) { + addEntry(reused.wrapExisting(existingFile.snapshotId(), existingFile.sequenceNumber(), existingFile)); } void existing(ManifestEntry entry) { @@ -140,13 +138,13 @@ void existing(ManifestEntry entry) { * @param deletedFile a data file */ public void delete(DataFile deletedFile) { - addEntry(reused.wrapDelete(snapshotId, deletedFile)); + addEntry(reused.wrapDelete(snapshotId, deletedFile.sequenceNumber(), deletedFile)); } void delete(ManifestEntry entry) { // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk // when this Snapshot has been removed or when there are no Snapshots older than this one. - addEntry(reused.wrapDelete(snapshotId, entry.file())); + addEntry(reused.wrapDelete(snapshotId, entry.sequenceNumber(), entry.file())); } @Override @@ -237,4 +235,5 @@ protected FileAppender newAppender(PartitionSpec spec, OutputFile } } } + } diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java index a905b4d6ec55..31499d324173 100644 --- a/core/src/main/java/org/apache/iceberg/V1Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java @@ -24,7 +24,7 @@ import java.util.Map; import org.apache.avro.generic.IndexedRecord; import org.apache.iceberg.avro.AvroSchemaUtil; -import org.apache.iceberg.types.Types; +import org.apache.iceberg.types.Types.StructType; import static org.apache.iceberg.types.Types.NestedField.required; @@ -181,23 +181,42 @@ public ManifestFile copy() { } } - static Schema entrySchema(Types.StructType partitionType) { - return wrapFileSchema(DataFile.getType(partitionType)); + static Schema entrySchema(StructType partitionType) { + return wrapFileSchema(dataFileSchema(partitionType)); } - static Schema wrapFileSchema(Types.StructType fileSchema) { + static Schema wrapFileSchema(StructType fileSchema) { // this is used to build projection schemas return new Schema( ManifestEntry.STATUS, ManifestEntry.SNAPSHOT_ID, required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema)); } + static StructType dataFileSchema(StructType partitionType) { + // IDs start at 100 to leave room for changes to ManifestEntry + return StructType.of( + DataFile.FILE_PATH, + DataFile.FILE_FORMAT, + required(DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType), + DataFile.RECORD_COUNT, + DataFile.FILE_SIZE, + DataFile.BLOCK_SIZE, + DataFile.COLUMN_SIZES, + DataFile.VALUE_COUNTS, + DataFile.NULL_VALUE_COUNTS, + DataFile.LOWER_BOUNDS, + DataFile.UPPER_BOUNDS, + DataFile.KEY_METADATA, + DataFile.SPLIT_OFFSETS + ); + } + static class IndexedManifestEntry implements ManifestEntry, IndexedRecord { private final org.apache.avro.Schema avroSchema; private final IndexedDataFile fileWrapper; private ManifestEntry wrapped = null; - IndexedManifestEntry(Types.StructType partitionType) { + IndexedManifestEntry(StructType partitionType) { this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry"); this.fileWrapper = new IndexedDataFile(avroSchema.getField("data_file").schema()); } @@ -337,6 +356,21 @@ public org.apache.avro.Schema getSchema() { return avroSchema; } + @Override + public FileStatus status() { + return wrapped.status(); + } + + @Override + public Long snapshotId() { + return wrapped.snapshotId(); + } + + @Override + public Long sequenceNumber() { + return wrapped.sequenceNumber(); + } + @Override public CharSequence path() { return wrapped.path(); diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index c66e8b11ac26..4677be4a99ab 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -183,14 +183,14 @@ ManifestFile writeManifestWithName(String name, DataFile... files) throws IOExce } ManifestEntry manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFile file) { - GenericManifestEntry entry = new GenericManifestEntry(table.spec().partitionType()); + ManifestEntryWrapper entry = new ManifestEntryWrapper(); switch (status) { case ADDED: return entry.wrapAppend(snapshotId, file); case EXISTING: return entry.wrapExisting(snapshotId, 0L, file); case DELETED: - return entry.wrapDelete(snapshotId, file); + return entry.wrapDelete(snapshotId, 0L, file); default: throw new IllegalArgumentException("Unexpected entry status: " + status); } diff --git a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java index 1875c3d5ca9e..d7f178cdfd63 100644 --- a/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java +++ b/spark/src/main/java/org/apache/iceberg/spark/SparkDataFile.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileStatus; import org.apache.iceberg.StructLike; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; @@ -56,6 +57,21 @@ public SparkDataFile wrap(Row row) { return this; } + @Override + public FileStatus status() { + return null; + } + + @Override + public Long snapshotId() { + return null; + } + + @Override + public Long sequenceNumber() { + return null; + } + @Override public CharSequence path() { return wrapped.getAs(fieldPositions[0]); diff --git a/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java b/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java index 64c9d519f16b..3bc75f01e0f5 100644 --- a/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java +++ b/spark/src/test/java/org/apache/iceberg/TestDataFileSerialization.java @@ -129,6 +129,12 @@ public void testDataFileJavaSerialization() throws Exception { } private void checkDataFile(DataFile expected, DataFile actual) { + Assert.assertEquals("Should match the serialized status", + expected.status(), actual.status()); + Assert.assertEquals("Should match the serialized snapshot id", + expected.snapshotId(), actual.snapshotId()); + Assert.assertEquals("Should match the serialized sequence number", + expected.sequenceNumber(), actual.sequenceNumber()); Assert.assertEquals("Should match the serialized record path", expected.path(), actual.path()); Assert.assertEquals("Should match the serialized record format", @@ -151,7 +157,7 @@ private void checkDataFile(DataFile expected, DataFile actual) { expected.keyMetadata(), actual.keyMetadata()); Assert.assertEquals("Should match the serialized record offsets", expected.splitOffsets(), actual.splitOffsets()); - Assert.assertEquals("Should match the serialized record offsets", + Assert.assertEquals("Should match the serialized key metadata", expected.keyMetadata(), actual.keyMetadata()); } From 05c5c86188f7f2bc249aaf383a8809144ed9b06f Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 22 Apr 2020 15:27:18 -0700 Subject: [PATCH 2/4] Revert "Add select and selectNot to TypeUtil for structs." This reverts commit 76bded6be08440beec9a93e421e5cea688439a4b. --- .../org/apache/iceberg/types/TypeUtil.java | 41 ++++--------------- 1 file changed, 9 insertions(+), 32 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java index 4fc15075d125..82c23f1e3715 100644 --- a/api/src/main/java/org/apache/iceberg/types/TypeUtil.java +++ b/api/src/main/java/org/apache/iceberg/types/TypeUtil.java @@ -40,58 +40,35 @@ private TypeUtil() {} public static Schema select(Schema schema, Set fieldIds) { Preconditions.checkNotNull(schema, "Schema cannot be null"); + Preconditions.checkNotNull(fieldIds, "Field ids cannot be null"); - Types.StructType result = select(schema.asStruct(), fieldIds); + Type result = visit(schema, new PruneColumns(fieldIds)); if (schema.asStruct() == result) { return schema; } else if (result != null) { if (schema.getAliases() != null) { - return new Schema(result.fields(), schema.getAliases()); + return new Schema(result.asNestedType().fields(), schema.getAliases()); } else { - return new Schema(result.fields()); + return new Schema(result.asNestedType().fields()); } } return new Schema(ImmutableList.of(), schema.getAliases()); } - public static Types.StructType select(Types.StructType struct, Set fieldIds) { - Preconditions.checkNotNull(struct, "Struct cannot be null"); - Preconditions.checkNotNull(fieldIds, "Field ids cannot be null"); - - Type result = visit(struct, new PruneColumns(fieldIds)); - if (struct == result) { - return struct; - } else if (result != null) { - return result.asStructType(); - } - - return Types.StructType.of(); - } - public static Set getProjectedIds(Schema schema) { - return ImmutableSet.copyOf(getIdsInternal(schema.asStruct())); + return visit(schema, new GetProjectedIds()); } - public static Set getProjectedIds(Type type) { - if (type.isPrimitiveType()) { + public static Set getProjectedIds(Type schema) { + if (schema.isPrimitiveType()) { return ImmutableSet.of(); } - return ImmutableSet.copyOf(getIdsInternal(type)); - } - - private static Set getIdsInternal(Type type) { - return visit(type, new GetProjectedIds()); - } - - public static Types.StructType selectNot(Types.StructType struct, Set fieldIds) { - Set projectedIds = getIdsInternal(struct); - projectedIds.removeAll(fieldIds); - return select(struct, projectedIds); + return ImmutableSet.copyOf(visit(schema, new GetProjectedIds())); } public static Schema selectNot(Schema schema, Set fieldIds) { - Set projectedIds = getIdsInternal(schema.asStruct()); + Set projectedIds = getProjectedIds(schema); projectedIds.removeAll(fieldIds); return select(schema, projectedIds); } From 9f485eb3276eafbb1b3b7c7832f032dbb648cb8d Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 22 Apr 2020 15:31:44 -0700 Subject: [PATCH 3/4] Roll back unnecessary changes in DataFile and V1Metadata. --- .../java/org/apache/iceberg/DataFile.java | 54 +++++++------------ .../java/org/apache/iceberg/V1Metadata.java | 29 ++-------- 2 files changed, 24 insertions(+), 59 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java index c481f84c66d7..aee057d22212 100644 --- a/api/src/main/java/org/apache/iceberg/DataFile.java +++ b/api/src/main/java/org/apache/iceberg/DataFile.java @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import java.util.List; import java.util.Map; -import org.apache.iceberg.types.Types; import org.apache.iceberg.types.Types.BinaryType; import org.apache.iceberg.types.Types.IntegerType; import org.apache.iceberg.types.Types.ListType; @@ -38,43 +37,28 @@ * Interface for files listed in a table manifest. */ public interface DataFile { - Types.NestedField FILE_PATH = required(100, "file_path", StringType.get()); - Types.NestedField FILE_FORMAT = required(101, "file_format", StringType.get()); - Types.NestedField RECORD_COUNT = required(103, "record_count", LongType.get()); - Types.NestedField FILE_SIZE = required(104, "file_size_in_bytes", LongType.get()); - Types.NestedField BLOCK_SIZE = required(105, "block_size_in_bytes", LongType.get()); - Types.NestedField COLUMN_SIZES = optional(108, "column_sizes", MapType.ofRequired(117, 118, - IntegerType.get(), LongType.get())); - Types.NestedField VALUE_COUNTS = optional(109, "value_counts", MapType.ofRequired(119, 120, - IntegerType.get(), LongType.get())); - Types.NestedField NULL_VALUE_COUNTS = optional(110, "null_value_counts", MapType.ofRequired(121, 122, - IntegerType.get(), LongType.get())); - Types.NestedField LOWER_BOUNDS = optional(125, "lower_bounds", MapType.ofRequired(126, 127, - IntegerType.get(), BinaryType.get())); - Types.NestedField UPPER_BOUNDS = optional(128, "upper_bounds", MapType.ofRequired(129, 130, - IntegerType.get(), BinaryType.get())); - Types.NestedField KEY_METADATA = optional(131, "key_metadata", BinaryType.get()); - Types.NestedField SPLIT_OFFSETS = optional(132, "split_offsets", ListType.ofRequired(133, LongType.get())); - int PARTITION_ID = 102; - String PARTITION_NAME = "partition"; - // NEXT ID TO ASSIGN: 134 - static StructType getType(StructType partitionType) { // IDs start at 100 to leave room for changes to ManifestEntry return StructType.of( - FILE_PATH, - FILE_FORMAT, - required(PARTITION_ID, PARTITION_NAME, partitionType), - RECORD_COUNT, - FILE_SIZE, - BLOCK_SIZE, - COLUMN_SIZES, - VALUE_COUNTS, - NULL_VALUE_COUNTS, - LOWER_BOUNDS, - UPPER_BOUNDS, - KEY_METADATA, - SPLIT_OFFSETS + required(100, "file_path", StringType.get()), + required(101, "file_format", StringType.get()), + required(102, "partition", partitionType), + required(103, "record_count", LongType.get()), + required(104, "file_size_in_bytes", LongType.get()), + required(105, "block_size_in_bytes", LongType.get()), + optional(108, "column_sizes", MapType.ofRequired(117, 118, + IntegerType.get(), LongType.get())), + optional(109, "value_counts", MapType.ofRequired(119, 120, + IntegerType.get(), LongType.get())), + optional(110, "null_value_counts", MapType.ofRequired(121, 122, + IntegerType.get(), LongType.get())), + optional(125, "lower_bounds", MapType.ofRequired(126, 127, + IntegerType.get(), BinaryType.get())), + optional(128, "upper_bounds", MapType.ofRequired(129, 130, + IntegerType.get(), BinaryType.get())), + optional(131, "key_metadata", BinaryType.get()), + optional(132, "split_offsets", ListType.ofRequired(133, LongType.get())) + // NEXT ID TO ASSIGN: 134 ); } diff --git a/core/src/main/java/org/apache/iceberg/V1Metadata.java b/core/src/main/java/org/apache/iceberg/V1Metadata.java index 31499d324173..f8dddd1eafaf 100644 --- a/core/src/main/java/org/apache/iceberg/V1Metadata.java +++ b/core/src/main/java/org/apache/iceberg/V1Metadata.java @@ -24,7 +24,7 @@ import java.util.Map; import org.apache.avro.generic.IndexedRecord; import org.apache.iceberg.avro.AvroSchemaUtil; -import org.apache.iceberg.types.Types.StructType; +import org.apache.iceberg.types.Types; import static org.apache.iceberg.types.Types.NestedField.required; @@ -181,42 +181,23 @@ public ManifestFile copy() { } } - static Schema entrySchema(StructType partitionType) { - return wrapFileSchema(dataFileSchema(partitionType)); + static Schema entrySchema(Types.StructType partitionType) { + return wrapFileSchema(DataFile.getType(partitionType)); } - static Schema wrapFileSchema(StructType fileSchema) { + static Schema wrapFileSchema(Types.StructType fileSchema) { // this is used to build projection schemas return new Schema( ManifestEntry.STATUS, ManifestEntry.SNAPSHOT_ID, required(ManifestEntry.DATA_FILE_ID, "data_file", fileSchema)); } - static StructType dataFileSchema(StructType partitionType) { - // IDs start at 100 to leave room for changes to ManifestEntry - return StructType.of( - DataFile.FILE_PATH, - DataFile.FILE_FORMAT, - required(DataFile.PARTITION_ID, DataFile.PARTITION_NAME, partitionType), - DataFile.RECORD_COUNT, - DataFile.FILE_SIZE, - DataFile.BLOCK_SIZE, - DataFile.COLUMN_SIZES, - DataFile.VALUE_COUNTS, - DataFile.NULL_VALUE_COUNTS, - DataFile.LOWER_BOUNDS, - DataFile.UPPER_BOUNDS, - DataFile.KEY_METADATA, - DataFile.SPLIT_OFFSETS - ); - } - static class IndexedManifestEntry implements ManifestEntry, IndexedRecord { private final org.apache.avro.Schema avroSchema; private final IndexedDataFile fileWrapper; private ManifestEntry wrapped = null; - IndexedManifestEntry(StructType partitionType) { + IndexedManifestEntry(Types.StructType partitionType) { this.avroSchema = AvroSchemaUtil.convert(entrySchema(partitionType), "manifest_entry"); this.fileWrapper = new IndexedDataFile(avroSchema.getField("data_file").schema()); } From 42a538a6aa37ad50d643e87231db760455a0d904 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 22 Apr 2020 15:32:18 -0700 Subject: [PATCH 4/4] Fix javadoc for rename. --- core/src/main/java/org/apache/iceberg/GenericDataFile.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java index 2402424205f7..d5b34f77777d 100644 --- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java @@ -77,7 +77,7 @@ public PartitionData copy() { private static final long DEFAULT_BLOCK_SIZE = 64 * 1024 * 1024; /** - * Used by DelegatingManifestEntry instantiate this class when reading manifest files. + * Used by AsManifestEntry to instantiate this class when reading manifest files. */ private GenericDataFile(org.apache.avro.Schema avroSchema, AsManifestEntry asEntry) { this.avroSchema = avroSchema;