From b77a27839873e7ee1aa35721a68726da758ab7c1 Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Thu, 2 Jan 2025 12:17:12 +0800 Subject: [PATCH 1/2] [core] manifest with buckets pushdown to reduce manifest reads --- .../apache/paimon/manifest/ManifestFile.java | 7 ++++- .../operation/AbstractFileStoreScan.java | 31 ++++++++++++++++++- .../paimon/operation/FileStoreScan.java | 3 ++ .../table/FallbackReadFileStoreTable.java | 9 ++++++ .../table/source/AbstractDataTableScan.java | 7 +++++ .../paimon/table/source/InnerTableScan.java | 11 +++++++ .../table/source/snapshot/SnapshotReader.java | 3 ++ .../source/snapshot/SnapshotReaderImpl.java | 6 ++++ .../paimon/table/system/AuditLogTable.java | 25 +++++++++++++++ .../operation/KeyValueFileStoreScanTest.java | 25 +++++++++++++++ 10 files changed, 125 insertions(+), 2 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java index 1aba2ef19561..0a495096df26 100644 --- a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java +++ b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestFile.java @@ -27,6 +27,7 @@ import org.apache.paimon.fs.Path; import org.apache.paimon.io.RollingFileWriter; import org.apache.paimon.io.SingleFileWriter; +import org.apache.paimon.predicate.Predicate; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.stats.SimpleStatsConverter; import org.apache.paimon.types.RowType; @@ -207,6 +208,10 @@ public boolean isCacheEnabled() { } public ManifestFile create() { + return create(null); + } + + public ManifestFile create(List filters) { RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA); return new ManifestFile( fileIO, @@ -214,7 +219,7 @@ public ManifestFile create() { partitionType, new ManifestEntrySerializer(), entryType, - fileFormat.createReaderFactory(entryType), + fileFormat.createReaderFactory(entryType, 
filters), fileFormat.createWriterFactory(entryType), compression, pathFactory.manifestFileFactory(), diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java index 4f8a1f3264de..392a3aea145e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java @@ -35,9 +35,13 @@ import org.apache.paimon.operation.metrics.ScanStats; import org.apache.paimon.partition.PartitionPredicate; import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.source.ScanMode; +import org.apache.paimon.types.DataType; +import org.apache.paimon.types.IntType; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.BiFilter; import org.apache.paimon.utils.Filter; import org.apache.paimon.utils.Pair; @@ -79,6 +83,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { private Snapshot specifiedSnapshot = null; private Filter bucketFilter = null; + private Collection buckets; private BiFilter totalAwareBucketFilter = null; protected ScanMode scanMode = ScanMode.ALL; private Filter levelFilter = null; @@ -127,6 +132,14 @@ public FileStoreScan withPartitionFilter(PartitionPredicate predicate) { @Override public FileStoreScan withBucket(int bucket) { this.bucketFilter = i -> i == bucket; + this.buckets = Collections.singletonList(bucket); + return this; + } + + @Override + public FileStoreScan withBuckets(Collection buckets) { + this.bucketFilter = buckets::contains; + this.buckets = buckets; return this; } @@ -427,7 +440,7 @@ private List readManifest( @Nullable Filter additionalTFilter) { List entries = manifestFileFactory - .create() + 
.create(createPushDownFilter(buckets)) .withCacheMetrics( scanMetrics != null ? scanMetrics.getCacheMetrics() : null) .read( @@ -471,6 +484,22 @@ private Filter createCacheRowFilter() { return row -> manifestCacheFilter.test(partitionGetter.apply(row), bucketGetter.apply(row)); } + /** + * Build the bucket predicate that is pushed down into the file format reader, so that only + * entries of the currently required buckets are read; returns null when no buckets are set. + */ + private static List createPushDownFilter(Collection buckets) { + if (buckets == null || buckets.isEmpty()) { + return null; + } + List predicates = new ArrayList<>(); + PredicateBuilder predicateBuilder = + new PredicateBuilder( + RowType.of(new DataType[] {new IntType()}, new String[] {"_BUCKET"})); + predicates.add(predicateBuilder.in(0, new ArrayList<>(buckets))); + return predicates; + } + /** * Read the corresponding entries based on the current required partition and bucket. * diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java index 179d16de6cd2..8e9dc31757fe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java @@ -38,6 +38,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; @@ -57,6 +58,8 @@ public interface FileStoreScan { FileStoreScan withBucket(int bucket); + FileStoreScan withBuckets(Collection buckets); + FileStoreScan withBucketFilter(Filter bucketFilter); FileStoreScan withTotalAwareBucketFilter(BiFilter bucketFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java index e3e290f06086..050b0841074d 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java @@ -33,6 +33,7 @@ import org.apache.paimon.table.source.DataSplit; import org.apache.paimon.table.source.DataTableScan; import org.apache.paimon.table.source.InnerTableRead; +import org.apache.paimon.table.source.InnerTableScan; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.TableRead; import org.apache.paimon.table.source.TableScan; @@ -43,6 +44,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -265,6 +267,13 @@ public Scan withBucketFilter(Filter bucketFilter) { return this; } + @Override + public InnerTableScan withBuckets(Collection buckets) { + mainScan.withBuckets(buckets); + fallbackScan.withBuckets(buckets); + return this; + } + @Override public Scan withLevelFilter(Filter levelFilter) { mainScan.withLevelFilter(levelFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java index 24c6943f546f..1f478f283b68 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java @@ -48,6 +48,7 @@ import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.SnapshotManager; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Optional; @@ -79,6 +80,12 @@ public AbstractDataTableScan withBucketFilter(Filter bucketFilter) { return this; } + @Override + public AbstractDataTableScan withBuckets(Collection buckets) { + snapshotReader.withBuckets(buckets); + return this; + } + @Override public AbstractDataTableScan withPartitionFilter(Map partitionSpec) { 
snapshotReader.withPartitionFilter(partitionSpec); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java index c2425ff16f97..58f884528054 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java @@ -23,6 +23,8 @@ import org.apache.paimon.predicate.Predicate; import org.apache.paimon.utils.Filter; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -47,6 +49,15 @@ default InnerTableScan withBucketFilter(Filter bucketFilter) { return this; } + default InnerTableScan withBucket(Integer bucket) { + return withBuckets(Collections.singletonList(bucket)); + } + + default InnerTableScan withBuckets(Collection buckets) { + // Unsafe no-op default: many classes override neither this method nor withBucketFilter + return this; + } + default InnerTableScan withLevelFilter(Filter levelFilter) { return this; } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java index f3e0a92b8fc7..50a5b6940c3e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java @@ -39,6 +39,7 @@ import javax.annotation.Nullable; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -83,6 +84,8 @@ public interface SnapshotReader { SnapshotReader withBucket(int bucket); + SnapshotReader withBuckets(Collection buckets); + SnapshotReader withBucketFilter(Filter bucketFilter); SnapshotReader withDataFileNameFilter(Filter fileNameFilter); diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index bf19ba10c689..c2c62102859d 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -54,6 +54,7 @@ import javax.annotation.Nullable; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -253,6 +254,11 @@ public SnapshotReader withBucket(int bucket) { return this; } + public SnapshotReader withBuckets(Collection buckets) { + scan.withBuckets(buckets); + return this; + } + @Override public SnapshotReader withBucketFilter(Filter bucketFilter) { scan.withBucketFilter(bucketFilter); diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java index 1cb967f8d1e2..f1dc3331b4ee 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java @@ -70,6 +70,7 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -342,6 +343,12 @@ public SnapshotReader withBucket(int bucket) { return this; } + @Override + public SnapshotReader withBuckets(Collection buckets) { + wrapped.withBuckets(buckets); + return this; + } + @Override public SnapshotReader withBucketFilter(Filter bucketFilter) { wrapped.withBucketFilter(bucketFilter); @@ -452,6 +459,12 @@ public InnerTableScan withBucketFilter(Filter bucketFilter) { return this; } + @Override + public InnerTableScan withBuckets(Collection buckets) { + batchScan.withBuckets(buckets); + return this; + 
} + @Override public InnerTableScan withLevelFilter(Filter levelFilter) { batchScan.withLevelFilter(levelFilter); @@ -489,6 +502,18 @@ public StreamDataTableScan withFilter(Predicate predicate) { return this; } + @Override + public InnerTableScan withBucketFilter(Filter bucketFilter) { + streamScan.withBucketFilter(bucketFilter); + return this; + } + + @Override + public InnerTableScan withBuckets(Collection buckets) { + streamScan.withBuckets(buckets); + return this; + } + @Override public StartingContext startingContext() { return streamScan.startingContext(); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java index 4f3d5c1c24dd..f20fd06d31e0 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/KeyValueFileStoreScanTest.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.io.TempDir; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -224,6 +225,30 @@ public void testWithBucket() throws Exception { runTestExactMatch(scan, snapshot.id(), expected); } + @Test + public void testWithBuckets() throws Exception { + ThreadLocalRandom random = ThreadLocalRandom.current(); + List data = generateData(random.nextInt(1000) + 1); + Snapshot snapshot = writeData(data); + + int wantedBucket1 = random.nextInt(NUM_BUCKETS); + int wantedBucket2 = random.nextInt(NUM_BUCKETS); + int wantedBucket3 = random.nextInt(NUM_BUCKETS); + Set buckets = + new HashSet<>(Arrays.asList(wantedBucket1, wantedBucket2, wantedBucket3)); + + FileStoreScan scan = store.newScan(); + scan.withSnapshot(snapshot.id()); + scan.withBuckets(buckets); + + Map expected = + store.toKvMap( + data.stream() + .filter(kv -> buckets.contains(getBucket(kv))) + .collect(Collectors.toList())); + 
runTestExactMatch(scan, snapshot.id(), expected); + } + @Test public void testWithSnapshot() throws Exception { ThreadLocalRandom random = ThreadLocalRandom.current(); From 41ac3a7ac76076fc80662c7c738cb9d8c9c97b83 Mon Sep 17 00:00:00 2001 From: "ranxianglei.rxl" Date: Thu, 2 Jan 2025 14:17:48 +0800 Subject: [PATCH 2/2] [test] fix flink org.apache.paimon.flink.action.RemoveOrphanFilesActionITCaseBase#testCleanWithBranch --- .../paimon/table/source/snapshot/SnapshotReaderImpl.java | 8 +++++++- paimon-flink/paimon-flink-1.18/pom.xml | 8 ++++++++ paimon-flink/paimon-flink-1.19/pom.xml | 9 +++++++++ 3 files changed, 24 insertions(+), 1 deletion(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java index c2c62102859d..f24b0760e6f3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java @@ -291,7 +291,13 @@ public SnapshotReader withShard(int indexOfThisSubtask, int numberOfParallelSubt Math.abs(file.hashCode() % numberOfParallelSubtasks) == indexOfThisSubtask); } else { - withBucketFilter(bucket -> bucket % numberOfParallelSubtasks == indexOfThisSubtask); + Set buckets = new HashSet<>(); + for (int bucket = 0; bucket < this.tableSchema.numBuckets(); bucket++) { + if (bucket % numberOfParallelSubtasks == indexOfThisSubtask) { + buckets.add(bucket); + } + } + withBuckets(buckets); } return this; } diff --git a/paimon-flink/paimon-flink-1.18/pom.xml b/paimon-flink/paimon-flink-1.18/pom.xml index 75e6f1f0f484..a5cb781b06be 100644 --- a/paimon-flink/paimon-flink-1.18/pom.xml +++ b/paimon-flink/paimon-flink-1.18/pom.xml @@ -80,6 +80,14 @@ under the License. 
test + + org.apache.paimon + paimon-format + ${project.version} + test + + + org.apache.flink flink-test-utils diff --git a/paimon-flink/paimon-flink-1.19/pom.xml b/paimon-flink/paimon-flink-1.19/pom.xml index 8d6ea90b8b80..10d815cf42dc 100644 --- a/paimon-flink/paimon-flink-1.19/pom.xml +++ b/paimon-flink/paimon-flink-1.19/pom.xml @@ -87,6 +87,15 @@ under the License. test + + + org.apache.paimon + paimon-format + ${project.version} + test + + + org.apache.flink flink-test-utils