From 2f9f2a0d366e120b5e8f8900a6a73ca01bf4b159 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Thu, 21 Nov 2024 15:11:50 +0800 Subject: [PATCH 1/9] [iceberg] Introduce feature and IT cases to migrate table from iceberg to paimon # Conflicts: # paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java --- .../procedure/MigrateTableProcedure.java | 59 ++++--- .../flink/action/MigrateTableAction.java | 26 ++- .../action/MigrateTableActionFactory.java | 17 +- .../procedure/MigrateTableProcedure.java | 29 +++- .../flink/utils/TableMigrationUtils.java | 35 ++++ .../migrate/IcebergMigrateHiveMetadata.java | 156 +++++++++++++++++ .../IcebergMigrateHiveMetadataFactory.java | 38 ++++ .../org.apache.paimon.factories.Factory | 1 + paimon-hive/paimon-hive-connector-2.3/pom.xml | 6 + paimon-hive/paimon-hive-connector-3.1/pom.xml | 6 + .../paimon-hive-connector-common/pom.xml | 19 ++ .../MigrateTableProcedureITCase.java | 164 ++++++++++++++++++ paimon-hive/pom.xml | 1 + 13 files changed, 524 insertions(+), 33 deletions(-) create mode 100644 paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java create mode 100644 paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index 196528d31c78..8f2a5401fc8c 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.utils.TableMigrationUtils; +import org.apache.paimon.migrate.Migrator; import org.apache.paimon.utils.ParameterUtils; import org.apache.flink.table.procedure.ProcedureContext; @@ -50,25 +51,13 @@ public String[] call( String sourceTablePath, String properties) throws Exception { - String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; - - Identifier sourceTableId = Identifier.fromString(sourceTablePath); - Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); - - TableMigrationUtils.getImporter( - connector, - catalog, - sourceTableId.getDatabaseName(), - sourceTableId.getObjectName(), - targetTableId.getDatabaseName(), - targetTableId.getObjectName(), - Runtime.getRuntime().availableProcessors(), - ParameterUtils.parseCommaSeparatedKeyValues(properties)) - .executeMigrate(); - LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId); - catalog.renameTable(targetTableId, sourceTableId, false); - return new String[] {"Success"}; + return call( + procedureContext, + connector, + sourceTablePath, + properties, + Runtime.getRuntime().availableProcessors()); } public String[] call( @@ -80,10 +69,31 @@ public String[] call( throws Exception { String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; + return call( + procedureContext, + connector, + sourceTablePath, + properties, + parallelism, + targetPaimonTablePath, + ""); + } + + public String[] call( + ProcedureContext procedureContext, + String connector, + String sourceTablePath, + String properties, + Integer parallelism, + String targetTablePath, + String icebergProperties) + throws Exception { + Identifier sourceTableId = 
Identifier.fromString(sourceTablePath); - Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); + Identifier targetTableId = Identifier.fromString(targetTablePath); - TableMigrationUtils.getImporter( + Migrator migrator = + TableMigrationUtils.getImporter( connector, catalog, sourceTableId.getDatabaseName(), @@ -91,11 +101,12 @@ public String[] call( targetTableId.getDatabaseName(), targetTableId.getObjectName(), parallelism, - ParameterUtils.parseCommaSeparatedKeyValues(properties)) - .executeMigrate(); + ParameterUtils.parseCommaSeparatedKeyValues(properties), + ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + LOG.info("create migrator success."); + migrator.executeMigrate(); - LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId); - catalog.renameTable(targetTableId, sourceTableId, false); + migrator.renameTable(false); return new String[] {"Success"}; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java index b12cb5f862fe..1fe9441812d8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java @@ -32,6 +32,9 @@ public class MigrateTableAction extends ActionBase { private final String tableProperties; private final Integer parallelism; + private final String paimonTableFullName; + private final String icebergProperties; + public MigrateTableAction( String connector, String hiveTableFullName, @@ -43,6 +46,25 @@ public MigrateTableAction( this.hiveTableFullName = hiveTableFullName; this.tableProperties = tableProperties; this.parallelism = parallelism; + this.paimonTableFullName = null; + this.icebergProperties = null; + } + + public MigrateTableAction( + String connector, + String hiveTableFullName, + Map catalogConfig, + String tableProperties, + Integer parallelism, + String paimonTableFullName, + String icebergProperties) { + super(catalogConfig); + this.connector = connector; + this.hiveTableFullName = hiveTableFullName; + this.tableProperties = tableProperties; + this.parallelism = parallelism; + this.paimonTableFullName = paimonTableFullName; + this.icebergProperties = icebergProperties; } @Override @@ -54,6 +76,8 @@ public void run() throws Exception { connector, hiveTableFullName, tableProperties, - parallelism); + parallelism, + paimonTableFullName, + icebergProperties); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java index b2c96795a439..3af99d14c4b0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java @@ -30,6 +30,9 @@ public class MigrateTableActionFactory implements ActionFactory { private static final String OPTIONS = "options"; private static final String PARALLELISM = "parallelism"; + private static final String TARGET_TYPE = "target_type"; + private static final String ICEBERG_OPTIONS = "iceberg_options"; + @Override public String identifier() { return IDENTIFIER; @@ -41,11 +44,21 @@ public Optional 
create(MultipleParameterToolAdapter params) { String sourceHiveTable = params.get(TABLE); Map catalogConfig = catalogConfigMap(params); String tableConf = params.get(OPTIONS); - Integer parallelism = Integer.parseInt(params.get(PARALLELISM)); + Integer parallelism = + params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM)); + + String targetPaimonTable = params.get(TARGET_TYPE); + String icebergOptions = params.get(ICEBERG_OPTIONS); MigrateTableAction migrateTableAction = new MigrateTableAction( - connector, sourceHiveTable, catalogConfig, tableConf, parallelism); + connector, + sourceHiveTable, + catalogConfig, + tableConf, + parallelism, + targetPaimonTable, + icebergOptions); return Optional.of(migrateTableAction); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index fff05a1a8555..bba99d844ae8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.utils.TableMigrationUtils; +import org.apache.paimon.migrate.Migrator; import org.apache.paimon.utils.ParameterUtils; import org.apache.flink.table.annotation.ArgumentHint; @@ -49,6 +50,14 @@ public String identifier() { @ArgumentHint( name = "parallelism", type = @DataTypeHint("Integer"), + isOptional = true), + @ArgumentHint( + name = "target_table", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "iceberg_options", + type = @DataTypeHint("STRING"), isOptional = true) }) public String[] call( @@ -56,18 +65,25 @@ public String[] call( String connector, String sourceTablePath, String properties, - Integer parallelism) + Integer parallelism, + String targetTablePath, + String icebergProperties) throws Exception { properties = notnull(properties); + icebergProperties = notnull(icebergProperties); String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; + if (targetTablePath != null) { + targetPaimonTablePath = targetTablePath; + } Identifier sourceTableId = Identifier.fromString(sourceTablePath); Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); Integer p = parallelism == null ? 
Runtime.getRuntime().availableProcessors() : parallelism; - TableMigrationUtils.getImporter( + Migrator migrator = + TableMigrationUtils.getImporter( connector, catalog, sourceTableId.getDatabaseName(), @@ -75,11 +91,12 @@ public String[] call( targetTableId.getDatabaseName(), targetTableId.getObjectName(), p, - ParameterUtils.parseCommaSeparatedKeyValues(properties)) - .executeMigrate(); + ParameterUtils.parseCommaSeparatedKeyValues(properties), + ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + LOG.info("create migrator success."); + migrator.executeMigrate(); - LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId); - catalog.renameTable(targetTableId, sourceTableId, false); + migrator.renameTable(false); return new String[] {"Success"}; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java index b59c3592a97d..68257417d585 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java @@ -22,8 +22,11 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.hive.HiveCatalog; import org.apache.paimon.hive.migrate.HiveMigrator; +import org.apache.paimon.iceberg.migrate.IcebergMigrator; import org.apache.paimon.migrate.Migrator; +import org.apache.paimon.options.Options; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -39,6 +42,28 @@ public static Migrator getImporter( String targetTableName, Integer parallelism, Map options) { + return getImporter( + connector, + catalog, + sourceDatabase, + sourceTableName, + targetDatabase, + targetTableName, + parallelism, + options, + Collections.emptyMap()); + } + + public static Migrator getImporter( + String connector, + Catalog catalog, + String sourceDatabase, + String sourceTableName, + String targetDatabase, + String targetTableName, + Integer parallelism, + Map options, + Map icebergOptions) { switch (connector) { case "hive": if (catalog instanceof CachingCatalog) { @@ -55,6 +80,16 @@ public static Migrator getImporter( targetTableName, parallelism, options); + case "iceberg": + Options icebergConf = new Options(icebergOptions); + return new IcebergMigrator( + catalog, + targetDatabase, + targetTableName, + sourceDatabase, + sourceTableName, + icebergConf, + parallelism); default: throw new UnsupportedOperationException("Don't support connector " + connector); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java new file mode 100644 index 000000000000..10669f09a0f5 --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.migrate; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.client.ClientPool; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.hive.HiveCatalog; +import org.apache.paimon.hive.pool.CachedClientPool; +import org.apache.paimon.iceberg.IcebergOptions; +import org.apache.paimon.iceberg.metadata.IcebergMetadata; +import org.apache.paimon.options.Options; +import org.apache.paimon.utils.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Get the latest snapshot metadata of an iceberg table in hive. */ +public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata { + private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrateHiveMetadata.class); + + public static final String TABLE_TYPE_PROP = "table_type"; + public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg"; + private static final String ICEBERG_METADATA_LOCATION = "metadata_location"; + + private final FileIO fileIO; + private final Identifier icebergIdentifier; + + private final ClientPool<IMetaStoreClient, TException> clients; + + private String metadataLocation = null; + + public IcebergMigrateHiveMetadata( + Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) { + this.fileIO = fileIO; + this.icebergIdentifier = icebergIdentifier; + + String uri = icebergOptions.get(IcebergOptions.URI); + String hiveConfDir = icebergOptions.get(IcebergOptions.HIVE_CONF_DIR); + String hadoopConfDir = icebergOptions.get(IcebergOptions.HADOOP_CONF_DIR); + Configuration hadoopConf = new Configuration(); + hadoopConf.setClassLoader(IcebergMigrateHiveMetadata.class.getClassLoader()); + HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, hadoopConf); + + icebergOptions.toMap().forEach(hiveConf::set); + if (uri != null) { + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri); + } + + if (hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname) == null) { + LOG.error( + "Can't find hive metastore uri to connect: " + + "either set {} in iceberg options or set hive.metastore.uris " + + "in hive-site.xml or hadoop configurations. " + + "Will use empty metastore uris, which means we may use an embedded metastore. " + "Please make sure hive metastore uri for iceberg table is correctly set as expected.", IcebergOptions.URI.key()); + } + + this.clients = + new CachedClientPool( + hiveConf, + icebergOptions, + icebergOptions.getString(IcebergOptions.HIVE_CLIENT_CLASS)); + } + + @Override + public IcebergMetadata icebergMetadata() { + try { + boolean isExist = tableExists(icebergIdentifier); + if (!isExist) { + throw new RuntimeException( + String.format( + "iceberg table %s does not exist in hive metastore", + icebergIdentifier)); + } + Table icebergHiveTable = + clients.run( + client -> + client.getTable( + icebergIdentifier.getDatabaseName(), + icebergIdentifier.getTableName())); + // TODO: Is this check necessary? + // check whether it is an iceberg table + String tableType = icebergHiveTable.getParameters().get(TABLE_TYPE_PROP); + Preconditions.checkArgument( + tableType != null && tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE), + "not an iceberg table: %s (table-type=%s)", + icebergIdentifier.toString(), + tableType); + + metadataLocation = icebergHiveTable.getParameters().get(ICEBERG_METADATA_LOCATION); + LOG.info("iceberg latest metadata location: {}", metadataLocation); + + return IcebergMetadata.fromPath(fileIO, new Path(metadataLocation)); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to read Iceberg metadata from path %s", metadataLocation), + e); + } + } + + @Override + public String icebergLatestMetadataLocation() { + return metadataLocation; + } + + @Override + public void deleteOriginTable() { + LOG.info("Iceberg table in hive to be deleted: {}", icebergIdentifier.toString()); + try { + clients.run( + client -> { + client.dropTable( + icebergIdentifier.getDatabaseName(), + icebergIdentifier.getTableName(), + true, + true); + return null; + }); + } catch (Exception e) { + LOG.warn( + "exception occurred when deleting origin table, exception message:{}", + e.getMessage()); + } + } + + private boolean tableExists(Identifier identifier) throws Exception { + return clients.run( + client -> + client.tableExists( + identifier.getDatabaseName(), identifier.getTableName())); + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java new file mode 100644 index 000000000000..184eb3134058 --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.paimon.iceberg.migrate; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.iceberg.IcebergOptions; +import org.apache.paimon.options.Options; + +/** Factory to create {@link IcebergMigrateHiveMetadata}. */ +public class IcebergMigrateHiveMetadataFactory implements IcebergMigrateMetadataFactory { + @Override + public String identifier() { + return IcebergOptions.StorageType.HIVE_CATALOG.toString() + "_migrate"; + } + + @Override + public IcebergMigrateHiveMetadata create( + Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) { + return new IcebergMigrateHiveMetadata(icebergIdentifier, fileIO, icebergOptions); + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 26f0944d916e..608f034659ca 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -16,3 +16,4 @@ org.apache.paimon.hive.HiveCatalogFactory org.apache.paimon.hive.HiveCatalogLockFactory org.apache.paimon.iceberg.IcebergHiveMetadataCommitterFactory +org.apache.paimon.iceberg.migrate.IcebergMigrateHiveMetadataFactory diff --git a/paimon-hive/paimon-hive-connector-2.3/pom.xml b/paimon-hive/paimon-hive-connector-2.3/pom.xml index c0660fc573ee..2e235c832d86 100644 --- a/paimon-hive/paimon-hive-connector-2.3/pom.xml +++ b/paimon-hive/paimon-hive-connector-2.3/pom.xml @@ -569,6 +569,12 @@ under the License. ${iceberg.version} test + + org.apache.flink + flink-metrics-dropwizard + ${iceberg.flink.dropwizard.version} + test + diff --git a/paimon-hive/paimon-hive-connector-3.1/pom.xml b/paimon-hive/paimon-hive-connector-3.1/pom.xml index ef700d8c0bcd..2ae5b10e6a1c 100644 --- a/paimon-hive/paimon-hive-connector-3.1/pom.xml +++ b/paimon-hive/paimon-hive-connector-3.1/pom.xml @@ -599,6 +599,12 @@ under the License. ${iceberg.version} test + + org.apache.flink + flink-metrics-dropwizard + ${iceberg.flink.dropwizard.version} + test + diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml index 397dfc942185..a79f2002ea24 100644 --- a/paimon-hive/paimon-hive-connector-common/pom.xml +++ b/paimon-hive/paimon-hive-connector-common/pom.xml @@ -562,6 +562,25 @@ under the License. 
+ + org.apache.iceberg + iceberg-flink-${iceberg.flink.version} + ${iceberg.version} + test + + + org.apache.orc + orc-core + + + + + org.apache.flink + flink-metrics-dropwizard + ${iceberg.flink.dropwizard.version} + test + + junit junit diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java index 8d6ded69dc99..7227854e6c94 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java @@ -25,22 +25,27 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.types.Row; import org.apache.hadoop.hive.conf.HiveConf; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; import java.util.stream.Stream; /** Tests for {@link MigrateFileProcedure}. 
*/ @@ -50,6 +55,9 @@ public class MigrateTableProcedureITCase extends ActionITCaseBase { private static final int PORT = 9084; + @TempDir java.nio.file.Path iceTempDir; + @TempDir java.nio.file.Path paiTempDir; + @BeforeEach public void beforeEach() { TEST_HIVE_METASTORE.start(PORT); @@ -70,6 +78,16 @@ private static Stream testArguments() { Arguments.of("parquet", false)); } + private static Stream testIcebergArguments() { + return Stream.of( + Arguments.of(true, false, true), + Arguments.of(false, false, true), + Arguments.of(true, true, true), + Arguments.of(false, true, true), + Arguments.of(true, true, false), + Arguments.of(false, true, false)); + } + @ParameterizedTest @MethodSource("testArguments") public void testMigrateProcedure(String format, boolean isNamedArgument) throws Exception { @@ -203,6 +221,152 @@ public void testMigrateAction(String format) throws Exception { Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2); } + @ParameterizedTest + @MethodSource("testIcebergArguments") + public void testMigrateIcebergTableProcedure( + boolean isPartitioned, boolean isHive, boolean isNamedArgument) throws Exception { + TableEnvironment tEnv = + TableEnvironmentImpl.create( + EnvironmentSettings.newInstance().inBatchMode().build()); + + // create iceberg catalog, database, table, and insert some data to iceberg table + tEnv.executeSql(icebergCatalogDdl(isHive)); + + String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); + tEnv.executeSql("USE CATALOG my_iceberg"); + if (isPartitioned) { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", + icebergTable)); + } else { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", + icebergTable)); + } + tEnv.executeSql( + String.format( + "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", + icebergTable)) + .await(); + + tEnv.executeSql(paimonCatalogDdl(false)); + tEnv.executeSql("USE CATALOG my_paimon"); + + String icebergOptions = + isHive + ? 
"metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + + PORT + : "metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=" + iceTempDir; + if (isNamedArgument) { + tEnv.executeSql( + String.format( + "CALL sys.migrate_table(connector => 'iceberg', source_table => 'default.%s', " + + "target_table => 'default.paimon_table', " + + "iceberg_options => '%s')", + icebergTable, icebergOptions)) + .await(); + } else { + tEnv.executeSql( + String.format( + "CALL sys.migrate_table('iceberg', 'default.%s','',1,'default.paimon_table','%s')", + icebergTable, icebergOptions)) + .await(); + } + + Assertions.assertThatList( + Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) + .containsExactlyInAnyOrderElementsOf( + ImmutableList.copyOf( + tEnv.executeSql("SELECT * FROM `default`.`paimon_table`") + .collect())); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testMigrateIcebergTableAction(boolean isPartitioned) throws Exception { + TableEnvironment tEnv = + TableEnvironmentImpl.create( + EnvironmentSettings.newInstance().inBatchMode().build()); + + // create iceberg catalog, database, table, and insert some data to iceberg table + tEnv.executeSql(icebergCatalogDdl(true)); + + String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); + tEnv.executeSql("USE CATALOG my_iceberg"); + if (isPartitioned) { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", + icebergTable)); + } else { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", + icebergTable)); + } + tEnv.executeSql( + String.format( + "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", + icebergTable)) + .await(); + + String icebergOptions = + "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + + PORT; + + Map catalogConf = new HashMap<>(); + catalogConf.put("warehouse", paiTempDir.toString()); + + MigrateTableAction migrateTableAction = + new MigrateTableAction( + "iceberg", + "default." + icebergTable, + catalogConf, + "", + 6, + "default.paimon_table", + icebergOptions); + migrateTableAction.run(); + + tEnv.executeSql(paimonCatalogDdl(false)); + tEnv.executeSql("USE CATALOG my_paimon"); + Assertions.assertThatList( + Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) + .containsExactlyInAnyOrderElementsOf( + ImmutableList.copyOf( + tEnv.executeSql( + "SELECT * FROM `my_paimon`.`default`.`paimon_table`") + .collect())); + } + + private String icebergCatalogDdl(boolean isHive) { + return isHive + ? String.format( + "CREATE CATALOG my_iceberg WITH " + + "( 'type' = 'iceberg', 'catalog-type' = 'hive', 'uri' = 'thrift://localhost:%s', " + + "'warehouse' = '%s', 'cache-enabled' = 'false')", + PORT, iceTempDir) + : String.format( + "CREATE CATALOG my_iceberg WITH " + + "( 'type' = 'iceberg', 'catalog-type' = 'hadoop'," + + "'warehouse' = '%s', 'cache-enabled' = 'false' )", + iceTempDir); + } + + private String paimonCatalogDdl(boolean isHive) { + return isHive + ? 
String.format( + "CREATE CATALOG my_paimon WITH " + + "( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:%s', " + + "'warehouse' = '%s', 'cache-enabled' = 'false' )", + PORT, iceTempDir) + : String.format( + "CREATE CATALOG my_paimon WITH ('type' = 'paimon', 'warehouse' = '%s')", + paiTempDir); + } + protected static String data(int i) { Random random = new Random(); StringBuilder stringBuilder = new StringBuilder(); diff --git a/paimon-hive/pom.xml b/paimon-hive/pom.xml index c97aceb1b85c..92f32d1336a6 100644 --- a/paimon-hive/pom.xml +++ b/paimon-hive/pom.xml @@ -50,6 +50,7 @@ under the License. 0.9.8 1.12.319 1.19 + <iceberg.flink.dropwizard.version>1.19.0</iceberg.flink.dropwizard.version> From 9e84b464f914aa847f4fb9cfe771c54015f90b04 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Thu, 9 Jan 2025 18:02:10 +0800 Subject: [PATCH 2/9] [core][hive] delete iceberg physical data using fileIO because iceberg table in hive is external --- .../migrate/IcebergMigrateHiveMetadata.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java index 10669f09a0f5..95ae3f8c1be4 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java @@ -52,6 +52,8 @@ public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata { private String metadataLocation = null; + private IcebergMetadata icebergMetadata; + public IcebergMigrateHiveMetadata( Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) { this.fileIO = fileIO; @@ -114,7 +116,8 @@ public IcebergMetadata icebergMetadata() { metadataLocation = icebergHiveTable.getParameters().get(ICEBERG_METADATA_LOCATION); LOG.info("iceberg latest metadata location: {}", metadataLocation); - return IcebergMetadata.fromPath(fileIO, new Path(metadataLocation)); + icebergMetadata = IcebergMetadata.fromPath(fileIO, new Path(metadataLocation)); + return icebergMetadata; } catch (Exception e) { throw new RuntimeException( String.format("Failed to read Iceberg metadata from path %s", metadataLocation), e); @@ -140,10 +143,16 @@ public void deleteOriginTable() { true); return null; }); + + // iceberg table in hive is an external table, client.dropTable only deletes the metadata + // of the iceberg table in hive, so we manually delete the data files + Path icebergTablePath = new Path(icebergMetadata.location()); + + if (fileIO.exists(icebergTablePath) && fileIO.isDir(icebergTablePath)) { + fileIO.deleteDirectoryQuietly(icebergTablePath); + } } catch (Exception e) { - LOG.warn( - "exception occurred when deleting origin table, exception message:{}", - e.getMessage()); + LOG.warn("exception occurred when deleting origin table", e); } } From 09df7c5f2b439240a34a5f3bd893197e120fc352 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Thu, 9 Jan 2025 18:19:20 +0800 Subject: [PATCH 3/9] [procedure] remove 'target_table', delete source table and rename paimon table to source table if migration succeeds [core] alter access permission of getDataTypeFromType() --- .../procedure/MigrateTableProcedure.java | 12 +---- .../flink/action/MigrateTableAction.java | 13 ++--- .../action/MigrateTableActionFactory.java | 2 - .../procedure/MigrateTableProcedure.java | 8 ---
.../MigrateTableProcedureITCase.java | 52 +++++++++++-------- 5 files changed, 36 insertions(+), 51 deletions(-) diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index 8f2a5401fc8c..92a970e5b67d 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -67,16 +67,8 @@ public String[] call( String properties, Integer parallelism) throws Exception { - String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; - return call( - procedureContext, - connector, - sourceTablePath, - properties, - parallelism, - targetPaimonTablePath, - ""); + return call(procedureContext, connector, sourceTablePath, properties, parallelism, ""); } public String[] call( @@ -85,9 +77,9 @@ public String[] call( String sourceTablePath, String properties, Integer parallelism, - String targetTablePath, String icebergProperties) throws Exception { + String targetTablePath = sourceTablePath + PAIMON_SUFFIX; Identifier sourceTableId = Identifier.fromString(sourceTablePath); Identifier targetTableId = Identifier.fromString(targetTablePath); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java index 1fe9441812d8..62412d87af82 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java @@ -28,11 +28,10 @@ public class MigrateTableAction extends ActionBase { private final String connector; - private final String hiveTableFullName; + private final String sourceTableFullName; private final String tableProperties; private final Integer parallelism; - private final String paimonTableFullName; private final String icebergProperties; public MigrateTableAction( @@ -43,10 +42,9 @@ public MigrateTableAction( Integer parallelism) { super(catalogConfig); this.connector = connector; - this.hiveTableFullName = hiveTableFullName; + this.sourceTableFullName = hiveTableFullName; this.tableProperties = tableProperties; this.parallelism = parallelism; - this.paimonTableFullName = null; this.icebergProperties = null; } @@ -56,14 +54,12 @@ public MigrateTableAction( Map catalogConfig, String tableProperties, Integer parallelism, - String paimonTableFullName, String icebergProperties) { super(catalogConfig); this.connector = connector; - this.hiveTableFullName = hiveTableFullName; + this.sourceTableFullName = hiveTableFullName; this.tableProperties = tableProperties; this.parallelism = parallelism; - this.paimonTableFullName = paimonTableFullName; this.icebergProperties = icebergProperties; } @@ -74,10 +70,9 @@ public void run() throws Exception { migrateTableProcedure.call( new DefaultProcedureContext(env), connector, - hiveTableFullName, + sourceTableFullName, tableProperties, parallelism, - paimonTableFullName, icebergProperties); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java index 
3af99d14c4b0..c36ff09232e7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java @@ -47,7 +47,6 @@ public Optional create(MultipleParameterToolAdapter params) { Integer parallelism = params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM)); - String targetPaimonTable = params.get(TARGET_TYPE); String icebergOptions = params.get(ICEBERG_OPTIONS); MigrateTableAction migrateTableAction = @@ -57,7 +56,6 @@ public Optional create(MultipleParameterToolAdapter params) { catalogConfig, tableConf, parallelism, - targetPaimonTable, icebergOptions); return Optional.of(migrateTableAction); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index bba99d844ae8..46f553e0cd26 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -51,10 +51,6 @@ public String identifier() { name = "parallelism", type = @DataTypeHint("Integer"), isOptional = true), - @ArgumentHint( - name = "target_table", - type = @DataTypeHint("STRING"), - isOptional = true), @ArgumentHint( name = "iceberg_options", type = @DataTypeHint("STRING"), @@ -66,16 +62,12 @@ public String[] call( String sourceTablePath, String properties, Integer parallelism, - String targetTablePath, String icebergProperties) throws Exception { properties = notnull(properties); icebergProperties = notnull(icebergProperties); String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; - if (targetTablePath != null) { - targetPaimonTablePath = targetTablePath; - } Identifier sourceTableId = Identifier.fromString(sourceTablePath); Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java index 7227854e6c94..581a8169e8f7 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java @@ -79,13 +79,16 @@ private static Stream testArguments() { } private static Stream testIcebergArguments() { + // isPartitioned, icebergIsHive, paimonIsHive, isNamedArgument return Stream.of( - Arguments.of(true, false, true), - Arguments.of(false, false, true), - Arguments.of(true, true, true), - Arguments.of(false, true, true), - Arguments.of(true, true, false), - Arguments.of(false, true, false)); + Arguments.of(true, false, false, true), + Arguments.of(false, false, false, true), + Arguments.of(true, true, false, true), + Arguments.of(false, true, false, true), + Arguments.of(true, true, false, false), + Arguments.of(false, true, false, false), + Arguments.of(true, false, true, true), + Arguments.of(true, true, true, true)); } @ParameterizedTest @@ -224,13 +227,17 @@ public void testMigrateAction(String format) throws Exception { @ParameterizedTest 
@MethodSource("testIcebergArguments") public void testMigrateIcebergTableProcedure( - boolean isPartitioned, boolean isHive, boolean isNamedArgument) throws Exception { + boolean isPartitioned, + boolean icebergIsHive, + boolean paimonIsHive, + boolean isNamedArgument) + throws Exception { TableEnvironment tEnv = TableEnvironmentImpl.create( EnvironmentSettings.newInstance().inBatchMode().build()); // create iceberg catalog, database, table, and insert some data to iceberg table - tEnv.executeSql(icebergCatalogDdl(isHive)); + tEnv.executeSql(icebergCatalogDdl(icebergIsHive)); String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); tEnv.executeSql("USE CATALOG my_iceberg"); @@ -251,11 +258,11 @@ public void testMigrateIcebergTableProcedure( icebergTable)) .await(); - tEnv.executeSql(paimonCatalogDdl(false)); + tEnv.executeSql(paimonCatalogDdl(paimonIsHive)); tEnv.executeSql("USE CATALOG my_paimon"); String icebergOptions = - isHive + icebergIsHive ? "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + PORT : "metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=" + iceTempDir; @@ -263,14 +270,13 @@ public void testMigrateIcebergTableProcedure( tEnv.executeSql( String.format( "CALL sys.migrate_table(connector => 'iceberg', source_table => 'default.%s', " - + "target_table => 'default.paimon_table', " + "iceberg_options => '%s')", icebergTable, icebergOptions)) .await(); } else { tEnv.executeSql( String.format( - "CALL sys.migrate_table('iceberg', 'default.%s','',1,'default.paimon_table','%s')", + "CALL sys.migrate_table('iceberg', 'default.%s','',1,'%s')", icebergTable, icebergOptions)) .await(); } @@ -279,7 +285,10 @@ public void testMigrateIcebergTableProcedure( Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) .containsExactlyInAnyOrderElementsOf( ImmutableList.copyOf( - tEnv.executeSql("SELECT * FROM `default`.`paimon_table`") + tEnv.executeSql( + String.format( + "SELECT * FROM `default`.`%s`", + icebergTable)) .collect())); } @@ -318,26 +327,25 @@ public void testMigrateIcebergTableAction(boolean isPartitioned) throws Exceptio Map catalogConf = new HashMap<>(); catalogConf.put("warehouse", paiTempDir.toString()); + catalogConf.put("metastore", "hive"); + catalogConf.put("uri", "thrift://localhost:" + PORT); + catalogConf.put("cache-enabled", "false"); MigrateTableAction migrateTableAction = new MigrateTableAction( - "iceberg", - "default." + icebergTable, - catalogConf, - "", - 6, - "default.paimon_table", - icebergOptions); + "iceberg", "default." 
+ icebergTable, catalogConf, "", 6, icebergOptions); migrateTableAction.run(); - tEnv.executeSql(paimonCatalogDdl(false)); + tEnv.executeSql(paimonCatalogDdl(true)); tEnv.executeSql("USE CATALOG my_paimon"); Assertions.assertThatList( Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) .containsExactlyInAnyOrderElementsOf( ImmutableList.copyOf( tEnv.executeSql( - "SELECT * FROM `my_paimon`.`default`.`paimon_table`") + String.format( + "SELECT * FROM `my_paimon`.`default`.`%s`", + icebergTable)) .collect())); } From 1e7937e30ce1dd93a658fe7463096e8849e053c8 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Fri, 10 Jan 2025 11:32:28 +0800 Subject: [PATCH 4/9] [pom][hive] remove useless dependency --- paimon-hive/paimon-hive-connector-2.3/pom.xml | 6 ------ paimon-hive/paimon-hive-connector-3.1/pom.xml | 6 ------ 2 files changed, 12 deletions(-) diff --git a/paimon-hive/paimon-hive-connector-2.3/pom.xml b/paimon-hive/paimon-hive-connector-2.3/pom.xml index 2e235c832d86..c0660fc573ee 100644 --- a/paimon-hive/paimon-hive-connector-2.3/pom.xml +++ b/paimon-hive/paimon-hive-connector-2.3/pom.xml @@ -569,12 +569,6 @@ under the License. ${iceberg.version} test - - org.apache.flink - flink-metrics-dropwizard - ${iceberg.flink.dropwizard.version} - test - diff --git a/paimon-hive/paimon-hive-connector-3.1/pom.xml b/paimon-hive/paimon-hive-connector-3.1/pom.xml index 2ae5b10e6a1c..ef700d8c0bcd 100644 --- a/paimon-hive/paimon-hive-connector-3.1/pom.xml +++ b/paimon-hive/paimon-hive-connector-3.1/pom.xml @@ -599,12 +599,6 @@ under the License. ${iceberg.version} test - - org.apache.flink - flink-metrics-dropwizard - ${iceberg.flink.dropwizard.version} - test - From 138021592edefe974d0cbd54159b00b1df2c0ef5 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Fri, 10 Jan 2025 12:00:20 +0800 Subject: [PATCH 5/9] [procedure] small fix --- .../apache/paimon/flink/action/MigrateTableAction.java | 8 ++++---- .../paimon/flink/action/MigrateTableActionFactory.java | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java index 62412d87af82..098193a22dc2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java @@ -36,13 +36,13 @@ public class MigrateTableAction extends ActionBase { public MigrateTableAction( String connector, - String hiveTableFullName, + String sourceTableFullName, Map catalogConfig, String tableProperties, Integer parallelism) { super(catalogConfig); this.connector = connector; - this.sourceTableFullName = hiveTableFullName; + this.sourceTableFullName = sourceTableFullName; this.tableProperties = tableProperties; this.parallelism = parallelism; this.icebergProperties = null; @@ -50,14 +50,14 @@ public MigrateTableAction( public MigrateTableAction( String connector, - String hiveTableFullName, + String sourceTableFullName, Map catalogConfig, String tableProperties, Integer parallelism, String icebergProperties) { super(catalogConfig); this.connector = connector; - this.sourceTableFullName = hiveTableFullName; + this.sourceTableFullName = sourceTableFullName; this.tableProperties = tableProperties; this.parallelism = parallelism; this.icebergProperties = icebergProperties; diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java index c36ff09232e7..3834145fe35f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java @@ -30,7 +30,6 @@ public class MigrateTableActionFactory implements ActionFactory { private static final String OPTIONS = "options"; private static final String PARALLELISM = "parallelism"; - private static final String TARGET_TYPE = "target_type"; private static final String ICEBERG_OPTIONS = "iceberg_options"; @Override From 6d9a905d0c823525483083523245fbdae3018be4 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 20 Jan 2025 15:10:45 +0800 Subject: [PATCH 6/9] [procedure] use MigrateIcebergTableProcedure for migration # Conflicts: # paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory # Conflicts: # paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory --- .../MigrateIcebergTableProcedure.java | 92 +++++++ .../procedure/MigrateTableProcedure.java | 15 +- .../action/MigrateIcebergTableAction.java | 61 +++++ .../MigrateIcebergTableActionFactory.java | 70 ++++++ .../flink/action/MigrateTableAction.java | 29 +-- .../action/MigrateTableActionFactory.java | 14 +- .../MigrateIcebergTableProcedure.java | 90 +++++++ .../procedure/MigrateTableProcedure.java | 11 +- .../flink/utils/TableMigrationUtils.java | 54 ++-- .../org.apache.paimon.factories.Factory | 3 +- .../MigrateIcebergTableProcedureITCase.java | 230 ++++++++++++++++++ .../MigrateTableProcedureITCase.java | 172 ------------- 12 files changed, 576 insertions(+), 265 deletions(-) create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java create mode 100644 paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java new file mode 100644 index 000000000000..0402d3e8961b --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.utils.TableMigrationUtils; +import org.apache.paimon.migrate.Migrator; +import org.apache.paimon.utils.ParameterUtils; + +import org.apache.flink.table.procedure.ProcedureContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Migrate procedure to migrate iceberg table to paimon table. */ +public class MigrateIcebergTableProcedure extends ProcedureBase { + + private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class); + + private static final String PAIMON_SUFFIX = "_paimon_"; + + @Override + public String identifier() { + return "migrate_iceberg_table"; + } + + public String[] call( + ProcedureContext procedureContext, String sourceTablePath, String icebergProperties) + throws Exception { + + return call(procedureContext, sourceTablePath, icebergProperties, ""); + } + + public String[] call( + ProcedureContext procedureContext, + String sourceTablePath, + String icebergProperties, + String properties) + throws Exception { + + return call( + procedureContext, + sourceTablePath, + icebergProperties, + properties, + Runtime.getRuntime().availableProcessors()); + } + + public String[] call( + ProcedureContext procedureContext, + String sourceTablePath, + String icebergProperties, + String properties, + Integer parallelism) + throws Exception { + String targetTablePath = sourceTablePath + PAIMON_SUFFIX; + + Identifier sourceTableId = Identifier.fromString(sourceTablePath); + Identifier targetTableId = Identifier.fromString(targetTablePath); + + Migrator migrator = + TableMigrationUtils.getIcebergImporter( + catalog, + sourceTableId.getDatabaseName(), + sourceTableId.getObjectName(), + targetTableId.getDatabaseName(), + targetTableId.getObjectName(), + parallelism, + ParameterUtils.parseCommaSeparatedKeyValues(properties), + ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + LOG.info("create migrator success."); + migrator.executeMigrate(); + + migrator.renameTable(false); + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index 92a970e5b67d..8778b9d5e187 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -67,18 +67,6 @@ public String[] call( String properties, Integer parallelism) throws Exception { - - return call(procedureContext, connector, sourceTablePath, properties, parallelism, ""); - } - - public String[] call( - ProcedureContext procedureContext, - String connector, - String sourceTablePath, - String properties, - Integer parallelism, - String icebergProperties) - throws Exception { String targetTablePath = sourceTablePath + PAIMON_SUFFIX; Identifier sourceTableId = 
Identifier.fromString(sourceTablePath); @@ -93,8 +81,7 @@ public String[] call( targetTableId.getDatabaseName(), targetTableId.getObjectName(), parallelism, - ParameterUtils.parseCommaSeparatedKeyValues(properties), - ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + ParameterUtils.parseCommaSeparatedKeyValues(properties)); LOG.info("create migrator success."); migrator.executeMigrate(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java new file mode 100644 index 000000000000..1b9fcb46a9e9 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure; + +import org.apache.flink.table.procedure.DefaultProcedureContext; + +import java.util.Map; + +/** Migrate from iceberg table to paimon table. */ +public class MigrateIcebergTableAction extends ActionBase { + + private final String sourceTableFullName; + private final String tableProperties; + private final Integer parallelism; + + private final String icebergProperties; + + public MigrateIcebergTableAction( + String sourceTableFullName, + Map catalogConfig, + String icebergProperties, + String tableProperties, + Integer parallelism) { + super(catalogConfig); + this.sourceTableFullName = sourceTableFullName; + this.tableProperties = tableProperties; + this.parallelism = parallelism; + this.icebergProperties = icebergProperties; + } + + @Override + public void run() throws Exception { + MigrateIcebergTableProcedure migrateIcebergTableProcedure = + new MigrateIcebergTableProcedure(); + migrateIcebergTableProcedure.withCatalog(catalog); + migrateIcebergTableProcedure.call( + new DefaultProcedureContext(env), + sourceTableFullName, + icebergProperties, + tableProperties, + parallelism); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java new file mode 100644 index 000000000000..c85559d66b41 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import java.util.Map; +import java.util.Optional; + +/** Action Factory for {@link MigrateIcebergTableAction}. */ +public class MigrateIcebergTableActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "migrate_iceberg_table"; + + private static final String OPTIONS = "options"; + private static final String PARALLELISM = "parallelism"; + + private static final String ICEBERG_OPTIONS = "iceberg_options"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional<Action> create(MultipleParameterToolAdapter params) { + + String sourceTable = params.get(TABLE); + Map<String, String> catalogConfig = catalogConfigMap(params); + String tableConf = params.get(OPTIONS); + Integer parallelism = + params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM)); + + String icebergOptions = params.get(ICEBERG_OPTIONS); + + MigrateIcebergTableAction migrateIcebergTableAction = + new MigrateIcebergTableAction( + sourceTable, catalogConfig, icebergOptions, tableConf, parallelism); + return Optional.of(migrateIcebergTableAction); + } + + @Override + public void printHelp() { + System.out.println( + "Action \"migrate_iceberg_table\" runs a migration job from Iceberg to Paimon."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " migrate_iceberg_table " + + "--table <database.table_name> " + + "--iceberg_options <key>=<value>[,<key>=<value>,...] " + + "[--catalog_conf <key>=<value>[,<key>=<value>,...]]"); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java index 098193a22dc2..b12cb5f862fe 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java @@ -28,39 +28,21 @@ public class MigrateTableAction extends ActionBase { private final String connector; - private final String sourceTableFullName; + private final String hiveTableFullName; private final String tableProperties; private final Integer parallelism; - private final String icebergProperties; - public MigrateTableAction( String connector, - String sourceTableFullName, + String hiveTableFullName, Map<String, String> catalogConfig, String tableProperties, Integer parallelism) { super(catalogConfig); this.connector = connector; - this.sourceTableFullName = sourceTableFullName; - this.tableProperties = tableProperties; - this.parallelism = parallelism; - this.icebergProperties = null; - } - - public MigrateTableAction( - String connector, - String sourceTableFullName, - Map<String, String> catalogConfig, - String tableProperties, - Integer parallelism, - String icebergProperties) { - super(catalogConfig); - this.connector = connector; - this.sourceTableFullName = sourceTableFullName; + this.hiveTableFullName = hiveTableFullName; this.tableProperties = tableProperties; this.parallelism = parallelism; - this.icebergProperties = icebergProperties; } @Override @@ -70,9 +52,8 @@ public void run() throws Exception { migrateTableProcedure.call( new DefaultProcedureContext(env), connector, - sourceTableFullName, + hiveTableFullName, tableProperties, - parallelism, - icebergProperties); + parallelism); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java index 3834145fe35f..b2c96795a439 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java @@ -30,8 +30,6 @@ public class MigrateTableActionFactory implements ActionFactory { private static final String OPTIONS = "options"; private static final String PARALLELISM = "parallelism"; - private static final String ICEBERG_OPTIONS = "iceberg_options"; - @Override public String identifier() { return IDENTIFIER; } @@ -43,19 +41,11 @@ public Optional<Action> create(MultipleParameterToolAdapter params) { String sourceHiveTable = params.get(TABLE); Map<String, String> catalogConfig = catalogConfigMap(params); String tableConf = params.get(OPTIONS); - Integer parallelism = - params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM)); - - String icebergOptions = params.get(ICEBERG_OPTIONS); + Integer parallelism = Integer.parseInt(params.get(PARALLELISM)); MigrateTableAction migrateTableAction = new MigrateTableAction( - connector, - sourceHiveTable, - catalogConfig, - tableConf, - parallelism, - icebergOptions); + connector, sourceHiveTable, catalogConfig, tableConf, parallelism); return Optional.of(migrateTableAction); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java new file mode 100644 index 000000000000..f43d29ed4f17 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.utils.TableMigrationUtils; +import org.apache.paimon.migrate.Migrator; +import org.apache.paimon.utils.ParameterUtils; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Procedure to migrate an Iceberg table to a Paimon table. */ +public class MigrateIcebergTableProcedure extends ProcedureBase { + private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class); + + private static final String PAIMON_SUFFIX = "_paimon_"; + + @Override + public String identifier() { + return "migrate_iceberg_table"; + } + + @ProcedureHint( + argument = { + @ArgumentHint(name = "source_table", type = @DataTypeHint("STRING")), + @ArgumentHint( + name = "iceberg_options", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true), + @ArgumentHint( + name = "parallelism", + type = @DataTypeHint("Integer"), + isOptional = true) + }) + public String[] call( + ProcedureContext procedureContext, + String sourceTablePath, + String icebergProperties, + String properties, + Integer parallelism) + throws Exception { + properties = notnull(properties); + icebergProperties = notnull(icebergProperties); + + String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; + + Identifier sourceTableId = Identifier.fromString(sourceTablePath); + Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); + + Integer p = parallelism == null ? 
Runtime.getRuntime().availableProcessors() : parallelism; + + Migrator migrator = + TableMigrationUtils.getIcebergImporter( + catalog, + sourceTableId.getDatabaseName(), + sourceTableId.getObjectName(), + targetTableId.getDatabaseName(), + targetTableId.getObjectName(), + p, + ParameterUtils.parseCommaSeparatedKeyValues(properties), + ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + LOG.info("Iceberg migrator created successfully."); + migrator.executeMigrate(); + + migrator.renameTable(false); + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index 46f553e0cd26..32a2a16dc51d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -50,10 +50,6 @@ public String identifier() { @ArgumentHint( name = "parallelism", type = @DataTypeHint("Integer"), - isOptional = true), - @ArgumentHint( - name = "iceberg_options", - type = @DataTypeHint("STRING"), isOptional = true) }) public String[] call( @@ -61,11 +57,9 @@ public String[] call( String connector, String sourceTablePath, String properties, - Integer parallelism, - String icebergProperties) + Integer parallelism) throws Exception { properties = notnull(properties); - icebergProperties = notnull(icebergProperties); String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; @@ -83,8 +77,7 @@ public String[] call( targetTableId.getDatabaseName(), targetTableId.getObjectName(), p, - ParameterUtils.parseCommaSeparatedKeyValues(properties), - ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + ParameterUtils.parseCommaSeparatedKeyValues(properties)); LOG.info("create migrator success."); migrator.executeMigrate(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java index 68257417d585..4e7268c6f14e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java @@ -26,7 +26,6 @@ import org.apache.paimon.migrate.Migrator; import org.apache.paimon.options.Options; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -42,28 +41,6 @@ public static Migrator getImporter( String targetTableName, Integer parallelism, Map<String, String> options) { - return getImporter( - connector, - catalog, - sourceDatabase, - sourceTableName, - targetDatabase, - targetTableName, - parallelism, - options, - Collections.emptyMap()); - } - - public static Migrator getImporter( - String connector, - Catalog catalog, - String sourceDatabase, - String sourceTableName, - String targetDatabase, - String targetTableName, - Integer parallelism, - Map<String, String> options, - Map<String, String> icebergOptions) { switch (connector) { case "hive": if (catalog instanceof CachingCatalog) { @@ -80,21 +57,32 @@ public static Migrator getImporter( targetTableName, parallelism, options); - case "iceberg": - Options icebergConf = new Options(icebergOptions); - return new IcebergMigrator( - catalog, - targetDatabase, - targetTableName, - sourceDatabase, - sourceTableName, 
- icebergConf, - parallelism); default: throw new UnsupportedOperationException("Don't support connector " + connector); } } + public static Migrator getIcebergImporter( + Catalog catalog, + String sourceDatabase, + String sourceTableName, + String targetDatabase, + String targetTableName, + Integer parallelism, + Map<String, String> options, + Map<String, String> icebergOptions) { + + Options icebergConf = new Options(icebergOptions); + return new IcebergMigrator( + catalog, + targetDatabase, + targetTableName, + sourceDatabase, + sourceTableName, + icebergConf, + parallelism); + } + public static List<Migrator> getImporters( String connector, Catalog catalog, diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 6f6becf85fc7..efaa25627d69 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -85,4 +85,5 @@ org.apache.paimon.flink.procedure.CloneProcedure org.apache.paimon.flink.procedure.CompactManifestProcedure org.apache.paimon.flink.procedure.RefreshObjectTableProcedure org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure -org.apache.paimon.flink.procedure.ClearConsumersProcedure \ No newline at end of file +org.apache.paimon.flink.procedure.ClearConsumersProcedure +org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java new file mode 100644 index 000000000000..b34da39d45c8 --- /dev/null +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.hive.procedure; + +import org.apache.paimon.flink.action.ActionITCaseBase; +import org.apache.paimon.flink.action.MigrateIcebergTableAction; +import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure; +import org.apache.paimon.hive.TestHiveMetastore; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.types.Row; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Stream; + +/** Tests for {@link MigrateIcebergTableProcedure}. */ +public class MigrateIcebergTableProcedureITCase extends ActionITCaseBase { + + private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore(); + + private static final int PORT = 9084; + + @TempDir java.nio.file.Path iceTempDir; + @TempDir java.nio.file.Path paiTempDir; + + @BeforeEach + public void beforeEach() { + TEST_HIVE_METASTORE.start(PORT); + } + + @AfterEach + public void afterEach() throws Exception { + TEST_HIVE_METASTORE.stop(); + } + + private static Stream<Arguments> testIcebergArguments() { + // isPartitioned, icebergIsHive, paimonIsHive, isNamedArgument + return Stream.of( + Arguments.of(true, false, false, true), + Arguments.of(false, false, false, true), + Arguments.of(true, true, false, true), + Arguments.of(false, true, false, true), + Arguments.of(true, true, false, false), + Arguments.of(false, true, false, false), + Arguments.of(true, false, true, true), + Arguments.of(true, true, true, true)); + } + + @ParameterizedTest + @MethodSource("testIcebergArguments") + public void testMigrateIcebergTableProcedure( + boolean isPartitioned, + boolean icebergIsHive, + boolean paimonIsHive, + boolean isNamedArgument) + throws Exception { + TableEnvironment tEnv = + TableEnvironmentImpl.create( + EnvironmentSettings.newInstance().inBatchMode().build()); + + // create iceberg catalog, database, and table, then insert some data into the iceberg table + tEnv.executeSql(icebergCatalogDdl(icebergIsHive)); + + String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); + tEnv.executeSql("USE CATALOG my_iceberg"); + if (isPartitioned) { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", + icebergTable)); + } else { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", + icebergTable)); + } + tEnv.executeSql( + String.format( + "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", + icebergTable)) + .await(); + + tEnv.executeSql(paimonCatalogDdl(paimonIsHive)); + tEnv.executeSql("USE CATALOG my_paimon"); + + String icebergOptions = + icebergIsHive + ? "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + + PORT + : "metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=" + iceTempDir; + if (isNamedArgument) { + tEnv.executeSql( + String.format( + "CALL sys.migrate_iceberg_table(source_table => 'default.%s', " + + "iceberg_options => '%s')", + icebergTable, icebergOptions)) + .await(); + } else { + tEnv.executeSql( + String.format( + "CALL sys.migrate_iceberg_table('default.%s','%s')", + icebergTable, icebergOptions)) + .await(); + } + + Assertions.assertThatList( + Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) + .containsExactlyInAnyOrderElementsOf( + ImmutableList.copyOf( + tEnv.executeSql( + String.format( + "SELECT * FROM `default`.`%s`", + icebergTable)) + .collect())); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testMigrateIcebergTableAction(boolean isPartitioned) throws Exception { + TableEnvironment tEnv = + TableEnvironmentImpl.create( + EnvironmentSettings.newInstance().inBatchMode().build()); + + // create iceberg catalog, database, and table, then insert some data into the iceberg table + tEnv.executeSql(icebergCatalogDdl(true)); + + String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); + tEnv.executeSql("USE CATALOG my_iceberg"); + if (isPartitioned) { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", + icebergTable)); + } else { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", + icebergTable)); + } + tEnv.executeSql( + String.format( + "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", + icebergTable)) + .await(); + + String icebergOptions = + "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + + PORT; + + Map<String, String> catalogConf = new HashMap<>(); + catalogConf.put("warehouse", paiTempDir.toString()); + catalogConf.put("metastore", "hive"); + catalogConf.put("uri", "thrift://localhost:" + PORT); + catalogConf.put("cache-enabled", "false"); + + MigrateIcebergTableAction migrateIcebergTableAction = + new MigrateIcebergTableAction( + "default." + icebergTable, catalogConf, icebergOptions, "", 6); + migrateIcebergTableAction.run(); + + tEnv.executeSql(paimonCatalogDdl(true)); + tEnv.executeSql("USE CATALOG my_paimon"); + Assertions.assertThatList( + Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) + .containsExactlyInAnyOrderElementsOf( + ImmutableList.copyOf( + tEnv.executeSql( + String.format( + "SELECT * FROM `my_paimon`.`default`.`%s`", + icebergTable)) + .collect())); + } + + private String icebergCatalogDdl(boolean isHive) { + return isHive + ? String.format( + "CREATE CATALOG my_iceberg WITH " + + "( 'type' = 'iceberg', 'catalog-type' = 'hive', 'uri' = 'thrift://localhost:%s', " + + "'warehouse' = '%s', 'cache-enabled' = 'false')", + PORT, iceTempDir) + : String.format( + "CREATE CATALOG my_iceberg WITH " + + "( 'type' = 'iceberg', 'catalog-type' = 'hadoop'," + + "'warehouse' = '%s', 'cache-enabled' = 'false' )", + iceTempDir); + } + + private String paimonCatalogDdl(boolean isHive) { + return isHive + ? 
String.format( + "CREATE CATALOG my_paimon WITH " + + "( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:%s', " + + "'warehouse' = '%s', 'cache-enabled' = 'false' )", + PORT, iceTempDir) + : String.format( + "CREATE CATALOG my_paimon WITH ('type' = 'paimon', 'warehouse' = '%s')", + paiTempDir); + } +} diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java index 581a8169e8f7..8d6ded69dc99 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java @@ -25,27 +25,22 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.types.Row; import org.apache.hadoop.hive.conf.HiveConf; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.UUID; import java.util.stream.Stream; /** Tests for {@link MigrateFileProcedure}. 
*/ @@ -55,9 +50,6 @@ public class MigrateTableProcedureITCase extends ActionITCaseBase { private static final int PORT = 9084; - @TempDir java.nio.file.Path iceTempDir; - @TempDir java.nio.file.Path paiTempDir; - @BeforeEach public void beforeEach() { TEST_HIVE_METASTORE.start(PORT); @@ -78,19 +70,6 @@ private static Stream testArguments() { Arguments.of("parquet", false)); } - private static Stream testIcebergArguments() { - // isPartitioned, icebergIsHive, paimonIsHive, isNamedArgument - return Stream.of( - Arguments.of(true, false, false, true), - Arguments.of(false, false, false, true), - Arguments.of(true, true, false, true), - Arguments.of(false, true, false, true), - Arguments.of(true, true, false, false), - Arguments.of(false, true, false, false), - Arguments.of(true, false, true, true), - Arguments.of(true, true, true, true)); - } - @ParameterizedTest @MethodSource("testArguments") public void testMigrateProcedure(String format, boolean isNamedArgument) throws Exception { @@ -224,157 +203,6 @@ public void testMigrateAction(String format) throws Exception { Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2); } - @ParameterizedTest - @MethodSource("testIcebergArguments") - public void testMigrateIcebergTableProcedure( - boolean isPartitioned, - boolean icebergIsHive, - boolean paimonIsHive, - boolean isNamedArgument) - throws Exception { - TableEnvironment tEnv = - TableEnvironmentImpl.create( - EnvironmentSettings.newInstance().inBatchMode().build()); - - // create iceberg catalog, database, table, and insert some data to iceberg table - tEnv.executeSql(icebergCatalogDdl(icebergIsHive)); - - String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); - tEnv.executeSql("USE CATALOG my_iceberg"); - if (isPartitioned) { - tEnv.executeSql( - String.format( - "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", - icebergTable)); - } else { - tEnv.executeSql( - String.format( - "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", - icebergTable)); - } - tEnv.executeSql( - String.format( - "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", - icebergTable)) - .await(); - - tEnv.executeSql(paimonCatalogDdl(paimonIsHive)); - tEnv.executeSql("USE CATALOG my_paimon"); - - String icebergOptions = - icebergIsHive - ? 
"metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" - + PORT - : "metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=" + iceTempDir; - if (isNamedArgument) { - tEnv.executeSql( - String.format( - "CALL sys.migrate_table(connector => 'iceberg', source_table => 'default.%s', " - + "iceberg_options => '%s')", - icebergTable, icebergOptions)) - .await(); - } else { - tEnv.executeSql( - String.format( - "CALL sys.migrate_table('iceberg', 'default.%s','',1,'%s')", - icebergTable, icebergOptions)) - .await(); - } - - Assertions.assertThatList( - Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) - .containsExactlyInAnyOrderElementsOf( - ImmutableList.copyOf( - tEnv.executeSql( - String.format( - "SELECT * FROM `default`.`%s`", - icebergTable)) - .collect())); - } - - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testMigrateIcebergTableAction(boolean isPartitioned) throws Exception { - TableEnvironment tEnv = - TableEnvironmentImpl.create( - EnvironmentSettings.newInstance().inBatchMode().build()); - - // create iceberg catalog, database, table, and insert some data to iceberg table - tEnv.executeSql(icebergCatalogDdl(true)); - - String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); - tEnv.executeSql("USE CATALOG my_iceberg"); - if (isPartitioned) { - tEnv.executeSql( - String.format( - "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", - icebergTable)); - } else { - tEnv.executeSql( - String.format( - "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", - icebergTable)); - } - tEnv.executeSql( - String.format( - "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", - icebergTable)) - .await(); - - String icebergOptions = - "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" - + PORT; - - Map catalogConf = new HashMap<>(); - catalogConf.put("warehouse", paiTempDir.toString()); - catalogConf.put("metastore", "hive"); - catalogConf.put("uri", "thrift://localhost:" + PORT); - catalogConf.put("cache-enabled", "false"); - - MigrateTableAction migrateTableAction = - new MigrateTableAction( - "iceberg", "default." + icebergTable, catalogConf, "", 6, icebergOptions); - migrateTableAction.run(); - - tEnv.executeSql(paimonCatalogDdl(true)); - tEnv.executeSql("USE CATALOG my_paimon"); - Assertions.assertThatList( - Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) - .containsExactlyInAnyOrderElementsOf( - ImmutableList.copyOf( - tEnv.executeSql( - String.format( - "SELECT * FROM `my_paimon`.`default`.`%s`", - icebergTable)) - .collect())); - } - - private String icebergCatalogDdl(boolean isHive) { - return isHive - ? String.format( - "CREATE CATALOG my_iceberg WITH " - + "( 'type' = 'iceberg', 'catalog-type' = 'hive', 'uri' = 'thrift://localhost:%s', " - + "'warehouse' = '%s', 'cache-enabled' = 'false')", - PORT, iceTempDir) - : String.format( - "CREATE CATALOG my_iceberg WITH " - + "( 'type' = 'iceberg', 'catalog-type' = 'hadoop'," - + "'warehouse' = '%s', 'cache-enabled' = 'false' )", - iceTempDir); - } - - private String paimonCatalogDdl(boolean isHive) { - return isHive - ? 
String.format( - "CREATE CATALOG my_paimon WITH " - + "( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:%s', " - + "'warehouse' = '%s', 'cache-enabled' = 'false' )", - PORT, iceTempDir) - : String.format( - "CREATE CATALOG my_paimon WITH ('type' = 'paimon', 'warehouse' = '%s')", - paiTempDir); - } - protected static String data(int i) { Random random = new Random(); StringBuilder stringBuilder = new StringBuilder(); From 2727667e83cb6f64af20431f661b76eea724c662 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 20 Jan 2025 16:17:58 +0800 Subject: [PATCH 7/9] [procedure][fix] change the port num for TestHiveMetastore in MigrateIcebergTableProcedureITCase --- .../hive/procedure/MigrateIcebergTableProcedureITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java index b34da39d45c8..d100131e0935 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java @@ -49,7 +49,7 @@ public class MigrateIcebergTableProcedureITCase extends ActionITCaseBase { private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore(); - private static final int PORT = 9084; + private static final int PORT = 9087; @TempDir java.nio.file.Path iceTempDir; @TempDir java.nio.file.Path paiTempDir; From 68fee7fbd543b704648442dc4fa54794439edc16 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 10 Feb 2025 17:23:11 +0800 Subject: [PATCH 8/9] [procedure][test] make testMigrateIcebergTableProcedure a random test --- .../MigrateIcebergTableProcedureITCase.java | 47 +++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java index d100131e0935..1875b08eba22 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java @@ -32,21 +32,25 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.stream.Stream; +import java.util.concurrent.ThreadLocalRandom; /** Tests for {@link MigrateIcebergTableProcedure}. 
*/ public class MigrateIcebergTableProcedureITCase extends ActionITCaseBase { + private static final Logger LOG = + LoggerFactory.getLogger(MigrateIcebergTableProcedureITCase.class); + private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore(); private static final int PORT = 9087; @@ -64,31 +68,26 @@ public void afterEach() throws Exception { TEST_HIVE_METASTORE.stop(); } - private static Stream testIcebergArguments() { - // isPartitioned, icebergIsHive, paimonIsHive, isNamedArgument - return Stream.of( - Arguments.of(true, false, false, true), - Arguments.of(false, false, false, true), - Arguments.of(true, true, false, true), - Arguments.of(false, true, false, true), - Arguments.of(true, true, false, false), - Arguments.of(false, true, false, false), - Arguments.of(true, false, true, true), - Arguments.of(true, true, true, true)); - } - - @ParameterizedTest - @MethodSource("testIcebergArguments") - public void testMigrateIcebergTableProcedure( - boolean isPartitioned, - boolean icebergIsHive, - boolean paimonIsHive, - boolean isNamedArgument) - throws Exception { + @Test + public void testMigrateIcebergTableProcedure() throws Exception { TableEnvironment tEnv = TableEnvironmentImpl.create( EnvironmentSettings.newInstance().inBatchMode().build()); + ThreadLocalRandom random = ThreadLocalRandom.current(); + boolean isPartitioned = random.nextBoolean(); + boolean icebergIsHive = random.nextBoolean(); + boolean paimonIsHive = random.nextBoolean(); + boolean isNamedArgument = random.nextBoolean(); + + // Logging the random arguments for debugging + LOG.info( + "isPartitioned:{}, icebergIsHive:{}, paimonIsHive:{}, isNamedArgument:{}", + isPartitioned, + icebergIsHive, + paimonIsHive, + isNamedArgument); + // create iceberg catalog, database, table, and insert some data to iceberg table tEnv.executeSql(icebergCatalogDdl(icebergIsHive)); From 6b84657352d220cd9669c60caea5191d0c777871 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Tue, 11 Feb 2025 14:07:29 +0800 Subject: [PATCH 9/9] [core][hive] remove FileIO argument in constructor --- .../iceberg/migrate/IcebergMigrateHiveMetadata.java | 13 ++++++++----- .../migrate/IcebergMigrateHiveMetadataFactory.java | 6 ++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java index 95ae3f8c1be4..3c0d7da024a0 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java @@ -18,6 +18,7 @@ package org.apache.paimon.iceberg.migrate; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.client.ClientPool; import org.apache.paimon.fs.FileIO; @@ -45,7 +46,8 @@ public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata { public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg"; private static final String ICEBERG_METADATA_LOCATION = "metadata_location"; - private final FileIO fileIO; + private FileIO fileIO; + private final Options icebergOptions; private final Identifier icebergIdentifier; private final ClientPool clients; @@ -54,10 +56,10 @@ public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata { private IcebergMetadata 
icebergMetadata; - public IcebergMigrateHiveMetadata( - Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) { - this.fileIO = fileIO; + public IcebergMigrateHiveMetadata(Identifier icebergIdentifier, Options icebergOptions) { + this.icebergIdentifier = icebergIdentifier; + this.icebergOptions = icebergOptions; String uri = icebergOptions.get(IcebergOptions.URI); String hiveConfDir = icebergOptions.get(IcebergOptions.HIVE_CONF_DIR); @@ -104,7 +106,6 @@ public IcebergMetadata icebergMetadata() { client.getTable( icebergIdentifier.getDatabaseName(), icebergIdentifier.getTableName())); - // TODO:Is this check necessary? // check whether it is an iceberg table String tableType = icebergHiveTable.getParameters().get(TABLE_TYPE_PROP); Preconditions.checkArgument( @@ -116,6 +117,8 @@ public IcebergMetadata icebergMetadata() { metadataLocation = icebergHiveTable.getParameters().get(ICEBERG_METADATA_LOCATION); LOG.info("iceberg latest metadata location: {}", metadataLocation); + fileIO = FileIO.get(new Path(metadataLocation), CatalogContext.create(icebergOptions)); + icebergMetadata = IcebergMetadata.fromPath(fileIO, new Path(metadataLocation)); return icebergMetadata; } catch (Exception e) { diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java index 184eb3134058..0a539cdec2d2 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java @@ -19,7 +19,6 @@ package org.apache.paimon.iceberg.migrate; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.fs.FileIO; import org.apache.paimon.iceberg.IcebergOptions; import org.apache.paimon.options.Options; @@ -31,8 +30,7 @@ public String identifier() { } @Override - public IcebergMigrateHiveMetadata create( - Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) { - return new IcebergMigrateHiveMetadata(icebergIdentifier, fileIO, icebergOptions); + public IcebergMigrateHiveMetadata create(Identifier icebergIdentifier, Options icebergOptions) { + return new IcebergMigrateHiveMetadata(icebergIdentifier, icebergOptions); } }
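
Reviewer note: the snippet below is a minimal usage sketch of the MigrateIcebergTableAction introduced in this series, mirroring testMigrateIcebergTableAction above. The warehouse path, metastore URI, and table name are hypothetical placeholders, not values taken from this patch.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.paimon.flink.action.MigrateIcebergTableAction;

    public class MigrateIcebergExample {
        public static void main(String[] args) throws Exception {
            // Paimon catalog configuration (hypothetical values).
            Map<String, String> catalogConf = new HashMap<>();
            catalogConf.put("warehouse", "/tmp/paimon");
            catalogConf.put("metastore", "hive");
            catalogConf.put("uri", "thrift://localhost:9083");
            catalogConf.put("cache-enabled", "false");

            // Where the iceberg metadata lives; keys as used in the IT cases above.
            String icebergOptions =
                    "metadata.iceberg.storage=hive-catalog,"
                            + "metadata.iceberg.uri=thrift://localhost:9083";

            // Constructor arguments per this patch: source table, catalog config,
            // iceberg options, paimon table options, parallelism.
            MigrateIcebergTableAction action =
                    new MigrateIcebergTableAction(
                            "default.ice_orders", catalogConf, icebergOptions, "", 4);
            action.run();
        }
    }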
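A second sketch shows the two-step flow that MigrateIcebergTableProcedure delegates to (TableMigrationUtils.getIcebergImporter, then executeMigrate and renameTable), using the signatures added in this series. Database and table names are again placeholders, and the caller is assumed to already hold a Paimon Catalog.

    import java.util.Collections;
    import java.util.Map;

    import org.apache.paimon.catalog.Catalog;
    import org.apache.paimon.flink.utils.TableMigrationUtils;
    import org.apache.paimon.migrate.Migrator;

    public class IcebergMigrateFlow {
        public static void migrate(Catalog catalog, Map<String, String> icebergOptions)
                throws Exception {
            // Build the migrator; the "_paimon_" suffix mirrors the temporary
            // target table created by MigrateIcebergTableProcedure.
            Migrator migrator =
                    TableMigrationUtils.getIcebergImporter(
                            catalog,
                            "default", // source (iceberg) database
                            "ice_orders", // source (iceberg) table
                            "default", // target (paimon) database
                            "ice_orders_paimon_", // temporary paimon table
                            Runtime.getRuntime().availableProcessors(),
                            Collections.emptyMap(), // paimon table options
                            icebergOptions);
            migrator.executeMigrate(); // commit iceberg files into the paimon table
            migrator.renameTable(false); // swap the temporary table to the source name
        }
    }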