From fc0abd4a0568d913ac1918fcc80d05ab6147310e Mon Sep 17 00:00:00 2001 From: Aitozi Date: Wed, 3 Apr 2024 19:43:07 +0800 Subject: [PATCH 1/2] [core] support decouple the delta files lifecycle --- .../org/apache/paimon/AbstractFileStore.java | 16 +- .../java/org/apache/paimon/FileStore.java | 5 +- .../paimon/append/AppendOnlyWriter.java | 3 +- .../org/apache/paimon/io/DataFileMeta.java | 51 ++++-- .../paimon/io/DataFileMetaSerializer.java | 7 +- .../paimon/io/KeyValueDataFileWriter.java | 9 +- .../paimon/io/KeyValueFileWriterFactory.java | 14 +- .../apache/paimon/io/RowDataFileWriter.java | 9 +- .../paimon/io/RowDataRollingFileWriter.java | 6 +- .../apache/paimon/manifest/FileSource.java | 51 ++++++ .../manifest/ManifestEntrySerializer.java | 1 + .../paimon/mergetree/MergeTreeWriter.java | 2 +- .../compact/ChangelogMergeTreeRewriter.java | 3 +- .../compact/MergeTreeCompactRewriter.java | 2 +- .../apache/paimon/migrate/FileMetaUtils.java | 4 +- .../operation/AppendOnlyFileStoreWrite.java | 4 +- .../paimon/operation/ChangelogDeletion.java | 117 +++++++++++++ .../paimon/operation/FileDeletionBase.java | 139 +++++++++++++++- .../paimon/operation/OrphanFilesClean.java | 75 +++++++-- .../paimon/operation/SnapshotDeletion.java | 155 +++-------------- .../apache/paimon/operation/TagDeletion.java | 4 +- .../paimon/schema/SchemaValidation.java | 38 +++-- .../paimon/table/AbstractFileStoreTable.java | 52 ++---- .../paimon/table/ExpireChangelogImpl.java | 81 ++++----- .../apache/paimon/table/ExpireSnapshots.java | 74 ++++++++- .../paimon/table/ExpireSnapshotsImpl.java | 41 ++--- .../apache/paimon/table/ReadonlyTable.java | 11 +- .../apache/paimon/table/RollbackHelper.java | 25 ++- .../java/org/apache/paimon/table/Table.java | 6 +- .../table/source/AbstractInnerTableScan.java | 3 - .../source/InnerStreamTableScanImpl.java | 5 +- ...ContinuousFromSnapshotStartingScanner.java | 9 +- ...ontinuousFromTimestampStartingScanner.java | 7 +- .../paimon/utils/NextSnapshotFetcher.java | 12 +- .../java/org/apache/paimon/TestFileStore.java | 156 +++++++++++++----- ...endOnlyTableCompactionCoordinatorTest.java | 1 + .../paimon/append/AppendOnlyWriterTest.java | 4 +- .../crosspartition/IndexBootstrapTest.java | 1 + .../paimon/io/DataFileTestDataGenerator.java | 1 + .../apache/paimon/io/DataFileTestUtils.java | 3 + .../paimon/io/KeyValueFileReadWriteTest.java | 11 +- .../paimon/io/RollingFileWriterTest.java | 3 +- .../ManifestCommittableSerializerTest.java | 1 + .../manifest/ManifestFileMetaTestBase.java | 5 +- .../paimon/mergetree/ContainsLevelsTest.java | 2 +- .../apache/paimon/mergetree/LevelsTest.java | 1 + .../paimon/mergetree/LookupLevelsTest.java | 2 +- .../paimon/mergetree/MergeTreeTestBase.java | 2 +- .../compact/IntervalPartitionTest.java | 1 + .../compact/UniversalCompactionTest.java | 4 +- .../paimon/operation/ExpireSnapshotsTest.java | 35 ++-- .../operation/OrphanFilesCleanTest.java | 10 +- .../table/IndexFileExpireTableTest.java | 13 +- .../table/PrimaryKeyFileStoreTableTest.java | 12 +- .../table/source/SplitGeneratorTest.java | 4 +- ...nuousFromTimestampStartingScannerTest.java | 23 +-- .../procedure/ExpireSnapshotsProcedure.java | 17 +- .../flink/ContinuousFileStoreITCase.java | 12 +- .../ExpireSnapshotsProcedureITCase.java | 73 ++++++++ .../CompactionTaskSimpleSerializerTest.java | 4 +- .../FileStoreSourceSplitGeneratorTest.java | 5 +- .../FileStoreSourceSplitSerializerTest.java | 4 +- .../procedure/ExpireSnapshotsProcedure.java | 42 ++++- .../apache/paimon/spark/ScanHelperTest.scala | 3 +- .../ExpireSnapshotsProcedureTest.scala | 3 +- 65 files changed, 1026 insertions(+), 473 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java create mode 100644 paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java create mode 100644 paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index 07a73d3de2aa..a164849cb2d3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -28,6 +28,7 @@ import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.metastore.AddPartitionTagCallback; import org.apache.paimon.metastore.MetastoreClient; +import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.FileStoreCommitImpl; import org.apache.paimon.operation.PartitionExpire; import org.apache.paimon.operation.SnapshotDeletion; @@ -201,8 +202,21 @@ public FileStoreCommitImpl newCommit(String commitUser, String branchName) { } @Override - public SnapshotDeletion newSnapshotDeletion() { + public SnapshotDeletion newSnapshotDeletion(CoreOptions options) { return new SnapshotDeletion( + fileIO, + pathFactory(), + manifestFileFactory().create(), + manifestListFactory().create(), + newIndexFileHandler(), + newStatsFileHandler(), + options.changelogLifecycleDecoupled(), + options.changelogProducer() != CoreOptions.ChangelogProducer.NONE); + } + + @Override + public ChangelogDeletion newChangelogDeletion(CoreOptions options) { + return new ChangelogDeletion( fileIO, pathFactory(), manifestFileFactory().create(), diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index 870feffdef68..5955499b86e7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -22,6 +22,7 @@ import org.apache.paimon.manifest.ManifestCacheFilter; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.FileStoreCommit; import org.apache.paimon.operation.FileStoreScan; import org.apache.paimon.operation.FileStoreWrite; @@ -83,7 +84,9 @@ public interface FileStore extends Serializable { FileStoreCommit newCommit(String commitUser, String branchName); - SnapshotDeletion newSnapshotDeletion(); + SnapshotDeletion newSnapshotDeletion(CoreOptions options); + + ChangelogDeletion newChangelogDeletion(CoreOptions options); TagManager newTagManager(); diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 0ae21e6b28ee..b193802de1e0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -251,7 +251,8 @@ private RowDataRollingFileWriter createRollingRowWriter() { seqNumCounter, fileCompression, statsCollectors, - fileIndexOptions); + fileIndexOptions, + false); } private void trySyncLatestCompaction(boolean blocking) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index fdf7266bd51e..5acb9b417996 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.BinaryTableStats; import org.apache.paimon.stats.FieldStatsArraySerializer; import org.apache.paimon.types.ArrayType; @@ -31,12 +32,14 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TinyIntType; import javax.annotation.Nullable; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Objects; @@ -85,6 +88,7 @@ public class DataFileMeta { // file index filter bytes, if it is small, store in data file meta private final @Nullable byte[] embeddedIndex; + private final @Nullable FileSource fileSource; public static DataFileMeta forAppend( String fileName, @@ -93,7 +97,8 @@ public static DataFileMeta forAppend( BinaryTableStats rowStats, long minSequenceNumber, long maxSequenceNumber, - long schemaId) { + long schemaId, + FileSource fileSource) { return forAppend( fileName, fileSize, @@ -103,7 +108,8 @@ public static DataFileMeta forAppend( maxSequenceNumber, schemaId, Collections.emptyList(), - null); + null, + fileSource); } public static DataFileMeta forAppend( @@ -115,7 +121,8 @@ public static DataFileMeta forAppend( long maxSequenceNumber, long schemaId, List extraFiles, - @Nullable byte[] embeddedIndex) { + @Nullable byte[] embeddedIndex, + @Nullable FileSource fileSource) { return new DataFileMeta( fileName, fileSize, @@ -131,7 +138,8 @@ public static DataFileMeta forAppend( extraFiles, Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(), 0L, - embeddedIndex); + embeddedIndex, + fileSource); } public DataFileMeta( @@ -147,7 +155,8 @@ public DataFileMeta( long schemaId, int level, @Nullable Long deleteRowCount, - @Nullable byte[] embeddedIndex) { + @Nullable byte[] embeddedIndex, + @Nullable FileSource fileSource) { this( fileName, fileSize, @@ -163,7 +172,8 @@ public DataFileMeta( Collections.emptyList(), Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(), deleteRowCount, - embeddedIndex); + embeddedIndex, + fileSource); } public DataFileMeta( @@ -181,7 +191,8 @@ public DataFileMeta( List extraFiles, Timestamp creationTime, @Nullable Long deleteRowCount, - @Nullable byte[] embeddedIndex) { + @Nullable byte[] embeddedIndex, + @Nullable FileSource fileSource) { this.fileName = fileName; this.fileSize = fileSize; @@ -201,6 +212,7 @@ public DataFileMeta( this.creationTime = creationTime; this.deleteRowCount = deleteRowCount; + this.fileSource = fileSource; } public String fileName() { @@ -223,6 +235,10 @@ public Optional deleteRowCount() { return Optional.ofNullable(deleteRowCount); } + public Optional fileSource() { + return Optional.ofNullable(fileSource); + } + public byte[] embeddedIndex() { return embeddedIndex; } @@ -313,7 +329,8 @@ public DataFileMeta upgrade(int newLevel) { extraFiles, creationTime, deleteRowCount, - embeddedIndex); + embeddedIndex, + fileSource); } public List collectFiles(DataFilePathFactory pathFactory) { @@ -339,7 +356,8 @@ public DataFileMeta copy(List newExtraFiles) { newExtraFiles, creationTime, deleteRowCount, - embeddedIndex); + embeddedIndex, + fileSource); } @Override @@ -365,7 +383,8 @@ public boolean equals(Object o) { && level == that.level && Objects.equals(extraFiles, that.extraFiles) && Objects.equals(creationTime, that.creationTime) - && Objects.equals(deleteRowCount, that.deleteRowCount); + && Objects.equals(deleteRowCount, that.deleteRowCount) + && Objects.equals(fileSource, that.fileSource); } @Override @@ -374,7 +393,7 @@ public int hashCode() { fileName, fileSize, rowCount, - embeddedIndex, + Arrays.hashCode(embeddedIndex), minKey, maxKey, keyStats, @@ -385,7 +404,8 @@ public int hashCode() { level, extraFiles, creationTime, - deleteRowCount); + deleteRowCount, + fileSource); } @Override @@ -394,11 +414,10 @@ public String toString() { "{fileName: %s, fileSize: %d, rowCount: %d, " + "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, " + "minSequenceNumber: %d, maxSequenceNumber: %d, " - + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, deleteRowCount: %d}", + + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, deleteRowCount: %d, fileSource: %s}", fileName, fileSize, rowCount, - embeddedIndex, minKey, maxKey, keyStats, @@ -409,7 +428,8 @@ public String toString() { level, extraFiles, creationTime, - deleteRowCount); + deleteRowCount, + fileSource); } public static RowType schema() { @@ -429,6 +449,7 @@ public static RowType schema() { fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS())); fields.add(new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true))); fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true))); + fields.add(new DataField(15, "_FILE_SOURCE", new TinyIntType(true))); return new RowType(fields); } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java index 3de1fcac9f4a..fb2b397760e5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.BinaryTableStats; import org.apache.paimon.utils.ObjectSerializer; @@ -55,7 +56,8 @@ public InternalRow toRow(DataFileMeta meta) { toStringArrayData(meta.extraFiles()), meta.creationTime(), meta.deleteRowCount().orElse(null), - meta.embeddedIndex()); + meta.embeddedIndex(), + meta.fileSource().map(FileSource::toByteValue).orElse(null)); } @Override @@ -75,6 +77,7 @@ public DataFileMeta fromRow(InternalRow row) { fromStringArrayData(row.getArray(11)), row.getTimestamp(12, 3), row.isNullAt(13) ? null : row.getLong(13), - row.isNullAt(14) ? null : row.getBinary(14)); + row.isNullAt(14) ? null : row.getBinary(14), + row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15))); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index 0f1223ccac45..e6950d715e56 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -28,6 +28,7 @@ import org.apache.paimon.format.TableStatsExtractor; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.BinaryTableStats; import org.apache.paimon.stats.FieldStatsArraySerializer; import org.apache.paimon.types.RowType; @@ -68,6 +69,7 @@ public class KeyValueDataFileWriter private long minSeqNumber = Long.MAX_VALUE; private long maxSeqNumber = Long.MIN_VALUE; private long deleteRecordCount = 0; + private final boolean isCompact; public KeyValueDataFileWriter( FileIO fileIO, @@ -80,7 +82,8 @@ public KeyValueDataFileWriter( long schemaId, int level, String compression, - CoreOptions options) { + CoreOptions options, + boolean isCompact) { super( fileIO, factory, @@ -100,6 +103,7 @@ public KeyValueDataFileWriter( this.keyStatsConverter = new FieldStatsArraySerializer(keyType); this.valueStatsConverter = new FieldStatsArraySerializer(valueType); this.keySerializer = new InternalRowSerializer(keyType); + this.isCompact = isCompact; } @Override @@ -170,6 +174,7 @@ public DataFileMeta result() throws IOException { level, deleteRecordCount, // TODO: enable file filter for primary key table (e.g. deletion table). - null); + null, + isCompact ? FileSource.COMPACT : FileSource.APPEND); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 276e03f305f5..8ec017e4c80d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -81,9 +81,12 @@ public DataFilePathFactory pathFactory(int level) { return formatContext.pathFactory(level); } - public RollingFileWriter createRollingMergeTreeFileWriter(int level) { + public RollingFileWriter createRollingMergeTreeFileWriter( + int level, boolean isCompact) { return new RollingFileWriter<>( - () -> createDataFileWriter(formatContext.pathFactory(level).newPath(), level), + () -> + createDataFileWriter( + formatContext.pathFactory(level).newPath(), level, isCompact), suggestedFileSize); } @@ -91,11 +94,11 @@ public RollingFileWriter createRollingChangelogFileWrite return new RollingFileWriter<>( () -> createDataFileWriter( - formatContext.pathFactory(level).newChangelogPath(), level), + formatContext.pathFactory(level).newChangelogPath(), level, false), suggestedFileSize); } - private KeyValueDataFileWriter createDataFileWriter(Path path, int level) { + private KeyValueDataFileWriter createDataFileWriter(Path path, int level, boolean isCompact) { KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType); return new KeyValueDataFileWriter( fileIO, @@ -108,7 +111,8 @@ private KeyValueDataFileWriter createDataFileWriter(Path path, int level) { schemaId, level, formatContext.compression(level), - options); + options, + isCompact); } public void deleteFile(String filename, int level) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index ce441b279cc7..45b40d3c8499 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -24,6 +24,7 @@ import org.apache.paimon.format.TableStatsExtractor; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.stats.BinaryTableStats; import org.apache.paimon.stats.FieldStatsArraySerializer; @@ -48,6 +49,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter new RowDataFileWriter( @@ -57,7 +58,8 @@ public RowDataRollingFileWriter( seqNumCounter, fileCompression, statsCollectors, - fileIndexOptions), + fileIndexOptions, + isCompact), targetFileSize); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java new file mode 100644 index 000000000000..c62aea2d64b5 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.manifest; + +/** The Source of a file. */ +public enum FileSource { + + /** The file from new input. */ + APPEND((byte) 0), + + /** The file from compaction. */ + COMPACT((byte) 1); + + private final byte value; + + FileSource(byte value) { + this.value = value; + } + + public byte toByteValue() { + return value; + } + + public static FileSource fromByteValue(byte value) { + switch (value) { + case 0: + return APPEND; + case 1: + return COMPACT; + default: + throw new UnsupportedOperationException( + "Unsupported byte value '" + value + "' for value kind."); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java index a733ba9e120e..e0497956eb51 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java @@ -66,6 +66,7 @@ public ManifestEntry convertFrom(int version, InternalRow row) { "The current version %s is not compatible with the version %s, please recreate the table.", getVersion(), version)); } + throw new IllegalArgumentException("Unsupported version: " + version); } return new ManifestEntry( diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index 17673fc06fa7..9baacecbf4f4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -207,7 +207,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul ? writerFactory.createRollingChangelogFileWriter(0) : null; final RollingFileWriter dataWriter = - writerFactory.createRollingMergeTreeFileWriter(0); + writerFactory.createRollingMergeTreeFileWriter(0, false); try { writeBuffer.forEach( diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java index 98553030d00d..a505b0933e45 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java @@ -128,7 +128,8 @@ private CompactResult rewriteOrProduceChangelog( readerForMergeTree(sections, createMergeWrapper(outputLevel)) .toCloseableIterator(); if (rewriteCompactFile) { - compactFileWriter = writerFactory.createRollingMergeTreeFileWriter(outputLevel); + compactFileWriter = + writerFactory.createRollingMergeTreeFileWriter(outputLevel, true); } if (produceChangelog) { changelogFileWriter = writerFactory.createRollingChangelogFileWriter(outputLevel); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java index 4cfe00f3f73e..47b50aa9f0aa 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java @@ -73,7 +73,7 @@ public CompactResult rewrite( protected CompactResult rewriteCompaction( int outputLevel, boolean dropDelete, List> sections) throws Exception { RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(outputLevel); + writerFactory.createRollingMergeTreeFileWriter(outputLevel, true); RecordReader reader = readerForMergeTree(sections, new ReducerMergeFunctionWrapper(mfFactory.create())); if (dropDelete) { diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java index 2a0ec86f4a08..6d187a538c5e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java @@ -30,6 +30,7 @@ import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.statistics.FieldStatsCollector; import org.apache.paimon.stats.BinaryTableStats; import org.apache.paimon.stats.FieldStatsArraySerializer; @@ -165,7 +166,8 @@ private static DataFileMeta constructFileMeta( stats, 0, 0, - ((FileStoreTable) table).schema().id()); + ((FileStoreTable) table).schema().id(), + FileSource.APPEND); } public static BinaryRow writePartitionValue( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index fc3f2a3d6d20..c547e7424ef7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -179,7 +179,9 @@ public AppendOnlyCompactManager.CompactRewriter compactRewriter( new LongCounter(toCompact.get(0).minSequenceNumber()), fileCompression, statsCollectors, - fileIndexOptions); + fileIndexOptions, + true); + try { rewriter.write(bucketReader(partition, bucket).read(toCompact)); } finally { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java new file mode 100644 index 000000000000..198c66a1ceac --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.operation; + +import org.apache.paimon.Changelog; +import org.apache.paimon.Snapshot; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.index.IndexFileHandler; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; +import org.apache.paimon.stats.StatsFileHandler; +import org.apache.paimon.utils.FileStorePathFactory; + +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Predicate; + +/** Delete changelog files. */ +public class ChangelogDeletion extends FileDeletionBase { + public ChangelogDeletion( + FileIO fileIO, + FileStorePathFactory pathFactory, + ManifestFile manifestFile, + ManifestList manifestList, + IndexFileHandler indexFileHandler, + StatsFileHandler statsFileHandler) { + super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler, statsFileHandler); + } + + @Override + public void cleanUnusedDataFiles(Changelog changelog, Predicate skipper) { + if (changelog.changelogManifestList() != null) { + deleteAddedDataFiles(changelog.changelogManifestList()); + } + + if (manifestList.exists(changelog.deltaManifestList())) { + cleanUnusedDataFiles(changelog.deltaManifestList(), skipper); + } + } + + @Override + public void cleanUnusedManifests(Changelog changelog, Set skippingSet) { + if (changelog.changelogManifestList() != null) { + cleanUnusedManifestList(changelog.changelogManifestList(), skippingSet); + } + + if (manifestList.exists(changelog.deltaManifestList())) { + cleanUnusedManifestList(changelog.deltaManifestList(), skippingSet); + } + + if (manifestList.exists(changelog.baseManifestList())) { + cleanUnusedManifestList(changelog.baseManifestList(), skippingSet); + } + + // the index and statics manifest list should handle by snapshot deletion. + } + + public Set manifestSkippingSet(List skippingSnapshots) { + Set skippingSet = new HashSet<>(); + + for (Snapshot skippingSnapshot : skippingSnapshots) { + // base manifests + if (manifestList.exists(skippingSnapshot.baseManifestList())) { + skippingSet.add(skippingSnapshot.baseManifestList()); + manifestList.read(skippingSnapshot.baseManifestList()).stream() + .map(ManifestFileMeta::fileName) + .forEach(skippingSet::add); + } + + // delta manifests + if (manifestList.exists(skippingSnapshot.deltaManifestList())) { + skippingSet.add(skippingSnapshot.deltaManifestList()); + manifestList.read(skippingSnapshot.deltaManifestList()).stream() + .map(ManifestFileMeta::fileName) + .forEach(skippingSet::add); + } + + // index manifests + String indexManifest = skippingSnapshot.indexManifest(); + if (indexManifest != null) { + skippingSet.add(indexManifest); + indexFileHandler.readManifest(indexManifest).stream() + .map(IndexManifestEntry::indexFile) + .map(IndexFileMeta::fileName) + .forEach(skippingSet::add); + } + + // statistics + if (skippingSnapshot.statistics() != null) { + skippingSet.add(skippingSnapshot.statistics()); + } + } + + return skippingSet; + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java index 8bd422e53f00..7de8f4d1f46c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileDeletionBase.java @@ -25,6 +25,7 @@ import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.FileEntry; +import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.IndexManifestEntry; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; @@ -33,6 +34,8 @@ import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.FileUtils; +import org.apache.paimon.utils.Pair; +import org.apache.paimon.utils.TagManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +60,7 @@ * Base class for file deletion including methods for clean data files, manifest files and empty * data directories. */ -public abstract class FileDeletionBase { +public abstract class FileDeletionBase { private static final Logger LOG = LoggerFactory.getLogger(FileDeletionBase.class); @@ -71,6 +74,12 @@ public abstract class FileDeletionBase { protected final Map> deletionBuckets; protected final Executor ioExecutor; + /** Used to record which tag is cached in tagged snapshots list. */ + private int cachedTagIndex = -1; + + /** Used to cache data files used by current tag. */ + private final Map>> cachedTagDataFiles = new HashMap<>(); + public FileDeletionBase( FileIO fileIO, FileStorePathFactory pathFactory, @@ -95,7 +104,7 @@ public FileDeletionBase( * @param skipper if the test result of a data file is true, it will be skipped when deleting; * else it will be deleted */ - public abstract void cleanUnusedDataFiles(Snapshot snapshot, Predicate skipper); + public abstract void cleanUnusedDataFiles(T snapshot, Predicate skipper); /** * Clean metadata files that will not be used anymore of a snapshot, including data manifests, @@ -104,7 +113,7 @@ public FileDeletionBase( * @param snapshot {@link Snapshot} that will be cleaned * @param skippingSet manifests that should not be deleted */ - public abstract void cleanUnusedManifests(Snapshot snapshot, Set skippingSet); + public abstract void cleanUnusedManifests(T snapshot, Set skippingSet); /** Try to delete data directories that may be empty after data file deletion. */ public void cleanDataDirectories() { @@ -151,6 +160,105 @@ protected void recordDeletionBuckets(ManifestEntry entry) { .add(entry.bucket()); } + public void cleanUnusedDataFiles(String manifestList, Predicate skipper) { + // try read manifests + List manifestFileNames = readManifestFileNames(tryReadManifestList(manifestList)); + List manifestEntries; + // data file path -> (original manifest entry, extra file paths) + Map>> dataFileToDelete = new HashMap<>(); + for (String manifest : manifestFileNames) { + try { + manifestEntries = manifestFile.read(manifest); + } catch (Exception e) { + // cancel deletion if any exception occurs + LOG.warn("Failed to read some manifest files. Cancel deletion.", e); + return; + } + + getDataFileToDelete(dataFileToDelete, manifestEntries); + } + + doCleanUnusedDataFile(dataFileToDelete, skipper); + } + + protected void doCleanUnusedDataFile( + Map>> dataFileToDelete, + Predicate skipper) { + List actualDataFileToDelete = new ArrayList<>(); + dataFileToDelete.forEach( + (path, pair) -> { + ManifestEntry entry = pair.getLeft(); + // check whether we should skip the data file + if (!skipper.test(entry)) { + // delete data files + actualDataFileToDelete.add(path); + actualDataFileToDelete.addAll(pair.getRight()); + + recordDeletionBuckets(entry); + } + }); + deleteFiles(actualDataFileToDelete, fileIO::deleteQuietly); + } + + protected void getDataFileToDelete( + Map>> dataFileToDelete, + List dataFileEntries) { + // we cannot delete a data file directly when we meet a DELETE entry, because that + // file might be upgraded + for (ManifestEntry entry : dataFileEntries) { + Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); + Path dataFilePath = new Path(bucketPath, entry.file().fileName()); + switch (entry.kind()) { + case ADD: + dataFileToDelete.remove(dataFilePath); + break; + case DELETE: + List extraFiles = new ArrayList<>(entry.file().extraFiles().size()); + for (String file : entry.file().extraFiles()) { + extraFiles.add(new Path(bucketPath, file)); + } + dataFileToDelete.put(dataFilePath, Pair.of(entry, extraFiles)); + break; + default: + throw new UnsupportedOperationException( + "Unknown value kind " + entry.kind().name()); + } + } + } + + /** + * Delete added file in the manifest list files. Added files marked as "ADD" in manifests. + * + * @param manifestListName name of manifest list + */ + public void deleteAddedDataFiles(String manifestListName) { + List manifestFileNames = + readManifestFileNames(tryReadManifestList(manifestListName)); + for (String file : manifestFileNames) { + try { + List manifestEntries = manifestFile.read(file); + deleteAddedDataFiles(manifestEntries); + } catch (Exception e) { + // We want to delete the data file, so just ignore the unavailable files + LOG.info("Failed to read manifest " + file + ". Ignore it.", e); + } + } + } + + private void deleteAddedDataFiles(List manifestEntries) { + List dataFileToDelete = new ArrayList<>(); + for (ManifestEntry entry : manifestEntries) { + if (entry.kind() == FileKind.ADD) { + dataFileToDelete.add( + new Path( + pathFactory.bucketPath(entry.partition(), entry.bucket()), + entry.file().fileName())); + recordDeletionBuckets(entry); + } + } + deleteFiles(dataFileToDelete, fileIO::deleteQuietly); + } + public void cleanUnusedStatisticsManifests(Snapshot snapshot, Set skippingSet) { // clean statistics if (snapshot.statistics() != null && !skippingSet.contains(snapshot.statistics())) { @@ -194,9 +302,15 @@ public void cleanUnusedManifestList(String manifestName, Set skippingSet } public void cleanUnusedManifests( - Snapshot snapshot, Set skippingSet, boolean deleteChangelog) { - cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet); - cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet); + Snapshot snapshot, + Set skippingSet, + boolean deleteDataManifestLists, + boolean deleteChangelog) { + if (deleteDataManifestLists) { + cleanUnusedManifestList(snapshot.baseManifestList(), skippingSet); + cleanUnusedManifestList(snapshot.deltaManifestList(), skippingSet); + } + if (deleteChangelog && snapshot.changelogManifestList() != null) { cleanUnusedManifestList(snapshot.changelogManifestList(), skippingSet); } @@ -204,6 +318,19 @@ public void cleanUnusedManifests( cleanUnusedStatisticsManifests(snapshot, skippingSet); } + public Predicate dataFileSkipper( + List taggedSnapshots, long expiringSnapshotId) throws Exception { + int index = TagManager.findPreviousTag(taggedSnapshots, expiringSnapshotId); + // refresh tag data files + if (index >= 0 && cachedTagIndex != index) { + cachedTagIndex = index; + cachedTagDataFiles.clear(); + addMergedDataFiles(cachedTagDataFiles, taggedSnapshots.get(index)); + } + + return entry -> index >= 0 && containsDataFile(cachedTagDataFiles, entry); + } + /** * It is possible that a job was killed during expiration and some manifest files have been * deleted, so if the clean methods need to get manifests of a snapshot to be cleaned, we should diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index e78174db7ac9..16d854ea43a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -212,25 +212,72 @@ private Map getCandidateDeletingFiles() { private List getUsedFilesForChangelog(Changelog changelog) { List files = new ArrayList<>(); - if (changelog.changelogManifestList() != null) { - files.add(changelog.changelogManifestList()); - } - + List manifestFileMetas = new ArrayList<>(); try { // try to read manifests - List manifestFileMetas = - retryReadingFiles( - () -> - manifestList.readWithIOException( - changelog.changelogManifestList())); - if (manifestFileMetas == null) { - return Collections.emptyList(); + // changelog manifest + List changelogManifest = new ArrayList<>(); + if (changelog.changelogManifestList() != null) { + files.add(changelog.changelogManifestList()); + changelogManifest = + retryReadingFiles( + () -> + manifestList.readWithIOException( + changelog.changelogManifestList())); + if (changelogManifest != null) { + manifestFileMetas.addAll(changelogManifest); + } } - List manifestFileName = + + // base manifest + if (manifestList.exists(changelog.baseManifestList())) { + files.add(changelog.baseManifestList()); + List baseManifest = + retryReadingFiles( + () -> + manifestList.readWithIOException( + changelog.baseManifestList())); + if (baseManifest != null) { + manifestFileMetas.addAll(baseManifest); + } + } + + // delta manifest + List deltaManifest = null; + if (manifestList.exists(changelog.deltaManifestList())) { + files.add(changelog.deltaManifestList()); + deltaManifest = + retryReadingFiles( + () -> + manifestList.readWithIOException( + changelog.deltaManifestList())); + if (deltaManifest != null) { + manifestFileMetas.addAll(deltaManifest); + } + } + + files.addAll( manifestFileMetas.stream() .map(ManifestFileMeta::fileName) - .collect(Collectors.toList()); - files.addAll(manifestFileName); + .collect(Collectors.toList())); + + // data file + List manifestFileName = new ArrayList<>(); + if (changelog.changelogManifestList() != null) { + manifestFileName.addAll( + changelogManifest == null + ? new ArrayList<>() + : changelogManifest.stream() + .map(ManifestFileMeta::fileName) + .collect(Collectors.toList())); + } else { + manifestFileName.addAll( + deltaManifest == null + ? new ArrayList<>() + : deltaManifest.stream() + .map(ManifestFileMeta::fileName) + .collect(Collectors.toList())); + } // try to read data files List dataFiles = retryReadingDataFiles(manifestFileName); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java index a0c442fcd96d..14e348f17b29 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/SnapshotDeletion.java @@ -20,23 +20,17 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; -import org.apache.paimon.data.BinaryRow; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.index.IndexFileHandler; -import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.stats.StatsFileHandler; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; -import org.apache.paimon.utils.TagManager; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -44,15 +38,11 @@ import java.util.function.Predicate; /** Delete snapshot files. */ -public class SnapshotDeletion extends FileDeletionBase { - - private static final Logger LOG = LoggerFactory.getLogger(SnapshotDeletion.class); +public class SnapshotDeletion extends FileDeletionBase { - /** Used to record which tag is cached in tagged snapshots list. */ - private int cachedTagIndex = -1; + private final boolean changelogDecoupled; - /** Used to cache data files used by current tag. */ - private final Map>> cachedTagDataFiles = new HashMap<>(); + private final boolean produceChangelog; public SnapshotDeletion( FileIO fileIO, @@ -60,84 +50,39 @@ public SnapshotDeletion( ManifestFile manifestFile, ManifestList manifestList, IndexFileHandler indexFileHandler, - StatsFileHandler statsFileHandler) { + StatsFileHandler statsFileHandler, + boolean changelogDecoupled, + boolean produceChangelog) { super(fileIO, pathFactory, manifestFile, manifestList, indexFileHandler, statsFileHandler); + this.produceChangelog = produceChangelog; + this.changelogDecoupled = changelogDecoupled; } @Override public void cleanUnusedDataFiles(Snapshot snapshot, Predicate skipper) { - cleanUnusedDataFiles(snapshot.deltaManifestList(), skipper); - } - - public void cleanUnusedDataFiles(String manifestList, Predicate skipper) { - // try read manifests - List manifestFileNames = readManifestFileNames(tryReadManifestList(manifestList)); - List manifestEntries; - // data file path -> (original manifest entry, extra file paths) - Map>> dataFileToDelete = new HashMap<>(); - for (String manifest : manifestFileNames) { - try { - manifestEntries = manifestFile.read(manifest); - } catch (Exception e) { - // cancel deletion if any exception occurs - LOG.warn("Failed to read some manifest files. Cancel deletion.", e); - return; - } - - getDataFileToDelete(dataFileToDelete, manifestEntries); + if (changelogDecoupled && !produceChangelog) { + // Skip clean the 'APPEND' data files.If we do not have the file source information + // eg: the old version table file, we just skip clean this here, let it done by + // ExpireChangelogImpl + Predicate enriched = + manifestEntry -> + skipper.test(manifestEntry) + || (manifestEntry.file().fileSource().orElse(FileSource.APPEND) + == FileSource.APPEND); + cleanUnusedDataFiles(snapshot.deltaManifestList(), enriched); + } else { + cleanUnusedDataFiles(snapshot.deltaManifestList(), skipper); } - - doCleanUnusedDataFile(dataFileToDelete, skipper); } @Override public void cleanUnusedManifests(Snapshot snapshot, Set skippingSet) { - cleanUnusedManifests(snapshot, skippingSet, true); - } - - private void getDataFileToDelete( - Map>> dataFileToDelete, - List dataFileEntries) { - // we cannot delete a data file directly when we meet a DELETE entry, because that - // file might be upgraded - for (ManifestEntry entry : dataFileEntries) { - Path bucketPath = pathFactory.bucketPath(entry.partition(), entry.bucket()); - Path dataFilePath = new Path(bucketPath, entry.file().fileName()); - switch (entry.kind()) { - case ADD: - dataFileToDelete.remove(dataFilePath); - break; - case DELETE: - List extraFiles = new ArrayList<>(entry.file().extraFiles().size()); - for (String file : entry.file().extraFiles()) { - extraFiles.add(new Path(bucketPath, file)); - } - dataFileToDelete.put(dataFilePath, Pair.of(entry, extraFiles)); - break; - default: - throw new UnsupportedOperationException( - "Unknown value kind " + entry.kind().name()); - } - } - } - - private void doCleanUnusedDataFile( - Map>> dataFileToDelete, - Predicate skipper) { - List actualDataFileToDelete = new ArrayList<>(); - dataFileToDelete.forEach( - (path, pair) -> { - ManifestEntry entry = pair.getLeft(); - // check whether we should skip the data file - if (!skipper.test(entry)) { - // delete data files - actualDataFileToDelete.add(path); - actualDataFileToDelete.addAll(pair.getRight()); - - recordDeletionBuckets(entry); - } - }); - deleteFiles(actualDataFileToDelete, fileIO::deleteQuietly); + // delay clean the base and delta manifest lists when changelog decoupled enabled + cleanUnusedManifests( + snapshot, + skippingSet, + !changelogDecoupled || produceChangelog, + !changelogDecoupled); } @VisibleForTesting @@ -146,50 +91,4 @@ void cleanUnusedDataFile(List dataFileLog) { getDataFileToDelete(dataFileToDelete, dataFileLog); doCleanUnusedDataFile(dataFileToDelete, f -> false); } - - /** - * Delete added file in the manifest list files. Added files marked as "ADD" in manifests. - * - * @param manifestListName name of manifest list - */ - public void deleteAddedDataFiles(String manifestListName) { - List manifestFileNames = - readManifestFileNames(tryReadManifestList(manifestListName)); - for (String file : manifestFileNames) { - try { - List manifestEntries = manifestFile.read(file); - deleteAddedDataFiles(manifestEntries); - } catch (Exception e) { - // We want to delete the data file, so just ignore the unavailable files - LOG.info("Failed to read manifest " + file + ". Ignore it.", e); - } - } - } - - private void deleteAddedDataFiles(List manifestEntries) { - List dataFileToDelete = new ArrayList<>(); - for (ManifestEntry entry : manifestEntries) { - if (entry.kind() == FileKind.ADD) { - dataFileToDelete.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); - recordDeletionBuckets(entry); - } - } - deleteFiles(dataFileToDelete, fileIO::deleteQuietly); - } - - public Predicate dataFileSkipper( - List taggedSnapshots, long expiringSnapshotId) throws Exception { - int index = TagManager.findPreviousTag(taggedSnapshots, expiringSnapshotId); - // refresh tag data files - if (index >= 0 && cachedTagIndex != index) { - cachedTagIndex = index; - cachedTagDataFiles.clear(); - addMergedDataFiles(cachedTagDataFiles, taggedSnapshots.get(index)); - } - - return entry -> index >= 0 && containsDataFile(cachedTagDataFiles, entry); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java index 7ef258046d82..3b1174223676 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/TagDeletion.java @@ -43,7 +43,7 @@ import java.util.function.Predicate; /** Delete tag files. */ -public class TagDeletion extends FileDeletionBase { +public class TagDeletion extends FileDeletionBase { private static final Logger LOG = LoggerFactory.getLogger(TagDeletion.class); @@ -85,7 +85,7 @@ public void cleanUnusedDataFiles(Snapshot taggedSnapshot, Predicate skippingSet) { // doesn't clean changelog files because they are handled by SnapshotDeletion - cleanUnusedManifests(taggedSnapshot, skippingSet, false); + cleanUnusedManifests(taggedSnapshot, skippingSet, true, false); } public Predicate dataFileSkipper(Snapshot fromSnapshot) throws Exception { diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java index fab5f31fd3ec..1bd4a78ea6a4 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java @@ -117,23 +117,7 @@ public static void validateTableSchema(TableSchema schema) { ChangelogProducer.LOOKUP)); } - checkArgument( - options.snapshotNumRetainMin() > 0, - SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1"); - checkArgument( - options.snapshotNumRetainMin() <= options.snapshotNumRetainMax(), - SNAPSHOT_NUM_RETAINED_MIN.key() - + " should not be larger than " - + SNAPSHOT_NUM_RETAINED_MAX.key()); - - checkArgument( - options.changelogNumRetainMin() > 0, - CHANGELOG_NUM_RETAINED_MIN.key() + " should be at least 1"); - checkArgument( - options.changelogNumRetainMin() <= options.changelogNumRetainMax(), - CHANGELOG_NUM_RETAINED_MIN.key() - + " should not be larger than " - + CHANGELOG_NUM_RETAINED_MAX.key()); + validateSnapshotAndChangelogRetainOption(options); // Get the format type here which will try to convert string value to {@Code // FileFormatType}. If the string value is illegal, an exception will be thrown. @@ -190,6 +174,26 @@ public static void validateTableSchema(TableSchema schema) { } } + public static void validateSnapshotAndChangelogRetainOption(CoreOptions options) { + checkArgument( + options.snapshotNumRetainMin() > 0, + SNAPSHOT_NUM_RETAINED_MIN.key() + " should be at least 1"); + checkArgument( + options.snapshotNumRetainMin() <= options.snapshotNumRetainMax(), + SNAPSHOT_NUM_RETAINED_MIN.key() + + " should not be larger than " + + SNAPSHOT_NUM_RETAINED_MAX.key()); + + checkArgument( + options.changelogNumRetainMin() > 0, + CHANGELOG_NUM_RETAINED_MIN.key() + " should be at least 1"); + checkArgument( + options.changelogNumRetainMin() <= options.changelogNumRetainMax(), + CHANGELOG_NUM_RETAINED_MIN.key() + + " should not be larger than " + + CHANGELOG_NUM_RETAINED_MAX.key()); + } + private static void validateOnlyContainPrimitiveType( List fields, List fieldNames, String errorMessageIntro) { if (!fieldNames.isEmpty()) { diff --git a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java index 1ca321d563a5..dba2958de708 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java @@ -288,22 +288,20 @@ public SnapshotManager snapshotManager() { } @Override - public ExpireSnapshots newExpireSnapshots() { - return new ExpireSnapshotsImpl( + public ExpireSnapshots newExpireSnapshots(CoreOptions options) { + return new ExpireSnapshots.Expire( snapshotManager(), - store().newSnapshotDeletion(), + store().newSnapshotDeletion(options), + store().newChangelogDeletion(options), store().newTagManager(), - coreOptions().snapshotExpireCleanEmptyDirectories(), - coreOptions().changelogLifecycleDecoupled()); - } - - @Override - public ExpireSnapshots newExpireChangelog() { - return new ExpireChangelogImpl( - snapshotManager(), - tagManager(), - store().newSnapshotDeletion(), - coreOptions().snapshotExpireCleanEmptyDirectories()); + options.snapshotExpireCleanEmptyDirectories(), + options.snapshotNumRetainMax(), + options.snapshotNumRetainMin(), + options.snapshotTimeRetain().toMillis(), + options.changelogNumRetainMax(), + options.changelogNumRetainMin(), + options.changelogTimeRetain().toMillis(), + options.snapshotExpireLimit()); } @Override @@ -316,27 +314,8 @@ public TableCommitImpl newCommit(String commitUser, String branchName) { CoreOptions options = coreOptions(); Runnable snapshotExpire = null; if (!options.writeOnly()) { - boolean changelogDecoupled = options.changelogLifecycleDecoupled(); - ExpireSnapshots expireChangelog = - newExpireChangelog() - .maxDeletes(options.snapshotExpireLimit()) - .retainMin(options.changelogNumRetainMin()) - .retainMax(options.changelogNumRetainMax()); - ExpireSnapshots expireSnapshots = - newExpireSnapshots() - .retainMax(options.snapshotNumRetainMax()) - .retainMin(options.snapshotNumRetainMin()) - .maxDeletes(options.snapshotExpireLimit()); - long snapshotTimeRetain = options.snapshotTimeRetain().toMillis(); - long changelogTimeRetain = options.changelogTimeRetain().toMillis(); - snapshotExpire = - () -> { - long current = System.currentTimeMillis(); - expireSnapshots.olderThanMills(current - snapshotTimeRetain).expire(); - if (changelogDecoupled) { - expireChangelog.olderThanMills(current - changelogTimeRetain).expire(); - } - }; + ExpireSnapshots expireSnapshots = newExpireSnapshots(options); + snapshotExpire = expireSnapshots::expire; } return new TableCommitImpl( @@ -574,7 +553,8 @@ private RollbackHelper rollbackHelper() { snapshotManager(), tagManager(), fileIO, - store().newSnapshotDeletion(), + store().newSnapshotDeletion(store().options()), + store().newChangelogDeletion(store().options()), store().newTagDeletion()); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java index b237fe63069e..2f22faf2766a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireChangelogImpl.java @@ -19,8 +19,10 @@ package org.apache.paimon.table; import org.apache.paimon.Changelog; +import org.apache.paimon.Snapshot; import org.apache.paimon.consumer.ConsumerManager; -import org.apache.paimon.operation.SnapshotDeletion; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -30,7 +32,9 @@ import java.io.IOException; import java.io.UncheckedIOException; -import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.function.Predicate; /** Cleanup the changelog in changelog directory. */ public class ExpireChangelogImpl implements ExpireSnapshots { @@ -39,54 +43,39 @@ public class ExpireChangelogImpl implements ExpireSnapshots { private final SnapshotManager snapshotManager; private final ConsumerManager consumerManager; - private final SnapshotDeletion snapshotDeletion; + private final ChangelogDeletion changelogDeletion; private final boolean cleanEmptyDirectories; private final TagManager tagManager; - private long olderThanMills = 0; - public int retainMin = 1; - private int retainMax = Integer.MAX_VALUE; - private int maxDeletes = Integer.MAX_VALUE; + private final int retainMax; + private final int retainMin; + private final long changelogTimeToRetain; + private final int maxDeletes; - public ExpireChangelogImpl( + ExpireChangelogImpl( SnapshotManager snapshotManager, TagManager tagManager, - SnapshotDeletion snapshotDeletion, - boolean cleanEmptyDirectories) { + ChangelogDeletion changelogDeletion, + boolean cleanEmptyDirectories, + int retainMax, + int retainMin, + long changelogTimeToRetain, + int maxDeletes) { this.snapshotManager = snapshotManager; this.tagManager = tagManager; + this.changelogDeletion = changelogDeletion; this.consumerManager = new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath()); - this.snapshotDeletion = snapshotDeletion; this.cleanEmptyDirectories = cleanEmptyDirectories; - } - - @Override - public ExpireChangelogImpl retainMax(int retainMax) { this.retainMax = retainMax; - return this; - } - - @Override - public ExpireChangelogImpl retainMin(int retainMin) { this.retainMin = retainMin; - return this; - } - - @Override - public ExpireChangelogImpl olderThanMills(long olderThanMills) { - this.olderThanMills = olderThanMills; - return this; - } - - @Override - public ExpireChangelogImpl maxDeletes(int maxDeletes) { + this.changelogTimeToRetain = changelogTimeToRetain; this.maxDeletes = maxDeletes; - return this; } @Override public int expire() { + long olderThanMills = System.currentTimeMillis() - changelogTimeToRetain; Long latestSnapshotId = snapshotManager.latestSnapshotId(); if (latestSnapshotId == null) { // no snapshot, nothing to expire @@ -143,24 +132,36 @@ public int expireUntil(long earliestId, long endExclusiveId) { if (LOG.isDebugEnabled()) { LOG.debug("Changelog expire range is [" + earliestId + ", " + endExclusiveId + ")"); } + List taggedSnapshots = tagManager.taggedSnapshots(); + + List skippingSnapshots = + TagManager.findOverlappedSnapshots(taggedSnapshots, earliestId, endExclusiveId); + skippingSnapshots.add(snapshotManager.changelog(endExclusiveId)); + Set manifestSkippSet = changelogDeletion.manifestSkippingSet(skippingSnapshots); for (long id = earliestId; id < endExclusiveId; id++) { if (LOG.isDebugEnabled()) { - LOG.debug("Ready to delete changelog files from snapshot #" + id); + LOG.debug("Ready to delete changelog files from changelog #" + id); } Changelog changelog = snapshotManager.longLivedChangelog(id); - // delete changelog files - if (changelog.changelogManifestList() != null) { - snapshotDeletion.deleteAddedDataFiles(changelog.changelogManifestList()); - snapshotDeletion.cleanUnusedManifestList( - changelog.changelogManifestList(), new HashSet<>()); + Predicate skipper; + try { + skipper = changelogDeletion.dataFileSkipper(taggedSnapshots, id); + } catch (Exception e) { + LOG.info( + String.format( + "Skip cleaning data files of changelog '%s' due to failed to build skipping set.", + id), + e); + continue; } - + changelogDeletion.cleanUnusedDataFiles(changelog, skipper); + changelogDeletion.cleanUnusedManifests(changelog, manifestSkippSet); snapshotManager.fileIO().deleteQuietly(snapshotManager.longLivedChangelogPath(id)); } if (cleanEmptyDirectories) { - snapshotDeletion.cleanDataDirectories(); + changelogDeletion.cleanDataDirectories(); } writeEarliestHintFile(endExclusiveId); return (int) (endExclusiveId - earliestId); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java index a4459d75fd30..f8e6830bd23e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshots.java @@ -18,17 +18,79 @@ package org.apache.paimon.table; +import org.apache.paimon.operation.ChangelogDeletion; +import org.apache.paimon.operation.SnapshotDeletion; +import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.TagManager; + +import javax.annotation.Nullable; + /** Expire snapshots. */ public interface ExpireSnapshots { - ExpireSnapshots retainMax(int retainMax); + /** @return How many snapshots have been expired. */ + int expire(); + + /** Expire the snapshots and changelogs. */ + class Expire implements ExpireSnapshots { - ExpireSnapshots retainMin(int retainMin); + private @Nullable ExpireChangelogImpl expireChangelogs; + private final ExpireSnapshotsImpl expireSnapshots; - ExpireSnapshots olderThanMills(long olderThanMills); + public Expire( + SnapshotManager snapshotManager, + SnapshotDeletion snapshotDeletion, + ChangelogDeletion changelogDeletion, + TagManager tagManager, + boolean cleanEmptyDirectories, + int snapshotRetainMax, + int snapshotRetainMin, + long snapshotTimeToRetain, + int changelogRetainMax, + int changelogRetainMin, + long changelogTimeToRetain, + int maxDeletes) { + boolean changelogDecoupled = + changelogRetainMax > snapshotRetainMax + || changelogRetainMin > snapshotRetainMin + || changelogTimeToRetain > snapshotTimeToRetain; + this.expireSnapshots = + new ExpireSnapshotsImpl( + snapshotManager, + snapshotDeletion, + tagManager, + cleanEmptyDirectories, + changelogDecoupled, + snapshotRetainMax, + snapshotRetainMin, + snapshotTimeToRetain, + maxDeletes); + if (changelogDecoupled) { + this.expireChangelogs = + new ExpireChangelogImpl( + snapshotManager, + tagManager, + changelogDeletion, + cleanEmptyDirectories, + changelogRetainMax, + changelogRetainMin, + changelogTimeToRetain, + maxDeletes); + } + } - ExpireSnapshots maxDeletes(int maxDeletes); + public ExpireSnapshotsImpl getExpireSnapshots() { + return expireSnapshots; + } - /** @return How many snapshots have been expired. */ - int expire(); + @Override + public int expire() { + int snapshot = expireSnapshots.expire(); + if (expireChangelogs != null) { + snapshot += expireChangelogs.expire(); + } + + return snapshot; + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java index 50cf39eb8971..4d957e342550 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ExpireSnapshotsImpl.java @@ -50,17 +50,21 @@ public class ExpireSnapshotsImpl implements ExpireSnapshots { /** Whether to clean the changelog or delta files. */ private final boolean changelogDecoupled; - private int retainMax = Integer.MAX_VALUE; - private int retainMin = 1; - private long olderThanMills = 0; - private int maxDeletes = Integer.MAX_VALUE; + private final int retainMax; + private final int retainMin; + private final long snapshotTimeToRetain; + private final int maxDeletes; - public ExpireSnapshotsImpl( + ExpireSnapshotsImpl( SnapshotManager snapshotManager, SnapshotDeletion snapshotDeletion, TagManager tagManager, boolean cleanEmptyDirectories, - boolean changelogDecoupled) { + boolean changelogDecoupled, + int retainMax, + int retainMin, + long snapshotTimeToRetain, + int maxDeletes) { this.snapshotManager = snapshotManager; this.consumerManager = new ConsumerManager(snapshotManager.fileIO(), snapshotManager.tablePath()); @@ -68,34 +72,15 @@ public ExpireSnapshotsImpl( this.tagManager = tagManager; this.cleanEmptyDirectories = cleanEmptyDirectories; this.changelogDecoupled = changelogDecoupled; - } - - @Override - public ExpireSnapshotsImpl retainMax(int retainMax) { this.retainMax = retainMax; - return this; - } - - @Override - public ExpireSnapshotsImpl retainMin(int retainMin) { this.retainMin = retainMin; - return this; - } - - @Override - public ExpireSnapshotsImpl olderThanMills(long olderThanMills) { - this.olderThanMills = olderThanMills; - return this; - } - - @Override - public ExpireSnapshotsImpl maxDeletes(int maxDeletes) { + this.snapshotTimeToRetain = snapshotTimeToRetain; this.maxDeletes = maxDeletes; - return this; } @Override public int expire() { + long olderThanMills = System.currentTimeMillis() - snapshotTimeToRetain; Long latestSnapshotId = snapshotManager.latestSnapshotId(); if (latestSnapshotId == null) { // no snapshot, nothing to expire @@ -226,7 +211,7 @@ public int expireUntil(long earliestId, long endExclusiveId) { } Snapshot snapshot = snapshotManager.snapshot(id); - snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet, !changelogDecoupled); + snapshotDeletion.cleanUnusedManifests(snapshot, skippingSet); if (changelogDecoupled) { commitChangelog(new Changelog(snapshot)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java index be4976f1003e..6e1b807ee6a5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/ReadonlyTable.java @@ -18,6 +18,7 @@ package org.apache.paimon.table; +import org.apache.paimon.CoreOptions; import org.apache.paimon.stats.Statistics; import org.apache.paimon.table.sink.BatchWriteBuilder; import org.apache.paimon.table.sink.InnerTableCommit; @@ -167,18 +168,10 @@ default void deleteBranch(String branchName) { } @Override - default ExpireSnapshots newExpireSnapshots() { + default ExpireSnapshots newExpireSnapshots(CoreOptions options) { throw new UnsupportedOperationException( String.format( "Readonly Table %s does not support expireSnapshots.", this.getClass().getSimpleName())); } - - @Override - default ExpireSnapshots newExpireChangelog() { - throw new UnsupportedOperationException( - String.format( - "Readonly Table %s does not support expireChangelog.", - this.getClass().getSimpleName())); - } } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java index 90801caf0978..bd608cdcaf4c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/RollbackHelper.java @@ -22,6 +22,7 @@ import org.apache.paimon.Snapshot; import org.apache.paimon.fs.FileIO; import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.operation.ChangelogDeletion; import org.apache.paimon.operation.SnapshotDeletion; import org.apache.paimon.operation.TagDeletion; import org.apache.paimon.utils.SnapshotManager; @@ -51,6 +52,7 @@ public class RollbackHelper { private final TagManager tagManager; private final FileIO fileIO; private final SnapshotDeletion snapshotDeletion; + private final ChangelogDeletion changelogDeletion; private final TagDeletion tagDeletion; public RollbackHelper( @@ -58,11 +60,13 @@ public RollbackHelper( TagManager tagManager, FileIO fileIO, SnapshotDeletion snapshotDeletion, + ChangelogDeletion changelogDeletion, TagDeletion tagDeletion) { this.snapshotManager = snapshotManager; this.tagManager = tagManager; this.fileIO = fileIO; this.snapshotDeletion = snapshotDeletion; + this.changelogDeletion = changelogDeletion; this.tagDeletion = tagDeletion; } @@ -72,6 +76,7 @@ public void cleanLargerThan(Snapshot retainedSnapshot) { List cleanedSnapshots = cleanSnapshotsDataFiles(retainedSnapshot); List cleanedChangelogs = cleanLongLivedChangelogDataFiles(retainedSnapshot); List cleanedTags = cleanTagsDataFiles(retainedSnapshot); + Set cleanedIds = new HashSet<>(); // clean manifests // this can be used for snapshots and tags manifests cleaning both @@ -79,17 +84,18 @@ public void cleanLargerThan(Snapshot retainedSnapshot) { for (Snapshot snapshot : cleanedSnapshots) { snapshotDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet); + cleanedIds.add(snapshot.id()); } for (Changelog changelog : cleanedChangelogs) { - if (changelog.changelogManifestList() != null) { - snapshotDeletion.cleanUnusedManifestList( - changelog.changelogManifestList(), new HashSet<>()); - } + changelogDeletion.cleanUnusedManifests(changelog, manifestsSkippingSet); + cleanedIds.add(changelog.id()); } - cleanedTags.removeAll(cleanedSnapshots); for (Snapshot snapshot : cleanedTags) { + if (cleanedIds.contains(snapshot.id())) { + continue; + } tagDeletion.cleanUnusedManifests(snapshot, manifestsSkippingSet); } @@ -122,7 +128,9 @@ private List cleanSnapshotsDataFiles(Snapshot retainedSnapshot) { // when deleting non-existing data files for (Snapshot snapshot : toBeCleaned) { snapshotDeletion.deleteAddedDataFiles(snapshot.deltaManifestList()); - snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList()); + if (snapshot.changelogManifestList() != null) { + snapshotDeletion.deleteAddedDataFiles(snapshot.changelogManifestList()); + } } // delete directories @@ -149,9 +157,8 @@ private List cleanLongLivedChangelogDataFiles(Snapshot retainedSnapsh // delete data files of changelog for (Changelog changelog : toBeCleaned) { - if (changelog.changelogManifestList() != null) { - snapshotDeletion.deleteAddedDataFiles(changelog.changelogManifestList()); - } + // clean the deleted file + changelogDeletion.cleanUnusedDataFiles(changelog, manifestEntry -> false); } // delete directories diff --git a/paimon-core/src/main/java/org/apache/paimon/table/Table.java b/paimon-core/src/main/java/org/apache/paimon/table/Table.java index 3650b773cb5a..ec9204dfe567 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/Table.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/Table.java @@ -18,6 +18,7 @@ package org.apache.paimon.table; +import org.apache.paimon.CoreOptions; import org.apache.paimon.annotation.Experimental; import org.apache.paimon.annotation.Public; import org.apache.paimon.stats.Statistics; @@ -105,10 +106,7 @@ public interface Table extends Serializable { /** Manually expire snapshots, parameters can be controlled independently of table options. */ @Experimental - ExpireSnapshots newExpireSnapshots(); - - @Experimental - ExpireSnapshots newExpireChangelog(); + ExpireSnapshots newExpireSnapshots(CoreOptions options); // =============== Read & Write Operations ================== diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java index 6601bf548043..bb9950bd8b47 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractInnerTableScan.java @@ -122,7 +122,6 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { return new ContinuousFromSnapshotStartingScanner( snapshotManager, consumer.get().nextSnapshot(), - options.changelogProducer() != ChangelogProducer.NONE, options.changelogLifecycleDecoupled()); } } @@ -152,7 +151,6 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { ? new ContinuousFromTimestampStartingScanner( snapshotManager, startupMillis, - options.changelogProducer() != ChangelogProducer.NONE, options.changelogLifecycleDecoupled()) : new StaticFromTimestampStartingScanner(snapshotManager, startupMillis); case FROM_FILE_CREATION_TIME: @@ -164,7 +162,6 @@ protected StartingScanner createStartingScanner(boolean isStreaming) { ? new ContinuousFromSnapshotStartingScanner( snapshotManager, options.scanSnapshotId(), - options.changelogProducer() != ChangelogProducer.NONE, options.changelogLifecycleDecoupled()) : new StaticFromSnapshotStartingScanner( snapshotManager, options.scanSnapshotId()); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java index 3e03dc0a1b81..8ef8f4e78942 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerStreamTableScanImpl.java @@ -80,10 +80,7 @@ public InnerStreamTableScanImpl( this.supportStreamingReadOverwrite = supportStreamingReadOverwrite; this.defaultValueAssigner = defaultValueAssigner; this.nextSnapshotProvider = - new NextSnapshotFetcher( - snapshotManager, - options.changelogLifecycleDecoupled(), - options.changelogProducer() != CoreOptions.ChangelogProducer.NONE); + new NextSnapshotFetcher(snapshotManager, options.changelogLifecycleDecoupled()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java index 38c01f35d245..d8e614222857 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromSnapshotStartingScanner.java @@ -27,18 +27,13 @@ */ public class ContinuousFromSnapshotStartingScanner extends AbstractStartingScanner { - private final boolean changelogAsFollowup; private final boolean changelogDecoupled; public ContinuousFromSnapshotStartingScanner( - SnapshotManager snapshotManager, - long snapshotId, - boolean changelogAsFollowup, - boolean changelogDecoupled) { + SnapshotManager snapshotManager, long snapshotId, boolean changelogDecoupled) { super(snapshotManager); this.startingSnapshotId = snapshotId; this.changelogDecoupled = changelogDecoupled; - this.changelogAsFollowup = changelogAsFollowup; } @Override @@ -54,7 +49,7 @@ public Result scan(SnapshotReader snapshotReader) { private Long getEarliestId() { Long earliestId; - if (changelogAsFollowup && changelogDecoupled) { + if (changelogDecoupled) { Long earliestChangelogId = snapshotManager.earliestLongLivedChangelogId(); earliestId = earliestChangelogId == null diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java index 7e39e0859781..941174835537 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScanner.java @@ -37,13 +37,10 @@ public class ContinuousFromTimestampStartingScanner extends AbstractStartingScan private final boolean startFromChangelog; public ContinuousFromTimestampStartingScanner( - SnapshotManager snapshotManager, - long startupMillis, - boolean changelogAsFollowup, - boolean changelogDecoupled) { + SnapshotManager snapshotManager, long startupMillis, boolean changelogDecoupled) { super(snapshotManager); this.startupMillis = startupMillis; - this.startFromChangelog = changelogAsFollowup && changelogDecoupled; + this.startFromChangelog = changelogDecoupled; this.startingSnapshotId = this.snapshotManager.earlierThanTimeMills(startupMillis, startFromChangelog); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java index 87f1fb84984c..021673950d9c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/NextSnapshotFetcher.java @@ -32,16 +32,10 @@ public class NextSnapshotFetcher { public static final Logger LOG = LoggerFactory.getLogger(NextSnapshotFetcher.class); private final SnapshotManager snapshotManager; private final boolean changelogDecoupled; - // Only support changelog as follow-up now. - private final boolean changelogAsFollowup; - public NextSnapshotFetcher( - SnapshotManager snapshotManager, - boolean changelogDecoupled, - boolean changelogAsFollowup) { + public NextSnapshotFetcher(SnapshotManager snapshotManager, boolean changelogDecoupled) { this.snapshotManager = snapshotManager; this.changelogDecoupled = changelogDecoupled; - this.changelogAsFollowup = changelogAsFollowup; } @Nullable @@ -59,9 +53,7 @@ public Snapshot getNextSnapshot(long nextSnapshotId) { return null; } - if (!changelogAsFollowup - || !changelogDecoupled - || !snapshotManager.longLivedChangelogExists(nextSnapshotId)) { + if (!changelogDecoupled || !snapshotManager.longLivedChangelogExists(nextSnapshotId)) { throw new OutOfRangeException( String.format( "The snapshot with id %d has expired. You can: " diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index 6adc3aff04f8..ab90c19fa8dd 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -26,6 +26,8 @@ import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.IndexIncrement; +import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.manifest.ManifestFileMeta; @@ -46,9 +48,7 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.CatalogEnvironment; -import org.apache.paimon.table.ExpireChangelogImpl; import org.apache.paimon.table.ExpireSnapshots; -import org.apache.paimon.table.ExpireSnapshotsImpl; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.ScanMode; @@ -160,44 +160,58 @@ public ExpireSnapshots newExpire( numRetainedMin, numRetainedMax, millisRetained, - snapshotExpireCleanEmptyDirectories, - false); + numRetainedMin, + numRetainedMax, + millisRetained, + snapshotExpireCleanEmptyDirectories); } public ExpireSnapshots newExpire(int numRetainedMin, int numRetainedMax, long millisRetained) { - return newExpire(numRetainedMin, numRetainedMax, millisRetained, true, false); + return newExpire(numRetainedMin, numRetainedMax, millisRetained, true); } public ExpireSnapshots newExpire( - int numRetainedMin, - int numRetainedMax, - long millisRetained, - boolean snapshotExpireCleanEmptyDirectories, - boolean changelogDecoupled) { - return new ExpireSnapshotsImpl( - snapshotManager(), - newSnapshotDeletion(), - new TagManager(fileIO, options.path()), - snapshotExpireCleanEmptyDirectories, - changelogDecoupled) - .retainMax(numRetainedMax) - .retainMin(numRetainedMin) - .olderThanMills(System.currentTimeMillis() - millisRetained); - } - - public ExpireSnapshots newChangelogExpire( - int numRetainedMin, - int numRetainedMax, - long millisRetained, + int snapshotNumRetainedMin, + int snapshotNumRetainedMax, + long snapshotMillisRetained, + int changelogNumRetainedMin, + int changelogNumRetainedMax, + long changelogMillisRetained, boolean snapshotExpireCleanEmptyDirectories) { - return new ExpireChangelogImpl( - snapshotManager(), - new TagManager(fileIO, options.path()), - newSnapshotDeletion(), - snapshotExpireCleanEmptyDirectories) - .retainMin(numRetainedMin) - .retainMax(numRetainedMax) - .olderThanMills(System.currentTimeMillis() - millisRetained); + Map origin = new HashMap<>(options.toMap()); + origin.put( + CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), + String.valueOf(snapshotNumRetainedMax)); + origin.put( + CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), + String.valueOf(snapshotNumRetainedMin)); + origin.put( + CoreOptions.SNAPSHOT_TIME_RETAINED.key(), + String.format("%dms", snapshotMillisRetained)); + origin.put( + CoreOptions.CHANGELOG_NUM_RETAINED_MAX.key(), + String.valueOf(changelogNumRetainedMax)); + origin.put( + CoreOptions.CHANGELOG_NUM_RETAINED_MIN.key(), + String.valueOf(changelogNumRetainedMin)); + origin.put( + CoreOptions.CHANGELOG_TIME_RETAINED.key(), + String.format("%dms", changelogMillisRetained)); + + CoreOptions newOption = new CoreOptions(origin); + return new ExpireSnapshots.Expire( + snapshotManager(), + newSnapshotDeletion(newOption), + newChangelogDeletion(newOption), + new TagManager(fileIO, options.path()), + snapshotExpireCleanEmptyDirectories, + newOption.snapshotNumRetainMax(), + newOption.snapshotNumRetainMin(), + newOption.snapshotTimeRetain().toMillis(), + newOption.changelogNumRetainMax(), + newOption.changelogNumRetainMin(), + newOption.changelogTimeRetain().toMillis(), + Integer.MAX_VALUE); } public List commitData( @@ -608,6 +622,12 @@ private static Set getSnapshotFileInUse( FileStorePathFactory pathFactory, ManifestList manifestList) { Set result = new HashSet<>(); + SchemaManager schemaManager = new SchemaManager(fileIO, snapshotManager.tablePath()); + CoreOptions options = new CoreOptions(schemaManager.latest().get().options()); + boolean produceChangelog = + options.changelogProducer() != CoreOptions.ChangelogProducer.NONE; + // The option from the table may not align with the expiration config + boolean changelogDecoupled = snapshotManager.earliestLongLivedChangelogId() != null; Path snapshotPath = snapshotManager.snapshotPath(snapshotId); Snapshot snapshot = Snapshot.fromPath(fileIO, snapshotPath); @@ -635,6 +655,27 @@ private static Set getSnapshotFileInUse( entry.file().fileName())); } + // Add 'DELETE' 'APPEND' file in snapshot + // These 'delete' files can be merged by the plan#splits, + // so it's not shown in the entries above. + // In other words, these files are not used (by snapshot or changelog) now, + // but it can only be cleaned after this snapshot expired, so we should add it to the file + // use list. + if (changelogDecoupled && !produceChangelog) { + entries = scan.withManifestList(snapshot.deltaManifests(manifestList)).plan().files(); + for (ManifestEntry entry : entries) { + // append delete file are delayed to delete + if (entry.kind() == FileKind.DELETE + && entry.file().fileSource().orElse(FileSource.APPEND) + == FileSource.APPEND) { + result.add( + new Path( + pathFactory.bucketPath(entry.partition(), entry.bucket()), + entry.file().fileName())); + } + } + } + return result; } @@ -646,6 +687,10 @@ private static Set getChangelogFileInUse( FileStorePathFactory pathFactory, ManifestList manifestList) { Set result = new HashSet<>(); + SchemaManager schemaManager = new SchemaManager(fileIO, snapshotManager.tablePath()); + CoreOptions options = new CoreOptions(schemaManager.latest().get().options()); + boolean produceChangelog = + options.changelogProducer() != CoreOptions.ChangelogProducer.NONE; Path changelogPath = snapshotManager.longLivedChangelogPath(changelogId); Changelog changelog = Changelog.fromPath(fileIO, changelogPath); @@ -654,24 +699,51 @@ private static Set getChangelogFileInUse( result.add(changelogPath); // manifest lists + if (!produceChangelog) { + result.add(pathFactory.toManifestListPath(changelog.baseManifestList())); + result.add(pathFactory.toManifestListPath(changelog.deltaManifestList())); + } if (changelog.changelogManifestList() != null) { result.add(pathFactory.toManifestListPath(changelog.changelogManifestList())); } // manifests - List manifests = new ArrayList<>(); - manifests.addAll(changelog.changelogManifests(manifestList)); + List manifests = + new ArrayList<>(changelog.changelogManifests(manifestList)); + if (!produceChangelog) { + manifests.addAll(changelog.dataManifests(manifestList)); + } manifests.forEach(m -> result.add(pathFactory.toManifestFilePath(m.fileName()))); - // data file - List entries = scan.withManifestList(manifests).plan().files(); - for (ManifestEntry entry : entries) { - result.add( - new Path( - pathFactory.bucketPath(entry.partition(), entry.bucket()), - entry.file().fileName())); + // not all manifests contains useful data file + // (1) produceChangelog = 'true': data file in changelog manifests + // (2) produceChangelog = 'false': 'APPEND' data file in delta manifests + + // delta file + if (!produceChangelog) { + for (ManifestEntry entry : + scan.withManifestList(changelog.deltaManifests(manifestList)).plan().files()) { + if (entry.file().fileSource().orElse(FileSource.APPEND) == FileSource.APPEND) { + result.add( + new Path( + pathFactory.bucketPath(entry.partition(), entry.bucket()), + entry.file().fileName())); + } + } + } else { + // changelog + for (ManifestEntry entry : + scan.withManifestList(changelog.changelogManifests(manifestList)) + .plan() + .files()) { + result.add( + new Path( + pathFactory.bucketPath(entry.partition(), entry.bucket()), + entry.file().fileName())); + } } + return result; } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java index 21b8825f7a86..f94679ff9a8c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java @@ -187,6 +187,7 @@ private DataFileMeta newFile(long fileSize) { 0, 0, 0L, + null, null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 9211cfe60591..96eb666b0fa4 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -34,6 +34,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.options.MemorySize; @@ -646,6 +647,7 @@ private DataFileMeta generateCompactAfter(List toCompact) throws I }), minSeq, maxSeq, - toCompact.get(0).schemaId()); + toCompact.get(0).schemaId(), + FileSource.APPEND); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java index 3eff9b7cdf33..cc22c4f9b164 100644 --- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java @@ -139,6 +139,7 @@ private static DataFileMeta newFile(long timeMillis) { .atZone(ZoneId.systemDefault()) .toLocalDateTime()), 0L, + null, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java index 3b4921c88026..84133a4fd31d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java @@ -162,6 +162,7 @@ private Data createDataFile(List kvs, int level, BinaryRow partition, 0, level, kvs.stream().filter(kv -> kv.valueKind().isRetract()).count(), + null, null), kvs); } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java index d5f6ceda32a9..5423ffaf6033 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java @@ -52,6 +52,7 @@ public static DataFileMeta newFile(long minSeq, long maxSeq) { Collections.emptyList(), Timestamp.fromEpochMillis(100), maxSeq - minSeq + 1, + null, null); } @@ -69,6 +70,7 @@ public static DataFileMeta newFile() { 0, 0, 0L, + null, null); } @@ -92,6 +94,7 @@ public static DataFileMeta newFile( 0, level, deleteRowCount, + null, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index f049d07a1e8b..a6918276db4c 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -101,7 +101,7 @@ private void testWriteAndReadDataFileImpl(String format) throws Exception { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(0); + writerFactory.createRollingMergeTreeFileWriter(0, false); writer.write(CloseableIterator.fromList(data.content, kv -> {})); writer.close(); List actualMetas = writer.result(); @@ -130,7 +130,8 @@ public void testCleanUpForException() throws IOException { FailingFileIO.getFailingPath(failingName, tempDir.toString()), "avro"); try { - FileWriter writer = writerFactory.createRollingMergeTreeFileWriter(0); + FileWriter writer = + writerFactory.createRollingMergeTreeFileWriter(0, false); writer.write(CloseableIterator.fromList(data.content, kv -> {})); } catch (Throwable e) { if (e.getCause() != null) { @@ -154,7 +155,7 @@ public void testKeyProjection() throws Exception { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(0); + writerFactory.createRollingMergeTreeFileWriter(0, false); writer.write(CloseableIterator.fromList(data.content, kv -> {})); writer.close(); List actualMetas = writer.result(); @@ -192,7 +193,7 @@ public void testValueProjection() throws Exception { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(0); + writerFactory.createRollingMergeTreeFileWriter(0, false); writer.write(CloseableIterator.fromList(data.content, kv -> {})); writer.close(); List actualMetas = writer.result(); @@ -397,7 +398,7 @@ public void testReaderUseFileSizeFromMetadata(String format) throws Exception { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(0); + writerFactory.createRollingMergeTreeFileWriter(0, false); writer.write(CloseableIterator.fromList(data.content, kv -> {})); writer.close(); List actualMetas = writer.result(); diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java index 1801c2dd0125..81abcedd8470 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java @@ -89,7 +89,8 @@ public void initialize(String identifier) { StatsCollectorFactories.createStatsFactories( new CoreOptions(new HashMap<>()), SCHEMA.getFieldNames()), - new FileIndexOptions()), + new FileIndexOptions(), + false), TARGET_FILE_SIZE); } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java index 473b333a6740..b23ae1550fc2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java @@ -116,6 +116,7 @@ public static DataFileMeta newFile(int name, int level) { 0, level, 0L, + null, null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java index 9120e7b6c900..ef438a1b213a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java @@ -83,8 +83,8 @@ protected ManifestEntry makeEntry(boolean isAdd, String fileName, Integer partit Collections.emptyList(), Timestamp.fromEpochMillis(200000), 0L, // not used - null // not used - )); + null, // not used + null)); } protected ManifestFileMeta makeManifest(ManifestEntry... entries) { @@ -245,6 +245,7 @@ public static ManifestEntry makeEntry( 0, // not used 0, // not used 0L, + null, null)); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index 9ec0e38e8d10..91bb4cd2beea 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -206,7 +206,7 @@ private KeyValue kv(int key, int value) { private DataFileMeta newFile(int level, KeyValue... records) throws IOException { RollingFileWriter writer = - createWriterFactory().createRollingMergeTreeFileWriter(level); + createWriterFactory().createRollingMergeTreeFileWriter(level, false); for (KeyValue kv : records) { writer.write(kv); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java index 4763585b00a4..922c27aa0c66 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java @@ -81,6 +81,7 @@ public static DataFileMeta newFile(int level) { 0, level, 0L, + null, null); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index d38b516fcf4f..d915e247a2ae 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -286,7 +286,7 @@ private KeyValue kv(int key, int value, long seqNumber) { private DataFileMeta newFile(int level, KeyValue... records) throws IOException { RollingFileWriter writer = - createWriterFactory().createRollingMergeTreeFileWriter(level); + createWriterFactory().createRollingMergeTreeFileWriter(level, false); for (KeyValue kv : records) { writer.write(kv); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 3f9bd3e6ea85..c4f55924b4d9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -608,7 +608,7 @@ public CompactResult rewrite( int outputLevel, boolean dropDelete, List> sections) throws Exception { RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(outputLevel); + writerFactory.createRollingMergeTreeFileWriter(outputLevel, false); RecordReader reader = MergeTreeReaders.readerForMergeTree( sections, diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java index 89007a33a61c..2fe89684eeeb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java @@ -182,6 +182,7 @@ private DataFileMeta makeInterval(int left, int right) { Collections.emptyList(), Timestamp.fromEpochMillis(100000), 0L, + null, null); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java index 0d891f2c78d3..25d263a93102 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.mergetree.LevelSortedRun; import org.apache.paimon.mergetree.SortedRun; @@ -357,6 +358,7 @@ private LevelSortedRun level(int level, long size) { } static DataFileMeta file(long size) { - return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null); + return new DataFileMeta( + "", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null, FileSource.APPEND); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 626c13c018fc..d2dc888b0cc6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -30,6 +30,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; import org.apache.paimon.schema.Schema; @@ -82,7 +83,7 @@ public void beforeEach() throws Exception { TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), TestKeyValueGenerator.getPrimaryKeys( TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), - Collections.emptyMap(), + store.options().toMap(), null)); } @@ -175,7 +176,9 @@ public void testMixedSnapshotAndTagDeletion() throws Exception { @Test public void testExpireExtraFiles() throws IOException { - ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) store.newExpire(1, 3, Long.MAX_VALUE); + ExpireSnapshotsImpl expire = + ((ExpireSnapshots.Expire) store.newExpire(1, 3, Long.MAX_VALUE)) + .getExpireSnapshots(); // write test files BinaryRow partition = gen.getPartition(gen.next()); @@ -205,7 +208,8 @@ public void testExpireExtraFiles() throws IOException { extraFiles, Timestamp.now(), 0L, - null); + null, + FileSource.APPEND); ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile); ManifestEntry delete = new ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile); @@ -337,8 +341,9 @@ public void testExpireWithTime() throws Exception { commit(5, allData, snapshotPositions); long expireMillis = System.currentTimeMillis(); // expire twice to check for idempotence - expire.olderThanMills(expireMillis - 1000).expire(); - expire.olderThanMills(expireMillis - 1000).expire(); + + expire.expire(); + expire.expire(); int latestSnapshotId = requireNonNull(snapshotManager.latestSnapshotId()).intValue(); for (int i = 1; i <= latestSnapshotId; i++) { @@ -399,9 +404,9 @@ public void testExpireWithUpgradedFile() throws Exception { public void testChangelogOutLivedSnapshot() throws Exception { List allData = new ArrayList<>(); List snapshotPositions = new ArrayList<>(); - commit(10, allData, snapshotPositions); - ExpireSnapshots snapshot = store.newExpire(1, 2, Long.MAX_VALUE, true, true); - ExpireSnapshots changelog = store.newChangelogExpire(1, 3, Long.MAX_VALUE, true); + commit(40, allData, snapshotPositions); + ExpireSnapshots snapshot = + store.newExpire(1, 10, Long.MAX_VALUE, 1, Integer.MAX_VALUE, Long.MAX_VALUE, true); // expire twice to check for idempotence snapshot.expire(); snapshot.expire(); @@ -412,21 +417,25 @@ public void testChangelogOutLivedSnapshot() throws Exception { int earliestLongLivedChangelogId = snapshotManager.earliestLongLivedChangelogId().intValue(); - // 2 snapshot in /snapshot - assertThat(latestSnapshotId - earliestSnapshotId).isEqualTo(1); + // 10 snapshot in /snapshot dir + assertThat(latestSnapshotId - earliestSnapshotId).isEqualTo(9); assertThat(earliestLongLivedChangelogId).isEqualTo(1); // The changelog id and snapshot id is continuous assertThat(earliestSnapshotId - latestLongLivedChangelogId).isEqualTo(1); + for (int i = earliestLongLivedChangelogId; i <= latestLongLivedChangelogId; i++) { + // assert changelog id is continuous + assertThat(snapshotManager.longLivedChangelogExists(i)).isTrue(); + } - changelog.expire(); - changelog.expire(); + snapshot = store.newExpire(1, 10, Long.MAX_VALUE, 1, 20, Integer.MAX_VALUE, true); + snapshot.expire(); assertThat(snapshotManager.latestSnapshotId().intValue()).isEqualTo(latestSnapshotId); assertThat(snapshotManager.earliestSnapshotId().intValue()).isEqualTo(earliestSnapshotId); assertThat(snapshotManager.latestLongLivedChangelogId()) .isEqualTo(snapshotManager.earliestSnapshotId() - 1); assertThat(snapshotManager.earliestLongLivedChangelogId()) - .isEqualTo(snapshotManager.earliestSnapshotId() - 1); + .isEqualTo(snapshotManager.earliestSnapshotId() - 10); store.assertCleaned(); } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java index c5bd22dc52bf..8265746932d8 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/OrphanFilesCleanTest.java @@ -59,6 +59,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.IOException; import java.util.ArrayList; @@ -328,14 +330,16 @@ private void validateSnapshot(Snapshot snapshot, List data) throws Exc assertThat(result).containsExactlyInAnyOrderElementsOf(TestPojo.formatData(data)); } - @Test - public void testCleanOrphanFilesWithChangelogDecoupled() throws Exception { + @ValueSource(strings = {"none", "input"}) + @ParameterizedTest(name = "changelog-producer = {0}") + public void testCleanOrphanFilesWithChangelogDecoupled(String changelogProducer) + throws Exception { // recreate the table with another option this.write.close(); this.commit.close(); int commitTimes = 30; Options options = new Options(); - options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT); + options.setString(CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer); options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 15); options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20); FileStoreTable table = createFileStoreTable(rowType, options); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java index b7117cf0a600..fe93b0f569de 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java @@ -59,11 +59,14 @@ private Pair createRow(int partition, int bucket, int key, @Test public void testIndexFileExpiration() throws Exception { prepareExpireTable(); - ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) table.newExpireSnapshots(); long indexFileSize = indexFileSize(); long indexManifestSize = indexManifestSize(); + ExpireSnapshotsImpl expire = + ((ExpireSnapshots.Expire) table.newExpireSnapshots(table.coreOptions())) + .getExpireSnapshots(); + expire.expireUntil(1, 2); checkIndexFiles(2); assertThat(indexFileSize()).isEqualTo(indexFileSize - 1); @@ -88,7 +91,9 @@ public void testIndexFileExpiration() throws Exception { @Test public void testIndexFileExpirationWithTag() throws Exception { prepareExpireTable(); - ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) table.newExpireSnapshots(); + ExpireSnapshotsImpl expire = + ((ExpireSnapshots.Expire) table.newExpireSnapshots(table.coreOptions())) + .getExpireSnapshots(); table.createTag("tag3", 3); table.createTag("tag5", 5); @@ -114,7 +119,9 @@ public void testIndexFileExpirationWithTag() throws Exception { @Test public void testIndexFileExpirationWhenDeletingTag() throws Exception { prepareExpireTable(); - ExpireSnapshotsImpl expire = (ExpireSnapshotsImpl) table.newExpireSnapshots(); + ExpireSnapshotsImpl expire = + ((ExpireSnapshots.Expire) table.newExpireSnapshots(table.coreOptions())) + .getExpireSnapshots(); table.createTag("tag3", 3); table.createTag("tag5", 5); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java index 43d338eb8fc3..23f46f52a731 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java @@ -65,6 +65,8 @@ import org.apache.paimon.utils.CompatibilityTestUtils; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.io.File; import java.nio.file.Files; @@ -1456,15 +1458,15 @@ public void testLookupWithDropDelete() throws Exception { "1|2|200|binary|varbinary|mapKey:mapVal|multiset")); } - @Test - public void testRollbackToTagWithChangelogDecoupled() throws Exception { + @ParameterizedTest(name = "changelog-producer = {0}") + @ValueSource(strings = {"none", "input"}) + public void testRollbackToTagWithChangelogDecoupled(String changelogProducer) throws Exception { int commitTimes = ThreadLocalRandom.current().nextInt(100) + 6; FileStoreTable table = createFileStoreTable( options -> - options.set( - CoreOptions.CHANGELOG_PRODUCER, - CoreOptions.ChangelogProducer.INPUT)); + options.setString( + CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer)); prepareRollbackTable(commitTimes, table); int t1 = 1; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java index 8223afa5f576..8e44be195aa9 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.table.BucketMode; import org.apache.paimon.utils.Pair; @@ -56,7 +57,8 @@ public static DataFileMeta newFileFromSequence( 0, 0, 0L, - null); + null, + FileSource.APPEND); } @Test diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java index 51c2c10e7260..46093f2cba0d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousFromTimestampStartingScannerTest.java @@ -29,6 +29,8 @@ import org.apache.paimon.utils.TraceableFileIO; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.UUID; @@ -65,8 +67,7 @@ public void testScan() throws Exception { long timestamp = snapshotManager.snapshot(3).timeMillis(); ContinuousFromTimestampStartingScanner scanner = - new ContinuousFromTimestampStartingScanner( - snapshotManager, timestamp, false, false); + new ContinuousFromTimestampStartingScanner(snapshotManager, timestamp, false); StartingScanner.NextSnapshot result = (StartingScanner.NextSnapshot) scanner.scan(snapshotReader); assertThat(result.nextSnapshotId()).isEqualTo(3); @@ -80,7 +81,7 @@ public void testNoSnapshot() { SnapshotManager snapshotManager = table.snapshotManager(); ContinuousFromTimestampStartingScanner scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, System.currentTimeMillis(), false, false); + snapshotManager, System.currentTimeMillis(), false); assertThat(scanner.scan(snapshotReader)).isInstanceOf(StartingScanner.NoSnapshot.class); } @@ -100,8 +101,7 @@ public void testNoSnapshotBeforeTimestamp() throws Exception { long timestamp = snapshotManager.snapshot(1).timeMillis(); ContinuousFromTimestampStartingScanner scanner = - new ContinuousFromTimestampStartingScanner( - snapshotManager, timestamp, false, false); + new ContinuousFromTimestampStartingScanner(snapshotManager, timestamp, false); StartingScanner.NextSnapshot result = (StartingScanner.NextSnapshot) scanner.scan(snapshotReader); // next snapshot @@ -111,14 +111,15 @@ public void testNoSnapshotBeforeTimestamp() throws Exception { commit.close(); } - @Test - public void testScanFromChangelog() throws Exception { + @ParameterizedTest(name = "changelog-producer = {0}") + @ValueSource(strings = {"none", "input"}) + public void testScanFromChangelog(String changelogProducer) throws Exception { Options options = new Options(); options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, 2); options.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, 1); options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MAX, 20); options.set(CoreOptions.CHANGELOG_NUM_RETAINED_MIN, 10); - options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT); + options.setString(CoreOptions.CHANGELOG_PRODUCER.key(), changelogProducer); FileStoreTable table = createFileStoreTable( true, @@ -158,20 +159,20 @@ public void testScanFromChangelog() throws Exception { ContinuousFromTimestampStartingScanner scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, snapshotManager.snapshot(3).timeMillis(), true, true); + snapshotManager, snapshotManager.snapshot(3).timeMillis(), true); StartingScanner.NextSnapshot result = (StartingScanner.NextSnapshot) scanner.scan(snapshotReader); assertThat(result.nextSnapshotId()).isEqualTo(3); scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, snapshotManager.snapshot(2).timeMillis(), true, true); + snapshotManager, snapshotManager.snapshot(2).timeMillis(), true); assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId()) .isEqualTo(2); scanner = new ContinuousFromTimestampStartingScanner( - snapshotManager, snapshotManager.changelog(1).timeMillis(), true, true); + snapshotManager, snapshotManager.changelog(1).timeMillis(), true); assertThat(((StartingScanner.NextSnapshot) scanner.scan(snapshotReader)).nextSnapshotId()) .isEqualTo(1); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java index 32a0fcc5b9a2..4823ed63b2d8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedure.java @@ -18,10 +18,15 @@ package org.apache.paimon.flink.procedure; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.schema.SchemaValidation; import org.apache.flink.table.procedure.ProcedureContext; +import java.util.HashMap; +import java.util.Map; + /** A procedure to expire snapshots. */ public class ExpireSnapshotsProcedure extends ProcedureBase { @@ -32,8 +37,14 @@ public String identifier() { public String[] call(ProcedureContext procedureContext, String tableId, int retainMax) throws Catalog.TableNotExistException { - return new String[] { - table(tableId).newExpireSnapshots().retainMax(retainMax).expire() + "" - }; + Map options = new HashMap<>(table(tableId).options()); + options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), String.valueOf(retainMax)); + if (!options.containsKey(CoreOptions.CHANGELOG_NUM_RETAINED_MAX.key())) { + options.put(CoreOptions.CHANGELOG_NUM_RETAINED_MAX.key(), String.valueOf(retainMax)); + } + CoreOptions newOption = new CoreOptions(options); + SchemaValidation.validateSnapshotAndChangelogRetainOption(newOption); + + return new String[] {String.valueOf(table(tableId).newExpireSnapshots(newOption).expire())}; } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java index 1fbed646ecf1..8fa66ea399ab 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -33,6 +33,8 @@ import org.apache.flink.util.CloseableIterator; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayList; import java.util.Arrays; @@ -597,16 +599,18 @@ public void testScanFromOldSchema() throws Exception { assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of(3, "c")); } - @Test - public void testScanFromChangelog() throws Exception { + @ParameterizedTest(name = "changelog-producer = {0}") + @ValueSource(strings = {"none", "input"}) + public void testScanFromChangelog(String changelogProducer) throws Exception { batchSql( "CREATE TABLE IF NOT EXISTS T3 (a STRING, b STRING, c STRING, PRIMARY KEY (a) NOT ENFORCED)\n" - + " WITH ('changelog-producer'='input', 'bucket' = '1', \n" + + " WITH ('changelog-producer'='%s', 'bucket' = '1', \n" + " 'snapshot.num-retained.max' = '2',\n" + " 'snapshot.num-retained.min' = '1',\n" + " 'changelog.num-retained.max' = '3',\n" + " 'changelog.num-retained.min' = '1'\n" - + ")"); + + ")", + changelogProducer); batchSql("INSERT INTO T3 VALUES ('1', '2', '3')"); batchSql("INSERT INTO T3 VALUES ('4', '5', '6')"); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java new file mode 100644 index 000000000000..8a6503ff8557 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/ExpireSnapshotsProcedureITCase.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.flink.CatalogITCaseBase; +import org.apache.paimon.table.FileStoreTable; + +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** IT Case for {@link ExpireSnapshotsProcedure}. */ +public class ExpireSnapshotsProcedureITCase extends CatalogITCaseBase { + + @Test + public void testExpire() throws Exception { + sql( + "CREATE TABLE T (" + + " k INT," + + " v INT," + + " PRIMARY KEY (k) NOT ENFORCED" + + ") WITH (" + + " 'bucket' = '1'" + + ")"); + FileStoreTable table = paimonTable("T"); + + for (int i = 0; i < 15; i++) { + sql("INSERT INTO T VALUES (1, 1)"); + } + + sql("CALL sys.expire_snapshots('default.T', 10)"); + Assertions.assertThat( + table.snapshotManager() + .fileIO() + .exists(table.snapshotManager().changelogDirectory())) + .isFalse(); + + sql( + "CREATE TABLE T_1 (" + + " k INT," + + " v INT," + + " PRIMARY KEY (k) NOT ENFORCED" + + ") WITH (" + + " 'bucket' = '1'," + + " 'snapshot.num-retained.max' = '12'," + + " 'changelog.num-retained.max' = '12'" + + ")"); + table = paimonTable("T_1"); + + for (int i = 0; i < 15; i++) { + sql("INSERT INTO T_1 VALUES (1, 1)"); + } + Assertions.assertThat(table.snapshotManager().changelogs().hasNext()).isFalse(); + + sql("CALL sys.expire_snapshots('default.T_1', 10)"); + Assertions.assertThat(table.snapshotManager().changelogs().hasNext()).isTrue(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java index 42c125fbbf83..09d95f6cd4c5 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.append.AppendOnlyCompactionTask; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.table.sink.CompactionTaskSerializer; import org.junit.jupiter.api.Test; @@ -76,6 +77,7 @@ private DataFileMeta newFile() { 0, 0, 0L, - null); + null, + FileSource.APPEND); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java index ef364a1ed278..8aef032e7ad1 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.StatsTestUtils; import org.apache.paimon.table.source.DataFilePlan; import org.apache.paimon.table.source.DataSplit; @@ -113,8 +114,8 @@ private DataSplit dataSplit(int partition, int bucket, String... fileNames) { 0, // not used 0, // not used 0L, // not used - null // not used - )); + null, // not used + FileSource.APPEND)); } return DataSplit.builder() .withSnapshot(1) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java index 8d7d4c04e172..839a9f3152a0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.StatsTestUtils; import org.apache.paimon.table.source.DataSplit; @@ -86,7 +87,8 @@ public static DataFileMeta newFile(int level) { 0, level, 0L, - null); + null, + FileSource.APPEND); } public static FileStoreSourceSplit newSourceSplit( diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java index 98fc6548a723..4ee4f12b80cf 100644 --- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java +++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedure.java @@ -18,6 +18,8 @@ package org.apache.paimon.spark.procedure; +import org.apache.paimon.CoreOptions; +import org.apache.paimon.schema.SchemaValidation; import org.apache.paimon.table.ExpireSnapshots; import org.apache.spark.sql.catalyst.InternalRow; @@ -27,6 +29,9 @@ import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; +import java.util.HashMap; +import java.util.Map; + import static org.apache.spark.sql.types.DataTypes.IntegerType; import static org.apache.spark.sql.types.DataTypes.StringType; import static org.apache.spark.sql.types.DataTypes.TimestampType; @@ -74,19 +79,46 @@ public InternalRow[] call(InternalRow args) { return modifyPaimonTable( tableIdent, table -> { - ExpireSnapshots expireSnapshots = table.newExpireSnapshots(); + Map options = new HashMap<>(table.options()); if (retainMax != null) { - expireSnapshots.retainMax(retainMax); + options.put( + CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), + String.valueOf(retainMax)); + if (!options.containsKey(CoreOptions.CHANGELOG_NUM_RETAINED_MAX.key())) { + options.put( + CoreOptions.CHANGELOG_NUM_RETAINED_MAX.key(), + String.valueOf(retainMax)); + } } if (retainMin != null) { - expireSnapshots.retainMin(retainMin); + options.put( + CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), + String.valueOf(retainMin)); + if (!options.containsKey(CoreOptions.CHANGELOG_NUM_RETAINED_MIN.key())) { + options.put( + CoreOptions.CHANGELOG_NUM_RETAINED_MIN.key(), + String.valueOf(retainMin)); + } } if (olderThanMills != null) { - expireSnapshots.olderThanMills(olderThanMills); + long snapshotRetainMs = System.currentTimeMillis() - olderThanMills; + options.put( + CoreOptions.SNAPSHOT_TIME_RETAINED.key(), + String.format("%dms", snapshotRetainMs)); + if (!options.containsKey(CoreOptions.CHANGELOG_TIME_RETAINED.key())) { + options.put( + CoreOptions.CHANGELOG_TIME_RETAINED.key(), + String.format("%dms", snapshotRetainMs)); + } } if (maxDeletes != null) { - expireSnapshots.maxDeletes(maxDeletes); + options.put( + CoreOptions.SNAPSHOT_EXPIRE_LIMIT.key(), + String.valueOf(maxDeletes)); } + CoreOptions newOption = new CoreOptions(options); + SchemaValidation.validateSnapshotAndChangelogRetainOption(newOption); + ExpireSnapshots expireSnapshots = table.newExpireSnapshots(newOption); int deleted = expireSnapshots.expire(); return new InternalRow[] {newInternalRow(deleted)}; }); diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala index c6bda9e7cb0e..cc1591b03462 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala @@ -21,6 +21,7 @@ package org.apache.paimon.spark import org.apache.paimon.CoreOptions import org.apache.paimon.data.BinaryRow import org.apache.paimon.io.DataFileMeta +import org.apache.paimon.manifest.FileSource import org.apache.paimon.table.source.{DataSplit, RawFile, Split} import org.junit.jupiter.api.Assertions @@ -42,7 +43,7 @@ class ScanHelperTest extends PaimonSparkTestBase { 0.until(fileNum).foreach { i => val path = s"f$i.parquet" - files += DataFileMeta.forAppend(path, 750000, 30000, null, 0, 29999, 1) + files += DataFileMeta.forAppend(path, 750000, 30000, null, 0, 29999, 1, FileSource.APPEND) rawFiles += new RawFile(s"/a/b/$path", 0, 75000, "parquet", 0, 30000) } diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala index 6fcb48d68400..15b5d9dc9add 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/procedure/ExpireSnapshotsProcedureTest.scala @@ -71,7 +71,8 @@ class ExpireSnapshotsProcedureTest extends PaimonSparkTestBase with StreamTest { // expire checkAnswer( - spark.sql("CALL paimon.sys.expire_snapshots(table => 'test.T', retain_max => 2)"), + spark.sql( + "CALL paimon.sys.expire_snapshots(table => 'test.T', retain_max => 2, retain_min => 1)"), Row(1) :: Nil) checkAnswer( From b6af9fca0dea764bcfcae235f91d98866f91292f Mon Sep 17 00:00:00 2001 From: Aitozi Date: Thu, 18 Apr 2024 14:40:45 +0800 Subject: [PATCH 2/2] fix --- .../org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala index dbe069be3f68..7285e5ded3e0 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/sql/AnalyzeTableTestBase.scala @@ -288,7 +288,7 @@ abstract class AnalyzeTableTestBase extends PaimonSparkTestBase { Assertions.assertEquals(2, statsFileCount(tableLocation, fileIO)) // test expire statistic - spark.sql("CALL sys.expire_snapshots(table => 'test.T', retain_max => 1)") + spark.sql("CALL sys.expire_snapshots(table => 'test.T', retain_max => 1, retain_min => 1)") Assertions.assertEquals(1, statsFileCount(tableLocation, fileIO)) val orphanStats = new Path(tableLocation, "statistics/stats-orphan-0")