@@ -51,7 +51,7 @@ public DataTableSource(
                 null,
                 null,
                 null,
-                false);
+                null);
     }

     public DataTableSource(
@@ -64,7 +64,7 @@ public DataTableSource(
             @Nullable int[][] projectFields,
             @Nullable Long limit,
             @Nullable WatermarkStrategy<RowData> watermarkStrategy,
-            boolean isBatchCountStar) {
+            @Nullable Long countPushed) {
         super(
                 tableIdentifier,
                 table,
@@ -75,7 +75,7 @@ public DataTableSource(
                 projectFields,
                 limit,
                 watermarkStrategy,
-                isBatchCountStar);
+                countPushed);
     }

     @Override
@@ -90,7 +90,7 @@ public DataTableSource copy() {
                 projectFields,
                 limit,
                 watermarkStrategy,
-                isBatchCountStar);
+                countPushed);
     }

     @Override
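The mechanical change running through every file in this diff: the `isBatchCountStar` flag becomes a nullable `countPushed`, so call sites that passed `false` now pass `null`, and the guard becomes a null check. A minimal toy model of the idea (illustrative class only, not Paimon's):

// Toy sketch: merging the "was COUNT(*) pushed down?" flag with the count
// itself means the flag and the value can never disagree, and the count no
// longer has to be recomputed later at scan time.
class CountStarState {
    // before: boolean isBatchCountStar;  // true => recompute the count at scan time
    // after:
    Long countPushed; // non-null => serve exactly this count at scan time

    boolean isCountStarScan() {
        return countPushed != null;
    }
}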
@@ -28,15 +28,15 @@
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
 import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
-import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.source.TableScan;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
 import org.apache.paimon.utils.Projection;

 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
@@ -74,6 +74,7 @@
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;

 /**
  * Table source to create {@link StaticFileStoreSource} or {@link ContinuousFileStoreSource} under
@@ -98,7 +99,7 @@ public abstract class BaseDataTableSource extends FlinkTableSource
     @Nullable protected final LogStoreTableFactory logStoreTableFactory;

     @Nullable protected WatermarkStrategy<RowData> watermarkStrategy;
-    protected boolean isBatchCountStar;
+    @Nullable protected Long countPushed;

     public BaseDataTableSource(
             ObjectIdentifier tableIdentifier,
@@ -110,7 +111,7 @@ public BaseDataTableSource(
             @Nullable int[][] projectFields,
             @Nullable Long limit,
             @Nullable WatermarkStrategy<RowData> watermarkStrategy,
-            boolean isBatchCountStar) {
+            @Nullable Long countPushed) {
         super(table, predicate, projectFields, limit);
         this.tableIdentifier = tableIdentifier;
         this.streaming = streaming;
@@ -120,7 +121,7 @@ public BaseDataTableSource(
         this.projectFields = projectFields;
         this.limit = limit;
         this.watermarkStrategy = watermarkStrategy;
-        this.isBatchCountStar = isBatchCountStar;
+        this.countPushed = countPushed;
     }

     @Override
@@ -159,7 +160,7 @@ public ChangelogMode getChangelogMode() {

     @Override
     public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
-        if (isBatchCountStar) {
+        if (countPushed != null) {
             return createCountStarScan();
         }

@@ -212,10 +213,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
     }

     private ScanRuntimeProvider createCountStarScan() {
-        TableScan scan = table.newReadBuilder().withFilter(predicate).newScan();
-        List<PartitionEntry> partitionEntries = scan.listPartitionEntries();
-        long rowCount = partitionEntries.stream().mapToLong(PartitionEntry::recordCount).sum();
-        NumberSequenceRowSource source = new NumberSequenceRowSource(rowCount, rowCount);
+        checkNotNull(countPushed);
+        NumberSequenceRowSource source = new NumberSequenceRowSource(countPushed, countPushed);
         return new SourceProvider() {
             @Override
             public Source<RowData, ?, ?> createSource() {
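The count-star scan no longer lists partition entries at plan time; it just replays the count computed when the aggregate was pushed. Assuming `NumberSequenceRowSource(from, to)` behaves like Flink's NumberSequenceSource wrapped as RowData, emitting one single-column BIGINT row per value in the closed range, `(countPushed, countPushed)` yields exactly one row whose value is the count, which is the result shape a global COUNT(*) with no GROUP BY needs. A runnable toy model of that range behavior:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

// Toy stand-in for NumberSequenceRowSource's output (an assumption about the
// real class): one "row" per long in [from, to]. With from == to == count,
// exactly one row is produced.
class NumberSequenceSketch {
    static List<Long> rows(long from, long to) {
        return LongStream.rangeClosed(from, to).boxed().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(rows(42L, 42L)); // [42] -> the single COUNT(*) result row
    }
}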
@@ -303,15 +302,6 @@ public boolean applyAggregates(
             return false;
         }

-        if (!table.primaryKeys().isEmpty()) {
-            return false;
-        }
-
-        CoreOptions options = ((DataTable) table).coreOptions();
-        if (options.deletionVectorsEnabled()) {
-            return false;
-        }
-
         if (groupingSets.size() != 1) {
             return false;
         }
@@ -334,7 +324,22 @@ public boolean applyAggregates(
             return false;
         }

-        isBatchCountStar = true;
+        List<Split> splits =
+                table.newReadBuilder().dropStats().withFilter(predicate).newScan().plan().splits();
+        long countPushed = 0;
+        for (Split s : splits) {
+            if (!(s instanceof DataSplit)) {
+                return false;
+            }
+            DataSplit split = (DataSplit) s;
+            if (!split.mergedRowCountAvailable()) {
+                return false;
+            }
+
+            countPushed += split.mergedRowCount();
+        }
+
+        this.countPushed = countPushed;
         return true;
     }

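The decision now happens eagerly in applyAggregates: plan the splits (with dropStats(), since file statistics are not needed here) and push the count only if every split is a DataSplit that reports an exact merged row count. A single unknown split vetoes the whole push-down, because a partial sum would silently undercount. This is also why the blanket primary-key and deletion-vector early-returns above could be deleted: the per-split check subsumes them, since a split that would need merge-on-read to deduplicate keys reports no merged row count on its own, while deletion vectors make the per-split count exact again (the tests below bear this out). A runnable distillation of the veto logic (toy types, not Paimon's Split/DataSplit):

import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

// Distilled model of the new applyAggregates decision: push COUNT(*) only if
// every split can report an exact merged row count; one unknown split vetoes it.
class CountPushDownSketch {
    static class Split {
        final Long mergedRowCount; // null = merge-on-read needed, count unknown

        Split(Long mergedRowCount) {
            this.mergedRowCount = mergedRowCount;
        }
    }

    static OptionalLong tryPushCount(List<Split> splits) {
        long total = 0;
        for (Split s : splits) {
            if (s.mergedRowCount == null) {
                return OptionalLong.empty(); // fall back to a normal scan + aggregate
            }
            total += s.mergedRowCount;
        }
        return OptionalLong.of(total);
    }

    public static void main(String[] args) {
        System.out.println(tryPushCount(Arrays.asList(new Split(3L), new Split(2L))));   // OptionalLong[5]
        System.out.println(tryPushCount(Arrays.asList(new Split(3L), new Split(null)))); // OptionalLong.empty
    }
}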
@@ -70,7 +70,7 @@ public DataTableSource(
                 null,
                 null,
                 null,
-                false);
+                null);
     }

     public DataTableSource(
@@ -84,7 +84,7 @@ public DataTableSource(
             @Nullable Long limit,
             @Nullable WatermarkStrategy<RowData> watermarkStrategy,
             @Nullable List<String> dynamicPartitionFilteringFields,
-            boolean isBatchCountStar) {
+            @Nullable Long countPushed) {
         super(
                 tableIdentifier,
                 table,
@@ -95,7 +95,7 @@ public DataTableSource(
                 projectFields,
                 limit,
                 watermarkStrategy,
-                isBatchCountStar);
+                countPushed);
         this.dynamicPartitionFilteringFields = dynamicPartitionFilteringFields;
     }

@@ -112,7 +112,7 @@ public DataTableSource copy() {
                 limit,
                 watermarkStrategy,
                 dynamicPartitionFilteringFields,
-                isBatchCountStar);
+                countPushed);
     }

     @Override
@@ -561,19 +561,35 @@ public void testCountStarAppendWithDv() {

         String sql = "SELECT COUNT(*) FROM count_append_dv";
         assertThat(sql(sql)).containsOnly(Row.of(2L));
-        validateCount1NotPushDown(sql);
+        validateCount1PushDown(sql);
     }

     @Test
     public void testCountStarPK() {
-        sql("CREATE TABLE count_pk (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING)");
-        sql("INSERT INTO count_pk VALUES (1, 'a'), (2, 'b')");
+        sql(
+                "CREATE TABLE count_pk (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING) WITH ('file.format' = 'avro')");
+        sql("INSERT INTO count_pk VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')");
+        sql("INSERT INTO count_pk VALUES (1, 'e')");

         String sql = "SELECT COUNT(*) FROM count_pk";
-        assertThat(sql(sql)).containsOnly(Row.of(2L));
+        assertThat(sql(sql)).containsOnly(Row.of(4L));
         validateCount1NotPushDown(sql);
     }

+    @Test
+    public void testCountStarPKDv() {
+        sql(
+                "CREATE TABLE count_pk_dv (f0 INT PRIMARY KEY NOT ENFORCED, f1 STRING) WITH ("
+                        + "'file.format' = 'avro', "
+                        + "'deletion-vectors.enabled' = 'true')");
+        sql("INSERT INTO count_pk_dv VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd')");
+        sql("INSERT INTO count_pk_dv VALUES (1, 'e')");
+
+        String sql = "SELECT COUNT(*) FROM count_pk_dv";
+        assertThat(sql(sql)).containsOnly(Row.of(4L));
+        validateCount1PushDown(sql);
+    }
+
     @Test
     public void testParquetRowDecimalAndTimestamp() {
         sql(
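Taken together, the updated tests pin down the new behavior: COUNT(*) is now pushed down for an append table with deletion vectors and for a primary-key table with deletion vectors, where exact per-split counts are available, but still not for a plain primary-key table, where deduplicating (1, 'e') against (1, 'a') needs merge-on-read. The bodies of the validation helpers are not part of this diff; a hypothetical sketch of what they might assert (the matched plan fragment is an assumption, not a confirmed string):

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.flink.table.api.TableEnvironment;

// Hypothetical sketch only: the real validateCount1PushDown /
// validateCount1NotPushDown helpers are not shown in this diff.
class PushDownAssertionsSketch {
    static void validateCount1PushDown(TableEnvironment tEnv, String query) {
        // Assumption: with the count folded into the source, the optimized
        // batch plan no longer needs a local aggregate over the scan.
        assertThat(tEnv.explainSql(query)).doesNotContain("LocalHashAggregate");
    }

    static void validateCount1NotPushDown(TableEnvironment tEnv, String query) {
        assertThat(tEnv.explainSql(query)).contains("LocalHashAggregate");
    }
}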
@@ -153,7 +153,7 @@ public void testTableFilterPartitionStatistics() throws Exception {
                         null,
                         null,
                         null,
-                        false);
+                        null);
         Assertions.assertThat(partitionFilterSource.reportStatistics().getRowCount()).isEqualTo(5L);
         Map<String, ColStats<?>> colStatsMap = new HashMap<>();
         colStatsMap.put("pt", ColStats.newColStats(0, 1L, 1, 2, 0L, null, null));
@@ -232,7 +232,7 @@ public void testTableFilterKeyStatistics() throws Exception {
                         null,
                         null,
                         null,
-                        false);
+                        null);
         Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(2L);
         Map<String, ColStats<?>> colStatsMap = new HashMap<>();
         colStatsMap.put("pt", ColStats.newColStats(0, 1L, 2, 2, 0L, null, null));
@@ -311,7 +311,7 @@ public void testTableFilterValueStatistics() throws Exception {
                         null,
                         null,
                         null,
-                        false);
+                        null);
         Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(4L);
         Map<String, ColStats<?>> colStatsMap = new HashMap<>();
         colStatsMap.put("pt", ColStats.newColStats(0, 4L, 2, 2, 0L, null, null));
@@ -52,7 +52,7 @@ public void testTableFilterValueStatistics() throws Exception {
                         null,
                         null,
                         null,
-                        false);
+                        null);
         Assertions.assertThat(keyFilterSource.reportStatistics().getRowCount()).isEqualTo(9L);
         // TODO validate column statistics
     }