Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ private static <T extends Scan<T, FileScanTask, CombinedScanTask>> T refineScanW
refinedScan = refinedScan.includeColumnStats();
}

if (context.includeStatsForColumns() != null) {
refinedScan = refinedScan.includeColumnStats(context.includeStatsForColumns());
}

refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());

refinedScan =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.Serializable;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.flink.annotation.Internal;
Expand Down Expand Up @@ -62,6 +63,7 @@ public class ScanContext implements Serializable {
private final List<Expression> filters;
private final long limit;
private final boolean includeColumnStats;
private final Collection<String> includeStatsForColumns;
private final Integer planParallelism;
private final int maxPlanningSnapshotCount;
private final int maxAllowedPlanningFailures;
Expand All @@ -84,6 +86,7 @@ private ScanContext(
List<Expression> filters,
long limit,
boolean includeColumnStats,
Collection<String> includeStatsForColumns,
boolean exposeLocality,
Integer planParallelism,
int maxPlanningSnapshotCount,
Expand Down Expand Up @@ -114,6 +117,7 @@ private ScanContext(
this.filters = filters;
this.limit = limit;
this.includeColumnStats = includeColumnStats;
this.includeStatsForColumns = includeStatsForColumns;
this.exposeLocality = exposeLocality;
this.planParallelism = planParallelism;
this.maxPlanningSnapshotCount = maxPlanningSnapshotCount;
Expand Down Expand Up @@ -248,6 +252,10 @@ public boolean includeColumnStats() {
return includeColumnStats;
}

/**
 * Returns the column names for which column-level stats should be loaded when planning scans, or
 * {@code null} when no column subset was configured (in that case the boolean
 * {@code includeColumnStats} flag alone governs stats loading).
 */
public Collection<String> includeStatsForColumns() {
return includeStatsForColumns;
}

public boolean exposeLocality() {
return exposeLocality;
}
Expand Down Expand Up @@ -285,6 +293,7 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn
.filters(filters)
.limit(limit)
.includeColumnStats(includeColumnStats)
.includeColumnStats(includeStatsForColumns)
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
Expand Down Expand Up @@ -313,6 +322,7 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) {
.filters(filters)
.limit(limit)
.includeColumnStats(includeColumnStats)
.includeColumnStats(includeStatsForColumns)
.exposeLocality(exposeLocality)
.planParallelism(planParallelism)
.maxPlanningSnapshotCount(maxPlanningSnapshotCount)
Expand Down Expand Up @@ -349,6 +359,7 @@ public static class Builder {
private long limit = FlinkReadOptions.LIMIT_OPTION.defaultValue();
private boolean includeColumnStats =
FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION.defaultValue();
private Collection<String> includeStatsForColumns = null;
private boolean exposeLocality;
private Integer planParallelism =
FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue();
Expand Down Expand Up @@ -464,6 +475,11 @@ public Builder includeColumnStats(boolean newIncludeColumnStats) {
return this;
}

/**
 * Overload of {@code includeColumnStats(boolean)} that restricts column-level stats to the given
 * columns instead of an all-or-nothing toggle.
 *
 * @param newIncludeStatsForColumns columns whose stats should be kept; {@code null} means no
 *     column subset is configured and the boolean setting applies
 * @return this builder for chaining
 */
public Builder includeColumnStats(Collection<String> newIncludeStatsForColumns) {
this.includeStatsForColumns = newIncludeStatsForColumns;
return this;
}

public Builder exposeLocality(boolean newExposeLocality) {
this.exposeLocality = newExposeLocality;
return this;
Expand Down Expand Up @@ -531,6 +547,7 @@ public ScanContext build() {
filters,
limit,
includeColumnStats,
includeStatsForColumns,
exposeLocality,
planParallelism,
maxPlanningSnapshotCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private void appendTwoSnapshots() throws IOException {
}

/** @return the result of one discovery cycle: the last enumerated position and the produced split */
private IcebergEnumeratorPosition verifyOneCycle(
private CycleResult verifyOneCycle(
ContinuousSplitPlannerImpl splitPlanner, IcebergEnumeratorPosition lastPosition)
throws Exception {
List<Record> batch =
Expand All @@ -106,7 +106,7 @@ private IcebergEnumeratorPosition verifyOneCycle(
Assert.assertEquals(
dataFile.path().toString(),
Iterables.getOnlyElement(split.task().files()).file().path().toString());
return result.toPosition();
return new CycleResult(result.toPosition(), split);
}

@Test
Expand Down Expand Up @@ -135,7 +135,7 @@ public void testTableScanThenIncrementalWithEmptyTable() throws Exception {
// next 3 snapshots
IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

Expand Down Expand Up @@ -169,7 +169,7 @@ public void testTableScanThenIncrementalWithNonEmptyTable() throws Exception {

IcebergEnumeratorPosition lastPosition = initialResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

Expand Down Expand Up @@ -206,7 +206,7 @@ public void testIncrementalFromLatestSnapshotWithEmptyTable() throws Exception {
// next 3 snapshots
IcebergEnumeratorPosition lastPosition = afterTwoSnapshotsAppended.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

Expand Down Expand Up @@ -251,7 +251,7 @@ public void testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exceptio

IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

Expand Down Expand Up @@ -281,7 +281,7 @@ public void testIncrementalFromEarliestSnapshotWithEmptyTable() throws Exception
// next 3 snapshots
IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

Expand Down Expand Up @@ -323,12 +323,12 @@ public void testIncrementalFromEarliestSnapshotWithNonEmptyTable() throws Except

IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

@Test
public void testIncrementalFromSnapshotIdWithEmptyTable() throws Exception {
public void testIncrementalFromSnapshotIdWithEmptyTable() {
ScanContext scanContextWithInvalidSnapshotId =
ScanContext.builder()
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID)
Expand Down Expand Up @@ -409,12 +409,12 @@ public void testIncrementalFromSnapshotId() throws Exception {

IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

@Test
public void testIncrementalFromSnapshotTimestampWithEmptyTable() throws Exception {
public void testIncrementalFromSnapshotTimestampWithEmptyTable() {
ScanContext scanContextWithInvalidSnapshotId =
ScanContext.builder()
.startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP)
Expand Down Expand Up @@ -489,7 +489,7 @@ public void testIncrementalFromSnapshotTimestamp() throws Exception {

IcebergEnumeratorPosition lastPosition = secondResult.toPosition();
for (int i = 0; i < 3; ++i) {
lastPosition = verifyOneCycle(splitPlanner, lastPosition);
lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition;
}
}

Expand Down Expand Up @@ -529,6 +529,115 @@ public void testMaxPlanningSnapshotCount() throws Exception {
thirdResult, snapshot1, snapshot2, ImmutableSet.of(dataFile2.path().toString()));
}

/**
 * Plans splits with the given scan context and asserts that every planned data file carries
 * exactly {@code expectedStatCount} column-stat entries, both for the initial discovery and for
 * three subsequent incremental cycles.
 *
 * @param scanContext scan configuration under test (stats settings vary per test)
 * @param expectedStatCount number of columns expected to have stats attached; 0 means no stats
 */
private void verifyTableScanWithStats(ScanContext scanContext, int expectedStatCount)
    throws Exception {
  ContinuousSplitPlannerImpl splitPlanner =
      new ContinuousSplitPlannerImpl(tableResource.tableLoader().clone(), scanContext, null);

  ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null);
  Assert.assertEquals(1, initialResult.splits().size());
  IcebergSourceSplit split = Iterables.getOnlyElement(initialResult.splits());
  // The initial table scan should cover both appended snapshots.
  Assert.assertEquals(2, split.task().files().size());
  verifyStatCount(split, expectedStatCount);

  IcebergEnumeratorPosition lastPosition = initialResult.toPosition();
  for (int i = 0; i < 3; ++i) {
    CycleResult result = verifyOneCycle(splitPlanner, lastPosition);
    verifyStatCount(result.split, expectedStatCount);
    lastPosition = result.lastPosition;
  }
}

@Test
public void testTableScanNoStats() throws Exception {
  appendTwoSnapshots();

  // Stats explicitly disabled: no column metrics should be attached to planned files.
  ScanContext scanContext =
      ScanContext.builder()
          .includeColumnStats(false)
          .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
          .build();
  verifyTableScanWithStats(scanContext, 0);
}

@Test
public void testTableScanAllStats() throws Exception {
  appendTwoSnapshots();

  // Stats enabled for all columns: all 3 table columns should carry metrics.
  ScanContext scanContext =
      ScanContext.builder()
          .includeColumnStats(true)
          .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
          .build();
  verifyTableScanWithStats(scanContext, 3);
}

@Test
public void testTableScanSingleStat() throws Exception {
  appendTwoSnapshots();

  // Stats restricted to the "data" column: exactly one column should carry metrics.
  ScanContext scanContext =
      ScanContext.builder()
          .includeColumnStats(ImmutableSet.of("data"))
          .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL)
          .build();
  verifyTableScanWithStats(scanContext, 1);
}

/**
 * Asserts the column-level stats attached to every file of the split: when {@code expected} is 0
 * all stat maps must be absent ({@code null}); otherwise each populated stat map must hold exactly
 * {@code expected} entries.
 */
private void verifyStatCount(IcebergSourceSplit split, int expected) {
  if (expected != 0) {
    split
        .task()
        .files()
        .forEach(
            fileTask -> {
              Assert.assertEquals(expected, fileTask.file().columnSizes().size());
              Assert.assertEquals(expected, fileTask.file().valueCounts().size());
              Assert.assertEquals(expected, fileTask.file().nullValueCounts().size());
              Assert.assertEquals(expected, fileTask.file().lowerBounds().size());
              Assert.assertEquals(expected, fileTask.file().upperBounds().size());
              // The nanValue is not counted for long and string fields
              Assert.assertEquals(0, fileTask.file().nanValueCounts().size());
            });
  } else {
    split
        .task()
        .files()
        .forEach(
            fileTask -> {
              Assert.assertNull(fileTask.file().columnSizes());
              Assert.assertNull(fileTask.file().valueCounts());
              Assert.assertNull(fileTask.file().nullValueCounts());
              Assert.assertNull(fileTask.file().nanValueCounts());
              Assert.assertNull(fileTask.file().lowerBounds());
              Assert.assertNull(fileTask.file().upperBounds());
            });
  }
}

private void verifyMaxPlanningSnapshotCountResult(
ContinuousEnumerationResult result,
Snapshot fromSnapshotExclusive,
Expand Down Expand Up @@ -566,4 +675,14 @@ private Snapshot appendSnapshot(long seed, int numRecords) throws Exception {
dataAppender.appendToTable(dataFile);
return tableResource.table().currentSnapshot();
}

/**
 * Result of one enumeration cycle: the position the enumerator reached and the split it
 * discovered. Immutable value holder for test assertions.
 */
private static class CycleResult {
  // Fields are final: this is a pure value holder created once per cycle and never mutated.
  final IcebergEnumeratorPosition lastPosition;
  final IcebergSourceSplit split;

  CycleResult(IcebergEnumeratorPosition lastPosition, IcebergSourceSplit split) {
    this.lastPosition = lastPosition;
    this.split = split;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ private static <T extends Scan<T, FileScanTask, CombinedScanTask>> T refineScanW
refinedScan = refinedScan.includeColumnStats();
}

if (context.includeStatsForColumns() != null) {
refinedScan = refinedScan.includeColumnStats(context.includeStatsForColumns());
}

refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString());

refinedScan =
Expand Down
Loading