core/src/main/java/org/apache/iceberg/actions/BinPackStrategy.java
@@ -44,7 +44,10 @@
* RewriteDataFiles#MAX_FILE_GROUP_SIZE_BYTES}. Groups will be considered for rewriting if they
* contain more files than {@link #MIN_INPUT_FILES} or would produce at least one file of {@link
* RewriteDataFiles#TARGET_FILE_SIZE_BYTES}.
*
* @deprecated since 1.3.0, will be removed in 1.4.0; use {@link SizeBasedFileRewriter} instead.
*/
@Deprecated
public abstract class BinPackStrategy implements RewriteStrategy {

private static final Logger LOG = LoggerFactory.getLogger(BinPackStrategy.class);
77 changes: 77 additions & 0 deletions core/src/main/java/org/apache/iceberg/actions/FileRewriter.java
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;

/**
* A class for rewriting content files.
*
* <p>The entire rewrite operation is broken down into pieces based on partitioning, and size-based
* groups within a partition. These subunits of the rewrite are referred to as file groups. A file
* group will be processed by a single framework "action". For example, in Spark this means that
* each group would be rewritten in its own Spark job.
*
* @param <T> the Java type of tasks to read content files
* @param <F> the Java type of content files
*/

Contributor Author:
@szehon-ho, moved some of those detailed comments here instead of per options.
public interface FileRewriter<T extends ContentScanTask<F>, F extends ContentFile<F>> {
Contributor Author:
I created a separate hierarchy because I did not follow the exact API.

  • I replaced RewriteStrategy options(map) with void init(map). To support proper method chaining with inheritance, we would need to parametrize the strategy and rewriter with ThisT, as we do in Scan, for instance. That was never done, so method chaining was partially broken in the existing strategies. To simplify the API, I followed the approach we use in catalogs and FileIO and added void init(map) instead.
  • I did not make the new interface serializable, as our strategies are not used in a distributed fashion. They cannot be serialized in their current form anyway (they have non-serializable fields). If we want that functionality in the future, we should either mark rewriters serializable and properly support it, or not add it at all.
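To make the trade-off concrete, here is a rough sketch of the two API shapes mentioned above; both interfaces are hypothetical illustrations, not code from this PR:

import java.util.Map;

// Self-type chaining, as in Scan: every override must return ThisT
// for chaining to keep working across the inheritance hierarchy.
interface ChainingRewriter<ThisT extends ChainingRewriter<ThisT>> {
  ThisT options(Map<String, String> options);
}

// The shape chosen here, as in catalogs and FileIO: no self-type
// parameter; configuration happens through a void init call.
interface InitBasedRewriter {
  void init(Map<String, String> options);
}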

/** Returns a description for this rewriter. */
default String description() {
return getClass().getName();
}

/**
* Returns a set of supported options for this rewriter. Only options specified in this list will
* be accepted at runtime. Any other options will be rejected.
*/
Set<String> validOptions();

/**
* Initializes this rewriter using provided options.
*
* @param options options to initialize this rewriter
*/
void init(Map<String, String> options);
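As an illustration of the validOptions/init contract, a concrete rewriter might validate the incoming map before reading any values. The sketch below is an assumption about one way to do this, not code from this PR; the targetFileSize field and the option name are hypothetical:

@Override
public void init(Map<String, String> options) {
  // Reject anything not declared in validOptions().
  Set<String> invalidOptions = Sets.newHashSet(options.keySet());
  invalidOptions.removeAll(validOptions());
  Preconditions.checkArgument(
      invalidOptions.isEmpty(),
      "Cannot use options %s, supported options are %s",
      invalidOptions, validOptions());

  // Hypothetical option read: fall back to a table-level default.
  this.targetFileSize =
      PropertyUtil.propertyAsLong(options, "target-file-size-bytes", defaultTargetFileSize());
}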

/**
* Selects files which this rewriter believes are valid targets to be rewritten based on their
* scan tasks and groups those scan tasks into file groups. The file groups are then rewritten in
* a single executable unit, such as a Spark job.
*
* @param tasks an iterable of scan tasks for files in a partition
* @return groups of scan tasks for files to be rewritten in a single executable unit
*/
Iterable<List<T>> planFileGroups(Iterable<T> tasks);
Member:
Optional: do you think 'plan' is necessary, or can we call it 'groupFiles'?

Also, it wasn't immediately clear that not all tasks need to be in a returned group. I think we can document that. Another option, going with this approach (having this interface define methods that filter/aggregate files), would be to make selectGroups a separate API for clarity.

Contributor Author:
Let me think. Maybe we should have separate methods for planning and filtering.

Contributor Author:
Switched to a single planFileGroups, as discussed offline.

/**
* Rewrite a group of files represented by the given list of scan tasks.
*
* <p>The implementation is expected to be engine-specific (e.g. Spark, Flink, Trino).
*
* @param group a group of scan tasks for files to be rewritten together
* @return a set of newly written files
*/
Set<F> rewrite(List<T> group);
}
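Putting the pieces together, a caller might drive a rewriter roughly as follows. This is a sketch that assumes the caller has already planned scan tasks for a single partition; committing the results is elided:

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.actions.FileRewriter;

class RewriteDriverSketch {
  static Set<DataFile> rewritePartition(
      FileRewriter<FileScanTask, DataFile> rewriter,
      Map<String, String> options,
      Iterable<FileScanTask> partitionTasks) {
    rewriter.init(options);
    Set<DataFile> newFiles = new HashSet<>();
    for (List<FileScanTask> group : rewriter.planFileGroups(partitionTasks)) {
      // Each group is one executable unit, e.g. one Spark job.
      newFiles.addAll(rewriter.rewrite(group));
    }
    return newFiles; // the caller commits these and removes the rewritten inputs
  }
}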
core/src/main/java/org/apache/iceberg/actions/RewriteStrategy.java
@@ -26,6 +26,12 @@
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;

/**
* A strategy for rewriting files.
*
* @deprecated since 1.3.0, will be removed in 1.4.0; use {@link FileRewriter} instead.
*/
@Deprecated
public interface RewriteStrategy extends Serializable {
Contributor Author:
I decided not to migrate our existing Spark strategies, as they were public. Also, we now have more information about all types of rewrites, so we can structure the code a bit differently.

/** Returns the name of this rewrite strategy */
String name();
109 changes: 109 additions & 0 deletions core/src/main/java/org/apache/iceberg/actions/SizeBasedDataRewriter.java
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.util.PropertyUtil;

public abstract class SizeBasedDataRewriter extends SizeBasedFileRewriter<FileScanTask, DataFile> {

/**
* The minimum number of deletes that needs to be associated with a data file for it to be
* considered for rewriting. If a data file has this number of deletes or more, it will be
* rewritten regardless of its file size determined by {@link #MIN_FILE_SIZE_BYTES} and {@link
* #MAX_FILE_SIZE_BYTES}. If a file group contains a file that satisfies this condition, the file
* group will be rewritten regardless of the number of files in the file group determined by
* {@link #MIN_INPUT_FILES}.
*
* <p>Defaults to Integer.MAX_VALUE, which means this feature is not enabled by default.
*/
public static final String DELETE_FILE_THRESHOLD = "delete-file-threshold";

public static final int DELETE_FILE_THRESHOLD_DEFAULT = Integer.MAX_VALUE;

private int deleteFileThreshold;

protected SizeBasedDataRewriter(Table table) {
super(table);
}

@Override
public Set<String> validOptions() {
return ImmutableSet.<String>builder()
.addAll(super.validOptions())
.add(DELETE_FILE_THRESHOLD)
.build();
}

@Override
public void init(Map<String, String> options) {
super.init(options);
this.deleteFileThreshold = deleteFileThreshold(options);
}

@Override
protected Iterable<FileScanTask> filterFiles(Iterable<FileScanTask> tasks) {
return Iterables.filter(tasks, task -> wronglySized(task) || tooManyDeletes(task));
}

private boolean tooManyDeletes(FileScanTask task) {
return task.deletes() != null && task.deletes().size() >= deleteFileThreshold;
}

@Override
protected Iterable<List<FileScanTask>> filterFileGroups(List<List<FileScanTask>> groups) {
return Iterables.filter(groups, this::shouldRewrite);
}

private boolean shouldRewrite(List<FileScanTask> group) {
return enoughInputFiles(group)
|| enoughContent(group)
|| tooMuchContent(group)
|| anyTaskHasTooManyDeletes(group);
}

private boolean anyTaskHasTooManyDeletes(List<FileScanTask> group) {
return group.stream().anyMatch(this::tooManyDeletes);
}

@Override
protected long defaultTargetFileSize() {
return PropertyUtil.propertyAsLong(
table().properties(),
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES,
TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
}

private int deleteFileThreshold(Map<String, String> options) {
int value =
PropertyUtil.propertyAsInt(options, DELETE_FILE_THRESHOLD, DELETE_FILE_THRESHOLD_DEFAULT);
Preconditions.checkArgument(
value >= 0, "'%s' is set to %s but must be >= 0", DELETE_FILE_THRESHOLD, value);
Member:
@aokolnychyi Why do we allow the delete-file-threshold to be 0 here? Is there a meaningful use case?
return value;
}
}
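For reference, here is roughly how the new option could surface to users through the existing rewrite action. The snippet assumes the Spark actions entry point and is illustrative rather than taken from this PR:

import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFiles;
import org.apache.iceberg.actions.SizeBasedDataRewriter;
import org.apache.iceberg.spark.actions.SparkActions;

class CompactionExample {
  // Rewrite any data file that has accumulated two or more delete files,
  // regardless of its size.
  static RewriteDataFiles.Result compact(Table table) {
    return SparkActions.get()
        .rewriteDataFiles(table)
        .option(SizeBasedDataRewriter.DELETE_FILE_THRESHOLD, "2")
        .execute();
  }
}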