diff --git a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java index 785f5c3ea3f8..8e0c0b01dd90 100644 --- a/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java +++ b/core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java @@ -44,7 +44,10 @@ * RewriteDataFiles#MAX_FILE_GROUP_SIZE_BYTES}. Groups will be considered for rewriting if they * contain more files than {@link #MIN_INPUT_FILES} or would produce at least one file of {@link * RewriteDataFiles#TARGET_FILE_SIZE_BYTES}. + * + * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SizeBasedFileRewriter} instead. */ +@Deprecated public abstract class BinPackStrategy implements RewriteStrategy { private static final Logger LOG = LoggerFactory.getLogger(BinPackStrategy.class); diff --git a/core/src/main/java/org/apache/iceberg/actions/FileRewriter.java b/core/src/main/java/org/apache/iceberg/actions/FileRewriter.java new file mode 100644 index 000000000000..7c6b4e8d7ef5 --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/actions/FileRewriter.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.actions; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ContentScanTask; + +/** + * A class for rewriting content files. + * + *

The entire rewrite operation is broken down into pieces based on partitioning, and size-based
+ * groups within a partition. These subunits of the rewrite are referred to as file groups. A file
+ * group will be processed by a single framework "action". For example, in Spark this means that
+ * each group would be rewritten in its own Spark job.
+ *
+ * @param <T> the Java type of tasks to read content files
+ * @param <F> the Java type of content files
+ */
+public interface FileRewriter<T extends ContentScanTask<F>, F extends ContentFile<F>> {
+
+  /** Returns a description for this rewriter. */
+  default String description() {
+    return getClass().getName();
+  }
+
+  /**
+   * Returns a set of supported options for this rewriter. Only options specified in this list
+   * will be accepted at runtime. Any other options will be rejected.
+   */
+  Set<String> validOptions();
+
+  /**
+   * Initializes this rewriter using provided options.
+   *
+   * @param options options to initialize this rewriter
+   */
+  void init(Map<String, String> options);
+
+  /**
+   * Selects files which this rewriter believes are valid targets to be rewritten based on their
+   * scan tasks and groups those scan tasks into file groups. The file groups are then rewritten
+   * in a single executable unit, such as a Spark job.
+   *
+   * @param tasks an iterable of scan tasks for files in a partition
+   * @return groups of scan tasks for files to be rewritten in a single executable unit
+   */
+  Iterable<List<T>> planFileGroups(Iterable<T> tasks);
+
+  /**
+   * Rewrites a group of files represented by the given list of scan tasks.
+   *
+   *

The implementation is expected to be engine-specific (e.g. Spark, Flink, Trino).
+   *
+   * @param group a group of scan tasks for files to be rewritten together
+   * @return a set of newly written files
+   */
+  Set<F> rewrite(List<T> group);
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java b/core/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java
index 36fc7247528e..d3a450ddfb93 100644
--- a/core/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java
+++ b/core/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java
@@ -26,6 +26,12 @@
 import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Table;
 
+/**
+ * A strategy for rewriting files.
+ *
+ * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link FileRewriter} instead.
+ */
+@Deprecated
 public interface RewriteStrategy extends Serializable {
   /** Returns the name of this rewrite strategy */
   String name();
diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java
new file mode 100644
index 000000000000..e5b5908804e7
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.util.PropertyUtil;
+
+public abstract class SizeBasedDataRewriter extends SizeBasedFileRewriter<FileScanTask, DataFile> {
+
+  /**
+   * The minimum number of deletes that needs to be associated with a data file for it to be
+   * considered for rewriting. If a data file has this number of deletes or more, it will be
+   * rewritten regardless of its file size determined by {@link #MIN_FILE_SIZE_BYTES} and {@link
+   * #MAX_FILE_SIZE_BYTES}. If a file group contains a file that satisfies this condition, the
+   * file group will be rewritten regardless of the number of files in the file group determined
+   * by {@link #MIN_INPUT_FILES}.
+   *
+   *

Defaults to Integer.MAX_VALUE, which means this feature is not enabled by default.
+   */
+  public static final String DELETE_FILE_THRESHOLD = "delete-file-threshold";
+
+  public static final int DELETE_FILE_THRESHOLD_DEFAULT = Integer.MAX_VALUE;
+
+  private int deleteFileThreshold;
+
+  protected SizeBasedDataRewriter(Table table) {
+    super(table);
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.<String>builder()
+        .addAll(super.validOptions())
+        .add(DELETE_FILE_THRESHOLD)
+        .build();
+  }
+
+  @Override
+  public void init(Map<String, String> options) {
+    super.init(options);
+    this.deleteFileThreshold = deleteFileThreshold(options);
+  }
+
+  @Override
+  protected Iterable<FileScanTask> filterFiles(Iterable<FileScanTask> tasks) {
+    return Iterables.filter(tasks, task -> wronglySized(task) || tooManyDeletes(task));
+  }
+
+  private boolean tooManyDeletes(FileScanTask task) {
+    return task.deletes() != null && task.deletes().size() >= deleteFileThreshold;
+  }
+
+  @Override
+  protected Iterable<List<FileScanTask>> filterFileGroups(List<List<FileScanTask>> groups) {
+    return Iterables.filter(groups, this::shouldRewrite);
+  }
+
+  private boolean shouldRewrite(List<FileScanTask> group) {
+    return enoughInputFiles(group)
+        || enoughContent(group)
+        || tooMuchContent(group)
+        || anyTaskHasTooManyDeletes(group);
+  }
+
+  private boolean anyTaskHasTooManyDeletes(List<FileScanTask> group) {
+    return group.stream().anyMatch(this::tooManyDeletes);
+  }
+
+  @Override
+  protected long defaultTargetFileSize() {
+    return PropertyUtil.propertyAsLong(
+        table().properties(),
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
+        TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+  }
+
+  private int deleteFileThreshold(Map<String, String> options) {
+    int value =
+        PropertyUtil.propertyAsInt(options, DELETE_FILE_THRESHOLD, DELETE_FILE_THRESHOLD_DEFAULT);
+    Preconditions.checkArgument(
+        value >= 0, "'%s' is set to %s but must be >= 0", DELETE_FILE_THRESHOLD, value);
+    return value;
+  }
+}
diff --git a/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
new file mode 100644
index 000000000000..6c6880eff670
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/actions/SizeBasedFileRewriter.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ +package org.apache.iceberg.actions; + +import java.math.RoundingMode; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.ContentScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.math.LongMath; +import org.apache.iceberg.util.BinPacking; +import org.apache.iceberg.util.PropertyUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A file rewriter that determines which files to rewrite based on their size. + * + *

If files are smaller than the {@link #MIN_FILE_SIZE_BYTES} threshold or larger than the {@link + * #MAX_FILE_SIZE_BYTES} threshold, they are considered targets for being rewritten. + * + *

Once selected, files are grouped based on the {@link BinPacking bin-packing algorithm} into
+ * groups of no more than {@link #MAX_FILE_GROUP_SIZE_BYTES}. Groups will actually be rewritten
+ * if they contain more files than {@link #MIN_INPUT_FILES} or if they would produce at least one
+ * file of {@link #TARGET_FILE_SIZE_BYTES}.
+ *
+ *

Note that implementations may add extra conditions for selecting files or filtering groups.
+ */
+public abstract class SizeBasedFileRewriter<T extends ContentScanTask<F>, F extends ContentFile<F>>
+    implements FileRewriter<T, F> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SizeBasedFileRewriter.class);
+
+  /** The target output file size that this file rewriter will attempt to generate. */
+  public static final String TARGET_FILE_SIZE_BYTES = "target-file-size-bytes";
+
+  /**
+   * Controls which files will be considered for rewriting. Files with sizes under this threshold
+   * will be considered for rewriting regardless of any other criteria.
+   *
+   *

Defaults to 75% of the target file size. + */ + public static final String MIN_FILE_SIZE_BYTES = "min-file-size-bytes"; + + public static final double MIN_FILE_SIZE_DEFAULT_RATIO = 0.75; + + /** + * Controls which files will be considered for rewriting. Files with sizes above this threshold + * will be considered for rewriting regardless of any other criteria. + * + *

Defaults to 180% of the target file size.
+   */
+  public static final String MAX_FILE_SIZE_BYTES = "max-file-size-bytes";
+
+  public static final double MAX_FILE_SIZE_DEFAULT_RATIO = 1.80;
+
+  /**
+   * Any file group exceeding this number of files will be rewritten regardless of other criteria.
+   * This config ensures file groups that contain many files are compacted even if the total size
+   * of that group is less than the target file size. This can also be thought of as the maximum
+   * number of wrongly sized files that could remain in a partition after rewriting.
+   */
+  public static final String MIN_INPUT_FILES = "min-input-files";
+
+  public static final int MIN_INPUT_FILES_DEFAULT = 5;
+
+  /** Overrides other options and forces rewriting of all provided files. */
+  public static final String REWRITE_ALL = "rewrite-all";
+
+  public static final boolean REWRITE_ALL_DEFAULT = false;
+
+  /**
+   * This option controls the largest amount of data that should be rewritten in a single file
+   * group. It helps with breaking down the rewriting of very large partitions which may not be
+   * rewritable otherwise due to the resource constraints of the cluster. For example, a sort-based
+   * rewrite may not scale to TB-sized partitions, and those partitions need to be worked on in
+   * small subsections to avoid exhaustion of resources.
+   */
+  public static final String MAX_FILE_GROUP_SIZE_BYTES = "max-file-group-size-bytes";
+
+  public static final long MAX_FILE_GROUP_SIZE_BYTES_DEFAULT = 100L * 1024 * 1024 * 1024; // 100 GB
+
+  private final Table table;
+  private long targetFileSize;
+  private long minFileSize;
+  private long maxFileSize;
+  private int minInputFiles;
+  private boolean rewriteAll;
+  private long maxGroupSize;
+
+  protected SizeBasedFileRewriter(Table table) {
+    this.table = table;
+  }
+
+  protected abstract long defaultTargetFileSize();
+
+  protected abstract Iterable<T> filterFiles(Iterable<T> tasks);
+
+  protected abstract Iterable<List<T>> filterFileGroups(List<List<T>> groups);
+
+  protected Table table() {
+    return table;
+  }
+
+  @Override
+  public Set<String> validOptions() {
+    return ImmutableSet.of(
+        TARGET_FILE_SIZE_BYTES,
+        MIN_FILE_SIZE_BYTES,
+        MAX_FILE_SIZE_BYTES,
+        MIN_INPUT_FILES,
+        REWRITE_ALL,
+        MAX_FILE_GROUP_SIZE_BYTES);
+  }
+
+  @Override
+  public void init(Map<String, String> options) {
+    Map<String, Long> sizeThresholds = sizeThresholds(options);
+    this.targetFileSize = sizeThresholds.get(TARGET_FILE_SIZE_BYTES);
+    this.minFileSize = sizeThresholds.get(MIN_FILE_SIZE_BYTES);
+    this.maxFileSize = sizeThresholds.get(MAX_FILE_SIZE_BYTES);
+
+    this.minInputFiles = minInputFiles(options);
+    this.rewriteAll = rewriteAll(options);
+    this.maxGroupSize = maxGroupSize(options);
+
+    if (rewriteAll) {
+      LOG.info("Configured to rewrite all provided files in table {}", table.name());
+    }
+  }
+
+  protected boolean wronglySized(T task) {
+    return task.length() < minFileSize || task.length() > maxFileSize;
+  }
+
+  @Override
+  public Iterable<List<T>> planFileGroups(Iterable<T> tasks) {
+    Iterable<T> filteredTasks = rewriteAll ? tasks : filterFiles(tasks);
+    BinPacking.ListPacker<T> packer = new BinPacking.ListPacker<>(maxGroupSize, 1, false);
+    List<List<T>> groups = packer.pack(filteredTasks, ContentScanTask::length);
+    return rewriteAll ? groups : filterFileGroups(groups);
+  }
+
+  protected boolean enoughInputFiles(List<T> group) {
+    return group.size() > 1 && group.size() >= minInputFiles;
+  }
+
+  protected boolean enoughContent(List<T> group) {
+    return group.size() > 1 && inputSize(group) > targetFileSize;
+  }
+
+  protected boolean tooMuchContent(List<T> group) {
+    return inputSize(group) > maxFileSize;
+  }
+
+  protected long inputSize(List<T> group) {
+    return group.stream().mapToLong(ContentScanTask::length).sum();
+  }
+
+  /**
+   * Determines the preferable number of output files when rewriting a particular file group.
+   *
+   *

If the rewriter is handling 10.1 GB of data with a target file size of 1 GB, it could
+   * produce 11 files, one of which would only have 0.1 GB. This would most likely be less
+   * preferable to 10 files with 1.01 GB each. So this method decides whether to round up or round
+   * down based on what the estimated average file size will be if the remainder (0.1 GB) is
+   * distributed amongst other files. If the new average file size is no more than 10% greater
+   * than the target file size, then this method will round down when determining the number of
+   * output files. Otherwise, the remainder will be written into a separate file.
+   *
+   * @param inputSize a total input size for a file group
+   * @return the number of files this rewriter should create
+   */
+  protected long numOutputFiles(long inputSize) {
+    if (inputSize < targetFileSize) {
+      return 1;
+    }
+
+    long numFilesWithRemainder = LongMath.divide(inputSize, targetFileSize, RoundingMode.CEILING);
+    long numFilesWithoutRemainder = LongMath.divide(inputSize, targetFileSize, RoundingMode.FLOOR);
+    long avgFileSizeWithoutRemainder = inputSize / numFilesWithoutRemainder;
+
+    if (LongMath.mod(inputSize, targetFileSize) > minFileSize) {
+      // the remainder file is of a valid size for this rewrite so keep it
+      return numFilesWithRemainder;
+
+    } else if (avgFileSizeWithoutRemainder < Math.min(1.1 * targetFileSize, writeMaxFileSize())) {
+      // if the remainder is distributed amongst other files,
+      // the average file size will be no more than 10% bigger than the target file size
+      // so round down and distribute the remainder amongst other files
+      return numFilesWithoutRemainder;
+
+    } else {
+      // keep the remainder file as it is not OK to distribute it amongst other files
+      return numFilesWithRemainder;
+    }
+  }
+
+  /**
+   * Estimates a larger max target file size than the target size used in task creation to avoid
+   * creating tiny remainder files.
+   *
+   *

While we create tasks that should all be smaller than our target size, there is a chance
+   * that the actual data will end up being larger than our target size due to factors such as
+   * compression and serialization, which are outside our control. If this occurs, instead of
+   * making a single file that is close in size to our target, we would end up producing one file
+   * of the target size, and then a small extra file with the remaining data.
+   *
+   *

For example, if our target is 512 MB, we may generate a rewrite task that should be 500 MB. + * When we write the data we may find we actually have to write out 530 MB. If we use the target + * size while writing, we would produce a 512 MB file and an 18 MB file. If instead we use a + * larger size estimated by this method, then we end up writing a single file. + * + * @return the target size plus one half of the distance between max and target + */ + protected long writeMaxFileSize() { + return (long) (targetFileSize + ((maxFileSize - targetFileSize) * 0.5)); + } + + private Map sizeThresholds(Map options) { + long target = + PropertyUtil.propertyAsLong(options, TARGET_FILE_SIZE_BYTES, defaultTargetFileSize()); + + long defaultMin = (long) (target * MIN_FILE_SIZE_DEFAULT_RATIO); + long min = PropertyUtil.propertyAsLong(options, MIN_FILE_SIZE_BYTES, defaultMin); + + long defaultMax = (long) (target * MAX_FILE_SIZE_DEFAULT_RATIO); + long max = PropertyUtil.propertyAsLong(options, MAX_FILE_SIZE_BYTES, defaultMax); + + Preconditions.checkArgument( + target > 0, "'%s' is set to %s but must be > 0", TARGET_FILE_SIZE_BYTES, target); + + Preconditions.checkArgument( + min >= 0, "'%s' is set to %s but must be >= 0", MIN_FILE_SIZE_BYTES, min); + + Preconditions.checkArgument( + target > min, + "'%s' (%s) must be > '%s' (%s), all new files will be smaller than the min threshold", + TARGET_FILE_SIZE_BYTES, + target, + MIN_FILE_SIZE_BYTES, + min); + + Preconditions.checkArgument( + target < max, + "'%s' (%s) must be < '%s' (%s), all new files will be larger than the max threshold", + TARGET_FILE_SIZE_BYTES, + target, + MAX_FILE_SIZE_BYTES, + max); + + Map values = Maps.newHashMap(); + + values.put(TARGET_FILE_SIZE_BYTES, target); + values.put(MIN_FILE_SIZE_BYTES, min); + values.put(MAX_FILE_SIZE_BYTES, max); + + return values; + } + + private int minInputFiles(Map options) { + int value = PropertyUtil.propertyAsInt(options, MIN_INPUT_FILES, MIN_INPUT_FILES_DEFAULT); + Preconditions.checkArgument( + value > 0, "'%s' is set to %s but must be > 0", MIN_INPUT_FILES, value); + return value; + } + + private long maxGroupSize(Map options) { + long value = + PropertyUtil.propertyAsLong( + options, MAX_FILE_GROUP_SIZE_BYTES, MAX_FILE_GROUP_SIZE_BYTES_DEFAULT); + Preconditions.checkArgument( + value > 0, "'%s' is set to %s but must be > 0", MAX_FILE_GROUP_SIZE_BYTES, value); + return value; + } + + private boolean rewriteAll(Map options) { + return PropertyUtil.propertyAsBoolean(options, REWRITE_ALL, REWRITE_ALL_DEFAULT); + } +} diff --git a/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java b/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java index d08f0940f5d8..59decb802066 100644 --- a/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java +++ b/core/src/main/java/org/apache/iceberg/actions/SortStrategy.java @@ -37,7 +37,10 @@ * would be chosen by {@link BinPackStrategy} will be rewrite candidates. * *

In the future other algorithms for determining files to rewrite will be provided. + * + * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SizeBasedFileRewriter} instead. */ +@Deprecated public abstract class SortStrategy extends BinPackStrategy { private SortOrder sortOrder; diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index d13e0967b638..44aca898b696 100644 --- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -390,7 +390,7 @@ public void testRewriteDataFilesWithInvalidInputs() { AssertHelpers.assertThrows( "Should reject calls with error message", IllegalArgumentException.class, - "Cannot set strategy to sort, it has already been set", + "Must use only one rewriter type (bin-pack, sort, zorder)", () -> sql( "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'binpack', " @@ -401,7 +401,7 @@ public void testRewriteDataFilesWithInvalidInputs() { AssertHelpers.assertThrows( "Should reject calls with error message", IllegalArgumentException.class, - "Can't use SORT when there is no sort order", + "Cannot sort data without a valid sort order", () -> sql( "CALL %s.system.rewrite_data_files(table => '%s', strategy => 'sort')", @@ -455,7 +455,7 @@ public void testRewriteDataFilesWithInvalidInputs() { AssertHelpers.assertThrows( "Should reject calls with error message", IllegalArgumentException.class, - "Cannot find column 'col1' in table schema: " + "Cannot find column 'col1' in table schema (case sensitive = false): " + "struct<1: c1: optional int, 2: c2: optional string, 3: c3: optional string>", () -> sql( diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java index e3db8fe9dc0e..5f95ef3ed4c9 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.math.RoundingMode; +import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Map; @@ -37,13 +38,11 @@ import org.apache.iceberg.SortOrder; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; -import org.apache.iceberg.actions.BinPackStrategy; +import org.apache.iceberg.actions.FileRewriter; import org.apache.iceberg.actions.ImmutableRewriteDataFiles; import org.apache.iceberg.actions.RewriteDataFiles; import org.apache.iceberg.actions.RewriteDataFilesCommitManager; import org.apache.iceberg.actions.RewriteFileGroup; -import org.apache.iceberg.actions.RewriteStrategy; -import org.apache.iceberg.actions.SortStrategy; import org.apache.iceberg.data.GenericRecord; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.ValidationException; @@ -92,7 +91,7 @@ public class RewriteDataFilesSparkAction private boolean partialProgressEnabled; private boolean useStartingSequenceNumber; private RewriteJobOrder rewriteJobOrder; - private RewriteStrategy strategy = null; + private FileRewriter rewriter = 
null; RewriteDataFilesSparkAction(SparkSession spark, Table table) { super(spark.cloneSession()); @@ -109,40 +108,32 @@ protected RewriteDataFilesSparkAction self() { @Override public RewriteDataFilesSparkAction binPack() { Preconditions.checkArgument( - this.strategy == null, - "Cannot set strategy to binpack, it has already been set", - this.strategy); - this.strategy = binPackStrategy(); + rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)"); + this.rewriter = new SparkBinPackDataRewriter(spark(), table); return this; } @Override public RewriteDataFilesSparkAction sort(SortOrder sortOrder) { Preconditions.checkArgument( - this.strategy == null, - "Cannot set strategy to sort, it has already been set to %s", - this.strategy); - this.strategy = sortStrategy().sortOrder(sortOrder); + rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)"); + this.rewriter = new SparkSortDataRewriter(spark(), table, sortOrder); return this; } @Override public RewriteDataFilesSparkAction sort() { Preconditions.checkArgument( - this.strategy == null, - "Cannot set strategy to sort, it has already been set to %s", - this.strategy); - this.strategy = sortStrategy(); + rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)"); + this.rewriter = new SparkSortDataRewriter(spark(), table); return this; } @Override public RewriteDataFilesSparkAction zOrder(String... columnNames) { Preconditions.checkArgument( - this.strategy == null, - "Cannot set strategy to zorder, it has already been set to %s", - this.strategy); - this.strategy = zOrderStrategy(columnNames); + rewriter == null, "Must use only one rewriter type (bin-pack, sort, zorder)"); + this.rewriter = new SparkZOrderDataRewriter(spark(), table, Arrays.asList(columnNames)); return this; } @@ -161,8 +152,8 @@ public RewriteDataFiles.Result execute() { long startingSnapshotId = table.currentSnapshot().snapshotId(); // Default to BinPack if no strategy selected - if (this.strategy == null) { - this.strategy = binPackStrategy(); + if (this.rewriter == null) { + this.rewriter = new SparkBinPackDataRewriter(spark(), table); } validateAndInitOptions(); @@ -226,9 +217,8 @@ Map>> planFileGroups(long startingSnapshotId filesByPartition.forEach( (partition, tasks) -> { - Iterable filtered = strategy.selectFilesToRewrite(tasks); - Iterable> groupedTasks = strategy.planFileGroups(filtered); - List> fileGroups = ImmutableList.copyOf(groupedTasks); + Iterable> plannedFileGroups = rewriter.planFileGroups(tasks); + List> fileGroups = ImmutableList.copyOf(plannedFileGroups); if (fileGroups.size() > 0) { fileGroupsByPartition.put(partition, fileGroups); } @@ -250,7 +240,7 @@ RewriteFileGroup rewriteFiles(RewriteExecutionContext ctx, RewriteFileGroup file Set addedFiles = withJobGroupInfo( newJobGroupInfo("REWRITE-DATA-FILES", desc), - () -> strategy.rewriteFiles(fileGroup.fileScans())); + () -> rewriter.rewrite(fileGroup.fileScans())); fileGroup.setOutputFiles(addedFiles); LOG.info("Rewrite Files Ready to be Committed - {}", desc); @@ -418,7 +408,7 @@ private Comparator rewriteGroupComparator() { } void validateAndInitOptions() { - Set validOptions = Sets.newHashSet(strategy.validOptions()); + Set validOptions = Sets.newHashSet(rewriter.validOptions()); validOptions.addAll(VALID_OPTIONS); Set invalidKeys = Sets.newHashSet(options().keySet()); @@ -426,11 +416,11 @@ void validateAndInitOptions() { Preconditions.checkArgument( invalidKeys.isEmpty(), - "Cannot use options %s, they are not supported by the action or 
the strategy %s", + "Cannot use options %s, they are not supported by the action or the rewriter %s", invalidKeys, - strategy.name()); + rewriter.description()); - strategy = strategy.options(options()); + rewriter.init(options()); maxConcurrentFileGroupRewrites = PropertyUtil.propertyAsInt( @@ -474,7 +464,7 @@ private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) { return String.format( "Rewriting %d files (%s, file group %d/%d, %s (%d/%d)) in %s", group.rewrittenFiles().size(), - strategy.name(), + rewriter.description(), group.info().globalIndex(), ctx.totalGroupCount(), partition, @@ -485,25 +475,13 @@ private String jobDesc(RewriteFileGroup group, RewriteExecutionContext ctx) { return String.format( "Rewriting %d files (%s, file group %d/%d) in %s", group.rewrittenFiles().size(), - strategy.name(), + rewriter.description(), group.info().globalIndex(), ctx.totalGroupCount(), table.name()); } } - private BinPackStrategy binPackStrategy() { - return new SparkBinPackStrategy(table, spark()); - } - - private SortStrategy sortStrategy() { - return new SparkSortStrategy(table, spark()); - } - - private SortStrategy zOrderStrategy(String... columnNames) { - return new SparkZOrderStrategy(table, spark(), Lists.newArrayList(columnNames)); - } - @VisibleForTesting static class RewriteExecutionContext { private final Map numGroupsByPartition; diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java new file mode 100644 index 000000000000..21e94ef9b4bf --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackDataRewriter.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.actions; + +import java.util.List; +import org.apache.iceberg.DistributionMode; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +class SparkBinPackDataRewriter extends SparkSizeBasedDataRewriter { + + private static final long SPLIT_OVERHEAD = 5 * 1024; + + SparkBinPackDataRewriter(SparkSession spark, Table table) { + super(spark, table); + } + + @Override + public String description() { + return "BIN-PACK"; + } + + @Override + protected void doRewrite(String groupId, List group) { + // read the files packing them into splits of the required size + Dataset scanDF = + spark() + .read() + .format("iceberg") + .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId) + .option(SparkReadOptions.SPLIT_SIZE, splitSize(inputSize(group))) + .option(SparkReadOptions.FILE_OPEN_COST, "0") + .load(groupId); + + // write the packed data into new files where each split becomes a new file + scanDF + .write() + .format("iceberg") + .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId) + .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()) + .option(SparkWriteOptions.DISTRIBUTION_MODE, distributionMode(group).modeName()) + .mode("append") + .save(groupId); + } + + // invoke a shuffle if the original spec does not match the output spec + private DistributionMode distributionMode(List group) { + boolean requiresRepartition = !group.get(0).spec().equals(table().spec()); + return requiresRepartition ? DistributionMode.RANGE : DistributionMode.NONE; + } + + /** + * Returns the smallest of our max write file threshold and our estimated split size based on the + * number of output files we want to generate. Add an overhead onto the estimated split size to + * try to avoid small errors in size creating brand-new files. + */ + private long splitSize(long inputSize) { + long estimatedSplitSize = (inputSize / numOutputFiles(inputSize)) + SPLIT_OVERHEAD; + return Math.min(estimatedSplitSize, writeMaxFileSize()); + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java index 46aefd20af00..07d3210ead66 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkBinPackStrategy.java @@ -35,6 +35,12 @@ import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; +/** + * A Spark strategy to bin-pack data. + * + * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SparkBinPackDataRewriter} instead. + */ +@Deprecated public class SparkBinPackStrategy extends BinPackStrategy { private final Table table; private final SparkSession spark; diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java new file mode 100644 index 000000000000..1add6383c618 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkShufflingDataRewriter.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.spark.SparkDistributionAndOrderingUtil; +import org.apache.iceberg.spark.SparkReadOptions; +import org.apache.iceberg.spark.SparkWriteOptions; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.SortOrderUtil; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.catalyst.utils.DistributionAndOrderingUtils$; +import org.apache.spark.sql.connector.distributions.Distributions; +import org.apache.spark.sql.connector.distributions.OrderedDistribution; +import org.apache.spark.sql.connector.expressions.SortOrder; +import org.apache.spark.sql.internal.SQLConf; + +abstract class SparkShufflingDataRewriter extends SparkSizeBasedDataRewriter { + + /** + * The number of shuffle partitions and consequently the number of output files created by the + * Spark sort is based on the size of the input data files used in this file rewriter. Due to + * compression, the disk file sizes may not accurately represent the size of files in the output. + * This parameter lets the user adjust the file size used for estimating actual output data size. + * A factor greater than 1.0 would generate more files than we would expect based on the on-disk + * file size. A value less than 1.0 would create fewer files than we would expect based on the + * on-disk size. 
+ */ + public static final String COMPRESSION_FACTOR = "compression-factor"; + + public static final double COMPRESSION_FACTOR_DEFAULT = 1.0; + + private double compressionFactor; + + protected SparkShufflingDataRewriter(SparkSession spark, Table table) { + super(spark, table); + } + + protected abstract Dataset sortedDF(Dataset df, List group); + + @Override + public Set validOptions() { + return ImmutableSet.builder() + .addAll(super.validOptions()) + .add(COMPRESSION_FACTOR) + .build(); + } + + @Override + public void init(Map options) { + super.init(options); + this.compressionFactor = compressionFactor(options); + } + + @Override + public void doRewrite(String groupId, List group) { + // the number of shuffle partition controls the number of output files + spark().conf().set(SQLConf.SHUFFLE_PARTITIONS().key(), numShufflePartitions(group)); + + Dataset scanDF = + spark() + .read() + .format("iceberg") + .option(SparkReadOptions.SCAN_TASK_SET_ID, groupId) + .load(groupId); + + Dataset sortedDF = sortedDF(scanDF, group); + + sortedDF + .write() + .format("iceberg") + .option(SparkWriteOptions.REWRITTEN_FILE_SCAN_TASK_SET_ID, groupId) + .option(SparkWriteOptions.TARGET_FILE_SIZE_BYTES, writeMaxFileSize()) + .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false") + .mode("append") + .save(groupId); + } + + protected Dataset sort(Dataset df, org.apache.iceberg.SortOrder sortOrder) { + SortOrder[] ordering = SparkDistributionAndOrderingUtil.convert(sortOrder); + OrderedDistribution distribution = Distributions.ordered(ordering); + SQLConf conf = spark().sessionState().conf(); + LogicalPlan plan = df.logicalPlan(); + LogicalPlan sortPlan = + DistributionAndOrderingUtils$.MODULE$.prepareQuery(distribution, ordering, plan, conf); + return new Dataset<>(spark(), sortPlan, df.encoder()); + } + + protected org.apache.iceberg.SortOrder outputSortOrder( + List group, org.apache.iceberg.SortOrder sortOrder) { + boolean includePartitionColumns = !group.get(0).spec().equals(table().spec()); + if (includePartitionColumns) { + // build in the requirement for partition sorting into our sort order + // as the original spec for this group does not match the output spec + return SortOrderUtil.buildSortOrder(table(), sortOrder); + } else { + return sortOrder; + } + } + + private long numShufflePartitions(List group) { + long numOutputFiles = numOutputFiles((long) (inputSize(group) * compressionFactor)); + return Math.max(1, numOutputFiles); + } + + private double compressionFactor(Map options) { + double value = + PropertyUtil.propertyAsDouble(options, COMPRESSION_FACTOR, COMPRESSION_FACTOR_DEFAULT); + Preconditions.checkArgument( + value > 0, "'%s' is set to %s but must be > 0", COMPRESSION_FACTOR, value); + return value; + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSizeBasedDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSizeBasedDataRewriter.java new file mode 100644 index 000000000000..d40cbbb871b3 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSizeBasedDataRewriter.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.util.List; +import java.util.Set; +import java.util.UUID; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.SizeBasedDataRewriter; +import org.apache.iceberg.spark.FileRewriteCoordinator; +import org.apache.iceberg.spark.ScanTaskSetManager; +import org.apache.iceberg.spark.SparkTableCache; +import org.apache.spark.sql.SparkSession; + +abstract class SparkSizeBasedDataRewriter extends SizeBasedDataRewriter { + + private final SparkSession spark; + private final SparkTableCache tableCache = SparkTableCache.get(); + private final ScanTaskSetManager taskSetManager = ScanTaskSetManager.get(); + private final FileRewriteCoordinator coordinator = FileRewriteCoordinator.get(); + + SparkSizeBasedDataRewriter(SparkSession spark, Table table) { + super(table); + this.spark = spark; + } + + protected abstract void doRewrite(String groupId, List group); + + protected SparkSession spark() { + return spark; + } + + @Override + public Set rewrite(List group) { + String groupId = UUID.randomUUID().toString(); + try { + tableCache.add(groupId, table()); + taskSetManager.stageTasks(table(), groupId, group); + + doRewrite(groupId, group); + + return coordinator.fetchNewDataFiles(table(), groupId); + } finally { + tableCache.remove(groupId); + taskSetManager.removeTasks(table(), groupId); + coordinator.clearRewrite(table(), groupId); + } + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java new file mode 100644 index 000000000000..4615f3cebc92 --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortDataRewriter.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.actions; + +import java.util.List; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; + +class SparkSortDataRewriter extends SparkShufflingDataRewriter { + + private final SortOrder sortOrder; + + SparkSortDataRewriter(SparkSession spark, Table table) { + super(spark, table); + Preconditions.checkArgument( + table.sortOrder().isSorted(), + "Cannot sort data without a valid sort order, table '%s' is unsorted and no sort order is provided", + table.name()); + this.sortOrder = table.sortOrder(); + } + + SparkSortDataRewriter(SparkSession spark, Table table, SortOrder sortOrder) { + super(spark, table); + Preconditions.checkArgument( + sortOrder != null && sortOrder.isSorted(), + "Cannot sort data without a valid sort order, the provided sort order is null or empty"); + this.sortOrder = sortOrder; + } + + @Override + public String description() { + return "SORT"; + } + + @Override + protected Dataset sortedDF(Dataset df, List group) { + return sort(df, outputSortOrder(group, sortOrder)); + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java index 59aafc595ac9..21e29263c925 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkSortStrategy.java @@ -47,6 +47,12 @@ import org.apache.spark.sql.connector.expressions.SortOrder; import org.apache.spark.sql.internal.SQLConf; +/** + * A Spark strategy to sort data. + * + * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SparkSortDataRewriter} instead. + */ +@Deprecated public class SparkSortStrategy extends SortStrategy { /** diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java new file mode 100644 index 000000000000..68db76d37fcb --- /dev/null +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderDataRewriter.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.actions; + +import static org.apache.spark.sql.functions.array; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.NullOrder; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortDirection; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkUtil; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.PropertyUtil; +import org.apache.iceberg.util.ZOrderByteUtils; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +class SparkZOrderDataRewriter extends SparkShufflingDataRewriter { + + private static final Logger LOG = LoggerFactory.getLogger(SparkZOrderDataRewriter.class); + + private static final String Z_COLUMN = "ICEZVALUE"; + private static final Schema Z_SCHEMA = + new Schema(Types.NestedField.required(0, Z_COLUMN, Types.BinaryType.get())); + private static final SortOrder Z_SORT_ORDER = + SortOrder.builderFor(Z_SCHEMA) + .sortBy(Z_COLUMN, SortDirection.ASC, NullOrder.NULLS_LAST) + .build(); + + /** + * Controls the amount of bytes interleaved in the ZOrder algorithm. Default is all bytes being + * interleaved. + */ + public static final String MAX_OUTPUT_SIZE = "max-output-size"; + + public static final int MAX_OUTPUT_SIZE_DEFAULT = Integer.MAX_VALUE; + + /** + * Controls the number of bytes considered from an input column of a type with variable length + * (String, Binary). + * + *

Default is to use the same size as primitives {@link ZOrderByteUtils#PRIMITIVE_BUFFER_SIZE}. + */ + public static final String VAR_LENGTH_CONTRIBUTION = "var-length-contribution"; + + public static final int VAR_LENGTH_CONTRIBUTION_DEFAULT = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE; + + private final List zOrderColNames; + private int maxOutputSize; + private int varLengthContribution; + + SparkZOrderDataRewriter(SparkSession spark, Table table, List zOrderColNames) { + super(spark, table); + this.zOrderColNames = validZOrderColNames(spark, table, zOrderColNames); + } + + @Override + public String description() { + return "Z-ORDER"; + } + + @Override + public Set validOptions() { + return ImmutableSet.builder() + .addAll(super.validOptions()) + .add(MAX_OUTPUT_SIZE) + .add(VAR_LENGTH_CONTRIBUTION) + .build(); + } + + @Override + public void init(Map options) { + super.init(options); + this.maxOutputSize = maxOutputSize(options); + this.varLengthContribution = varLengthContribution(options); + } + + @Override + protected Dataset sortedDF(Dataset df, List group) { + Dataset zValueDF = df.withColumn(Z_COLUMN, zValue(df)); + Dataset sortedDF = sort(zValueDF, outputSortOrder(group, Z_SORT_ORDER)); + return sortedDF.drop(Z_COLUMN); + } + + private Column zValue(Dataset df) { + SparkZOrderUDF zOrderUDF = + new SparkZOrderUDF(zOrderColNames.size(), varLengthContribution, maxOutputSize); + + Column[] zOrderCols = + zOrderColNames.stream() + .map(df.schema()::apply) + .map(col -> zOrderUDF.sortedLexicographically(df.col(col.name()), col.dataType())) + .toArray(Column[]::new); + + return zOrderUDF.interleaveBytes(array(zOrderCols)); + } + + private int varLengthContribution(Map options) { + int value = + PropertyUtil.propertyAsInt( + options, VAR_LENGTH_CONTRIBUTION, VAR_LENGTH_CONTRIBUTION_DEFAULT); + Preconditions.checkArgument( + value > 0, + "Cannot use less than 1 byte for variable length types with ZOrder, '%s' was set to %s", + VAR_LENGTH_CONTRIBUTION, + value); + return value; + } + + private int maxOutputSize(Map options) { + int value = PropertyUtil.propertyAsInt(options, MAX_OUTPUT_SIZE, MAX_OUTPUT_SIZE_DEFAULT); + Preconditions.checkArgument( + value > 0, + "Cannot have the interleaved ZOrder value use less than 1 byte, '%s' was set to %s", + MAX_OUTPUT_SIZE, + value); + return value; + } + + private List validZOrderColNames( + SparkSession spark, Table table, List inputZOrderColNames) { + + Preconditions.checkArgument( + inputZOrderColNames != null && !inputZOrderColNames.isEmpty(), + "Cannot ZOrder when no columns are specified"); + + Schema schema = table.schema(); + Set identityPartitionFieldIds = table.spec().identitySourceIds(); + boolean caseSensitive = SparkUtil.caseSensitive(spark); + + List validZOrderColNames = Lists.newArrayList(); + + for (String colName : inputZOrderColNames) { + Types.NestedField field = + caseSensitive ? 
schema.findField(colName) : schema.caseInsensitiveFindField(colName); + Preconditions.checkArgument( + field != null, + "Cannot find column '%s' in table schema (case sensitive = %s): %s", + colName, + caseSensitive, + schema.asStruct()); + + if (identityPartitionFieldIds.contains(field.fieldId())) { + LOG.warn("Ignoring '{}' as such values are constant within a partition", colName); + } else { + validZOrderColNames.add(colName); + } + } + + Preconditions.checkArgument( + validZOrderColNames.size() > 0, + "Cannot ZOrder, all columns provided were identity partition columns and cannot be used"); + + return validZOrderColNames; + } +} diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java index fe04c1f4f1ef..26d2b4837b4b 100644 --- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java +++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/SparkZOrderStrategy.java @@ -58,6 +58,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * A Spark strategy to zOrder data. + * + * @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SparkZOrderDataRewriter} instead. + */ +@Deprecated public class SparkZOrderStrategy extends SparkSortStrategy { private static final Logger LOG = LoggerFactory.getLogger(SparkZOrderStrategy.class); diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java index b6b8b5b09d67..53d024089207 100644 --- a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java @@ -1232,23 +1232,25 @@ public void testZOrderAllTypesSort() { public void testInvalidAPIUsage() { Table table = createTable(1); + SortOrder sortOrder = SortOrder.builderFor(table.schema()).asc("c2").build(); + AssertHelpers.assertThrows( "Should be unable to set Strategy more than once", IllegalArgumentException.class, - "Cannot set strategy", + "Must use only one rewriter type", () -> actions().rewriteDataFiles(table).binPack().sort()); AssertHelpers.assertThrows( "Should be unable to set Strategy more than once", IllegalArgumentException.class, - "Cannot set strategy", - () -> actions().rewriteDataFiles(table).sort().binPack()); + "Must use only one rewriter type", + () -> actions().rewriteDataFiles(table).sort(sortOrder).binPack()); AssertHelpers.assertThrows( "Should be unable to set Strategy more than once", IllegalArgumentException.class, - "Cannot set strategy", - () -> actions().rewriteDataFiles(table).sort(SortOrder.unsorted()).binPack()); + "Must use only one rewriter type", + () -> actions().rewriteDataFiles(table).sort(sortOrder).binPack()); } @Test diff --git a/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java new file mode 100644 index 000000000000..6800ffd404ea --- /dev/null +++ b/spark/v3.3/spark/src/test/java/org/apache/iceberg/spark/actions/TestSparkFileRewriter.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.actions; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MockFileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.SortOrder; +import org.apache.iceberg.Table; +import org.apache.iceberg.actions.SizeBasedDataRewriter; +import org.apache.iceberg.actions.SizeBasedFileRewriter; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.Iterables; +import org.apache.iceberg.spark.SparkTestBase; +import org.apache.iceberg.types.Types.IntegerType; +import org.apache.iceberg.types.Types.NestedField; +import org.apache.iceberg.types.Types.StringType; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +public class TestSparkFileRewriter extends SparkTestBase { + + private static final TableIdentifier TABLE_IDENT = TableIdentifier.of("default", "tbl"); + private static final Schema SCHEMA = + new Schema( + NestedField.required(1, "id", IntegerType.get()), + NestedField.required(2, "dep", StringType.get())); + private static final PartitionSpec SPEC = + PartitionSpec.builderFor(SCHEMA).identity("dep").build(); + private static final SortOrder SORT_ORDER = SortOrder.builderFor(SCHEMA).asc("id").build(); + + @After + public void removeTable() { + catalog.dropTable(TABLE_IDENT); + } + + @Test + public void testBinPackDataSelectFiles() { + Table table = catalog.createTable(TABLE_IDENT, SCHEMA); + SparkBinPackDataRewriter rewriter = new SparkBinPackDataRewriter(spark, table); + + checkDataFileSizeFiltering(rewriter); + checkDataFilesDeleteThreshold(rewriter); + checkDataFileGroupWithEnoughFiles(rewriter); + checkDataFileGroupWithEnoughData(rewriter); + checkDataFileGroupWithTooMuchData(rewriter); + } + + @Test + public void testSortDataSelectFiles() { + Table table = catalog.createTable(TABLE_IDENT, SCHEMA); + SparkSortDataRewriter rewriter = new SparkSortDataRewriter(spark, table, SORT_ORDER); + + checkDataFileSizeFiltering(rewriter); + checkDataFilesDeleteThreshold(rewriter); + checkDataFileGroupWithEnoughFiles(rewriter); + checkDataFileGroupWithEnoughData(rewriter); + checkDataFileGroupWithTooMuchData(rewriter); + } + + @Test + public void testZOrderDataSelectFiles() { + Table table = catalog.createTable(TABLE_IDENT, SCHEMA); + ImmutableList zOrderCols = ImmutableList.of("id"); + SparkZOrderDataRewriter rewriter = new SparkZOrderDataRewriter(spark, table, zOrderCols); + + 
+    checkDataFileSizeFiltering(rewriter);
+    checkDataFilesDeleteThreshold(rewriter);
+    checkDataFileGroupWithEnoughFiles(rewriter);
+    checkDataFileGroupWithEnoughData(rewriter);
+    checkDataFileGroupWithTooMuchData(rewriter);
+  }
+
+  private void checkDataFileSizeFiltering(SizeBasedDataRewriter rewriter) {
+    FileScanTask tooSmallTask = new MockFileScanTask(100L);
+    FileScanTask optimal = new MockFileScanTask(450);
+    FileScanTask tooBigTask = new MockFileScanTask(1000L);
+    List<FileScanTask> tasks = ImmutableList.of(tooSmallTask, optimal, tooBigTask);
+
+    Map<String, String> options =
+        ImmutableMap.of(
+            SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, "250",
+            SizeBasedDataRewriter.TARGET_FILE_SIZE_BYTES, "500",
+            SizeBasedDataRewriter.MAX_FILE_SIZE_BYTES, "750",
+            SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, String.valueOf(Integer.MAX_VALUE));
+    rewriter.init(options);
+
+    Iterable<List<FileScanTask>> groups = rewriter.planFileGroups(tasks);
+    Assert.assertEquals("Must have 1 group", 1, Iterables.size(groups));
+    List<FileScanTask> group = Iterables.getOnlyElement(groups);
+    Assert.assertEquals("Must rewrite 2 files", 2, group.size());
+  }
+
+  private void checkDataFilesDeleteThreshold(SizeBasedDataRewriter rewriter) {
+    FileScanTask tooManyDeletesTask = MockFileScanTask.mockTaskWithDeletes(1000L, 3);
+    FileScanTask optimalTask = MockFileScanTask.mockTaskWithDeletes(1000L, 1);
+    List<FileScanTask> tasks = ImmutableList.of(tooManyDeletesTask, optimalTask);
+
+    Map<String, String> options =
+        ImmutableMap.of(
+            SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, "1",
+            SizeBasedDataRewriter.TARGET_FILE_SIZE_BYTES, "2000",
+            SizeBasedDataRewriter.MAX_FILE_SIZE_BYTES, "5000",
+            SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "2");
+    rewriter.init(options);
+
+    Iterable<List<FileScanTask>> groups = rewriter.planFileGroups(tasks);
+    Assert.assertEquals("Must have 1 group", 1, Iterables.size(groups));
+    List<FileScanTask> group = Iterables.getOnlyElement(groups);
+    Assert.assertEquals("Must rewrite 1 file", 1, group.size());
+  }
+
+  private void checkDataFileGroupWithEnoughFiles(SizeBasedDataRewriter rewriter) {
+    List<FileScanTask> tasks =
+        ImmutableList.of(
+            new MockFileScanTask(100L),
+            new MockFileScanTask(100L),
+            new MockFileScanTask(100L),
+            new MockFileScanTask(100L));
+
+    Map<String, String> options =
+        ImmutableMap.of(
+            SizeBasedDataRewriter.MIN_INPUT_FILES, "3",
+            SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, "150",
+            SizeBasedDataRewriter.TARGET_FILE_SIZE_BYTES, "1000",
+            SizeBasedDataRewriter.MAX_FILE_SIZE_BYTES, "5000",
+            SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, String.valueOf(Integer.MAX_VALUE));
+    rewriter.init(options);
+
+    Iterable<List<FileScanTask>> groups = rewriter.planFileGroups(tasks);
+    Assert.assertEquals("Must have 1 group", 1, Iterables.size(groups));
+    List<FileScanTask> group = Iterables.getOnlyElement(groups);
+    Assert.assertEquals("Must rewrite 4 files", 4, group.size());
+  }
+
+  private void checkDataFileGroupWithEnoughData(SizeBasedDataRewriter rewriter) {
+    List<FileScanTask> tasks =
+        ImmutableList.of(
+            new MockFileScanTask(100L), new MockFileScanTask(100L), new MockFileScanTask(100L));
+
+    Map<String, String> options =
+        ImmutableMap.of(
+            SizeBasedDataRewriter.MIN_INPUT_FILES, "5",
+            SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, "200",
+            SizeBasedDataRewriter.TARGET_FILE_SIZE_BYTES, "250",
+            SizeBasedDataRewriter.MAX_FILE_SIZE_BYTES, "500",
+            SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, String.valueOf(Integer.MAX_VALUE));
+    rewriter.init(options);
+
+    Iterable<List<FileScanTask>> groups = rewriter.planFileGroups(tasks);
+    Assert.assertEquals("Must have 1 group", 1, Iterables.size(groups));
+    List<FileScanTask> group = Iterables.getOnlyElement(groups);
+    Assert.assertEquals("Must rewrite 3 files", 3, group.size());
+  }
+
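+  // A rough summary of the selection rules these helpers appear to rely on (inferred from the
+  // options and assertions above, not a normative statement of the rewriter contract): a file is
+  // picked when its size falls outside [MIN_FILE_SIZE_BYTES, MAX_FILE_SIZE_BYTES], and a planned
+  // group is kept when it has at least MIN_INPUT_FILES files or enough data to fill a file of
+  // TARGET_FILE_SIZE_BYTES. For example, the single 2000-byte task in the helper below exceeds
+  // the 500-byte maximum and alone covers 2000 / 250 = 8 target-sized outputs, so it still forms
+  // a group even though MIN_INPUT_FILES is set to 5.
+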
+  private void checkDataFileGroupWithTooMuchData(SizeBasedDataRewriter rewriter) {
+    List<FileScanTask> tasks = ImmutableList.of(new MockFileScanTask(2000L));
+
+    Map<String, String> options =
+        ImmutableMap.of(
+            SizeBasedDataRewriter.MIN_INPUT_FILES, "5",
+            SizeBasedDataRewriter.MIN_FILE_SIZE_BYTES, "200",
+            SizeBasedDataRewriter.TARGET_FILE_SIZE_BYTES, "250",
+            SizeBasedDataRewriter.MAX_FILE_SIZE_BYTES, "500",
+            SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, String.valueOf(Integer.MAX_VALUE));
+    rewriter.init(options);
+
+    Iterable<List<FileScanTask>> groups = rewriter.planFileGroups(tasks);
+    Assert.assertEquals("Must have 1 group", 1, Iterables.size(groups));
+    List<FileScanTask> group = Iterables.getOnlyElement(groups);
+    Assert.assertEquals("Must rewrite big file", 1, group.size());
+  }
+
+  @Test
+  public void testInvalidConstructorUsagesSortData() {
+    Table table = catalog.createTable(TABLE_IDENT, SCHEMA);
+
+    Assertions.assertThatThrownBy(() -> new SparkSortDataRewriter(spark, table))
+        .hasMessageContaining("Cannot sort data without a valid sort order")
+        .hasMessageContaining("is unsorted and no sort order is provided");
+
+    Assertions.assertThatThrownBy(() -> new SparkSortDataRewriter(spark, table, null))
+        .hasMessageContaining("Cannot sort data without a valid sort order")
+        .hasMessageContaining("the provided sort order is null or empty");
+
+    Assertions.assertThatThrownBy(
+            () -> new SparkSortDataRewriter(spark, table, SortOrder.unsorted()))
+        .hasMessageContaining("Cannot sort data without a valid sort order")
+        .hasMessageContaining("the provided sort order is null or empty");
+  }
+
+  @Test
+  public void testInvalidConstructorUsagesZOrderData() {
+    Table table = catalog.createTable(TABLE_IDENT, SCHEMA, SPEC);
+
+    Assertions.assertThatThrownBy(() -> new SparkZOrderDataRewriter(spark, table, null))
+        .hasMessageContaining("Cannot ZOrder when no columns are specified");
+
+    Assertions.assertThatThrownBy(
+            () -> new SparkZOrderDataRewriter(spark, table, ImmutableList.of()))
+        .hasMessageContaining("Cannot ZOrder when no columns are specified");
+
+    Assertions.assertThatThrownBy(
+            () -> new SparkZOrderDataRewriter(spark, table, ImmutableList.of("dep")))
+        .hasMessageContaining("Cannot ZOrder")
+        .hasMessageContaining("all columns provided were identity partition columns");
+
+    Assertions.assertThatThrownBy(
+            () -> new SparkZOrderDataRewriter(spark, table, ImmutableList.of("DeP")))
+        .hasMessageContaining("Cannot ZOrder")
+        .hasMessageContaining("all columns provided were identity partition columns");
+  }
+
+  @Test
+  public void testBinPackDataValidOptions() {
+    Table table = catalog.createTable(TABLE_IDENT, SCHEMA);
+    SparkBinPackDataRewriter rewriter = new SparkBinPackDataRewriter(spark, table);
+
+    Assert.assertEquals(
+        "Rewriter must report all supported options",
+        ImmutableSet.of(
+            SparkBinPackDataRewriter.TARGET_FILE_SIZE_BYTES,
+            SparkBinPackDataRewriter.MIN_FILE_SIZE_BYTES,
+            SparkBinPackDataRewriter.MAX_FILE_SIZE_BYTES,
+            SparkBinPackDataRewriter.MIN_INPUT_FILES,
+            SparkBinPackDataRewriter.REWRITE_ALL,
+            SparkBinPackDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
+            SparkBinPackDataRewriter.DELETE_FILE_THRESHOLD),
+        rewriter.validOptions());
+  }
+
+  @Test
+  public void testSortDataValidOptions() {
+    Table table = catalog.createTable(TABLE_IDENT, SCHEMA);
+    SparkSortDataRewriter rewriter = new SparkSortDataRewriter(spark, table, SORT_ORDER);
+
+    Assert.assertEquals(
+        "Rewriter must report all supported options",
+        ImmutableSet.of(
+            SparkSortDataRewriter.TARGET_FILE_SIZE_BYTES,
+            SparkSortDataRewriter.MIN_FILE_SIZE_BYTES,
+            SparkSortDataRewriter.MAX_FILE_SIZE_BYTES,
+            SparkSortDataRewriter.MIN_INPUT_FILES,
+            SparkSortDataRewriter.REWRITE_ALL,
+            SparkSortDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
+            SparkSortDataRewriter.DELETE_FILE_THRESHOLD,
+            SparkSortDataRewriter.COMPRESSION_FACTOR),
+        rewriter.validOptions());
+  }
+
+  @Test
+  public void testZOrderDataValidOptions() {
+    Table table = catalog.createTable(TABLE_IDENT, SCHEMA);
+    ImmutableList<String> zOrderCols = ImmutableList.of("id");
+    SparkZOrderDataRewriter rewriter = new SparkZOrderDataRewriter(spark, table, zOrderCols);
+
+    Assert.assertEquals(
+        "Rewriter must report all supported options",
+        ImmutableSet.of(
+            SparkZOrderDataRewriter.TARGET_FILE_SIZE_BYTES,
+            SparkZOrderDataRewriter.MIN_FILE_SIZE_BYTES,
+            SparkZOrderDataRewriter.MAX_FILE_SIZE_BYTES,
+            SparkZOrderDataRewriter.MIN_INPUT_FILES,
+            SparkZOrderDataRewriter.REWRITE_ALL,
+            SparkZOrderDataRewriter.MAX_FILE_GROUP_SIZE_BYTES,
+            SparkZOrderDataRewriter.DELETE_FILE_THRESHOLD,
+            SparkZOrderDataRewriter.COMPRESSION_FACTOR,
+            SparkZOrderDataRewriter.MAX_OUTPUT_SIZE,
+            SparkZOrderDataRewriter.VAR_LENGTH_CONTRIBUTION),
+        rewriter.validOptions());
+  }
+
+  @Test
+  public void testInvalidValuesForBinPackDataOptions() {
+    Table table = catalog.createTable(TABLE_IDENT, SCHEMA);
+    SparkBinPackDataRewriter rewriter = new SparkBinPackDataRewriter(spark, table);
+
+    validateSizeBasedRewriterOptions(rewriter);
+
+    Map<String, String> invalidDeleteThresholdOptions =
+        ImmutableMap.of(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "-1");
+    Assertions.assertThatThrownBy(() -> rewriter.init(invalidDeleteThresholdOptions))
+        .hasMessageContaining("'delete-file-threshold' is set to -1 but must be >= 0");
+  }
+
+  @Test
+  public void testInvalidValuesForSortDataOptions() {
+    Table table = catalog.createTable(TABLE_IDENT, SCHEMA);
+    SparkSortDataRewriter rewriter = new SparkSortDataRewriter(spark, table, SORT_ORDER);
+
+    validateSizeBasedRewriterOptions(rewriter);
+
+    Map<String, String> invalidDeleteThresholdOptions =
+        ImmutableMap.of(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "-1");
+    Assertions.assertThatThrownBy(() -> rewriter.init(invalidDeleteThresholdOptions))
+        .hasMessageContaining("'delete-file-threshold' is set to -1 but must be >= 0");
+
+    Map<String, String> invalidCompressionFactorOptions =
+        ImmutableMap.of(SparkShufflingDataRewriter.COMPRESSION_FACTOR, "0");
+    Assertions.assertThatThrownBy(() -> rewriter.init(invalidCompressionFactorOptions))
+        .hasMessageContaining("'compression-factor' is set to 0.0 but must be > 0");
+  }
+
+  @Test
+  public void testInvalidValuesForZOrderDataOptions() {
+    Table table = catalog.createTable(TABLE_IDENT, SCHEMA);
+    ImmutableList<String> zOrderCols = ImmutableList.of("id");
+    SparkZOrderDataRewriter rewriter = new SparkZOrderDataRewriter(spark, table, zOrderCols);
+
+    validateSizeBasedRewriterOptions(rewriter);
+
+    Map<String, String> invalidDeleteThresholdOptions =
+        ImmutableMap.of(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "-1");
+    Assertions.assertThatThrownBy(() -> rewriter.init(invalidDeleteThresholdOptions))
+        .hasMessageContaining("'delete-file-threshold' is set to -1 but must be >= 0");
+
+    Map<String, String> invalidCompressionFactorOptions =
+        ImmutableMap.of(SparkShufflingDataRewriter.COMPRESSION_FACTOR, "0");
+    Assertions.assertThatThrownBy(() -> rewriter.init(invalidCompressionFactorOptions))
+        .hasMessageContaining("'compression-factor' is set to 0.0 but must be > 0");
+
+    Map<String, String> invalidMaxOutputOptions =
+        ImmutableMap.of(SparkZOrderDataRewriter.MAX_OUTPUT_SIZE, "0");
+    Assertions.assertThatThrownBy(() -> rewriter.init(invalidMaxOutputOptions))
.hasMessageContaining("Cannot have the interleaved ZOrder value use less than 1 byte") + .hasMessageContaining("'max-output-size' was set to 0"); + + Map invalidVarLengthContributionOptions = + ImmutableMap.of(SparkZOrderDataRewriter.VAR_LENGTH_CONTRIBUTION, "0"); + Assertions.assertThatThrownBy(() -> rewriter.init(invalidVarLengthContributionOptions)) + .hasMessageContaining("Cannot use less than 1 byte for variable length types with ZOrder") + .hasMessageContaining("'var-length-contribution' was set to 0"); + } + + private void validateSizeBasedRewriterOptions(SizeBasedFileRewriter rewriter) { + Map invalidTargetSizeOptions = + ImmutableMap.of(SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, "0"); + Assertions.assertThatThrownBy(() -> rewriter.init(invalidTargetSizeOptions)) + .hasMessageContaining("'target-file-size-bytes' is set to 0 but must be > 0"); + + Map invalidMinSizeOptions = + ImmutableMap.of(SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, "-1"); + Assertions.assertThatThrownBy(() -> rewriter.init(invalidMinSizeOptions)) + .hasMessageContaining("'min-file-size-bytes' is set to -1 but must be >= 0"); + + Map invalidTargetMinSizeOptions = + ImmutableMap.of( + SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, "3", + SizeBasedFileRewriter.MIN_FILE_SIZE_BYTES, "5"); + Assertions.assertThatThrownBy(() -> rewriter.init(invalidTargetMinSizeOptions)) + .hasMessageContaining("'target-file-size-bytes' (3) must be > 'min-file-size-bytes' (5)") + .hasMessageContaining("all new files will be smaller than the min threshold"); + + Map invalidTargetMaxSizeOptions = + ImmutableMap.of( + SizeBasedFileRewriter.TARGET_FILE_SIZE_BYTES, "5", + SizeBasedFileRewriter.MAX_FILE_SIZE_BYTES, "3"); + Assertions.assertThatThrownBy(() -> rewriter.init(invalidTargetMaxSizeOptions)) + .hasMessageContaining("'target-file-size-bytes' (5) must be < 'max-file-size-bytes' (3)") + .hasMessageContaining("all new files will be larger than the max threshold"); + + Map invalidMinInputFilesOptions = + ImmutableMap.of(SizeBasedFileRewriter.MIN_INPUT_FILES, "0"); + Assertions.assertThatThrownBy(() -> rewriter.init(invalidMinInputFilesOptions)) + .hasMessageContaining("'min-input-files' is set to 0 but must be > 0"); + + Map invalidMaxFileGroupSizeOptions = + ImmutableMap.of(SizeBasedFileRewriter.MAX_FILE_GROUP_SIZE_BYTES, "0"); + Assertions.assertThatThrownBy(() -> rewriter.init(invalidMaxFileGroupSizeOptions)) + .hasMessageContaining("'max-file-group-size-bytes' is set to 0 but must be > 0"); + } +}