diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
new file mode 100644
index 000000000000..0402d3e8961b
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Procedure to migrate an Iceberg table to a Paimon table. */
+public class MigrateIcebergTableProcedure extends ProcedureBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class);
+
+    private static final String PAIMON_SUFFIX = "_paimon_";
+
+    @Override
+    public String identifier() {
+        return "migrate_iceberg_table";
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext, String sourceTablePath, String icebergProperties)
+            throws Exception {
+
+        return call(procedureContext, sourceTablePath, icebergProperties, "");
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String sourceTablePath,
+            String icebergProperties,
+            String properties)
+            throws Exception {
+
+        return call(
+                procedureContext,
+                sourceTablePath,
+                icebergProperties,
+                properties,
+                Runtime.getRuntime().availableProcessors());
+    }
+
+    public String[] call(
+            ProcedureContext procedureContext,
+            String sourceTablePath,
+            String icebergProperties,
+            String properties,
+            Integer parallelism)
+            throws Exception {
+        String targetTablePath = sourceTablePath + PAIMON_SUFFIX;
+
+        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+        Identifier targetTableId = Identifier.fromString(targetTablePath);
+
+        Migrator migrator =
+                TableMigrationUtils.getIcebergImporter(
+                        catalog,
+                        sourceTableId.getDatabaseName(),
+                        sourceTableId.getObjectName(),
+                        targetTableId.getDatabaseName(),
+                        targetTableId.getObjectName(),
+                        parallelism,
+                        ParameterUtils.parseCommaSeparatedKeyValues(properties),
+                        ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties));
+        LOG.info("Migrator created successfully.");
+        migrator.executeMigrate();
+
+        migrator.renameTable(false);
+        return new String[] {"Success"};
+    }
+}
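For orientation, a usage sketch of this 1.18 variant, which supports positional arguments only (table name, URI, and port are illustrative; the option keys mirror the integration test at the end of this patch):

    CALL sys.migrate_iceberg_table(
        'default.iceberg_t',
        'metadata.iceberg.storage=hive-catalog,metadata.iceberg.uri=thrift://localhost:9083');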
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index 196528d31c78..8778b9d5e187 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -20,6 +20,7 @@
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
 import org.apache.paimon.utils.ParameterUtils;
 
 import org.apache.flink.table.procedure.ProcedureContext;
@@ -50,25 +51,13 @@ public String[] call(
             String sourceTablePath,
             String properties)
             throws Exception {
-        String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
-
-        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
-        Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
-
-        TableMigrationUtils.getImporter(
-                        connector,
-                        catalog,
-                        sourceTableId.getDatabaseName(),
-                        sourceTableId.getObjectName(),
-                        targetTableId.getDatabaseName(),
-                        targetTableId.getObjectName(),
-                        Runtime.getRuntime().availableProcessors(),
-                        ParameterUtils.parseCommaSeparatedKeyValues(properties))
-                .executeMigrate();
-
-        LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
-        catalog.renameTable(targetTableId, sourceTableId, false);
-        return new String[] {"Success"};
+        return call(
+                procedureContext,
+                connector,
+                sourceTablePath,
+                properties,
+                Runtime.getRuntime().availableProcessors());
     }
 
     public String[] call(
@@ -78,12 +67,13 @@ public String[] call(
             String properties,
             Integer parallelism)
             throws Exception {
-        String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+        String targetTablePath = sourceTablePath + PAIMON_SUFFIX;
 
         Identifier sourceTableId = Identifier.fromString(sourceTablePath);
-        Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+        Identifier targetTableId = Identifier.fromString(targetTablePath);
 
-        TableMigrationUtils.getImporter(
+        Migrator migrator =
+                TableMigrationUtils.getImporter(
                         connector,
                         catalog,
                         sourceTableId.getDatabaseName(),
@@ -91,11 +81,11 @@ public String[] call(
                         targetTableId.getDatabaseName(),
                         targetTableId.getObjectName(),
                         parallelism,
-                        ParameterUtils.parseCommaSeparatedKeyValues(properties))
-                .executeMigrate();
+                        ParameterUtils.parseCommaSeparatedKeyValues(properties));
+        LOG.info("Migrator created successfully.");
+        migrator.executeMigrate();
 
-        LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
-        catalog.renameTable(targetTableId, sourceTableId, false);
+        migrator.renameTable(false);
         return new String[] {"Success"};
     }
 }
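Note that the final rename now goes through Migrator.renameTable(false) instead of catalog.renameTable(targetTableId, sourceTableId, false), so each Migrator implementation owns its post-migration rename; the boolean presumably carries over the old ignoreIfNotExists flag.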
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java
new file mode 100644
index 000000000000..1b9fcb46a9e9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure;
+
+import org.apache.flink.table.procedure.DefaultProcedureContext;
+
+import java.util.Map;
+
+/** Migrate an Iceberg table to a Paimon table. */
+public class MigrateIcebergTableAction extends ActionBase {
+
+    private final String sourceTableFullName;
+    private final String tableProperties;
+    private final Integer parallelism;
+
+    private final String icebergProperties;
+
+    public MigrateIcebergTableAction(
+            String sourceTableFullName,
+            Map<String, String> catalogConfig,
+            String icebergProperties,
+            String tableProperties,
+            Integer parallelism) {
+        super(catalogConfig);
+        this.sourceTableFullName = sourceTableFullName;
+        this.tableProperties = tableProperties;
+        this.parallelism = parallelism;
+        this.icebergProperties = icebergProperties;
+    }
+
+    @Override
+    public void run() throws Exception {
+        MigrateIcebergTableProcedure migrateIcebergTableProcedure =
+                new MigrateIcebergTableProcedure();
+        migrateIcebergTableProcedure.withCatalog(catalog);
+        migrateIcebergTableProcedure.call(
+                new DefaultProcedureContext(env),
+                sourceTableFullName,
+                icebergProperties,
+                tableProperties,
+                parallelism);
+    }
+}
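A minimal embedded-usage sketch of the action (paths and table name are illustrative; the null parallelism falls back to the available processors inside the procedure):

    import java.util.HashMap;
    import java.util.Map;

    Map<String, String> catalogConfig = new HashMap<>();
    catalogConfig.put("warehouse", "/tmp/paimon");
    new MigrateIcebergTableAction(
                    "default.iceberg_t",
                    catalogConfig,
                    "metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=/tmp/iceberg",
                    "",
                    null)
            .run();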
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java
new file mode 100644
index 000000000000..c85559d66b41
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Action Factory for {@link MigrateIcebergTableAction}. */
+public class MigrateIcebergTableActionFactory implements ActionFactory {
+
+    public static final String IDENTIFIER = "migrate_iceberg_table";
+
+    private static final String OPTIONS = "options";
+    private static final String PARALLELISM = "parallelism";
+
+    private static final String ICEBERG_OPTIONS = "iceberg_options";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @Override
+    public Optional<Action> create(MultipleParameterToolAdapter params) {
+
+        String sourceTable = params.get(TABLE);
+        Map<String, String> catalogConfig = catalogConfigMap(params);
+        String tableConf = params.get(OPTIONS);
+        Integer parallelism =
+                params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM));
+
+        String icebergOptions = params.get(ICEBERG_OPTIONS);
+
+        MigrateIcebergTableAction migrateIcebergTableAction =
+                new MigrateIcebergTableAction(
+                        sourceTable, catalogConfig, icebergOptions, tableConf, parallelism);
+        return Optional.of(migrateIcebergTableAction);
+    }
+
+    @Override
+    public void printHelp() {
+        System.out.println(
+                "Action \"migrate_iceberg_table\" runs a migrating job from iceberg to paimon.");
+        System.out.println();
+
+        System.out.println("Syntax:");
+        System.out.println(
+                "  migrate_iceberg_table "
+                        + "--table <database.table_name> "
+                        + "--iceberg_options <key>=<value>[,<key>=<value>,...] "
+                        + "[--catalog_conf <key>=<value>[,<key>=<value>,...]]");
+    }
+}
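Following that help text, a possible invocation in the style of Paimon's other actions (jar path and all values are placeholders):

    <FLINK_HOME>/bin/flink run \
        /path/to/paimon-flink-action-<version>.jar \
        migrate_iceberg_table \
        --table default.iceberg_t \
        --iceberg_options metadata.iceberg.storage=hive-catalog,metadata.iceberg.uri=thrift://localhost:9083 \
        --catalog_conf warehouse=/tmp/paimon \
        --catalog_conf metastore=hive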
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
new file mode 100644
index 000000000000..f43d29ed4f17
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Procedure to migrate an Iceberg table to a Paimon table. */
+public class MigrateIcebergTableProcedure extends ProcedureBase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class);
+
+    private static final String PAIMON_SUFFIX = "_paimon_";
+
+    @Override
+    public String identifier() {
+        return "migrate_iceberg_table";
+    }
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "source_table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(
+                        name = "iceberg_options",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true),
+                @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true),
+                @ArgumentHint(
+                        name = "parallelism",
+                        type = @DataTypeHint("Integer"),
+                        isOptional = true)
+            })
+    public String[] call(
+            ProcedureContext procedureContext,
+            String sourceTablePath,
+            String icebergProperties,
+            String properties,
+            Integer parallelism)
+            throws Exception {
+        properties = notnull(properties);
+        icebergProperties = notnull(icebergProperties);
+
+        String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
+
+        Identifier sourceTableId = Identifier.fromString(sourceTablePath);
+        Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
+
+        Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
+
+        Migrator migrator =
+                TableMigrationUtils.getIcebergImporter(
+                        catalog,
+                        sourceTableId.getDatabaseName(),
+                        sourceTableId.getObjectName(),
+                        targetTableId.getDatabaseName(),
+                        targetTableId.getObjectName(),
+                        p,
+                        ParameterUtils.parseCommaSeparatedKeyValues(properties),
+                        ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties));
+        LOG.info("Migrator created successfully.");
+        migrator.executeMigrate();
+
+        migrator.renameTable(false);
+        return new String[] {"Success"};
+    }
+}
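Because this variant declares a ProcedureHint, it can also be called with named arguments, as the integration test below does; a sketch with illustrative values:

    CALL sys.migrate_iceberg_table(
        source_table => 'default.iceberg_t',
        iceberg_options => 'metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=/tmp/iceberg',
        parallelism => 4);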
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
index fff05a1a8555..32a2a16dc51d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java
@@ -20,6 +20,7 @@
 
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.utils.TableMigrationUtils;
+import org.apache.paimon.migrate.Migrator;
 import org.apache.paimon.utils.ParameterUtils;
 
 import org.apache.flink.table.annotation.ArgumentHint;
@@ -67,7 +68,8 @@ public String[] call(
 
         Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;
 
-        TableMigrationUtils.getImporter(
+        Migrator migrator =
+                TableMigrationUtils.getImporter(
                         connector,
                         catalog,
                         sourceTableId.getDatabaseName(),
@@ -75,11 +77,11 @@ public String[] call(
                         targetTableId.getDatabaseName(),
                         targetTableId.getObjectName(),
                         p,
-                        ParameterUtils.parseCommaSeparatedKeyValues(properties))
-                .executeMigrate();
+                        ParameterUtils.parseCommaSeparatedKeyValues(properties));
+        LOG.info("Migrator created successfully.");
+        migrator.executeMigrate();
 
-        LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
-        catalog.renameTable(targetTableId, sourceTableId, false);
+        migrator.renameTable(false);
         return new String[] {"Success"};
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
index b59c3592a97d..4e7268c6f14e 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java
@@ -22,7 +22,9 @@
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.hive.HiveCatalog;
 import org.apache.paimon.hive.migrate.HiveMigrator;
+import org.apache.paimon.iceberg.migrate.IcebergMigrator;
 import org.apache.paimon.migrate.Migrator;
+import org.apache.paimon.options.Options;
 
 import java.util.List;
 import java.util.Map;
@@ -60,6 +62,27 @@ public static Migrator getImporter(
         }
     }
 
+    public static Migrator getIcebergImporter(
+            Catalog catalog,
+            String sourceDatabase,
+            String sourceTableName,
+            String targetDatabase,
+            String targetTableName,
+            Integer parallelism,
+            Map<String, String> options,
+            Map<String, String> icebergOptions) {
+
+        Options icebergConf = new Options(icebergOptions);
+        return new IcebergMigrator(
+                catalog,
+                targetDatabase,
+                targetTableName,
+                sourceDatabase,
+                sourceTableName,
+                icebergConf,
+                parallelism);
+    }
+
     public static List<Migrator> getImporters(
             String connector,
             Catalog catalog,
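Both map parameters arrive as comma-separated key=value strings parsed by ParameterUtils.parseCommaSeparatedKeyValues, so for example 'metadata.iceberg.storage=hive-catalog,metadata.iceberg.uri=thrift://localhost:9083' becomes a two-entry map. Note that, as written, the generic options map is accepted by getIcebergImporter but not yet forwarded to IcebergMigrator; only icebergOptions is.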
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 6f6becf85fc7..efaa25627d69 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -85,4 +85,5 @@ org.apache.paimon.flink.procedure.CloneProcedure
 org.apache.paimon.flink.procedure.CompactManifestProcedure
 org.apache.paimon.flink.procedure.RefreshObjectTableProcedure
 org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure
-org.apache.paimon.flink.procedure.ClearConsumersProcedure
\ No newline at end of file
+org.apache.paimon.flink.procedure.ClearConsumersProcedure
+org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java
new file mode 100644
index 000000000000..3c0d7da024a0
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.client.ClientPool;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.hive.HiveCatalog;
+import org.apache.paimon.hive.pool.CachedClientPool;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.iceberg.metadata.IcebergMetadata;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Gets the latest snapshot metadata of an Iceberg table registered in the Hive metastore. */
+public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata {
+
+    private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrateHiveMetadata.class);
+
+    public static final String TABLE_TYPE_PROP = "table_type";
+    public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg";
+    private static final String ICEBERG_METADATA_LOCATION = "metadata_location";
+
+    private FileIO fileIO;
+    private final Options icebergOptions;
+    private final Identifier icebergIdentifier;
+
+    private final ClientPool<IMetaStoreClient, TException> clients;
+
+    private String metadataLocation = null;
+
+    private IcebergMetadata icebergMetadata;
+
+    public IcebergMigrateHiveMetadata(Identifier icebergIdentifier, Options icebergOptions) {
+
+        this.icebergIdentifier = icebergIdentifier;
+        this.icebergOptions = icebergOptions;
+
+        String uri = icebergOptions.get(IcebergOptions.URI);
+        String hiveConfDir = icebergOptions.get(IcebergOptions.HIVE_CONF_DIR);
+        String hadoopConfDir = icebergOptions.get(IcebergOptions.HADOOP_CONF_DIR);
+        Configuration hadoopConf = new Configuration();
+        hadoopConf.setClassLoader(IcebergMigrateHiveMetadata.class.getClassLoader());
+        HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, hadoopConf);
+
+        icebergOptions.toMap().forEach(hiveConf::set);
+        if (uri != null) {
+            hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri);
+        }
+
+        if (hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname) == null) {
+            LOG.error(
+                    "Can't find hive metastore uri to connect: "
+                            + "either set {} in iceberg options or set hive.metastore.uris "
+                            + "in hive-site.xml or hadoop configurations. "
+                            + "Will use empty metastore uris, which means an embedded metastore may be used. "
" + + "Please make sure hive metastore uri for iceberg table is correctly set as expected.", + IcebergOptions.URI.key()); + } + + this.clients = + new CachedClientPool( + hiveConf, + icebergOptions, + icebergOptions.getString(IcebergOptions.HIVE_CLIENT_CLASS)); + } + + @Override + public IcebergMetadata icebergMetadata() { + try { + boolean isExist = tableExists(icebergIdentifier); + if (!isExist) { + throw new RuntimeException( + String.format( + "iceberg table %s is not existed in hive metastore", + icebergIdentifier)); + } + Table icebergHiveTable = + clients.run( + client -> + client.getTable( + icebergIdentifier.getDatabaseName(), + icebergIdentifier.getTableName())); + // check whether it is an iceberg table + String tableType = icebergHiveTable.getParameters().get(TABLE_TYPE_PROP); + Preconditions.checkArgument( + tableType != null && tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE), + "not an iceberg table: %s (table-type=%s)", + icebergIdentifier.toString(), + tableType); + + metadataLocation = icebergHiveTable.getParameters().get(ICEBERG_METADATA_LOCATION); + LOG.info("iceberg latest metadata location: {}", metadataLocation); + + fileIO = FileIO.get(new Path(metadataLocation), CatalogContext.create(icebergOptions)); + + icebergMetadata = IcebergMetadata.fromPath(fileIO, new Path(metadataLocation)); + return icebergMetadata; + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to read Iceberg metadata from path %s", metadataLocation), + e); + } + } + + @Override + public String icebergLatestMetadataLocation() { + return metadataLocation; + } + + @Override + public void deleteOriginTable() { + LOG.info("Iceberg table in hive to be deleted:{}", icebergIdentifier.toString()); + try { + clients.run( + client -> { + client.dropTable( + icebergIdentifier.getDatabaseName(), + icebergIdentifier.getTableName(), + true, + true); + return null; + }); + + // iceberg table in hive is external table, client.dropTable only deletes the metadata + // of iceberg table in hive, so we manually delete the data files + Path icebergTablePath = new Path(icebergMetadata.location()); + + if (fileIO.exists(icebergTablePath) && fileIO.isDir(icebergTablePath)) { + fileIO.deleteDirectoryQuietly(icebergTablePath); + } + } catch (Exception e) { + LOG.warn("exception occurred when deleting origin table", e); + } + } + + private boolean tableExists(Identifier identifier) throws Exception { + return clients.run( + client -> + client.tableExists( + identifier.getDatabaseName(), identifier.getTableName())); + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java new file mode 100644 index 000000000000..0a539cdec2d2 --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java
new file mode 100644
index 000000000000..0a539cdec2d2
--- /dev/null
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.iceberg.migrate;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.iceberg.IcebergOptions;
+import org.apache.paimon.options.Options;
+
+/** Factory to create {@link IcebergMigrateHiveMetadata}. */
+public class IcebergMigrateHiveMetadataFactory implements IcebergMigrateMetadataFactory {
+
+    @Override
+    public String identifier() {
+        return IcebergOptions.StorageType.HIVE_CATALOG.toString() + "_migrate";
+    }
+
+    @Override
+    public IcebergMigrateHiveMetadata create(Identifier icebergIdentifier, Options icebergOptions) {
+        return new IcebergMigrateHiveMetadata(icebergIdentifier, icebergOptions);
+    }
+}
diff --git a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index 26f0944d916e..608f034659ca 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -16,3 +16,4 @@
 org.apache.paimon.hive.HiveCatalogFactory
 org.apache.paimon.hive.HiveCatalogLockFactory
 org.apache.paimon.iceberg.IcebergHiveMetadataCommitterFactory
+org.apache.paimon.iceberg.migrate.IcebergMigrateHiveMetadataFactory
diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml
index 397dfc942185..a79f2002ea24 100644
--- a/paimon-hive/paimon-hive-connector-common/pom.xml
+++ b/paimon-hive/paimon-hive-connector-common/pom.xml
@@ -562,6 +562,25 @@ under the License.
             </exclusions>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.iceberg</groupId>
+            <artifactId>iceberg-flink-${iceberg.flink.version}</artifactId>
+            <version>${iceberg.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.orc</groupId>
+                    <artifactId>orc-core</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-metrics-dropwizard</artifactId>
+            <version>${iceberg.flink.dropwizard.version}</version>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
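The "_migrate" suffix on the identifier lets the migrator discover this factory from the configured Iceberg storage type at runtime; a lookup sketch, assuming Paimon's usual FactoryUtil-based discovery (the storageType variable and class loader choice are illustrative):

    // storageType parsed from the "metadata.iceberg.storage" option, e.g. hive-catalog
    IcebergMigrateMetadataFactory factory =
            FactoryUtil.discoverFactory(
                    IcebergMigrateMetadataFactory.class.getClassLoader(),
                    IcebergMigrateMetadataFactory.class,
                    storageType + "_migrate");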
diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java
new file mode 100644
index 000000000000..1875b08eba22
--- /dev/null
+++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.hive.procedure;
+
+import org.apache.paimon.flink.action.ActionITCaseBase;
+import org.apache.paimon.flink.action.MigrateIcebergTableAction;
+import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure;
+import org.apache.paimon.hive.TestHiveMetastore;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.types.Row;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+/** Tests for {@link MigrateIcebergTableProcedure}. */
+public class MigrateIcebergTableProcedureITCase extends ActionITCaseBase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MigrateIcebergTableProcedureITCase.class);
+
+    private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore();
+
+    private static final int PORT = 9087;
+
+    @TempDir java.nio.file.Path iceTempDir;
+    @TempDir java.nio.file.Path paiTempDir;
+
+    @BeforeEach
+    public void beforeEach() {
+        TEST_HIVE_METASTORE.start(PORT);
+    }
+
+    @AfterEach
+    public void afterEach() throws Exception {
+        TEST_HIVE_METASTORE.stop();
+    }
+
+    @Test
+    public void testMigrateIcebergTableProcedure() throws Exception {
+        TableEnvironment tEnv =
+                TableEnvironmentImpl.create(
+                        EnvironmentSettings.newInstance().inBatchMode().build());
+
+        ThreadLocalRandom random = ThreadLocalRandom.current();
+        boolean isPartitioned = random.nextBoolean();
+        boolean icebergIsHive = random.nextBoolean();
+        boolean paimonIsHive = random.nextBoolean();
+        boolean isNamedArgument = random.nextBoolean();
+
+        // Log the randomized arguments for debugging
+        LOG.info(
+                "isPartitioned:{}, icebergIsHive:{}, paimonIsHive:{}, isNamedArgument:{}",
+                isPartitioned,
+                icebergIsHive,
+                paimonIsHive,
+                isNamedArgument);
+
+        // create iceberg catalog, database, table, and insert some data to iceberg table
+        tEnv.executeSql(icebergCatalogDdl(icebergIsHive));
+
+        String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_");
+        tEnv.executeSql("USE CATALOG my_iceberg");
+        if (isPartitioned) {
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)",
+                            icebergTable));
+        } else {
+            tEnv.executeSql(
+                    String.format(
+                            "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')",
+                            icebergTable));
+        }
+        tEnv.executeSql(
+                        String.format(
+                                "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)",
+                                icebergTable))
+                .await();
+
+        tEnv.executeSql(paimonCatalogDdl(paimonIsHive));
+        tEnv.executeSql("USE CATALOG my_paimon");
"metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + + PORT + : "metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=" + iceTempDir; + if (isNamedArgument) { + tEnv.executeSql( + String.format( + "CALL sys.migrate_iceberg_table(source_table => 'default.%s', " + + "iceberg_options => '%s')", + icebergTable, icebergOptions)) + .await(); + } else { + tEnv.executeSql( + String.format( + "CALL sys.migrate_iceberg_table('default.%s','%s')", + icebergTable, icebergOptions)) + .await(); + } + + Assertions.assertThatList( + Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) + .containsExactlyInAnyOrderElementsOf( + ImmutableList.copyOf( + tEnv.executeSql( + String.format( + "SELECT * FROM `default`.`%s`", + icebergTable)) + .collect())); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testMigrateIcebergTableAction(boolean isPartitioned) throws Exception { + TableEnvironment tEnv = + TableEnvironmentImpl.create( + EnvironmentSettings.newInstance().inBatchMode().build()); + + // create iceberg catalog, database, table, and insert some data to iceberg table + tEnv.executeSql(icebergCatalogDdl(true)); + + String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); + tEnv.executeSql("USE CATALOG my_iceberg"); + if (isPartitioned) { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", + icebergTable)); + } else { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", + icebergTable)); + } + tEnv.executeSql( + String.format( + "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", + icebergTable)) + .await(); + + String icebergOptions = + "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + + PORT; + + Map catalogConf = new HashMap<>(); + catalogConf.put("warehouse", paiTempDir.toString()); + catalogConf.put("metastore", "hive"); + catalogConf.put("uri", "thrift://localhost:" + PORT); + catalogConf.put("cache-enabled", "false"); + + MigrateIcebergTableAction migrateIcebergTableAction = + new MigrateIcebergTableAction( + "default." + icebergTable, catalogConf, icebergOptions, "", 6); + migrateIcebergTableAction.run(); + + tEnv.executeSql(paimonCatalogDdl(true)); + tEnv.executeSql("USE CATALOG my_paimon"); + Assertions.assertThatList( + Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) + .containsExactlyInAnyOrderElementsOf( + ImmutableList.copyOf( + tEnv.executeSql( + String.format( + "SELECT * FROM `my_paimon`.`default`.`%s`", + icebergTable)) + .collect())); + } + + private String icebergCatalogDdl(boolean isHive) { + return isHive + ? String.format( + "CREATE CATALOG my_iceberg WITH " + + "( 'type' = 'iceberg', 'catalog-type' = 'hive', 'uri' = 'thrift://localhost:%s', " + + "'warehouse' = '%s', 'cache-enabled' = 'false')", + PORT, iceTempDir) + : String.format( + "CREATE CATALOG my_iceberg WITH " + + "( 'type' = 'iceberg', 'catalog-type' = 'hadoop'," + + "'warehouse' = '%s', 'cache-enabled' = 'false' )", + iceTempDir); + } + + private String paimonCatalogDdl(boolean isHive) { + return isHive + ? 
+
+    private String paimonCatalogDdl(boolean isHive) {
+        return isHive
+                ? String.format(
+                        "CREATE CATALOG my_paimon WITH "
+                                + "( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:%s', "
+                                + "'warehouse' = '%s', 'cache-enabled' = 'false' )",
+                        PORT, iceTempDir)
+                : String.format(
+                        "CREATE CATALOG my_paimon WITH ('type' = 'paimon', 'warehouse' = '%s')",
+                        paiTempDir);
+    }
+}
diff --git a/paimon-hive/pom.xml b/paimon-hive/pom.xml
index c97aceb1b85c..92f32d1336a6 100644
--- a/paimon-hive/pom.xml
+++ b/paimon-hive/pom.xml
@@ -50,6 +50,7 @@ under the License.
         <reflections.version>0.9.8</reflections.version>
         <aws.version>1.12.319</aws.version>
         <iceberg.flink.version>1.19</iceberg.flink.version>
+        <iceberg.flink.dropwizard.version>1.19.0</iceberg.flink.dropwizard.version>
     </properties>
 
     <modules>