core/src/main/java/org/apache/iceberg/actions/FileRewriteExecutor.java
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import java.util.Map;
import java.util.Set;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;

/**
* A class for rewriting content file groups ({@link FileRewriteGroup}). The lifecycle of the
* executor looks like the following:
*
* <ul>
* <li>{@link #init(Map)} initializes the executor with the configuration parameters
* <li>{@link #initPlan(FileRewritePlan)} initializes the executor with the configuration
* calculated during planning ({@link FileRewritePlan#writeMaxFileSize()}, {@link
* RewriteFilePlan#outputSpecId()})
* <li>{@link #rewrite(FileRewriteGroup)} is called for every group in the plan to do the actual
* rewrite of the files, and returns the newly generated files
* </ul>
*
* <p>A single executor may be used to rewrite multiple groups of the same plan.
*
* @param <I> the Java type of the plan info
* @param <T> the Java type of the tasks to read content files
* @param <F> the Java type of the content files
* @param <G> the Java type of the planned groups
* @param <P> the Java type of the plan to execute
*/
public interface FileRewriteExecutor<
I,
T extends ContentScanTask<F>,
F extends ContentFile<F>,
G extends FileRewriteGroup<I, T, F>,
P extends FileRewritePlan<I, T, F, G>> {

/** Returns a description for this executor. */
default String description() {
return getClass().getName();
}

/**
* Returns the set of supported options for this executor. Only options in this set are accepted
* at runtime; any other options are rejected.
*/
Set<String> validOptions();

/**
* Initializes this executor using the provided options.
*
* @param options options to initialize this executor
*/
void init(Map<String, String> options);
Member:

It is not clear from the definition here why initPlan and init are both required. It is also not clear when they are called, or whether they are both called, etc.

Contributor Author:

Will add javadoc.

  • init is called when the executor is initialized
  • initPlan is called when the plan has been generated and the parameters needed for the executor are calculated (writeMaxFileSize, outputSpecId)

Contributor Author:

Refactored the javadoc.

Contributor:

I also found it confusing. Can init and initPlan be combined into one method?

Alternatively, can the plan be passed in via the rewrite method?

Contributor Author:

Without modifying the current Spark behavior we cannot merge init and initPlan. init is called when the executor is created and is based only on the user-provided parameters; initPlan is used to set parameters calculated during the planning phase.

I have posted an email to the dev list: https://lists.apache.org/thread/6lj2jn3dbvqjscc96w0mc32bhxq0qfqv
There I asked a question about the "Data organization". If we decide on the 2nd answer ("the group should contain all the information"), then we don't need initPlan anymore.


/**
* Initializes the executor using the information generated during planning.
*
* @param plan the plan containing the configuration data
*/
void initPlan(P plan);

/**
* Rewrites a group of files represented by the given group of scan tasks.
*
* <p>The implementation is expected to be engine-specific (e.g. Spark, Flink, Trino).
*
* @param group the group of scan tasks for files to be rewritten together
* @return the set of newly written files
*/
Set<F> rewrite(G group);
}
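A minimal driver-side sketch of the lifecycle above (not part of the PR): the generic helper runRewrite and the commit step are hypothetical, and the imports are the same ones the interface already uses.

  // Sketch only: runRewrite and the commit step are hypothetical.
  static <
          I,
          T extends ContentScanTask<F>,
          F extends ContentFile<F>,
          G extends FileRewriteGroup<I, T, F>,
          P extends FileRewritePlan<I, T, F, G>>
      void runRewrite(
          FileRewriteExecutor<I, T, F, G, P> executor, P plan, Map<String, String> options) {
    executor.init(options); // user-provided options, checked against validOptions()
    executor.initPlan(plan); // planner-calculated values, e.g. writeMaxFileSize
    plan.groups()
        .forEach(
            group -> {
              Set<F> newFiles = executor.rewrite(group); // typically one engine job per group
              // engine-specific commit of newFiles omitted here
            });
  }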
core/src/main/java/org/apache/iceberg/actions/FileRewriteGroup.java
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import java.util.Comparator;
import java.util.List;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.RewriteJobOrder;

/**
* Container class representing a set of files to be rewritten by a {@link FileRewriteExecutor}.
Member:

This doesn't seem to be the case, since it also has

  /** Expected split size for the output files. */
  public long splitSize() {
    return splitSize;
  }

  /** Expected number of the output files. */
  public int expectedOutputFiles() {
    return expectedOutputFiles;
  }

The first one, splitSize, feels like it is global to a lot of FileRewriteGroups and not a particular property of one RewriteGroup, so maybe it belongs in the Plan.

The second one, estimatedOutputFiles, could live here, but maybe it just makes sense as estimatedOutputFiles(splitSize), where the split size is passed in from elsewhere?

Contributor Author:

> The first one, splitSize, feels like it is global to a lot of FileRewriteGroups and not a particular property of one RewriteGroup, so maybe it belongs in the Plan.

By my understanding this could currently be different for different groups in the same plan. splitSize(long inputSize) and numOutputFiles(long inputSize) depend on the input size, and a big part of that depends on the configuration of the planner (see: #9069).

Contributor Author:

The current code makes sure that the exact same compaction result is generated as with the previous algorithm.

In the old code we had:

  • Plan level
    • writeMaxFileSize - which governs the target file size when rewriting (SparkWriteOptions.TARGET_FILE_SIZE_BYTES)
  • Group level
    • splitSize - which governs the input split size when rewriting (SparkReadOptions.SPLIT_SIZE, splitSize)
    • expectedOutputFiles - which governs the number of shuffle partitions for shuffling rewriters

In the new code I have mirrored this.

I think the new code would be much cleaner if we put all of this information at the group level (we could get rid of the initPlan step).

Member:

I think basically anything having to do with "output" should be part of "Plan" or "Execute", depending. In this case we are basically just using these properties because they make it easy to control Spark's behavior.

Contributor Author:

Let me apply this to the current situation, so we can check whether I understand correctly:

  • We should keep splitSize in the group, as this drives the reading
  • We should move expectedOutputFiles to the plan, as it drives the output

Is this the change that you are suggesting?

I see 3 possible separations of concerns:

  1. The plan is the 'result' - everything below it is organized only based on the multiplicity of the data. If some value applies to every group, then that value belongs to the 'global' plan variables; if a value differs for every group, then that value belongs to the group (current code).
  2. The plan is the write config, the group is the read config. If I understand correctly, this is what you are suggesting. IMHO this is a bit awkward, as the groups are currently part of the plan. Maybe we could have readConfig and writeConfig maps in the plan instead of adding extra attributes to the plan and to the groups. This comes at the cost of extra parsing for the configs, but allows us more flexibility (fewer classes).
  3. The group should contain all the information required for a single job. The job (executor) then only receives a single group, and every other bit of information is global. The drawback is that some information is duplicated, but it is cleaner on the executor side.

Your thoughts?

*
* @param <I> the Java type of the plan info
Member:

This is very ambiguous to me, and it is not clear what I should be.

Contributor Author:

These are the RewriteDataFiles.FileGroupInfo and RewritePositionDeleteFiles.FileGroupInfo.

* @param <T> the Java type of the tasks to read content files
* @param <F> the Java type of the content files
*/
public abstract class FileRewriteGroup<I, T extends ContentScanTask<F>, F extends ContentFile<F>> {
private final I info;
private final List<T> fileScanTasks;
private final long splitSize;
private final int expectedOutputFiles;

FileRewriteGroup(I info, List<T> fileScanTasks, long splitSize, int expectedOutputFiles) {
this.info = info;
this.fileScanTasks = fileScanTasks;
this.splitSize = splitSize;
this.expectedOutputFiles = expectedOutputFiles;
}

/** Identifiers and partition information about the group. */
Member:

Why isn't this just a StructLike? What other class types might we be passing through here?

Contributor Author:

This is RewriteDataFiles.FileGroupInfo and RewritePositionDeleteFiles.FileGroupInfo.
These are inherited from the old code.

public I info() {
return info;
}

/** Input of the group. {@link ContentScanTask}s to read. */
public List<T> fileScans() {
return fileScanTasks;
}

/** Expected split size for reading the input files. */
public long splitSize() {
return splitSize;
}

/** Expected number of the output files. */
public int expectedOutputFiles() {
return expectedOutputFiles;
}

/** Accumulated size of the input files. */
public long sizeInBytes() {
return fileScanTasks.stream().mapToLong(T::length).sum();
}

/** Number of input files. */
public int numInputFiles() {
return fileScanTasks.size();
}

/** Comparator to order the FileRewriteGroups based on a provided {@link RewriteJobOrder}. */
public static <I, T extends ContentScanTask<F>, F extends ContentFile<F>>
Comparator<FileRewriteGroup<I, T, F>> taskComparator(RewriteJobOrder rewriteJobOrder) {
switch (rewriteJobOrder) {
case BYTES_ASC:
return Comparator.comparing(FileRewriteGroup::sizeInBytes);
case BYTES_DESC:
return Comparator.comparing(FileRewriteGroup::sizeInBytes, Comparator.reverseOrder());
case FILES_ASC:
return Comparator.comparing(FileRewriteGroup::numInputFiles);
case FILES_DESC:
return Comparator.comparing(FileRewriteGroup::numInputFiles, Comparator.reverseOrder());
default:
return (unused, unused2) -> 0;
}
}
}
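A short sketch of how the comparator might be used; the plan variable and the generic context (I, T, F, G) are assumed, as in the hypothetical runRewrite helper shown after FileRewriteExecutor, plus a java.util.stream.Collectors import.

  // Hypothetical usage: order the planned groups so that the largest groups
  // (by accumulated input bytes) are rewritten first.
  Comparator<FileRewriteGroup<I, T, F>> byBytesDesc =
      FileRewriteGroup.taskComparator(RewriteJobOrder.BYTES_DESC);
  List<G> ordered = plan.groups().sorted(byBytesDesc).collect(Collectors.toList());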
core/src/main/java/org/apache/iceberg/actions/FileRewritePlan.java
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import java.util.Map;
import java.util.stream.Stream;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;
import org.apache.iceberg.StructLike;

/**
* Result of the file rewrite planning, as generated by {@link FileRewritePlanner#plan()}.
*
* <p>The plan contains the stream of the planned groups and statistics about the generated groups,
* such as the total number of groups and the number of groups per partition. The plan also
* contains calculated values required by the {@link FileRewriteExecutor}s, which are based on the
* input data and the planning parameters.
*
* <p>Groups in a plan can be processed independently. For example, in Spark this means that each
* group is rewritten in its own Spark job.
*
* @param <I> the Java type of the plan info
* @param <T> the Java type of the tasks to read content files
* @param <F> the Java type of the content files
* @param <G> the Java type of the planned groups
*/
public abstract class FileRewritePlan<
I,
T extends ContentScanTask<F>,
F extends ContentFile<F>,
G extends FileRewriteGroup<I, T, F>> {
private final Stream<G> groups;
Contributor Author:

Currently the groups are calculated in advance to allow sorting, so instead of a stream we could provide a Collection or a Set.
I kept it as a Stream because the previous iteration of the code used a stream, but I would prefer a Collection.

private final int totalGroupCount;
private final Map<StructLike, Integer> groupsInPartition;
private final long writeMaxFileSize;

protected FileRewritePlan(
Stream<G> groups,
int totalGroupCount,
Map<StructLike, Integer> groupsInPartition,
long writeMaxFileSize) {
this.groups = groups;
this.totalGroupCount = totalGroupCount;
this.groupsInPartition = groupsInPartition;
this.writeMaxFileSize = writeMaxFileSize;
}

/** The stream of the generated {@link FileRewriteGroup}s. */
public Stream<G> groups() {
return groups;
}

/** The number of groups generated in the given partition. */
public int groupsInPartition(StructLike partition) {
return groupsInPartition.get(partition);
}

/** The total number of groups generated by this plan. */
public int totalGroupCount() {
return totalGroupCount;
}

/** Calculated maximum file size, based on the planner's target file size configuration. */
public long writeMaxFileSize() {
return writeMaxFileSize;
}
}
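A small sketch of how an engine might inspect the plan before execution; the plan variable and the SLF4J-style LOG are assumed. Note that groups() returns a Stream, so it can be consumed only once.

  // Hypothetical inspection of a plan before executing its groups.
  int total = plan.totalGroupCount();
  long maxFileSize = plan.writeMaxFileSize(); // upper bound for the size of the rewritten files
  plan.groups()
      .forEach(
          group ->
              LOG.info(
                  "Group {} rewrites {} files ({} bytes) into ~{} output files",
                  group.info(),
                  group.numInputFiles(),
                  group.sizeInBytes(),
                  group.expectedOutputFiles()));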
core/src/main/java/org/apache/iceberg/actions/FileRewritePlanner.java
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.actions;

import java.util.Map;
import java.util.Set;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.ContentScanTask;

/**
* A class for planning content file rewrites.
*
* <p>The entire rewrite operation is broken down into pieces. The grouping is based on
* partitioning, and the planning could create multiple groups within a partition. The result is a
* {@link FileRewritePlan} which contains the data needed by the {@link FileRewriteExecutor}s that
* execute the actual file rewrite.
*
* <p>The lifecycle of the planner is:
*
* <ul>
* <li>{@link #init(Map)} initializes the planner with the configuration parameters
* <li>{@link #plan()} generates the plan for the given configuration
* </ul>
*
* @param <I> the Java type of the plan info
* @param <T> the Java type of the tasks to read content files
* @param <F> the Java type of the content files
* @param <G> the Java type of the planned groups
*/
public interface FileRewritePlanner<
I,
T extends ContentScanTask<F>,
F extends ContentFile<F>,
G extends FileRewriteGroup<I, T, F>> {

/** Returns a description for this planner. */
default String description() {
return getClass().getName();
}

/**
* Returns the set of supported options for this planner. Only options in this set are accepted
* at runtime; any other options are rejected.
*/
Set<String> validOptions();

/**
* Initializes this planner using the provided options.
*
* @param options options to initialize this planner
*/
void init(Map<String, String> options);

/**
* Generates the plan for the rewrite.
*
* @return the generated plan, which can be executed during the compaction
*/
FileRewritePlan<I, T, F, G> plan();
}
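Putting the planner and the executor together, an end-to-end flow might look like the sketch below; MyPlanner and MyExecutor are hypothetical implementations, and plannerOptions/executorOptions are assumed option maps.

  // Hypothetical end-to-end flow: plan first, then execute each group.
  FileRewritePlanner<I, T, F, G> planner = new MyPlanner<>(); // hypothetical planner
  planner.init(plannerOptions);
  FileRewritePlan<I, T, F, G> plan = planner.plan();

  FileRewriteExecutor<I, T, F, G, FileRewritePlan<I, T, F, G>> executor =
      new MyExecutor<>(); // hypothetical engine-specific executor
  executor.init(executorOptions);
  executor.initPlan(plan); // hand the planner-calculated values to the executor
  plan.groups().forEach(executor::rewrite);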
core/src/main/java/org/apache/iceberg/actions/FileRewriter.java
@@ -34,7 +34,10 @@
*
* @param <T> the Java type of tasks to read content files
* @param <F> the Java type of content files
* @deprecated since 1.8.0, will be removed in 1.9.0; use {@link FileRewritePlanner} and {@link
* FileRewriteExecutor}.
*/
@Deprecated
public interface FileRewriter<T extends ContentScanTask<F>, F extends ContentFile<F>> {

/** Returns a description for this rewriter. */