diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml
index e137d57a6db1..697079d5af21 100644
--- a/paimon-core/pom.xml
+++ b/paimon-core/pom.xml
@@ -203,14 +203,12 @@ under the License.
 <dependency>
 <groupId>org.apache.iceberg</groupId>
 <artifactId>iceberg-core</artifactId>
 <version>${iceberg.version}</version>
- <scope>test</scope>
 </dependency>
 <dependency>
 <groupId>org.apache.iceberg</groupId>
 <artifactId>iceberg-data</artifactId>
 <version>${iceberg.version}</version>
- <scope>test</scope>
 </dependency>
@@ -227,6 +225,20 @@ under the License.
 <scope>test</scope>
 </dependency>
+
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-parquet</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-orc</artifactId>
+ <version>${iceberg.version}</version>
+ <scope>test</scope>
+ </dependency>
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMigrator.java
new file mode 100644
index 000000000000..54b74c3edcc4
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/IcebergMigrator.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta;
+import org.apache.paimon.iceberg.manifest.IcebergManifestEntry;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
+import org.apache.paimon.iceberg.manifest.IcebergManifestFileMeta;
+import org.apache.paimon.iceberg.manifest.IcebergManifestList;
+import org.apache.paimon.iceberg.metadata.IcebergDataField;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionField;
+import org.apache.paimon.iceberg.metadata.IcebergPartitionSpec;
+import org.apache.paimon.iceberg.metadata.IcebergSchema;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.migrate.FileMetaUtils;
+import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.DataField;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
+
+/** Migrate an Iceberg table to a Paimon table. */
+public class IcebergMigrator implements Migrator {
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrator.class);
+ private static final String VERSION_HINT_FILENAME = "version-hint.text";
+
+ private final ThreadPoolExecutor executor;
+
+ private final Catalog paimonCatalog;
+ private final FileIO paimonFileIO;
+ private final String paimonDatabaseName;
+ private final String paimonTableName;
+
+ private final int icebergNewestSnapshotId;
+ // Path factory for iceberg metadata
+ private final IcebergPathFactory icebergPathFactory;
+ // metadata for newest iceberg snapshot
+ private final IcebergMetadata icebergMetadata;
+
+ private boolean ignoreDelete = false;
+
+ public IcebergMigrator(
+ Catalog paimonCatalog,
+ Path icebergMetaPath,
+ String paimonDatabaseName,
+ String paimonTableName,
+ boolean ignoreDelete,
+ Integer parallelism) {
+ this.paimonCatalog = paimonCatalog;
+ this.paimonFileIO = paimonCatalog.fileIO();
+ this.paimonDatabaseName = paimonDatabaseName;
+ this.paimonTableName = paimonTableName;
+
+ this.icebergPathFactory = new IcebergPathFactory(icebergMetaPath);
+ this.icebergNewestSnapshotId = getIcebergNewestSnapshotId();
+ this.icebergMetadata =
+ IcebergMetadata.fromPath(
+ paimonFileIO, icebergPathFactory.toMetadataPath(icebergNewestSnapshotId));
+ this.ignoreDelete = ignoreDelete;
+
+ this.executor = createCachedThreadPool(parallelism, "ICEBERG_MIGRATOR");
+ }
+
+ @Override
+ public void executeMigrate() throws Exception {
+ Schema paimonSchema = icebergSchemaToPaimonSchema(icebergMetadata);
+ Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableName);
+
+ paimonCatalog.createDatabase(paimonDatabaseName, false);
+ paimonCatalog.createTable(paimonIdentifier, paimonSchema, false);
+
+ try {
+ FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
+
+ IcebergManifestFile manifestFile =
+ IcebergManifestFile.create(paimonTable, icebergPathFactory);
+ IcebergManifestList manifestList =
+ IcebergManifestList.create(paimonTable, icebergPathFactory);
+
+ List<IcebergManifestFileMeta> icebergManifestFileMetas =
+ manifestList.read(icebergMetadata.currentSnapshot().manifestList());
+
+ // check manifest file with 'DELETE' kind
+ icebergManifestFileMetas = checkAndFilterManifestFiles(icebergManifestFileMetas);
+
+ // get all live iceberg entries
+ List<IcebergManifestEntry> icebergEntries =
+ icebergManifestFileMetas.stream()
+ .flatMap(fileMeta -> manifestFile.read(fileMeta).stream())
+ .filter(IcebergManifestEntry::isLive)
+ .collect(Collectors.toList());
+ if (icebergEntries.isEmpty()) {
+ LOG.info(
+ "No live manifest entries in iceberg table for snapshot {}, iceberg table metadata path is {}.",
+ icebergNewestSnapshotId,
+ icebergPathFactory.toMetadataPath(icebergNewestSnapshotId));
+ return;
+ }
+
+ List<IcebergDataFileMeta> icebergDataFileMetas =
+ icebergEntries.stream()
+ .map(IcebergManifestEntry::file)
+ .collect(Collectors.toList());
+
+ // Again, check if the file is a Delete File
+ icebergDataFileMetas = checkAndFilterDataFiles(icebergDataFileMetas);
+
+ LOG.info(
+ "Begin to create migrate tasks; the number of iceberg data files is {}",
+ icebergDataFileMetas.size());
+
+ List<MigrateTask> tasks = new ArrayList<>();
+ Map<Path, Path> rollback = new ConcurrentHashMap<>();
+ if (paimonTable.partitionKeys().isEmpty()) {
+ tasks.add(importUnPartitionedTable(icebergDataFileMetas, paimonTable, rollback));
+ } else {
+ tasks.addAll(importPartitionedTable(icebergDataFileMetas, paimonTable, rollback));
+ }
+
+ List<Future<CommitMessage>> futures =
+ tasks.stream().map(executor::submit).collect(Collectors.toList());
+ List<CommitMessage> commitMessages = new ArrayList<>();
+ try {
+ for (Future<CommitMessage> future : futures) {
+ commitMessages.add(future.get());
+ }
+ } catch (Exception e) {
+ futures.forEach(f -> f.cancel(true));
+ for (Future<?> future : futures) {
+ // wait until all tasks are cancelled or finished
+ while (!future.isDone()) {
+ //noinspection BusyWait
+ Thread.sleep(100);
+ }
+ }
+ // roll back all renamed paths
+ for (Map.Entry<Path, Path> entry : rollback.entrySet()) {
+ Path newPath = entry.getKey();
+ Path origin = entry.getValue();
+ if (paimonFileIO.exists(newPath)) {
+ paimonFileIO.rename(newPath, origin);
+ }
+ }
+
+ throw new RuntimeException("Migrating failed because an exception occurred", e);
+ }
+ try (BatchTableCommit commit = paimonTable.newBatchWriteBuilder().newCommit()) {
+ commit.commit(new ArrayList<>(commitMessages));
+ LOG.info("Paimon commit succeeded! Iceberg data files have been migrated to Paimon.");
+ }
+ } catch (Exception e) {
+ paimonCatalog.dropTable(paimonIdentifier, true);
+ throw new RuntimeException("Migrating failed", e);
+ }
+ }
+
+ @Override
+ public void deleteOriginTable(boolean delete) throws Exception {}
+
+ @Override
+ public void renameTable(boolean ignoreIfNotExists) throws Exception {
+ LOG.info("Last step: rename.");
+ LOG.info("IcebergMigrator does not rename the table for now.");
+ }
+
+ public int getIcebergNewestSnapshotId() {
+ Path versionHintPath =
+ new Path(icebergPathFactory.metadataDirectory(), VERSION_HINT_FILENAME);
+ try {
+ return Integer.parseInt(paimonFileIO.readFileUtf8(versionHintPath));
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Failed to read iceberg version-hint.text. Iceberg metadata path: "
+ + icebergPathFactory.metadataDirectory(),
+ e);
+ }
+ }
+
+ public Schema icebergSchemaToPaimonSchema(IcebergMetadata icebergMetadata) {
+ // get iceberg current schema
+ IcebergSchema icebergSchema =
+ icebergMetadata.schemas().get(icebergMetadata.currentSchemaId());
+
+ // get iceberg current partition spec
+ int currentPartitionSpecId = icebergMetadata.defaultSpecId();
+ IcebergPartitionSpec currentIcebergPartitionSpec =
+ icebergMetadata.partitionSpecs().get(currentPartitionSpecId);
+
+ List<DataField> dataFields =
+ icebergSchema.fields().stream()
+ .map(IcebergDataField::toDatafield)
+ .collect(Collectors.toList());
+
+ List<String> partitionKeys =
+ currentIcebergPartitionSpec.fields().stream()
+ .map(IcebergPartitionField::name)
+ .collect(Collectors.toList());
+
+ return new Schema(
+ dataFields, partitionKeys, Collections.emptyList(), Collections.emptyMap(), null);
+ }
+
+ private List<IcebergManifestFileMeta> checkAndFilterManifestFiles(
+ List<IcebergManifestFileMeta> icebergManifestFileMetas) {
+ if (!ignoreDelete) {
+ for (IcebergManifestFileMeta meta : icebergManifestFileMetas) {
+ if (meta.content() == IcebergManifestFileMeta.Content.DELETES) {
+ throw new RuntimeException(
+ "IcebergMigrator does not support analyzing manifest files with 'DELETE' content. "
+ + "You can set 'ignore-delete' to ignore manifest files with 'DELETE' content.");
+ }
+ }
+ return icebergManifestFileMetas;
+ } else {
+ return icebergManifestFileMetas.stream()
+ .filter(meta -> meta.content() != IcebergManifestFileMeta.Content.DELETES)
+ .collect(Collectors.toList());
+ }
+ }
+
+ private List<IcebergDataFileMeta> checkAndFilterDataFiles(
+ List<IcebergDataFileMeta> icebergDataFileMetas) {
+ if (!ignoreDelete) {
+ for (IcebergDataFileMeta meta : icebergDataFileMetas) {
+ if (meta.content() != IcebergDataFileMeta.Content.DATA) {
+ throw new RuntimeException(
+ "IcebergMigrator does not support analyzing iceberg delete files. "
+ + "You can set 'ignore-delete' to ignore iceberg delete files.");
+ }
+ }
+ return icebergDataFileMetas;
+ } else {
+ return icebergDataFileMetas.stream()
+ .filter(meta -> meta.content() == IcebergDataFileMeta.Content.DATA)
+ .collect(Collectors.toList());
+ }
+ }
+
+ private MigrateTask importUnPartitionedTable(
+ List<IcebergDataFileMeta> icebergDataFileMetas,
+ FileStoreTable paimonTable,
+ Map<Path, Path> rollback) {
+ BinaryRow partitionRow = BinaryRow.EMPTY_ROW;
+ Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
+
+ return new MigrateTask(
+ icebergDataFileMetas, paimonFileIO, paimonTable, partitionRow, newDir, rollback);
+ }
+
+ private List<MigrateTask> importPartitionedTable(
+ List<IcebergDataFileMeta> icebergDataFileMetas,
+ FileStoreTable paimonTable,
+ Map<Path, Path> rollback) {
+ Map<BinaryRow, List<IcebergDataFileMeta>> dataInPartition =
+ icebergDataFileMetas.stream()
+ .collect(Collectors.groupingBy(IcebergDataFileMeta::partition));
+ List<MigrateTask> migrateTasks = new ArrayList<>();
+ for (Map.Entry<BinaryRow, List<IcebergDataFileMeta>> entry : dataInPartition.entrySet()) {
+ BinaryRow partitionRow = entry.getKey();
+ Path newDir = paimonTable.store().pathFactory().bucketPath(partitionRow, 0);
+ migrateTasks.add(
+ new MigrateTask(
+ entry.getValue(),
+ paimonFileIO,
+ paimonTable,
+ partitionRow,
+ newDir,
+ rollback));
+ }
+ return migrateTasks;
+ }
+
+ /** One import task for one partition. */
+ public static class MigrateTask implements Callable<CommitMessage> {
+
+ private final List<IcebergDataFileMeta> icebergDataFileMetas;
+ private final FileIO fileIO;
+ private final FileStoreTable paimonTable;
+ private final BinaryRow partitionRow;
+ private final Path newDir;
+ private final Map<Path, Path> rollback;
+
+ public MigrateTask(
+ List<IcebergDataFileMeta> icebergDataFileMetas,
+ FileIO fileIO,
+ FileStoreTable paimonTable,
+ BinaryRow partitionRow,
+ Path newDir,
+ Map<Path, Path> rollback) {
+ this.icebergDataFileMetas = icebergDataFileMetas;
+ this.fileIO = fileIO;
+ this.paimonTable = paimonTable;
+ this.partitionRow = partitionRow;
+ this.newDir = newDir;
+ this.rollback = rollback;
+ }
+
+ @Override
+ public CommitMessage call() throws Exception {
+ if (!fileIO.exists(newDir)) {
+ fileIO.mkdirs(newDir);
+ }
+ List<DataFileMeta> fileMetas =
+ FileMetaUtils.construct(
+ icebergDataFileMetas, fileIO, paimonTable, newDir, rollback);
+ return FileMetaUtils.commitFile(partitionRow, fileMetas);
+ }
+ }
+}
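For reviewers, a minimal usage sketch of the new migrator (not part of the patch; the warehouse and Iceberg metadata paths are hypothetical), mirroring how the tests below drive it:

```java
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.iceberg.IcebergMigrator;

public class IcebergMigratorExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical locations: a Paimon filesystem warehouse and the metadata
        // directory of an existing Iceberg table.
        Path paimonWarehouse = new Path("/tmp/paimon_warehouse");
        Path icebergMetaPath = new Path("/tmp/iceberg_warehouse/ice_db/ice_t/metadata");

        // Paimon catalog backed by the warehouse path.
        Catalog paimonCatalog =
                CatalogFactory.createCatalog(CatalogContext.create(paimonWarehouse));

        // Migrate the newest Iceberg snapshot into pai_db.pai_t, failing on delete
        // files (ignoreDelete = false) and using 4 migration threads.
        IcebergMigrator migrator =
                new IcebergMigrator(
                        paimonCatalog, icebergMetaPath, "pai_db", "pai_t", false, 4);
        migrator.executeMigrate();
    }
}
```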
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
index d93456c3fe20..cf55f9749228 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestEntrySerializer.java
@@ -54,4 +54,32 @@ public IcebergManifestEntry fromRow(InternalRow row) {
row.getLong(3),
fileSerializer.fromRow(row.getRow(4, fileSerializer.numFields())));
}
+
+ public IcebergManifestEntry fromRow(InternalRow row, IcebergManifestFileMeta meta) {
+ IcebergManifestEntry.Status status = IcebergManifestEntry.Status.fromId(row.getInt(0));
+ long snapshotId = !row.isNullAt(1) ? row.getLong(1) : meta.addedSnapshotId();
+ long sequenceNumber = getOrInherit(row, meta, 2, status);
+ long fileSequenceNumber = getOrInherit(row, meta, 3, status);
+
+ return new IcebergManifestEntry(
+ status,
+ snapshotId,
+ sequenceNumber,
+ fileSequenceNumber,
+ fileSerializer.fromRow(row.getRow(4, fileSerializer.numFields())));
+ }
+
+ private long getOrInherit(
+ InternalRow row,
+ IcebergManifestFileMeta meta,
+ int pos,
+ IcebergManifestEntry.Status status) {
+ long sequenceNumber = meta.sequenceNumber();
+ if (row.isNullAt(pos)
+ && (sequenceNumber == 0 || status == IcebergManifestEntry.Status.ADDED)) {
+ return sequenceNumber;
+ } else {
+ return row.getLong(pos);
+ }
+ }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
index 5955da6220f8..4553a1c850a0 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergManifestFile.java
@@ -19,6 +19,7 @@
package org.apache.paimon.iceberg.manifest;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriterFactory;
@@ -38,9 +39,14 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.FileUtils;
+import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ObjectsFile;
import org.apache.paimon.utils.PathFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
@@ -109,6 +115,50 @@ public static IcebergManifestFile create(FileStoreTable table, IcebergPathFactor
table.coreOptions().manifestTargetSize());
}
+ public List<IcebergManifestEntry> read(IcebergManifestFileMeta meta) {
+ return read(meta, null);
+ }
+
+ public List<IcebergManifestEntry> read(IcebergManifestFileMeta meta, @Nullable Long fileSize) {
+ String fileName = new Path(meta.manifestPath()).getName();
+ try {
+ Path path = pathFactory.toPath(fileName);
+
+ return readFromIterator(
+ meta,
+ createIterator(path, fileSize),
+ (IcebergManifestEntrySerializer) serializer,
+ Filter.alwaysTrue());
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read " + fileName, e);
+ }
+ }
+
+ private CloseableIterator<InternalRow> createIterator(Path file, @Nullable Long fileSize)
+ throws IOException {
+ return FileUtils.createFormatReader(fileIO, readerFactory, file, fileSize)
+ .toCloseableIterator();
+ }
+
+ private static List<IcebergManifestEntry> readFromIterator(
+ IcebergManifestFileMeta meta,
+ CloseableIterator<InternalRow> inputIterator,
+ IcebergManifestEntrySerializer serializer,
+ Filter<InternalRow> readFilter) {
+ try (CloseableIterator<InternalRow> iterator = inputIterator) {
+ List<IcebergManifestEntry> result = new ArrayList<>();
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next();
+ if (readFilter.test(row)) {
+ result.add(serializer.fromRow(row, meta));
+ }
+ }
+ return result;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
public List<IcebergManifestFileMeta> rollingWrite(
Iterator<IcebergManifestEntry> entries, long sequenceNumber) {
RollingFileWriter writer =
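For context, this is how the new read method composes with the manifest list inside IcebergMigrator; a condensed sketch (assuming the table, path factory, and parsed metadata are already in hand):

```java
import java.util.List;
import java.util.stream.Collectors;

import org.apache.paimon.iceberg.IcebergPathFactory;
import org.apache.paimon.iceberg.manifest.IcebergManifestEntry;
import org.apache.paimon.iceberg.manifest.IcebergManifestFile;
import org.apache.paimon.iceberg.manifest.IcebergManifestList;
import org.apache.paimon.iceberg.metadata.IcebergMetadata;
import org.apache.paimon.table.FileStoreTable;

public class ManifestReadSketch {
    /** Collect all live manifest entries of the current Iceberg snapshot. */
    static List<IcebergManifestEntry> liveEntries(
            FileStoreTable paimonTable,
            IcebergPathFactory icebergPathFactory,
            IcebergMetadata icebergMetadata) {
        IcebergManifestFile manifestFile =
                IcebergManifestFile.create(paimonTable, icebergPathFactory);
        IcebergManifestList manifestList =
                IcebergManifestList.create(paimonTable, icebergPathFactory);
        return manifestList.read(icebergMetadata.currentSnapshot().manifestList()).stream()
                .flatMap(meta -> manifestFile.read(meta).stream())
                .filter(IcebergManifestEntry::isLive)
                .collect(Collectors.toList());
    }
}
```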
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
index 4ecc77a13581..c5588b1d3769 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java
@@ -20,13 +20,22 @@
import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -190,6 +199,70 @@ private static Object toTypeObject(DataType dataType, int fieldId, int depth) {
}
}
+ public DataType getDataType() {
+ String simpleType = type.toString();
+ String delimiter = "(";
+ if (simpleType.contains("[")) {
+ delimiter = "[";
+ }
+ String typePrefix =
+ !simpleType.contains(delimiter)
+ ? simpleType
+ : simpleType.substring(0, simpleType.indexOf(delimiter));
+ switch (typePrefix) {
+ case "boolean":
+ return new BooleanType(!required);
+ case "int":
+ return new IntType(!required);
+ case "long":
+ return new BigIntType(!required);
+ case "float":
+ return new FloatType(!required);
+ case "double":
+ return new DoubleType(!required);
+ case "date":
+ return new DateType(!required);
+ case "string":
+ return new VarCharType(!required, VarCharType.MAX_LENGTH);
+ case "binary":
+ return new VarBinaryType(!required, VarBinaryType.MAX_LENGTH);
+ case "fixed":
+ int fixedLength =
+ Integer.parseInt(
+ simpleType.substring(
+ simpleType.indexOf("[") + 1, simpleType.indexOf("]")));
+ return new BinaryType(!required, fixedLength);
+ case "uuid":
+ // https://iceberg.apache.org/spec/?h=vector#primitive-types
+ // uuid should use 16-byte fixed
+ return new BinaryType(!required, 16);
+ case "decimal":
+ int precision =
+ Integer.parseInt(
+ simpleType.substring(
+ simpleType.indexOf("(") + 1, simpleType.indexOf(",")));
+ int scale =
+ Integer.parseInt(
+ simpleType.substring(
+ simpleType.indexOf(",") + 2, simpleType.indexOf(")")));
+ return new DecimalType(!required, precision, scale);
+ case "timestamp":
+ return new TimestampType(!required, 6);
+ case "timestamptz":
+ return new LocalZonedTimestampType(!required, 6);
+ case "timestamp_ns": // iceberg v3 format
+ return new TimestampType(!required, 9);
+ case "timestamptz_ns": // iceberg v3 format
+ return new LocalZonedTimestampType(!required, 9);
+ default:
+ throw new UnsupportedOperationException("Unsupported data type: " + type);
+ }
+ }
+
+ public DataField toDatafield() {
+ return new DataField(id, name, getDataType(), doc);
+ }
+
@Override
public int hashCode() {
return Objects.hash(id, name, required, type, doc);
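As a quick reference for the string-based mapping in getDataType above, a sketch of the expected Iceberg-to-Paimon type translations (the constructors mirror those used in the patch; required = true yields non-nullable types):

```java
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarCharType;

public class IcebergTypeMappingSketch {
    public static void main(String[] args) {
        // Iceberg "int"            -> Paimon IntType (non-nullable for required fields)
        System.out.println(new IntType(false));
        // Iceberg "string"         -> Paimon VarCharType with maximum length
        System.out.println(new VarCharType(false, VarCharType.MAX_LENGTH));
        // Iceberg "decimal(10, 2)" -> Paimon DecimalType(10, 2)
        System.out.println(new DecimalType(false, 10, 2));
        // Iceberg "fixed[16]" and "uuid" -> Paimon BinaryType(16)
        System.out.println(new BinaryType(false, 16));
        // Iceberg "timestamp"      -> Paimon TimestampType with microsecond precision
        System.out.println(new TimestampType(false, 6));
        // Iceberg "timestamptz"    -> Paimon LocalZonedTimestampType with microsecond precision
        System.out.println(new LocalZonedTimestampType(false, 6));
    }
}
```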
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
index 86fb4a5df75a..fbaf8060022f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergMetadata.java
@@ -106,7 +106,7 @@ public class IcebergMetadata {
private final List<IcebergSnapshot> snapshots;
@JsonProperty(FIELD_CURRENT_SNAPSHOT_ID)
- private final int currentSnapshotId;
+ private final long currentSnapshotId;
@JsonProperty(FIELD_PROPERTIES)
@Nullable
@@ -122,7 +122,7 @@ public IcebergMetadata(
List<IcebergPartitionSpec> partitionSpecs,
int lastPartitionId,
List<IcebergSnapshot> snapshots,
- int currentSnapshotId) {
+ long currentSnapshotId) {
this(
CURRENT_FORMAT_VERSION,
tableUuid,
@@ -158,7 +158,7 @@ public IcebergMetadata(
@JsonProperty(FIELD_SORT_ORDERS) List<IcebergSortOrder> sortOrders,
@JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID) int defaultSortOrderId,
@JsonProperty(FIELD_SNAPSHOTS) List<IcebergSnapshot> snapshots,
- @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) int currentSnapshotId,
+ @JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) long currentSnapshotId,
@JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String> properties) {
this.formatVersion = formatVersion;
this.tableUuid = tableUuid;
public List<IcebergSnapshot> snapshots() {
}
@JsonGetter(FIELD_CURRENT_SNAPSHOT_ID)
- public int currentSnapshotId() {
+ public long currentSnapshotId() {
return currentSnapshotId;
}
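The int-to-long widening above matters because Iceberg snapshot ids are 64-bit values in the metadata JSON and routinely exceed Integer.MAX_VALUE; a trivial illustration:

```java
public class SnapshotIdWidth {
    public static void main(String[] args) {
        // A realistic Iceberg snapshot id: it does not fit in a Java int.
        long currentSnapshotId = 7744078698229234651L;
        System.out.println(currentSnapshotId > Integer.MAX_VALUE); // true
    }
}
```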
diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
index 391c5f9bb615..dc87bd1fc4b3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java
@@ -27,6 +27,7 @@
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.iceberg.manifest.IcebergDataFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataIncrement;
@@ -60,6 +61,20 @@ public class FileMetaUtils {
private static final Logger LOG = LoggerFactory.getLogger(FileMetaUtils.class);
+ public static List<DataFileMeta> construct(
+ List<IcebergDataFileMeta> icebergDataFileMetas,
+ FileIO fileIO,
+ Table paimonTable,
+ Path newDir,
+ Map<Path, Path> rollback) {
+ return icebergDataFileMetas.stream()
+ .map(
+ icebergDataFileMeta ->
+ constructFileMeta(
+ icebergDataFileMeta, fileIO, paimonTable, newDir, rollback))
+ .collect(Collectors.toList());
+ }
+
public static List<DataFileMeta> construct(
FileIO fileIO,
String format,
@@ -93,6 +108,22 @@ public static CommitMessage commitFile(BinaryRow partition, List<DataFileMeta> d
}
// -----------------------------private method---------------------------------------------
+ private static DataFileMeta constructFileMeta(
+ IcebergDataFileMeta icebergDataFileMeta,
+ FileIO fileIO,
+ Table table,
+ Path dir,
+ Map<Path, Path> rollback) {
+ FileStatus status;
+ try {
+ status = fileIO.getFileStatus(new Path(icebergDataFileMeta.filePath()));
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "Error when getting file status. File path is "
+ + icebergDataFileMeta.filePath(),
+ e);
+ }
+ String format = icebergDataFileMeta.fileFormat();
+ return constructFileMeta(format, status, fileIO, table, dir, rollback);
+ }
private static DataFileMeta constructFileMeta(
String format,
diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergMigrateTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergMigrateTest.java
new file mode 100644
index 000000000000..898484c765a2
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/IcebergMigrateTest.java
@@ -0,0 +1,576 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.DataFormatTestUtil;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.PartitionKey;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.orc.GenericOrcWriter;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.deletes.EqualityDeleteWriter;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link IcebergMigrator}. */
+public class IcebergMigrateTest {
+ @TempDir java.nio.file.Path iceTempDir;
+ @TempDir java.nio.file.Path paiTempDir;
+
+ String iceDatabase = "ice_db";
+ String iceTable = "ice_t";
+
+ String paiDatabase = "pai_db";
+ String paiTable = "pai_t";
+
+ Schema iceSchema =
+ new Schema(
+ Types.NestedField.required(1, "k", Types.IntegerType.get()),
+ Types.NestedField.required(2, "v", Types.IntegerType.get()),
+ Types.NestedField.required(3, "dt", Types.StringType.get()),
+ Types.NestedField.required(4, "hh", Types.StringType.get()));
+ Schema iceDeleteSchema =
+ new Schema(
+ Types.NestedField.required(1, "k", Types.IntegerType.get()),
+ Types.NestedField.optional(2, "v", Types.IntegerType.get()));
+
+ PartitionSpec icePartitionSpec =
+ PartitionSpec.builderFor(iceSchema).identity("dt").identity("hh").build();
+
+ @ParameterizedTest(name = "isPartitioned = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception {
+ Table icebergTable = createIcebergTable(isPartitioned);
+ String format = "parquet";
+ List<GenericRecord> records1 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "00"),
+ toIcebergRecord(2, 2, "20240101", "00"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records1);
+ }
+
+ List<GenericRecord> records2 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "01"),
+ toIcebergRecord(2, 2, "20240101", "01"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records2);
+ }
+
+ CatalogContext context = CatalogContext.create(new Path(paiTempDir.toString()));
+ context.options().set(CACHE_ENABLED, false);
+ Catalog catalog = CatalogFactory.createCatalog(context);
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ catalog,
+ new Path(icebergTable.location(), "metadata"),
+ paiDatabase,
+ paiTable,
+ false,
+ 1);
+ icebergMigrator.executeMigrate();
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) catalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(
+ paiResults.stream()
+ .map(row -> String.format("Record(%s)", row))
+ .collect(Collectors.toList()))
+ .hasSameElementsAs(
+ Stream.concat(records1.stream(), records2.stream())
+ .map(GenericRecord::toString)
+ .collect(Collectors.toList()));
+ }
+
+ @ParameterizedTest(name = "isPartitioned = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testMigrateAddAndDelete(boolean isPartitioned) throws Exception {
+ Table icebergTable = createIcebergTable(isPartitioned);
+ String format = "parquet";
+ List<GenericRecord> records1 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "00"),
+ toIcebergRecord(2, 2, "20240101", "00"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records1);
+ }
+
+ List<GenericRecord> records2 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "01"),
+ toIcebergRecord(2, 2, "20240101", "01"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records2);
+ }
+
+ // the file written with records1 will be deleted, which only produces a DELETED
+ // manifest entry, not a delete file
+ icebergTable.newDelete().deleteFromRowFilter(Expressions.equal("hh", "00")).commit();
+
+ CatalogContext context = CatalogContext.create(new Path(paiTempDir.toString()));
+ context.options().set(CACHE_ENABLED, false);
+ Catalog catalog = CatalogFactory.createCatalog(context);
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ catalog,
+ new Path(icebergTable.location(), "metadata"),
+ paiDatabase,
+ paiTable,
+ false,
+ 1);
+ icebergMigrator.executeMigrate();
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) catalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(
+ paiResults.stream()
+ .map(row -> String.format("Record(%s)", row))
+ .collect(Collectors.toList()))
+ .hasSameElementsAs(
+ records2.stream()
+ .map(GenericRecord::toString)
+ .collect(Collectors.toList()));
+ }
+
+ @ParameterizedTest
+ @CsvSource({"true, true", "true, false", "false, true", "false, false"})
+ public void testMigrateWithDeleteFile(boolean isPartitioned, boolean ignoreDelete)
+ throws Exception {
+ // only supports creating delete files with the parquet format
+ Table icebergTable = createIcebergTable(isPartitioned);
+ String format = "parquet";
+ List<GenericRecord> records1 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "00"),
+ toIcebergRecord(2, 2, "20240101", "00"))
+ .collect(Collectors.toList());
+ List<GenericRecord> deleteRecords1 =
+ Collections.singletonList(toIcebergRecord(1, 1, iceDeleteSchema));
+
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+ writeEqualityDeleteFile(icebergTable, deleteRecords1, "20240101", "00");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records1);
+ writeEqualityDeleteFile(icebergTable, deleteRecords1);
+ }
+
+ List<GenericRecord> records2 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "01"),
+ toIcebergRecord(2, 2, "20240101", "01"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records2);
+ }
+
+ CatalogContext context = CatalogContext.create(new Path(paiTempDir.toString()));
+ context.options().set(CACHE_ENABLED, false);
+ Catalog catalog = CatalogFactory.createCatalog(context);
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ catalog,
+ new Path(icebergTable.location(), "metadata"),
+ paiDatabase,
+ paiTable,
+ ignoreDelete,
+ 1);
+ if (!ignoreDelete) {
+ assertThatThrownBy(icebergMigrator::executeMigrate)
+ .rootCause()
+ .isInstanceOf(RuntimeException.class)
+ .hasMessage(
+ "IcebergMigrator does not support analyzing manifest files with 'DELETE' content. "
+ + "You can set 'ignore-delete' to ignore manifest files with 'DELETE' content.");
+ return;
+ } else {
+ icebergMigrator.executeMigrate();
+ }
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) catalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(
+ paiResults.stream()
+ .map(row -> String.format("Record(%s)", row))
+ .collect(Collectors.toList()))
+ .hasSameElementsAs(
+ Stream.concat(records1.stream(), records2.stream())
+ .map(GenericRecord::toString)
+ .collect(Collectors.toList()));
+ }
+
+ @ParameterizedTest(name = "isPartitioned = {0}")
+ @ValueSource(booleans = {true, false})
+ public void testMigrateWithSchemaEvolution(boolean isPartitioned) throws Exception {
+ Table icebergTable = createIcebergTable(isPartitioned);
+ String format = "parquet";
+
+ // write base data
+ List<GenericRecord> records1 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "00"),
+ toIcebergRecord(2, 2, "20240101", "00"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records1);
+ }
+
+ List<GenericRecord> records2 =
+ Stream.of(
+ toIcebergRecord(1, 1, "20240101", "01"),
+ toIcebergRecord(2, 2, "20240101", "01"))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, records2);
+ }
+
+ // TODO: currently only supports schema evolution that deletes columns
+ testDeleteColumn(icebergTable, format, isPartitioned);
+ }
+
+ private void testDeleteColumn(Table icebergTable, String format, boolean isPartitioned)
+ throws Exception {
+ icebergTable.updateSchema().deleteColumn("v").commit();
+ Schema newIceSchema = icebergTable.schema();
+ List<GenericRecord> addedRecords =
+ Stream.of(
+ toIcebergRecord(3, "20240101", "00", newIceSchema),
+ toIcebergRecord(4, "20240101", "00", newIceSchema))
+ .collect(Collectors.toList());
+ if (isPartitioned) {
+ writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00");
+ } else {
+ writeRecordsToIceberg(icebergTable, format, addedRecords);
+ }
+
+ CatalogContext context = CatalogContext.create(new Path(paiTempDir.toString()));
+ context.options().set(CACHE_ENABLED, false);
+ Catalog catalog = CatalogFactory.createCatalog(context);
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ catalog,
+ new Path(icebergTable.location(), "metadata"),
+ paiDatabase,
+ paiTable,
+ false,
+ 1);
+ icebergMigrator.executeMigrate();
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) catalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(
+ paiResults.stream()
+ .map(row -> String.format("Record(%s)", row))
+ .collect(Collectors.toList()))
+ .hasSameElementsAs(
+ Stream.of(
+ "Record(1, 20240101, 00)",
+ "Record(2, 20240101, 00)",
+ "Record(1, 20240101, 01)",
+ "Record(2, 20240101, 01)",
+ "Record(3, 20240101, 00)",
+ "Record(4, 20240101, 00)")
+ .collect(Collectors.toList()));
+ }
+
+ @Test
+ public void testAllDataTypes() throws Exception {
+ Schema iceAllTypesSchema =
+ new Schema(
+ Types.NestedField.required(1, "c1", Types.BooleanType.get()),
+ Types.NestedField.required(2, "c2", Types.IntegerType.get()),
+ Types.NestedField.required(3, "c3", Types.LongType.get()),
+ Types.NestedField.required(4, "c4", Types.FloatType.get()),
+ Types.NestedField.required(5, "c5", Types.DoubleType.get()),
+ Types.NestedField.required(6, "c6", Types.DateType.get()),
+ Types.NestedField.required(7, "c7", Types.StringType.get()),
+ Types.NestedField.required(8, "c9", Types.BinaryType.get()),
+ Types.NestedField.required(9, "c11", Types.DecimalType.of(10, 2)),
+ Types.NestedField.required(10, "c13", Types.TimestampType.withoutZone()),
+ Types.NestedField.required(11, "c14", Types.TimestampType.withZone()));
+ Table icebergTable = createIcebergTable(false, iceAllTypesSchema);
+ String format = "parquet";
+ GenericRecord record =
+ toIcebergRecord(
+ iceAllTypesSchema,
+ true,
+ 1,
+ 1L,
+ 1.0F,
+ 1.0D,
+ LocalDate.of(2023, 10, 18),
+ "test",
+ ByteBuffer.wrap(new byte[] {1, 2, 3}),
+ new BigDecimal("122.50"),
+ LocalDateTime.now(),
+ OffsetDateTime.now());
+
+ writeRecordsToIceberg(icebergTable, format, Collections.singletonList(record));
+
+ CatalogContext context = CatalogContext.create(new Path(paiTempDir.toString()));
+ context.options().set(CACHE_ENABLED, false);
+ Catalog catalog = CatalogFactory.createCatalog(context);
+ IcebergMigrator icebergMigrator =
+ new IcebergMigrator(
+ catalog,
+ new Path(icebergTable.location(), "metadata"),
+ paiDatabase,
+ paiTable,
+ false,
+ 1);
+ icebergMigrator.executeMigrate();
+
+ FileStoreTable paimonTable =
+ (FileStoreTable) catalog.getTable(Identifier.create(paiDatabase, paiTable));
+ List<String> paiResults = getPaimonResult(paimonTable);
+ assertThat(paiResults.size()).isEqualTo(1);
+ }
+
+ private Table createIcebergTable(boolean isPartitioned) {
+ return createIcebergTable(isPartitioned, iceSchema);
+ }
+
+ private Table createIcebergTable(boolean isPartitioned, Schema icebergSchema) {
+ HadoopCatalog catalog = new HadoopCatalog(new Configuration(), iceTempDir.toString());
+ TableIdentifier icebergIdentifier = TableIdentifier.of(iceDatabase, iceTable);
+
+ if (!isPartitioned) {
+ return catalog.buildTable(icebergIdentifier, icebergSchema)
+ .withPartitionSpec(PartitionSpec.unpartitioned())
+ .create();
+ } else {
+ return catalog.buildTable(icebergIdentifier, icebergSchema)
+ .withPartitionSpec(icePartitionSpec)
+ .create();
+ }
+ }
+
+ private GenericRecord toIcebergRecord(Schema icebergSchema, Object... values) {
+ GenericRecord record = GenericRecord.create(icebergSchema);
+ for (int i = 0; i < values.length; i++) {
+ record.set(i, values[i]);
+ }
+ return record;
+ }
+
+ private GenericRecord toIcebergRecord(Object... values) {
+ return toIcebergRecord(iceSchema, values);
+ }
+
+ private DataWriter<GenericRecord> createIcebergDataWriter(
+ Table icebergTable, String format, OutputFile file, String... partitionValues)
+ throws IOException {
+ Schema schema = icebergTable.schema();
+ PartitionSpec partitionSpec = icebergTable.spec();
+ PartitionKey partitionKey =
+ partitionValues.length == 0
+ ? null
+ : partitionKey(
+ icePartitionSpec,
+ icebergTable,
+ partitionValues[0],
+ partitionValues[1]);
+ // TODO: currently only support "parquet" format
+ switch (format) {
+ case "parquet":
+ return Parquet.writeData(file)
+ .schema(schema)
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .overwrite()
+ .withSpec(partitionSpec)
+ .withPartition(partitionKey)
+ .build();
+ case "avro":
+ return Avro.writeData(file)
+ .schema(schema)
+ .createWriterFunc(org.apache.iceberg.data.avro.DataWriter::create)
+ .overwrite()
+ .withSpec(partitionSpec)
+ .withPartition(partitionKey)
+ .build();
+ case "orc":
+ return ORC.writeData(file)
+ .schema(schema)
+ .createWriterFunc(GenericOrcWriter::buildWriter)
+ .overwrite()
+ .withSpec(partitionSpec)
+ .withPartition(partitionKey)
+ .build();
+ default:
+ throw new IllegalArgumentException("Unsupported format: " + format);
+ }
+ }
+
+ private void writeRecordsToIceberg(
+ Table icebergTable,
+ String format,
+ List<GenericRecord> records,
+ String... partitionValues)
+ throws IOException {
+ String filepath = icebergTable.location() + "/" + UUID.randomUUID();
+ OutputFile file = icebergTable.io().newOutputFile(filepath);
+
+ DataWriter<GenericRecord> dataWriter =
+ createIcebergDataWriter(icebergTable, format, file, partitionValues);
+ try {
+ for (GenericRecord r : records) {
+ dataWriter.write(r);
+ }
+ } finally {
+ dataWriter.close();
+ }
+ DataFile dataFile = dataWriter.toDataFile();
+ icebergTable.newAppend().appendFile(dataFile).commit();
+ }
+
+ private void writeEqualityDeleteFile(
+ Table icebergTable, List<GenericRecord> deleteRecords, String... partitionValues)
+ throws IOException {
+ String filepath = icebergTable.location() + "/" + UUID.randomUUID();
+ OutputFile file = icebergTable.io().newOutputFile(filepath);
+
+ EqualityDeleteWriter<GenericRecord> deleteWriter =
+ Parquet.writeDeletes(file)
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .overwrite()
+ .rowSchema(iceDeleteSchema)
+ .withSpec(PartitionSpec.unpartitioned())
+ .equalityFieldIds(1)
+ .buildEqualityWriter();
+ if (partitionValues.length != 0) {
+ deleteWriter =
+ Parquet.writeDeletes(file)
+ .createWriterFunc(GenericParquetWriter::buildWriter)
+ .overwrite()
+ .rowSchema(iceDeleteSchema)
+ .withSpec(icePartitionSpec)
+ .withPartition(
+ partitionKey(
+ icePartitionSpec,
+ icebergTable,
+ partitionValues[0],
+ partitionValues[1]))
+ .equalityFieldIds(1)
+ .buildEqualityWriter();
+ }
+
+ try (EqualityDeleteWriter<GenericRecord> closableWriter = deleteWriter) {
+ closableWriter.write(deleteRecords);
+ }
+
+ DeleteFile deleteFile = deleteWriter.toDeleteFile();
+ icebergTable.newRowDelta().addDeletes(deleteFile).commit();
+ }
+
+ private PartitionKey partitionKey(
+ PartitionSpec spec, Table icebergTable, String... partitionValues) {
+ Record record =
+ GenericRecord.create(icebergTable.schema())
+ .copy(ImmutableMap.of("dt", partitionValues[0], "hh", partitionValues[1]));
+
+ PartitionKey partitionKey = new PartitionKey(spec, icebergTable.schema());
+ partitionKey.partition(record);
+
+ return partitionKey;
+ }
+
+ private List<String> getPaimonResult(FileStoreTable paimonTable) throws Exception {
+ List<Split> splits = paimonTable.newReadBuilder().newScan().plan().splits();
+ TableRead read = paimonTable.newReadBuilder().newRead();
+ try (RecordReader<InternalRow> recordReader = read.createReader(splits)) {
+ List<String> result = new ArrayList<>();
+ recordReader.forEachRemaining(
+ row ->
+ result.add(
+ DataFormatTestUtil.toStringNoRowKind(
+ row, paimonTable.rowType())));
+ return result;
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
index 1e581c38cb97..785e41383a48 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -86,7 +86,10 @@ public void migrateHandle(
boolean deleteOrigin,
Integer parallelism)
throws Exception {
- Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+ if (!connector.equals("hive")) {
+ throw new IllegalArgumentException("MigrateFile only supports the hive connector now.");
+ }
+
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
try {
@@ -100,8 +103,7 @@ public void migrateHandle(
TableMigrationUtils.getImporter(
connector,
catalog,
- sourceTableId.getDatabaseName(),
- sourceTableId.getObjectName(),
+ sourceTablePath,
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
parallelism,
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index 196528d31c78..53be1e3c0581 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -20,17 +20,15 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;
+import org.apache.paimon.utils.Preconditions;
import org.apache.flink.table.procedure.ProcedureContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/** Migrate procedure to migrate hive table to paimon table. */
public class MigrateTableProcedure extends ProcedureBase {
- private static final Logger LOG = LoggerFactory.getLogger(MigrateTableProcedure.class);
-
private static final String PAIMON_SUFFIX = "_paimon_";
@Override
@@ -50,24 +48,39 @@ public String[] call(
String sourceTablePath,
String properties)
throws Exception {
+ return call(
+ procedureContext,
+ connector,
+ sourceTablePath,
+ properties,
+ Runtime.getRuntime().availableProcessors());
+ }
+
+ public String[] call(
+ ProcedureContext procedureContext,
+ String connector,
+ String sourceTablePath,
+ String properties,
+ Integer parallelism)
+ throws Exception {
+ Preconditions.checkArgument(connector.equals("hive"));
String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
- Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
- TableMigrationUtils.getImporter(
+ Migrator migrator =
+ TableMigrationUtils.getImporter(
connector,
catalog,
- sourceTableId.getDatabaseName(),
- sourceTableId.getObjectName(),
+ sourceTablePath,
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
- Runtime.getRuntime().availableProcessors(),
- ParameterUtils.parseCommaSeparatedKeyValues(properties))
- .executeMigrate();
+ parallelism,
+ ParameterUtils.parseCommaSeparatedKeyValues(properties));
+
+ migrator.executeMigrate();
- LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
- catalog.renameTable(targetTableId, sourceTableId, false);
+ migrator.renameTable(false);
return new String[] {"Success"};
}
@@ -76,26 +89,19 @@ public String[] call(
String connector,
String sourceTablePath,
String properties,
- Integer parallelism)
+ Integer parallelism,
+ String icebergConf)
throws Exception {
- String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
-
- Identifier sourceTableId = Identifier.fromString(sourceTablePath);
- Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
-
- TableMigrationUtils.getImporter(
- connector,
+ Preconditions.checkArgument(connector.equals("iceberg"));
+ Migrator migrator =
+ TableMigrationUtils.getIcebergImporter(
catalog,
- sourceTableId.getDatabaseName(),
- sourceTableId.getObjectName(),
- targetTableId.getDatabaseName(),
- targetTableId.getObjectName(),
parallelism,
- ParameterUtils.parseCommaSeparatedKeyValues(properties))
- .executeMigrate();
+ ParameterUtils.parseCommaSeparatedKeyValues(properties),
+ ParameterUtils.parseCommaSeparatedKeyValues(icebergConf));
+ migrator.executeMigrate();
- LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
- catalog.renameTable(targetTableId, sourceTableId, false);
+ migrator.renameTable(false);
return new String[] {"Success"};
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
index 798d1d347732..bd8a177c4597 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateFileAction.java
@@ -54,9 +54,9 @@ public MigrateFileAction(
@Override
public void run() throws Exception {
- MigrateFileProcedure migrateTableProcedure = new MigrateFileProcedure();
- migrateTableProcedure.withCatalog(catalog);
- migrateTableProcedure.call(
+ MigrateFileProcedure migrateFileProcedure = new MigrateFileProcedure();
+ migrateFileProcedure.withCatalog(catalog);
+ migrateFileProcedure.call(
new DefaultProcedureContext(env),
connector,
sourceTable,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
index 8a4efdfc710d..b140a4dd6a64 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java
@@ -31,6 +31,7 @@ public class MigrateTableAction extends ActionBase {
private final String hiveTableFullName;
private final String tableProperties;
private final Integer parallelism;
+ private final String icebergOptions;
public MigrateTableAction(
String connector,
@@ -38,12 +39,14 @@ public MigrateTableAction(
String hiveTableFullName,
Map<String, String> catalogConfig,
String tableProperties,
- Integer parallelism) {
+ Integer parallelism,
+ String icebergOptions) {
super(warehouse, catalogConfig);
this.connector = connector;
this.hiveTableFullName = hiveTableFullName;
this.tableProperties = tableProperties;
this.parallelism = parallelism;
+ this.icebergOptions = icebergOptions;
}
@Override
@@ -55,6 +58,7 @@ public void run() throws Exception {
connector,
hiveTableFullName,
tableProperties,
- parallelism);
+ parallelism,
+ icebergOptions);
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
index a1a93bc91163..92a8aec00730 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java
@@ -29,6 +29,7 @@ public class MigrateTableActionFactory implements ActionFactory {
private static final String SOURCE_TYPE = "source_type";
private static final String OPTIONS = "options";
private static final String PARALLELISM = "parallelism";
+ private static final String ICEBERG_OPTIONS = "iceberg_options";
@Override
public String identifier() {
@@ -43,6 +44,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
String tableConf = params.get(OPTIONS);
Integer parallelism = Integer.parseInt(params.get(PARALLELISM));
+ String icebergOptions = params.get(ICEBERG_OPTIONS);
MigrateTableAction migrateTableAction =
new MigrateTableAction(
@@ -51,7 +53,8 @@ public Optional create(MultipleParameterToolAdapter params) {
sourceHiveTable,
catalogConfig,
tableConf,
- parallelism);
+ parallelism,
+ icebergOptions);
return Optional.of(migrateTableAction);
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
index f2f10d087406..1712f4ca8926 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateFileProcedure.java
@@ -75,7 +75,10 @@ public void migrateHandle(
boolean deleteOrigin,
Integer parallelism)
throws Exception {
- Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+ if (!connector.equals("hive")) {
+ throw new IllegalArgumentException("MigrateFile only supports the hive connector now.");
+ }
+
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
try {
@@ -89,8 +92,7 @@ public void migrateHandle(
TableMigrationUtils.getImporter(
connector,
catalog,
- sourceTableId.getDatabaseName(),
- sourceTableId.getObjectName(),
+ sourceTablePath,
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
parallelism,
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index fff05a1a8555..835b568758c3 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -20,20 +20,18 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;
+import org.apache.paimon.utils.Preconditions;
import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/** Migrate procedure to migrate hive table to paimon table. */
public class MigrateTableProcedure extends ProcedureBase {
- private static final Logger LOG = LoggerFactory.getLogger(MigrateTableProcedure.class);
-
private static final String PAIMON_SUFFIX = "_paimon_";
@Override
@@ -44,42 +42,65 @@ public String identifier() {
@ProcedureHint(
argument = {
@ArgumentHint(name = "connector", type = @DataTypeHint("STRING")),
- @ArgumentHint(name = "source_table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(
+ name = "source_table",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
@ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true),
@ArgumentHint(
name = "parallelism",
type = @DataTypeHint("Integer"),
- isOptional = true)
+ isOptional = true),
+ @ArgumentHint(
+ name = "iceberg_options",
+ type = @DataTypeHint("STRING"),
+ isOptional = true),
})
public String[] call(
ProcedureContext procedureContext,
String connector,
String sourceTablePath,
String properties,
- Integer parallelism)
+ Integer parallelism,
+ String icebergOptions)
throws Exception {
properties = notnull(properties);
-
- String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
-
- Identifier sourceTableId = Identifier.fromString(sourceTablePath);
- Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+ icebergOptions = notnull(icebergOptions);
Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
- TableMigrationUtils.getImporter(
- connector,
- catalog,
- sourceTableId.getDatabaseName(),
- sourceTableId.getObjectName(),
- targetTableId.getDatabaseName(),
- targetTableId.getObjectName(),
- p,
- ParameterUtils.parseCommaSeparatedKeyValues(properties))
- .executeMigrate();
+ Migrator migrator;
+ switch (connector) {
+ case "hive":
+ Preconditions.checkArgument(
+ sourceTablePath != null, "please set 'source_table' for hive migrator");
+ String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+ Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+ migrator =
+ TableMigrationUtils.getImporter(
+ connector,
+ catalog,
+ sourceTablePath,
+ targetTableId.getDatabaseName(),
+ targetTableId.getObjectName(),
+ p,
+ ParameterUtils.parseCommaSeparatedKeyValues(properties));
+ break;
+ case "iceberg":
+ migrator =
+ TableMigrationUtils.getIcebergImporter(
+ catalog,
+ p,
+ ParameterUtils.parseCommaSeparatedKeyValues(properties),
+ ParameterUtils.parseCommaSeparatedKeyValues(icebergOptions));
+ break;
+ default:
+ throw new UnsupportedOperationException("Don't support connector " + connector);
+ }
+
+ migrator.executeMigrate();
- LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
- catalog.renameTable(targetTableId, sourceTableId, false);
+ migrator.renameTable(false);
return new String[] {"Success"};
}
}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
index b59c3592a97d..86c763bab7cc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -20,9 +20,14 @@
import org.apache.paimon.catalog.CachingCatalog;
import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.hive.HiveCatalog;
import org.apache.paimon.hive.migrate.HiveMigrator;
+import org.apache.paimon.iceberg.IcebergMigrator;
import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.Preconditions;
import java.util.List;
import java.util.Map;
@@ -33,14 +38,14 @@ public class TableMigrationUtils {
public static Migrator getImporter(
String connector,
Catalog catalog,
- String sourceDatabase,
- String sourceTableName,
+ String sourceIdentifier,
String targetDatabase,
String targetTableName,
Integer parallelism,
Map<String, String> options) {
switch (connector) {
case "hive":
+ Identifier identifier = Identifier.fromString(sourceIdentifier);
if (catalog instanceof CachingCatalog) {
catalog = ((CachingCatalog) catalog).wrapped();
}
@@ -49,8 +54,8 @@ public static Migrator getImporter(
}
return new HiveMigrator(
(HiveCatalog) catalog,
- sourceDatabase,
- sourceTableName,
+ identifier.getDatabaseName(),
+ identifier.getTableName(),
targetDatabase,
targetTableName,
parallelism,
@@ -60,6 +65,21 @@ public static Migrator getImporter(
}
}
+ public static Migrator getIcebergImporter(
+ Catalog catalog,
+ Integer parallelism,
+ Map<String, String> options,
+ Map<String, String> icebergConf) {
+ checkIcebergRequiredConf(icebergConf);
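+ // 'iceberg-meta-path' points at the Iceberg table's metadata directory; 'target-database' and 'target-table' name the Paimon table to create; 'ignore-delete-file' is optional and defaults to false.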
+ return new IcebergMigrator(
+ catalog,
+ new Path(icebergConf.get("iceberg-meta-path")),
+ icebergConf.get("target-database"),
+ icebergConf.get("target-table"),
+ new Options(icebergConf).getBoolean("ignore-delete-file", false),
+ parallelism);
+ }
+
public static List<Migrator> getImporters(
String connector,
Catalog catalog,
@@ -80,4 +100,16 @@ public static List<Migrator> getImporters(
throw new UnsupportedOperationException("Don't support connector " + connector);
}
}
+
+ private static void checkIcebergRequiredConf(Map<String, String> icebergConf) {
+ Preconditions.checkArgument(
+ icebergConf.containsKey("iceberg-meta-path"),
+ "Please set the required Iceberg argument 'iceberg-meta-path'.");
+ Preconditions.checkArgument(
+ icebergConf.containsKey("target-database"),
+ "Please set the required Iceberg argument 'target-database'.");
+ Preconditions.checkArgument(
+ icebergConf.containsKey("target-table"),
+ "Please set the required Iceberg argument 'target-table'.");
+ }
}
diff --git a/paimon-hive/paimon-hive-connector-2.3/pom.xml b/paimon-hive/paimon-hive-connector-2.3/pom.xml
index a0f509b53375..daeb87475f3f 100644
--- a/paimon-hive/paimon-hive-connector-2.3/pom.xml
+++ b/paimon-hive/paimon-hive-connector-2.3/pom.xml
@@ -569,6 +569,12 @@ under the License.
<version>${iceberg.version}</version>
<scope>test</scope>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-dropwizard</artifactId>
+ <version>${iceberg.flink.dropwizard.version}</version>
+ <scope>test</scope>
+ </dependency>
diff --git a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/procedure/MigrateIcebergTableProcedure23ITCase.java b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/procedure/MigrateIcebergTableProcedure23ITCase.java
new file mode 100644
index 000000000000..638a803eb75c
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/procedure/MigrateIcebergTableProcedure23ITCase.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.procedure;
+
+import org.apache.paimon.hive.procedure.MigrateIcebergTableProcedureITCase;
+
+/** IT cases for migrating iceberg table to paimon table in hive 2.3. */
+public class MigrateIcebergTableProcedure23ITCase extends MigrateIcebergTableProcedureITCase {}
diff --git a/paimon-hive/paimon-hive-connector-3.1/pom.xml b/paimon-hive/paimon-hive-connector-3.1/pom.xml
index 5383af90c3e5..9ea1718205b2 100644
--- a/paimon-hive/paimon-hive-connector-3.1/pom.xml
+++ b/paimon-hive/paimon-hive-connector-3.1/pom.xml
@@ -599,6 +599,12 @@ under the License.
<version>${iceberg.version}</version>
<scope>test</scope>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-metrics-dropwizard</artifactId>
+ <version>${iceberg.flink.dropwizard.version}</version>
+ <scope>test</scope>
+ </dependency>
diff --git a/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/procedure/MigrateIcebergTableProcedure31ITCase.java b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/procedure/MigrateIcebergTableProcedure31ITCase.java
new file mode 100644
index 000000000000..fa8ac6dc6503
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-3.1/src/test/java/org/apache/paimon/procedure/MigrateIcebergTableProcedure31ITCase.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.procedure;
+
+import org.apache.paimon.hive.procedure.MigrateIcebergTableProcedureITCase;
+
+/** IT cases for migrating iceberg table to paimon table in hive 3.1. */
+public class MigrateIcebergTableProcedure31ITCase extends MigrateIcebergTableProcedureITCase {}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java
new file mode 100644
index 000000000000..f7de3a954450
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.procedure;
+
+import org.apache.paimon.hive.runner.PaimonEmbeddedHiveRunner;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import com.klarna.hiverunner.HiveShell;
+import com.klarna.hiverunner.annotations.HiveSQL;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+/** IT cases for migrating iceberg table to paimon table. */
+@RunWith(PaimonEmbeddedHiveRunner.class)
+public abstract class MigrateIcebergTableProcedureITCase {
+ @Rule public TemporaryFolder folder = new TemporaryFolder();
+
+ @TempDir private java.nio.file.Path iceTempDir;
+ @TempDir private java.nio.file.Path paiTempDir;
+
+ @HiveSQL(files = {})
+ protected static HiveShell hiveShell;
+
+ private static Stream<Arguments> testIcebergArguments() {
+ return Stream.of(Arguments.of(true, false), Arguments.of(false, false));
+ }
+
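+ // Presumably a placeholder so the JUnit4 PaimonEmbeddedHiveRunner has at least one runnable test; the migration cases below are JUnit5 parameterized tests.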
+ @Test
+ public void initTest() {}
+
+ @ParameterizedTest
+ @MethodSource("testIcebergArguments")
+ public void testMigrateIcebergTable(boolean isPartitioned, boolean isHive)
+ throws Exception {
+ TableEnvironment tEnv =
+ TableEnvironmentImpl.create(
+ EnvironmentSettings.newInstance().inBatchMode().build());
+
+ // Create the Iceberg catalog, database, and table, then insert some data into the Iceberg table.
+ tEnv.executeSql(icebergCatalogDdl(isHive));
+ tEnv.executeSql("USE CATALOG my_iceberg");
+ tEnv.executeSql("CREATE DATABASE iceberg_db;");
+ if (isPartitioned) {
+ tEnv.executeSql(
+ "CREATE TABLE iceberg_db.iceberg_table (id string, id2 int, id3 int) PARTITIONED BY (id3)"
+ + " WITH ('format-version'='2')");
+ } else {
+ tEnv.executeSql(
+ "CREATE TABLE iceberg_db.iceberg_table (id string, id2 int, id3 int) WITH ('format-version'='2')");
+ }
+ tEnv.executeSql("INSERT INTO iceberg_db.iceberg_table VALUES ('a',1,1),('b',2,2),('c',3,3)")
+ .await();
+
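+ // Create the Paimon catalog and migrate the Iceberg table through the sys.migrate_table procedure, pointing 'iceberg-meta-path' at the Iceberg table's metadata directory.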
+ tEnv.executeSql(paimonCatalogDdl(isHive));
+ tEnv.executeSql("USE CATALOG my_paimon");
+ tEnv.executeSql(
+ String.format(
+ "CALL sys.migrate_table(connector => 'iceberg', "
+ + "iceberg_options => 'iceberg-meta-path=%s,target-database=%s,target-table=%s')",
+ iceTempDir + "/iceberg_db/iceberg_table/metadata",
+ "paimon_db",
+ "paimon_table"))
+ .await();
+
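+ // The migrated Paimon table should contain exactly the rows that were written to the Iceberg table.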
+ Assertions.assertThatList(
+ Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3)))
+ .containsExactlyInAnyOrderElementsOf(
+ ImmutableList.copyOf(
+ tEnv.executeSql("SELECT * FROM paimon_db.paimon_table").collect()));
+ }
+
+ private String icebergCatalogDdl(boolean isHive) {
+ return isHive
+ ? String.format(
+ "CREATE CATALOG my_iceberg WITH "
+ + "( 'type' = 'iceberg', 'catalog-type' = 'hive', 'uri' = '', "
+ + "'warehouse' = '%s', 'cache-enabled' = 'false' )",
+ iceTempDir)
+ : String.format(
+ "CREATE CATALOG my_iceberg WITH "
+ + "( 'type' = 'iceberg', 'catalog-type' = 'hadoop',"
+ + "'warehouse' = '%s', 'cache-enabled' = 'false' )",
+ iceTempDir);
+ }
+
+ private String paimonCatalogDdl(boolean isHive) {
+ return isHive
+ ? String.format(
+ "CREATE CATALOG my_paimon WITH "
+ + "( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = '', "
+ + "'warehouse' = '%s', 'cache-enabled' = 'false' )",
+ iceTempDir)
+ : String.format(
+ "CREATE CATALOG my_paimon WITH ('type' = 'paimon', 'warehouse' = '%s')",
+ paiTempDir);
+ }
+}
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
index ca3b6c82e7d3..adc8aff918b1 100644
--- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java
@@ -192,7 +192,8 @@ public void testMigrateAction(String format) throws Exception {
"default.hivetable",
catalogConf,
"",
- 6);
+ 6,
+ "");
migrateTableAction.run();
tEnv.executeSql(
diff --git a/paimon-hive/pom.xml b/paimon-hive/pom.xml
index 7d1d0f2c499c..43bb7f5a2211 100644
--- a/paimon-hive/pom.xml
+++ b/paimon-hive/pom.xml
@@ -50,6 +50,7 @@ under the License.
0.9.8
1.12.319
1.19
+ <iceberg.flink.dropwizard.version>1.19.0</iceberg.flink.dropwizard.version>