diff --git a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
index f00596fa46e8..51b3ce9c9ded 100644
--- a/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
+++ b/api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java
@@ -92,16 +92,16 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, RewriteDataFiles.Result> {
    * Defaults to none.
    */
-  String REWRITE_JOB_ORDER = "rewrite.job-order";
+  String REWRITE_JOB_ORDER = "rewrite-job-order";
 
   String REWRITE_JOB_ORDER_DEFAULT = RewriteJobOrder.NONE.orderName();
 
   /**
diff --git a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 943409549f17..4f0c27fdcc75 100644
--- a/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.0/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,6 +35,7 @@
 import java.util.stream.Stream;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -79,7 +81,8 @@ abstract class BaseRewriteDataFilesSparkAction
       MAX_FILE_GROUP_SIZE_BYTES,
       PARTIAL_PROGRESS_ENABLED,
       PARTIAL_PROGRESS_MAX_COMMITS,
-      TARGET_FILE_SIZE_BYTES
+      TARGET_FILE_SIZE_BYTES,
+      REWRITE_JOB_ORDER
   );
 
   private final Table table;
@@ -88,6 +91,7 @@ abstract class BaseRewriteDataFilesSparkAction
   private int maxConcurrentFileGroupRewrites;
   private int maxCommits;
   private boolean partialProgressEnabled;
+  private RewriteJobOrder rewriteJobOrder;
   private RewriteStrategy strategy = null;
 
   protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
@@ -153,7 +157,6 @@ public RewriteDataFiles.Result execute() {
     }
 
     validateAndInitOptions();
-    strategy = strategy.options(options());
 
     Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition = planFileGroups(startingSnapshotId);
     RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
@@ -173,7 +176,7 @@ public RewriteDataFiles.Result execute() {
     }
   }
 
-  private Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
+  Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
     CloseableIterable<FileScanTask> fileScanTasks = table.newScan()
         .useSnapshot(startingSnapshotId)
         .filter(filter)
@@ -335,11 +338,10 @@ private Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream) {
     return new BaseRewriteDataFilesResult(rewriteResults);
   }
 
-  private Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
-      Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
+  Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
+      Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
 
-    // Todo Add intelligence to the order in which we do rewrites instead of just using partition order
-    return fileGroupsByPartition.entrySet().stream()
+    Stream<RewriteFileGroup> rewriteFileGroupStream = fileGroupsByPartition.entrySet().stream()
         .flatMap(e -> {
           StructLike partition = e.getKey();
           List<List<FileScanTask>> fileGroups = e.getValue();
@@ -350,9 +352,26 @@ private Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
             return new RewriteFileGroup(info, tasks);
           });
         });
+
+    return rewriteFileGroupStream.sorted(rewriteGroupComparator());
+  }
+
+  private Comparator<RewriteFileGroup> rewriteGroupComparator() {
+    switch (rewriteJobOrder) {
+      case BYTES_ASC:
+        return Comparator.comparing(RewriteFileGroup::sizeInBytes);
+      case BYTES_DESC:
+        return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
+      case FILES_ASC:
+        return Comparator.comparing(RewriteFileGroup::numFiles);
+      case FILES_DESC:
+        return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
+      default:
+        return (fileGroupOne, fileGroupTwo) -> 0;
+    }
   }
 
-  private void validateAndInitOptions() {
+  void validateAndInitOptions() {
     Set<String> validOptions = Sets.newHashSet(strategy.validOptions());
     validOptions.addAll(VALID_OPTIONS);
 
@@ -363,6 +382,8 @@ private void validateAndInitOptions() {
         "Cannot use options %s, they are not supported by the action or the strategy %s",
         invalidKeys, strategy.name());
 
+    strategy = strategy.options(options());
+
     maxConcurrentFileGroupRewrites = PropertyUtil.propertyAsInt(options(),
         MAX_CONCURRENT_FILE_GROUP_REWRITES,
         MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT);
@@ -375,6 +396,10 @@ private void validateAndInitOptions() {
         PARTIAL_PROGRESS_ENABLED,
         PARTIAL_PROGRESS_ENABLED_DEFAULT);
 
+    rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
+        REWRITE_JOB_ORDER,
+        REWRITE_JOB_ORDER_DEFAULT));
+
     Preconditions.checkArgument(maxConcurrentFileGroupRewrites >= 1,
         "Cannot set %s to %s, the value must be positive.",
         MAX_CONCURRENT_FILE_GROUP_REWRITES, maxConcurrentFileGroupRewrites);
diff --git a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index a907a80719cc..2e62fb0b2503 100644
--- a/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -40,6 +40,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
@@ -74,6 +75,7 @@
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.FileScanTaskSetManager;
 import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Conversions;
@@ -716,6 +718,12 @@ public void testInvalidOptions() {
         () -> basicRewrite(table)
             .option("foobarity", "-5")
             .execute());
+
+    AssertHelpers.assertThrows("Cannot set rewrite-job-order to foo",
+        IllegalArgumentException.class,
+        () -> basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo")
+            .execute());
   }
 
   @Test
@@ -947,6 +955,144 @@ public void testInvalidAPIUsage() {
         "Cannot set strategy",
         () -> actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack());
   }
 
+  @Test
+  public void testRewriteJobOrderBytesAsc() {
+    Table table = createTablePartitioned(4, 2);
+    writeRecords(1, SCALE, 1);
+    writeRecords(2, SCALE, 2);
+    writeRecords(3, SCALE, 3);
+    writeRecords(4, SCALE, 4);
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    BaseRewriteDataFilesSparkAction basicRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .binPack();
+    List<Long> expected = toGroupStream(table, basicRewrite)
+        .mapToLong(RewriteFileGroup::sizeInBytes)
+        .boxed()
+        .collect(Collectors.toList());
+
+    BaseRewriteDataFilesSparkAction jobOrderRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_ASC.orderName())
+            .binPack();
+    List<Long> actual = toGroupStream(table, jobOrderRewrite)
+        .mapToLong(RewriteFileGroup::sizeInBytes)
+        .boxed()
+        .collect(Collectors.toList());
+
+    expected.sort(Comparator.naturalOrder());
+    Assert.assertEquals("Size in bytes order should be ascending", actual, expected);
+    Collections.reverse(expected);
+    Assert.assertNotEquals("Size in bytes order should not be descending", actual, expected);
+  }
+
+  @Test
+  public void testRewriteJobOrderBytesDesc() {
+    Table table = createTablePartitioned(4, 2);
+    writeRecords(1, SCALE, 1);
+    writeRecords(2, SCALE, 2);
+    writeRecords(3, SCALE, 3);
+    writeRecords(4, SCALE, 4);
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    BaseRewriteDataFilesSparkAction basicRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .binPack();
+    List<Long> expected = toGroupStream(table, basicRewrite)
+        .mapToLong(RewriteFileGroup::sizeInBytes)
+        .boxed()
+        .collect(Collectors.toList());
+
+    BaseRewriteDataFilesSparkAction jobOrderRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_DESC.orderName())
+            .binPack();
+    List<Long> actual = toGroupStream(table, jobOrderRewrite)
+        .mapToLong(RewriteFileGroup::sizeInBytes)
+        .boxed()
+        .collect(Collectors.toList());
+
+    expected.sort(Comparator.reverseOrder());
+    Assert.assertEquals("Size in bytes order should be descending", actual, expected);
+    Collections.reverse(expected);
+    Assert.assertNotEquals("Size in bytes order should not be ascending", actual, expected);
+  }
+
+  @Test
+  public void testRewriteJobOrderFilesAsc() {
+    Table table = createTablePartitioned(4, 2);
+    writeRecords(1, SCALE, 1);
+    writeRecords(2, SCALE, 2);
+    writeRecords(3, SCALE, 3);
+    writeRecords(4, SCALE, 4);
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    BaseRewriteDataFilesSparkAction basicRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .binPack();
+    List<Long> expected = toGroupStream(table, basicRewrite)
+        .mapToLong(RewriteFileGroup::numFiles)
+        .boxed()
+        .collect(Collectors.toList());
+
+    BaseRewriteDataFilesSparkAction jobOrderRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.FILES_ASC.orderName())
+            .binPack();
+    List<Long> actual = toGroupStream(table, jobOrderRewrite)
+        .mapToLong(RewriteFileGroup::numFiles)
+        .boxed()
+        .collect(Collectors.toList());
+
+    expected.sort(Comparator.naturalOrder());
+    Assert.assertEquals("Number of files order should be ascending", actual, expected);
+    Collections.reverse(expected);
+    Assert.assertNotEquals("Number of files order should not be descending", actual, expected);
+  }
+
+  @Test
+  public void testRewriteJobOrderFilesDesc() {
+    Table table = createTablePartitioned(4, 2);
+    writeRecords(1, SCALE, 1);
+    writeRecords(2, SCALE, 2);
+    writeRecords(3, SCALE, 3);
+    writeRecords(4, SCALE, 4);
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    BaseRewriteDataFilesSparkAction basicRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .binPack();
+    List<Long> expected = toGroupStream(table, basicRewrite)
+        .mapToLong(RewriteFileGroup::numFiles)
+        .boxed()
+        .collect(Collectors.toList());
+
+    BaseRewriteDataFilesSparkAction jobOrderRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.FILES_DESC.orderName())
+            .binPack();
+    List<Long> actual = toGroupStream(table, jobOrderRewrite)
+        .mapToLong(RewriteFileGroup::numFiles)
+        .boxed()
+        .collect(Collectors.toList());
+
+    expected.sort(Comparator.reverseOrder());
+    Assert.assertEquals("Number of files order should be descending", actual, expected);
+    Collections.reverse(expected);
+    Assert.assertNotEquals("Number of files order should not be ascending", actual, expected);
+  }
+
+  private Stream<RewriteFileGroup> toGroupStream(Table table,
+      BaseRewriteDataFilesSparkAction rewrite) {
+    rewrite.validateAndInitOptions();
+    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+        rewrite.planFileGroups(table.currentSnapshot().snapshotId());
+
+    return rewrite.toGroupStream(
+        new RewriteExecutionContext(fileGroupsByPartition), fileGroupsByPartition);
+  }
+
   protected List<Object[]> currentData() {
     return rowsToJava(spark.read().format("iceberg").load(tableLocation)
         .sort("c1", "c2", "c3")
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
index 943409549f17..1cbfa4cecf27 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/actions/BaseRewriteDataFilesSparkAction.java
@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.math.RoundingMode;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,6 +35,7 @@
 import java.util.stream.Stream;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.SortOrder;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
@@ -79,7 +81,8 @@ abstract class BaseRewriteDataFilesSparkAction
       MAX_FILE_GROUP_SIZE_BYTES,
       PARTIAL_PROGRESS_ENABLED,
       PARTIAL_PROGRESS_MAX_COMMITS,
-      TARGET_FILE_SIZE_BYTES
+      TARGET_FILE_SIZE_BYTES,
+      REWRITE_JOB_ORDER
   );
 
   private final Table table;
@@ -88,6 +91,7 @@ abstract class BaseRewriteDataFilesSparkAction
   private int maxConcurrentFileGroupRewrites;
   private int maxCommits;
   private boolean partialProgressEnabled;
+  private RewriteJobOrder rewriteJobOrder;
   private RewriteStrategy strategy = null;
 
   protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
@@ -153,7 +157,6 @@ public RewriteDataFiles.Result execute() {
     }
 
     validateAndInitOptions();
-    strategy = strategy.options(options());
 
     Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition = planFileGroups(startingSnapshotId);
     RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
@@ -173,7 +176,7 @@ public RewriteDataFiles.Result execute() {
     }
   }
 
-  private Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
+  Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
     CloseableIterable<FileScanTask> fileScanTasks = table.newScan()
         .useSnapshot(startingSnapshotId)
         .filter(filter)
@@ -335,11 +338,10 @@ private Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream<RewriteFileGroup> groupStream) {
     return new BaseRewriteDataFilesResult(rewriteResults);
   }
 
-  private Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
-      Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
+  Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
+      Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
 
-    // Todo Add intelligence to the order in which we do rewrites instead of just using partition order
-    return fileGroupsByPartition.entrySet().stream()
+    Stream<RewriteFileGroup> rewriteFileGroupStream = fileGroupsByPartition.entrySet().stream()
         .flatMap(e -> {
           StructLike partition = e.getKey();
           List<List<FileScanTask>> fileGroups = e.getValue();
@@ -350,9 +352,25 @@ private Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
             return new RewriteFileGroup(info, tasks);
           });
         });
+    return rewriteFileGroupStream.sorted(rewriteGroupComparator());
+  }
+
+  private Comparator<RewriteFileGroup> rewriteGroupComparator() {
+    switch (rewriteJobOrder) {
+      case BYTES_ASC:
+        return Comparator.comparing(RewriteFileGroup::sizeInBytes);
+      case BYTES_DESC:
+        return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
+      case FILES_ASC:
+        return Comparator.comparing(RewriteFileGroup::numFiles);
+      case FILES_DESC:
+        return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
+      default:
+        return (fileGroupOne, fileGroupTwo) -> 0;
+    }
   }
 
-  private void validateAndInitOptions() {
+  void validateAndInitOptions() {
     Set<String> validOptions = Sets.newHashSet(strategy.validOptions());
     validOptions.addAll(VALID_OPTIONS);
 
@@ -363,6 +381,8 @@ private void validateAndInitOptions() {
         "Cannot use options %s, they are not supported by the action or the strategy %s",
         invalidKeys, strategy.name());
 
+    strategy = strategy.options(options());
+
     maxConcurrentFileGroupRewrites = PropertyUtil.propertyAsInt(options(),
         MAX_CONCURRENT_FILE_GROUP_REWRITES,
         MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT);
@@ -375,6 +395,10 @@ private void validateAndInitOptions() {
         PARTIAL_PROGRESS_ENABLED,
         PARTIAL_PROGRESS_ENABLED_DEFAULT);
 
+    rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
+        REWRITE_JOB_ORDER,
+        REWRITE_JOB_ORDER_DEFAULT));
+
     Preconditions.checkArgument(maxConcurrentFileGroupRewrites >= 1,
         "Cannot set %s to %s, the value must be positive.",
         MAX_CONCURRENT_FILE_GROUP_REWRITES, maxConcurrentFileGroupRewrites);
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index a907a80719cc..2e62fb0b2503 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -40,6 +40,7 @@
 import org.apache.iceberg.FileFormat;
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteJobOrder;
 import org.apache.iceberg.RowDelta;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
@@ -74,6 +75,7 @@
 import org.apache.iceberg.spark.FileRewriteCoordinator;
 import org.apache.iceberg.spark.FileScanTaskSetManager;
 import org.apache.iceberg.spark.SparkTestBase;
+import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
 import org.apache.iceberg.spark.source.ThreeColumnRecord;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Conversions;
@@ -716,6 +718,12 @@ public void testInvalidOptions() {
         () -> basicRewrite(table)
             .option("foobarity", "-5")
             .execute());
+
+    AssertHelpers.assertThrows("Cannot set rewrite-job-order to foo",
+        IllegalArgumentException.class,
+        () -> basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo")
+            .execute());
   }
 
   @Test
@@ -947,6 +955,144 @@ public void testInvalidAPIUsage() {
         "Cannot set strategy",
         () -> actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack());
   }
 
+  @Test
+  public void testRewriteJobOrderBytesAsc() {
+    Table table = createTablePartitioned(4, 2);
+    writeRecords(1, SCALE, 1);
+    writeRecords(2, SCALE, 2);
+    writeRecords(3, SCALE, 3);
+    writeRecords(4, SCALE, 4);
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    BaseRewriteDataFilesSparkAction basicRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .binPack();
+    List<Long> expected = toGroupStream(table, basicRewrite)
+        .mapToLong(RewriteFileGroup::sizeInBytes)
+        .boxed()
+        .collect(Collectors.toList());
+
+    BaseRewriteDataFilesSparkAction jobOrderRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_ASC.orderName())
+            .binPack();
+    List<Long> actual = toGroupStream(table, jobOrderRewrite)
+        .mapToLong(RewriteFileGroup::sizeInBytes)
+        .boxed()
+        .collect(Collectors.toList());
+
+    expected.sort(Comparator.naturalOrder());
+    Assert.assertEquals("Size in bytes order should be ascending", actual, expected);
+    Collections.reverse(expected);
+    Assert.assertNotEquals("Size in bytes order should not be descending", actual, expected);
+  }
+
+  @Test
+  public void testRewriteJobOrderBytesDesc() {
+    Table table = createTablePartitioned(4, 2);
+    writeRecords(1, SCALE, 1);
+    writeRecords(2, SCALE, 2);
+    writeRecords(3, SCALE, 3);
+    writeRecords(4, SCALE, 4);
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    BaseRewriteDataFilesSparkAction basicRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .binPack();
+    List<Long> expected = toGroupStream(table, basicRewrite)
+        .mapToLong(RewriteFileGroup::sizeInBytes)
+        .boxed()
+        .collect(Collectors.toList());
+
+    BaseRewriteDataFilesSparkAction jobOrderRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_DESC.orderName())
+            .binPack();
+    List<Long> actual = toGroupStream(table, jobOrderRewrite)
+        .mapToLong(RewriteFileGroup::sizeInBytes)
+        .boxed()
+        .collect(Collectors.toList());
+
+    expected.sort(Comparator.reverseOrder());
+    Assert.assertEquals("Size in bytes order should be descending", actual, expected);
+    Collections.reverse(expected);
+    Assert.assertNotEquals("Size in bytes order should not be ascending", actual, expected);
+  }
+
+  @Test
+  public void testRewriteJobOrderFilesAsc() {
+    Table table = createTablePartitioned(4, 2);
+    writeRecords(1, SCALE, 1);
+    writeRecords(2, SCALE, 2);
+    writeRecords(3, SCALE, 3);
+    writeRecords(4, SCALE, 4);
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    BaseRewriteDataFilesSparkAction basicRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .binPack();
+    List<Long> expected = toGroupStream(table, basicRewrite)
+        .mapToLong(RewriteFileGroup::numFiles)
+        .boxed()
+        .collect(Collectors.toList());
+
+    BaseRewriteDataFilesSparkAction jobOrderRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.FILES_ASC.orderName())
+            .binPack();
+    List<Long> actual = toGroupStream(table, jobOrderRewrite)
+        .mapToLong(RewriteFileGroup::numFiles)
+        .boxed()
+        .collect(Collectors.toList());
+
+    expected.sort(Comparator.naturalOrder());
+    Assert.assertEquals("Number of files order should be ascending", actual, expected);
+    Collections.reverse(expected);
+    Assert.assertNotEquals("Number of files order should not be descending", actual, expected);
+  }
+
+  @Test
+  public void testRewriteJobOrderFilesDesc() {
+    Table table = createTablePartitioned(4, 2);
+    writeRecords(1, SCALE, 1);
+    writeRecords(2, SCALE, 2);
+    writeRecords(3, SCALE, 3);
+    writeRecords(4, SCALE, 4);
+    table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();
+
+    BaseRewriteDataFilesSparkAction basicRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .binPack();
+    List<Long> expected = toGroupStream(table, basicRewrite)
+        .mapToLong(RewriteFileGroup::numFiles)
+        .boxed()
+        .collect(Collectors.toList());
+
+    BaseRewriteDataFilesSparkAction jobOrderRewrite =
+        (BaseRewriteDataFilesSparkAction) basicRewrite(table)
+            .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.FILES_DESC.orderName())
+            .binPack();
+    List<Long> actual = toGroupStream(table, jobOrderRewrite)
+        .mapToLong(RewriteFileGroup::numFiles)
+        .boxed()
+        .collect(Collectors.toList());
+
+    expected.sort(Comparator.reverseOrder());
+    Assert.assertEquals("Number of files order should be descending", actual, expected);
+    Collections.reverse(expected);
+    Assert.assertNotEquals("Number of files order should not be ascending", actual, expected);
+  }
+
+  private Stream<RewriteFileGroup> toGroupStream(Table table,
+      BaseRewriteDataFilesSparkAction rewrite) {
+    rewrite.validateAndInitOptions();
+    Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
+        rewrite.planFileGroups(table.currentSnapshot().snapshotId());
+
+    return rewrite.toGroupStream(
+        new RewriteExecutionContext(fileGroupsByPartition), fileGroupsByPartition);
+  }
+
   protected List<Object[]> currentData() {
     return rowsToJava(spark.read().format("iceberg").load(tableLocation)
         .sort("c1", "c2", "c3")
diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 1d8695053123..dee3bc557d7c 100644
--- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -810,7 +810,7 @@ public void testInvalidOptions() {
             .option("foobarity", "-5")
             .execute());
 
-    AssertHelpers.assertThrows("Cannot set rewrite.job-order to foo",
+    AssertHelpers.assertThrows("Cannot set rewrite-job-order to foo",
        IllegalArgumentException.class,
        () -> basicRewrite(table)
            .option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo")
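
Illustrative usage of the new option (not part of the patch; a minimal sketch assuming the standard SparkActions entry point, an active SparkSession named spark, and a loaded Iceberg Table named table):

    // Rewrite the largest file groups first. When the option is unset, the default is
    // RewriteJobOrder.NONE, so groups keep the order produced by partition planning.
    RewriteDataFiles.Result result = SparkActions.get(spark)
        .rewriteDataFiles(table)
        .binPack()
        .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_DESC.orderName())
        .execute();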