
Conversation

@laithalzyoud (Contributor) commented Mar 22, 2024

This PR adds a new Spark action to copy an Iceberg table. The action rewrites metadata files, manifest lists, and position delete files to reflect a change in location prefix, supporting operations such as migration to a new storage location or table duplication.

Here's a breakdown of what it does and how it works (a hypothetical usage sketch follows the list):

  • Initializes and validates input parameters, such as the source and target location prefixes and the start and end versions.
  • Executes the copy action, which involves rebuilding metadata to reflect the new locations, creating a staging area for the copied files, and generating lists of data and metadata files to move.
  • Rewrites metadata files, manifest files, manifest lists, and position delete files so that the paths inside them reflect the new location prefix.
  • Identifies which data files need to be moved based on the snapshots included between the specified start and end versions of the table.
  • Utilizes Spark to parallelize the rewriting of files, enabling efficient handling of large tables.
  • Supports Spark 3.3, 3.4, and 3.5.
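As a hypothetical usage sketch — the `copyTable`, `rewriteLocationPrefix`, and `stagingLocation` names below are assumptions drawn from this description, not a confirmed API, while `lastCopiedVersion` does appear in the diff:

```java
// Illustrative only: builder method names are assumptions, not the merged Iceberg API.
// Assumes `spark` (SparkSession) and `sourceTable` (an Iceberg Table) are in scope.
CopyTable.Result result =
    SparkActions.get(spark)
        .copyTable(sourceTable)
        .rewriteLocationPrefix("s3://old-bucket/warehouse/db/tbl", "s3://new-bucket/warehouse/db/tbl")
        .lastCopiedVersion("v1.metadata.json") // resume after a previously copied version
        .stagingLocation("s3://new-bucket/staging/db/tbl")
        .execute();

// The action rewrites metadata and emits lists of data/metadata files to move;
// the physical copy of those files is left to the caller.
```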

This PR extends @flyrain's original PR #4705.

@laithalzyoud changed the title from "Spark: Add CopyTable spark action" to "[Draft] Spark: Add CopyTable spark action" Mar 22, 2024
@manuzhang (Member)

How about position delete files?

@laithalzyoud (Contributor, Author)

> How about position delete files?

@manuzhang They are covered in this PR, let me add that to the description as well 👌

@laithalzyoud marked this pull request as draft March 22, 2024 15:18
@amogh-jahagirdar self-requested a review March 22, 2024 17:13
@laithalzyoud changed the title from "[Draft] Spark: Add CopyTable spark action" to "Spark: Add CopyTable spark action" Mar 25, 2024
@laithalzyoud marked this pull request as ready for review March 25, 2024 10:30
@nastra (Contributor) commented Mar 25, 2024

> Supports Spark 3.3, 3.4 and 3.5

We should probably first focus on a single Spark version (3.5) and once the PR is merged, backport the changes to previous Spark versions. Otherwise it will be difficult to review/change the same stuff across multiple Spark versions.

package org.apache.iceberg.actions;

public class BaseCopyTableActionResult implements CopyTable.Result {

this should probably be similar to how all the other Result classes are implemented, such as

@Value.Enclosing
@SuppressWarnings("ImmutablesStyle")
@Value.Style(
    typeImmutableEnclosing = "ImmutableDeleteReachableFiles",
    visibilityString = "PUBLIC",
    builderVisibilityString = "PUBLIC")
interface BaseDeleteReachableFiles extends DeleteReachableFiles {
  @Value.Immutable
  interface Result extends DeleteReachableFiles.Result {}
}
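Applied to this PR, that pattern might look like the following sketch (type names are illustrative, assuming the action interface is named `CopyTable`):

```java
@Value.Enclosing
@SuppressWarnings("ImmutablesStyle")
@Value.Style(
    typeImmutableEnclosing = "ImmutableCopyTable",
    visibilityString = "PUBLIC",
    builderVisibilityString = "PUBLIC")
interface BaseCopyTable extends CopyTable {
  @Value.Immutable
  interface Result extends CopyTable.Result {}
}
```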

.expireSnapshotId(sourceTable.currentSnapshot().parentId())
.execute();

AssertHelpers.assertThrows(

this is deprecated code. please use assertThatThrownBy(...).isInstanceOf(..).hasMessage(...)
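For example, a migration sketch against the assertion above — the expected exception type and message here are placeholders, not the test's actual values:

```java
// import static org.assertj.core.api.Assertions.assertThatThrownBy;
// actions() is a hypothetical test helper returning SparkActions.
assertThatThrownBy(
        () ->
            actions()
                .expireSnapshots(sourceTable)
                .expireSnapshotId(sourceTable.currentSnapshot().parentId())
                .execute())
    .isInstanceOf(IllegalArgumentException.class) // placeholder exception type
    .hasMessageContaining("snapshot");            // placeholder message fragment
```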


assertThat(count)
.as("The rebuilt metadata file number should be")
.isEqualTo(filesToMove.size());

actual/expected are wrong here. Should be assertThat(filesToMove).hasSize(count)

.as(Encoders.STRING())
.collectAsList();

assertThat(count).as("The rebuilt data file number should be").isEqualTo(filesToMove.size());

actual/expected are wrong here and should be the other way around

.as(Encoders.STRING())
.collectAsList();

assertThat(versionFileCount)

it seems a bunch of assertions have actual/expected in the wrong order. Please also update all the other places
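i.e., a corrected sketch for the snippets above:

```java
// The collection under test is the "actual"; the expected size comes second.
// (Cast count to int if it is a long.)
assertThat(filesToMove)
    .as("The rebuilt metadata file number should be")
    .hasSize(count);
```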

// Utility class
}

public static TableMetadata replacePaths(

please add a TestTableMetadataUtil with some tests where metadata/prefixes can be null/empty/invalid/valid
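A sketch of what such a test could cover — the parameter order of `replacePaths`, the expected exception type, and the fixture helper are all assumptions:

```java
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import org.apache.iceberg.TableMetadata;
import org.junit.jupiter.api.Test;

public class TestTableMetadataUtil {

  @Test
  public void testNullMetadata() {
    // Assumption: invalid input is rejected with an IllegalArgumentException.
    assertThatThrownBy(() -> TableMetadataUtil.replacePaths(null, "s3://old/", "s3://new/"))
        .isInstanceOf(IllegalArgumentException.class);
  }

  @Test
  public void testValidPrefixes() {
    TableMetadata metadata = newTableMetadataFixture(); // hypothetical fixture helper
    TableMetadata rewritten = TableMetadataUtil.replacePaths(metadata, "s3://old/", "s3://new/");
    assertThat(rewritten.location()).startsWith("s3://new/");
  }
}
```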

}

@Override
public CopyTable lastCopiedVersion(String sVersion) {

it's not clear what sVersion refers to, so why not newStartVersion? Same for all the other params

@nastra (Contributor) commented Mar 25, 2024

@laithalzyoud thanks for working on this. I just did a very quick high-level review, but will do a more thorough one this week

@laithalzyoud (Contributor, Author)

> @laithalzyoud thanks for working on this. I just did a very quick high-level review, but will do a more thorough one this week

Thanks for taking a look @nastra. I'll stash the 3.3 and 3.4 implementations for now and move them to another PR after this one is merged, and I'll address the comments as well 👍

rewriteVersionFile(metadata, stagingPath);

List<MetadataLogEntry> versions = metadata.previousFiles();
for (int i = versions.size() - 1; i >= 0; i--) {


Could be rewritten as:

List<MetadataLogEntry> versions = Lists.reverse(metadata.previousFiles());
for (MetadataLogEntry version : versions) {
  if (version.file().equals(startVersion)) {
    break;
  }
}
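For reference, `Lists` here would be the Guava helper (relocated in Iceberg), and `reverse` returns a reversed view without copying the list:

```java
import java.util.List;
import org.apache.iceberg.TableMetadata.MetadataLogEntry;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

// Lists.reverse returns a reversed *view*; no copy of previousFiles() is made.
List<MetadataLogEntry> versions = Lists.reverse(metadata.previousFiles());
```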

@RussellSpitzer (Member)

@flyrain You should take a look at this as well

@amogh-jahagirdar (Contributor) left a comment

Thanks @laithalzyoud this is great to see! Beyond @nastra's point of doing Spark 3.5 separately, it would be ideal to have the CopyTable API changes be in a separate PR first.

Having the API changes be separate allows us to discuss things like semantics/expectations/preconditions of the API just so the community is on the same page as to what they can expect when working with this action and if the right options are exposed.

After that point, we can look at the implementation. One aspect of implementation that I think I'd also separate is the exposing of ManifestLists/ManifestReader. I totally get why that's required for this action but I think it's worth having that in a separate commit.

Lastly, this gets more into the API and implementation, but I think we should figure out whether there should be separate exposed "operations" for replicating a given manifest/manifest list rather than having it all embedded in the Spark procedure. Those operations can individually be used for reasons beyond copying for replication; they can be used for fixing corrupt metadata if the right APIs are exposed.
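To make that concrete, one purely hypothetical shape for such an operation — none of these names exist in Iceberg; only `InputFile`/`OutputFile` are the existing `org.apache.iceberg.io` types:

```java
// Hypothetical standalone operation the Spark action could compose;
// the interface and method names are invented for illustration.
interface RewriteManifestList {
  RewriteManifestList sourcePrefix(String sourcePrefix);
  RewriteManifestList targetPrefix(String targetPrefix);

  // Read the manifest list at `source`, rewrite embedded manifest paths
  // from sourcePrefix to targetPrefix, and write the result to `target`.
  void execute(InputFile source, OutputFile target);
}
```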

Comment on lines +346 to +358
try {
  dataFiles
      .repartition(1)
      .write()
      .mode(SaveMode.Overwrite)
      .format("text")
      .save(dataFileListPath);
} catch (Exception e) {
  throw new UnsupportedOperationException(
      "Failed to build the data files dataframe, the end version you are "
          + "trying to copy may contain invalid snapshots, please use a younger version "
          + "which doesn't have invalid snapshots",
      e);

This is something to discuss I think when we define the API semantics. I think it's a bit awkward to dump paths into our own "text manifest". Why shouldn't the action execute the copy of the actual data files (I mean the actual Parquet files)? There's a bunch of ways to do that and I think in Iceberg we should have the right interfaces and some basic implementations to facilitate that. Lmk if that makes sense.

@laithalzyoud (Contributor, Author)

If we want to support actually moving the files, then we will need to support different cloud providers (GCP, AWS, Azure) as well as on-prem setups (i.e., copying on Linux). It might not be ideal for some use cases either. For example, in our case, where we are using this in a production setting, copying with the GCS client libraries was extremely slow and inefficient for huge tables (>5 TB), so we instead used a managed GCP service to handle the move efficiently. The right mechanism is very specific to where the files are stored and where you want to move them; consider someone migrating between two different cloud providers, or moving data from on-premise to the cloud, and so on. So from my perspective it's better to just rewrite the paths so the table is usable in the new location and leave the actual copying to the users. Maybe in a future iteration a basic interface to move files for common use cases can be implemented.
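For illustration, a caller consuming the generated file list could drive the physical copy with whatever tool suits their storage. A minimal Hadoop `FileSystem` sketch, assuming one absolute path per line in the list (the list location and target-prefix swap below are assumptions; production setups may prefer bulk transfer services, as noted above):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;

// Assumed output of the action: a plain-text file with one absolute path per line.
Configuration conf = new Configuration();
Path fileList = new Path("gs://new-bucket/staging/data-files-to-move/part-00000");
FileSystem listFs = fileList.getFileSystem(conf);

try (BufferedReader reader =
    new BufferedReader(new InputStreamReader(listFs.open(fileList)))) {
  String line;
  while ((line = reader.readLine()) != null) {
    Path source = new Path(line);
    // Derive the target by swapping the location prefix, mirroring the metadata rewrite.
    Path target = new Path(line.replace("gs://old-bucket/", "gs://new-bucket/"));
    FileUtil.copy(
        source.getFileSystem(conf), source,
        target.getFileSystem(conf), target,
        false /* keep the source files */, conf);
  }
}
```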

@flyrain (Contributor) commented Mar 28, 2024

Thanks @laithalzyoud for taking the lead on the copy table action. Agreed with @amogh-jahagirdar: can we separate the PR into an interface-only PR and implementation PRs? That way, we can get consensus on the interface first.

@ajantha-bhat (Member)

@laithalzyoud : Are you planning to address the comments on this? This feature is definitely useful.
If not, I would like to take it up.

@laithalzyoud (Contributor, Author)

> @laithalzyoud : Are you planning to address the comments on this? This feature is definitely useful. If not, I would like to take it up.

Hey @ajantha-bhat! Yes I'm planning to continue working on this soon, you can help in the code review if you'd like 👍

@huaxingao (Contributor)

@laithalzyoud Thanks for your work on this PR! I've noticed there hasn't been activity for a while, and I wanted to check if you're still able to continue working on it. If you're busy with other commitments and would like some help, I’d be glad to take over or assist. Thanks!

@moomindani

@laithalzyoud I am super interested in this PR and it will unblock many use cases. Are you working on this now?

@huaxingao (Contributor)

@laithalzyoud Thanks for the thumbs-up! Could you please confirm if you are planning to continue working on this PR, or would you like me to take over? I’m happy to help in any way needed. Thank you!

@laithalzyoud (Contributor, Author)

Hey @huaxingao! I'm planning to continue working on it starting this week. For now I'll close this PR and open a new one that just adds the interface. Once we agree on the interface, I'll create the implementation PR, as agreed earlier with @flyrain and @amogh-jahagirdar 👍

@laithalzyoud (Contributor, Author)

I created the PR that just adds the interface, please feel free to review it and provide feedback!

@loudwanderingdune

I see the interface PR #10920 has been merged. Is the implementation ready to be worked on?

@manuzhang (Member)

@loudwanderingdune This feature is already implemented and released. Please check out https://iceberg.apache.org/docs/nightly/spark-procedures/#table-replication.
