diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index e137d57a6db1..1531d9f511b3 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -227,6 +227,13 @@ under the License.
test
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-parquet</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java
index 74d2e8e48f1b..9025dbe87ad0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergPathFactory.java
@@ -68,6 +68,10 @@ public Path toMetadataPath(long snapshotId) {
return new Path(metadataDirectory(), String.format("v%d.metadata.json", snapshotId));
}
+ public Path toMetadataPath(String metadataName) {
+ return new Path(metadataDirectory(), metadataName);
+ }
+
public Stream<Path> getAllMetadataPathBefore(FileIO fileIO, long snapshotId)
throws IOException {
return FileUtils.listVersionedFileStatus(fileIO, metadataDirectory, "v")
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
index d93456c3fe20..b9d2c271b509 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
@@ -54,4 +54,32 @@ public IcebergManifestEntry fromRow(InternalRow row) {
row.getLong(3),
fileSerializer.fromRow(row.getRow(4, fileSerializer.numFields())));
}
+
+ public IcebergManifestEntry fromRow(InternalRow row, IcebergManifestFileMeta meta) {
+ IcebergManifestEntry.Status status = IcebergManifestEntry.Status.fromId(row.getInt(0));
+ long snapshotId = row.isNullAt(1) ? meta.addedSnapshotId() : row.getLong(1);
+ long sequenceNumber = getOrInherit(row, meta, 2, status);
+ long fileSequenceNumber = getOrInherit(row, meta, 3, status);
+
+ return new IcebergManifestEntry(
+ status,
+ snapshotId,
+ sequenceNumber,
+ fileSequenceNumber,
+ fileSerializer.fromRow(row.getRow(4, fileSerializer.numFields())));
+ }
+
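+ // Per the Iceberg spec, an entry's data/file sequence number may be omitted (null) and is then
+ // inherited from the manifest metadata: either the manifest predates sequence numbers
+ // (v1 format, sequence number 0) or the entry was ADDED by the same snapshot as the manifest.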
+ private long getOrInherit(
+ InternalRow row,
+ IcebergManifestFileMeta meta,
+ int pos,
+ IcebergManifestEntry.Status status) {
+ long sequenceNumber = meta.sequenceNumber();
+ if (row.isNullAt(pos)
+ && (sequenceNumber == 0 || status == IcebergManifestEntry.Status.ADDED)) {
+ return sequenceNumber;
+ } else {
+ return row.getLong(pos);
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index 5955da6220f8..4553a1c850a0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -19,6 +19,7 @@
package org.apache.paimon.iceberg.manifest;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriterFactory;
@@ -38,9 +39,14 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.FileUtils;
+import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ObjectsFile;
import org.apache.paimon.utils.PathFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -109,6 +115,50 @@ public static IcebergManifestFile create(FileStoreTable table, IcebergPathFactor
table.coreOptions().manifestTargetSize());
}
+ public List<IcebergManifestEntry> read(IcebergManifestFileMeta meta) {
+ return read(meta, null);
+ }
+
+ public List<IcebergManifestEntry> read(IcebergManifestFileMeta meta, @Nullable Long fileSize) {
+ String fileName = new Path(meta.manifestPath()).getName();
+ try {
+ Path path = pathFactory.toPath(fileName);
+
+ return readFromIterator(
+ meta,
+ createIterator(path, fileSize),
+ (IcebergManifestEntrySerializer) serializer,
+ Filter.alwaysTrue());
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read " + fileName, e);
+ }
+ }
+
+ private CloseableIterator<InternalRow> createIterator(Path file, @Nullable Long fileSize)
+ throws IOException {
+ return FileUtils.createFormatReader(fileIO, readerFactory, file, fileSize)
+ .toCloseableIterator();
+ }
+
+ private static List<IcebergManifestEntry> readFromIterator(
+ IcebergManifestFileMeta meta,
+ CloseableIterator<InternalRow> inputIterator,
+ IcebergManifestEntrySerializer serializer,
+ Filter<InternalRow> readFilter) {
+ try (CloseableIterator<InternalRow> iterator = inputIterator) {
+ List<IcebergManifestEntry> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next();
+ if (readFilter.test(row)) {
+ result.add(serializer.fromRow(row, meta));
+ }
+ }
+ return result;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public List<IcebergManifestFileMeta> rollingWrite(
Iterator<IcebergManifestEntry> entries, long sequenceNumber) {
RollingFileWriter<IcebergManifestEntry, IcebergManifestFileMeta> writer =
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
index 4ecc77a13581..a310e64f6474 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
@@ -20,13 +20,22 @@
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -64,7 +73,7 @@ public class IcebergDataField {
@JsonProperty(FIELD_TYPE)
private final Object type;
- @JsonIgnore private final DataType dataType;
+ @JsonIgnore private DataType dataType;
@JsonProperty(FIELD_DOC)
private final String doc;
@@ -126,6 +135,10 @@ public String doc() {
@JsonIgnore
public DataType dataType() {
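+ // dataType is not serialized to JSON, so it is null after this object has been deserialized
+ // from iceberg metadata and must be reconstructed from the type string.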
+ if (dataType != null) {
+ return dataType;
+ }
+ dataType = getDataTypeFromType();
return Preconditions.checkNotNull(dataType);
}
@@ -190,6 +203,70 @@ private static Object toTypeObject(DataType dataType, int fieldId, int depth) {
}
}
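+ // Parses the iceberg primitive type string stored in the JSON "type" field (e.g. "long",
+ // "decimal(10, 2)", "fixed[16]") back into a Paimon DataType. Nested types are not handled
+ // here and result in an UnsupportedOperationException.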
+ private DataType getDataTypeFromType() {
+ String simpleType = type.toString();
+ String delimiter = "(";
+ if (simpleType.contains("[")) {
+ delimiter = "[";
+ }
+ String typePrefix =
+ !simpleType.contains(delimiter)
+ ? simpleType
+ : simpleType.substring(0, simpleType.indexOf(delimiter));
+ switch (typePrefix) {
+ case "boolean":
+ return new BooleanType(!required);
+ case "int":
+ return new IntType(!required);
+ case "long":
+ return new BigIntType(!required);
+ case "float":
+ return new FloatType(!required);
+ case "double":
+ return new DoubleType(!required);
+ case "date":
+ return new DateType(!required);
+ case "string":
+ return new VarCharType(!required, VarCharType.MAX_LENGTH);
+ case "binary":
+ return new VarBinaryType(!required, VarBinaryType.MAX_LENGTH);
+ case "fixed":
+ int fixedLength =
+ Integer.parseInt(
+ simpleType.substring(
+ simpleType.indexOf("[") + 1, simpleType.indexOf("]")));
+ return new BinaryType(!required, fixedLength);
+ case "uuid":
+ // https://iceberg.apache.org/spec/?h=vector#primitive-types
+ // uuid should use 16-byte fixed
+ return new BinaryType(!required, 16);
+ case "decimal":
+ int precision =
+ Integer.parseInt(
+ simpleType.substring(
+ simpleType.indexOf("(") + 1, simpleType.indexOf(",")));
+ int scale =
+ Integer.parseInt(
+ simpleType.substring(
+ simpleType.indexOf(",") + 2, simpleType.indexOf(")")));
+ return new DecimalType(!required, precision, scale);
+ case "timestamp":
+ return new TimestampType(!required, 6);
+ case "timestamptz":
+ return new LocalZonedTimestampType(!required, 6);
+ case "timestamp_ns": // iceberg v3 format
+ return new TimestampType(!required, 9);
+ case "timestamptz_ns": // iceberg v3 format
+ return new LocalZonedTimestampType(!required, 9);
+ default:
+ throw new UnsupportedOperationException("Unsupported data type: " + type);
+ }
+ }
+
+ public DataField toDatafield() {
+ return new DataField(id, name, dataType(), doc);
+ }
+
@Override
public int hashCode() {
return Objects.hash(id, name, required, type, doc);
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
index 86fb4a5df75a..fbaf8060022f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
@@ -106,7 +106,7 @@ public class IcebergMetadata {
private final List<IcebergSnapshot> snapshots;
@JsonProperty(FIELD_CURRENT_SNAPSHOT_ID)
- private final int currentSnapshotId;
+ private final long currentSnapshotId;
@JsonProperty(FIELD_PROPERTIES)
@Nullable
@@ -122,7 +122,7 @@ public IcebergMetadata(
List<IcebergPartitionSpec> partitionSpecs,
int lastPartitionId,
List<IcebergSnapshot> snapshots,
- int currentSnapshotId) {
+ long currentSnapshotId) {
this(
CURRENT_FORMAT_VERSION,
tableUuid,
@@ -158,7 +158,7 @@ public IcebergMetadata(
@JsonProperty(FIELD_SORT_ORDERS) List<IcebergSortOrder> sortOrders,
@JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID) int defaultSortOrderId,
@JsonProperty(FIELD_SNAPSHOTS) List<IcebergSnapshot> snapshots,
- @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) int currentSnapshotId,
+ @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) long currentSnapshotId,
@JsonProperty(FIELD_PROPERTIES) @Nullable Map properties) {
this.formatVersion = formatVersion;
this.tableUuid = tableUuid;
public List<IcebergSnapshot> snapshots() {
}
@JsonGetter(FIELD_CURRENT_SNAPSHOT_ID)
- public int currentSnapshotId() {
+ public long currentSnapshotId() {
return currentSnapshotId;
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java
new file mode 100644
index 000000000000..a6c5fb027ba2
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadata.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.IcebergPathFactory;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/** Gets the latest Iceberg snapshot metadata for a table stored in a hadoop catalog. */
+public class IcebergMigrateHadoopMetadata implements IcebergMigrateMetadata {
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrateHadoopMetadata.class);
+
+ private static final String VERSION_HINT_FILENAME = "version-hint.text";
+ private static final String ICEBERG_WAREHOUSE = "iceberg_warehouse";
+
+ private final FileIO fileIO;
+ private final Identifier icebergIdentifier;
+ private final Options icebergOptions;
+
+ private Path icebergLatestMetaVersionPath;
+ private IcebergPathFactory icebergMetaPathFactory;
+
+ public IcebergMigrateHadoopMetadata(
+ Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
+ this.fileIO = fileIO;
+ this.icebergIdentifier = icebergIdentifier;
+ this.icebergOptions = icebergOptions;
+ }
+
+ @Override
+ public IcebergMetadata icebergMetadata() {
+ Preconditions.checkArgument(
+ icebergOptions.get(ICEBERG_WAREHOUSE) != null,
+ "'iceberg_warehouse' is null. "
+ + "In hadoop-catalog, you should explicitly set this argument for finding iceberg metadata.");
+ this.icebergMetaPathFactory =
+ new IcebergPathFactory(
+ new Path(
+ icebergOptions.get(ICEBERG_WAREHOUSE),
+ new Path(
+ String.format(
+ "%s/%s/metadata",
+ icebergIdentifier.getDatabaseName(),
+ icebergIdentifier.getTableName()))));
+ long icebergLatestMetaVersion = getIcebergLatestMetaVersion();
+
+ this.icebergLatestMetaVersionPath =
+ icebergMetaPathFactory.toMetadataPath(icebergLatestMetaVersion);
+ LOG.info(
+ "iceberg latest snapshot metadata file location: {}", icebergLatestMetaVersionPath);
+
+ return IcebergMetadata.fromPath(fileIO, icebergLatestMetaVersionPath);
+ }
+
+ @Override
+ public String icebergLatestMetadataLocation() {
+ return icebergLatestMetaVersionPath.toString();
+ }
+
+ @Override
+ public void deleteOriginTable() {
+ Path tablePath = icebergMetaPathFactory.metadataDirectory().getParent();
+ LOG.info("Iceberg table path to be deleted:{}", tablePath);
+ try {
+ if (fileIO.isDir(tablePath)) {
+ fileIO.deleteDirectoryQuietly(tablePath);
+ }
+ } catch (IOException e) {
+ LOG.warn("exception occurred when deleting origin table.", e);
+ }
+ }
+
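+ // A hadoop catalog tracks the latest metadata version in metadata/version-hint.text; the
+ // corresponding metadata file is v<version>.metadata.json.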
+ private long getIcebergLatestMetaVersion() {
+ Path versionHintPath =
+ new Path(icebergMetaPathFactory.metadataDirectory(), VERSION_HINT_FILENAME);
+ try {
+ return Integer.parseInt(fileIO.readFileUtf8(versionHintPath));
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "read iceberg version-hint.text failed. Iceberg metadata path: "
+ + icebergMetaPathFactory.metadataDirectory(),
+ e);
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java
new file mode 100644
index 000000000000..666630101445
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHadoopMetadataFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.options.Options;
+
+/** Factory to create {@link IcebergMigrateHadoopMetadata}. */
+public class IcebergMigrateHadoopMetadataFactory implements IcebergMigrateMetadataFactory {
+
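+ // The identifier is the iceberg storage type plus the "_migrate" suffix, which is what
+ // IcebergMigrator uses to discover this factory.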
+ @Override
+ public String identifier() {
+ return IcebergOptions.StorageType.HADOOP_CATALOG.toString() + "_migrate";
+ }
+
+ @Override
+ public IcebergMigrateHadoopMetadata create(
+ Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
+ return new IcebergMigrateHadoopMetadata(icebergIdentifier, fileIO, icebergOptions);
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadata.java
new file mode 100644
index 000000000000..58648c537f90
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadata.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+
+/**
+ * Gets iceberg metadata for migration. Each kind of iceberg catalog should have its own
+ * implementation.
+ */
+public interface IcebergMigrateMetadata {
+
+ IcebergMetadata icebergMetadata();
+
+ String icebergLatestMetadataLocation();
+
+ void deleteOriginTable();
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java
new file mode 100644
index 000000000000..f727088f5d11
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateMetadataFactory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.factories.Factory;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.options.Options;
+
+/** Factory to create {@link IcebergMigrateMetadata}. */
+public interface IcebergMigrateMetadataFactory extends Factory {
+
+ IcebergMigrateMetadata create(
+ Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions);
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
new file mode 100644
index 000000000000..44162dea7fc3
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.factories.FactoryException;
+import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.iceberg.IcebergPathFactory;
+import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta;
+import org.apache.paimon.iceberg.manifest.IcebergManifestEntry;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta;
+import org.apache.paimon.iceberg.manifest.IcebergManifestList;
+import org.apache.paimon.iceberg.metadata.IcebergDataField;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
+import org.apache.paimon.iceberg.metadata.IcebergSchema;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
+
+/** Migrates an Iceberg table to a Paimon table. */
+public class IcebergMigrator implements Migrator {
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrator.class);
+
+ private final ThreadPoolExecutor executor;
+
+ private final Catalog paimonCatalog;
+ private final FileIO paimonFileIO;
+ private final String paimonDatabaseName;
+ private final String paimonTableName;
+
+ private final String icebergDatabaseName;
+ private final String icebergTableName;
+ private final Options icebergOptions;
+
+ private final IcebergMigrateMetadata icebergMigrateMetadata;
+ // metadata path factory for iceberg metadata
+ private final IcebergPathFactory icebergMetaPathFactory;
+ // latest metadata file path
+ private final String icebergLatestMetadataLocation;
+ // metadata for newest iceberg snapshot
+ private final IcebergMetadata icebergMetadata;
+
+ private Boolean deleteOriginTable = true;
+
+ public IcebergMigrator(
+ Catalog paimonCatalog,
+ String paimonDatabaseName,
+ String paimonTableName,
+ String icebergDatabaseName,
+ String icebergTableName,
+ Options icebergOptions,
+ Integer parallelism) {
+ this.paimonCatalog = paimonCatalog;
+ this.paimonFileIO = paimonCatalog.fileIO();
+ this.paimonDatabaseName = paimonDatabaseName;
+ this.paimonTableName = paimonTableName;
+
+ this.icebergDatabaseName = icebergDatabaseName;
+ this.icebergTableName = icebergTableName;
+ this.icebergOptions = icebergOptions;
+
+ Preconditions.checkArgument(
+ icebergOptions.containsKey(IcebergOptions.METADATA_ICEBERG_STORAGE.key()),
+ "'metadata.iceberg.storage' is required, please make sure it has been set.");
+
+ IcebergMigrateMetadataFactory icebergMigrateMetadataFactory;
+ try {
+ icebergMigrateMetadataFactory =
+ FactoryUtil.discoverFactory(
+ IcebergMigrator.class.getClassLoader(),
+ IcebergMigrateMetadataFactory.class,
+ icebergOptions.get(IcebergOptions.METADATA_ICEBERG_STORAGE).toString()
+ + "_migrate");
+ } catch (FactoryException e) {
+ throw new RuntimeException("create IcebergMigrateMetadataFactory failed.", e);
+ }
+
+ icebergMigrateMetadata =
+ icebergMigrateMetadataFactory.create(
+ Identifier.create(icebergDatabaseName, icebergTableName),
+ paimonFileIO,
+ icebergOptions);
+
+ this.icebergMetadata = icebergMigrateMetadata.icebergMetadata();
+ this.icebergLatestMetadataLocation = icebergMigrateMetadata.icebergLatestMetadataLocation();
+ this.icebergMetaPathFactory =
+ new IcebergPathFactory(new Path(icebergLatestMetadataLocation).getParent());
+
+ this.executor = createCachedThreadPool(parallelism, "ICEBERG_MIGRATOR");
+ }
+
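+ // Overall flow: read the latest iceberg snapshot, create the paimon table from the iceberg
+ // schema, collect all live data files from the iceberg manifests, move them into paimon
+ // partition/bucket-0 directories, commit them to paimon, and optionally delete the origin
+ // iceberg table.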
+ @Override
+ public void executeMigrate() throws Exception {
+ Schema paimonSchema = icebergSchemaToPaimonSchema(icebergMetadata);
+ Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableName);
+
+ paimonCatalog.createDatabase(paimonDatabaseName, true);
+ paimonCatalog.createTable(paimonIdentifier, paimonSchema, false);
+
+ try {
+ FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+
+ IcebergManifestFile manifestFile =
+ IcebergManifestFile.create(paimonTable, icebergMetaPathFactory);
+ IcebergManifestList manifestList =
+ IcebergManifestList.create(paimonTable, icebergMetaPathFactory);
+
+ List<IcebergManifestFileMeta> icebergManifestFileMetas =
+ manifestList.read(icebergMetadata.currentSnapshot().manifestList());
+
+ // fail if any manifest file has 'DELETES' content
+ checkAndFilterManifestFiles(icebergManifestFileMetas);
+
+ // get all live iceberg entries
+ List<IcebergManifestEntry> icebergEntries =
+ icebergManifestFileMetas.stream()
+ .flatMap(fileMeta -> manifestFile.read(fileMeta).stream())
+ .filter(IcebergManifestEntry::isLive)
+ .collect(Collectors.toList());
+ if (icebergEntries.isEmpty()) {
+ LOG.info(
+ "No live manifest entry in iceberg table for snapshot {}, iceberg table meta path is {}.",
+ icebergMetadata.currentSnapshotId(),
+ icebergLatestMetadataLocation);
+ return;
+ }
+
+ List<IcebergDataFileMeta> icebergDataFileMetas =
+ icebergEntries.stream()
+ .map(IcebergManifestEntry::file)
+ .collect(Collectors.toList());
+
+ // Again, check that no delete files exist
+ checkAndFilterDataFiles(icebergDataFileMetas);
+
+ LOG.info(
+ "Begin to create Migrate Task, the number of iceberg data files is {}",
+ icebergDataFileMetas.size());
+
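+ // rollback maps each moved file's new path back to its origin path so that a failed
+ // migration can restore the iceberg data files.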
+ List<MigrateTask> tasks = new ArrayList<>();
+ Map<Path, Path> rollback = new ConcurrentHashMap<>();
+ if (paimonTable.partitionKeys().isEmpty()) {
+ tasks.add(importUnPartitionedTable(icebergDataFileMetas, paimonTable, rollback));
+ } else {
+ tasks.addAll(importPartitionedTable(icebergDataFileMetas, paimonTable, rollback));
+ }
+
+ List<Future<CommitMessage>> futures =
+ tasks.stream().map(executor::submit).collect(Collectors.toList());
+ List<CommitMessage> commitMessages = new ArrayList<>();
+ try {
+ for (Future<CommitMessage> future : futures) {
+ commitMessages.add(future.get());
+ }
+ } catch (Exception e) {
+ futures.forEach(f -> f.cancel(true));
+ for (Future<?> future : futures) {
+ // wait all task cancelled or finished
+ while (!future.isDone()) {
+ //noinspection BusyWait
+ Thread.sleep(100);
+ }
+ }
+ // roll back all renamed path
+ for (Map.Entry<Path, Path> entry : rollback.entrySet()) {
+ Path newPath = entry.getKey();
+ Path origin = entry.getValue();
+ if (paimonFileIO.exists(newPath)) {
+ paimonFileIO.rename(newPath, origin);
+ }
+ }
+
+ throw new RuntimeException("Migrating failed because exception happens", e);
+ }
+ try (BatchTableCommit commit = paimonTable.newBatchWriteBuilder().newCommit()) {
+ commit.commit(new ArrayList<>(commitMessages));
+ LOG.info("paimon commit success! Iceberg data files have been migrated to paimon.");
+ }
+ } catch (Exception e) {
+ paimonCatalog.dropTable(paimonIdentifier, true);
+ throw new RuntimeException("Migrating failed", e);
+ }
+
+ // if everything succeeded, drop the origin table according to the deleteOriginTable flag
+ if (deleteOriginTable) {
+ icebergMigrateMetadata.deleteOriginTable();
+ }
+ }
+
+ @Override
+ public void deleteOriginTable(boolean delete) throws Exception {
+ this.deleteOriginTable = delete;
+ }
+
+ @Override
+ public void renameTable(boolean ignoreIfNotExists) throws Exception {
+ Identifier targetTableId = Identifier.create(paimonDatabaseName, paimonTableName);
+ Identifier sourceTableId = Identifier.create(icebergDatabaseName, icebergTableName);
+ LOG.info("Last step: rename {} to {}.", targetTableId, sourceTableId);
+ paimonCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists);
+ }
+
+ public Schema icebergSchemaToPaimonSchema(IcebergMetadata icebergMetadata) {
+ // get iceberg current schema
+ IcebergSchema icebergSchema =
+ icebergMetadata.schemas().get(icebergMetadata.currentSchemaId());
+
+ // get iceberg current partition spec
+ int currentPartitionSpecId = icebergMetadata.defaultSpecId();
+ IcebergPartitionSpec currentIcebergPartitionSpec =
+ icebergMetadata.partitionSpecs().get(currentPartitionSpecId);
+
+ List<DataField> dataFields =
+ icebergSchema.fields().stream()
+ .map(IcebergDataField::toDatafield)
+ .collect(Collectors.toList());
+
+ List<String> partitionKeys =
+ currentIcebergPartitionSpec.fields().stream()
+ .map(IcebergPartitionField::name)
+ .collect(Collectors.toList());
+
+ return new Schema(
+ dataFields, partitionKeys, Collections.emptyList(), Collections.emptyMap(), null);
+ }
+
+ private void checkAndFilterManifestFiles(
+ List<IcebergManifestFileMeta> icebergManifestFileMetas) {
+
+ for (IcebergManifestFileMeta meta : icebergManifestFileMetas) {
+ Preconditions.checkArgument(
+ meta.content() != IcebergManifestFileMeta.Content.DELETES,
+ "IcebergMigrator don't support analyzing manifest file with 'DELETE' content.");
+ }
+ }
+
+ private void checkAndFilterDataFiles(List<IcebergDataFileMeta> icebergDataFileMetas) {
+
+ for (IcebergDataFileMeta meta : icebergDataFileMetas) {
+ Preconditions.checkArgument(
+ meta.content() == IcebergDataFileMeta.Content.DATA,
+ "IcebergMigrator don't support analyzing iceberg delete file.");
+ }
+ }
+
+ private static List<DataFileMeta> construct(
+ List<IcebergDataFileMeta> icebergDataFileMetas,
+ FileIO fileIO,
+ Table paimonTable,
+ Path newDir,
+ Map<Path, Path> rollback) {
+ return icebergDataFileMetas.stream()
+ .map(
+ icebergDataFileMeta ->
+ constructFileMeta(
+ icebergDataFileMeta, fileIO, paimonTable, newDir, rollback))
+ .collect(Collectors.toList());
+ }
+
+ private static DataFileMeta constructFileMeta(
+ IcebergDataFileMeta icebergDataFileMeta,
+ FileIO fileIO,
+ Table table,
+ Path dir,
+ Map<Path, Path> rollback) {
+ FileStatus status;
+ try {
+ status = fileIO.getFileStatus(new Path(icebergDataFileMeta.filePath()));
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "error when get file status. file path is " + icebergDataFileMeta.filePath(),
+ e);
+ }
+ String format = icebergDataFileMeta.fileFormat();
+ return FileMetaUtils.constructFileMeta(format, status, fileIO, table, dir, rollback);
+ }
+
+ private MigrateTask importUnPartitionedTable(
+ List<IcebergDataFileMeta> icebergDataFileMetas,
+ FileStoreTable paimonTable,
+ Map<Path, Path> rollback) {
+ BinaryRow partitionRow = BinaryRow.EMPTY_ROW;
+ Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
+
+ return new MigrateTask(
+ icebergDataFileMetas, paimonFileIO, paimonTable, partitionRow, newDir, rollback);
+ }
+
+ private List<MigrateTask> importPartitionedTable(
+ List<IcebergDataFileMeta> icebergDataFileMetas,
+ FileStoreTable paimonTable,
+ Map<Path, Path> rollback) {
+ Map<BinaryRow, List<IcebergDataFileMeta>> dataInPartition =
+ icebergDataFileMetas.stream()
+ .collect(Collectors.groupingBy(IcebergDataFileMeta::partition));
+ List<MigrateTask> migrateTasks = new ArrayList<>();
+ for (Map.Entry<BinaryRow, List<IcebergDataFileMeta>> entry : dataInPartition.entrySet()) {
+ BinaryRow partitionRow = entry.getKey();
+ Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
+ migrateTasks.add(
+ new MigrateTask(
+ entry.getValue(),
+ paimonFileIO,
+ paimonTable,
+ partitionRow,
+ newDir,
+ rollback));
+ }
+ return migrateTasks;
+ }
+
+ /** One import task for one partition. */
+ public static class MigrateTask implements Callable<CommitMessage> {
+
+ private final List<IcebergDataFileMeta> icebergDataFileMetas;
+ private final FileIO fileIO;
+ private final FileStoreTable paimonTable;
+ private final BinaryRow partitionRow;
+ private final Path newDir;
+ private final Map<Path, Path> rollback;
+
+ public MigrateTask(
+ List<IcebergDataFileMeta> icebergDataFileMetas,
+ FileIO fileIO,
+ FileStoreTable paimonTable,
+ BinaryRow partitionRow,
+ Path newDir,
+ Map<Path, Path> rollback) {
+ this.icebergDataFileMetas = icebergDataFileMetas;
+ this.fileIO = fileIO;
+ this.paimonTable = paimonTable;
+ this.partitionRow = partitionRow;
+ this.newDir = newDir;
+ this.rollback = rollback;
+ }
+
+ @Override
+ public CommitMessage call() throws Exception {
+ if (!fileIO.exists(newDir)) {
+ fileIO.mkdirs(newDir);
+ }
+ List<DataFileMeta> fileMetas =
+ construct(icebergDataFileMetas, fileIO, paimonTable, newDir, rollback);
+ return FileMetaUtils.commitFile(partitionRow, fileMetas);
+ }
+ }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 391c5f9bb615..c34afc76442e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -92,9 +92,7 @@ public static CommitMessage commitFile(BinaryRow partition, List<DataFileMeta> d
Collections.emptyList(), Collections.emptyList(), Collections.emptyList()));
}
- // -----------------------------private method---------------------------------------------
-
- private static DataFileMeta constructFileMeta(
+ public static DataFileMeta constructFileMeta(
String format,
FileStatus fileStatus,
FileIO fileIO,
@@ -131,6 +129,8 @@ private static DataFileMeta constructFileMeta(
}
}
+ // -----------------------------private method---------------------------------------------
+
private static Path renameFile(
FileIO fileIO, Path originPath, Path newDir, String format, Map<Path, Path> rollback)
throws IOException {
diff --git a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6416edd720f8..ff423bffd8df 100644
--- a/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-core/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -39,3 +39,4 @@ org.apache.paimon.mergetree.compact.aggregate.factory.FieldThetaSketchAggFactory
org.apache.paimon.rest.RESTCatalogFactory
org.apache.paimon.rest.auth.BearTokenCredentialsProviderFactory
org.apache.paimon.rest.auth.BearTokenFileCredentialsProviderFactory
+org.apache.paimon.iceberg.migrate.IcebergMigrateHadoopMetadataFactory
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java
new file mode 100644
index 000000000000..aadaca0c3854
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java
@@ -0,0 +1,619 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.DataFormatTestUtil;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
+
+/** Tests for {@link IcebergMigrator}. */
+public class IcebergMigrateTest {
+ @TempDir java.nio.file.Path iceTempDir;
+ @TempDir java.nio.file.Path paiTempDir;
+
+ Catalog paiCatalog;
+
+ String iceDatabase = "ice_db";
+ String iceTable = "ice_t";
+
+ String paiDatabase = "pai_db";
+ String paiTable = "pai_t";
+
+ Schema iceSchema =
+ new Schema(
+ Types.NestedField.required(1, "k", Types.IntegerType.get()),
+ Types.NestedField.required(2, "v", Types.IntegerType.get()),
+ Types.NestedField.required(3, "dt", Types.StringType.get()),
+ Types.NestedField.required(4, "hh", Types.StringType.get()));
+ Schema iceDeleteSchema =
+ new Schema(
+ Types.NestedField.required(1, "k", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "v", Types.IntegerType.get()));
+
+ PartitionSpec icePartitionSpec =
+ PartitionSpec.builderFor(iceSchema).identity("dt").identity("hh").build();
+
+ Map<String, String> icebergProperties = new HashMap<>();
+
+ @BeforeEach
+ public void beforeEach() throws Exception {
+ paiCatalog = createPaimonCatalog();
+ icebergProperties.put(IcebergOptions.METADATA_ICEBERG_STORAGE.key(), "hadoop-catalog");
+ icebergProperties.put("iceberg_warehouse", iceTempDir.toString());
+ }
+
+ @ParameterizedTest(name = "isPartitioned = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception {
+ Table icebergTable = createIcebergTable(isPartitioned);
+ String format = "parquet";
+ List<GenericRecord> records1 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "00"),
+ toIcebergRecord(2, 2, "20240101", "00"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records1);
+ }
+
+ List<GenericRecord> records2 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "01"),
+ toIcebergRecord(2, 2, "20240101", "01"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records2);
+ }
+
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ paiCatalog,
+ paiDatabase,
+ paiTable,
+ iceDatabase,
+ iceTable,
+ new Options(icebergProperties),
+ 1);
+ icebergMigrator.executeMigrate();
+ icebergMigrator.renameTable(false);
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) paiCatalog.getTable(Identifier.create(iceDatabase, iceTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(
+ paiResults.stream()
+ .map(row -> String.format("Record(%s)", row))
+ .collect(Collectors.toList()))
+ .hasSameElementsAs(
+ Stream.concat(records1.stream(), records2.stream())
+ .map(GenericRecord::toString)
+ .collect(Collectors.toList()));
+
+ // verify iceberg table has been deleted
+ assertThat(paimonTable.fileIO().exists(new Path(icebergTable.location()))).isFalse();
+ }
+
+ @ParameterizedTest(name = "isPartitioned = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testMigrateAddAndDelete(boolean isPartitioned) throws Exception {
+ Table icebergTable = createIcebergTable(isPartitioned);
+ String format = "parquet";
+ List<GenericRecord> records1 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "00"),
+ toIcebergRecord(2, 2, "20240101", "00"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records1);
+ }
+
+ List<GenericRecord> records2 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "01"),
+ toIcebergRecord(2, 2, "20240101", "01"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records2);
+ }
+
+ // the file written with records1 will be deleted, which generates a DELETED manifest
+ // entry, not a delete file
+ icebergTable.newDelete().deleteFromRowFilter(Expressions.equal("hh", "00")).commit();
+
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ paiCatalog,
+ paiDatabase,
+ paiTable,
+ iceDatabase,
+ iceTable,
+ new Options(icebergProperties),
+ 1);
+ icebergMigrator.executeMigrate();
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(
+ paiResults.stream()
+ .map(row -> String.format("Record(%s)", row))
+ .collect(Collectors.toList()))
+ .hasSameElementsAs(
+ records2.stream()
+ .map(GenericRecord::toString)
+ .collect(Collectors.toList()));
+ }
+
+ @ParameterizedTest(name = "isPartitioned = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testMigrateWithDeleteFile(boolean isPartitioned) throws Exception {
+ // creating delete files is only supported with the parquet format
+ Table icebergTable = createIcebergTable(isPartitioned);
+ String format = "parquet";
+ List<GenericRecord> records1 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "00"),
+ toIcebergRecord(2, 2, "20240101", "00"))
+ .collect(Collectors.toList());
+ List<GenericRecord> deleteRecords1 =
+ Collections.singletonList(toIcebergRecord(iceDeleteSchema, 1, 1));
+
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+ writeEqualityDeleteFile(icebergTable, deleteRecords1, "20240101", "00");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records1);
+ writeEqualityDeleteFile(icebergTable, deleteRecords1);
+ }
+
+ List<GenericRecord> records2 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "01"),
+ toIcebergRecord(2, 2, "20240101", "01"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records2);
+ }
+
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ paiCatalog,
+ paiDatabase,
+ paiTable,
+ iceDatabase,
+ iceTable,
+ new Options(icebergProperties),
+ 1);
+
+ assertThatThrownBy(icebergMigrator::executeMigrate)
+ .rootCause()
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage(
+ "IcebergMigrator don't support analyzing manifest file with 'DELETE' content.");
+ }
+
+ @ParameterizedTest(name = "isPartitioned = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testMigrateWithRandomIcebergData(boolean isPartitioned) throws Exception {
+ Table icebergTable = createIcebergTable(isPartitioned);
+ String format = "parquet";
+
+ int numRounds = 50;
+ int numRecords = 20;
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ List<GenericRecord> expectRecords = new ArrayList<>();
+ for (int i = 0; i < numRounds; i++) {
+ List<GenericRecord> records = new ArrayList<>();
+ String dt = Integer.toString(random.nextInt(20240101, 20240104));
+ String hh = Integer.toString(random.nextInt(3));
+ for (int j = 0; j < numRecords; j++) {
+ records.add(toIcebergRecord(random.nextInt(100), random.nextInt(100), dt, hh));
+ }
+ expectRecords.addAll(records);
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records, dt, hh);
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records);
+ }
+ }
+
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ paiCatalog,
+ paiDatabase,
+ paiTable,
+ iceDatabase,
+ iceTable,
+ new Options(icebergProperties),
+ 1);
+ icebergMigrator.executeMigrate();
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(
+ paiResults.stream()
+ .map(row -> String.format("Record(%s)", row))
+ .collect(Collectors.toList()))
+ .hasSameElementsAs(
+ expectRecords.stream()
+ .map(GenericRecord::toString)
+ .collect(Collectors.toList()));
+ }
+
+ @ParameterizedTest(name = "isPartitioned = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testMigrateWithSchemaEvolution(boolean isPartitioned) throws Exception {
+ Table icebergTable = createIcebergTable(isPartitioned);
+ String format = "parquet";
+
+ // write base data
+ List<GenericRecord> records1 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "00"),
+ toIcebergRecord(2, 2, "20240101", "00"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records1);
+ }
+
+ List<GenericRecord> records2 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "01"),
+ toIcebergRecord(2, 2, "20240101", "01"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records2);
+ }
+
+ // TODO: currently only schema evolution that deletes columns is supported
+ testDeleteColumn(icebergTable, format, isPartitioned);
+ }
+
+ private void testDeleteColumn(Table icebergTable, String format, boolean isPartitioned)
+ throws Exception {
+ icebergTable.updateSchema().deleteColumn("v").commit();
+ Schema newIceSchema = icebergTable.schema();
+ List<GenericRecord> addedRecords =
+ Stream.of(
+ toIcebergRecord(newIceSchema, 3, "20240101", "00"),
+ toIcebergRecord(newIceSchema, 4, "20240101", "00"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, addedRecords);
+ }
+
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ paiCatalog,
+ paiDatabase,
+ paiTable,
+ iceDatabase,
+ iceTable,
+ new Options(icebergProperties),
+ 1);
+ icebergMigrator.executeMigrate();
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(
+ paiResults.stream()
+ .map(row -> String.format("Record(%s)", row))
+ .collect(Collectors.toList()))
+ .hasSameElementsAs(
+ Stream.of(
+ "Record(1, 20240101, 00)",
+ "Record(2, 20240101, 00)",
+ "Record(1, 20240101, 01)",
+ "Record(2, 20240101, 01)",
+ "Record(3, 20240101, 00)",
+ "Record(4, 20240101, 00)")
+ .collect(Collectors.toList()));
+ }
+
+ @Test
+ public void testAllDataTypes() throws Exception {
+ Schema iceAllTypesSchema =
+ new Schema(
+ Types.NestedField.required(1, "c1", Types.BooleanType.get()),
+ Types.NestedField.required(2, "c2", Types.IntegerType.get()),
+ Types.NestedField.required(3, "c3", Types.LongType.get()),
+ Types.NestedField.required(4, "c4", Types.FloatType.get()),
+ Types.NestedField.required(5, "c5", Types.DoubleType.get()),
+ Types.NestedField.required(6, "c6", Types.DateType.get()),
+ Types.NestedField.required(7, "c7", Types.StringType.get()),
+ Types.NestedField.required(8, "c9", Types.BinaryType.get()),
+ Types.NestedField.required(9, "c11", Types.DecimalType.of(10, 2)),
+ Types.NestedField.required(10, "c13", Types.TimestampType.withoutZone()),
+ Types.NestedField.required(11, "c14", Types.TimestampType.withZone()));
+ Table icebergTable = createIcebergTable(false, iceAllTypesSchema);
+ String format = "parquet";
+ GenericRecord record =
+ toIcebergRecord(
+ iceAllTypesSchema,
+ true,
+ 1,
+ 1L,
+ 1.0F,
+ 1.0D,
+ LocalDate.of(2023, 10, 18),
+ "test",
+ ByteBuffer.wrap(new byte[] {1, 2, 3}),
+ new BigDecimal("122.50"),
+ LocalDateTime.now(),
+ OffsetDateTime.now());
+
+ writeRecordsToIceberg(icebergTable, format, Collections.singletonList(record));
+
+ CatalogContext context = CatalogContext.create(new Path(paiTempDir.toString()));
+ context.options().set(CACHE_ENABLED, false);
+ Catalog catalog = CatalogFactory.createCatalog(context);
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ paiCatalog,
+ paiDatabase,
+ paiTable,
+ iceDatabase,
+ iceTable,
+ new Options(icebergProperties),
+ 1);
+ icebergMigrator.executeMigrate();
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) catalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(paiResults.size()).isEqualTo(1);
+ }
+
+ private org.apache.iceberg.catalog.Catalog createIcebergCatalog() {
+ Map<String, String> icebergCatalogOptions = new HashMap<>();
+ icebergCatalogOptions.put("type", "hadoop");
+ icebergCatalogOptions.put("warehouse", iceTempDir.toString());
+
+ return CatalogUtil.buildIcebergCatalog(
+ "iceberg_catalog", icebergCatalogOptions, new Configuration());
+ }
+
+ private Catalog createPaimonCatalog() {
+ CatalogContext context = CatalogContext.create(new Path(paiTempDir.toString()));
+ context.options().set(CACHE_ENABLED, false);
+ return CatalogFactory.createCatalog(context);
+ }
+
+ private Table createIcebergTable(boolean isPartitioned) {
+ return createIcebergTable(isPartitioned, iceSchema);
+ }
+
+ private Table createIcebergTable(boolean isPartitioned, Schema icebergSchema) {
+
+ org.apache.iceberg.catalog.Catalog icebergCatalog = createIcebergCatalog();
+ TableIdentifier icebergIdentifier = TableIdentifier.of(iceDatabase, iceTable);
+
+ if (!isPartitioned) {
+ return icebergCatalog
+ .buildTable(icebergIdentifier, icebergSchema)
+ .withPartitionSpec(PartitionSpec.unpartitioned())
+ .create();
+ } else {
+ return icebergCatalog
+ .buildTable(icebergIdentifier, icebergSchema)
+ .withPartitionSpec(icePartitionSpec)
+ .create();
+ }
+ }
+
+ private GenericRecord toIcebergRecord(Schema icebergSchema, Object... values) {
+ GenericRecord record = GenericRecord.create(icebergSchema);
+ for (int i = 0; i < values.length; i++) {
+ record.set(i, values[i]);
+ }
+ return record;
+ }
+
+ private GenericRecord toIcebergRecord(Object... values) {
+ return toIcebergRecord(iceSchema, values);
+ }
+
+ private DataWriter<GenericRecord> createIcebergDataWriter(
+ Table icebergTable, String format, OutputFile file, String... partitionValues)
+ throws IOException {
+ Schema schema = icebergTable.schema();
+ PartitionSpec partitionSpec = icebergTable.spec();
+ PartitionKey partitionKey =
+ partitionValues.length == 0
+ ? null
+ : partitionKey(
+ icePartitionSpec,
+ icebergTable,
+ partitionValues[0],
+ partitionValues[1]);
+ // currently only the "parquet" format is supported
+ switch (format) {
+ case "parquet":
+ return Parquet.writeData(file)
+ .schema(schema)
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .overwrite()
+ .withSpec(partitionSpec)
+ .withPartition(partitionKey)
+ .build();
+ default:
+ throw new IllegalArgumentException("Unsupported format: " + format);
+ }
+ }
+
+ private void writeRecordsToIceberg(
+ Table icebergTable,
+ String format,
+ List<GenericRecord> records,
+ String... partitionValues)
+ throws IOException {
+ String filepath = icebergTable.location() + "/" + UUID.randomUUID();
+ OutputFile file = icebergTable.io().newOutputFile(filepath);
+
+ DataWriter<GenericRecord> dataWriter =
+ createIcebergDataWriter(icebergTable, format, file, partitionValues);
+ try {
+ for (GenericRecord r : records) {
+ dataWriter.write(r);
+ }
+ } finally {
+ dataWriter.close();
+ }
+ DataFile dataFile = dataWriter.toDataFile();
+ icebergTable.newAppend().appendFile(dataFile).commit();
+ }
+
+ private void writeEqualityDeleteFile(
+ Table icebergTable, List<GenericRecord> deleteRecords, String... partitionValues)
+ throws IOException {
+ String filepath = icebergTable.location() + "/" + UUID.randomUUID();
+ OutputFile file = icebergTable.io().newOutputFile(filepath);
+
+ EqualityDeleteWriter<GenericRecord> deleteWriter =
+ Parquet.writeDeletes(file)
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .overwrite()
+ .rowSchema(iceDeleteSchema)
+ .withSpec(PartitionSpec.unpartitioned())
+ .equalityFieldIds(1)
+ .buildEqualityWriter();
+ if (partitionValues.length != 0) {
+ deleteWriter =
+ Parquet.writeDeletes(file)
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .overwrite()
+ .rowSchema(iceDeleteSchema)
+ .withSpec(icePartitionSpec)
+ .withPartition(
+ partitionKey(
+ icePartitionSpec,
+ icebergTable,
+ partitionValues[0],
+ partitionValues[1]))
+ .equalityFieldIds(1)
+ .buildEqualityWriter();
+ }
+
+ try (EqualityDeleteWriter<GenericRecord> closableWriter = deleteWriter) {
+ closableWriter.write(deleteRecords);
+ }
+
+ DeleteFile deleteFile = deleteWriter.toDeleteFile();
+ icebergTable.newRowDelta().addDeletes(deleteFile).commit();
+ }
+
+ private PartitionKey partitionKey(
+ PartitionSpec spec, Table icebergTable, String... partitionValues) {
+ Record record =
+ GenericRecord.create(icebergTable.schema())
+ .copy(ImmutableMap.of("dt", partitionValues[0], "hh", partitionValues[1]));
+
+ PartitionKey partitionKey = new PartitionKey(spec, icebergTable.schema());
+ partitionKey.partition(record);
+
+ return partitionKey;
+ }
+
+ private List<String> getPaimonResult(FileStoreTable paimonTable) throws Exception {
+ List<Split> splits = paimonTable.newReadBuilder().newScan().plan().splits();
+ TableRead read = paimonTable.newReadBuilder().newRead();
+ try (RecordReader<InternalRow> recordReader = read.createReader(splits)) {
+ List<String> result = new ArrayList<>();
+ recordReader.forEachRemaining(
+ row ->
+ result.add(
+ DataFormatTestUtil.toStringNoRowKind(
+ row, paimonTable.rowType())));
+ return result;
+ }
+ }
+}