From e29f59a0690963cc19d684bd1687ebd28d1f840e Mon Sep 17 00:00:00 2001 From: Peter Vary Date: Thu, 23 Nov 2023 14:07:10 +0100 Subject: [PATCH] Flink: Backport #8803 to v1.16 and v1.15 --- .../flink/source/FlinkSplitPlanner.java | 4 + .../iceberg/flink/source/ScanContext.java | 17 +++ .../TestContinuousSplitPlannerImpl.java | 143 ++++++++++++++++-- .../flink/source/FlinkSplitPlanner.java | 4 + .../iceberg/flink/source/ScanContext.java | 17 +++ .../TestContinuousSplitPlannerImpl.java | 143 ++++++++++++++++-- 6 files changed, 304 insertions(+), 24 deletions(-) diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java index ea317e93d8ba..15078809714f 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java @@ -165,6 +165,10 @@ private static > T refineScanW refinedScan = refinedScan.includeColumnStats(); } + if (context.includeStatsForColumns() != null) { + refinedScan = refinedScan.includeColumnStats(context.includeStatsForColumns()); + } + refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString()); refinedScan = diff --git a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index e380204e871f..4357b1f57df6 100644 --- a/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.time.Duration; +import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.flink.annotation.Internal; @@ -62,6 +63,7 @@ public class ScanContext implements Serializable { private final List filters; private final long limit; private final boolean includeColumnStats; + private final Collection includeStatsForColumns; private final Integer planParallelism; private final int maxPlanningSnapshotCount; private final int maxAllowedPlanningFailures; @@ -84,6 +86,7 @@ private ScanContext( List filters, long limit, boolean includeColumnStats, + Collection includeStatsForColumns, boolean exposeLocality, Integer planParallelism, int maxPlanningSnapshotCount, @@ -114,6 +117,7 @@ private ScanContext( this.filters = filters; this.limit = limit; this.includeColumnStats = includeColumnStats; + this.includeStatsForColumns = includeStatsForColumns; this.exposeLocality = exposeLocality; this.planParallelism = planParallelism; this.maxPlanningSnapshotCount = maxPlanningSnapshotCount; @@ -248,6 +252,10 @@ public boolean includeColumnStats() { return includeColumnStats; } + public Collection includeStatsForColumns() { + return includeStatsForColumns; + } + public boolean exposeLocality() { return exposeLocality; } @@ -285,6 +293,7 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn .filters(filters) .limit(limit) .includeColumnStats(includeColumnStats) + .includeColumnStats(includeStatsForColumns) .exposeLocality(exposeLocality) .planParallelism(planParallelism) .maxPlanningSnapshotCount(maxPlanningSnapshotCount) @@ -313,6 +322,7 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) { .filters(filters) .limit(limit) .includeColumnStats(includeColumnStats) + .includeColumnStats(includeStatsForColumns) .exposeLocality(exposeLocality) .planParallelism(planParallelism) .maxPlanningSnapshotCount(maxPlanningSnapshotCount) @@ -349,6 +359,7 @@ public static class Builder { private long limit = FlinkReadOptions.LIMIT_OPTION.defaultValue(); private boolean includeColumnStats = FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION.defaultValue(); + private Collection includeStatsForColumns = null; private boolean exposeLocality; private Integer planParallelism = FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue(); @@ -464,6 +475,11 @@ public Builder includeColumnStats(boolean newIncludeColumnStats) { return this; } + public Builder includeColumnStats(Collection newIncludeStatsForColumns) { + this.includeStatsForColumns = newIncludeStatsForColumns; + return this; + } + public Builder exposeLocality(boolean newExposeLocality) { this.exposeLocality = newExposeLocality; return this; @@ -531,6 +547,7 @@ public ScanContext build() { filters, limit, includeColumnStats, + includeStatsForColumns, exposeLocality, planParallelism, maxPlanningSnapshotCount, diff --git a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java index 85174b4ab273..bb747caae589 100644 --- a/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java +++ b/flink/v1.15/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java @@ -84,7 +84,7 @@ private void appendTwoSnapshots() throws IOException { } /** @return the last enumerated snapshot id */ - private IcebergEnumeratorPosition verifyOneCycle( + private CycleResult verifyOneCycle( ContinuousSplitPlannerImpl splitPlanner, IcebergEnumeratorPosition lastPosition) throws Exception { List batch = @@ -106,7 +106,7 @@ private IcebergEnumeratorPosition verifyOneCycle( Assert.assertEquals( dataFile.path().toString(), Iterables.getOnlyElement(split.task().files()).file().path().toString()); - return result.toPosition(); + return new CycleResult(result.toPosition(), split); } @Test @@ -135,7 +135,7 @@ public void testTableScanThenIncrementalWithEmptyTable() throws Exception { // next 3 snapshots IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @@ -169,7 +169,7 @@ public void testTableScanThenIncrementalWithNonEmptyTable() throws Exception { IcebergEnumeratorPosition lastPosition = initialResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @@ -206,7 +206,7 @@ public void testIncrementalFromLatestSnapshotWithEmptyTable() throws Exception { // next 3 snapshots IcebergEnumeratorPosition lastPosition = afterTwoSnapshotsAppended.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @@ -251,7 +251,7 @@ public void testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exceptio IcebergEnumeratorPosition lastPosition = secondResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @@ -281,7 +281,7 @@ public void testIncrementalFromEarliestSnapshotWithEmptyTable() throws Exception // next 3 snapshots IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @@ -323,12 +323,12 @@ public void testIncrementalFromEarliestSnapshotWithNonEmptyTable() throws Except IcebergEnumeratorPosition lastPosition = secondResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @Test - public void testIncrementalFromSnapshotIdWithEmptyTable() throws Exception { + public void testIncrementalFromSnapshotIdWithEmptyTable() { ScanContext scanContextWithInvalidSnapshotId = ScanContext.builder() .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) @@ -409,12 +409,12 @@ public void testIncrementalFromSnapshotId() throws Exception { IcebergEnumeratorPosition lastPosition = secondResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @Test - public void testIncrementalFromSnapshotTimestampWithEmptyTable() throws Exception { + public void testIncrementalFromSnapshotTimestampWithEmptyTable() { ScanContext scanContextWithInvalidSnapshotId = ScanContext.builder() .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) @@ -489,7 +489,7 @@ public void testIncrementalFromSnapshotTimestamp() throws Exception { IcebergEnumeratorPosition lastPosition = secondResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @@ -529,6 +529,115 @@ public void testMaxPlanningSnapshotCount() throws Exception { thirdResult, snapshot1, snapshot2, ImmutableSet.of(dataFile2.path().toString())); } + @Test + public void testTableScanNoStats() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = + ScanContext.builder() + .includeColumnStats(false) + .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(); + ContinuousSplitPlannerImpl splitPlanner = + new ContinuousSplitPlannerImpl(tableResource.tableLoader().clone(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + Assert.assertEquals(1, initialResult.splits().size()); + IcebergSourceSplit split = Iterables.getOnlyElement(initialResult.splits()); + Assert.assertEquals(2, split.task().files().size()); + verifyStatCount(split, 0); + + IcebergEnumeratorPosition lastPosition = initialResult.toPosition(); + for (int i = 0; i < 3; ++i) { + CycleResult result = verifyOneCycle(splitPlanner, lastPosition); + verifyStatCount(result.split, 0); + lastPosition = result.lastPosition; + } + } + + @Test + public void testTableScanAllStats() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = + ScanContext.builder() + .includeColumnStats(true) + .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(); + ContinuousSplitPlannerImpl splitPlanner = + new ContinuousSplitPlannerImpl(tableResource.tableLoader().clone(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + Assert.assertEquals(1, initialResult.splits().size()); + IcebergSourceSplit split = Iterables.getOnlyElement(initialResult.splits()); + Assert.assertEquals(2, split.task().files().size()); + verifyStatCount(split, 3); + + IcebergEnumeratorPosition lastPosition = initialResult.toPosition(); + for (int i = 0; i < 3; ++i) { + CycleResult result = verifyOneCycle(splitPlanner, lastPosition); + verifyStatCount(result.split, 3); + lastPosition = result.lastPosition; + } + } + + @Test + public void testTableScanSingleStat() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = + ScanContext.builder() + .includeColumnStats(ImmutableSet.of("data")) + .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(); + ContinuousSplitPlannerImpl splitPlanner = + new ContinuousSplitPlannerImpl(tableResource.tableLoader().clone(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + Assert.assertEquals(1, initialResult.splits().size()); + IcebergSourceSplit split = Iterables.getOnlyElement(initialResult.splits()); + Assert.assertEquals(2, split.task().files().size()); + verifyStatCount(split, 1); + + IcebergEnumeratorPosition lastPosition = initialResult.toPosition(); + for (int i = 0; i < 3; ++i) { + CycleResult result = verifyOneCycle(splitPlanner, lastPosition); + verifyStatCount(result.split, 1); + lastPosition = result.lastPosition; + } + } + + private void verifyStatCount(IcebergSourceSplit split, int expected) { + if (expected == 0) { + split + .task() + .files() + .forEach( + f -> { + Assert.assertNull(f.file().valueCounts()); + Assert.assertNull(f.file().columnSizes()); + Assert.assertNull(f.file().lowerBounds()); + Assert.assertNull(f.file().upperBounds()); + Assert.assertNull(f.file().nanValueCounts()); + Assert.assertNull(f.file().nullValueCounts()); + }); + } else { + split + .task() + .files() + .forEach( + f -> { + Assert.assertEquals(expected, f.file().valueCounts().size()); + Assert.assertEquals(expected, f.file().columnSizes().size()); + Assert.assertEquals(expected, f.file().lowerBounds().size()); + Assert.assertEquals(expected, f.file().upperBounds().size()); + Assert.assertEquals(expected, f.file().nullValueCounts().size()); + // The nanValue is not counted for long and string fields + Assert.assertEquals(0, f.file().nanValueCounts().size()); + }); + } + } + private void verifyMaxPlanningSnapshotCountResult( ContinuousEnumerationResult result, Snapshot fromSnapshotExclusive, @@ -566,4 +675,14 @@ private Snapshot appendSnapshot(long seed, int numRecords) throws Exception { dataAppender.appendToTable(dataFile); return tableResource.table().currentSnapshot(); } + + private static class CycleResult { + IcebergEnumeratorPosition lastPosition; + IcebergSourceSplit split; + + CycleResult(IcebergEnumeratorPosition lastPosition, IcebergSourceSplit split) { + this.lastPosition = lastPosition; + this.split = split; + } + } } diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java index ea317e93d8ba..15078809714f 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/FlinkSplitPlanner.java @@ -165,6 +165,10 @@ private static > T refineScanW refinedScan = refinedScan.includeColumnStats(); } + if (context.includeStatsForColumns() != null) { + refinedScan = refinedScan.includeColumnStats(context.includeStatsForColumns()); + } + refinedScan = refinedScan.option(TableProperties.SPLIT_SIZE, context.splitSize().toString()); refinedScan = diff --git a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java index e380204e871f..4357b1f57df6 100644 --- a/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java +++ b/flink/v1.16/flink/src/main/java/org/apache/iceberg/flink/source/ScanContext.java @@ -20,6 +20,7 @@ import java.io.Serializable; import java.time.Duration; +import java.util.Collection; import java.util.List; import java.util.Map; import org.apache.flink.annotation.Internal; @@ -62,6 +63,7 @@ public class ScanContext implements Serializable { private final List filters; private final long limit; private final boolean includeColumnStats; + private final Collection includeStatsForColumns; private final Integer planParallelism; private final int maxPlanningSnapshotCount; private final int maxAllowedPlanningFailures; @@ -84,6 +86,7 @@ private ScanContext( List filters, long limit, boolean includeColumnStats, + Collection includeStatsForColumns, boolean exposeLocality, Integer planParallelism, int maxPlanningSnapshotCount, @@ -114,6 +117,7 @@ private ScanContext( this.filters = filters; this.limit = limit; this.includeColumnStats = includeColumnStats; + this.includeStatsForColumns = includeStatsForColumns; this.exposeLocality = exposeLocality; this.planParallelism = planParallelism; this.maxPlanningSnapshotCount = maxPlanningSnapshotCount; @@ -248,6 +252,10 @@ public boolean includeColumnStats() { return includeColumnStats; } + public Collection includeStatsForColumns() { + return includeStatsForColumns; + } + public boolean exposeLocality() { return exposeLocality; } @@ -285,6 +293,7 @@ public ScanContext copyWithAppendsBetween(Long newStartSnapshotId, long newEndSn .filters(filters) .limit(limit) .includeColumnStats(includeColumnStats) + .includeColumnStats(includeStatsForColumns) .exposeLocality(exposeLocality) .planParallelism(planParallelism) .maxPlanningSnapshotCount(maxPlanningSnapshotCount) @@ -313,6 +322,7 @@ public ScanContext copyWithSnapshotId(long newSnapshotId) { .filters(filters) .limit(limit) .includeColumnStats(includeColumnStats) + .includeColumnStats(includeStatsForColumns) .exposeLocality(exposeLocality) .planParallelism(planParallelism) .maxPlanningSnapshotCount(maxPlanningSnapshotCount) @@ -349,6 +359,7 @@ public static class Builder { private long limit = FlinkReadOptions.LIMIT_OPTION.defaultValue(); private boolean includeColumnStats = FlinkReadOptions.INCLUDE_COLUMN_STATS_OPTION.defaultValue(); + private Collection includeStatsForColumns = null; private boolean exposeLocality; private Integer planParallelism = FlinkConfigOptions.TABLE_EXEC_ICEBERG_WORKER_POOL_SIZE.defaultValue(); @@ -464,6 +475,11 @@ public Builder includeColumnStats(boolean newIncludeColumnStats) { return this; } + public Builder includeColumnStats(Collection newIncludeStatsForColumns) { + this.includeStatsForColumns = newIncludeStatsForColumns; + return this; + } + public Builder exposeLocality(boolean newExposeLocality) { this.exposeLocality = newExposeLocality; return this; @@ -531,6 +547,7 @@ public ScanContext build() { filters, limit, includeColumnStats, + includeStatsForColumns, exposeLocality, planParallelism, maxPlanningSnapshotCount, diff --git a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java index 85174b4ab273..bb747caae589 100644 --- a/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java +++ b/flink/v1.16/flink/src/test/java/org/apache/iceberg/flink/source/enumerator/TestContinuousSplitPlannerImpl.java @@ -84,7 +84,7 @@ private void appendTwoSnapshots() throws IOException { } /** @return the last enumerated snapshot id */ - private IcebergEnumeratorPosition verifyOneCycle( + private CycleResult verifyOneCycle( ContinuousSplitPlannerImpl splitPlanner, IcebergEnumeratorPosition lastPosition) throws Exception { List batch = @@ -106,7 +106,7 @@ private IcebergEnumeratorPosition verifyOneCycle( Assert.assertEquals( dataFile.path().toString(), Iterables.getOnlyElement(split.task().files()).file().path().toString()); - return result.toPosition(); + return new CycleResult(result.toPosition(), split); } @Test @@ -135,7 +135,7 @@ public void testTableScanThenIncrementalWithEmptyTable() throws Exception { // next 3 snapshots IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @@ -169,7 +169,7 @@ public void testTableScanThenIncrementalWithNonEmptyTable() throws Exception { IcebergEnumeratorPosition lastPosition = initialResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @@ -206,7 +206,7 @@ public void testIncrementalFromLatestSnapshotWithEmptyTable() throws Exception { // next 3 snapshots IcebergEnumeratorPosition lastPosition = afterTwoSnapshotsAppended.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @@ -251,7 +251,7 @@ public void testIncrementalFromLatestSnapshotWithNonEmptyTable() throws Exceptio IcebergEnumeratorPosition lastPosition = secondResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @@ -281,7 +281,7 @@ public void testIncrementalFromEarliestSnapshotWithEmptyTable() throws Exception // next 3 snapshots IcebergEnumeratorPosition lastPosition = emptyTableSecondDiscoveryResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @@ -323,12 +323,12 @@ public void testIncrementalFromEarliestSnapshotWithNonEmptyTable() throws Except IcebergEnumeratorPosition lastPosition = secondResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @Test - public void testIncrementalFromSnapshotIdWithEmptyTable() throws Exception { + public void testIncrementalFromSnapshotIdWithEmptyTable() { ScanContext scanContextWithInvalidSnapshotId = ScanContext.builder() .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_ID) @@ -409,12 +409,12 @@ public void testIncrementalFromSnapshotId() throws Exception { IcebergEnumeratorPosition lastPosition = secondResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @Test - public void testIncrementalFromSnapshotTimestampWithEmptyTable() throws Exception { + public void testIncrementalFromSnapshotTimestampWithEmptyTable() { ScanContext scanContextWithInvalidSnapshotId = ScanContext.builder() .startingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP) @@ -489,7 +489,7 @@ public void testIncrementalFromSnapshotTimestamp() throws Exception { IcebergEnumeratorPosition lastPosition = secondResult.toPosition(); for (int i = 0; i < 3; ++i) { - lastPosition = verifyOneCycle(splitPlanner, lastPosition); + lastPosition = verifyOneCycle(splitPlanner, lastPosition).lastPosition; } } @@ -529,6 +529,115 @@ public void testMaxPlanningSnapshotCount() throws Exception { thirdResult, snapshot1, snapshot2, ImmutableSet.of(dataFile2.path().toString())); } + @Test + public void testTableScanNoStats() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = + ScanContext.builder() + .includeColumnStats(false) + .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(); + ContinuousSplitPlannerImpl splitPlanner = + new ContinuousSplitPlannerImpl(tableResource.tableLoader().clone(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + Assert.assertEquals(1, initialResult.splits().size()); + IcebergSourceSplit split = Iterables.getOnlyElement(initialResult.splits()); + Assert.assertEquals(2, split.task().files().size()); + verifyStatCount(split, 0); + + IcebergEnumeratorPosition lastPosition = initialResult.toPosition(); + for (int i = 0; i < 3; ++i) { + CycleResult result = verifyOneCycle(splitPlanner, lastPosition); + verifyStatCount(result.split, 0); + lastPosition = result.lastPosition; + } + } + + @Test + public void testTableScanAllStats() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = + ScanContext.builder() + .includeColumnStats(true) + .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(); + ContinuousSplitPlannerImpl splitPlanner = + new ContinuousSplitPlannerImpl(tableResource.tableLoader().clone(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + Assert.assertEquals(1, initialResult.splits().size()); + IcebergSourceSplit split = Iterables.getOnlyElement(initialResult.splits()); + Assert.assertEquals(2, split.task().files().size()); + verifyStatCount(split, 3); + + IcebergEnumeratorPosition lastPosition = initialResult.toPosition(); + for (int i = 0; i < 3; ++i) { + CycleResult result = verifyOneCycle(splitPlanner, lastPosition); + verifyStatCount(result.split, 3); + lastPosition = result.lastPosition; + } + } + + @Test + public void testTableScanSingleStat() throws Exception { + appendTwoSnapshots(); + + ScanContext scanContext = + ScanContext.builder() + .includeColumnStats(ImmutableSet.of("data")) + .startingStrategy(StreamingStartingStrategy.TABLE_SCAN_THEN_INCREMENTAL) + .build(); + ContinuousSplitPlannerImpl splitPlanner = + new ContinuousSplitPlannerImpl(tableResource.tableLoader().clone(), scanContext, null); + + ContinuousEnumerationResult initialResult = splitPlanner.planSplits(null); + Assert.assertEquals(1, initialResult.splits().size()); + IcebergSourceSplit split = Iterables.getOnlyElement(initialResult.splits()); + Assert.assertEquals(2, split.task().files().size()); + verifyStatCount(split, 1); + + IcebergEnumeratorPosition lastPosition = initialResult.toPosition(); + for (int i = 0; i < 3; ++i) { + CycleResult result = verifyOneCycle(splitPlanner, lastPosition); + verifyStatCount(result.split, 1); + lastPosition = result.lastPosition; + } + } + + private void verifyStatCount(IcebergSourceSplit split, int expected) { + if (expected == 0) { + split + .task() + .files() + .forEach( + f -> { + Assert.assertNull(f.file().valueCounts()); + Assert.assertNull(f.file().columnSizes()); + Assert.assertNull(f.file().lowerBounds()); + Assert.assertNull(f.file().upperBounds()); + Assert.assertNull(f.file().nanValueCounts()); + Assert.assertNull(f.file().nullValueCounts()); + }); + } else { + split + .task() + .files() + .forEach( + f -> { + Assert.assertEquals(expected, f.file().valueCounts().size()); + Assert.assertEquals(expected, f.file().columnSizes().size()); + Assert.assertEquals(expected, f.file().lowerBounds().size()); + Assert.assertEquals(expected, f.file().upperBounds().size()); + Assert.assertEquals(expected, f.file().nullValueCounts().size()); + // The nanValue is not counted for long and string fields + Assert.assertEquals(0, f.file().nanValueCounts().size()); + }); + } + } + private void verifyMaxPlanningSnapshotCountResult( ContinuousEnumerationResult result, Snapshot fromSnapshotExclusive, @@ -566,4 +675,14 @@ private Snapshot appendSnapshot(long seed, int numRecords) throws Exception { dataAppender.appendToTable(dataFile); return tableResource.table().currentSnapshot(); } + + private static class CycleResult { + IcebergEnumeratorPosition lastPosition; + IcebergSourceSplit split; + + CycleResult(IcebergEnumeratorPosition lastPosition, IcebergSourceSplit split) { + this.lastPosition = lastPosition; + this.split = split; + } + } }