From 3b84da09137319686ae7e2bdde253d9a9c30b2ad Mon Sep 17 00:00:00 2001 From: Aitozi Date: Tue, 21 May 2024 18:06:00 +0800 Subject: [PATCH 1/3] [core] Introduce FileSource to DataFileMeta --- .../paimon/append/AppendOnlyWriter.java | 3 +- .../org/apache/paimon/io/DataFileMeta.java | 49 +++++++++++++----- .../paimon/io/DataFileMetaSerializer.java | 7 ++- .../paimon/io/KeyValueDataFileWriter.java | 9 +++- .../paimon/io/KeyValueFileWriterFactory.java | 14 +++-- .../apache/paimon/io/RowDataFileWriter.java | 9 +++- .../paimon/io/RowDataRollingFileWriter.java | 6 ++- .../apache/paimon/manifest/FileSource.java | 51 +++++++++++++++++++ .../paimon/mergetree/MergeTreeWriter.java | 2 +- .../compact/ChangelogMergeTreeRewriter.java | 3 +- .../compact/MergeTreeCompactRewriter.java | 2 +- .../apache/paimon/migrate/FileMetaUtils.java | 4 +- .../operation/AppendOnlyFileStoreWrite.java | 3 +- ...endOnlyTableCompactionCoordinatorTest.java | 4 +- .../paimon/append/AppendOnlyWriterTest.java | 4 +- .../crosspartition/IndexBootstrapTest.java | 4 +- .../paimon/io/DataFileTestDataGenerator.java | 4 +- .../apache/paimon/io/DataFileTestUtils.java | 10 ++-- .../paimon/io/KeyValueFileReadWriteTest.java | 11 ++-- .../paimon/io/RollingFileWriterTest.java | 3 +- .../ManifestCommittableSerializerTest.java | 3 +- .../manifest/ManifestFileMetaTestBase.java | 7 +-- .../paimon/mergetree/ContainsLevelsTest.java | 2 +- .../apache/paimon/mergetree/LevelsTest.java | 4 +- .../paimon/mergetree/LookupLevelsTest.java | 2 +- .../paimon/mergetree/MergeTreeTestBase.java | 2 +- .../compact/IntervalPartitionTest.java | 4 +- .../compact/UniversalCompactionTest.java | 4 +- .../paimon/operation/ExpireSnapshotsTest.java | 4 +- .../table/source/SplitGeneratorTest.java | 4 +- .../apache/paimon/spark/ScanHelperTest.scala | 7 ++- 31 files changed, 184 insertions(+), 61 deletions(-) create mode 100644 paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java diff --git 
a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index e26d302dac84..75f9f32db3d2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -256,7 +256,8 @@ private RowDataRollingFileWriter createRollingRowWriter() { seqNumCounter, fileCompression, statsCollectors, - fileIndexOptions); + fileIndexOptions, + false); } private void trySyncLatestCompaction(boolean blocking) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java index d62bb2d158bf..bfdfaf2493a2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMeta.java @@ -22,6 +22,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.stats.SimpleStatsConverter; import org.apache.paimon.types.ArrayType; @@ -30,6 +31,7 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; +import org.apache.paimon.types.TinyIntType; import javax.annotation.Nullable; @@ -83,6 +85,8 @@ public class DataFileMeta { // file index filter bytes, if it is small, store in data file meta private final @Nullable byte[] embeddedIndex; + private final @Nullable FileSource fileSource; + public static DataFileMeta forAppend( String fileName, long fileSize, @@ -90,7 +94,8 @@ public static DataFileMeta forAppend( SimpleStats rowStats, long minSequenceNumber, long maxSequenceNumber, - long schemaId) { + long schemaId, + @Nullable FileSource fileSource) { return forAppend( fileName, fileSize, @@ -100,7 +105,8 @@ public 
static DataFileMeta forAppend( maxSequenceNumber, schemaId, Collections.emptyList(), - null); + null, + fileSource); } public static DataFileMeta forAppend( @@ -112,7 +118,8 @@ public static DataFileMeta forAppend( long maxSequenceNumber, long schemaId, List extraFiles, - @Nullable byte[] embeddedIndex) { + @Nullable byte[] embeddedIndex, + @Nullable FileSource fileSource) { return new DataFileMeta( fileName, fileSize, @@ -128,7 +135,8 @@ public static DataFileMeta forAppend( extraFiles, Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(), 0L, - embeddedIndex); + embeddedIndex, + fileSource); } public DataFileMeta( @@ -144,7 +152,8 @@ public DataFileMeta( long schemaId, int level, @Nullable Long deleteRowCount, - @Nullable byte[] embeddedIndex) { + @Nullable byte[] embeddedIndex, + @Nullable FileSource fileSource) { this( fileName, fileSize, @@ -160,7 +169,8 @@ public DataFileMeta( Collections.emptyList(), Timestamp.fromLocalDateTime(LocalDateTime.now()).toMillisTimestamp(), deleteRowCount, - embeddedIndex); + embeddedIndex, + fileSource); } public DataFileMeta( @@ -178,7 +188,8 @@ public DataFileMeta( List extraFiles, Timestamp creationTime, @Nullable Long deleteRowCount, - @Nullable byte[] embeddedIndex) { + @Nullable byte[] embeddedIndex, + @Nullable FileSource fileSource) { this.fileName = fileName; this.fileSize = fileSize; @@ -198,6 +209,7 @@ public DataFileMeta( this.creationTime = creationTime; this.deleteRowCount = deleteRowCount; + this.fileSource = fileSource; } public String fileName() { @@ -291,6 +303,10 @@ public String fileFormat() { return split[split.length - 1]; } + public Optional fileSource() { + return Optional.ofNullable(fileSource); + } + public DataFileMeta upgrade(int newLevel) { checkArgument(newLevel > this.level); return new DataFileMeta( @@ -308,7 +324,8 @@ public DataFileMeta upgrade(int newLevel) { extraFiles, creationTime, deleteRowCount, - embeddedIndex); + embeddedIndex, + fileSource); } public List 
collectFiles(DataFilePathFactory pathFactory) { @@ -334,7 +351,8 @@ public DataFileMeta copy(List newExtraFiles) { newExtraFiles, creationTime, deleteRowCount, - embeddedIndex); + embeddedIndex, + fileSource); } @Override @@ -360,7 +378,8 @@ public boolean equals(Object o) { && level == that.level && Objects.equals(extraFiles, that.extraFiles) && Objects.equals(creationTime, that.creationTime) - && Objects.equals(deleteRowCount, that.deleteRowCount); + && Objects.equals(deleteRowCount, that.deleteRowCount) + && Objects.equals(fileSource, that.fileSource); } @Override @@ -380,7 +399,8 @@ public int hashCode() { level, extraFiles, creationTime, - deleteRowCount); + deleteRowCount, + fileSource); } @Override @@ -389,7 +409,8 @@ public String toString() { "{fileName: %s, fileSize: %d, rowCount: %d, embeddedIndex: %s, " + "minKey: %s, maxKey: %s, keyStats: %s, valueStats: %s, " + "minSequenceNumber: %d, maxSequenceNumber: %d, " - + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, deleteRowCount: %d}", + + "schemaId: %d, level: %d, extraFiles: %s, creationTime: %s, " + + "deleteRowCount: %d, fileSource: %s}", fileName, fileSize, rowCount, @@ -404,7 +425,8 @@ public String toString() { level, extraFiles, creationTime, - deleteRowCount); + deleteRowCount, + fileSource); } public static RowType schema() { @@ -424,6 +446,7 @@ public static RowType schema() { fields.add(new DataField(12, "_CREATION_TIME", DataTypes.TIMESTAMP_MILLIS())); fields.add(new DataField(13, "_DELETE_ROW_COUNT", new BigIntType(true))); fields.add(new DataField(14, "_EMBEDDED_FILE_INDEX", newBytesType(true))); + fields.add(new DataField(15, "_FILE_SOURCE", new TinyIntType(true))); return new RowType(fields); } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java index 68707a6c4909..78891d064979 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java 
+++ b/paimon-core/src/main/java/org/apache/paimon/io/DataFileMetaSerializer.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.BinaryString; import org.apache.paimon.data.GenericRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.utils.ObjectSerializer; @@ -55,7 +56,8 @@ public InternalRow toRow(DataFileMeta meta) { toStringArrayData(meta.extraFiles()), meta.creationTime(), meta.deleteRowCount().orElse(null), - meta.embeddedIndex()); + meta.embeddedIndex(), + meta.fileSource().map(FileSource::toByteValue).orElse(null)); } @Override @@ -75,6 +77,7 @@ public DataFileMeta fromRow(InternalRow row) { fromStringArrayData(row.getArray(11)), row.getTimestamp(12, 3), row.isNullAt(13) ? null : row.getLong(13), - row.isNullAt(14) ? null : row.getBinary(14)); + row.isNullAt(14) ? null : row.getBinary(14), + row.isNullAt(15) ? null : FileSource.fromByteValue(row.getByte(15))); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index bfea680fad33..c3961ca87b11 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -28,6 +28,7 @@ import org.apache.paimon.format.SimpleStatsExtractor; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.stats.SimpleStatsConverter; import org.apache.paimon.types.RowType; @@ -68,6 +69,7 @@ public class KeyValueDataFileWriter private long minSeqNumber = Long.MAX_VALUE; private long maxSeqNumber = Long.MIN_VALUE; private long deleteRecordCount = 0; + private final boolean isCompact; public KeyValueDataFileWriter( FileIO fileIO, @@ -80,7 +82,8 @@ public KeyValueDataFileWriter( 
long schemaId, int level, String compression, - CoreOptions options) { + CoreOptions options, + boolean isCompact) { super( fileIO, factory, @@ -100,6 +103,7 @@ public KeyValueDataFileWriter( this.keyStatsConverter = new SimpleStatsConverter(keyType); this.valueStatsConverter = new SimpleStatsConverter(valueType); this.keySerializer = new InternalRowSerializer(keyType); + this.isCompact = isCompact; } @Override @@ -170,6 +174,7 @@ public DataFileMeta result() throws IOException { level, deleteRecordCount, // TODO: enable file filter for primary key table (e.g. deletion table). - null); + null, + isCompact ? FileSource.COMPACT : FileSource.APPEND); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 0d39161483fc..8b9d4c28edec 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -81,9 +81,12 @@ public DataFilePathFactory pathFactory(int level) { return formatContext.pathFactory(level); } - public RollingFileWriter createRollingMergeTreeFileWriter(int level) { + public RollingFileWriter createRollingMergeTreeFileWriter( + int level, boolean isCompact) { return new RollingFileWriter<>( - () -> createDataFileWriter(formatContext.pathFactory(level).newPath(), level), + () -> + createDataFileWriter( + formatContext.pathFactory(level).newPath(), level, isCompact), suggestedFileSize); } @@ -91,11 +94,11 @@ public RollingFileWriter createRollingChangelogFileWrite return new RollingFileWriter<>( () -> createDataFileWriter( - formatContext.pathFactory(level).newChangelogPath(), level), + formatContext.pathFactory(level).newChangelogPath(), level, false), suggestedFileSize); } - private KeyValueDataFileWriter createDataFileWriter(Path path, int level) { + private KeyValueDataFileWriter createDataFileWriter(Path path, int level, 
boolean isCompact) { KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType); return new KeyValueDataFileWriter( fileIO, @@ -108,7 +111,8 @@ private KeyValueDataFileWriter createDataFileWriter(Path path, int level) { schemaId, level, formatContext.compression(level), - options); + options, + isCompact); } public void deleteFile(String filename, int level) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index e0521ca79887..744e37731297 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -24,6 +24,7 @@ import org.apache.paimon.format.SimpleStatsExtractor; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.stats.SimpleStatsConverter; @@ -48,6 +49,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter new RowDataFileWriter( @@ -57,7 +58,8 @@ public RowDataRollingFileWriter( seqNumCounter, fileCompression, statsCollectors, - fileIndexOptions), + fileIndexOptions, + isCompact), targetFileSize); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java new file mode 100644 index 000000000000..c62aea2d64b5 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileSource.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.manifest; + +/** The Source of a file. */ +public enum FileSource { + + /** The file from new input. */ + APPEND((byte) 0), + + /** The file from compaction. */ + COMPACT((byte) 1); + + private final byte value; + + FileSource(byte value) { + this.value = value; + } + + public byte toByteValue() { + return value; + } + + public static FileSource fromByteValue(byte value) { + switch (value) { + case 0: + return APPEND; + case 1: + return COMPACT; + default: + throw new UnsupportedOperationException( + "Unsupported byte value '" + value + "' for file source."); + } + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index 6becbf73149c..07e6bc9a3932 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -212,7 +212,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul ? 
writerFactory.createRollingChangelogFileWriter(0) : null; final RollingFileWriter dataWriter = - writerFactory.createRollingMergeTreeFileWriter(0); + writerFactory.createRollingMergeTreeFileWriter(0, false); try { writeBuffer.forEach( diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java index a1aabc4e7d16..17fc7b8b7afc 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java @@ -128,7 +128,8 @@ private CompactResult rewriteOrProduceChangelog( readerForMergeTree(sections, createMergeWrapper(outputLevel)) .toCloseableIterator(); if (rewriteCompactFile) { - compactFileWriter = writerFactory.createRollingMergeTreeFileWriter(outputLevel); + compactFileWriter = + writerFactory.createRollingMergeTreeFileWriter(outputLevel, true); } if (produceChangelog) { changelogFileWriter = writerFactory.createRollingChangelogFileWriter(outputLevel); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java index d399d599da76..bbfd0fa58174 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java @@ -73,7 +73,7 @@ public CompactResult rewrite( protected CompactResult rewriteCompaction( int outputLevel, boolean dropDelete, List> sections) throws Exception { RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(outputLevel); + writerFactory.createRollingMergeTreeFileWriter(outputLevel, true); RecordReader reader = readerForMergeTree(sections, new 
ReducerMergeFunctionWrapper(mfFactory.create())); if (dropDelete) { diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java index 1ffc0cb5d112..7c84c87d792e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java @@ -30,6 +30,7 @@ import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.stats.SimpleStats; import org.apache.paimon.stats.SimpleStatsConverter; @@ -164,7 +165,8 @@ private static DataFileMeta constructFileMeta( stats, 0, 0, - ((FileStoreTable) table).schema().id()); + ((FileStoreTable) table).schema().id(), + FileSource.APPEND); } public static BinaryRow writePartitionValue( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index 560648d41b90..bfc90cb32aa6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -177,7 +177,8 @@ public AppendOnlyCompactManager.CompactRewriter compactRewriter( new LongCounter(toCompact.get(0).minSequenceNumber()), fileCompression, statsCollectors, - fileIndexOptions); + fileIndexOptions, + true); try { rewriter.write(bucketReader(partition, bucket).read(toCompact)); } finally { diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java index ec4212f16a50..bd0c7da296bf 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyTableCompactionCoordinatorTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.schema.Schema; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; @@ -187,6 +188,7 @@ private DataFileMeta newFile(long fileSize) { 0, 0, 0L, - null); + null, + FileSource.APPEND); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java index 6d0c5975490a..b5cc55f9c65a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/append/AppendOnlyWriterTest.java @@ -34,6 +34,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.memory.MemoryPoolFactory; import org.apache.paimon.options.MemorySize; @@ -646,6 +647,7 @@ private DataFileMeta generateCompactAfter(List toCompact) throws I }), minSeq, maxSeq, - toCompact.get(0).schemaId()); + toCompact.get(0).schemaId(), + FileSource.APPEND); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java index 19541f607f00..bbb1abfd3559 100644 --- a/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/crosspartition/IndexBootstrapTest.java @@ -24,6 +24,7 @@ import 
org.apache.paimon.data.InternalRow; import org.apache.paimon.data.Timestamp; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.options.Options; import org.apache.paimon.schema.Schema; import org.apache.paimon.table.Table; @@ -141,7 +142,8 @@ private static DataFileMeta newFile(long timeMillis) { .atZone(ZoneId.systemDefault()) .toLocalDateTime()), 0L, - null); + null, + FileSource.APPEND); } private Pair row(int pt, int col, int pk, int bucket) { diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java index e891406c1f90..810cef860784 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestDataGenerator.java @@ -22,6 +22,7 @@ import org.apache.paimon.TestKeyValueGenerator; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.format.SimpleStatsCollector; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.statistics.FullSimpleColStatsCollector; import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.stats.SimpleStatsConverter; @@ -162,7 +163,8 @@ private Data createDataFile(List kvs, int level, BinaryRow partition, 0, level, kvs.stream().filter(kv -> kv.valueKind().isRetract()).count(), - null), + null, + FileSource.APPEND), kvs); } diff --git a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java index 96ba262081ab..f31b21f1e437 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/DataFileTestUtils.java @@ -21,6 +21,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.BinaryRowWriter; import org.apache.paimon.data.Timestamp; +import 
org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.StatsTestUtils; import java.util.Collections; @@ -54,7 +55,8 @@ public static DataFileMeta newFile(long minSeq, long maxSeq) { Collections.emptyList(), Timestamp.fromEpochMillis(100), maxSeq - minSeq + 1, - null); + null, + FileSource.APPEND); } public static DataFileMeta newFile() { @@ -71,7 +73,8 @@ public static DataFileMeta newFile() { 0, 0, 0L, - null); + null, + FileSource.APPEND); } public static DataFileMeta newFile( @@ -94,7 +97,8 @@ public static DataFileMeta newFile( 0, level, deleteRowCount, - null); + null, + FileSource.APPEND); } public static BinaryRow row(int i) { diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index e56e0deb9816..bc0f4a3aeb18 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -101,7 +101,7 @@ private void testWriteAndReadDataFileImpl(String format) throws Exception { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(0); + writerFactory.createRollingMergeTreeFileWriter(0, false); writer.write(CloseableIterator.fromList(data.content, kv -> {})); writer.close(); List actualMetas = writer.result(); @@ -130,7 +130,8 @@ public void testCleanUpForException() throws IOException { FailingFileIO.getFailingPath(failingName, tempDir.toString()), "avro"); try { - FileWriter writer = writerFactory.createRollingMergeTreeFileWriter(0); + FileWriter writer = + writerFactory.createRollingMergeTreeFileWriter(0, false); writer.write(CloseableIterator.fromList(data.content, kv -> {})); } catch (Throwable e) { if (e.getCause() != null) { @@ -154,7 +155,7 @@ public void testKeyProjection() throws Exception { DataFileMetaSerializer serializer = new 
DataFileMetaSerializer(); RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(0); + writerFactory.createRollingMergeTreeFileWriter(0, false); writer.write(CloseableIterator.fromList(data.content, kv -> {})); writer.close(); List actualMetas = writer.result(); @@ -192,7 +193,7 @@ public void testValueProjection() throws Exception { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(0); + writerFactory.createRollingMergeTreeFileWriter(0, false); writer.write(CloseableIterator.fromList(data.content, kv -> {})); writer.close(); List actualMetas = writer.result(); @@ -397,7 +398,7 @@ public void testReaderUseFileSizeFromMetadata(String format) throws Exception { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(0); + writerFactory.createRollingMergeTreeFileWriter(0, false); writer.write(CloseableIterator.fromList(data.content, kv -> {})); writer.close(); List actualMetas = writer.result(); diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java index 04712ffd87cc..9fb7e5f9c2c0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java @@ -88,7 +88,8 @@ public void initialize(String identifier) { StatsCollectorFactories.createStatsFactories( new CoreOptions(new HashMap<>()), SCHEMA.getFieldNames()), - new FileIndexOptions()), + new FileIndexOptions(), + false), TARGET_FILE_SIZE); } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java index d9c24eddb6da..099c38003391 100644 --- 
a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestCommittableSerializerTest.java @@ -116,6 +116,7 @@ public static DataFileMeta newFile(int name, int level) { 0, level, 0L, - null); + null, + FileSource.APPEND); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java index a86d6f9ed215..e74af8b30e33 100644 --- a/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/manifest/ManifestFileMetaTestBase.java @@ -83,8 +83,8 @@ protected ManifestEntry makeEntry(boolean isAdd, String fileName, Integer partit Collections.emptyList(), Timestamp.fromEpochMillis(200000), 0L, // not used - null // not used - )); + null, // not used + FileSource.APPEND)); } protected ManifestFileMeta makeManifest(ManifestEntry... entries) { @@ -245,6 +245,7 @@ public static ManifestEntry makeEntry( 0, // not used 0, // not used 0L, - null)); + null, + FileSource.APPEND)); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index 9ec0e38e8d10..91bb4cd2beea 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -206,7 +206,7 @@ private KeyValue kv(int key, int value) { private DataFileMeta newFile(int level, KeyValue... 
records) throws IOException { RollingFileWriter writer = - createWriterFactory().createRollingMergeTreeFileWriter(level); + createWriterFactory().createRollingMergeTreeFileWriter(level, false); for (KeyValue kv : records) { writer.write(kv); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java index 4763585b00a4..a86aa445b2a6 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LevelsTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.junit.jupiter.api.Test; @@ -81,6 +82,7 @@ public static DataFileMeta newFile(int level) { 0, level, 0L, - null); + null, + FileSource.APPEND); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index d38b516fcf4f..d915e247a2ae 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -286,7 +286,7 @@ private KeyValue kv(int key, int value, long seqNumber) { private DataFileMeta newFile(int level, KeyValue... 
records) throws IOException { RollingFileWriter writer = - createWriterFactory().createRollingMergeTreeFileWriter(level); + createWriterFactory().createRollingMergeTreeFileWriter(level, false); for (KeyValue kv : records) { writer.write(kv); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index 3f9bd3e6ea85..f0aba748bf30 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -608,7 +608,7 @@ public CompactResult rewrite( int outputLevel, boolean dropDelete, List> sections) throws Exception { RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(outputLevel); + writerFactory.createRollingMergeTreeFileWriter(outputLevel, true); RecordReader reader = MergeTreeReaders.readerForMergeTree( sections, diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java index 684eb7fc3916..4d4117c7d11f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/IntervalPartitionTest.java @@ -23,6 +23,7 @@ import org.apache.paimon.data.BinaryRowWriter; import org.apache.paimon.data.Timestamp; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.mergetree.SortedRun; import org.apache.paimon.stats.StatsTestUtils; @@ -182,7 +183,8 @@ private DataFileMeta makeInterval(int left, int right) { Collections.emptyList(), Timestamp.fromEpochMillis(100000), 0L, - null); + null, + FileSource.APPEND); } private List> toMultiset(List> sections) { diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java index 0d891f2c78d3..25d263a93102 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/UniversalCompactionTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.compact.CompactUnit; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.mergetree.LevelSortedRun; import org.apache.paimon.mergetree.SortedRun; @@ -357,6 +358,7 @@ private LevelSortedRun level(int level, long size) { } static DataFileMeta file(long size) { - return new DataFileMeta("", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null); + return new DataFileMeta( + "", size, 1, null, null, null, null, 0, 0, 0, 0, 0L, null, FileSource.APPEND); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java index 626c13c018fc..8d56a516904f 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/ExpireSnapshotsTest.java @@ -30,6 +30,7 @@ import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.manifest.FileKind; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.manifest.ManifestEntry; import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction; import org.apache.paimon.schema.Schema; @@ -205,7 +206,8 @@ public void testExpireExtraFiles() throws IOException { extraFiles, Timestamp.now(), 0L, - null); + null, + FileSource.APPEND); ManifestEntry add = new ManifestEntry(FileKind.ADD, partition, 0, 1, dataFile); ManifestEntry delete = new 
ManifestEntry(FileKind.DELETE, partition, 0, 1, dataFile); diff --git a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java index a3685d5f6245..3594a8a784f7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/source/SplitGeneratorTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.table.BucketMode; import org.apache.paimon.utils.Pair; @@ -56,7 +57,8 @@ public static DataFileMeta newFileFromSequence( 0, 0, 0L, - null); + null, + FileSource.APPEND); } @Test diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala index 7b150c1fc8a6..6c7042d23f1b 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala +++ b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala @@ -21,12 +21,11 @@ package org.apache.paimon.spark import org.apache.paimon.CoreOptions import org.apache.paimon.data.BinaryRow import org.apache.paimon.io.DataFileMeta -import org.apache.paimon.table.source.{DataSplit, RawFile, Split} - +import org.apache.paimon.manifest.FileSource +import org.apache.paimon.table.source.{DataSplit, Split} import org.junit.jupiter.api.Assertions import java.util.HashMap - import scala.collection.JavaConverters._ import scala.collection.mutable @@ -41,7 +40,7 @@ class ScanHelperTest extends PaimonSparkTestBase { 0.until(fileNum).foreach { i => val path = s"f$i.parquet" - files += DataFileMeta.forAppend(path, 750000, 30000, null, 0, 29999, 1) + files += DataFileMeta.forAppend(path, 750000, 30000, null, 0, 29999, 1, 
FileSource.APPEND) } val dataSplits = mutable.ArrayBuffer.empty[Split] From 7027ed8083a9a4e7c6d2053e5f2e211daa24805a Mon Sep 17 00:00:00 2001 From: Aitozi Date: Tue, 21 May 2024 18:15:09 +0800 Subject: [PATCH 2/3] fix compile --- .../flink/sink/CompactionTaskSimpleSerializerTest.java | 4 +++- .../flink/source/FileStoreSourceSplitGeneratorTest.java | 5 +++-- .../flink/source/FileStoreSourceSplitSerializerTest.java | 4 +++- .../test/scala/org/apache/paimon/spark/ScanHelperTest.scala | 2 ++ 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java index b46015fbc27b..56813a7da55f 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactionTaskSimpleSerializerTest.java @@ -21,6 +21,7 @@ import org.apache.paimon.append.AppendOnlyCompactionTask; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.table.sink.CompactionTaskSerializer; import org.junit.jupiter.api.Test; @@ -76,6 +77,7 @@ private DataFileMeta newFile() { 0, 0, 0L, - null); + null, + FileSource.APPEND); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java index 820086f92d9d..ad30f6388c90 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java +++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitGeneratorTest.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.source; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.StatsTestUtils; import org.apache.paimon.table.source.DataFilePlan; import org.apache.paimon.table.source.DataSplit; @@ -113,8 +114,8 @@ private DataSplit dataSplit(int partition, int bucket, String... fileNames) { 0, // not used 0, // not used 0L, // not used - null // not used - )); + null, // not used + FileSource.APPEND)); } return DataSplit.builder() .withSnapshot(1) diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java index 795373e96a73..f2c0732a2208 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceSplitSerializerTest.java @@ -20,6 +20,7 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.stats.StatsTestUtils; import org.apache.paimon.table.source.DataSplit; @@ -86,7 +87,8 @@ public static DataFileMeta newFile(int level) { 0, level, 0L, - null); + null, + FileSource.APPEND); } public static FileStoreSourceSplit newSourceSplit( diff --git a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala index 6c7042d23f1b..7fae33953dd7 100644 --- a/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala +++ 
b/paimon-spark/paimon-spark-common/src/test/scala/org/apache/paimon/spark/ScanHelperTest.scala @@ -23,9 +23,11 @@ import org.apache.paimon.data.BinaryRow import org.apache.paimon.io.DataFileMeta import org.apache.paimon.manifest.FileSource import org.apache.paimon.table.source.{DataSplit, Split} + import org.junit.jupiter.api.Assertions import java.util.HashMap + import scala.collection.JavaConverters._ import scala.collection.mutable From 3fd6255afb36dff64efc98511baed62c9e72a420 Mon Sep 17 00:00:00 2001 From: Aitozi Date: Wed, 22 May 2024 16:59:30 +0800 Subject: [PATCH 3/3] fix comments --- .../org/apache/paimon/append/AppendOnlyWriter.java | 3 ++- .../apache/paimon/io/KeyValueDataFileWriter.java | 8 ++++---- .../paimon/io/KeyValueFileWriterFactory.java | 14 +++++++++----- .../org/apache/paimon/io/RowDataFileWriter.java | 8 ++++---- .../apache/paimon/io/RowDataRollingFileWriter.java | 5 +++-- .../apache/paimon/mergetree/MergeTreeWriter.java | 3 ++- .../compact/ChangelogMergeTreeRewriter.java | 4 +++- .../compact/MergeTreeCompactRewriter.java | 3 ++- .../paimon/operation/AppendOnlyFileStoreWrite.java | 3 ++- .../paimon/io/KeyValueFileReadWriteTest.java | 11 ++++++----- .../apache/paimon/io/RollingFileWriterTest.java | 3 ++- .../paimon/mergetree/ContainsLevelsTest.java | 3 ++- .../apache/paimon/mergetree/LookupLevelsTest.java | 3 ++- .../apache/paimon/mergetree/MergeTreeTestBase.java | 3 ++- 14 files changed, 45 insertions(+), 29 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java index 75f9f32db3d2..c5a47ff54190 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyWriter.java @@ -32,6 +32,7 @@ import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.DataIncrement; import org.apache.paimon.io.RowDataRollingFileWriter; +import 
org.apache.paimon.manifest.FileSource; import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.operation.AppendOnlyFileStoreWrite; @@ -257,7 +258,7 @@ private RowDataRollingFileWriter createRollingRowWriter() { fileCompression, statsCollectors, fileIndexOptions, - false); + FileSource.APPEND); } private void trySyncLatestCompaction(boolean blocking) diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java index c3961ca87b11..46db1ddd1b71 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueDataFileWriter.java @@ -63,13 +63,13 @@ public class KeyValueDataFileWriter private final SimpleStatsConverter keyStatsConverter; private final SimpleStatsConverter valueStatsConverter; private final InternalRowSerializer keySerializer; + private final FileSource fileSource; private BinaryRow minKey = null; private InternalRow maxKey = null; private long minSeqNumber = Long.MAX_VALUE; private long maxSeqNumber = Long.MIN_VALUE; private long deleteRecordCount = 0; - private final boolean isCompact; public KeyValueDataFileWriter( FileIO fileIO, @@ -83,7 +83,7 @@ public KeyValueDataFileWriter( int level, String compression, CoreOptions options, - boolean isCompact) { + FileSource fileSource) { super( fileIO, factory, @@ -103,7 +103,7 @@ public KeyValueDataFileWriter( this.keyStatsConverter = new SimpleStatsConverter(keyType); this.valueStatsConverter = new SimpleStatsConverter(valueType); this.keySerializer = new InternalRowSerializer(keyType); - this.isCompact = isCompact; + this.fileSource = fileSource; } @Override @@ -175,6 +175,6 @@ public DataFileMeta result() throws IOException { deleteRecordCount, // TODO: enable file filter for primary key table (e.g. deletion table). null, - isCompact ? 
FileSource.COMPACT : FileSource.APPEND); + fileSource); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java index 8b9d4c28edec..72f9b3f65153 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/KeyValueFileWriterFactory.java @@ -28,6 +28,7 @@ import org.apache.paimon.format.SimpleStatsExtractor; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.FileStorePathFactory; @@ -82,11 +83,11 @@ public DataFilePathFactory pathFactory(int level) { } public RollingFileWriter createRollingMergeTreeFileWriter( - int level, boolean isCompact) { + int level, FileSource fileSource) { return new RollingFileWriter<>( () -> createDataFileWriter( - formatContext.pathFactory(level).newPath(), level, isCompact), + formatContext.pathFactory(level).newPath(), level, fileSource), suggestedFileSize); } @@ -94,11 +95,14 @@ public RollingFileWriter createRollingChangelogFileWrite return new RollingFileWriter<>( () -> createDataFileWriter( - formatContext.pathFactory(level).newChangelogPath(), level, false), + formatContext.pathFactory(level).newChangelogPath(), + level, + FileSource.APPEND), suggestedFileSize); } - private KeyValueDataFileWriter createDataFileWriter(Path path, int level, boolean isCompact) { + private KeyValueDataFileWriter createDataFileWriter( + Path path, int level, FileSource fileSource) { KeyValueSerializer kvSerializer = new KeyValueSerializer(keyType, valueType); return new KeyValueDataFileWriter( fileIO, @@ -112,7 +116,7 @@ private KeyValueDataFileWriter createDataFileWriter(Path path, int level, boolea level, formatContext.compression(level), options, - 
isCompact); + fileSource); } public void deleteFile(String filename, int level) { diff --git a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java index 744e37731297..3da9098174cf 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/RowDataFileWriter.java @@ -49,7 +49,7 @@ public class RowDataFileWriter extends StatsCollectingSingleFileWriter new RowDataFileWriter( @@ -59,7 +60,7 @@ public RowDataRollingFileWriter( fileCompression, statsCollectors, fileIndexOptions, - isCompact), + fileSource), targetFileSize); } } diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java index 07e6bc9a3932..a42952914eb0 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/MergeTreeWriter.java @@ -30,6 +30,7 @@ import org.apache.paimon.io.DataIncrement; import org.apache.paimon.io.KeyValueFileWriterFactory; import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.memory.MemoryOwner; import org.apache.paimon.memory.MemorySegmentPool; import org.apache.paimon.mergetree.compact.MergeFunction; @@ -212,7 +213,7 @@ private void flushWriteBuffer(boolean waitForLatestCompaction, boolean forcedFul ? 
writerFactory.createRollingChangelogFileWriter(0) : null; final RollingFileWriter dataWriter = - writerFactory.createRollingMergeTreeFileWriter(0, false); + writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND); try { writeBuffer.forEach( diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java index 17fc7b8b7afc..a03d53329ea2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/ChangelogMergeTreeRewriter.java @@ -26,6 +26,7 @@ import org.apache.paimon.io.FileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.SortedRun; import org.apache.paimon.utils.CloseableIterator; @@ -129,7 +130,8 @@ private CompactResult rewriteOrProduceChangelog( .toCloseableIterator(); if (rewriteCompactFile) { compactFileWriter = - writerFactory.createRollingMergeTreeFileWriter(outputLevel, true); + writerFactory.createRollingMergeTreeFileWriter( + outputLevel, FileSource.COMPACT); } if (produceChangelog) { changelogFileWriter = writerFactory.createRollingChangelogFileWriter(outputLevel); diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java index bbfd0fa58174..89e82fe3f4ca 100644 --- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/MergeTreeCompactRewriter.java @@ -25,6 +25,7 @@ import org.apache.paimon.io.FileReaderFactory; import 
org.apache.paimon.io.KeyValueFileWriterFactory; import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.mergetree.DropDeleteReader; import org.apache.paimon.mergetree.MergeSorter; import org.apache.paimon.mergetree.MergeTreeReaders; @@ -73,7 +74,7 @@ public CompactResult rewrite( protected CompactResult rewriteCompaction( int outputLevel, boolean dropDelete, List> sections) throws Exception { RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(outputLevel, true); + writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT); RecordReader reader = readerForMergeTree(sections, new ReducerMergeFunctionWrapper(mfFactory.create())); if (dropDelete) { diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java index bfc90cb32aa6..64cbb5fd7579 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreWrite.java @@ -33,6 +33,7 @@ import org.apache.paimon.io.DataFileMeta; import org.apache.paimon.io.DataFilePathFactory; import org.apache.paimon.io.RowDataRollingFileWriter; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.options.MemorySize; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.statistics.SimpleColStatsCollector; @@ -178,7 +179,7 @@ public AppendOnlyCompactManager.CompactRewriter compactRewriter( fileCompression, statsCollectors, fileIndexOptions, - true); + FileSource.COMPACT); try { rewriter.write(bucketReader(partition, bucket).read(toCompact)); } finally { diff --git a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java index bc0f4a3aeb18..9f1405680347 100644 
--- a/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/KeyValueFileReadWriteTest.java @@ -33,6 +33,7 @@ import org.apache.paimon.fs.FileStatus; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.options.Options; import org.apache.paimon.reader.RecordReaderIterator; import org.apache.paimon.stats.StatsTestUtils; @@ -101,7 +102,7 @@ private void testWriteAndReadDataFileImpl(String format) throws Exception { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(0, false); + writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND); writer.write(CloseableIterator.fromList(data.content, kv -> {})); writer.close(); List actualMetas = writer.result(); @@ -131,7 +132,7 @@ public void testCleanUpForException() throws IOException { try { FileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(0, false); + writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND); writer.write(CloseableIterator.fromList(data.content, kv -> {})); } catch (Throwable e) { if (e.getCause() != null) { @@ -155,7 +156,7 @@ public void testKeyProjection() throws Exception { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(0, false); + writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND); writer.write(CloseableIterator.fromList(data.content, kv -> {})); writer.close(); List actualMetas = writer.result(); @@ -193,7 +194,7 @@ public void testValueProjection() throws Exception { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(0, false); + writerFactory.createRollingMergeTreeFileWriter(0, 
FileSource.APPEND); writer.write(CloseableIterator.fromList(data.content, kv -> {})); writer.close(); List actualMetas = writer.result(); @@ -398,7 +399,7 @@ public void testReaderUseFileSizeFromMetadata(String format) throws Exception { DataFileMetaSerializer serializer = new DataFileMetaSerializer(); RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(0, false); + writerFactory.createRollingMergeTreeFileWriter(0, FileSource.APPEND); writer.write(CloseableIterator.fromList(data.content, kv -> {})); writer.close(); List actualMetas = writer.result(); diff --git a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java index 9fb7e5f9c2c0..2ac2a8e4d319 100644 --- a/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/io/RollingFileWriterTest.java @@ -25,6 +25,7 @@ import org.apache.paimon.format.FileFormat; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.options.Options; import org.apache.paimon.statistics.SimpleColStatsCollector; import org.apache.paimon.types.DataType; @@ -89,7 +90,7 @@ public void initialize(String identifier) { new CoreOptions(new HashMap<>()), SCHEMA.getFieldNames()), new FileIndexOptions(), - false), + FileSource.APPEND), TARGET_FILE_SIZE); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java index 91bb4cd2beea..0bd60c6737d0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/ContainsLevelsTest.java @@ -33,6 +33,7 @@ import org.apache.paimon.io.RollingFileWriter; import org.apache.paimon.io.cache.CacheManager; import 
org.apache.paimon.lookup.hash.HashLookupStoreFactory; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.schema.KeyValueFieldsExtractor; @@ -206,7 +207,7 @@ private KeyValue kv(int key, int value) { private DataFileMeta newFile(int level, KeyValue... records) throws IOException { RollingFileWriter writer = - createWriterFactory().createRollingMergeTreeFileWriter(level, false); + createWriterFactory().createRollingMergeTreeFileWriter(level, FileSource.APPEND); for (KeyValue kv : records) { writer.write(kv); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java index d915e247a2ae..b40c53d1250a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/LookupLevelsTest.java @@ -33,6 +33,7 @@ import org.apache.paimon.io.RollingFileWriter; import org.apache.paimon.io.cache.CacheManager; import org.apache.paimon.lookup.hash.HashLookupStoreFactory; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; import org.apache.paimon.schema.KeyValueFieldsExtractor; @@ -286,7 +287,7 @@ private KeyValue kv(int key, int value, long seqNumber) { private DataFileMeta newFile(int level, KeyValue... 
records) throws IOException { RollingFileWriter writer = - createWriterFactory().createRollingMergeTreeFileWriter(level, false); + createWriterFactory().createRollingMergeTreeFileWriter(level, FileSource.APPEND); for (KeyValue kv : records) { writer.write(kv); } diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java index f0aba748bf30..33b81a37319b 100644 --- a/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/MergeTreeTestBase.java @@ -36,6 +36,7 @@ import org.apache.paimon.io.KeyValueFileReaderFactory; import org.apache.paimon.io.KeyValueFileWriterFactory; import org.apache.paimon.io.RollingFileWriter; +import org.apache.paimon.manifest.FileSource; import org.apache.paimon.memory.HeapMemorySegmentPool; import org.apache.paimon.mergetree.compact.AbstractCompactRewriter; import org.apache.paimon.mergetree.compact.CompactRewriter; @@ -608,7 +609,7 @@ public CompactResult rewrite( int outputLevel, boolean dropDelete, List> sections) throws Exception { RollingFileWriter writer = - writerFactory.createRollingMergeTreeFileWriter(outputLevel, true); + writerFactory.createRollingMergeTreeFileWriter(outputLevel, FileSource.COMPACT); RecordReader reader = MergeTreeReaders.readerForMergeTree( sections,