diff --git a/api/src/main/java/org/apache/iceberg/AppendFiles.java b/api/src/main/java/org/apache/iceberg/AppendFiles.java index a54b83574fd5..aefe1d1fdd69 100644 --- a/api/src/main/java/org/apache/iceberg/AppendFiles.java +++ b/api/src/main/java/org/apache/iceberg/AppendFiles.java @@ -38,10 +38,21 @@ public interface AppendFiles extends SnapshotUpdate { AppendFiles appendFile(DataFile file); /** - * Append the contents of a manifest to the table. + * Append a {@link ManifestFile} to the table. *
<p>
* The manifest must contain only appended files. All files in the manifest will be appended to * the table in the snapshot created by this update. + *
<p>
+ * By default, the manifest will be rewritten to assign all entries this update's snapshot ID. + * In that case, it is always the responsibility of the caller to manage the lifecycle of + * the original manifest. + *
<p>
+ * If manifest entries are allowed to inherit the snapshot ID assigned on commit, the manifest + * should never be deleted manually if the commit succeeds as it will become part of the table + * metadata and will be cleaned up on expiry. If the manifest gets merged with others while + * preparing a new snapshot, it will be deleted automatically if this operation is successful. + * If the commit fails, the manifest will never be deleted and it is up to the caller whether + * to delete or reuse it. * * @param file a manifest file * @return this for method chaining diff --git a/api/src/main/java/org/apache/iceberg/RewriteManifests.java b/api/src/main/java/org/apache/iceberg/RewriteManifests.java index 74032eb77a0e..f6ca9e239bd2 100644 --- a/api/src/main/java/org/apache/iceberg/RewriteManifests.java +++ b/api/src/main/java/org/apache/iceberg/RewriteManifests.java @@ -71,6 +71,17 @@ public interface RewriteManifests extends SnapshotUpdate { /** * Adds a {@link ManifestFile manifest file} to the table. The added manifest cannot contain new * or deleted files. + *
<p>
+ * By default, the manifest will be rewritten to ensure all entries have explicit snapshot IDs. + * In that case, it is always the responsibility of the caller to manage the lifecycle of + * the original manifest. + *
<p>
+ * If manifest entries are allowed to inherit the snapshot ID assigned on commit, the manifest + * should never be deleted manually if the commit succeeds as it will become part of the table + * metadata and will be cleaned up on expiry. If the manifest gets merged with others while + * preparing a new snapshot, it will be deleted automatically if this operation is successful. + * If the commit fails, the manifest will never be deleted and it is up to the caller whether + * to delete or reuse it. * * @param manifest a manifest to add * @return this for method chaining diff --git a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java index a8fe492521f7..fd6a817ba9a6 100644 --- a/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java +++ b/core/src/main/java/org/apache/iceberg/BaseMetastoreCatalog.java @@ -255,7 +255,7 @@ private static void deleteFiles(FileIO io, Set allManifests) { .executeWith(ThreadPools.getWorkerPool()) .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc)) .run(manifest -> { - try (ManifestReader reader = ManifestReader.read(io.newInputFile(manifest.path()))) { + try (ManifestReader reader = ManifestReader.read(manifest, io)) { for (ManifestEntry entry : reader.entries()) { // intern the file path because the weak key map uses identity (==) instead of equals String path = entry.file().path().toString().intern(); diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java index 72741a8d35d9..39e7a7d1c695 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteManifests.java @@ -21,6 +21,7 @@ import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -45,6 +46,8 @@ import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES; import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT; +import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED; +import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT; public class BaseRewriteManifests extends SnapshotProducer implements RewriteManifests { @@ -59,9 +62,11 @@ public class BaseRewriteManifests extends SnapshotProducer imp private final TableOperations ops; private final Map specsById; private final long manifestTargetSizeBytes; + private final boolean snapshotIdInheritanceEnabled; private final Set deletedManifests = Sets.newHashSet(); private final List addedManifests = Lists.newArrayList(); + private final List rewrittenAddedManifests = Lists.newArrayList(); private final List keptManifests = Collections.synchronizedList(new ArrayList<>()); private final List newManifests = Collections.synchronizedList(new ArrayList<>()); @@ -82,6 +87,8 @@ public class BaseRewriteManifests extends SnapshotProducer imp this.specsById = ops.current().specsById(); this.manifestTargetSizeBytes = ops.current().propertyAsLong(MANIFEST_TARGET_SIZE_BYTES, MANIFEST_TARGET_SIZE_BYTES_DEFAULT); + this.snapshotIdInheritanceEnabled = ops.current() + .propertyAsBoolean(SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); } @Override @@ -102,8 +109,9 
@@ public RewriteManifests set(String property, String value) { @Override protected Map summary() { + int createdManifestsCount = newManifests.size() + addedManifests.size() + rewrittenAddedManifests.size(); + summaryBuilder.set(CREATED_MANIFESTS_COUNT, String.valueOf(createdManifestsCount)); summaryBuilder.set(KEPT_MANIFESTS_COUNT, String.valueOf(keptManifests.size())); - summaryBuilder.set(CREATED_MANIFESTS_COUNT, String.valueOf(newManifests.size() + addedManifests.size())); summaryBuilder.set(REPLACED_MANIFESTS_COUNT, String.valueOf(rewrittenManifests.size() + deletedManifests.size())); summaryBuilder.set(PROCESSED_ENTRY_COUNT, String.valueOf(entryCount.get())); return summaryBuilder.build(); @@ -129,17 +137,25 @@ public RewriteManifests deleteManifest(ManifestFile manifest) { @Override public RewriteManifests addManifest(ManifestFile manifest) { - try { - // the appended manifest must be rewritten with this update's snapshot ID - addedManifests.add(copyManifest(manifest)); - } catch (IllegalArgumentException e) { - throw new IllegalArgumentException("Cannot append manifest: " + e.getMessage()); + Preconditions.checkArgument(!manifest.hasAddedFiles(), "Cannot add manifest with added files"); + Preconditions.checkArgument(!manifest.hasDeletedFiles(), "Cannot add manifest with deleted files"); + Preconditions.checkArgument( + manifest.snapshotId() == null || manifest.snapshotId() == -1, + "Snapshot id must be assigned during commit"); + + if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) { + addedManifests.add(manifest); + } else { + // the manifest must be rewritten with this update's snapshot ID + ManifestFile copiedManifest = copyManifest(manifest); + rewrittenAddedManifests.add(copiedManifest); } + return this; } private ManifestFile copyManifest(ManifestFile manifest) { - try (ManifestReader reader = ManifestReader.read(ops.io().newInputFile(manifest.path()), specsById)) { + try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), specsById)) { OutputFile newFile = manifestPath(manifestSuffix.getAndIncrement()); return ManifestWriter.copyManifest(reader, newFile, snapshotId(), summaryBuilder, ALLOWED_ENTRY_STATUSES); } catch (IOException e) { @@ -162,10 +178,14 @@ public List apply(TableMetadata base) { validateFilesCounts(); + // TODO: add sequence numbers here + Iterable newManifestsWithMetadata = Iterables.transform( + Iterables.concat(newManifests, addedManifests, rewrittenAddedManifests), + manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); + // put new manifests at the beginning List apply = new ArrayList<>(); - apply.addAll(newManifests); - apply.addAll(addedManifests); + Iterables.addAll(apply, newManifestsWithMetadata); apply.addAll(keptManifests); return apply; @@ -218,8 +238,7 @@ private void performRewrite(List currentManifests) { keptManifests.add(manifest); } else { rewrittenManifests.add(manifest); - try (ManifestReader reader = - ManifestReader.read(ops.io().newInputFile(manifest.path()), ops.current().specsById())) { + try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { FilteredManifest filteredManifest = reader.select(Arrays.asList("*")); filteredManifest.liveEntries().forEach( entry -> appendEntry(entry, clusterByFunc.apply(entry.file()), manifest.partitionSpecId()) @@ -246,8 +265,11 @@ private void validateDeletedManifests(Set currentManifests) { } private void validateFilesCounts() { - int createdManifestsFilesCount = activeFilesCount(newManifests) + 
activeFilesCount(addedManifests); - int replacedManifestsFilesCount = activeFilesCount(rewrittenManifests) + activeFilesCount(deletedManifests); + Iterable createdManifests = Iterables.concat(newManifests, addedManifests, rewrittenAddedManifests); + int createdManifestsFilesCount = activeFilesCount(createdManifests); + + Iterable replacedManifests = Iterables.concat(rewrittenManifests, deletedManifests); + int replacedManifestsFilesCount = activeFilesCount(replacedManifests); if (createdManifestsFilesCount != replacedManifestsFilesCount) { throw new ValidationException( @@ -295,7 +317,9 @@ private WriterWrapper getWriter(Object key) { @Override protected void cleanUncommitted(Set committed) { cleanUncommitted(newManifests, committed); - cleanUncommitted(addedManifests, committed); + // clean up only rewrittenAddedManifests as they are always owned by the table + // don't clean up addedManifests as they are added to the manifest list and are not compacted + cleanUncommitted(rewrittenAddedManifests, committed); } private void cleanUncommitted(Iterable manifests, Set committedManifests) { diff --git a/core/src/main/java/org/apache/iceberg/DataFilesTable.java b/core/src/main/java/org/apache/iceberg/DataFilesTable.java index 722b8364e9c5..4b153b19fc2b 100644 --- a/core/src/main/java/org/apache/iceberg/DataFilesTable.java +++ b/core/src/main/java/org/apache/iceberg/DataFilesTable.java @@ -119,61 +119,37 @@ protected CloseableIterable planFiles( String schemaString = SchemaParser.toJson(schema()); String specString = PartitionSpecParser.toJson(PartitionSpec.unpartitioned()); + ResidualEvaluator residuals = ResidualEvaluator.unpartitioned(rowFilter); // Data tasks produce the table schema, not the projection schema and projection is done by processing engines. // This data task needs to use the table schema, which may not include a partition schema to avoid having an // empty struct in the schema for unpartitioned tables. Some engines, like Spark, can't handle empty structs in // all cases. 
return CloseableIterable.transform(manifests, manifest -> - new ManifestReadTask(ops.io(), new BaseFileScanTask( - DataFiles.fromManifest(manifest), schemaString, specString, ResidualEvaluator.unpartitioned(rowFilter)), - fileSchema)); + new ManifestReadTask(ops.io(), manifest, fileSchema, schemaString, specString, residuals)); } } - private static class ManifestReadTask implements DataTask { + private static class ManifestReadTask extends BaseFileScanTask implements DataTask { private final FileIO io; - private final FileScanTask manifestTask; + private final ManifestFile manifest; private final Schema schema; - private ManifestReadTask(FileIO io, FileScanTask manifestTask, Schema schema) { + private ManifestReadTask(FileIO io, ManifestFile manifest, Schema schema, String schemaString, + String specString, ResidualEvaluator residuals) { + super(DataFiles.fromManifest(manifest), schemaString, specString, residuals); this.io = io; - this.manifestTask = manifestTask; + this.manifest = manifest; this.schema = schema; } @Override public CloseableIterable rows() { return CloseableIterable.transform( - ManifestReader.read(io.newInputFile(manifestTask.file().path().toString())).project(schema), + ManifestReader.read(manifest, io).project(schema), file -> (GenericDataFile) file); } - @Override - public DataFile file() { - return manifestTask.file(); - } - - @Override - public PartitionSpec spec() { - return manifestTask.spec(); - } - - @Override - public long start() { - return 0; - } - - @Override - public long length() { - return manifestTask.length(); - } - - @Override - public Expression residual() { - return manifestTask.residual(); - } - @Override public Iterable split(long splitSize) { return ImmutableList.of(this); // don't split diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 1b5c0f56f5a7..246e10fa38c1 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -19,6 +19,8 @@ package org.apache.iceberg; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import java.io.IOException; import java.util.List; @@ -29,6 +31,9 @@ import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.io.OutputFile; +import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED; +import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT; + /** * {@link AppendFiles Append} implementation that adds a new manifest file for the write. *
<p>
@@ -37,9 +42,11 @@ class FastAppend extends SnapshotProducer implements AppendFiles { private final TableOperations ops; private final PartitionSpec spec; + private final boolean snapshotIdInheritanceEnabled; private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder(); private final List newFiles = Lists.newArrayList(); private final List appendManifests = Lists.newArrayList(); + private final List rewrittenAppendManifests = Lists.newArrayList(); private ManifestFile newManifest = null; private final AtomicInteger manifestCount = new AtomicInteger(0); private boolean hasNewFiles = false; @@ -48,6 +55,8 @@ class FastAppend extends SnapshotProducer implements AppendFiles { super(ops); this.ops = ops; this.spec = ops.current().spec(); + this.snapshotIdInheritanceEnabled = ops.current() + .propertyAsBoolean(SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); } @Override @@ -81,16 +90,31 @@ public FastAppend appendFile(DataFile file) { @Override public FastAppend appendManifest(ManifestFile manifest) { - // the manifest must be rewritten with this update's snapshot ID - try (ManifestReader reader = ManifestReader.read( - ops.io().newInputFile(manifest.path()), ops.current().specsById())) { + Preconditions.checkArgument(!manifest.hasExistingFiles(), "Cannot append manifest with existing files"); + Preconditions.checkArgument(!manifest.hasDeletedFiles(), "Cannot append manifest with deleted files"); + Preconditions.checkArgument( + manifest.snapshotId() == null || manifest.snapshotId() == -1, + "Snapshot id must be assigned during commit"); + + if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) { + summaryBuilder.addedManifest(manifest); + appendManifests.add(manifest); + } else { + // the manifest must be rewritten with this update's snapshot ID + ManifestFile copiedManifest = copyManifest(manifest); + rewrittenAppendManifests.add(copiedManifest); + } + + return this; + } + + private ManifestFile copyManifest(ManifestFile manifest) { + try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { OutputFile newManifestPath = manifestPath(manifestCount.getAndIncrement()); - appendManifests.add(ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), summaryBuilder)); + return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), summaryBuilder); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest); } - - return this; } @Override @@ -106,7 +130,11 @@ public List apply(TableMetadata base) { throw new RuntimeIOException(e, "Failed to write manifest"); } - newManifests.addAll(appendManifests); + // TODO: add sequence numbers here + Iterable appendManifestsWithMetadata = Iterables.transform( + Iterables.concat(appendManifests, rewrittenAppendManifests), + manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); + Iterables.addAll(newManifests, appendManifestsWithMetadata); if (base.currentSnapshot() != null) { newManifests.addAll(base.currentSnapshot().manifests()); @@ -121,7 +149,9 @@ protected void cleanUncommitted(Set committed) { deleteFile(newManifest.path()); } - for (ManifestFile manifest : appendManifests) { + // clean up only rewrittenAppendManifests as they are always owned by the table + // don't clean up appendManifests as they are added to the manifest list and are not compacted + for (ManifestFile manifest : rewrittenAppendManifests) { if (!committed.contains(manifest)) { 
deleteFile(manifest.path()); } diff --git a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java index b999baf52e6f..bb741198d794 100644 --- a/core/src/main/java/org/apache/iceberg/GenericManifestFile.java +++ b/core/src/main/java/org/apache/iceberg/GenericManifestFile.java @@ -99,7 +99,7 @@ public GenericManifestFile(org.apache.avro.Schema avroSchema) { this.fromProjectionPos = null; } - public GenericManifestFile(String path, long length, int specId, long snapshotId, + public GenericManifestFile(String path, long length, int specId, Long snapshotId, int addedFilesCount, int existingFilesCount, int deletedFilesCount, List partitions) { this.avroSchema = AVRO_SCHEMA; @@ -117,7 +117,7 @@ public GenericManifestFile(String path, long length, int specId, long snapshotId this.fromProjectionPos = null; } - public GenericManifestFile(String path, long length, int specId, long snapshotId, + public GenericManifestFile(String path, long length, int specId, Long snapshotId, int addedFilesCount, long addedRowsCount, int existingFilesCount, long existingRowsCount, int deletedFilesCount, long deletedRowsCount, List partitions) { @@ -371,4 +371,33 @@ public String toString() { .add("partitions", partitions) .toString(); } + + public static CopyBuilder copyOf(ManifestFile manifestFile) { + return new CopyBuilder(manifestFile); + } + + public static class CopyBuilder { + private final GenericManifestFile manifestFile; + + private CopyBuilder(ManifestFile toCopy) { + if (toCopy instanceof GenericManifestFile) { + this.manifestFile = new GenericManifestFile((GenericManifestFile) toCopy); + } else { + this.manifestFile = new GenericManifestFile( + toCopy.path(), toCopy.length(), toCopy.partitionSpecId(), toCopy.snapshotId(), + toCopy.addedFilesCount(), toCopy.addedRowsCount(), toCopy.existingFilesCount(), + toCopy.existingRowsCount(), toCopy.deletedFilesCount(), toCopy.deletedRowsCount(), + toCopy.partitions()); + } + } + + public CopyBuilder withSnapshotId(Long newSnapshotId) { + manifestFile.snapshotId = newSnapshotId; + return this; + } + + public ManifestFile build() { + return manifestFile; + } + } } diff --git a/core/src/main/java/org/apache/iceberg/GenericPartitionFieldSummary.java b/core/src/main/java/org/apache/iceberg/GenericPartitionFieldSummary.java index 3f8784427c0d..ae1d5a53e33d 100644 --- a/core/src/main/java/org/apache/iceberg/GenericPartitionFieldSummary.java +++ b/core/src/main/java/org/apache/iceberg/GenericPartitionFieldSummary.java @@ -22,6 +22,7 @@ import com.google.common.base.MoreObjects; import java.io.Serializable; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.List; import org.apache.avro.Schema; import org.apache.avro.generic.IndexedRecord; @@ -40,8 +41,8 @@ public class GenericPartitionFieldSummary // data fields private boolean containsNull = false; - private ByteBuffer lowerBound = null; - private ByteBuffer upperBound = null; + private byte[] lowerBound = null; + private byte[] upperBound = null; /** * Used by Avro reflection to instantiate this class when reading manifest files. 
@@ -75,8 +76,8 @@ public GenericPartitionFieldSummary(boolean containsNull, ByteBuffer lowerBound, ByteBuffer upperBound) { this.avroSchema = AVRO_SCHEMA; this.containsNull = containsNull; - this.lowerBound = lowerBound; - this.upperBound = upperBound; + this.lowerBound = ByteBuffers.toByteArray(lowerBound); + this.upperBound = ByteBuffers.toByteArray(upperBound); this.fromProjectionPos = null; } @@ -88,8 +89,8 @@ public GenericPartitionFieldSummary(boolean containsNull, ByteBuffer lowerBound, private GenericPartitionFieldSummary(GenericPartitionFieldSummary toCopy) { this.avroSchema = toCopy.avroSchema; this.containsNull = toCopy.containsNull; - this.lowerBound = ByteBuffers.copy(toCopy.lowerBound); - this.upperBound = ByteBuffers.copy(toCopy.upperBound); + this.lowerBound = toCopy.lowerBound == null ? null : Arrays.copyOf(toCopy.lowerBound, toCopy.lowerBound.length); + this.upperBound = toCopy.upperBound == null ? null : Arrays.copyOf(toCopy.upperBound, toCopy.upperBound.length); this.fromProjectionPos = toCopy.fromProjectionPos; } @@ -106,12 +107,12 @@ public boolean containsNull() { @Override public ByteBuffer lowerBound() { - return lowerBound; + return lowerBound != null ? ByteBuffer.wrap(lowerBound) : null; } @Override public ByteBuffer upperBound() { - return upperBound; + return upperBound != null ? ByteBuffer.wrap(upperBound) : null; } @Override @@ -135,9 +136,9 @@ public Object get(int i) { case 0: return containsNull; case 1: - return lowerBound; + return lowerBound(); case 2: - return upperBound; + return upperBound(); default: throw new UnsupportedOperationException("Unknown field ordinal: " + pos); } @@ -156,10 +157,10 @@ public void set(int i, T value) { this.containsNull = (Boolean) value; return; case 1: - this.lowerBound = (ByteBuffer) value; + this.lowerBound = ByteBuffers.toByteArray((ByteBuffer) value); return; case 2: - this.upperBound = (ByteBuffer) value; + this.upperBound = ByteBuffers.toByteArray((ByteBuffer) value); return; default: // ignore the object, it must be from a newer version of the format diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadata.java b/core/src/main/java/org/apache/iceberg/InheritableMetadata.java new file mode 100644 index 000000000000..5eebe8a01861 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/InheritableMetadata.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.iceberg; + +import java.io.Serializable; + +interface InheritableMetadata extends Serializable { + ManifestEntry apply(ManifestEntry manifestEntry); +} diff --git a/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java new file mode 100644 index 000000000000..14bf2ff176c7 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/InheritableMetadataFactory.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +class InheritableMetadataFactory { + + private static final InheritableMetadata EMPTY = new EmptyInheritableMetadata(); + + private InheritableMetadataFactory() {} + + static InheritableMetadata empty() { + return EMPTY; + } + + static InheritableMetadata fromManifest(ManifestFile manifest) { + return new BaseInheritableMetadata(manifest.snapshotId()); + } + + static class BaseInheritableMetadata implements InheritableMetadata { + + private final Long snapshotId; + + private BaseInheritableMetadata(Long snapshotId) { + this.snapshotId = snapshotId; + } + + public ManifestEntry apply(ManifestEntry manifestEntry) { + if (manifestEntry.snapshotId() == null) { + manifestEntry.setSnapshotId(snapshotId); + } + return manifestEntry; + } + } + + static class EmptyInheritableMetadata implements InheritableMetadata { + + private EmptyInheritableMetadata() {} + + public ManifestEntry apply(ManifestEntry manifestEntry) { + if (manifestEntry.snapshotId() == null) { + throw new IllegalArgumentException("Entries must have explicit snapshot ids if inherited metadata is empty"); + } + return manifestEntry; + } + } +} diff --git a/core/src/main/java/org/apache/iceberg/ManifestEntry.java b/core/src/main/java/org/apache/iceberg/ManifestEntry.java index c296c9724fa7..728d923830b8 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestEntry.java +++ b/core/src/main/java/org/apache/iceberg/ManifestEntry.java @@ -28,6 +28,7 @@ import org.apache.iceberg.types.Types.LongType; import org.apache.iceberg.types.Types.StructType; +import static org.apache.iceberg.types.Types.NestedField.optional; import static org.apache.iceberg.types.Types.NestedField.required; class ManifestEntry implements IndexedRecord, SpecificData.SchemaConstructable { @@ -49,7 +50,7 @@ public int id() { private final org.apache.avro.Schema schema; private Status status = Status.EXISTING; - private long snapshotId = 0L; + private Long snapshotId = null; private DataFile file = null; ManifestEntry(org.apache.avro.Schema schema) { @@ -71,21 +72,21 @@ private ManifestEntry(ManifestEntry toCopy, boolean fullCopy) { } } - ManifestEntry wrapExisting(long newSnapshotId, DataFile newFile) { + ManifestEntry wrapExisting(Long 
newSnapshotId, DataFile newFile) { this.status = Status.EXISTING; this.snapshotId = newSnapshotId; this.file = newFile; return this; } - ManifestEntry wrapAppend(long newSnapshotId, DataFile newFile) { + ManifestEntry wrapAppend(Long newSnapshotId, DataFile newFile) { this.status = Status.ADDED; this.snapshotId = newSnapshotId; this.file = newFile; return this; } - ManifestEntry wrapDelete(long newSnapshotId, DataFile newFile) { + ManifestEntry wrapDelete(Long newSnapshotId, DataFile newFile) { this.status = Status.DELETED; this.snapshotId = newSnapshotId; this.file = newFile; @@ -102,7 +103,7 @@ public Status status() { /** * @return id of the snapshot in which the file was added to the table */ - public long snapshotId() { + public Long snapshotId() { return snapshotId; } @@ -121,6 +122,10 @@ public ManifestEntry copyWithoutStats() { return new ManifestEntry(this, false /* drop stats */); } + public void setSnapshotId(Long snapshotId) { + this.snapshotId = snapshotId; + } + @Override public void put(int i, Object v) { switch (i) { @@ -170,7 +175,7 @@ static Schema wrapFileSchema(StructType fileStruct) { // ids for top-level columns are assigned from 1000 return new Schema( required(0, "status", IntegerType.get()), - required(1, "snapshot_id", LongType.get()), + optional(1, "snapshot_id", LongType.get()), required(2, "data_file", fileStruct)); } diff --git a/core/src/main/java/org/apache/iceberg/ManifestGroup.java b/core/src/main/java/org/apache/iceberg/ManifestGroup.java index 968137a73c8a..0b2dd91a77f5 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestGroup.java +++ b/core/src/main/java/org/apache/iceberg/ManifestGroup.java @@ -198,9 +198,7 @@ private Iterable> entries( Iterable> readers = Iterables.transform( matchingManifests, manifest -> { - ManifestReader reader = ManifestReader.read( - io.newInputFile(manifest.path()), - specsById); + ManifestReader reader = ManifestReader.read(manifest, io, specsById); FilteredManifest filtered = reader .filterRows(dataFilter) diff --git a/core/src/main/java/org/apache/iceberg/ManifestReader.java b/core/src/main/java/org/apache/iceberg/ManifestReader.java index 0f9c694b9d25..1ae8e3bf44e8 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestReader.java +++ b/core/src/main/java/org/apache/iceberg/ManifestReader.java @@ -34,6 +34,7 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableGroup; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.types.Types; import org.slf4j.Logger; @@ -44,7 +45,8 @@ /** * Reader for manifest files. *
<p>
- * Readers are created using the builder from {@link #read(InputFile, Map)}. + * The preferred way to create readers is {@link #read(ManifestFile, FileIO, Map)}, as + * it allows entries to inherit manifest metadata such as the snapshot ID. */ public class ManifestReader extends CloseableGroup implements Filterable { private static final Logger LOG = LoggerFactory.getLogger(ManifestReader.class); @@ -60,29 +62,65 @@ public class ManifestReader extends CloseableGroup implements Filterable - * Note: Most callers should use {@link #read(InputFile, Map)} to ensure that the - * schema used by filters is the latest table schema. This should be used only when reading a - * manifest without filters. + * Note: Most callers should use {@link #read(ManifestFile, FileIO, Map)} to ensure that + * manifest entries with partial metadata can inherit missing properties from the manifest metadata. + *
<p>
+ * Note: Most callers should use {@link #read(InputFile, Map)} if all manifest entries + * contain full metadata and they want to ensure that the schema used by filters is the latest + * table schema. This should be used only when reading a manifest without filters. * * @param file an InputFile * @return a manifest reader */ public static ManifestReader read(InputFile file) { - return new ManifestReader(file, null); + return new ManifestReader(file, null, InheritableMetadataFactory.empty()); } /** * Returns a new {@link ManifestReader} for an {@link InputFile}. + *
<p>
+ * Note: Most callers should use {@link #read(ManifestFile, FileIO, Map)} to ensure that + * manifest entries with partial metadata can inherit missing properties from the manifest metadata. * * @param file an InputFile * @param specsById a Map from spec ID to partition spec * @return a manifest reader */ public static ManifestReader read(InputFile file, Map specsById) { - return new ManifestReader(file, specsById); + return new ManifestReader(file, specsById, InheritableMetadataFactory.empty()); + } + + /** + * Returns a new {@link ManifestReader} for a {@link ManifestFile}. + *
<p>
+ * Note: Most callers should use {@link #read(ManifestFile, FileIO, Map)} to ensure + * the schema used by filters is the latest table schema. This should be used only when reading + * a manifest without filters. + * + * @param manifest a ManifestFile + * @param io a FileIO + * @return a manifest reader + */ + public static ManifestReader read(ManifestFile manifest, FileIO io) { + return read(manifest, io, null); + } + + /** + * Returns a new {@link ManifestReader} for a {@link ManifestFile}. + * + * @param manifest a ManifestFile + * @param io a FileIO + * @param specsById a Map from spec ID to partition spec + * @return a manifest reader + */ + public static ManifestReader read(ManifestFile manifest, FileIO io, Map specsById) { + InputFile file = io.newInputFile(manifest.path()); + InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest); + return new ManifestReader(file, specsById, inheritableMetadata); } private final InputFile file; + private final InheritableMetadata inheritableMetadata; private final Map metadata; private final PartitionSpec spec; private final Schema fileSchema; @@ -91,8 +129,10 @@ public static ManifestReader read(InputFile file, Map sp private List cachedAdds = null; private List cachedDeletes = null; - private ManifestReader(InputFile file, Map specsById) { + private ManifestReader(InputFile file, Map specsById, + InheritableMetadata inheritableMetadata) { this.file = file; + this.inheritableMetadata = inheritableMetadata; try { try (AvroIterable headerReader = Avro.read(file) @@ -217,7 +257,7 @@ CloseableIterable entries(Schema fileProjection) { addCloseable(reader); - return reader; + return CloseableIterable.transform(reader, inheritableMetadata::apply); default: throw new UnsupportedOperationException("Invalid format for manifest file: " + format); diff --git a/core/src/main/java/org/apache/iceberg/ManifestWriter.java b/core/src/main/java/org/apache/iceberg/ManifestWriter.java index 6facccfbc3cd..c99c491ee39b 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestWriter.java +++ b/core/src/main/java/org/apache/iceberg/ManifestWriter.java @@ -85,21 +85,21 @@ static ManifestFile copyManifest(ManifestReader reader, OutputFile outputFile, l /** * Create a new {@link ManifestWriter}. *
<p>
- * Manifests created by this writer are not part of a snapshot and have all entry snapshot IDs - * set to -1. + * Manifests created by this writer have all entry snapshot IDs set to null. + * All entries will inherit the snapshot ID that will be assigned to the manifest on commit. * * @param spec {@link PartitionSpec} used to produce {@link DataFile} partition tuples * @param outputFile the destination file location * @return a manifest writer */ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { - return new ManifestWriter(spec, outputFile, -1); + return new ManifestWriter(spec, outputFile, null); } private final OutputFile file; private final int specId; private final FileAppender writer; - private final long snapshotId; + private final Long snapshotId; private final ManifestEntry reused; private final PartitionSummary stats; @@ -111,7 +111,7 @@ public static ManifestWriter write(PartitionSpec spec, OutputFile outputFile) { private int deletedFiles = 0; private long deletedRows = 0L; - ManifestWriter(PartitionSpec spec, OutputFile file, long snapshotId) { + ManifestWriter(PartitionSpec spec, OutputFile file, Long snapshotId) { this.file = file; this.specId = spec.specId(); this.writer = newAppender(FileFormat.AVRO, spec, file); diff --git a/core/src/main/java/org/apache/iceberg/MergeAppend.java b/core/src/main/java/org/apache/iceberg/MergeAppend.java index e84f45d522a5..063315f68b8c 100644 --- a/core/src/main/java/org/apache/iceberg/MergeAppend.java +++ b/core/src/main/java/org/apache/iceberg/MergeAppend.java @@ -19,6 +19,7 @@ package org.apache.iceberg; +import com.google.common.base.Preconditions; import org.apache.iceberg.exceptions.CommitFailedException; /** @@ -49,6 +50,11 @@ public MergeAppend appendFile(DataFile file) { @Override public AppendFiles appendManifest(ManifestFile manifest) { + Preconditions.checkArgument(!manifest.hasExistingFiles(), "Cannot append manifest with existing files"); + Preconditions.checkArgument(!manifest.hasDeletedFiles(), "Cannot append manifest with deleted files"); + Preconditions.checkArgument( + manifest.snapshotId() == null || manifest.snapshotId() == -1, + "Snapshot id must be assigned during commit"); add(manifest); return this; } diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index bb6df4aad0d5..e08ae494787a 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -57,6 +57,8 @@ import static org.apache.iceberg.TableProperties.MANIFEST_MIN_MERGE_COUNT_DEFAULT; import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES; import static org.apache.iceberg.TableProperties.MANIFEST_TARGET_SIZE_BYTES_DEFAULT; +import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED; +import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT; abstract class MergingSnapshotProducer extends SnapshotProducer { private static final Logger LOG = LoggerFactory.getLogger(MergingSnapshotProducer.class); @@ -82,11 +84,13 @@ public String partition() { private final int minManifestsCountToMerge; private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder(); private final boolean mergeEnabled; + private final boolean snapshotIdInheritanceEnabled; // update data private final AtomicInteger manifestCount = new AtomicInteger(0); private final List newFiles = 
Lists.newArrayList(); private final List appendManifests = Lists.newArrayList(); + private final List rewrittenAppendManifests = Lists.newArrayList(); private final SnapshotSummary.Builder appendedManifestsSummary = SnapshotSummary.builder(); private final Set deletePaths = Sets.newHashSet(); private final Set deleteFilePartitions = Sets.newHashSet(); @@ -123,6 +127,8 @@ public String partition() { .propertyAsInt(MANIFEST_MIN_MERGE_COUNT, MANIFEST_MIN_MERGE_COUNT_DEFAULT); this.mergeEnabled = ops.current() .propertyAsBoolean(TableProperties.MANIFEST_MERGE_ENABLED, TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT); + this.snapshotIdInheritanceEnabled = ops.current() + .propertyAsBoolean(SNAPSHOT_ID_INHERITANCE_ENABLED, SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT); } @Override @@ -203,17 +209,29 @@ protected void add(DataFile file) { * Add all files in a manifest to the new snapshot. */ protected void add(ManifestFile manifest) { - // the manifest must be rewritten with this update's snapshot ID - try (ManifestReader reader = ManifestReader.read( - ops.io().newInputFile(manifest.path()), ops.current().specsById())) { - ManifestFile manifestFile = ManifestWriter.copyAppendManifest( - reader, manifestPath(manifestCount.getAndIncrement()), snapshotId(), appendedManifestsSummary); - appendManifests.add(manifestFile); - // keep reference of the first appended manifest, so that we can avoid merging first bin(s) - // which has the first appended manifest and have not crossed the limit of minManifestsCountToMerge - if (firstAppendedManifest == null) { - firstAppendedManifest = manifestFile; - } + ManifestFile appendedManifest; + if (snapshotIdInheritanceEnabled && manifest.snapshotId() == null) { + appendedManifestsSummary.addedManifest(manifest); + appendManifests.add(manifest); + appendedManifest = manifest; + } else { + // the manifest must be rewritten with this update's snapshot ID + ManifestFile copiedManifest = copyManifest(manifest); + rewrittenAppendManifests.add(copiedManifest); + appendedManifest = copiedManifest; + } + + // keep reference of the first appended manifest, so that we can avoid merging first bin(s) + // which has the first appended manifest and have not crossed the limit of minManifestsCountToMerge + if (firstAppendedManifest == null) { + firstAppendedManifest = appendedManifest; + } + } + + private ManifestFile copyManifest(ManifestFile manifest) { + try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { + OutputFile newManifestPath = manifestPath(manifestCount.getAndIncrement()); + return ManifestWriter.copyAppendManifest(reader, newManifestPath, snapshotId(), appendedManifestsSummary); } catch (IOException e) { throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest); } @@ -250,11 +268,17 @@ public List apply(TableMetadata base) { summaryBuilder.addedFile(spec, file); } - newManifests = Iterables.concat(ImmutableList.of(newFilesAsManifest()), appendManifests); + ManifestFile newManifest = newFilesAsManifest(); + newManifests = Iterables.concat(ImmutableList.of(newManifest), appendManifests, rewrittenAppendManifests); } else { - newManifests = appendManifests; + newManifests = Iterables.concat(appendManifests, rewrittenAppendManifests); } + // TODO: add sequence numbers here + Iterable newManifestsWithMetadata = Iterables.transform( + newManifests, + manifest -> GenericManifestFile.copyOf(manifest).withSnapshotId(snapshotId()).build()); + // filter any existing manifests List filtered; if (current != null) { @@ -265,7 
+289,7 @@ public List apply(TableMetadata base) { } Iterable unmergedManifests = Iterables.filter( - Iterables.concat(newManifests, filtered), + Iterables.concat(newManifestsWithMetadata, filtered), // only keep manifests that have live data files or that were written by this commit manifest -> manifest.hasAddedFiles() || manifest.hasExistingFiles() || manifest.snapshotId() == snapshotId()); @@ -381,11 +405,23 @@ private void cleanUncommittedAppends(Set committed) { this.cachedNewManifest = null; } - for (ManifestFile manifest : appendManifests) { + // rewritten manifests are always owned by the table + for (ManifestFile manifest : rewrittenAppendManifests) { if (!committed.contains(manifest)) { deleteFile(manifest.path()); } } + + // manifests that are not rewritten are only owned by the table if the commit succeeded + if (!committed.isEmpty()) { + // the commit succeeded if at least one manifest was committed + // the table now owns appendManifests; clean up any that are not used + for (ManifestFile manifest : appendManifests) { + if (!committed.contains(manifest)) { + deleteFile(manifest.path()); + } + } + } } @Override @@ -447,8 +483,7 @@ private ManifestFile filterManifest(StrictMetricsEvaluator metricsEvaluator, return manifest; } - try (ManifestReader reader = ManifestReader.read( - ops.io().newInputFile(manifest.path()), ops.current().specsById())) { + try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { // this is reused to compare file paths with the delete set CharSequenceWrapper pathWrapper = CharSequenceWrapper.wrap(""); @@ -625,8 +660,7 @@ private ManifestFile createManifest(int specId, List bin) throws I ManifestWriter writer = new ManifestWriter(ops.current().spec(specId), out, snapshotId()); try { for (ManifestFile manifest : bin) { - try (ManifestReader reader = ManifestReader.read( - ops.io().newInputFile(manifest.path()), ops.current().specsById())) { + try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { for (ManifestEntry entry : reader.entries()) { if (entry.status() == Status.DELETED) { // suppress deletes from previous snapshots. 
only files deleted by this snapshot diff --git a/core/src/main/java/org/apache/iceberg/PartitionSummary.java b/core/src/main/java/org/apache/iceberg/PartitionSummary.java index cc2f182928ee..b028319944a6 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionSummary.java +++ b/core/src/main/java/org/apache/iceberg/PartitionSummary.java @@ -19,10 +19,10 @@ package org.apache.iceberg; -import com.google.common.collect.Lists; import java.util.Arrays; import java.util.Comparator; import java.util.List; +import java.util.stream.Collectors; import org.apache.iceberg.ManifestFile.PartitionFieldSummary; import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Conversions; @@ -43,7 +43,7 @@ class PartitionSummary { } List summaries() { - return Lists.transform(Arrays.asList(fields), PartitionFieldStats::toSummary); + return Arrays.stream(fields).map(PartitionFieldStats::toSummary).collect(Collectors.toList()); } public void update(StructLike partitionKey) { diff --git a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java index c80c6c8c156e..07a622b26466 100644 --- a/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java +++ b/core/src/main/java/org/apache/iceberg/RemoveSnapshots.java @@ -189,7 +189,7 @@ private void cleanExpiredFiles(List snapshots, Set validIds, Set Set ancestorIds = Sets.newHashSet(SnapshotUtil.ancestorIds(base.currentSnapshot(), base::snapshot)); Set validManifests = Sets.newHashSet(); - Set manifestsToScan = Sets.newHashSet(); + Set manifestsToScan = Sets.newHashSet(); for (Snapshot snapshot : snapshots) { try (CloseableIterable manifests = readManifestFiles(snapshot)) { for (ManifestFile manifest : manifests) { @@ -198,7 +198,7 @@ private void cleanExpiredFiles(List snapshots, Set validIds, Set boolean fromValidSnapshots = validIds.contains(manifest.snapshotId()); boolean isFromAncestor = ancestorIds.contains(manifest.snapshotId()); if (!fromValidSnapshots && isFromAncestor && manifest.hasDeletedFiles()) { - manifestsToScan.add(manifest.path()); + manifestsToScan.add(manifest); } } @@ -210,7 +210,7 @@ private void cleanExpiredFiles(List snapshots, Set validIds, Set Set manifestListsToDelete = Sets.newHashSet(); Set manifestsToDelete = Sets.newHashSet(); - Set manifestsToRevert = Sets.newHashSet(); + Set manifestsToRevert = Sets.newHashSet(); for (Snapshot snapshot : base.snapshots()) { long snapshotId = snapshot.snapshotId(); if (!validIds.contains(snapshotId)) { @@ -228,7 +228,7 @@ private void cleanExpiredFiles(List snapshots, Set validIds, Set // snapshot is an ancestor of the current table state. Otherwise, a snapshot that // deleted files and was rolled back will delete files that could be in the current // table state. - manifestsToScan.add(manifest.path()); + manifestsToScan.add(manifest); } if (!isFromAncestor && isFromExpiringSnapshot && manifest.hasAddedFiles()) { @@ -239,7 +239,7 @@ private void cleanExpiredFiles(List snapshots, Set validIds, Set // written and this expiration is known and there is no missing history. If history // were missing, then the snapshot could be an ancestor of the table state but the // ancestor ID set would not contain it and this would be unsafe. 
- manifestsToRevert.add(manifest.path()); + manifestsToRevert.add(manifest); } } } @@ -273,7 +273,8 @@ private void deleteMetadataFiles(Set manifestsToDelete, Set mani .run(deleteFunc::accept); } - private void deleteDataFiles(Set manifestsToScan, Set manifestsToRevert, Set validIds) { + private void deleteDataFiles(Set manifestsToScan, Set manifestsToRevert, + Set validIds) { Set filesToDelete = findFilesToDelete(manifestsToScan, manifestsToRevert, validIds); Tasks.foreach(filesToDelete) .noRetry().suppressFailureWhenFinished() @@ -281,8 +282,8 @@ private void deleteDataFiles(Set manifestsToScan, Set manifestsT .run(file -> deleteFunc.accept(file)); } - private Set findFilesToDelete( - Set manifestsToScan, Set manifestsToRevert, Set validIds) { + private Set findFilesToDelete(Set manifestsToScan, Set manifestsToRevert, + Set validIds) { Set filesToDelete = new ConcurrentSet<>(); Tasks.foreach(manifestsToScan) .noRetry().suppressFailureWhenFinished() @@ -290,8 +291,7 @@ private Set findFilesToDelete( .onFailure((item, exc) -> LOG.warn("Failed to get deleted files: this may cause orphaned data files", exc)) .run(manifest -> { // the manifest has deletes, scan it to find files to delete - try (ManifestReader reader = ManifestReader.read( - ops.io().newInputFile(manifest), ops.current().specsById())) { + try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { for (ManifestEntry entry : reader.entries()) { // if the snapshot ID of the DELETE entry is no longer valid, the data can be deleted if (entry.status() == ManifestEntry.Status.DELETED && @@ -311,8 +311,7 @@ private Set findFilesToDelete( .onFailure((item, exc) -> LOG.warn("Failed to get added files: this may cause orphaned data files", exc)) .run(manifest -> { // the manifest has deletes, scan it to find files to delete - try (ManifestReader reader = ManifestReader.read( - ops.io().newInputFile(manifest), ops.current().specsById())) { + try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { for (ManifestEntry entry : reader.entries()) { // delete any ADDED file from manifests that were reverted if (entry.status() == ManifestEntry.Status.ADDED) { diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java index 3aab6d33327a..05802267a3c8 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java @@ -308,8 +308,7 @@ protected long snapshotId() { } private static ManifestFile addMetadata(TableOperations ops, ManifestFile manifest) { - try (ManifestReader reader = ManifestReader.read( - ops.io().newInputFile(manifest.path()), ops.current().specsById())) { + try (ManifestReader reader = ManifestReader.read(manifest, ops.io(), ops.current().specsById())) { PartitionSummary stats = new PartitionSummary(ops.current().spec(manifest.partitionSpecId())); int addedFiles = 0; long addedRows = 0L; diff --git a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java index 9f422226332d..b9d0e9284b41 100644 --- a/core/src/main/java/org/apache/iceberg/SnapshotSummary.java +++ b/core/src/main/java/org/apache/iceberg/SnapshotSummary.java @@ -77,6 +77,16 @@ public void addedFile(PartitionSpec spec, DataFile file) { this.addedRecords += file.recordCount(); } + public void addedManifest(ManifestFile manifest) { + this.addedFiles += manifest.addedFilesCount(); 
+ this.addedRecords += manifest.addedRowsCount(); + } + + public void deletedManifest(ManifestFile manifest) { + this.deletedFiles += manifest.deletedFilesCount(); + this.deletedRecords += manifest.deletedRowsCount(); + } + public void set(String property, String value) { properties.put(property, value); } diff --git a/core/src/main/java/org/apache/iceberg/TableProperties.java b/core/src/main/java/org/apache/iceberg/TableProperties.java index 4db587a1f0be..fc0712e94612 100644 --- a/core/src/main/java/org/apache/iceberg/TableProperties.java +++ b/core/src/main/java/org/apache/iceberg/TableProperties.java @@ -113,4 +113,7 @@ private TableProperties() {} public static final String WRITE_TARGET_FILE_SIZE_BYTES = "write.target-file-size-bytes"; public static final long WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT = Long.MAX_VALUE; + + public static final String SNAPSHOT_ID_INHERITANCE_ENABLED = "compatibility.snapshot-id-inheritance.enabled"; + public static final boolean SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT = false; } diff --git a/core/src/test/java/org/apache/iceberg/TableTestBase.java b/core/src/test/java/org/apache/iceberg/TableTestBase.java index 59c7cc763f6e..012d177d90b6 100644 --- a/core/src/test/java/org/apache/iceberg/TableTestBase.java +++ b/core/src/test/java/org/apache/iceberg/TableTestBase.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Set; +import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.types.Types; import org.junit.After; @@ -37,7 +38,6 @@ import org.junit.Rule; import org.junit.rules.TemporaryFolder; -import static org.apache.iceberg.Files.localInput; import static org.apache.iceberg.types.Types.NestedField.required; public class TableTestBase { @@ -77,6 +77,8 @@ public class TableTestBase { .withRecordCount(1) .build(); + static final FileIO FILE_IO = new TestTables.LocalFileIO(); + @Rule public TemporaryFolder temp = new TemporaryFolder(); @@ -174,7 +176,7 @@ ManifestFile writeManifestWithName(String name, DataFile... files) throws IOExce return writer.toManifestFile(); } - ManifestEntry manifestEntry(ManifestEntry.Status status, long snapshotId, DataFile file) { + ManifestEntry manifestEntry(ManifestEntry.Status status, Long snapshotId, DataFile file) { ManifestEntry entry = new ManifestEntry(table.spec().partitionType()); switch (status) { case ADDED: @@ -205,10 +207,10 @@ void validateSnapshot(Snapshot old, Snapshot snap, DataFile... newFiles) { long id = snap.snapshotId(); Iterator newPaths = paths(newFiles).iterator(); - for (ManifestEntry entry : ManifestReader.read(localInput(manifest.path())).entries()) { + for (ManifestEntry entry : ManifestReader.read(manifest, FILE_IO).entries()) { DataFile file = entry.file(); Assert.assertEquals("Path should match expected", newPaths.next(), file.path().toString()); - Assert.assertEquals("File's snapshot ID should match", id, entry.snapshotId()); + Assert.assertEquals("File's snapshot ID should match", id, (long) entry.snapshotId()); } Assert.assertFalse("Should find all files in the manifest", newPaths.hasNext()); @@ -237,19 +239,13 @@ List paths(DataFile... 
dataFiles) { static void validateManifest(ManifestFile manifest, Iterator ids, Iterator expectedFiles) { - validateManifest(manifest.path(), ids, expectedFiles); - } - - static void validateManifest(String manifest, - Iterator ids, - Iterator expectedFiles) { - for (ManifestEntry entry : ManifestReader.read(localInput(manifest)).entries()) { + for (ManifestEntry entry : ManifestReader.read(manifest, FILE_IO).entries()) { DataFile file = entry.file(); DataFile expected = expectedFiles.next(); Assert.assertEquals("Path should match expected", expected.path().toString(), file.path().toString()); Assert.assertEquals("Snapshot ID should match expected ID", - (long) ids.next(), entry.snapshotId()); + ids.next(), entry.snapshotId()); } Assert.assertFalse("Should find all files in the manifest", expectedFiles.hasNext()); @@ -259,21 +255,14 @@ static void validateManifestEntries(ManifestFile manifest, Iterator ids, Iterator expectedFiles, Iterator expectedStatuses) { - validateManifestEntries(manifest.path(), ids, expectedFiles, expectedStatuses); - } - - static void validateManifestEntries(String manifest, - Iterator ids, - Iterator expectedFiles, - Iterator expectedStatuses) { - for (ManifestEntry entry : ManifestReader.read(localInput(manifest)).entries()) { + for (ManifestEntry entry : ManifestReader.read(manifest, FILE_IO).entries()) { DataFile file = entry.file(); DataFile expected = expectedFiles.next(); final ManifestEntry.Status expectedStatus = expectedStatuses.next(); Assert.assertEquals("Path should match expected", expected.path().toString(), file.path().toString()); Assert.assertEquals("Snapshot ID should match expected ID", - (long) ids.next(), entry.snapshotId()); + ids.next(), entry.snapshotId()); Assert.assertEquals("Entry status should match expected ID", expectedStatus, entry.status()); } @@ -294,6 +283,6 @@ static Iterator files(DataFile... 
files) { } static Iterator files(ManifestFile manifest) { - return ManifestReader.read(localInput(manifest.path())).iterator(); + return ManifestReader.read(manifest, FILE_IO).iterator(); } } diff --git a/core/src/test/java/org/apache/iceberg/TestFastAppend.java b/core/src/test/java/org/apache/iceberg/TestFastAppend.java index de2d34f455c8..f8cb6d28f653 100644 --- a/core/src/test/java/org/apache/iceberg/TestFastAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestFastAppend.java @@ -25,6 +25,7 @@ import java.io.IOException; import java.util.List; import java.util.Set; +import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.CommitFailedException; import org.junit.Assert; import org.junit.Test; @@ -277,4 +278,95 @@ public void testRecoveryWithoutManifestList() { Assert.assertTrue("Should commit the same new manifest", metadata.currentSnapshot().manifests().contains(newManifest)); } + + @Test + public void testAppendManifestWithSnapshotIdInheritance() throws IOException { + table.updateProperties() + .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true") + .commit(); + + Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); + + TableMetadata base = readMetadata(); + Assert.assertNull("Should not have a current snapshot", base.currentSnapshot()); + + ManifestFile manifest = writeManifest(FILE_A, FILE_B); + table.newFastAppend() + .appendManifest(manifest) + .commit(); + + Snapshot snapshot = table.currentSnapshot(); + List manifests = table.currentSnapshot().manifests(); + Assert.assertEquals("Should have 1 committed manifest", 1, manifests.size()); + + validateManifestEntries(manifests.get(0), + ids(snapshot.snapshotId(), snapshot.snapshotId()), + files(FILE_A, FILE_B), + statuses(Status.ADDED, Status.ADDED)); + + // validate that the metadata summary is correct when using appendManifest + Assert.assertEquals("Summary metadata should include 2 added files", + "2", snapshot.summary().get("added-data-files")); + Assert.assertEquals("Summary metadata should include 2 added records", + "2", snapshot.summary().get("added-records")); + Assert.assertEquals("Summary metadata should include 2 files in total", + "2", snapshot.summary().get("total-data-files")); + Assert.assertEquals("Summary metadata should include 2 records in total", + "2", snapshot.summary().get("total-records")); + } + + @Test + public void testAppendManifestFailureWithSnapshotIdInheritance() throws IOException { + table.updateProperties() + .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true") + .commit(); + + Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); + + TableMetadata base = readMetadata(); + Assert.assertNull("Should not have a current snapshot", base.currentSnapshot()); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "1") + .commit(); + + table.ops().failCommits(5); + + ManifestFile manifest = writeManifest(FILE_A, FILE_B); + + AppendFiles append = table.newAppend(); + append.appendManifest(manifest); + + AssertHelpers.assertThrows("Should reject commit", + CommitFailedException.class, "Injected failure", + append::commit); + + Assert.assertTrue("Append manifest should not be deleted", new File(manifest.path()).exists()); + } + + @Test + public void testInvalidAppendManifest() throws IOException { + Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); + + TableMetadata base = readMetadata(); + Assert.assertNull("Should not have a current snapshot", 
base.currentSnapshot()); + + ManifestFile manifestWithExistingFiles = writeManifest( + "manifest-file-1.avro", + manifestEntry(Status.EXISTING, null, FILE_A)); + AssertHelpers.assertThrows("Should reject commit", + IllegalArgumentException.class, "Cannot append manifest with existing files", + () -> table.newFastAppend() + .appendManifest(manifestWithExistingFiles) + .commit()); + + ManifestFile manifestWithDeletedFiles = writeManifest( + "manifest-file-2.avro", + manifestEntry(Status.DELETED, null, FILE_A)); + AssertHelpers.assertThrows("Should reject commit", + IllegalArgumentException.class, "Cannot append manifest with deleted files", + () -> table.newFastAppend() + .appendManifest(manifestWithDeletedFiles) + .commit()); + } }
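For context, a minimal sketch of the append-side usage these tests exercise, assuming a table with the compatibility flag already set and a manifest written out of band (the class and method names are illustrative, not part of this change):

```java
import org.apache.iceberg.AppendFiles;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Table;

class AppendManifestSketch {
  // Append a pre-written manifest containing only ADDED entries. With snapshot
  // ID inheritance enabled, entries written with a null snapshot ID inherit the
  // committed snapshot's ID, and the manifest becomes part of table metadata,
  // so it must not be deleted manually after a successful commit.
  static void appendExisting(Table table, ManifestFile manifest) {
    AppendFiles append = table.newFastAppend();
    append.appendManifest(manifest);
    append.commit(); // on failure, the manifest is left for the caller to reuse or delete
  }
}
```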
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java new file mode 100644 index 000000000000..c10ffe63c769 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iceberg; + +import com.google.common.collect.Iterables; +import java.io.IOException; +import org.apache.iceberg.ManifestEntry.Status; +import org.junit.Assert; +import org.junit.Test; + +public class TestManifestReader extends TableTestBase { + + @Test + public void testManifestReaderWithEmptyInheritableMetadata() throws IOException { + ManifestFile manifest = writeManifest("manifest.avro", manifestEntry(Status.EXISTING, 1000L, FILE_A)); + try (ManifestReader reader = ManifestReader.read(FILE_IO.newInputFile(manifest.path()))) { + ManifestEntry entry = Iterables.getOnlyElement(reader.entries()); + Assert.assertEquals(Status.EXISTING, entry.status()); + Assert.assertEquals(FILE_A.path(), entry.file().path()); + Assert.assertEquals(1000L, (long) entry.snapshotId()); + } + } + + @Test + public void testInvalidUsage() throws IOException { + ManifestFile manifest = writeManifest(FILE_A, FILE_B); + try (ManifestReader reader = ManifestReader.read(FILE_IO.newInputFile(manifest.path()))) { + AssertHelpers.assertThrows( + "Should not be possible to read manifest without explicit snapshot ids and inheritable metadata", + IllegalArgumentException.class, "must have explicit snapshot ids", + () -> Iterables.getOnlyElement(reader.entries())); + } + } +}

diff --git a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java index 01d2b7f735f0..21690e43383d 100644 --- a/core/src/test/java/org/apache/iceberg/TestMergeAppend.java +++ b/core/src/test/java/org/apache/iceberg/TestMergeAppend.java @@ -23,6 +23,7 @@ import com.google.common.collect.Sets; import java.io.File; import java.io.IOException; +import java.util.List; import java.util.Set; import org.apache.iceberg.ManifestEntry.Status; import org.apache.iceberg.exceptions.CommitFailedException; @@ -522,4 +523,129 @@ public void testRecovery() { Assert.assertEquals("Should commit the same new manifest during retry", Lists.newArrayList(newManifest), metadata.currentSnapshot().manifests()); } + + @Test + public void testAppendManifestWithSnapshotIdInheritance() throws IOException { + table.updateProperties() + .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true") + .commit(); + + Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); + + TableMetadata base = readMetadata(); + Assert.assertNull("Should not have a current snapshot", base.currentSnapshot()); + + ManifestFile manifest = writeManifest(FILE_A, FILE_B); + table.newAppend() + .appendManifest(manifest) + .commit(); + + Snapshot snapshot = table.currentSnapshot(); + long snapshotId = snapshot.snapshotId(); + List manifests = table.currentSnapshot().manifests(); + Assert.assertEquals("Should have 1 committed manifest", 1, manifests.size()); + + validateManifestEntries(manifests.get(0), + ids(snapshotId, snapshotId), + files(FILE_A, FILE_B), + statuses(Status.ADDED, Status.ADDED)); + + // validate that the metadata summary is correct when using appendManifest + Assert.assertEquals("Summary metadata should include 2 added files", + "2", snapshot.summary().get("added-data-files")); + Assert.assertEquals("Summary metadata should include 2 added records", + "2", snapshot.summary().get("added-records")); + Assert.assertEquals("Summary metadata should include 2 files in total", + "2", snapshot.summary().get("total-data-files")); + Assert.assertEquals("Summary metadata should include 2 records in total", + "2", snapshot.summary().get("total-records")); + } + + @Test + public void
testMergedAppendManifestCleanupWithSnapshotIdInheritance() throws IOException { + table.updateProperties() + .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true") + .commit(); + + Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); + + TableMetadata base = readMetadata(); + Assert.assertNull("Should not have a current snapshot", base.currentSnapshot()); + + table.updateProperties() + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "1") + .commit(); + + ManifestFile manifest1 = writeManifestWithName("manifest-file-1.avro", FILE_A, FILE_B); + table.newAppend() + .appendManifest(manifest1) + .commit(); + + Assert.assertTrue("Unmerged append manifest should not be deleted", new File(manifest1.path()).exists()); + + ManifestFile manifest2 = writeManifestWithName("manifest-file-2.avro", FILE_C, FILE_D); + table.newAppend() + .appendManifest(manifest2) + .commit(); + + Snapshot snapshot = table.currentSnapshot(); + + Assert.assertEquals("Manifests should be merged into 1", 1, snapshot.manifests().size()); + Assert.assertFalse("Merged append manifest should be deleted", new File(manifest2.path()).exists()); + } + + @Test + public void testAppendManifestFailureWithSnapshotIdInheritance() throws IOException { + table.updateProperties() + .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true") + .commit(); + + Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); + + TableMetadata base = readMetadata(); + Assert.assertNull("Should not have a current snapshot", base.currentSnapshot()); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "1") + .commit(); + + table.ops().failCommits(5); + + ManifestFile manifest = writeManifest(FILE_A, FILE_B); + + AppendFiles append = table.newAppend(); + append.appendManifest(manifest); + + AssertHelpers.assertThrows("Should reject commit", + CommitFailedException.class, "Injected failure", + append::commit); + + Assert.assertTrue("Append manifest should not be deleted", new File(manifest.path()).exists()); + } + + @Test + public void testInvalidAppendManifest() throws IOException { + Assert.assertEquals("Table should start empty", 0, listManifestFiles().size()); + + TableMetadata base = readMetadata(); + Assert.assertNull("Should not have a current snapshot", base.currentSnapshot()); + + ManifestFile manifestWithExistingFiles = writeManifest( + "manifest-file-1.avro", + manifestEntry(Status.EXISTING, null, FILE_A)); + AssertHelpers.assertThrows("Should reject commit", + IllegalArgumentException.class, "Cannot append manifest with existing files", + () -> table.newAppend() + .appendManifest(manifestWithExistingFiles) + .commit()); + + ManifestFile manifestWithDeletedFiles = writeManifest( + "manifest-file-2.avro", + manifestEntry(Status.DELETED, null, FILE_A)); + AssertHelpers.assertThrows("Should reject commit", + IllegalArgumentException.class, "Cannot append manifest with deleted files", + () -> table.newAppend() + .appendManifest(manifestWithDeletedFiles) + .commit()); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java index 4aa244f4b467..c1bd7a895b4a 100644 --- a/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java +++ b/core/src/test/java/org/apache/iceberg/TestRewriteManifests.java @@ -20,23 +20,104 @@ package org.apache.iceberg; import com.google.common.collect.Iterables; +import java.io.File; import java.io.IOException; import java.util.Arrays; import 
java.util.Comparator; import java.util.List; import java.util.Map; +import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.expressions.Expressions; import org.junit.Assert; import org.junit.Test; -import static org.apache.iceberg.Files.localInput; import static org.apache.iceberg.TableProperties.MANIFEST_MERGE_ENABLED; +import static org.apache.iceberg.TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; public class TestRewriteManifests extends TableTestBase { + @Test + public void testRewriteManifestsAppendedDirectly() throws IOException { + Table table = load(); + + table.updateProperties().set(SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit(); + + ManifestFile newManifest = writeManifest( + "manifest-file-1.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_A)); + + table.newFastAppend() + .appendManifest(newManifest) + .commit(); + long appendId = table.currentSnapshot().snapshotId(); + + Assert.assertEquals(1, table.currentSnapshot().manifests().size()); + + table.rewriteManifests() + .clusterBy(file -> "") + .commit(); + + List manifests = table.currentSnapshot().manifests(); + Assert.assertEquals(1, manifests.size()); + + validateManifestEntries(manifests.get(0), + ids(appendId), + files(FILE_A), + statuses(ManifestEntry.Status.EXISTING)); + } + + @Test + public void testRewriteManifestsGeneratedAndAppendedDirectly() throws IOException { + Table table = load(); + + table.updateProperties().set(SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit(); + + ManifestFile newManifest = writeManifest( + "manifest-file-1.avro", + manifestEntry(ManifestEntry.Status.ADDED, null, FILE_A)); + + table.newFastAppend() + .appendManifest(newManifest) + .commit(); + long manifestAppendId = table.currentSnapshot().snapshotId(); + + table.newFastAppend() + .appendFile(FILE_B) + .commit(); + long fileAppendId = table.currentSnapshot().snapshotId(); + + Assert.assertEquals(2, table.currentSnapshot().manifests().size()); + + table.rewriteManifests() + .clusterBy(file -> "") + .commit(); + + List manifests = table.currentSnapshot().manifests(); + Assert.assertEquals("Manifests must be merged into 1", 1, manifests.size()); + + // get the correct file order + List files; + List ids; + try (ManifestReader reader = ManifestReader.read(manifests.get(0), table.io())) { + if (reader.iterator().next().path().equals(FILE_A.path())) { + files = Arrays.asList(FILE_A, FILE_B); + ids = Arrays.asList(manifestAppendId, fileAppendId); + } else { + files = Arrays.asList(FILE_B, FILE_A); + ids = Arrays.asList(fileAppendId, manifestAppendId); + } + } + + validateManifestEntries(manifests.get(0), + ids.iterator(), + files.iterator(), + statuses(ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING)); + } + @Test public void testReplaceManifestsSeparate() { Table table = load(); @@ -95,7 +176,7 @@ public void testReplaceManifestsConsolidate() throws IOException { // get the file order correct List files; List ids; - try (ManifestReader reader = ManifestReader.read(localInput(manifests.get(0).path()))) { + try (ManifestReader reader = ManifestReader.read(manifests.get(0), table.io())) { if (reader.iterator().next().path().equals(FILE_A.path())) { files = Arrays.asList(FILE_A, FILE_B); ids = Arrays.asList(appendIdA, appendIdB); @@ -137,7 +218,7 @@ public void testReplaceManifestsWithFilter() 
throws IOException { table.rewriteManifests() .clusterBy(file -> "file") .rewriteIf(manifest -> { - try (ManifestReader reader = ManifestReader.read(localInput(manifest.path()))) { + try (ManifestReader reader = ManifestReader.read(manifest, table.io())) { return !reader.iterator().next().path().equals(FILE_A.path()); } catch (IOException x) { throw new RuntimeIOException(x); @@ -151,7 +232,7 @@ public void testReplaceManifestsWithFilter() throws IOException { // get the file order correct List files; List ids; - try (ManifestReader reader = ManifestReader.read(localInput(manifests.get(0).path()))) { + try (ManifestReader reader = ManifestReader.read(manifests.get(0), table.io())) { if (reader.iterator().next().path().equals(FILE_B.path())) { files = Arrays.asList(FILE_B, FILE_C); ids = Arrays.asList(appendIdB, appendIdC); @@ -221,7 +302,7 @@ public void testConcurrentRewriteManifest() throws IOException { table.rewriteManifests() .clusterBy(file -> "file") .rewriteIf(manifest -> { - try (ManifestReader reader = ManifestReader.read(localInput(manifest.path()))) { + try (ManifestReader reader = ManifestReader.read(manifest, table.io())) { return !reader.iterator().next().path().equals(FILE_A.path()); } catch (IOException x) { throw new RuntimeIOException(x); @@ -241,7 +322,7 @@ public void testConcurrentRewriteManifest() throws IOException { // get the file order correct List files; List ids; - try (ManifestReader reader = ManifestReader.read(localInput(manifests.get(0).path()))) { + try (ManifestReader reader = ManifestReader.read(manifests.get(0), table.io())) { if (reader.iterator().next().path().equals(FILE_A.path())) { files = Arrays.asList(FILE_A, FILE_B); ids = Arrays.asList(appendIdA, appendIdB); @@ -392,6 +473,73 @@ public void testBasicManifestReplacement() throws IOException { statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED)); } + @Test + public void testBasicManifestReplacementWithSnapshotIdInheritance() throws IOException { + Assert.assertNull("Table should be empty", table.currentSnapshot()); + + table.updateProperties() + .set(SNAPSHOT_ID_INHERITANCE_ENABLED, "true") + .commit(); + + table.newFastAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + List firstSnapshotManifests = firstSnapshot.manifests(); + Assert.assertEquals(1, firstSnapshotManifests.size()); + ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0); + + table.newFastAppend() + .appendFile(FILE_C) + .appendFile(FILE_D) + .commit(); + Snapshot secondSnapshot = table.currentSnapshot(); + + ManifestFile firstNewManifest = writeManifest( + "manifest-file-1.avro", + manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A)); + ManifestFile secondNewManifest = writeManifest( + "manifest-file-2.avro", + manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_B)); + + RewriteManifests rewriteManifests = table.rewriteManifests(); + rewriteManifests.deleteManifest(firstSnapshotManifest); + rewriteManifests.addManifest(firstNewManifest); + rewriteManifests.addManifest(secondNewManifest); + rewriteManifests.commit(); + + Snapshot snapshot = table.currentSnapshot(); + List manifests = snapshot.manifests(); + Assert.assertEquals(3, manifests.size()); + + validateSummary(snapshot, 1, 1, 2, 0); + + validateManifestEntries( + manifests.get(0), + ids(firstSnapshot.snapshotId()), + files(FILE_A), + statuses(ManifestEntry.Status.EXISTING)); + + validateManifestEntries( + manifests.get(1), 
+ ids(firstSnapshot.snapshotId()), + files(FILE_B), + statuses(ManifestEntry.Status.EXISTING)); + + validateManifestEntries( + manifests.get(2), + ids(secondSnapshot.snapshotId(), secondSnapshot.snapshotId()), + files(FILE_C, FILE_D), + statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.ADDED)); + + // validate that any subsequent operation does not fail + table.newDelete() + .deleteFromRowFilter(Expressions.alwaysTrue()) + .commit(); + } + @Test public void testWithMultiplePartitionSpec() throws IOException { Assert.assertNull("Table should be empty", table.currentSnapshot()); @@ -737,7 +885,7 @@ public void testManifestReplacementCombinedWithRewrite() throws IOException { .addManifest(newManifest) .clusterBy(dataFile -> "const-value") .rewriteIf(manifest -> { - try (ManifestReader reader = ManifestReader.read(localInput(manifest.path()))) { + try (ManifestReader reader = ManifestReader.read(manifest, table.io())) { return !reader.iterator().next().path().equals(FILE_B.path()); } catch (IOException x) { throw new RuntimeIOException(x); @@ -847,7 +995,7 @@ public void testInvalidUsage() throws IOException { manifestEntry(ManifestEntry.Status.ADDED, snapshot.snapshotId(), FILE_A)); AssertHelpers.assertThrows("Should reject commit", - IllegalArgumentException.class, "Cannot append manifest: Invalid manifest", + IllegalArgumentException.class, "Cannot add manifest with added files", () -> table.rewriteManifests() .deleteManifest(manifest) .addManifest(invalidAddedFileManifest) @@ -858,7 +1006,7 @@ public void testInvalidUsage() throws IOException { manifestEntry(ManifestEntry.Status.DELETED, snapshot.snapshotId(), FILE_A)); AssertHelpers.assertThrows("Should reject commit", - IllegalArgumentException.class, "Cannot append manifest: Invalid manifest", + IllegalArgumentException.class, "Cannot add manifest with deleted files", () -> table.rewriteManifests() .deleteManifest(manifest) .addManifest(invalidDeletedFileManifest) @@ -871,6 +1019,98 @@ public void testInvalidUsage() throws IOException { .commit()); } + @Test + public void testManifestReplacementFailure() throws IOException { + Assert.assertNull("Table should be empty", table.currentSnapshot()); + + table.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + List firstSnapshotManifests = firstSnapshot.manifests(); + Assert.assertEquals(1, firstSnapshotManifests.size()); + ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0); + + table.newFastAppend() + .appendFile(FILE_B) + .commit(); + + Snapshot secondSnapshot = table.currentSnapshot(); + List secondSnapshotManifests = secondSnapshot.manifests(); + Assert.assertEquals(2, secondSnapshotManifests.size()); + ManifestFile secondSnapshotManifest = secondSnapshotManifests.get(0); + + ManifestFile newManifest = writeManifest( + "manifest-file.avro", + manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A), + manifestEntry(ManifestEntry.Status.EXISTING, secondSnapshot.snapshotId(), FILE_B)); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "1") + .commit(); + + table.ops().failCommits(5); + + RewriteManifests rewriteManifests = table.rewriteManifests(); + rewriteManifests.deleteManifest(firstSnapshotManifest); + rewriteManifests.deleteManifest(secondSnapshotManifest); + rewriteManifests.addManifest(newManifest); + + AssertHelpers.assertThrows("Should reject commit", + CommitFailedException.class, "Injected failure", + rewriteManifests::commit); + + Assert.assertTrue("New 
manifest should not be deleted", new File(newManifest.path()).exists()); + } + + @Test + public void testManifestReplacementFailureWithSnapshotIdInheritance() throws IOException { + Assert.assertNull("Table should be empty", table.currentSnapshot()); + + table.updateProperties().set(SNAPSHOT_ID_INHERITANCE_ENABLED, "true").commit(); + + table.newFastAppend() + .appendFile(FILE_A) + .commit(); + + Snapshot firstSnapshot = table.currentSnapshot(); + List firstSnapshotManifests = firstSnapshot.manifests(); + Assert.assertEquals(1, firstSnapshotManifests.size()); + ManifestFile firstSnapshotManifest = firstSnapshotManifests.get(0); + + table.newFastAppend() + .appendFile(FILE_B) + .commit(); + + Snapshot secondSnapshot = table.currentSnapshot(); + List secondSnapshotManifests = secondSnapshot.manifests(); + Assert.assertEquals(2, secondSnapshotManifests.size()); + ManifestFile secondSnapshotManifest = secondSnapshotManifests.get(0); + + ManifestFile newManifest = writeManifest( + "manifest-file.avro", + manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshot.snapshotId(), FILE_A), + manifestEntry(ManifestEntry.Status.EXISTING, secondSnapshot.snapshotId(), FILE_B)); + + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "1") + .commit(); + + table.ops().failCommits(5); + + RewriteManifests rewriteManifests = table.rewriteManifests(); + rewriteManifests.deleteManifest(firstSnapshotManifest); + rewriteManifests.deleteManifest(secondSnapshotManifest); + rewriteManifests.addManifest(newManifest); + + AssertHelpers.assertThrows("Should reject commit", + CommitFailedException.class, "Injected failure", + rewriteManifests::commit); + + Assert.assertTrue("New manifest should not be deleted", new File(newManifest.path()).exists()); + } + private void validateSummary(Snapshot snapshot, int replaced, int kept, int created, int entryCount) { Map summary = snapshot.summary(); Assert.assertEquals(
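A condensed sketch of the replacement flow these failure tests target, assuming the caller already wrote a replacement manifest with EXISTING entries; the names are illustrative placeholders:

```java
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.RewriteManifests;
import org.apache.iceberg.Table;

class ManifestReplacementSketch {
  // Swap a committed manifest for a pre-written replacement. The added manifest
  // may only contain EXISTING entries; if the commit fails, the operation never
  // deletes it, which is what the tests above assert.
  static void replace(Table table, ManifestFile toDelete, ManifestFile replacement) {
    RewriteManifests rewrite = table.rewriteManifests();
    rewrite.deleteManifest(toDelete);
    rewrite.addManifest(replacement);
    rewrite.commit();
  }
}
```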
diff --git a/core/src/test/java/org/apache/iceberg/TestTransaction.java b/core/src/test/java/org/apache/iceberg/TestTransaction.java index a920f28c90c6..13a298b37a16 100644 --- a/core/src/test/java/org/apache/iceberg/TestTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestTransaction.java @@ -22,6 +22,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import java.io.File; +import java.io.IOException; import java.util.List; import java.util.Set; import java.util.UUID; @@ -536,6 +537,63 @@ public void testTransactionRetryAndAppendManifests() throws Exception { 1, table.currentSnapshot().manifests().size()); } + + @Test + public void testTransactionRetryAndAppendManifestsWithSnapshotIdInheritance() throws Exception { + // use only one retry and aggressively merge manifests + table.updateProperties() + .set(TableProperties.COMMIT_NUM_RETRIES, "1") + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0") + .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true") + .commit(); + + Assert.assertEquals("Table should be on version 1", 1, (int) version()); + + table.newAppend() + .appendFile(FILE_A) + .appendFile(FILE_B) + .commit(); + + Assert.assertEquals("Table should be on version 2 after append", 2, (int) version()); + Assert.assertEquals("Append should create one manifest", 1, table.currentSnapshot().manifests().size()); + + TableMetadata base = readMetadata(); + + Transaction txn = table.newTransaction(); + + ManifestFile appendManifest = writeManifestWithName("input.m0", FILE_D); + txn.newAppend() + .appendManifest(appendManifest) + .commit(); + + Assert.assertSame("Base metadata should not change when the commit is created", base, readMetadata()); + Assert.assertEquals("Table should be on version 2 after txn create", 2, (int) version()); + + Assert.assertEquals("Append should have one merged manifest", 1, txn.table().currentSnapshot().manifests().size()); + ManifestFile mergedManifest = txn.table().currentSnapshot().manifests().get(0); + + // cause the transaction commit to fail and retry + table.newAppend() + .appendFile(FILE_C) + .commit(); + + Assert.assertEquals("Table should be on version 3 after real append", 3, (int) version()); + + txn.commitTransaction(); + + Assert.assertEquals("Table should be on version 4 after commit", 4, (int) version()); + + Assert.assertTrue("Transaction should hijack the delete of the original append manifest", + ((BaseTransaction) txn).deletedFiles().contains(appendManifest.path())); + Assert.assertFalse("Append manifest should be deleted", new File(appendManifest.path()).exists()); + + Assert.assertTrue("Transaction should hijack the delete of the first merged manifest", + ((BaseTransaction) txn).deletedFiles().contains(mergedManifest.path())); + Assert.assertFalse("Merged append manifest should be deleted", new File(mergedManifest.path()).exists()); + + Assert.assertEquals("Should merge all commit manifests into a single manifest", + 1, table.currentSnapshot().manifests().size()); + } + @Test public void testTransactionNoCustomDeleteFunc() { AssertHelpers.assertThrows("Should fail setting a custom delete function with a transaction", @@ -568,4 +626,63 @@ public void testTransactionFastAppends() { List manifests = table.currentSnapshot().manifests(); Assert.assertEquals("Expected 2 manifests", 2, manifests.size()); } + + @Test + public void testTransactionRewriteManifestsAppendedDirectly() throws IOException { + Table table = load(); + + table.updateProperties() + .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true") + .set(TableProperties.MANIFEST_MIN_MERGE_COUNT, "0") + .commit(); + + table.newFastAppend() + .appendFile(FILE_A) + .commit(); + long firstSnapshotId = table.currentSnapshot().snapshotId(); + + table.newFastAppend() + .appendFile(FILE_B) + .commit(); + long secondSnapshotId = table.currentSnapshot().snapshotId(); + + List manifests = table.currentSnapshot().manifests(); + Assert.assertEquals("Should have 2 manifests after 2 appends", 2, manifests.size()); + + ManifestFile newManifest = writeManifest( + "manifest-file-1.avro", + manifestEntry(ManifestEntry.Status.EXISTING, firstSnapshotId, FILE_A), + manifestEntry(ManifestEntry.Status.EXISTING, secondSnapshotId, FILE_B)); + + Transaction txn = table.newTransaction(); + txn.rewriteManifests() + .deleteManifest(manifests.get(0)) + .deleteManifest(manifests.get(1)) + .addManifest(newManifest) + .commit(); + txn.newAppend() + .appendFile(FILE_C) + .commit(); + txn.commitTransaction(); + + long finalSnapshotId = table.currentSnapshot().snapshotId(); + long finalSnapshotTimestamp = System.currentTimeMillis(); + + Assert.assertTrue("Append manifest should not be deleted", new File(newManifest.path()).exists()); + + List finalManifests = table.currentSnapshot().manifests(); + Assert.assertEquals("Should have 1 final manifest", 1, finalManifests.size()); + + validateManifestEntries(finalManifests.get(0), + ids(finalSnapshotId, firstSnapshotId, secondSnapshotId), + files(FILE_C, FILE_A, FILE_B), + statuses(ManifestEntry.Status.ADDED, ManifestEntry.Status.EXISTING, ManifestEntry.Status.EXISTING)); + + table.expireSnapshots() +
.expireOlderThan(finalSnapshotTimestamp + 1) + .retainLast(1) + .commit(); + + Assert.assertFalse("Append manifest should be deleted on expiry", new File(newManifest.path()).exists()); + } }

diff --git a/site/docs/configuration.md b/site/docs/configuration.md index f98183a95721..ac007df5ec80 100644 --- a/site/docs/configuration.md +++ b/site/docs/configuration.md @@ -60,6 +60,11 @@ Iceberg tables support table properties to configure table behavior, like the de
 | commit.manifest.min-count-to-merge | 100 | Minimum number of manifests to accumulate before merging |
 | commit.manifest-merge.enabled | true | Controls whether to automatically merge manifests on writes |
+### Compatibility flags
+
+| Property                                      | Default  | Description                                                 |
+| --------------------------------------------- | -------- | ------------------------------------------------------------- |
+| compatibility.snapshot-id-inheritance.enabled | false    | Enables committing snapshots without explicit snapshot IDs    |

 ## Spark options
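To illustrate the new flag, a short sketch of opting a table in from code; `TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED` is the constant backing the property documented above, and `table` is assumed to be an already-loaded handle:

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;

class EnableSnapshotIdInheritanceSketch {
  // Sets compatibility.snapshot-id-inheritance.enabled=true so that manifests
  // committed without explicit snapshot IDs inherit them on commit.
  static void enable(Table table) {
    table.updateProperties()
        .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true")
        .commit();
  }
}
```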
diff --git a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala index d5279ffce777..c14bb6a58bb6 100644 --- a/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala +++ b/spark/src/main/scala/org/apache/iceberg/spark/SparkTableUtil.scala @@ -25,14 +25,15 @@ import java.util import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.iceberg.{DataFile, DataFiles, FileFormat, ManifestFile, ManifestWriter} -import org.apache.iceberg.{Metrics, MetricsConfig, PartitionSpec, Table} +import org.apache.iceberg.{Metrics, MetricsConfig, PartitionSpec, Table, TableProperties} import org.apache.iceberg.exceptions.NoSuchTableException import org.apache.iceberg.hadoop.{HadoopFileIO, HadoopInputFile, SerializableConfiguration} import org.apache.iceberg.orc.OrcMetrics import org.apache.iceberg.parquet.ParquetUtil +import org.apache.iceberg.util.PropertyUtil import org.apache.parquet.hadoop.ParquetFileReader import org.apache.spark.TaskContext -import org.apache.spark.sql.{DataFrame, SparkSession} +import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession} import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition} @@ -420,7 +421,7 @@ private def buildManifest( conf: SerializableConfiguration, spec: PartitionSpec, - basePath: String): Iterator[SparkDataFile] => Iterator[Manifest] = { files => + basePath: String): Iterator[SparkDataFile] => Iterator[ManifestFile] = { files => if (files.hasNext) { val io = new HadoopFileIO(conf.get()) val ctx = TaskContext.get() @@ -436,40 +437,12 @@ object SparkTableUtil { } val manifestFile = writer.toManifestFile - Seq(Manifest(manifestFile.path, manifestFile.length, manifestFile.partitionSpecId)).iterator + Seq(manifestFile).iterator } else { Seq.empty.iterator } } - private case class Manifest(location: String, fileLength: Long, specId: Int) { def toManifestFile: ManifestFile = new ManifestFile { - override def path: String = location - - override def length: Long = fileLength - - override def partitionSpecId: Int = specId - - override def snapshotId: java.lang.Long = null - - override def addedFilesCount: Integer = null - - override def addedRowsCount(): java.lang.Long = null - - override def existingFilesCount: Integer = null - - override def existingRowsCount(): java.lang.Long = null - - override def deletedFilesCount: Integer = null - - override def deletedRowsCount(): java.lang.Long = null - - override def partitions: java.util.List[ManifestFile.PartitionFieldSummary] = null - - override def copy: ManifestFile = this - } - } - /** * Import files from an existing Spark table to an Iceberg table. * @@ -542,6 +515,8 @@ object SparkTableUtil { spec: PartitionSpec, stagingDir: String): Unit = { + implicit val manifestFileEncoder: Encoder[ManifestFile] = Encoders.javaSerialization[ManifestFile] + import spark.implicits._ val conf = spark.sessionState.newHadoopConf() @@ -559,14 +534,24 @@ object SparkTableUtil { .collect() try { + val snapshotIdInheritanceEnabled = PropertyUtil.propertyAsBoolean( + targetTable.properties, + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, + TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED_DEFAULT) + val append = targetTable.newAppend() - manifests.foreach(manifest => append.appendManifest(manifest.toManifestFile)) + manifests.foreach(manifest => append.appendManifest(manifest)) append.commit() - } finally { - val io = new HadoopFileIO(conf) - manifests.foreach { manifest => - Try(io.deleteFile(manifest.location)) + + if (!snapshotIdInheritanceEnabled) { + // delete original manifests as they were rewritten before the commit + manifests.foreach(manifest => Try(targetTable.io.deleteFile(manifest.path))) } + } catch { + case e: Throwable => + // always clean up created manifests if the append fails + manifests.foreach(manifest => Try(targetTable.io.deleteFile(manifest.path))) + throw e } } }

diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java index bd4175ea4243..dd3b6516ddd0 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHadoopTables.java @@ -32,17 +32,20 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.spark.SparkTableUtil; import org.apache.iceberg.spark.data.TestHelpers; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.TableIdentifier; import org.junit.AfterClass; import org.junit.Assert; import org.junit.Before; @@ -168,6 +171,58 @@ public void testFilesTable() throws Exception { TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0)); } + + @Test + public void testFilesTableWithSnapshotIdInheritance() throws Exception { + Table table = TABLES.create(SCHEMA, PartitionSpec.builderFor(SCHEMA).identity("id").build(), tableLocation); + + table.updateProperties() + .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true") + .commit(); + + Table entriesTable = TABLES.load(tableLocation + "#entries"); + Table filesTable = TABLES.load(tableLocation + "#files"); + + List records = Lists.newArrayList( + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b") + ); + + try { + Dataset inputDF =
spark.createDataFrame(records, SimpleRecord.class); + inputDF.select("id", "data").write() + .format("parquet") + .mode("append") + .partitionBy("id") + .saveAsTable("parquet_table"); + + String stagingLocation = table.location() + "/metadata"; + SparkTableUtil.importSparkTable(spark, new TableIdentifier("parquet_table"), table, stagingLocation); + + List actual = spark.read() + .format("iceberg") + .load(tableLocation + "#files") + .collectAsList(); + + List expected = Lists.newArrayList(); + for (ManifestFile manifest : table.currentSnapshot().manifests()) { + InputFile in = table.io().newInputFile(manifest.path()); + try (CloseableIterable rows = Avro.read(in).project(entriesTable.schema()).build()) { + for (GenericData.Record record : rows) { + expected.add((GenericData.Record) record.get("data_file")); + } + } + } + + Assert.assertEquals("Files table should have two rows", 2, expected.size()); + Assert.assertEquals("Actual results should have two rows", 2, actual.size()); + TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0)); + TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(1), actual.get(1)); + + } finally { + spark.sql("DROP TABLE parquet_table"); + } + } + @Test public void testFilesUnpartitionedTable() throws Exception { Table table = TABLES.create(SCHEMA, tableLocation);

diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java index 042bce9dd7c6..a381f66b6812 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestIcebergSourceHiveTables.java @@ -33,6 +33,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Schema; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.avro.Avro; import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.catalog.TableIdentifier; @@ -42,6 +43,7 @@ import org.apache.iceberg.hive.TestHiveMetastore; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.spark.SparkTableUtil; import org.apache.iceberg.spark.data.TestHelpers; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; @@ -232,6 +234,65 @@ public synchronized void testHiveFilesTable() throws Exception { } } + + @Test + public synchronized void testHiveFilesTableWithSnapshotIdInheritance() throws Exception { + TableIdentifier tableIdentifier = TableIdentifier.of("db", "files_inheritance_test"); + try { + PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("id").build(); + Table table = catalog.createTable(tableIdentifier, SCHEMA, spec); + + table.updateProperties() + .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true") + .commit(); + + Table entriesTable = catalog.loadTable(TableIdentifier.of("db", "files_inheritance_test", "entries")); + Table filesTable = catalog.loadTable(TableIdentifier.of("db", "files_inheritance_test", "files")); + + List records = Lists.newArrayList( + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b") + ); + + Dataset inputDF = spark.createDataFrame(records, SimpleRecord.class); + inputDF.select("id", "data").write() + .format("parquet") + .mode("append") + .partitionBy("id") + .saveAsTable("parquet_table"); + + String stagingLocation = table.location() + "/metadata"; +
SparkTableUtil.importSparkTable( + spark, new org.apache.spark.sql.catalyst.TableIdentifier("parquet_table"), table, stagingLocation); + + List actual = spark.read() + .format("iceberg") + .load(tableIdentifier + ".files") + .collectAsList(); + + List expected = Lists.newArrayList(); + for (ManifestFile manifest : table.currentSnapshot().manifests()) { + InputFile in = table.io().newInputFile(manifest.path()); + try (CloseableIterable rows = Avro.read(in).project(entriesTable.schema()).build()) { + for (GenericData.Record record : rows) { + expected.add((GenericData.Record) record.get("data_file")); + } + } + } + + Assert.assertEquals("Files table should have two rows", 2, expected.size()); + Assert.assertEquals("Actual results should have two rows", 2, actual.size()); + TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(0), actual.get(0)); + TestHelpers.assertEqualsSafe(filesTable.schema().asStruct(), expected.get(1), actual.get(1)); + + } finally { + spark.sql("DROP TABLE parquet_table"); + clients.run(client -> { + client.dropTable(tableIdentifier.namespace().level(0), tableIdentifier.name()); + return null; + }); + } + } + @Test public synchronized void testHiveFilesUnpartitionedTable() throws Exception { TableIdentifier tableIdentifier = TableIdentifier.of("db", "unpartitioned_files_test");

diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java index 5357187393aa..7fa335a13be8 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestParquetWrite.java @@ -44,7 +44,6 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; -import static org.apache.iceberg.Files.localInput; import static org.apache.iceberg.types.Types.NestedField.optional; public class TestParquetWrite { @@ -104,7 +103,7 @@ public void testBasicWrite() throws IOException { Assert.assertEquals("Number of rows should match", expected.size(), actual.size()); Assert.assertEquals("Result rows should match", expected, actual); for (ManifestFile manifest : table.currentSnapshot().manifests()) { - for (DataFile file : ManifestReader.read(localInput(manifest.path()), null)) { + for (DataFile file : ManifestReader.read(manifest, table.io())) { Assert.assertNotNull("Split offsets not present", file.splitOffsets()); Assert.assertEquals("Should have reported record count as 1", 1, file.recordCount()); Assert.assertNotNull("Column sizes metric not present", file.columnSizes()); @@ -286,7 +285,7 @@ public void testUnpartitionedCreateWithTargetFileSizeViaTableProperties() throws List files = Lists.newArrayList(); for (ManifestFile manifest : table.currentSnapshot().manifests()) { - for (DataFile file : ManifestReader.read(localInput(manifest.path()), null)) { + for (DataFile file : ManifestReader.read(manifest, table.io())) { files.add(file); } } @@ -331,7 +330,7 @@ public void testPartitionedCreateWithTargetFileSizeViaOption() throws IOExceptio List files = Lists.newArrayList(); for (ManifestFile manifest : table.currentSnapshot().manifests()) { - for (DataFile file : ManifestReader.read(localInput(manifest.path()), null)) { + for (DataFile file : ManifestReader.read(manifest, table.io())) { files.add(file); } }

diff --git a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java
b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java index 68bf3ccfc27f..80918e727699 100644 --- a/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java +++ b/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkTableUtilWithInMemoryCatalog.java @@ -231,6 +231,51 @@ public void testImportPartitions() throws IOException { } } + @Test + public void testImportPartitionsWithSnapshotInheritance() throws IOException { + Table table = TABLES.create(SCHEMA, SPEC, tableLocation); + + table.updateProperties() + .set(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, "true") + .commit(); + + List records = Lists.newArrayList( + new SimpleRecord(1, "a"), + new SimpleRecord(2, "b"), + new SimpleRecord(3, "c") + ); + + File parquetTableDir = temp.newFolder("parquet_table"); + String parquetTableLocation = parquetTableDir.toURI().toString(); + + try { + Dataset inputDF = spark.createDataFrame(records, SimpleRecord.class); + inputDF.select("id", "data").write() + .format("parquet") + .mode("append") + .option("path", parquetTableLocation) + .partitionBy("data") + .saveAsTable("parquet_table"); + + File stagingDir = temp.newFolder("staging-dir"); + Seq partitions = SparkTableUtil.getPartitionsByFilter(spark, "parquet_table", "data = 'a'"); + SparkTableUtil.importSparkPartitions(spark, partitions, table, table.spec(), stagingDir.toString()); + + List expectedRecords = Lists.newArrayList(new SimpleRecord(1, "a")); + + List actualRecords = spark.read() + .format("iceberg") + .load(tableLocation) + .orderBy("id") + .as(Encoders.bean(SimpleRecord.class)) + .collectAsList(); + + Assert.assertEquals("Result rows should match", expectedRecords, actualRecords); + } finally { + spark.sql("DROP TABLE parquet_table"); + } + } + private void checkFieldMetrics(Dataset fileDF, Types.NestedField field, boolean isNull) { List metricRows = fileDF .selectExpr(