@@ -92,16 +92,16 @@ public interface RewriteDataFiles extends SnapshotUpdate<RewriteDataFiles, Rewri
/**
* Forces the rewrite job order based on the value.
* <p><ul>
- * <li> If rewrite.job-order=bytes-asc, then rewrite the smallest job groups first.
- * <li> If rewrite.job-order=bytes-desc, then rewrite the largest job groups first.
- * <li> If rewrite.job-order=files-asc, then rewrite the job groups with the least files first.
- * <li> If rewrite.job-order=files-desc, then rewrite the job groups with the most files first.
- * <li> If rewrite.job-order=none, then rewrite job groups in the order they were planned (no
+ * <li> If rewrite-job-order=bytes-asc, then rewrite the smallest job groups first.
+ * <li> If rewrite-job-order=bytes-desc, then rewrite the largest job groups first.
+ * <li> If rewrite-job-order=files-asc, then rewrite the job groups with the least files first.
+ * <li> If rewrite-job-order=files-desc, then rewrite the job groups with the most files first.
+ * <li> If rewrite-job-order=none, then rewrite job groups in the order they were planned (no
* specific ordering).
* </ul><p>
* Defaults to none.
*/
String REWRITE_JOB_ORDER = "rewrite.job-order";
String REWRITE_JOB_ORDER = "rewrite-job-order";
String REWRITE_JOB_ORDER_DEFAULT = RewriteJobOrder.NONE.orderName();

/**
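For orientation, a minimal usage sketch (not part of this change) of how a caller might request the new job order through the action's options. The SparkActions entry point and the already-loaded table are assumed context here, not something this patch introduces.

// Hypothetical caller-side sketch: rewrite the largest planned groups first.
// Assumes an Iceberg Table instance named "table" and the Spark actions facade are available.
RewriteDataFiles.Result result = SparkActions.get()
    .rewriteDataFiles(table)
    .binPack()
    .option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_DESC.orderName())
    .execute();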
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.math.RoundingMode;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -34,6 +35,7 @@
import java.util.stream.Stream;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
@@ -79,7 +81,8 @@ abstract class BaseRewriteDataFilesSparkAction
MAX_FILE_GROUP_SIZE_BYTES,
PARTIAL_PROGRESS_ENABLED,
PARTIAL_PROGRESS_MAX_COMMITS,
- TARGET_FILE_SIZE_BYTES
+ TARGET_FILE_SIZE_BYTES,
+ REWRITE_JOB_ORDER
);

private final Table table;
@@ -88,6 +91,7 @@ abstract class BaseRewriteDataFilesSparkAction
private int maxConcurrentFileGroupRewrites;
private int maxCommits;
private boolean partialProgressEnabled;
private RewriteJobOrder rewriteJobOrder;
private RewriteStrategy strategy = null;

protected BaseRewriteDataFilesSparkAction(SparkSession spark, Table table) {
@@ -153,7 +157,6 @@ public RewriteDataFiles.Result execute() {
}

validateAndInitOptions();
- strategy = strategy.options(options());

Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition = planFileGroups(startingSnapshotId);
RewriteExecutionContext ctx = new RewriteExecutionContext(fileGroupsByPartition);
@@ -173,7 +176,7 @@ public RewriteDataFiles.Result execute() {
}
}

- private Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
+ Map<StructLike, List<List<FileScanTask>>> planFileGroups(long startingSnapshotId) {
CloseableIterable<FileScanTask> fileScanTasks = table.newScan()
.useSnapshot(startingSnapshotId)
.filter(filter)
@@ -335,11 +338,10 @@ private Result doExecuteWithPartialProgress(RewriteExecutionContext ctx, Stream<
return new BaseRewriteDataFilesResult(rewriteResults);
}

- private Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
- Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {
+ Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
+ Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition) {

- // Todo Add intelligence to the order in which we do rewrites instead of just using partition order
- return fileGroupsByPartition.entrySet().stream()
+ Stream<RewriteFileGroup> rewriteFileGroupStream = fileGroupsByPartition.entrySet().stream()
.flatMap(e -> {
StructLike partition = e.getKey();
List<List<FileScanTask>> fileGroups = e.getValue();
@@ -350,9 +352,26 @@ private Stream<RewriteFileGroup> toGroupStream(RewriteExecutionContext ctx,
return new RewriteFileGroup(info, tasks);
});
});

return rewriteFileGroupStream.sorted(rewriteGroupComparator());
}

private Comparator<RewriteFileGroup> rewriteGroupComparator() {
switch (rewriteJobOrder) {
case BYTES_ASC:
return Comparator.comparing(RewriteFileGroup::sizeInBytes);
case BYTES_DESC:
return Comparator.comparing(RewriteFileGroup::sizeInBytes, Comparator.reverseOrder());
case FILES_ASC:
return Comparator.comparing(RewriteFileGroup::numFiles);
case FILES_DESC:
return Comparator.comparing(RewriteFileGroup::numFiles, Comparator.reverseOrder());
default:
return (fileGroupOne, fileGroupTwo) -> 0;
}
}

- private void validateAndInitOptions() {
+ void validateAndInitOptions() {
Set<String> validOptions = Sets.newHashSet(strategy.validOptions());
validOptions.addAll(VALID_OPTIONS);

@@ -363,6 +382,8 @@ private void validateAndInitOptions() {
"Cannot use options %s, they are not supported by the action or the strategy %s",
invalidKeys, strategy.name());

strategy = strategy.options(options());

maxConcurrentFileGroupRewrites = PropertyUtil.propertyAsInt(options(),
MAX_CONCURRENT_FILE_GROUP_REWRITES,
MAX_CONCURRENT_FILE_GROUP_REWRITES_DEFAULT);
@@ -375,6 +396,10 @@ private void validateAndInitOptions() {
PARTIAL_PROGRESS_ENABLED,
PARTIAL_PROGRESS_ENABLED_DEFAULT);

rewriteJobOrder = RewriteJobOrder.fromName(PropertyUtil.propertyAsString(options(),
REWRITE_JOB_ORDER,
REWRITE_JOB_ORDER_DEFAULT));

Preconditions.checkArgument(maxConcurrentFileGroupRewrites >= 1,
"Cannot set %s to %s, the value must be positive.",
MAX_CONCURRENT_FILE_GROUP_REWRITES, maxConcurrentFileGroupRewrites);
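As a side note, not part of the patch: the option string is resolved to the RewriteJobOrder enum via RewriteJobOrder.fromName in validateAndInitOptions, and the matching comparator above sorts the planned groups before they are rewritten. Below is a small self-contained sketch of the same Comparator pattern; the Group class is a purely illustrative stand-in for RewriteFileGroup, not a class from this change.

import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

public class JobOrderSketch {
  // Illustrative stand-in for RewriteFileGroup; only the two sort keys are modeled.
  static class Group {
    final long sizeInBytes;
    final int numFiles;

    Group(long sizeInBytes, int numFiles) {
      this.sizeInBytes = sizeInBytes;
      this.numFiles = numFiles;
    }

    long sizeInBytes() { return sizeInBytes; }
    int numFiles() { return numFiles; }
  }

  public static void main(String[] args) {
    List<Group> groups = Arrays.asList(new Group(300, 3), new Group(100, 9), new Group(200, 1));

    // bytes-desc: a key extractor plus a reversed key comparator, the same pattern the patch
    // uses, so the largest group comes first.
    List<Group> bytesDesc = groups.stream()
        .sorted(Comparator.comparing(Group::sizeInBytes, Comparator.reverseOrder()))
        .collect(Collectors.toList());

    // files-asc: natural ordering of the file count, so the group with the fewest files comes first.
    List<Group> filesAsc = groups.stream()
        .sorted(Comparator.comparing(Group::numFiles))
        .collect(Collectors.toList());

    bytesDesc.forEach(g -> System.out.println(g.sizeInBytes() + " bytes"));  // prints 300, 200, 100
    filesAsc.forEach(g -> System.out.println(g.numFiles() + " files"));      // prints 1, 3, 9
  }
}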
@@ -40,6 +40,7 @@
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
@@ -74,6 +75,7 @@
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.FileScanTaskSetManager;
import org.apache.iceberg.spark.SparkTestBase;
import org.apache.iceberg.spark.actions.BaseRewriteDataFilesSparkAction.RewriteExecutionContext;
import org.apache.iceberg.spark.source.ThreeColumnRecord;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
@@ -716,6 +718,12 @@ public void testInvalidOptions() {
() -> basicRewrite(table)
.option("foobarity", "-5")
.execute());

AssertHelpers.assertThrows("Cannot set rewrite-job-order to foo",
IllegalArgumentException.class,
() -> basicRewrite(table)
.option(RewriteDataFiles.REWRITE_JOB_ORDER, "foo")
.execute());
}

@Test
@@ -947,6 +955,144 @@ public void testInvalidAPIUsage() {
"Cannot set strategy", () -> actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack());
}

@Test
public void testRewriteJobOrderBytesAsc() {
Table table = createTablePartitioned(4, 2);
writeRecords(1, SCALE, 1);
writeRecords(2, SCALE, 2);
writeRecords(3, SCALE, 3);
writeRecords(4, SCALE, 4);
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();

BaseRewriteDataFilesSparkAction basicRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.binPack();
List<Long> expected = toGroupStream(table, basicRewrite)
.mapToLong(RewriteFileGroup::sizeInBytes)
.boxed()
.collect(Collectors.toList());

BaseRewriteDataFilesSparkAction jobOrderRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_ASC.orderName())
.binPack();
List<Long> actual = toGroupStream(table, jobOrderRewrite)
.mapToLong(RewriteFileGroup::sizeInBytes)
.boxed()
.collect(Collectors.toList());

expected.sort(Comparator.naturalOrder());
Assert.assertEquals("Size in bytes order should be ascending", actual, expected);
Collections.reverse(expected);
Assert.assertNotEquals("Size in bytes order should not be descending", actual, expected);
}

@Test
public void testRewriteJobOrderBytesDesc() {
Table table = createTablePartitioned(4, 2);
writeRecords(1, SCALE, 1);
writeRecords(2, SCALE, 2);
writeRecords(3, SCALE, 3);
writeRecords(4, SCALE, 4);
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();

BaseRewriteDataFilesSparkAction basicRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.binPack();
List<Long> expected = toGroupStream(table, basicRewrite)
.mapToLong(RewriteFileGroup::sizeInBytes)
.boxed()
.collect(Collectors.toList());

BaseRewriteDataFilesSparkAction jobOrderRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.BYTES_DESC.orderName())
.binPack();
List<Long> actual = toGroupStream(table, jobOrderRewrite)
.mapToLong(RewriteFileGroup::sizeInBytes)
.boxed()
.collect(Collectors.toList());

expected.sort(Comparator.reverseOrder());
Assert.assertEquals("Size in bytes order should be descending", actual, expected);
Collections.reverse(expected);
Assert.assertNotEquals("Size in bytes order should not be ascending", actual, expected);
}

@Test
public void testRewriteJobOrderFilesAsc() {
Table table = createTablePartitioned(4, 2);
writeRecords(1, SCALE, 1);
writeRecords(2, SCALE, 2);
writeRecords(3, SCALE, 3);
writeRecords(4, SCALE, 4);
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();

BaseRewriteDataFilesSparkAction basicRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.binPack();
List<Long> expected = toGroupStream(table, basicRewrite)
.mapToLong(RewriteFileGroup::numFiles)
.boxed()
.collect(Collectors.toList());

BaseRewriteDataFilesSparkAction jobOrderRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.FILES_ASC.orderName())
.binPack();
List<Long> actual = toGroupStream(table, jobOrderRewrite)
.mapToLong(RewriteFileGroup::numFiles)
.boxed()
.collect(Collectors.toList());

expected.sort(Comparator.naturalOrder());
Assert.assertEquals("Number of files order should be ascending", actual, expected);
Collections.reverse(expected);
Assert.assertNotEquals("Number of files order should not be descending", actual, expected);
}

@Test
public void testRewriteJobOrderFilesDesc() {
Table table = createTablePartitioned(4, 2);
writeRecords(1, SCALE, 1);
writeRecords(2, SCALE, 2);
writeRecords(3, SCALE, 3);
writeRecords(4, SCALE, 4);
table.updateProperties().set(TableProperties.FORMAT_VERSION, "2").commit();

BaseRewriteDataFilesSparkAction basicRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.binPack();
List<Long> expected = toGroupStream(table, basicRewrite)
.mapToLong(RewriteFileGroup::numFiles)
.boxed()
.collect(Collectors.toList());

BaseRewriteDataFilesSparkAction jobOrderRewrite =
(BaseRewriteDataFilesSparkAction) basicRewrite(table)
.option(RewriteDataFiles.REWRITE_JOB_ORDER, RewriteJobOrder.FILES_DESC.orderName())
.binPack();
List<Long> actual = toGroupStream(table, jobOrderRewrite)
.mapToLong(RewriteFileGroup::numFiles)
.boxed()
.collect(Collectors.toList());

expected.sort(Comparator.reverseOrder());
Assert.assertEquals("Number of files order should be descending", actual, expected);
Collections.reverse(expected);
Assert.assertNotEquals("Number of files order should not be ascending", actual, expected);
}

private Stream<RewriteFileGroup> toGroupStream(Table table,
BaseRewriteDataFilesSparkAction rewrite) {
rewrite.validateAndInitOptions();
Map<StructLike, List<List<FileScanTask>>> fileGroupsByPartition =
rewrite.planFileGroups(table.currentSnapshot().snapshotId());

return rewrite.toGroupStream(
new RewriteExecutionContext(fileGroupsByPartition), fileGroupsByPartition);
}

protected List<Object[]> currentData() {
return rowsToJava(spark.read().format("iceberg").load(tableLocation)
.sort("c1", "c2", "c3")