diff --git a/core/src/main/java/org/apache/iceberg/actions/FileRewriteExecutor.java b/core/src/main/java/org/apache/iceberg/actions/FileRewriteExecutor.java
new file mode 100644
index 000000000000..5d589c6931c5
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/FileRewriteExecutor.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+
+/**
+ * A class for rewriting content file groups ({@link FileRewriteGroup}). The lifecycle for the
+ * executor looks like the following:
+ *
+ * <ul>
+ * <li>{@link #init(Map)} initializes the executor with the configuration parameters
+ * <li>{@link #initPlan(FileRewritePlan)} initializes the executor with the configuration
+ * calculated during planning ({@link FileRewritePlan#writeMaxFileSize()}, {@link
+ * RewriteFilePlan#outputSpecId()})
+ * <li>{@link #rewrite(FileRewriteGroup)} called for every group in the plan to do the actual
+ * rewrite of the files, and returns the generated new files.
+ * </ul>
+ *
+ * <p>A single executor could be used to rewrite multiple groups for the same plan.
+ *
+ * @param <I> the Java type of the plan info
+ * @param <T> the Java type of the tasks to read content files
+ * @param <F> the Java type of the content files
+ * @param <G> the Java type of the planned groups
+ * @param <P> the Java type of the plan to execute
+ */
+public interface FileRewriteExecutor<
+ I,
+ T extends ContentScanTask<F>,
+ F extends ContentFile<F>,
+ G extends FileRewriteGroup<I, T, F>,
+ P extends FileRewritePlan<I, T, F, G>> {
+
+ /** Returns a description for this executor. */
+ default String description() {
+ return getClass().getName();
+ }
+
+ /**
+ * Returns a set of supported options for this executor. Only options specified in this list will
+ * be accepted at runtime. Any other options will be rejected.
+ */
+ Set<String> validOptions();
+
+ /**
+ * Initializes this executor using the provided options.
+ *
+ * @param options options to initialize this executor
+ */
+ void init(Map<String, String> options);
+
+ /**
+ * Initializes the rewriter using the information generated during planning.
+ *
+ * @param plan containing the configuration data
+ */
+ void initPlan(P plan);
+
+ /**
+ * Rewrite a group of files represented by the given list of scan tasks.
+ *
+ * <p>The implementation is supposed to be engine-specific (e.g. Spark, Flink, Trino).
+ *
+ * @param group of scan tasks for files to be rewritten together
+ * @return a set of newly written files
+ */
+ Set<F> rewrite(G group);
+}
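The three lifecycle calls are easiest to see from the caller's side. Below is a minimal sketch of how an engine integration might drive an executor; `newExecutor(table)` is a hypothetical stand-in for an engine-specific implementation, and only the `FileRewriteExecutor` calls themselves come from this interface:

```java
// A sketch of the executor lifecycle; newExecutor(table) is a hypothetical factory for an
// engine-specific implementation (e.g. a Spark bin-pack executor).
FileRewriteExecutor<FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup, RewriteFilePlan>
    executor = newExecutor(table);
executor.init(options); // 1. user-provided options, validated against validOptions()
executor.initPlan(plan); // 2. planner-derived values such as writeMaxFileSize()
plan.groups()
    .forEach(
        group -> {
          Set<DataFile> newFiles = executor.rewrite(group); // 3. one rewrite per group
          group.setOutputFiles(newFiles);
        });
```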
diff --git a/core/src/main/java/org/apache/iceberg/actions/FileRewriteGroup.java b/core/src/main/java/org/apache/iceberg/actions/FileRewriteGroup.java
new file mode 100644
index 000000000000..08f6e050b163
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/FileRewriteGroup.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Comparator;
+import java.util.List;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+
+/**
+ * Container class representing a set of files to be rewritten by a {@link FileRewriteExecutor}.
+ *
+ * @param <I> the Java type of the plan info
+ * @param <T> the Java type of the tasks to read content files
+ * @param <F> the Java type of the content files
+ */
+public abstract class FileRewriteGroup<I, T extends ContentScanTask<F>, F extends ContentFile<F>> {
+ private final I info;
+ private final List<T> fileScanTasks;
+ private final long splitSize;
+ private final int expectedOutputFiles;
+
+ FileRewriteGroup(I info, List<T> fileScanTasks, long splitSize, int expectedOutputFiles) {
+ this.info = info;
+ this.fileScanTasks = fileScanTasks;
+ this.splitSize = splitSize;
+ this.expectedOutputFiles = expectedOutputFiles;
+ }
+
+ /** Identifiers and partition information about the group. */
+ public I info() {
+ return info;
+ }
+
+ /** Input of the group. {@link ContentScanTask}s to read. */
+ public List<T> fileScans() {
+ return fileScanTasks;
+ }
+
+ /** Expected split size for the output files. */
+ public long splitSize() {
+ return splitSize;
+ }
+
+ /** Expected number of the output files. */
+ public int expectedOutputFiles() {
+ return expectedOutputFiles;
+ }
+
+ /** Accumulated size for the input files. */
+ public long sizeInBytes() {
+ return fileScanTasks.stream().mapToLong(T::length).sum();
+ }
+
+ /** Number of the input files. */
+ public int numInputFiles() {
+ return fileScanTasks.size();
+ }
+
+ /** Comparator to order the FileRewriteGroups based on a provided {@link RewriteJobOrder}. */
+ public static <I, T extends ContentScanTask<F>, F extends ContentFile<F>>
+ Comparator<FileRewriteGroup<I, T, F>> taskComparator(RewriteJobOrder rewriteJobOrder) {
+ switch (rewriteJobOrder) {
+ case BYTES_ASC:
+ return Comparator.comparing(FileRewriteGroup::sizeInBytes);
+ case BYTES_DESC:
+ return Comparator.comparing(FileRewriteGroup::sizeInBytes, Comparator.reverseOrder());
+ case FILES_ASC:
+ return Comparator.comparing(FileRewriteGroup::numInputFiles);
+ case FILES_DESC:
+ return Comparator.comparing(FileRewriteGroup::numInputFiles, Comparator.reverseOrder());
+ default:
+ return (unused, unused2) -> 0;
+ }
+ }
+}
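Since `taskComparator` is the shared replacement for the per-class `comparator` helpers deprecated later in this diff, callers sort concrete group types through it. A minimal sketch, assuming the groups were already planned elsewhere and that the sketch lives in `org.apache.iceberg.actions`:

```java
import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;

class JobOrderingSketch {
  // Orders data file groups so that the byte-heaviest groups are rewritten first.
  static void orderBySizeDesc(List<RewriteFileGroup> groups) {
    groups.sort(
        FileRewriteGroup.<FileGroupInfo, FileScanTask, DataFile>taskComparator(
            RewriteJobOrder.BYTES_DESC));
  }
}
```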
diff --git a/core/src/main/java/org/apache/iceberg/actions/FileRewritePlan.java b/core/src/main/java/org/apache/iceberg/actions/FileRewritePlan.java
new file mode 100644
index 000000000000..f313fd1b070d
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/FileRewritePlan.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.StructLike;
+
+/**
+ * Result of the file rewrite planning as generated by the {@link FileRewritePlanner#plan()}.
+ *
+ * <p>The plan contains the stream of the planned groups and statistics about the number of the
+ * generated groups, like the total number of the groups and the groups per partition. The plan also
+ * contains some calculated values required by the {@link FileRewriteExecutor}s where the values are
+ * based on the input data and the planning parameters.
+ *
+ * <p>Groups in a plan could be processed independently. For example, in Spark this means that each
+ * group would be rewritten in its own Spark job.
+ *
+ * @param <I> the Java type of the plan info
+ * @param <T> the Java type of the tasks to read content files
+ * @param <F> the Java type of the content files
+ * @param <G> the Java type of the planned groups
+ */
+public abstract class FileRewritePlan<
+ I,
+ T extends ContentScanTask<F>,
+ F extends ContentFile<F>,
+ G extends FileRewriteGroup<I, T, F>> {
+ private final Stream<G> groups;
+ private final int totalGroupCount;
+ private final Map<StructLike, Integer> groupsInPartition;
+ private final long writeMaxFileSize;
+
+ protected FileRewritePlan(
+ Stream<G> groups,
+ int totalGroupCount,
+ Map<StructLike, Integer> groupsInPartition,
+ long writeMaxFileSize) {
+ this.groups = groups;
+ this.totalGroupCount = totalGroupCount;
+ this.groupsInPartition = groupsInPartition;
+ this.writeMaxFileSize = writeMaxFileSize;
+ }
+
+ /** The stream of the generated {@link FileRewriteGroup}s. */
+ public Stream<G> groups() {
+ return groups;
+ }
+
+ /** The number of the generated groups in the given partition. */
+ public int groupsInPartition(StructLike partition) {
+ return groupsInPartition.get(partition);
+ }
+
+ /** The total number of the groups generated by this plan. */
+ public int totalGroupCount() {
+ return totalGroupCount;
+ }
+
+ /** Calculated maximum file size based on the planner target file size configuration */
+ public long writeMaxFileSize() {
+ return writeMaxFileSize;
+ }
+}
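Because `groups()` exposes a stream, a plan can be traversed only once, while the counts are available up front. A minimal sketch of a consumer that reports the plan statistics before processing, using only the accessors defined above:

```java
import org.apache.iceberg.actions.FileRewritePlan;

class PlanReportSketch {
  // Prints the precomputed statistics, then walks the single-use group stream.
  static void report(FileRewritePlan<?, ?, ?, ?> plan) {
    System.out.println("total groups: " + plan.totalGroupCount());
    plan.groups()
        .forEach(
            group ->
                System.out.printf(
                    "group: %d input files, %d bytes, %d expected output files%n",
                    group.numInputFiles(), group.sizeInBytes(), group.expectedOutputFiles()));
  }
}
```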
diff --git a/core/src/main/java/org/apache/iceberg/actions/FileRewritePlanner.java b/core/src/main/java/org/apache/iceberg/actions/FileRewritePlanner.java
new file mode 100644
index 000000000000..0d242bbe6bb0
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/FileRewritePlanner.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+
+/**
+ * A class for planning content file rewrites.
+ *
+ * <p>The entire rewrite operation is broken down into pieces. The grouping is based on partitioning
+ * and the planning could create multiple groups within a partition. As a result, a {@link
+ * FileRewritePlan} is generated which contains the data needed by the {@link FileRewriteExecutor}s
+ * that execute the actual file rewrite.
+ *
+ * <p>The lifecycle of the planner is:
+ *
+ * <ul>
+ * <li>{@link #init(Map)} initializes the planner with the configuration parameters
+ * <li>{@link #plan()} generates the plan for the given configuration
+ * </ul>
+ *
+ * @param <I> the Java type of the plan info
+ * @param <T> the Java type of the tasks to read content files
+ * @param <F> the Java type of the content files
+ * @param <G> the Java type of the planned groups
+ */
+public interface FileRewritePlanner<
+ I,
+ T extends ContentScanTask<F>,
+ F extends ContentFile<F>,
+ G extends FileRewriteGroup<I, T, F>> {
+
+ /** Returns a description for this planner. */
+ default String description() {
+ return getClass().getName();
+ }
+
+ /**
+ * Returns a set of supported options for this planner. Only options specified in this list will
+ * be accepted at runtime. Any other options will be rejected.
+ */
+ Set<String> validOptions();
+
+ /**
+ * Initializes this planner using the provided options.
+ *
+ * @param options options to initialize this planner
+ */
+ void init(Map<String, String> options);
+
+ /**
+ * Generates the plan for rewrite.
+ *
+ * @return the generated plan which could be executed during the compaction
+ */
+ FileRewritePlan<I, T, F, G> plan();
+}
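Put together with a matching executor, the planner half of the API reads as follows. A minimal sketch, assuming a hypothetical engine-specific `executor` that was already configured via its own `init(Map)`:

```java
// Planner lifecycle: init with options, then plan; the executor consumes the result.
RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(table);
planner.init(Map.of(SizeBasedFileRewritePlanner.TARGET_FILE_SIZE_BYTES, "134217728")); // 128 MB
RewriteFilePlan plan = planner.plan();
executor.initPlan(plan); // hands over writeMaxFileSize() and outputSpecId()
plan.groups().forEach(group -> group.setOutputFiles(executor.rewrite(group)));
```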
diff --git a/core/src/main/java/org/apache/iceberg/actions/FileRewriter.java b/core/src/main/java/org/apache/iceberg/actions/FileRewriter.java
index 7c6b4e8d7ef5..f014aea0c034 100644
--- a/core/src/main/java/org/apache/iceberg/actions/FileRewriter.java
+++ b/core/src/main/java/org/apache/iceberg/actions/FileRewriter.java
@@ -34,7 +34,10 @@
*
* @param <T> the Java type of tasks to read content files
* @param <F> the Java type of content files
+ * @deprecated since 1.8.0, will be removed in 1.9.0; use {@link FileRewritePlanner} and {@link
+ * FileRewriteExecutor}.
*/
+@Deprecated
public interface FileRewriter<T extends ContentScanTask<F>, F extends ContentFile<F>> {
/** Returns a description for this rewriter. */
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
index dfc9842780f5..996e7b0f8ba2 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java
@@ -31,26 +31,26 @@
import org.apache.iceberg.util.DataFileSet;
/**
- * Container class representing a set of files to be rewritten by a RewriteAction and the new files
- * which have been written by the action.
+ * Container class representing a set of data files to be rewritten by a RewriteAction and the new
+ * files which have been written by the action.
*/
-public class RewriteFileGroup {
- private final FileGroupInfo info;
- private final List<FileScanTask> fileScanTasks;
-
+public class RewriteFileGroup extends FileRewriteGroup<FileGroupInfo, FileScanTask, DataFile> {
private DataFileSet addedFiles = DataFileSet.create();
+ /**
+ * @deprecated since 1.8.0, will be removed in 1.9.0.
+ */
+ @Deprecated
public RewriteFileGroup(FileGroupInfo info, List<FileScanTask> fileScanTasks) {
- this.info = info;
- this.fileScanTasks = fileScanTasks;
- }
-
- public FileGroupInfo info() {
- return info;
+ this(info, fileScanTasks, 0L, 0);
}
- public List<FileScanTask> fileScans() {
- return fileScanTasks;
+ public RewriteFileGroup(
+ FileGroupInfo info,
+ List<FileScanTask> fileScanTasks,
+ long splitSize,
+ int expectedOutputFiles) {
+ super(info, fileScanTasks, splitSize, expectedOutputFiles);
}
public void setOutputFiles(Set<DataFile> files) {
@@ -70,9 +70,9 @@ public Set<DataFile> addedFiles() {
public RewriteDataFiles.FileGroupRewriteResult asResult() {
Preconditions.checkState(addedFiles != null, "Cannot get result, Group was never rewritten");
return ImmutableRewriteDataFiles.FileGroupRewriteResult.builder()
- .info(info)
+ .info(info())
.addedDataFilesCount(addedFiles.size())
- .rewrittenDataFilesCount(fileScanTasks.size())
+ .rewrittenDataFilesCount(fileScans().size())
.rewrittenBytesCount(sizeInBytes())
.build();
}
@@ -80,8 +80,8 @@ public RewriteDataFiles.FileGroupRewriteResult asResult() {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
- .add("info", info)
- .add("numRewrittenFiles", fileScanTasks.size())
+ .add("info", info())
+ .add("numRewrittenFiles", fileScans().size())
.add(
"numAddedFiles",
addedFiles == null ? "Rewrite Incomplete" : Integer.toString(addedFiles.size()))
@@ -89,14 +89,19 @@ public String toString() {
.toString();
}
- public long sizeInBytes() {
- return fileScanTasks.stream().mapToLong(FileScanTask::length).sum();
- }
-
+ /**
+ * @deprecated since 1.8.0, will be removed in 1.9.0. Use {@link #numInputFiles()} instead.
+ */
+ @Deprecated
public int numFiles() {
- return fileScanTasks.size();
+ return fileScans().size();
}
+ /**
+ * @deprecated since 1.8.0, will be removed in 1.9.0. Use {@link
+ * FileRewriteGroup#taskComparator(RewriteJobOrder)} instead.
+ */
+ @Deprecated
public static Comparator<RewriteFileGroup> comparator(RewriteJobOrder rewriteJobOrder) {
switch (rewriteJobOrder) {
case BYTES_ASC:
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroupPlanner.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroupPlanner.java
new file mode 100644
index 000000000000..2ba632fcf061
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroupPlanner.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Groups specified files in the {@link Table} into {@link RewriteFileGroup}s. The files are grouped
+ * by partitions. Extends {@link SizeBasedFileRewritePlanner} with a delete file threshold and
+ * {@link RewriteDataFiles#REWRITE_JOB_ORDER} handling.
+ */
+public class RewriteFileGroupPlanner
+ extends SizeBasedFileRewritePlanner<FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> {
+ /**
+ * The minimum number of deletes that needs to be associated with a data file for it to be
+ * considered for rewriting. If a data file has this number of deletes or more, it will be
+ * rewritten regardless of its file size determined by {@link #MIN_FILE_SIZE_BYTES} and {@link
+ * #MAX_FILE_SIZE_BYTES}. If a file group contains a file that satisfies this condition, the file
+ * group will be rewritten regardless of the number of files in the file group determined by
+ * {@link #MIN_INPUT_FILES}.
+ *
+ * <p>Defaults to Integer.MAX_VALUE, which means this feature is not enabled by default.
+ */
+ public static final String DELETE_FILE_THRESHOLD = "delete-file-threshold";
+
+ public static final int DELETE_FILE_THRESHOLD_DEFAULT = Integer.MAX_VALUE;
+
+ private static final Logger LOG = LoggerFactory.getLogger(RewriteFileGroupPlanner.class);
+
+ private final Expression filter;
+ private final Long snapshotId;
+ private final boolean caseSensitive;
+
+ private int deleteFileThreshold;
+ private RewriteJobOrder rewriteJobOrder;
+
+ public RewriteFileGroupPlanner(Table table) {
+ this(table, Expressions.alwaysTrue());
+ }
+
+ public RewriteFileGroupPlanner(Table table, Expression filter) {
+ this(
+ table,
+ filter,
+ table.currentSnapshot() != null ? table.currentSnapshot().snapshotId() : null,
+ false);
+ }
+
+ /**
+ * Creates the planner for the given table.
+ *
+ * @param table to plan for
+ * @param filter used to remove files from the plan
+ * @param snapshotId used as a basis for planning - should be used as starting snapshot id at
+ * commit time when replacing the files
+ * @param caseSensitive property used for scanning
+ */
+ public RewriteFileGroupPlanner(
+ Table table, Expression filter, Long snapshotId, boolean caseSensitive) {
+ super(table);
+ this.filter = filter;
+ this.snapshotId = snapshotId;
+ this.caseSensitive = caseSensitive;
+ }
+
+ @Override
+ public Set<String> validOptions() {
+ return ImmutableSet.<String>builder()
+ .addAll(super.validOptions())
+ .add(DELETE_FILE_THRESHOLD)
+ .add(RewriteDataFiles.REWRITE_JOB_ORDER)
+ .build();
+ }
+
+ @Override
+ public void init(Map<String, String> options) {
+ super.init(options);
+ this.deleteFileThreshold = deleteFileThreshold(options);
+ this.rewriteJobOrder =
+ RewriteJobOrder.fromName(
+ PropertyUtil.propertyAsString(
+ options,
+ RewriteDataFiles.REWRITE_JOB_ORDER,
+ RewriteDataFiles.REWRITE_JOB_ORDER_DEFAULT));
+ }
+
+ @Override
+ protected Iterable<FileScanTask> filterFiles(Iterable<FileScanTask> tasks) {
+ return Iterables.filter(tasks, task -> wronglySized(task) || tooManyDeletes(task));
+ }
+
+ @Override
+ protected Iterable<List<FileScanTask>> filterFileGroups(List<List<FileScanTask>> groups) {
+ return Iterables.filter(groups, this::shouldRewrite);
+ }
+
+ @Override
+ protected long defaultTargetFileSize() {
+ return PropertyUtil.propertyAsLong(
+ table().properties(),
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+ TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ }
+
+ @Override
+ public RewriteFilePlan plan() {
+ StructLikeMap<List<List<FileScanTask>>> plan = planFileGroups();
+ RewriteExecutionContext ctx = new RewriteExecutionContext();
+ Stream<RewriteFileGroup> groups =
+ plan.entrySet().stream()
+ .filter(e -> !e.getValue().isEmpty())
+ .flatMap(
+ e -> {
+ StructLike partition = e.getKey();
+ List<List<FileScanTask>> scanGroups = e.getValue();
+ return scanGroups.stream()
+ .map(
+ tasks -> {
+ long inputSize = inputSize(tasks);
+ return newRewriteGroup(
+ ctx,
+ partition,
+ tasks,
+ splitSize(inputSize),
+ numOutputFiles(inputSize));
+ });
+ })
+ .sorted(FileRewriteGroup.taskComparator(rewriteJobOrder));
+ Map<StructLike, Integer> groupsInPartition = plan.transformValues(List::size);
+ int totalGroupCount = groupsInPartition.values().stream().reduce(Integer::sum).orElse(0);
+ return new RewriteFilePlan(
+ groups, totalGroupCount, groupsInPartition, writeMaxFileSize(), outputSpecId());
+ }
+
+ private CloseableIterable<FileScanTask> tasks() {
+ TableScan scan =
+ table().newScan().filter(filter).caseSensitive(caseSensitive).ignoreResiduals();
+
+ if (snapshotId != null) {
+ scan = scan.useSnapshot(snapshotId);
+ }
+
+ return scan.planFiles();
+ }
+
+ private int deleteFileThreshold(Map<String, String> options) {
+ int value =
+ PropertyUtil.propertyAsInt(options, DELETE_FILE_THRESHOLD, DELETE_FILE_THRESHOLD_DEFAULT);
+ Preconditions.checkArgument(
+ value >= 0, "'%s' is set to %s but must be >= 0", DELETE_FILE_THRESHOLD, value);
+ return value;
+ }
+
+ private boolean tooManyDeletes(FileScanTask task) {
+ return task.deletes() != null && task.deletes().size() >= deleteFileThreshold;
+ }
+
+ private boolean shouldRewrite(List<FileScanTask> group) {
+ return enoughInputFiles(group)
+ || enoughContent(group)
+ || tooMuchContent(group)
+ || anyTaskHasTooManyDeletes(group);
+ }
+
+ private boolean anyTaskHasTooManyDeletes(List<FileScanTask> group) {
+ return group.stream().anyMatch(this::tooManyDeletes);
+ }
+
+ private StructLikeMap<List<List<FileScanTask>>> planFileGroups() {
+ CloseableIterable<FileScanTask> fileScanTasks = tasks();
+
+ try {
+ Types.StructType partitionType = table().spec().partitionType();
+ StructLikeMap<List<FileScanTask>> filesByPartition =
+ groupByPartition(table(), partitionType, fileScanTasks);
+ return filesByPartition.transformValues(tasks -> ImmutableList.copyOf(planFileGroups(tasks)));
+ } finally {
+ try {
+ fileScanTasks.close();
+ } catch (IOException io) {
+ LOG.error("Cannot properly close file iterable while planning for rewrite", io);
+ }
+ }
+ }
+
+ private StructLikeMap<List<FileScanTask>> groupByPartition(
+ Table table, Types.StructType partitionType, Iterable<FileScanTask> tasks) {
+ StructLikeMap<List<FileScanTask>> filesByPartition = StructLikeMap.create(partitionType);
+ StructLike emptyStruct = GenericRecord.create(partitionType);
+
+ for (FileScanTask task : tasks) {
+ // If a task uses an incompatible partition spec the data inside could contain values
+ // which belong to multiple partitions in the current spec. Treating all such files as
+ // un-partitioned and grouping them together helps to minimize new files made.
+ StructLike taskPartition =
+ task.file().specId() == table.spec().specId() ? task.file().partition() : emptyStruct;
+
+ filesByPartition.computeIfAbsent(taskPartition, unused -> Lists.newArrayList()).add(task);
+ }
+
+ return filesByPartition;
+ }
+
+ private RewriteFileGroup newRewriteGroup(
+ RewriteExecutionContext ctx,
+ StructLike partition,
+ List<FileScanTask> tasks,
+ long splitSize,
+ int numOutputSize) {
+ FileGroupInfo info =
+ ImmutableRewriteDataFiles.FileGroupInfo.builder()
+ .globalIndex(ctx.currentGlobalIndex())
+ .partitionIndex(ctx.currentPartitionIndex(partition))
+ .partition(partition)
+ .build();
+ return new RewriteFileGroup(info, Lists.newArrayList(tasks), splitSize, numOutputSize);
+ }
+
+ private static class RewriteExecutionContext {
+ private final Map<StructLike, Integer> partitionIndexMap;
+ private final AtomicInteger groupIndex;
+
+ private RewriteExecutionContext() {
+ this.partitionIndexMap = Maps.newConcurrentMap();
+ this.groupIndex = new AtomicInteger(1);
+ }
+
+ private int currentGlobalIndex() {
+ return groupIndex.getAndIncrement();
+ }
+
+ private int currentPartitionIndex(StructLike partition) {
+ return partitionIndexMap.merge(partition, 1, Integer::sum);
+ }
+ }
+}
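The planner-level `DELETE_FILE_THRESHOLD` mirrors the option deprecated on `SizeBasedDataRewriter` later in this diff. A minimal sketch of planning a compaction that also picks up well-sized data files carrying too many deletes:

```java
import java.util.Map;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteFileGroupPlanner;
import org.apache.iceberg.actions.RewriteFilePlan;

class DeleteThresholdSketch {
  // Any data file with 3 or more associated delete files is rewritten regardless of its size.
  static RewriteFilePlan planWithDeleteThreshold(Table table) {
    RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(table);
    planner.init(Map.of(RewriteFileGroupPlanner.DELETE_FILE_THRESHOLD, "3"));
    return planner.plan();
  }
}
```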
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFilePlan.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFilePlan.java
new file mode 100644
index 000000000000..b6d9a35ebd9f
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFilePlan.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.StructLike;
+
+/** Result of the data file rewrite planning. */
+public class RewriteFilePlan
+ extends FileRewritePlan<
+ RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> {
+ private final int outputSpecId;
+
+ public RewriteFilePlan(
+ Stream<RewriteFileGroup> groups,
+ int totalGroupCount,
+ Map<StructLike, Integer> groupsInPartition,
+ long writeMaxFileSize,
+ int outputSpecId) {
+ super(groups, totalGroupCount, groupsInPartition, writeMaxFileSize);
+ this.outputSpecId = outputSpecId;
+ }
+
+ /** Partition specification id for the target files */
+ public int outputSpecId() {
+ return outputSpecId;
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletePlan.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletePlan.java
new file mode 100644
index 000000000000..15ee241ad99e
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletePlan.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.Map;
+import java.util.stream.Stream;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.StructLike;
+
+/** Result of the position delete file rewrite planning. */
+public class RewritePositionDeletePlan
+ extends FileRewritePlan<
+ RewritePositionDeleteFiles.FileGroupInfo,
+ PositionDeletesScanTask,
+ DeleteFile,
+ RewritePositionDeletesGroup> {
+ public RewritePositionDeletePlan(
+ Stream<RewritePositionDeletesGroup> groups,
+ int totalGroupCount,
+ Map<StructLike, Integer> groupsInPartition,
+ long writeMaxFileSize) {
+ super(groups, totalGroupCount, groupsInPartition, writeMaxFileSize);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
index d1c688417a64..c7b1f9ddaf51 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java
@@ -35,27 +35,37 @@
* Container class representing a set of position delete files to be rewritten by a {@link
* RewritePositionDeleteFiles} and the new files which have been written by the action.
*/
-public class RewritePositionDeletesGroup {
- private final FileGroupInfo info;
- private final List<PositionDeletesScanTask> tasks;
+public class RewritePositionDeletesGroup
+ extends FileRewriteGroup<FileGroupInfo, PositionDeletesScanTask, DeleteFile> {
private final long maxRewrittenDataSequenceNumber;
private DeleteFileSet addedDeleteFiles = DeleteFileSet.create();
+ /**
+ * @deprecated since 1.8.0, will be removed in 1.9.0.
+ */
+ @Deprecated
public RewritePositionDeletesGroup(FileGroupInfo info, List<PositionDeletesScanTask> tasks) {
+ this(info, tasks, 0L, 0);
+ }
+
+ public RewritePositionDeletesGroup(
+ FileGroupInfo info,
+ List<PositionDeletesScanTask> tasks,
+ long splitSize,
+ int expectedOutputFiles) {
+ super(info, tasks, splitSize, expectedOutputFiles);
Preconditions.checkArgument(!tasks.isEmpty(), "Tasks must not be empty");
- this.info = info;
- this.tasks = tasks;
this.maxRewrittenDataSequenceNumber =
tasks.stream().mapToLong(t -> t.file().dataSequenceNumber()).max().getAsLong();
}
- public FileGroupInfo info() {
- return info;
- }
-
+ /**
+ * @deprecated since 1.8.0, will be removed in 1.9.0. Use {@link #fileScans()} instead.
+ */
+ @Deprecated
public List<PositionDeletesScanTask> tasks() {
- return tasks;
+ return fileScans();
}
public void setOutputFiles(Set<DeleteFile> files) {
@@ -67,7 +77,7 @@ public long maxRewrittenDataSequenceNumber() {
}
public Set<DeleteFile> rewrittenDeleteFiles() {
- return tasks().stream()
+ return fileScans().stream()
.map(PositionDeletesScanTask::file)
.collect(Collectors.toCollection(DeleteFileSet::create));
}
@@ -81,9 +91,9 @@ public FileGroupRewriteResult asResult() {
addedDeleteFiles != null, "Cannot get result, Group was never rewritten");
return ImmutableRewritePositionDeleteFiles.FileGroupRewriteResult.builder()
- .info(info)
+ .info(info())
.addedDeleteFilesCount(addedDeleteFiles.size())
- .rewrittenDeleteFilesCount(tasks.size())
+ .rewrittenDeleteFilesCount(fileScans().size())
.rewrittenBytesCount(rewrittenBytes())
.addedBytesCount(addedBytes())
.build();
@@ -92,8 +102,8 @@ public FileGroupRewriteResult asResult() {
@Override
public String toString() {
return MoreObjects.toStringHelper(this)
- .add("info", info)
- .add("numRewrittenPositionDeleteFiles", tasks.size())
+ .add("info", info())
+ .add("numRewrittenPositionDeleteFiles", fileScans().size())
.add(
"numAddedPositionDeleteFiles",
addedDeleteFiles == null
@@ -105,17 +115,26 @@ public String toString() {
}
public long rewrittenBytes() {
- return tasks.stream().mapToLong(PositionDeletesScanTask::length).sum();
+ return fileScans().stream().mapToLong(PositionDeletesScanTask::length).sum();
}
public long addedBytes() {
return addedDeleteFiles.stream().mapToLong(DeleteFile::fileSizeInBytes).sum();
}
+ /**
+ * @deprecated since 1.8.0, will be removed in 1.9.0. Use {@link #numInputFiles()} instead.
+ */
+ @Deprecated
public int numRewrittenDeleteFiles() {
- return tasks.size();
+ return fileScans().size();
}
+ /**
+ * @deprecated since 1.8.0, will be removed in 1.9.0. Use {@link
+ * FileRewriteGroup#taskComparator(RewriteJobOrder)} instead.
+ */
+ @Deprecated
public static Comparator<RewritePositionDeletesGroup> comparator(RewriteJobOrder order) {
switch (order) {
case BYTES_ASC:
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroupPlanner.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroupPlanner.java
new file mode 100644
index 000000000000..14bd3cabefcc
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroupPlanner.java
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Stream;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.MetadataTableType;
+import org.apache.iceberg.MetadataTableUtils;
+import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.PositionDeletesScanTask;
+import org.apache.iceberg.PositionDeletesTable;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupInfo;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.PropertyUtil;
+import org.apache.iceberg.util.StructLikeMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Groups specified files in the {@link Table} into {@link RewritePositionDeletesGroup}s. The files
+ * are grouped by partitions. Extends the {@link SizeBasedFileRewritePlanner} with {@link
+ * RewritePositionDeleteFiles#REWRITE_JOB_ORDER} handling.
+ */
+public class RewritePositionDeletesGroupPlanner
+ extends SizeBasedFileRewritePlanner<
+ FileGroupInfo, PositionDeletesScanTask, DeleteFile, RewritePositionDeletesGroup> {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RewritePositionDeletesGroupPlanner.class);
+
+ private final Expression filter;
+ private final boolean caseSensitive;
+ private RewriteJobOrder rewriteJobOrder;
+
+ public RewritePositionDeletesGroupPlanner(Table table) {
+ this(table, Expressions.alwaysTrue(), false);
+ }
+
+ /**
+ * Creates the planner for the given table.
+ *
+ * @param table to plan for
+ * @param filter used to remove files from the plan
+ * @param caseSensitive property used for scanning
+ */
+ public RewritePositionDeletesGroupPlanner(Table table, Expression filter, boolean caseSensitive) {
+ super(table);
+ this.caseSensitive = caseSensitive;
+ this.filter = filter;
+ }
+
+ @Override
+ public Set<String> validOptions() {
+ return ImmutableSet.<String>builder()
+ .addAll(super.validOptions())
+ .add(RewritePositionDeleteFiles.REWRITE_JOB_ORDER)
+ .build();
+ }
+
+ @Override
+ public void init(Map<String, String> options) {
+ super.init(options);
+ this.rewriteJobOrder =
+ RewriteJobOrder.fromName(
+ PropertyUtil.propertyAsString(
+ options,
+ RewritePositionDeleteFiles.REWRITE_JOB_ORDER,
+ RewritePositionDeleteFiles.REWRITE_JOB_ORDER_DEFAULT));
+ }
+
+ @Override
+ public RewritePositionDeletePlan plan() {
+ StructLikeMap<List<List<PositionDeletesScanTask>>> plan = planFileGroups();
+ RewriteExecutionContext ctx = new RewriteExecutionContext();
+ Stream<RewritePositionDeletesGroup> groups =
+ plan.entrySet().stream()
+ .filter(e -> !e.getValue().isEmpty())
+ .flatMap(
+ e -> {
+ StructLike partition = e.getKey();
+ List<List<PositionDeletesScanTask>> scanGroups = e.getValue();
+ return scanGroups.stream()
+ .map(
+ tasks -> {
+ long inputSize = inputSize(tasks);
+ return newRewriteGroup(
+ ctx,
+ partition,
+ tasks,
+ splitSize(inputSize),
+ numOutputFiles(inputSize));
+ });
+ })
+ .sorted(FileRewriteGroup.taskComparator(rewriteJobOrder));
+ Map<StructLike, Integer> groupsInPartition = plan.transformValues(List::size);
+ int totalGroupCount = groupsInPartition.values().stream().reduce(Integer::sum).orElse(0);
+ return new RewritePositionDeletePlan(
+ groups, totalGroupCount, groupsInPartition, writeMaxFileSize());
+ }
+
+ @Override
+ protected Iterable<PositionDeletesScanTask> filterFiles(Iterable<PositionDeletesScanTask> tasks) {
+ return Iterables.filter(tasks, this::wronglySized);
+ }
+
+ @Override
+ protected Iterable<List<PositionDeletesScanTask>> filterFileGroups(
+ List<List<PositionDeletesScanTask>> groups) {
+ return Iterables.filter(groups, this::shouldRewrite);
+ }
+
+ @Override
+ protected long defaultTargetFileSize() {
+ return PropertyUtil.propertyAsLong(
+ table().properties(),
+ TableProperties.DELETE_TARGET_FILE_SIZE_BYTES,
+ TableProperties.DELETE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ }
+
+ private StructLikeMap<List<List<PositionDeletesScanTask>>> planFileGroups() {
+ Table deletesTable =
+ MetadataTableUtils.createMetadataTableInstance(table(), MetadataTableType.POSITION_DELETES);
+ CloseableIterable<PositionDeletesScanTask> fileTasks = planFiles(deletesTable);
+
+ try {
+ Types.StructType partitionType = Partitioning.partitionType(deletesTable);
+ StructLikeMap<List<PositionDeletesScanTask>> fileTasksByPartition =
+ groupByPartition(partitionType, fileTasks);
+ return fileTasksByPartition.transformValues(
+ tasks -> ImmutableList.copyOf(planFileGroups(tasks)));
+ } finally {
+ try {
+ fileTasks.close();
+ } catch (IOException io) {
+ LOG.error("Cannot properly close file iterable while planning for rewrite", io);
+ }
+ }
+ }
+
+ private CloseableIterable<PositionDeletesScanTask> planFiles(Table deletesTable) {
+ PositionDeletesTable.PositionDeletesBatchScan scan =
+ (PositionDeletesTable.PositionDeletesBatchScan) deletesTable.newBatchScan();
+ return CloseableIterable.transform(
+ scan.baseTableFilter(filter).caseSensitive(caseSensitive).ignoreResiduals().planFiles(),
+ PositionDeletesScanTask.class::cast);
+ }
+
+ private StructLikeMap<List<PositionDeletesScanTask>> groupByPartition(
+ Types.StructType partitionType, Iterable<PositionDeletesScanTask> tasks) {
+ StructLikeMap<List<PositionDeletesScanTask>> filesByPartition =
+ StructLikeMap.create(partitionType);
+
+ for (PositionDeletesScanTask task : tasks) {
+ StructLike coerced = coercePartition(task, partitionType);
+
+ List<PositionDeletesScanTask> partitionTasks = filesByPartition.get(coerced);
+ if (partitionTasks == null) {
+ partitionTasks = Lists.newArrayList();
+ }
+ partitionTasks.add(task);
+ filesByPartition.put(coerced, partitionTasks);
+ }
+
+ return filesByPartition;
+ }
+
+ private RewritePositionDeletesGroup newRewriteGroup(
+ RewriteExecutionContext ctx,
+ StructLike partition,
+ List<PositionDeletesScanTask> tasks,
+ long splitSize,
+ int numOutputSize) {
+ ImmutableRewritePositionDeleteFiles.FileGroupInfo info =
+ ImmutableRewritePositionDeleteFiles.FileGroupInfo.builder()
+ .globalIndex(ctx.currentGlobalIndex())
+ .partitionIndex(ctx.currentPartitionIndex(partition))
+ .partition(partition)
+ .build();
+ return new RewritePositionDeletesGroup(
+ info, Lists.newArrayList(tasks), splitSize, numOutputSize);
+ }
+
+ private boolean shouldRewrite(List group) {
+ return enoughInputFiles(group) || enoughContent(group) || tooMuchContent(group);
+ }
+
+ private static class RewriteExecutionContext {
+ private final Map<StructLike, Integer> partitionIndexMap;
+ private final AtomicInteger groupIndex;
+
+ private RewriteExecutionContext() {
+ this.partitionIndexMap = Maps.newConcurrentMap();
+ this.groupIndex = new AtomicInteger(1);
+ }
+
+ private int currentGlobalIndex() {
+ return groupIndex.getAndIncrement();
+ }
+
+ private int currentPartitionIndex(StructLike partition) {
+ return partitionIndexMap.merge(partition, 1, Integer::sum);
+ }
+ }
+
+ private StructLike coercePartition(PositionDeletesScanTask task, Types.StructType partitionType) {
+ return PartitionUtil.coercePartition(partitionType, task.spec(), task.partition());
+ }
+}
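Usage mirrors the data file planner, with the job order option read from `RewritePositionDeleteFiles`. A minimal sketch:

```java
import java.util.Map;
import org.apache.iceberg.RewriteJobOrder;
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewritePositionDeleteFiles;
import org.apache.iceberg.actions.RewritePositionDeletePlan;
import org.apache.iceberg.actions.RewritePositionDeletesGroupPlanner;

class PositionDeletePlanSketch {
  // Plans position delete compaction, ordering groups so the byte-heaviest run first.
  static RewritePositionDeletePlan plan(Table table) {
    RewritePositionDeletesGroupPlanner planner = new RewritePositionDeletesGroupPlanner(table);
    planner.init(
        Map.of(
            RewritePositionDeleteFiles.REWRITE_JOB_ORDER,
            RewriteJobOrder.BYTES_DESC.orderName()));
    return planner.plan();
  }
}
```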
diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java
index e5b5908804e7..5c9e2321fd82 100644
--- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java
@@ -41,10 +41,17 @@ public abstract class SizeBasedDataRewriter extends SizeBasedFileRewriter<FileS
* <p>Defaults to Integer.MAX_VALUE, which means this feature is not enabled by default.
+ *
+ * @deprecated since 1.8.0, will be removed in 1.9.0; use {@link
+ * RewriteFileGroupPlanner#DELETE_FILE_THRESHOLD}.
*/
- public static final String DELETE_FILE_THRESHOLD = "delete-file-threshold";
+ @Deprecated public static final String DELETE_FILE_THRESHOLD = "delete-file-threshold";
- public static final int DELETE_FILE_THRESHOLD_DEFAULT = Integer.MAX_VALUE;
+ /**
+ * @deprecated since 1.8.0, will be removed in 1.9.0; use {@link
+ * RewriteFileGroupPlanner#DELETE_FILE_THRESHOLD_DEFAULT}.
+ */
+ @Deprecated public static final int DELETE_FILE_THRESHOLD_DEFAULT = Integer.MAX_VALUE;
private int deleteFileThreshold;
diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewritePlanner.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewritePlanner.java
new file mode 100644
index 000000000000..edaec5af0f27
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewritePlanner.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.math.RoundingMode;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.ContentFile;
+import org.apache.iceberg.ContentScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.relocated.com.google.common.math.LongMath;
+import org.apache.iceberg.util.BinPacking;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A file rewrite planner that determines which files to rewrite based on their size.
+ *
+ * <p>If files are smaller than the {@link #MIN_FILE_SIZE_BYTES} threshold or larger than the {@link
+ * #MAX_FILE_SIZE_BYTES} threshold, they are considered targets for being rewritten.
+ *
+ * <p>Once selected, files are grouped based on the {@link BinPacking bin-packing algorithm} into
+ * groups of no more than {@link #MAX_FILE_GROUP_SIZE_BYTES}. Groups will be actually rewritten if
+ * they contain more than {@link #MIN_INPUT_FILES} or if they would produce at least one file of
+ * {@link #TARGET_FILE_SIZE_BYTES}.
+ *
+ * <p>Note that implementations may add extra conditions for selecting files or filtering groups.
+ */
+public abstract class SizeBasedFileRewritePlanner<
+ I,
+ T extends ContentScanTask<F>,
+ F extends ContentFile<F>,
+ G extends FileRewriteGroup<I, T, F>>
+ implements FileRewritePlanner<I, T, F, G> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SizeBasedFileRewritePlanner.class);
+
+ /** The target output file size that this file rewriter will attempt to generate. */
+ public static final String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+ /**
+ * Controls which files will be considered for rewriting. Files with sizes under this threshold
+ * will be considered for rewriting regardless of any other criteria.
+ *
+ * <p>Defaults to 75% of the target file size.
+ */
+ public static final String MIN_FILE_SIZE_BYTES = "min-file-size-bytes";
+
+ public static final double MIN_FILE_SIZE_DEFAULT_RATIO = 0.75;
+
+ /**
+ * Controls which files will be considered for rewriting. Files with sizes above this threshold
+ * will be considered for rewriting regardless of any other criteria.
+ * <p>Defaults to 180% of the target file size.
Defaults to 180% of the target file size.
+ */
+ public static final String MAX_FILE_SIZE_BYTES = "max-file-size-bytes";
+
+ public static final double MAX_FILE_SIZE_DEFAULT_RATIO = 1.80;
+
+ /**
+ * Any file group exceeding this number of files will be rewritten regardless of other criteria.
+ * This config ensures file groups that contain many files are compacted even if the total size of
+ * that group is less than the target file size. This can also be thought of as the maximum number
+ * of wrongly sized files that could remain in a partition after rewriting.
+ */
+ public static final String MIN_INPUT_FILES = "min-input-files";
+
+ public static final int MIN_INPUT_FILES_DEFAULT = 5;
+
+ /** Overrides other options and forces rewriting of all provided files. */
+ public static final String REWRITE_ALL = "rewrite-all";
+
+ public static final boolean REWRITE_ALL_DEFAULT = false;
+
+ /**
+ * This option controls the largest amount of data that should be rewritten in a single file
+ * group. It helps with breaking down the rewriting of very large partitions which may not be
+ * rewritable otherwise due to the resource constraints of the cluster. For example, a sort-based
+ * rewrite may not scale to TB-sized partitions, and those partitions need to be worked on in
+ * small subsections to avoid exhaustion of resources.
+ */
+ public static final String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+
+ public static final long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 100L * 1024 * 1024 * 1024; // 100 GB
+
+ private static final long SPLIT_OVERHEAD = 5L * 1024;
+
+ private final Table table;
+ private long targetFileSize;
+ private long minFileSize;
+ private long maxFileSize;
+ private int minInputFiles;
+ private boolean rewriteAll;
+ private long maxGroupSize;
+ private int outputSpecId;
+
+ protected SizeBasedFileRewritePlanner(Table table) {
+ this.table = table;
+ }
+
+ /** Expected target file size before configuration. */
+ protected abstract long defaultTargetFileSize();
+
+ /** Additional filter for tasks before grouping. */
+ protected abstract Iterable<T> filterFiles(Iterable<T> tasks);
+
+ /** Additional filter for groups. */
+ protected abstract Iterable<List<T>> filterFileGroups(List<List<T>> groups);
+
+ @Override
+ public Set<String> validOptions() {
+ return ImmutableSet.of(
+ TARGET_FILE_SIZE_BYTES,
+ MIN_FILE_SIZE_BYTES,
+ MAX_FILE_SIZE_BYTES,
+ MIN_INPUT_FILES,
+ REWRITE_ALL,
+ MAX_FILE_GROUP_SIZE_BYTES);
+ }
+
+ @Override
+ public void init(Map<String, String> options) {
+ Map<String, Long> sizeThresholds = sizeThresholds(options);
+ this.targetFileSize = sizeThresholds.get(TARGET_FILE_SIZE_BYTES);
+ this.minFileSize = sizeThresholds.get(MIN_FILE_SIZE_BYTES);
+ this.maxFileSize = sizeThresholds.get(MAX_FILE_SIZE_BYTES);
+ this.minInputFiles = minInputFiles(options);
+ this.rewriteAll = rewriteAll(options);
+ this.maxGroupSize = maxGroupSize(options);
+ this.outputSpecId = outputSpecId(options);
+
+ if (rewriteAll) {
+ LOG.info("Configured to rewrite all provided files in table {}", table.name());
+ }
+ }
+
+ protected Table table() {
+ return table;
+ }
+
+ protected boolean wronglySized(T task) {
+ return task.length() < minFileSize || task.length() > maxFileSize;
+ }
+
+ protected Iterable<List<T>> planFileGroups(Iterable<T> tasks) {
+ Iterable<T> filteredTasks = rewriteAll ? tasks : filterFiles(tasks);
+ BinPacking.ListPacker<T> packer = new BinPacking.ListPacker<>(maxGroupSize, 1, false);
+ List<List<T>> groups = packer.pack(filteredTasks, ContentScanTask::length);
+ return rewriteAll ? groups : filterFileGroups(groups);
+ }
+
+ protected boolean enoughInputFiles(List<T> group) {
+ return group.size() > 1 && group.size() >= minInputFiles;
+ }
+
+ protected boolean enoughContent(List<T> group) {
+ return group.size() > 1 && inputSize(group) > targetFileSize;
+ }
+
+ protected boolean tooMuchContent(List<T> group) {
+ return inputSize(group) > maxFileSize;
+ }
+
+ protected long inputSize(List<T> group) {
+ return group.stream().mapToLong(ContentScanTask::length).sum();
+ }
+
+ /**
+ * Calculates the split size to use in bin-packing rewrites.
+ *
+ * <p>This method determines the target split size as the input size divided by the desired number
+ * of output files. The final split size is adjusted to be at least as big as the target file size
+ * but less than the max write file size.
+ */
+ protected long splitSize(long inputSize) {
+ long estimatedSplitSize = (inputSize / numOutputFiles(inputSize)) + SPLIT_OVERHEAD;
+ if (estimatedSplitSize < targetFileSize) {
+ return targetFileSize;
+ } else {
+ return Math.min(estimatedSplitSize, writeMaxFileSize());
+ }
+ }
+
+ /**
+ * Determines the preferable number of output files when rewriting a particular file group.
+ *
+ * <p>If the rewriter is handling 10.1 GB of data with a target file size of 1 GB, it could
+ * produce 11 files, one of which would only have 0.1 GB. This would most likely be less
+ * preferable to 10 files with 1.01 GB each. So this method decides whether to round up or round
+ * down based on what the estimated average file size will be if the remainder (0.1 GB) is
+ * distributed amongst other files. If the new average file size is no more than 10% greater than
+ * the target file size, then this method will round down when determining the number of output
+ * files. Otherwise, the remainder will be written into a separate file.
+ *
+ * @param inputSize a total input size for a file group
+ * @return the number of files this rewriter should create
+ */
+ protected int numOutputFiles(long inputSize) {
+ if (inputSize < targetFileSize) {
+ return 1;
+ }
+
+ long numFilesWithRemainder = LongMath.divide(inputSize, targetFileSize, RoundingMode.CEILING);
+ long numFilesWithoutRemainder = LongMath.divide(inputSize, targetFileSize, RoundingMode.FLOOR);
+ long avgFileSizeWithoutRemainder = inputSize / numFilesWithoutRemainder;
+
+ if (LongMath.mod(inputSize, targetFileSize) > minFileSize) {
+ // the remainder file is of a valid size for this rewrite so keep it
+ return (int) numFilesWithRemainder;
+
+ } else if (avgFileSizeWithoutRemainder < Math.min(1.1 * targetFileSize, writeMaxFileSize())) {
+ // if the reminder is distributed amongst other files,
+ // the average file size will be no more than 10% bigger than the target file size
+ // so round down and distribute remainder amongst other files
+ return (int) numFilesWithoutRemainder;
+
+ } else {
+ // keep the remainder file as it is not OK to distribute it amongst other files
+ return (int) numFilesWithRemainder;
+ }
+ }
+
+ /**
+ * Estimates a larger max target file size than the target size used in task creation to avoid
+ * creating tiny remainder files.
+ *
+ * <p>While we create tasks that should all be smaller than our target size, there is a chance
+ * that the actual data will end up being larger than our target size due to various factors of
+ * compression, serialization, which are outside our control. If this occurs, instead of making a
+ * single file that is close in size to our target, we would end up producing one file of the
+ * target size, and then a small extra file with the remaining data.
+ *
+ * <p>For example, if our target is 512 MB, we may generate a rewrite task that should be 500 MB.
+ * When we write the data we may find we actually have to write out 530 MB. If we use the target
+ * size while writing, we would produce a 512 MB file and an 18 MB file. If instead we use a
+ * larger size estimated by this method, then we end up writing a single file.
+ *
+ * @return the target size plus one half of the distance between max and target
+ */
+ protected long writeMaxFileSize() {
+ return (long) (targetFileSize + ((maxFileSize - targetFileSize) * 0.5));
+ }
+
+ protected int outputSpecId() {
+ return outputSpecId;
+ }
+
+ private int outputSpecId(Map<String, String> options) {
+ int specId =
+ PropertyUtil.propertyAsInt(options, RewriteDataFiles.OUTPUT_SPEC_ID, table.spec().specId());
+ Preconditions.checkArgument(
+ table.specs().containsKey(specId),
+ "Cannot use output spec id %s because the table does not contain a reference to this spec-id.",
+ specId);
+ return specId;
+ }
+
+ private Map<String, Long> sizeThresholds(Map<String, String> options) {
+ long target =
+ PropertyUtil.propertyAsLong(options, TARGET_FILE_SIZE_BYTES, defaultTargetFileSize());
+
+ long defaultMin = (long) (target * MIN_FILE_SIZE_DEFAULT_RATIO);
+ long min = PropertyUtil.propertyAsLong(options, MIN_FILE_SIZE_BYTES, defaultMin);
+
+ long defaultMax = (long) (target * MAX_FILE_SIZE_DEFAULT_RATIO);
+ long max = PropertyUtil.propertyAsLong(options, MAX_FILE_SIZE_BYTES, defaultMax);
+
+ Preconditions.checkArgument(
+ target > 0, "'%s' is set to %s but must be > 0", TARGET_FILE_SIZE_BYTES, target);
+
+ Preconditions.checkArgument(
+ min >= 0, "'%s' is set to %s but must be >= 0", MIN_FILE_SIZE_BYTES, min);
+
+ Preconditions.checkArgument(
+ target > min,
+ "'%s' (%s) must be > '%s' (%s), all new files will be smaller than the min threshold",
+ TARGET_FILE_SIZE_BYTES,
+ target,
+ MIN_FILE_SIZE_BYTES,
+ min);
+
+ Preconditions.checkArgument(
+ target < max,
+ "'%s' (%s) must be < '%s' (%s), all new files will be larger than the max threshold",
+ TARGET_FILE_SIZE_BYTES,
+ target,
+ MAX_FILE_SIZE_BYTES,
+ max);
+
+ Map<String, Long> values = Maps.newHashMap();
+
+ values.put(TARGET_FILE_SIZE_BYTES, target);
+ values.put(MIN_FILE_SIZE_BYTES, min);
+ values.put(MAX_FILE_SIZE_BYTES, max);
+
+ return values;
+ }
+
+ private int minInputFiles(Map<String, String> options) {
+ int value = PropertyUtil.propertyAsInt(options, MIN_INPUT_FILES, MIN_INPUT_FILES_DEFAULT);
+ Preconditions.checkArgument(
+ value > 0, "'%s' is set to %s but must be > 0", MIN_INPUT_FILES, value);
+ return value;
+ }
+
+ private long maxGroupSize(Map<String, String> options) {
+ long value =
+ PropertyUtil.propertyAsLong(
+ options, MAX_FILE_GROUP_SIZE_BYTES, MAX_FILE_GROUP_SIZE_BYTES_DEFAULT);
+ Preconditions.checkArgument(
+ value > 0, "'%s' is set to %s but must be > 0", MAX_FILE_GROUP_SIZE_BYTES, value);
+ return value;
+ }
+
+ private boolean rewriteAll(Map<String, String> options) {
+ return PropertyUtil.propertyAsBoolean(options, REWRITE_ALL, REWRITE_ALL_DEFAULT);
+ }
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
index cea7003c1a38..00ef0b6694de 100644
--- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
@@ -47,7 +47,11 @@
* {@link #TARGET_FILE_SIZE_BYTES}.
*
 * <p>Note that implementations may add extra conditions for selecting files or filtering groups.
+ *
+ * @deprecated since 1.8.0, will be removed in 1.9.0; use {@link SizeBasedFileRewritePlanner} and
+ * {@link FileRewriteExecutor}.
*/
+@Deprecated
 public abstract class SizeBasedFileRewriter<T extends ContentScanTask<F>, F extends ContentFile<F>>
 implements FileRewriter<T, F> {
diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedPositionDeletesRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedPositionDeletesRewriter.java
index c08a31a731f4..60f37b79d24c 100644
--- a/core/src/main/java/org/apache/iceberg/actions/SizeBasedPositionDeletesRewriter.java
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedPositionDeletesRewriter.java
@@ -26,6 +26,11 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.PropertyUtil;
+/**
+ * @deprecated since 1.8.0, will be removed in 1.9.0; use {@link RewritePositionDeletesGroupPlanner}
+ * and {@link FileRewriteExecutor}.
+ */
+@Deprecated
 public abstract class SizeBasedPositionDeletesRewriter
 extends SizeBasedFileRewriter<PositionDeletesScanTask, DeleteFile> {
diff --git a/core/src/test/java/org/apache/iceberg/actions/TestRewriteFileGroupPlanner.java b/core/src/test/java/org/apache/iceberg/actions/TestRewriteFileGroupPlanner.java
new file mode 100644
index 000000000000..48991cd601b7
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/actions/TestRewriteFileGroupPlanner.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.apache.iceberg.actions.RewriteDataFiles.REWRITE_JOB_ORDER;
+import static org.apache.iceberg.actions.RewriteFileGroupPlanner.MAX_FILE_SIZE_DEFAULT_RATIO;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MockFileScanTask;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TestBase;
+import org.apache.iceberg.TestTables;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+class TestRewriteFileGroupPlanner {
+ private static final Map<String, String> REWRITE_ALL =
+ ImmutableMap.of(RewriteFileGroupPlanner.REWRITE_ALL, "true");
+
+ private static final DataFile FILE_1 = newDataFile("data_bucket=0", 10);
+ private static final DataFile FILE_2 = newDataFile("data_bucket=0", 10);
+ private static final DataFile FILE_3 = newDataFile("data_bucket=0", 10);
+ private static final DataFile FILE_4 = newDataFile("data_bucket=1", 11);
+ private static final DataFile FILE_5 = newDataFile("data_bucket=1", 11);
+ private static final DataFile FILE_6 = newDataFile("data_bucket=2", 50);
+
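+ // data_bucket=0 holds FILE_1..FILE_3 (3 files, 30 bytes), data_bucket=1 holds FILE_4 and
+ // FILE_5 (2 files, 22 bytes), and data_bucket=2 holds FILE_6 (1 file, 50 bytes); the
+ // expected partition orderings below follow from these file counts and byte totals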
+ private static final Map<RewriteJobOrder, List<StructLike>> EXPECTED =
+ ImmutableMap.of(
+ RewriteJobOrder.FILES_DESC,
+ ImmutableList.of(FILE_1.partition(), FILE_4.partition(), FILE_6.partition()),
+ RewriteJobOrder.FILES_ASC,
+ ImmutableList.of(FILE_6.partition(), FILE_4.partition(), FILE_1.partition()),
+ RewriteJobOrder.BYTES_DESC,
+ ImmutableList.of(FILE_6.partition(), FILE_1.partition(), FILE_4.partition()),
+ RewriteJobOrder.BYTES_ASC,
+ ImmutableList.of(FILE_4.partition(), FILE_1.partition(), FILE_6.partition()));
+
+ @TempDir private File tableDir = null;
+ private TestTables.TestTable table = null;
+
+ @BeforeEach
+ public void setupTable() throws Exception {
+ this.table = TestTables.create(tableDir, "test", TestBase.SCHEMA, TestBase.SPEC, 3);
+ }
+
+ @AfterEach
+ public void cleanupTables() {
+ TestTables.clearTables();
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = RewriteJobOrder.class,
+ names = {"FILES_DESC", "FILES_ASC", "BYTES_DESC", "BYTES_ASC"})
+ void testJobOrder(RewriteJobOrder order) {
+ addFiles();
+ RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(table);
+ planner.init(
+ ImmutableMap.of(
+ RewriteFileGroupPlanner.REWRITE_ALL, "true", REWRITE_JOB_ORDER, order.name()));
+ RewriteFilePlan result = planner.plan();
+ List<RewriteFileGroup> groups = result.groups().collect(Collectors.toList());
+ assertThat(groups.stream().map(group -> group.info().partition()).collect(Collectors.toList()))
+ .isEqualTo(EXPECTED.get(order));
+ assertThat(result.totalGroupCount()).isEqualTo(3);
+ EXPECTED.get(order).forEach(s -> assertThat(result.groupsInPartition(s)).isEqualTo(1));
+ }
+
+ @Test
+ void testUnpartitionedTable() {
+ table.updateSpec().removeField("data_bucket").commit();
+ table.refresh();
+
+ table
+ .newAppend()
+ .appendFile(newDataFile("", 10))
+ .appendFile(newDataFile("", 20))
+ .appendFile(newDataFile("", 30))
+ .commit();
+ RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(table);
+ planner.init(
+ ImmutableMap.of(
+ RewriteFileGroupPlanner.MIN_INPUT_FILES,
+ "1",
+ RewriteFileGroupPlanner.MIN_FILE_SIZE_BYTES,
+ "30"));
+ RewriteFilePlan result = planner.plan();
+ assertThat(result.totalGroupCount()).isEqualTo(1);
+ assertThat(result.groups().iterator().next().numInputFiles()).isEqualTo(2);
+ }
+
+ @Test
+ void testMaxGroupSize() {
+ addFiles();
+ RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(table);
+ planner.init(
+ ImmutableMap.of(
+ RewriteFileGroupPlanner.REWRITE_ALL,
+ "true",
+ RewriteFileGroupPlanner.MAX_FILE_GROUP_SIZE_BYTES,
+ "10"));
+ RewriteFilePlan result = planner.plan();
+ assertThat(result.totalGroupCount()).isEqualTo(6);
+ assertThat(result.groupsInPartition(FILE_1.partition())).isEqualTo(3);
+ assertThat(result.groupsInPartition(FILE_4.partition())).isEqualTo(2);
+ assertThat(result.groupsInPartition(FILE_6.partition())).isEqualTo(1);
+ }
+
+ @Test
+ void testEmptyTable() {
+ RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(table);
+
+ planner.init(REWRITE_ALL);
+
+ RewriteFilePlan result = planner.plan();
+
+ assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
+ assertThat(result.totalGroupCount()).isZero();
+ }
+
+ @Test
+ void testFilter() {
+ addFiles();
+ RewriteFileGroupPlanner planner =
+ new RewriteFileGroupPlanner(
+ table,
+ Expressions.or(
+ Expressions.equal(Expressions.bucket("data", 16), 0),
+ Expressions.equal(Expressions.bucket("data", 16), 2)));
+ planner.init(REWRITE_ALL);
+ RewriteFilePlan plan = planner.plan();
+ List<RewriteFileGroup> groups = plan.groups().collect(Collectors.toList());
+
+ assertThat(plan.totalGroupCount()).isEqualTo(2);
+ assertThat(groups).hasSize(2);
+ assertThat(groups.stream().mapToLong(FileRewriteGroup::numInputFiles).sum()).isEqualTo(4);
+ }
+
+ @Test
+ void testWriteMaxFileSize() {
+ int targetFileSize = 10;
+ addFiles();
+
+ RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(table);
+ planner.init(
+ ImmutableMap.of(
+ RewriteFileGroupPlanner.REWRITE_ALL,
+ "true",
+ RewriteFileGroupPlanner.TARGET_FILE_SIZE_BYTES,
+ String.valueOf(targetFileSize)));
+ RewriteFilePlan plan = planner.plan();
+ assertThat(plan.writeMaxFileSize())
+ .isGreaterThan(targetFileSize)
+ .isLessThan((long) (targetFileSize * MAX_FILE_SIZE_DEFAULT_RATIO));
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testOutputSpec(boolean specific) {
+ addFiles();
+
+ int oldSpecId = table.spec().specId();
+ table.updateSpec().removeField("data_bucket").commit();
+ table.newAppend().appendFile(newDataFile("", 10)).commit();
+ table.refresh();
+ int newSpecId = table.spec().specId();
+
+ RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(table);
+
+ Map<String, String> options = Maps.newHashMap(REWRITE_ALL);
+ if (specific) {
+ options.put(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(oldSpecId));
+ }
+
+ planner.init(options);
+
+ RewriteFilePlan plan = planner.plan();
+ assertThat(plan.outputSpecId()).isEqualTo(specific ? oldSpecId : newSpecId);
+ }
+
+ @Test
+ void testValidOptions() {
+ RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(table);
+
+ assertThat(planner.validOptions())
+ .as("Planner must report all supported options")
+ .isEqualTo(
+ ImmutableSet.of(
+ RewriteFileGroupPlanner.TARGET_FILE_SIZE_BYTES,
+ RewriteFileGroupPlanner.MIN_FILE_SIZE_BYTES,
+ RewriteFileGroupPlanner.MAX_FILE_SIZE_BYTES,
+ RewriteFileGroupPlanner.MIN_INPUT_FILES,
+ RewriteFileGroupPlanner.REWRITE_ALL,
+ RewriteFileGroupPlanner.MAX_FILE_GROUP_SIZE_BYTES,
+ RewriteFileGroupPlanner.DELETE_FILE_THRESHOLD,
+ RewriteDataFiles.REWRITE_JOB_ORDER));
+ }
+
+ @Test
+ void testInvalidOption() {
+ RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(table);
+
+ Map<String, String> invalidRewriteJobOrderOptions =
+ ImmutableMap.of(RewriteDataFiles.REWRITE_JOB_ORDER, "foo");
+ assertThatThrownBy(() -> planner.init(invalidRewriteJobOrderOptions))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid rewrite job order name: foo");
+
+ Map<String, String> invalidOutputSpecIdOptions =
+ ImmutableMap.of(RewriteDataFiles.OUTPUT_SPEC_ID, String.valueOf(1234));
+ assertThatThrownBy(() -> planner.init(invalidOutputSpecIdOptions))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage(
+ "Cannot use output spec id 1234 because the table does not contain a reference to this spec-id.");
+
+ Map<String, String> invalidDeleteFileThresholdOptions =
+ ImmutableMap.of(RewriteFileGroupPlanner.DELETE_FILE_THRESHOLD, "-1");
+ assertThatThrownBy(() -> planner.init(invalidDeleteFileThresholdOptions))
+ .hasMessageContaining("'delete-file-threshold' is set to -1 but must be >= 0");
+ }
+
+ @Test
+ void testBinPackDataSelectFiles() {
+ RewriteFileGroupPlanner planner = new RewriteFileGroupPlanner(table);
+
+ checkDataFileSizeFiltering(planner);
+ checkDataFilesDeleteThreshold(planner);
+ checkDataFileGroupWithEnoughFiles(planner);
+ checkDataFileGroupWithEnoughData(planner);
+ checkDataFileGroupWithTooMuchData(planner);
+ }
+
+ private void checkDataFileSizeFiltering(RewriteFileGroupPlanner planner) {
+ FileScanTask tooSmallTask = new MockFileScanTask(100L);
+ FileScanTask optimalTask = new MockFileScanTask(450L);
+ FileScanTask tooBigTask = new MockFileScanTask(1000L);
+ List<FileScanTask> tasks = ImmutableList.of(tooSmallTask, optimalTask, tooBigTask);
+
+ Map<String, String> options =
+ ImmutableMap.of(
+ RewriteFileGroupPlanner.MIN_FILE_SIZE_BYTES, "250",
+ RewriteFileGroupPlanner.TARGET_FILE_SIZE_BYTES, "500",
+ RewriteFileGroupPlanner.MAX_FILE_SIZE_BYTES, "750",
+ RewriteFileGroupPlanner.DELETE_FILE_THRESHOLD, String.valueOf(Integer.MAX_VALUE));
+ planner.init(options);
+
+ Iterable<List<FileScanTask>> groups = planner.planFileGroups(tasks);
+ assertThat(groups).as("Must have 1 group").hasSize(1);
+ List<FileScanTask> group = Iterables.getOnlyElement(groups);
+ assertThat(group).as("Must rewrite 2 files").hasSize(2);
+ }
+
+ private void checkDataFilesDeleteThreshold(RewriteFileGroupPlanner planner) {
+ FileScanTask tooManyDeletesTask = MockFileScanTask.mockTaskWithDeletes(1000L, 3);
+ FileScanTask optimalTask = MockFileScanTask.mockTaskWithDeletes(1000L, 1);
+ List<FileScanTask> tasks = ImmutableList.of(tooManyDeletesTask, optimalTask);
+
+ Map<String, String> options =
+ ImmutableMap.of(
+ RewriteFileGroupPlanner.MIN_FILE_SIZE_BYTES, "1",
+ RewriteFileGroupPlanner.TARGET_FILE_SIZE_BYTES, "2000",
+ RewriteFileGroupPlanner.MAX_FILE_SIZE_BYTES, "5000",
+ RewriteFileGroupPlanner.DELETE_FILE_THRESHOLD, "2");
+ planner.init(options);
+
+ Iterable<List<FileScanTask>> groups = planner.planFileGroups(tasks);
+ assertThat(groups).as("Must have 1 group").hasSize(1);
+ List<FileScanTask> group = Iterables.getOnlyElement(groups);
+ assertThat(group).as("Must rewrite 1 file").hasSize(1);
+ }
+
+ private void checkDataFileGroupWithEnoughFiles(RewriteFileGroupPlanner planner) {
+ List<FileScanTask> tasks =
+ ImmutableList.of(
+ new MockFileScanTask(100L),
+ new MockFileScanTask(100L),
+ new MockFileScanTask(100L),
+ new MockFileScanTask(100L));
+
+ Map<String, String> options =
+ ImmutableMap.of(
+ RewriteFileGroupPlanner.MIN_INPUT_FILES, "3",
+ RewriteFileGroupPlanner.MIN_FILE_SIZE_BYTES, "150",
+ RewriteFileGroupPlanner.TARGET_FILE_SIZE_BYTES, "1000",
+ RewriteFileGroupPlanner.MAX_FILE_SIZE_BYTES, "5000",
+ RewriteFileGroupPlanner.DELETE_FILE_THRESHOLD, String.valueOf(Integer.MAX_VALUE));
+ planner.init(options);
+
+ Iterable<List<FileScanTask>> groups = planner.planFileGroups(tasks);
+ assertThat(groups).as("Must have 1 group").hasSize(1);
+ List<FileScanTask> group = Iterables.getOnlyElement(groups);
+ assertThat(group).as("Must rewrite 4 files").hasSize(4);
+ }
+
+ private void checkDataFileGroupWithEnoughData(RewriteFileGroupPlanner planner) {
+ List<FileScanTask> tasks =
+ ImmutableList.of(
+ new MockFileScanTask(100L), new MockFileScanTask(100L), new MockFileScanTask(100L));
+
+ Map<String, String> options =
+ ImmutableMap.of(
+ RewriteFileGroupPlanner.MIN_INPUT_FILES, "5",
+ RewriteFileGroupPlanner.MIN_FILE_SIZE_BYTES, "200",
+ RewriteFileGroupPlanner.TARGET_FILE_SIZE_BYTES, "250",
+ RewriteFileGroupPlanner.MAX_FILE_SIZE_BYTES, "500",
+ RewriteFileGroupPlanner.DELETE_FILE_THRESHOLD, String.valueOf(Integer.MAX_VALUE));
+ planner.init(options);
+
+ Iterable<List<FileScanTask>> groups = planner.planFileGroups(tasks);
+ assertThat(groups).as("Must have 1 group").hasSize(1);
+ List<FileScanTask> group = Iterables.getOnlyElement(groups);
+ assertThat(group).as("Must rewrite 3 files").hasSize(3);
+ }
+
+ private void checkDataFileGroupWithTooMuchData(RewriteFileGroupPlanner planner) {
+ List<FileScanTask> tasks = ImmutableList.of(new MockFileScanTask(2000L));
+
+ Map<String, String> options =
+ ImmutableMap.of(
+ RewriteFileGroupPlanner.MIN_INPUT_FILES, "5",
+ RewriteFileGroupPlanner.MIN_FILE_SIZE_BYTES, "200",
+ RewriteFileGroupPlanner.TARGET_FILE_SIZE_BYTES, "250",
+ RewriteFileGroupPlanner.MAX_FILE_SIZE_BYTES, "500",
+ RewriteFileGroupPlanner.DELETE_FILE_THRESHOLD, String.valueOf(Integer.MAX_VALUE));
+ planner.init(options);
+
+ Iterable<List<FileScanTask>> groups = planner.planFileGroups(tasks);
+ assertThat(groups).as("Must have 1 group").hasSize(1);
+ List<FileScanTask> group = Iterables.getOnlyElement(groups);
+ assertThat(group).as("Must rewrite big file").hasSize(1);
+ }
+
+ private void addFiles() {
+ table
+ .newAppend()
+ .appendFile(FILE_1)
+ .appendFile(FILE_2)
+ .appendFile(FILE_3)
+ .appendFile(FILE_4)
+ .appendFile(FILE_5)
+ .appendFile(FILE_6)
+ .commit();
+ }
+
+ private static DataFile newDataFile(String partitionPath, long fileSize) {
+ return DataFiles.builder(TestBase.SPEC)
+ .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
+ .withFileSizeInBytes(fileSize)
+ .withPartitionPath(partitionPath)
+ .withRecordCount(1)
+ .build();
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/actions/TestRewritePositionDeletesGroupPlanner.java b/core/src/test/java/org/apache/iceberg/actions/TestRewritePositionDeletesGroupPlanner.java
new file mode 100644
index 000000000000..07858706d9ee
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/actions/TestRewritePositionDeletesGroupPlanner.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.apache.iceberg.actions.RewritePositionDeleteFiles.REWRITE_JOB_ORDER;
+import static org.apache.iceberg.actions.RewritePositionDeletesGroupPlanner.MAX_FILE_SIZE_DEFAULT_RATIO;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.PartitionData;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RewriteJobOrder;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.TestBase;
+import org.apache.iceberg.TestTables;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+class TestRewritePositionDeletesGroupPlanner {
+ private static final Map<String, String> REWRITE_ALL =
+ ImmutableMap.of(RewritePositionDeletesGroupPlanner.REWRITE_ALL, "true");
+
+ private static final DataFile FILE_1 = newDataFile("data_bucket=0");
+ private static final DataFile FILE_2 = newDataFile("data_bucket=1");
+ private static final DataFile FILE_3 = newDataFile("data_bucket=2");
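+ // per addFiles(), FILE_1's partition carries 3 delete files (30 bytes), FILE_2's carries 2
+ // (22 bytes), and FILE_3's carries 1 (50 bytes); the expected orderings below follow from
+ // these counts and totals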
+ private static final Map<RewriteJobOrder, List<StructLike>> EXPECTED =
+ ImmutableMap.of(
+ RewriteJobOrder.FILES_DESC,
+ ImmutableList.of(FILE_1.partition(), FILE_2.partition(), FILE_3.partition()),
+ RewriteJobOrder.FILES_ASC,
+ ImmutableList.of(FILE_3.partition(), FILE_2.partition(), FILE_1.partition()),
+ RewriteJobOrder.BYTES_DESC,
+ ImmutableList.of(FILE_3.partition(), FILE_1.partition(), FILE_2.partition()),
+ RewriteJobOrder.BYTES_ASC,
+ ImmutableList.of(FILE_2.partition(), FILE_1.partition(), FILE_3.partition()));
+
+ @TempDir private File tableDir = null;
+ private TestTables.TestTable table = null;
+
+ @BeforeEach
+ public void setupTable() throws Exception {
+ this.table = TestTables.create(tableDir, "test", TestBase.SCHEMA, TestBase.SPEC, 2);
+ }
+
+ @AfterEach
+ public void cleanupTables() {
+ TestTables.clearTables();
+ }
+
+ @ParameterizedTest
+ @EnumSource(
+ value = RewriteJobOrder.class,
+ names = {"FILES_DESC", "FILES_ASC", "BYTES_DESC", "BYTES_ASC"})
+ void testJobOrder(RewriteJobOrder order) {
+ addFiles();
+ RewritePositionDeletesGroupPlanner planner = new RewritePositionDeletesGroupPlanner(table);
+ planner.init(
+ ImmutableMap.of(
+ RewriteFileGroupPlanner.REWRITE_ALL, "true", REWRITE_JOB_ORDER, order.name()));
+ RewritePositionDeletePlan result = planner.plan();
+ List<RewritePositionDeletesGroup> groups = result.groups().collect(Collectors.toList());
+ assertThat(
+ groups.stream()
+ .map(
+ group ->
+ new PartitionData(TestBase.SPEC.partitionType())
+ .copyFor(group.info().partition()))
+ .collect(Collectors.toList()))
+ .isEqualTo(EXPECTED.get(order));
+ assertThat(result.totalGroupCount()).isEqualTo(3);
+ EXPECTED.get(order).forEach(s -> assertThat(result.groupsInPartition(s)).isEqualTo(1));
+ }
+
+ @Test
+ void testUnpartitionedTable() {
+ table.updateSpec().removeField("data_bucket").commit();
+ table.refresh();
+
+ table
+ .newRowDelta()
+ .addRows(newDataFile(""))
+ .addDeletes(newDeleteFile(10))
+ .addDeletes(newDeleteFile(20))
+ .addDeletes(newDeleteFile(30))
+ .commit();
+
+ RewritePositionDeletesGroupPlanner planner = new RewritePositionDeletesGroupPlanner(table);
+ planner.init(
+ ImmutableMap.of(
+ RewriteFileGroupPlanner.MIN_INPUT_FILES,
+ "1",
+ RewriteFileGroupPlanner.MIN_FILE_SIZE_BYTES,
+ "30"));
+ RewritePositionDeletePlan result = planner.plan();
+ assertThat(result.totalGroupCount()).isEqualTo(1);
+ assertThat(result.groups().iterator().next().numInputFiles()).isEqualTo(2);
+ }
+
+ @Test
+ void testMaxGroupSize() {
+ addFiles();
+ RewritePositionDeletesGroupPlanner planner = new RewritePositionDeletesGroupPlanner(table);
+ planner.init(
+ ImmutableMap.of(
+ RewritePositionDeletesGroupPlanner.REWRITE_ALL,
+ "true",
+ RewritePositionDeletesGroupPlanner.MAX_FILE_GROUP_SIZE_BYTES,
+ "10"));
+ RewritePositionDeletePlan result = planner.plan();
+ assertThat(result.totalGroupCount()).isEqualTo(6);
+ assertThat(result.groupsInPartition(FILE_1.partition())).isEqualTo(3);
+ assertThat(result.groupsInPartition(FILE_2.partition())).isEqualTo(2);
+ assertThat(result.groupsInPartition(FILE_3.partition())).isEqualTo(1);
+ }
+
+ @Test
+ void testEmptyTable() {
+ RewritePositionDeletesGroupPlanner planner = new RewritePositionDeletesGroupPlanner(table);
+
+ planner.init(REWRITE_ALL);
+
+ RewritePositionDeletePlan result = planner.plan();
+
+ assertThat(table.currentSnapshot()).as("Table must be empty").isNull();
+ assertThat(result.totalGroupCount()).isZero();
+ }
+
+ @Test
+ void testFilter() {
+ addFiles();
+ RewritePositionDeletesGroupPlanner planner =
+ new RewritePositionDeletesGroupPlanner(
+ table,
+ Expressions.or(
+ Expressions.equal(Expressions.bucket("data", 16), 0),
+ Expressions.equal(Expressions.bucket("data", 16), 2)),
+ false);
+ planner.init(REWRITE_ALL);
+ RewritePositionDeletePlan plan = planner.plan();
+ List<RewritePositionDeletesGroup> groups = plan.groups().collect(Collectors.toList());
+
+ assertThat(plan.totalGroupCount()).isEqualTo(2);
+ assertThat(groups).hasSize(2);
+ assertThat(groups.stream().mapToLong(FileRewriteGroup::numInputFiles).sum()).isEqualTo(4);
+ }
+
+ @Test
+ void testWriteMaxFileSize() {
+ int targetFileSize = 10;
+ addFiles();
+
+ RewritePositionDeletesGroupPlanner planner = new RewritePositionDeletesGroupPlanner(table);
+ planner.init(
+ ImmutableMap.of(
+ RewritePositionDeletesGroupPlanner.REWRITE_ALL,
+ "true",
+ RewritePositionDeletesGroupPlanner.TARGET_FILE_SIZE_BYTES,
+ String.valueOf(targetFileSize)));
+ RewritePositionDeletePlan plan = planner.plan();
+ assertThat(plan.writeMaxFileSize())
+ .isGreaterThan(targetFileSize)
+ .isLessThan((long) (targetFileSize * MAX_FILE_SIZE_DEFAULT_RATIO));
+ }
+
+ @Test
+ void testValidOptions() {
+ RewritePositionDeletesGroupPlanner planner = new RewritePositionDeletesGroupPlanner(table);
+
+ assertThat(planner.validOptions())
+ .as("Planner must report all supported options")
+ .isEqualTo(
+ ImmutableSet.of(
+ RewritePositionDeletesGroupPlanner.TARGET_FILE_SIZE_BYTES,
+ RewritePositionDeletesGroupPlanner.MIN_FILE_SIZE_BYTES,
+ RewritePositionDeletesGroupPlanner.MAX_FILE_SIZE_BYTES,
+ RewritePositionDeletesGroupPlanner.MIN_INPUT_FILES,
+ RewritePositionDeletesGroupPlanner.REWRITE_ALL,
+ RewritePositionDeletesGroupPlanner.MAX_FILE_GROUP_SIZE_BYTES,
+ RewriteDataFiles.REWRITE_JOB_ORDER));
+ }
+
+ @Test
+ void testInvalidOption() {
+ RewritePositionDeletesGroupPlanner planner = new RewritePositionDeletesGroupPlanner(table);
+
+ Map<String, String> invalidRewriteJobOrderOptions =
+ ImmutableMap.of(RewritePositionDeleteFiles.REWRITE_JOB_ORDER, "foo");
+ assertThatThrownBy(() -> planner.init(invalidRewriteJobOrderOptions))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Invalid rewrite job order name: foo");
+ }
+
+ private void addFiles() {
+ table
+ .newRowDelta()
+ .addRows(FILE_1)
+ .addDeletes(newDeleteFile(FILE_1.partition(), 10))
+ .addDeletes(newDeleteFile(FILE_1.partition(), 10))
+ .addDeletes(newDeleteFile(FILE_1.partition(), 10))
+ .addRows(FILE_2)
+ .addDeletes(newDeleteFile(FILE_2.partition(), 11))
+ .addDeletes(newDeleteFile(FILE_2.partition(), 11))
+ .addRows(FILE_3)
+ .addDeletes(newDeleteFile(FILE_3.partition(), 50))
+ .commit();
+ }
+
+ private static DataFile newDataFile(String partitionPath) {
+ return DataFiles.builder(TestBase.SPEC)
+ .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath(partitionPath)
+ .withRecordCount(1)
+ .build();
+ }
+
+ private static DeleteFile newDeleteFile(long fileSize) {
+ return newDeleteFile(
+ new PartitionData(PartitionSpec.unpartitioned().partitionType()), fileSize);
+ }
+
+ private static DeleteFile newDeleteFile(StructLike partition, long fileSize) {
+ return FileMetadata.deleteFileBuilder(TestBase.SPEC)
+ .ofPositionDeletes()
+ .withPath("/path/to/delete-" + UUID.randomUUID() + ".parquet")
+ .withFileSizeInBytes(fileSize)
+ .withPartition(partition)
+ .withRecordCount(1)
+ .build();
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedFileRewritePlanner.java b/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedFileRewritePlanner.java
new file mode 100644
index 000000000000..43ea307e11ff
--- /dev/null
+++ b/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedFileRewritePlanner.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.MockFileScanTask;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestBase;
+import org.apache.iceberg.TestTables;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+
+class TestSizeBasedFileRewritePlanner {
+ @TempDir private File tableDir = null;
+ private TestTables.TestTable table = null;
+
+ @BeforeEach
+ public void setupTable() throws Exception {
+ this.table = TestTables.create(tableDir, "test", TestBase.SCHEMA, TestBase.SPEC, 3);
+ }
+
+ @AfterEach
+ public void cleanupTables() {
+ TestTables.clearTables();
+ }
+
+ @Test
+ void testSplitSizeLowerBound() {
+ FileScanTask task1 = new MockFileScanTask(mockDataFile());
+ FileScanTask task2 = new MockFileScanTask(mockDataFile());
+ FileScanTask task3 = new MockFileScanTask(mockDataFile());
+ FileScanTask task4 = new MockFileScanTask(mockDataFile());
+ List<FileScanTask> tasks = ImmutableList.of(task1, task2, task3, task4);
+
+ TestingPlanner planner = new TestingPlanner(table);
+
+ long minFileSize = 256L * 1024 * 1024;
+ long targetFileSize = 512L * 1024 * 1024;
+ long maxFileSize = 768L * 1024 * 1024;
+
+ Map<String, String> options =
+ ImmutableMap.of(
+ RewriteFileGroupPlanner.MIN_FILE_SIZE_BYTES, String.valueOf(minFileSize),
+ RewriteFileGroupPlanner.TARGET_FILE_SIZE_BYTES, String.valueOf(targetFileSize),
+ RewriteFileGroupPlanner.MAX_FILE_SIZE_BYTES, String.valueOf(maxFileSize));
+ planner.init(options);
+
+ // the total task size is 580 MB and the target file size is 512 MB
+ // the remainder must be written into a separate file as it exceeds 10% of the target size
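+ // (4 tasks x 145 MB = 580 MB; the 68 MB remainder exceeds 51.2 MB, i.e. 10% of the target)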
+ List<List<FileScanTask>> groups = Lists.newArrayList(planner.planFileGroups(tasks).iterator());
+
+ assertThat(groups).hasSize(1);
+
+ List<FileScanTask> group = groups.get(0);
+ // the split size must be >= targetFileSize and < maxFileSize
+ long splitSize = group.stream().mapToLong(FileScanTask::sizeBytes).sum();
+ assertThat(splitSize).isGreaterThanOrEqualTo(targetFileSize).isLessThan(maxFileSize);
+ }
+
+ @Test
+ void testValidOptions() {
+ TestingPlanner planner = new TestingPlanner(table);
+
+ assertThat(planner.validOptions())
+ .as("Planner must report all supported options")
+ .isEqualTo(
+ ImmutableSet.of(
+ RewriteFileGroupPlanner.TARGET_FILE_SIZE_BYTES,
+ RewriteFileGroupPlanner.MIN_FILE_SIZE_BYTES,
+ RewriteFileGroupPlanner.MAX_FILE_SIZE_BYTES,
+ RewriteFileGroupPlanner.MIN_INPUT_FILES,
+ RewriteFileGroupPlanner.REWRITE_ALL,
+ RewriteFileGroupPlanner.MAX_FILE_GROUP_SIZE_BYTES));
+ }
+
+ @Test
+ void testInvalidOption() {
+ TestingPlanner planner = new TestingPlanner(table);
+
+ Map<String, String> invalidTargetSizeOptions =
+ ImmutableMap.of(SizeBasedFileRewritePlanner.TARGET_FILE_SIZE_BYTES, "0");
+ assertThatThrownBy(() -> planner.init(invalidTargetSizeOptions))
+ .hasMessageContaining("'target-file-size-bytes' is set to 0 but must be > 0");
+
+ Map<String, String> invalidMinSizeOptions =
+ ImmutableMap.of(SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "-1");
+ assertThatThrownBy(() -> planner.init(invalidMinSizeOptions))
+ .hasMessageContaining("'min-file-size-bytes' is set to -1 but must be >= 0");
+
+ Map<String, String> invalidTargetMinSizeOptions =
+ ImmutableMap.of(
+ SizeBasedFileRewritePlanner.TARGET_FILE_SIZE_BYTES, "3",
+ SizeBasedFileRewritePlanner.MIN_FILE_SIZE_BYTES, "5");
+ assertThatThrownBy(() -> planner.init(invalidTargetMinSizeOptions))
+ .hasMessageContaining("'target-file-size-bytes' (3) must be > 'min-file-size-bytes' (5)")
+ .hasMessageContaining("all new files will be smaller than the min threshold");
+
+ Map<String, String> invalidTargetMaxSizeOptions =
+ ImmutableMap.of(
+ SizeBasedFileRewritePlanner.TARGET_FILE_SIZE_BYTES, "5",
+ SizeBasedFileRewritePlanner.MAX_FILE_SIZE_BYTES, "3");
+ assertThatThrownBy(() -> planner.init(invalidTargetMaxSizeOptions))
+ .hasMessageContaining("'target-file-size-bytes' (5) must be < 'max-file-size-bytes' (3)")
+ .hasMessageContaining("all new files will be larger than the max threshold");
+
+ Map<String, String> invalidMinInputFilesOptions =
+ ImmutableMap.of(SizeBasedFileRewritePlanner.MIN_INPUT_FILES, "0");
+ assertThatThrownBy(() -> planner.init(invalidMinInputFilesOptions))
+ .hasMessageContaining("'min-input-files' is set to 0 but must be > 0");
+
+ Map<String, String> invalidMaxFileGroupSizeOptions =
+ ImmutableMap.of(SizeBasedFileRewritePlanner.MAX_FILE_GROUP_SIZE_BYTES, "0");
+ assertThatThrownBy(() -> planner.init(invalidMaxFileGroupSizeOptions))
+ .hasMessageContaining("'max-file-group-size-bytes' is set to 0 but must be > 0");
+ }
+
+ private static class TestingPlanner
+ extends SizeBasedFileRewritePlanner<
+ RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup> {
+ protected TestingPlanner(Table table) {
+ super(table);
+ }
+
+ @Override
+ protected long defaultTargetFileSize() {
+ return TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+ }
+
+ @Override
+ protected Iterable<FileScanTask> filterFiles(Iterable<FileScanTask> tasks) {
+ return tasks;
+ }
+
+ @Override
+ protected Iterable<List<FileScanTask>> filterFileGroups(List<List<FileScanTask>> groups) {
+ return groups;
+ }
+
+ @Override
+ public FileRewritePlan<RewriteDataFiles.FileGroupInfo, FileScanTask, DataFile, RewriteFileGroup>
+ plan() {
+ throw new UnsupportedOperationException("Not supported");
+ }
+ }
+
+ private DataFile mockDataFile() {
+ DataFile file = Mockito.mock(DataFile.class);
+ when(file.partition()).thenReturn(Mockito.mock(StructLike.class));
+ when(file.fileSizeInBytes()).thenReturn(145L * 1024 * 1024);
+ return file;
+ }
+}
diff --git a/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedRewriter.java b/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedRewriter.java
deleted file mode 100644
index 77d16d3bc821..000000000000
--- a/core/src/test/java/org/apache/iceberg/actions/TestSizeBasedRewriter.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iceberg.actions;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.FileScanTask;
-import org.apache.iceberg.MockFileScanTask;
-import org.apache.iceberg.ParameterizedTestExtension;
-import org.apache.iceberg.Parameters;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.TestBase;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
-import org.junit.jupiter.api.TestTemplate;
-import org.junit.jupiter.api.extension.ExtendWith;
-
-@ExtendWith(ParameterizedTestExtension.class)
-public class TestSizeBasedRewriter extends TestBase {
-
- @Parameters(name = "formatVersion = {0}")
- protected static List