From 77334f89cf663a5dd82da29c90bf2da67e13915c Mon Sep 17 00:00:00 2001 From: Jingsong Date: Sun, 15 Dec 2024 23:05:30 +0800 Subject: [PATCH] [flink] Enable limit pushdown and count optimization for dv table --- .../paimon/flink/source/DataTableSource.java | 8 ++-- .../flink/source/BaseDataTableSource.java | 45 ++++++++++--------- .../paimon/flink/source/DataTableSource.java | 8 ++-- .../paimon/flink/BatchFileStoreITCase.java | 24 ++++++++-- .../FileStoreTableStatisticsTestBase.java | 6 +-- .../PrimaryKeyTableStatisticsTest.java | 2 +- 6 files changed, 57 insertions(+), 36 deletions(-) diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index ee00d41832cd..f41f8da6c820 100644 --- a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java +++ b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -51,7 +51,7 @@ public DataTableSource( null, null, null, - false); + null); } public DataTableSource( @@ -64,7 +64,7 @@ public DataTableSource( @Nullable int[][] projectFields, @Nullable Long limit, @Nullable WatermarkStrategy watermarkStrategy, - boolean isBatchCountStar) { + @Nullable Long countPushed) { super( tableIdentifier, table, @@ -75,7 +75,7 @@ public DataTableSource( projectFields, limit, watermarkStrategy, - isBatchCountStar); + countPushed); } @Override @@ -90,7 +90,7 @@ public DataTableSource copy() { projectFields, limit, watermarkStrategy, - isBatchCountStar); + countPushed); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java index 5dbbdcedd82a..a94d799773bc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java @@ -28,7 +28,6 @@ import org.apache.paimon.flink.log.LogStoreTableFactory; import org.apache.paimon.flink.lookup.FileStoreLookupFunction; import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory; -import org.apache.paimon.manifest.PartitionEntry; import org.apache.paimon.options.ConfigOption; import org.apache.paimon.options.Options; import org.apache.paimon.predicate.Predicate; @@ -36,7 +35,8 @@ import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; -import org.apache.paimon.table.source.TableScan; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.Split; import org.apache.paimon.utils.Projection; import org.apache.flink.api.common.eventtime.WatermarkStrategy; @@ -74,6 +74,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL; import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY; import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT; +import static org.apache.paimon.utils.Preconditions.checkNotNull; /** * Table source to create {@link StaticFileStoreSource} or {@link ContinuousFileStoreSource} under @@ -98,7 +99,7 @@ public abstract class BaseDataTableSource extends FlinkTableSource @Nullable protected final LogStoreTableFactory logStoreTableFactory; @Nullable protected WatermarkStrategy watermarkStrategy; - protected boolean isBatchCountStar; + @Nullable protected Long countPushed; public BaseDataTableSource( ObjectIdentifier tableIdentifier, @@ -110,7 +111,7 @@ public BaseDataTableSource( @Nullable int[][] projectFields, @Nullable Long limit, @Nullable WatermarkStrategy watermarkStrategy, - boolean isBatchCountStar) { + @Nullable Long countPushed) { super(table, predicate, projectFields, limit); this.tableIdentifier = tableIdentifier; this.streaming = streaming; @@ -120,7 +121,7 @@ public BaseDataTableSource( this.projectFields = projectFields; this.limit = limit; this.watermarkStrategy = watermarkStrategy; - this.isBatchCountStar = isBatchCountStar; + this.countPushed = countPushed; } @Override @@ -159,7 +160,7 @@ public ChangelogMode getChangelogMode() { @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { - if (isBatchCountStar) { + if (countPushed != null) { return createCountStarScan(); } @@ -212,10 +213,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) { } private ScanRuntimeProvider createCountStarScan() { - TableScan scan = table.newReadBuilder().withFilter(predicate).newScan(); - List partitionEntries = scan.listPartitionEntries(); - long rowCount = partitionEntries.stream().mapToLong(PartitionEntry::recordCount).sum(); - NumberSequenceRowSource source = new NumberSequenceRowSource(rowCount, rowCount); + checkNotNull(countPushed); + NumberSequenceRowSource source = new NumberSequenceRowSource(countPushed, countPushed); return new SourceProvider() { @Override public Source createSource() { @@ -303,15 +302,6 @@ public boolean applyAggregates( return false; } - if (!table.primaryKeys().isEmpty()) { - return false; - } - - CoreOptions options = ((DataTable) table).coreOptions(); - if (options.deletionVectorsEnabled()) { - return false; - } - if (groupingSets.size() != 1) { return false; } @@ -334,7 +324,22 @@ public boolean applyAggregates( return false; } - isBatchCountStar = true; + List splits = + table.newReadBuilder().dropStats().withFilter(predicate).newScan().plan().splits(); + long countPushed = 0; + for (Split s : splits) { + if (!(s instanceof DataSplit)) { + return false; + } + DataSplit split = (DataSplit) s; + if (!split.mergedRowCountAvailable()) { + return false; + } + + countPushed += split.mergedRowCount(); + } + + this.countPushed = countPushed; return true; } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java index 53a1b5f63083..2b470cb4383a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/DataTableSource.java @@ -70,7 +70,7 @@ public DataTableSource( null, null, null, - false); + null); } public DataTableSource( @@ -84,7 +84,7 @@ public DataTableSource( @Nullable Long limit, @Nullable WatermarkStrategy watermarkStrategy, @Nullable List dynamicPartitionFilteringFields, - boolean isBatchCountStar) { + @Nullable Long countPushed) { super( tableIdentifier, table, @@ -95,7 +95,7 @@ public DataTableSource( projectFields, limit, watermarkStrategy, - isBatchCountStar); + countPushed); this.dynamicPartitionFilteringFields = dynamicPartitionFilteringFields; } @@ -112,7 +112,7 @@ public DataTableSource copy() { limit, watermarkStrategy, dynamicPartitionFilteringFields, - isBatchCountStar); + countPushed); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java index cdc114b048a1..d48b6e771236 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java @@ -561,19 +561,35 @@ public void testCountStarAppendWithDv() { String sql = "SELECT COUNT(*) FROM count_append_dv"; assertThat(sql(sql)).containsOnly(Row.of(2L)); - validateCount1NotPushDown(sql); + validateCount1PushDown(sql); } @Test public void testCountStarPK() { - sql("CREATE TABLE count_pk (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING)"); - sql("INSERT INTO count_pk VALUES (1, 'a'), (2, 'b')"); + sql( + "CREATE TABLE count_pk (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING) WITH ('file.format' = 'avro')"); + sql("INSERT INTO count_pk VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')"); + sql("INSERT INTO count_pk VALUES (1, 'e')"); String sql = "SELECT COUNT(*) FROM count_pk"; - assertThat(sql(sql)).containsOnly(Row.of(2L)); + assertThat(sql(sql)).containsOnly(Row.of(4L)); validateCount1NotPushDown(sql); } + @Test + public void testCountStarPKDv() { + sql( + "CREATE TABLE count_pk_dv (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING) WITH (" + + "'file.format' = 'avro', " + + "'deletion-vectors.enabled' = 'true')"); + sql("INSERT INTO count_pk_dv VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')"); + sql("INSERT INTO count_pk_dv VALUES (1, 'e')"); + + String sql = "SELECT COUNT(*) FROM count_pk_dv"; + assertThat(sql(sql)).containsOnly(Row.of(4L)); + validateCount1PushDown(sql); + } + @Test public void testParquetRowDecimalAndTimestamp() { sql( diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java index 826bf28d1248..42a47ea1e298 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/FileStoreTableStatisticsTestBase.java @@ -153,7 +153,7 @@ public void testTableFilterPartitionStatistics() throws Exception { null, null, null, - false); + null); Assertions.assertThat(partitionFilterSource.reportStatistics().getRowCount()).isEqualTo(5L); Map> colStatsMap = new HashMap<>(); colStatsMap.put("pt", ColStats.newColStats(0, 1L, 1, 2, 0L, null, null)); @@ -232,7 +232,7 @@ public void testTableFilterKeyStatistics() throws Exception { null, null, null, - false); + null); Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(2L); Map> colStatsMap = new HashMap<>(); colStatsMap.put("pt", ColStats.newColStats(0, 1L, 2, 2, 0L, null, null)); @@ -311,7 +311,7 @@ public void testTableFilterValueStatistics() throws Exception { null, null, null, - false); + null); Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(4L); Map> colStatsMap = new HashMap<>(); colStatsMap.put("pt", ColStats.newColStats(0, 4L, 2, 2, 0L, null, null)); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java index f5d4121672b0..ea47df2d9d72 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/statistics/PrimaryKeyTableStatisticsTest.java @@ -52,7 +52,7 @@ public void testTableFilterValueStatistics() throws Exception { null, null, null, - false); + null); Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(9L); // TODO validate column statistics }