From 2f9f2a0d366e120b5e8f8900a6a73ca01bf4b159 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Thu, 21 Nov 2024 15:11:50 +0800 Subject: [PATCH 01/13] [iceberg] Introduce feature and IT cases to migrate table from iceberg to paimon # Conflicts: # paimon-core/src/main/java/org/apache/paimon/iceberg/metadata/IcebergDataField.java --- .../procedure/MigrateTableProcedure.java | 59 ++++--- .../flink/action/MigrateTableAction.java | 26 ++- .../action/MigrateTableActionFactory.java | 17 +- .../procedure/MigrateTableProcedure.java | 29 +++- .../flink/utils/TableMigrationUtils.java | 35 ++++ .../migrate/IcebergMigrateHiveMetadata.java | 156 +++++++++++++++++ .../IcebergMigrateHiveMetadataFactory.java | 38 ++++ .../org.apache.paimon.factories.Factory | 1 + paimon-hive/paimon-hive-connector-2.3/pom.xml | 6 + paimon-hive/paimon-hive-connector-3.1/pom.xml | 6 + .../paimon-hive-connector-common/pom.xml | 19 ++ .../MigrateTableProcedureITCase.java | 164 ++++++++++++++++++ paimon-hive/pom.xml | 1 + 13 files changed, 524 insertions(+), 33 deletions(-) create mode 100644 paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java create mode 100644 paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index 196528d31c78..8f2a5401fc8c 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.utils.TableMigrationUtils; +import org.apache.paimon.migrate.Migrator; import org.apache.paimon.utils.ParameterUtils; import org.apache.flink.table.procedure.ProcedureContext; @@ -50,25 +51,13 @@ public String[] call( String sourceTablePath, String properties) throws Exception { - String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; - - Identifier sourceTableId = Identifier.fromString(sourceTablePath); - Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); - - TableMigrationUtils.getImporter( - connector, - catalog, - sourceTableId.getDatabaseName(), - sourceTableId.getObjectName(), - targetTableId.getDatabaseName(), - targetTableId.getObjectName(), - Runtime.getRuntime().availableProcessors(), - ParameterUtils.parseCommaSeparatedKeyValues(properties)) - .executeMigrate(); - LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId); - catalog.renameTable(targetTableId, sourceTableId, false); - return new String[] {"Success"}; + return call( + procedureContext, + connector, + sourceTablePath, + properties, + Runtime.getRuntime().availableProcessors()); } public String[] call( @@ -80,10 +69,31 @@ public String[] call( throws Exception { String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; + return call( + procedureContext, + connector, + sourceTablePath, + properties, + parallelism, + targetPaimonTablePath, + ""); + } + + public String[] call( + ProcedureContext procedureContext, + String connector, + String sourceTablePath, + String properties, + Integer parallelism, + String targetTablePath, + String icebergProperties) + throws Exception { + Identifier sourceTableId = 
Identifier.fromString(sourceTablePath); - Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); + Identifier targetTableId = Identifier.fromString(targetTablePath); - TableMigrationUtils.getImporter( + Migrator migrator = + TableMigrationUtils.getImporter( connector, catalog, sourceTableId.getDatabaseName(), @@ -91,11 +101,12 @@ public String[] call( targetTableId.getDatabaseName(), targetTableId.getObjectName(), parallelism, - ParameterUtils.parseCommaSeparatedKeyValues(properties)) - .executeMigrate(); + ParameterUtils.parseCommaSeparatedKeyValues(properties), + ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + LOG.info("create migrator success."); + migrator.executeMigrate(); - LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId); - catalog.renameTable(targetTableId, sourceTableId, false); + migrator.renameTable(false); return new String[] {"Success"}; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java index b12cb5f862fe..1fe9441812d8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java @@ -32,6 +32,9 @@ public class MigrateTableAction extends ActionBase { private final String tableProperties; private final Integer parallelism; + private final String paimonTableFullName; + private final String icebergProperties; + public MigrateTableAction( String connector, String hiveTableFullName, @@ -43,6 +46,25 @@ public MigrateTableAction( this.hiveTableFullName = hiveTableFullName; this.tableProperties = tableProperties; this.parallelism = parallelism; + this.paimonTableFullName = null; + this.icebergProperties = null; + } + + public MigrateTableAction( + String connector, + String hiveTableFullName, + Map catalogConfig, + String tableProperties, + Integer parallelism, + String paimonTableFullName, + String icebergProperties) { + super(catalogConfig); + this.connector = connector; + this.hiveTableFullName = hiveTableFullName; + this.tableProperties = tableProperties; + this.parallelism = parallelism; + this.paimonTableFullName = paimonTableFullName; + this.icebergProperties = icebergProperties; } @Override @@ -54,6 +76,8 @@ public void run() throws Exception { connector, hiveTableFullName, tableProperties, - parallelism); + parallelism, + paimonTableFullName, + icebergProperties); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java index b2c96795a439..3af99d14c4b0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java @@ -30,6 +30,9 @@ public class MigrateTableActionFactory implements ActionFactory { private static final String OPTIONS = "options"; private static final String PARALLELISM = "parallelism"; + private static final String TARGET_TYPE = "target_type"; + private static final String ICEBERG_OPTIONS = "iceberg_options"; + @Override public String identifier() { return IDENTIFIER; @@ -41,11 +44,21 @@ public Optional 
create(MultipleParameterToolAdapter params) { String sourceHiveTable = params.get(TABLE); Map catalogConfig = catalogConfigMap(params); String tableConf = params.get(OPTIONS); - Integer parallelism = Integer.parseInt(params.get(PARALLELISM)); + Integer parallelism = + params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM)); + + String targetPaimonTable = params.get(TARGET_TYPE); + String icebergOptions = params.get(ICEBERG_OPTIONS); MigrateTableAction migrateTableAction = new MigrateTableAction( - connector, sourceHiveTable, catalogConfig, tableConf, parallelism); + connector, + sourceHiveTable, + catalogConfig, + tableConf, + parallelism, + targetPaimonTable, + icebergOptions); return Optional.of(migrateTableAction); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index fff05a1a8555..bba99d844ae8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -20,6 +20,7 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.utils.TableMigrationUtils; +import org.apache.paimon.migrate.Migrator; import org.apache.paimon.utils.ParameterUtils; import org.apache.flink.table.annotation.ArgumentHint; @@ -49,6 +50,14 @@ public String identifier() { @ArgumentHint( name = "parallelism", type = @DataTypeHint("Integer"), + isOptional = true), + @ArgumentHint( + name = "target_table", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint( + name = "iceberg_options", + type = @DataTypeHint("STRING"), isOptional = true) }) public String[] call( @@ -56,18 +65,25 @@ public String[] call( String connector, String sourceTablePath, String properties, - Integer parallelism) + Integer parallelism, + String targetTablePath, + String icebergProperties) throws Exception { properties = notnull(properties); + icebergProperties = notnull(icebergProperties); String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; + if (targetTablePath != null) { + targetPaimonTablePath = targetTablePath; + } Identifier sourceTableId = Identifier.fromString(sourceTablePath); Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); Integer p = parallelism == null ? 
Runtime.getRuntime().availableProcessors() : parallelism; - TableMigrationUtils.getImporter( + Migrator migrator = + TableMigrationUtils.getImporter( connector, catalog, sourceTableId.getDatabaseName(), @@ -75,11 +91,12 @@ public String[] call( targetTableId.getDatabaseName(), targetTableId.getObjectName(), p, - ParameterUtils.parseCommaSeparatedKeyValues(properties)) - .executeMigrate(); + ParameterUtils.parseCommaSeparatedKeyValues(properties), + ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + LOG.info("create migrator success."); + migrator.executeMigrate(); - LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId); - catalog.renameTable(targetTableId, sourceTableId, false); + migrator.renameTable(false); return new String[] {"Success"}; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java index b59c3592a97d..68257417d585 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java @@ -22,8 +22,11 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.hive.HiveCatalog; import org.apache.paimon.hive.migrate.HiveMigrator; +import org.apache.paimon.iceberg.migrate.IcebergMigrator; import org.apache.paimon.migrate.Migrator; +import org.apache.paimon.options.Options; +import java.util.Collections; import java.util.List; import java.util.Map; @@ -39,6 +42,28 @@ public static Migrator getImporter( String targetTableName, Integer parallelism, Map options) { + return getImporter( + connector, + catalog, + sourceDatabase, + sourceTableName, + targetDatabase, + targetTableName, + parallelism, + options, + Collections.emptyMap()); + } + + public static Migrator getImporter( + String connector, + Catalog catalog, + String sourceDatabase, + String sourceTableName, + String targetDatabase, + String targetTableName, + Integer parallelism, + Map options, + Map icebergOptions) { switch (connector) { case "hive": if (catalog instanceof CachingCatalog) { @@ -55,6 +80,16 @@ public static Migrator getImporter( targetTableName, parallelism, options); + case "iceberg": + Options icebergConf = new Options(icebergOptions); + return new IcebergMigrator( + catalog, + targetDatabase, + targetTableName, + sourceDatabase, + sourceTableName, + icebergConf, + parallelism); default: throw new UnsupportedOperationException("Don't support connector " + connector); } diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java new file mode 100644 index 000000000000..10669f09a0f5 --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.iceberg.migrate; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.client.ClientPool; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.fs.Path; +import org.apache.paimon.hive.HiveCatalog; +import org.apache.paimon.hive.pool.CachedClientPool; +import org.apache.paimon.iceberg.IcebergOptions; +import org.apache.paimon.iceberg.metadata.IcebergMetadata; +import org.apache.paimon.options.Options; +import org.apache.paimon.utils.Preconditions; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.thrift.TException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Get iceberg table latest snapshot metadata in hive. */ +public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata { + private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrateHiveMetadata.class); + + public static final String TABLE_TYPE_PROP = "table_type"; + public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg"; + private static final String ICEBERG_METADATA_LOCATION = "metadata_location"; + + private final FileIO fileIO; + private final Identifier icebergIdentifier; + + private final ClientPool<IMetaStoreClient, TException> clients; + + private String metadataLocation = null; + + public IcebergMigrateHiveMetadata( + Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) { + this.fileIO = fileIO; + this.icebergIdentifier = icebergIdentifier; + + String uri = icebergOptions.get(IcebergOptions.URI); + String hiveConfDir = icebergOptions.get(IcebergOptions.HIVE_CONF_DIR); + String hadoopConfDir = icebergOptions.get(IcebergOptions.HADOOP_CONF_DIR); + Configuration hadoopConf = new Configuration(); + hadoopConf.setClassLoader(IcebergMigrateHiveMetadata.class.getClassLoader()); + HiveConf hiveConf = HiveCatalog.createHiveConf(hiveConfDir, hadoopConfDir, hadoopConf); + + icebergOptions.toMap().forEach(hiveConf::set); + if (uri != null) { + hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, uri); + } + + if (hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname) == null) { + LOG.error( + "Can't find hive metastore uri to connect: " + + "either set {} in iceberg options or set hive.metastore.uris " + + "in hive-site.xml or hadoop configurations. " + + "Will use empty metastore uris, which means we may use an embedded metastore. " + "Please make sure hive metastore uri for iceberg table is correctly set as expected.", + IcebergOptions.URI.key()); + } + + this.clients = + new CachedClientPool( + hiveConf, + icebergOptions, + icebergOptions.getString(IcebergOptions.HIVE_CLIENT_CLASS)); + } + + @Override + public IcebergMetadata icebergMetadata() { + try { + boolean isExist = tableExists(icebergIdentifier); + if (!isExist) { + throw new RuntimeException( + String.format( + "iceberg table %s does not exist in hive metastore", + icebergIdentifier)); + } + Table icebergHiveTable = + clients.run( + client -> + client.getTable( + icebergIdentifier.getDatabaseName(), + icebergIdentifier.getTableName())); + // TODO: Is this check necessary? + // check whether it is an iceberg table + String tableType = icebergHiveTable.getParameters().get(TABLE_TYPE_PROP); + Preconditions.checkArgument( + tableType != null && tableType.equalsIgnoreCase(ICEBERG_TABLE_TYPE_VALUE), + "not an iceberg table: %s (table-type=%s)", + icebergIdentifier.toString(), + tableType); + + metadataLocation = icebergHiveTable.getParameters().get(ICEBERG_METADATA_LOCATION); + LOG.info("iceberg latest metadata location: {}", metadataLocation); + + return IcebergMetadata.fromPath(fileIO, new Path(metadataLocation)); + } catch (Exception e) { + throw new RuntimeException( + String.format("Failed to read Iceberg metadata from path %s", metadataLocation), + e); + } + } + + @Override + public String icebergLatestMetadataLocation() { + return metadataLocation; + } + + @Override + public void deleteOriginTable() { + LOG.info("Iceberg table in hive to be deleted: {}", icebergIdentifier.toString()); + try { + clients.run( + client -> { + client.dropTable( + icebergIdentifier.getDatabaseName(), + icebergIdentifier.getTableName(), + true, + true); + return null; + }); + } catch (Exception e) { + LOG.warn( + "exception occurred when deleting origin table, exception message:{}", + e.getMessage()); + } + } + + private boolean tableExists(Identifier identifier) throws Exception { + return clients.run( + client -> + client.tableExists( + identifier.getDatabaseName(), identifier.getTableName())); + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java new file mode 100644 index 000000000000..184eb3134058 --- /dev/null +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.iceberg.migrate; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.FileIO; +import org.apache.paimon.iceberg.IcebergOptions; +import org.apache.paimon.options.Options; + +/** Factory to create {@link IcebergMigrateHiveMetadata}. */ +public class IcebergMigrateHiveMetadataFactory implements IcebergMigrateMetadataFactory { + @Override + public String identifier() { + return IcebergOptions.StorageType.HIVE_CATALOG.toString() + "_migrate"; + } + + @Override + public IcebergMigrateHiveMetadata create( + Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) { + return new IcebergMigrateHiveMetadata(icebergIdentifier, fileIO, icebergOptions); + } +} diff --git a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 26f0944d916e..608f034659ca 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-hive/paimon-hive-catalog/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -16,3 +16,4 @@ org.apache.paimon.hive.HiveCatalogFactory org.apache.paimon.hive.HiveCatalogLockFactory org.apache.paimon.iceberg.IcebergHiveMetadataCommitterFactory +org.apache.paimon.iceberg.migrate.IcebergMigrateHiveMetadataFactory diff --git a/paimon-hive/paimon-hive-connector-2.3/pom.xml b/paimon-hive/paimon-hive-connector-2.3/pom.xml index c0660fc573ee..2e235c832d86 100644 --- a/paimon-hive/paimon-hive-connector-2.3/pom.xml +++ b/paimon-hive/paimon-hive-connector-2.3/pom.xml @@ -569,6 +569,12 @@ under the License. ${iceberg.version} test + + org.apache.flink + flink-metrics-dropwizard + ${iceberg.flink.dropwizard.version} + test + diff --git a/paimon-hive/paimon-hive-connector-3.1/pom.xml b/paimon-hive/paimon-hive-connector-3.1/pom.xml index ef700d8c0bcd..2ae5b10e6a1c 100644 --- a/paimon-hive/paimon-hive-connector-3.1/pom.xml +++ b/paimon-hive/paimon-hive-connector-3.1/pom.xml @@ -599,6 +599,12 @@ under the License. ${iceberg.version} test + + org.apache.flink + flink-metrics-dropwizard + ${iceberg.flink.dropwizard.version} + test + diff --git a/paimon-hive/paimon-hive-connector-common/pom.xml b/paimon-hive/paimon-hive-connector-common/pom.xml index 397dfc942185..a79f2002ea24 100644 --- a/paimon-hive/paimon-hive-connector-common/pom.xml +++ b/paimon-hive/paimon-hive-connector-common/pom.xml @@ -562,6 +562,25 @@ under the License. 
+ + org.apache.iceberg + iceberg-flink-${iceberg.flink.version} + ${iceberg.version} + test + + + org.apache.orc + orc-core + + + + + org.apache.flink + flink-metrics-dropwizard + ${iceberg.flink.dropwizard.version} + test + + junit junit diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java index 8d6ded69dc99..7227854e6c94 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java @@ -25,22 +25,27 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; +import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.types.Row; import org.apache.hadoop.hive.conf.HiveConf; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.UUID; import java.util.stream.Stream; /** Tests for {@link MigrateFileProcedure}. 
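Also covers migrating Iceberg tables in Hive and Hadoop catalogs to Paimon (see the new testMigrateIcebergTableProcedure and testMigrateIcebergTableAction cases added below).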
*/ @@ -50,6 +55,9 @@ public class MigrateTableProcedureITCase extends ActionITCaseBase { private static final int PORT = 9084; + @TempDir java.nio.file.Path iceTempDir; + @TempDir java.nio.file.Path paiTempDir; + @BeforeEach public void beforeEach() { TEST_HIVE_METASTORE.start(PORT); @@ -70,6 +78,16 @@ private static Stream testArguments() { Arguments.of("parquet", false)); } + private static Stream testIcebergArguments() { + return Stream.of( + Arguments.of(true, false, true), + Arguments.of(false, false, true), + Arguments.of(true, true, true), + Arguments.of(false, true, true), + Arguments.of(true, true, false), + Arguments.of(false, true, false)); + } + @ParameterizedTest @MethodSource("testArguments") public void testMigrateProcedure(String format, boolean isNamedArgument) throws Exception { @@ -203,6 +221,152 @@ public void testMigrateAction(String format) throws Exception { Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2); } + @ParameterizedTest + @MethodSource("testIcebergArguments") + public void testMigrateIcebergTableProcedure( + boolean isPartitioned, boolean isHive, boolean isNamedArgument) throws Exception { + TableEnvironment tEnv = + TableEnvironmentImpl.create( + EnvironmentSettings.newInstance().inBatchMode().build()); + + // create iceberg catalog, database, table, and insert some data to iceberg table + tEnv.executeSql(icebergCatalogDdl(isHive)); + + String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); + tEnv.executeSql("USE CATALOG my_iceberg"); + if (isPartitioned) { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", + icebergTable)); + } else { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", + icebergTable)); + } + tEnv.executeSql( + String.format( + "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", + icebergTable)) + .await(); + + tEnv.executeSql(paimonCatalogDdl(false)); + tEnv.executeSql("USE CATALOG my_paimon"); + + String icebergOptions = + isHive + ? 
"metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + + PORT + : "metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=" + iceTempDir; + if (isNamedArgument) { + tEnv.executeSql( + String.format( + "CALL sys.migrate_table(connector => 'iceberg', source_table => 'default.%s', " + + "target_table => 'default.paimon_table', " + + "iceberg_options => '%s')", + icebergTable, icebergOptions)) + .await(); + } else { + tEnv.executeSql( + String.format( + "CALL sys.migrate_table('iceberg', 'default.%s','',1,'default.paimon_table','%s')", + icebergTable, icebergOptions)) + .await(); + } + + Assertions.assertThatList( + Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) + .containsExactlyInAnyOrderElementsOf( + ImmutableList.copyOf( + tEnv.executeSql("SELECT * FROM `default`.`paimon_table`") + .collect())); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testMigrateIcebergTableAction(boolean isPartitioned) throws Exception { + TableEnvironment tEnv = + TableEnvironmentImpl.create( + EnvironmentSettings.newInstance().inBatchMode().build()); + + // create iceberg catalog, database, table, and insert some data to iceberg table + tEnv.executeSql(icebergCatalogDdl(true)); + + String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); + tEnv.executeSql("USE CATALOG my_iceberg"); + if (isPartitioned) { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", + icebergTable)); + } else { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", + icebergTable)); + } + tEnv.executeSql( + String.format( + "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", + icebergTable)) + .await(); + + String icebergOptions = + "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + + PORT; + + Map catalogConf = new HashMap<>(); + catalogConf.put("warehouse", paiTempDir.toString()); + + MigrateTableAction migrateTableAction = + new MigrateTableAction( + "iceberg", + "default." + icebergTable, + catalogConf, + "", + 6, + "default.paimon_table", + icebergOptions); + migrateTableAction.run(); + + tEnv.executeSql(paimonCatalogDdl(false)); + tEnv.executeSql("USE CATALOG my_paimon"); + Assertions.assertThatList( + Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) + .containsExactlyInAnyOrderElementsOf( + ImmutableList.copyOf( + tEnv.executeSql( + "SELECT * FROM `my_paimon`.`default`.`paimon_table`") + .collect())); + } + + private String icebergCatalogDdl(boolean isHive) { + return isHive + ? String.format( + "CREATE CATALOG my_iceberg WITH " + + "( 'type' = 'iceberg', 'catalog-type' = 'hive', 'uri' = 'thrift://localhost:%s', " + + "'warehouse' = '%s', 'cache-enabled' = 'false')", + PORT, iceTempDir) + : String.format( + "CREATE CATALOG my_iceberg WITH " + + "( 'type' = 'iceberg', 'catalog-type' = 'hadoop'," + + "'warehouse' = '%s', 'cache-enabled' = 'false' )", + iceTempDir); + } + + private String paimonCatalogDdl(boolean isHive) { + return isHive + ? 
String.format( + "CREATE CATALOG my_paimon WITH " + + "( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:%s', " + + "'warehouse' = '%s', 'cache-enabled' = 'false' )", + PORT, iceTempDir) + : String.format( + "CREATE CATALOG my_paimon WITH ('type' = 'paimon', 'warehouse' = '%s')", + paiTempDir); + } + protected static String data(int i) { Random random = new Random(); StringBuilder stringBuilder = new StringBuilder(); diff --git a/paimon-hive/pom.xml b/paimon-hive/pom.xml index c97aceb1b85c..92f32d1336a6 100644 --- a/paimon-hive/pom.xml +++ b/paimon-hive/pom.xml @@ -50,6 +50,7 @@ under the License. 0.9.8 1.12.319 1.19 + 1.19.0 From 9e84b464f914aa847f4fb9cfe771c54015f90b04 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Thu, 9 Jan 2025 18:02:10 +0800 Subject: [PATCH 02/13] [core][hive] delete iceberg physical data using fileIO because iceberg table in hive is external --- .../migrate/IcebergMigrateHiveMetadata.java | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java index 10669f09a0f5..95ae3f8c1be4 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java @@ -52,6 +52,8 @@ public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata { private String metadataLocation = null; + private IcebergMetadata icebergMetadata; + public IcebergMigrateHiveMetadata( Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) { this.fileIO = fileIO; this.icebergIdentifier = icebergIdentifier; @@ -114,7 +116,8 @@ public IcebergMetadata icebergMetadata() { metadataLocation = icebergHiveTable.getParameters().get(ICEBERG_METADATA_LOCATION); LOG.info("iceberg latest metadata location: {}", metadataLocation); - return IcebergMetadata.fromPath(fileIO, new Path(metadataLocation)); + icebergMetadata = IcebergMetadata.fromPath(fileIO, new Path(metadataLocation)); + return icebergMetadata; } catch (Exception e) { throw new RuntimeException( String.format("Failed to read Iceberg metadata from path %s", metadataLocation), e); @@ -140,10 +143,16 @@ public void deleteOriginTable() { true); return null; }); + + // iceberg table in hive is external table, client.dropTable only deletes the metadata + // of iceberg table in hive, so we manually delete the data files + Path icebergTablePath = new Path(icebergMetadata.location()); + + if (fileIO.exists(icebergTablePath) && fileIO.isDir(icebergTablePath)) { + fileIO.deleteDirectoryQuietly(icebergTablePath); + } } catch (Exception e) { - LOG.warn( - "exception occurred when deleting origin table, exception message:{}", - e.getMessage()); + LOG.warn("exception occurred when deleting origin table", e); } } From 09df7c5f2b439240a34a5f3bd893197e120fc352 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Thu, 9 Jan 2025 18:19:20 +0800 Subject: [PATCH 03/13] [procedure] remove 'target_table', delete source table and rename paimon table to source table if migration succeeds [core] alter access permission of getDataTypeFromType() --- .../procedure/MigrateTableProcedure.java | 12 +---- .../flink/action/MigrateTableAction.java | 13 ++--- .../action/MigrateTableActionFactory.java | 2 - .../procedure/MigrateTableProcedure.java | 8 ----
.../MigrateTableProcedureITCase.java | 52 +++++++++++-------- 5 files changed, 36 insertions(+), 51 deletions(-) diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index 8f2a5401fc8c..92a970e5b67d 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -67,16 +67,8 @@ public String[] call( String properties, Integer parallelism) throws Exception { - String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; - return call( - procedureContext, - connector, - sourceTablePath, - properties, - parallelism, - targetPaimonTablePath, - ""); + return call(procedureContext, connector, sourceTablePath, properties, parallelism, ""); } public String[] call( @@ -85,9 +77,9 @@ public String[] call( String sourceTablePath, String properties, Integer parallelism, - String targetTablePath, String icebergProperties) throws Exception { + String targetTablePath = sourceTablePath + PAIMON_SUFFIX; Identifier sourceTableId = Identifier.fromString(sourceTablePath); Identifier targetTableId = Identifier.fromString(targetTablePath); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java index 1fe9441812d8..62412d87af82 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java @@ -28,11 +28,10 @@ public class MigrateTableAction extends ActionBase { private final String connector; - private final String hiveTableFullName; + private final String sourceTableFullName; private final String tableProperties; private final Integer parallelism; - private final String paimonTableFullName; private final String icebergProperties; public MigrateTableAction( @@ -43,10 +42,9 @@ public MigrateTableAction( Integer parallelism) { super(catalogConfig); this.connector = connector; - this.hiveTableFullName = hiveTableFullName; + this.sourceTableFullName = hiveTableFullName; this.tableProperties = tableProperties; this.parallelism = parallelism; - this.paimonTableFullName = null; this.icebergProperties = null; } @@ -56,14 +54,12 @@ public MigrateTableAction( Map catalogConfig, String tableProperties, Integer parallelism, - String paimonTableFullName, String icebergProperties) { super(catalogConfig); this.connector = connector; - this.hiveTableFullName = hiveTableFullName; + this.sourceTableFullName = hiveTableFullName; this.tableProperties = tableProperties; this.parallelism = parallelism; - this.paimonTableFullName = paimonTableFullName; this.icebergProperties = icebergProperties; } @@ -74,10 +70,9 @@ public void run() throws Exception { migrateTableProcedure.call( new DefaultProcedureContext(env), connector, - hiveTableFullName, + sourceTableFullName, tableProperties, parallelism, - paimonTableFullName, icebergProperties); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java index 
3af99d14c4b0..c36ff09232e7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java @@ -47,7 +47,6 @@ public Optional create(MultipleParameterToolAdapter params) { Integer parallelism = params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM)); - String targetPaimonTable = params.get(TARGET_TYPE); String icebergOptions = params.get(ICEBERG_OPTIONS); MigrateTableAction migrateTableAction = @@ -57,7 +56,6 @@ public Optional create(MultipleParameterToolAdapter params) { catalogConfig, tableConf, parallelism, - targetPaimonTable, icebergOptions); return Optional.of(migrateTableAction); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index bba99d844ae8..46f553e0cd26 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -51,10 +51,6 @@ public String identifier() { name = "parallelism", type = @DataTypeHint("Integer"), isOptional = true), - @ArgumentHint( - name = "target_table", - type = @DataTypeHint("STRING"), - isOptional = true), @ArgumentHint( name = "iceberg_options", type = @DataTypeHint("STRING"), @@ -66,16 +62,12 @@ public String[] call( String sourceTablePath, String properties, Integer parallelism, - String targetTablePath, String icebergProperties) throws Exception { properties = notnull(properties); icebergProperties = notnull(icebergProperties); String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; - if (targetTablePath != null) { - targetPaimonTablePath = targetTablePath; - } Identifier sourceTableId = Identifier.fromString(sourceTablePath); Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java index 7227854e6c94..581a8169e8f7 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java @@ -79,13 +79,16 @@ private static Stream testArguments() { } private static Stream testIcebergArguments() { + // isPartitioned, icebergIsHive, paimonIsHive, isNamedArgument return Stream.of( - Arguments.of(true, false, true), - Arguments.of(false, false, true), - Arguments.of(true, true, true), - Arguments.of(false, true, true), - Arguments.of(true, true, false), - Arguments.of(false, true, false)); + Arguments.of(true, false, false, true), + Arguments.of(false, false, false, true), + Arguments.of(true, true, false, true), + Arguments.of(false, true, false, true), + Arguments.of(true, true, false, false), + Arguments.of(false, true, false, false), + Arguments.of(true, false, true, true), + Arguments.of(true, true, true, true)); } @ParameterizedTest @@ -224,13 +227,17 @@ public void testMigrateAction(String format) throws Exception { @ParameterizedTest 
@MethodSource("testIcebergArguments") public void testMigrateIcebergTableProcedure( - boolean isPartitioned, boolean isHive, boolean isNamedArgument) throws Exception { + boolean isPartitioned, + boolean icebergIsHive, + boolean paimonIsHive, + boolean isNamedArgument) + throws Exception { TableEnvironment tEnv = TableEnvironmentImpl.create( EnvironmentSettings.newInstance().inBatchMode().build()); // create iceberg catalog, database, table, and insert some data to iceberg table - tEnv.executeSql(icebergCatalogDdl(isHive)); + tEnv.executeSql(icebergCatalogDdl(icebergIsHive)); String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); tEnv.executeSql("USE CATALOG my_iceberg"); @@ -251,11 +258,11 @@ public void testMigrateIcebergTableProcedure( icebergTable)) .await(); - tEnv.executeSql(paimonCatalogDdl(false)); + tEnv.executeSql(paimonCatalogDdl(paimonIsHive)); tEnv.executeSql("USE CATALOG my_paimon"); String icebergOptions = - isHive + icebergIsHive ? "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + PORT : "metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=" + iceTempDir; @@ -263,14 +270,13 @@ public void testMigrateIcebergTableProcedure( tEnv.executeSql( String.format( "CALL sys.migrate_table(connector => 'iceberg', source_table => 'default.%s', " - + "target_table => 'default.paimon_table', " + "iceberg_options => '%s')", icebergTable, icebergOptions)) .await(); } else { tEnv.executeSql( String.format( - "CALL sys.migrate_table('iceberg', 'default.%s','',1,'default.paimon_table','%s')", + "CALL sys.migrate_table('iceberg', 'default.%s','',1,'%s')", icebergTable, icebergOptions)) .await(); } @@ -279,7 +285,10 @@ public void testMigrateIcebergTableProcedure( Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) .containsExactlyInAnyOrderElementsOf( ImmutableList.copyOf( - tEnv.executeSql("SELECT * FROM `default`.`paimon_table`") + tEnv.executeSql( + String.format( + "SELECT * FROM `default`.`%s`", + icebergTable)) .collect())); } @@ -318,26 +327,25 @@ public void testMigrateIcebergTableAction(boolean isPartitioned) throws Exceptio Map catalogConf = new HashMap<>(); catalogConf.put("warehouse", paiTempDir.toString()); + catalogConf.put("metastore", "hive"); + catalogConf.put("uri", "thrift://localhost:" + PORT); + catalogConf.put("cache-enabled", "false"); MigrateTableAction migrateTableAction = new MigrateTableAction( - "iceberg", - "default." + icebergTable, - catalogConf, - "", - 6, - "default.paimon_table", - icebergOptions); + "iceberg", "default." 
+ icebergTable, catalogConf, "", 6, icebergOptions); migrateTableAction.run(); - tEnv.executeSql(paimonCatalogDdl(false)); + tEnv.executeSql(paimonCatalogDdl(true)); tEnv.executeSql("USE CATALOG my_paimon"); Assertions.assertThatList( Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) .containsExactlyInAnyOrderElementsOf( ImmutableList.copyOf( tEnv.executeSql( - "SELECT * FROM `my_paimon`.`default`.`paimon_table`") + String.format( + "SELECT * FROM `my_paimon`.`default`.`%s`", + icebergTable)) .collect())); } From 1e7937e30ce1dd93a658fe7463096e8849e053c8 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Fri, 10 Jan 2025 11:32:28 +0800 Subject: [PATCH 04/13] [pom][hive] remove useless dependency --- paimon-hive/paimon-hive-connector-2.3/pom.xml | 6 ------ paimon-hive/paimon-hive-connector-3.1/pom.xml | 6 ------ 2 files changed, 12 deletions(-) diff --git a/paimon-hive/paimon-hive-connector-2.3/pom.xml b/paimon-hive/paimon-hive-connector-2.3/pom.xml index 2e235c832d86..c0660fc573ee 100644 --- a/paimon-hive/paimon-hive-connector-2.3/pom.xml +++ b/paimon-hive/paimon-hive-connector-2.3/pom.xml @@ -569,12 +569,6 @@ under the License. ${iceberg.version} test - - org.apache.flink - flink-metrics-dropwizard - ${iceberg.flink.dropwizard.version} - test - diff --git a/paimon-hive/paimon-hive-connector-3.1/pom.xml b/paimon-hive/paimon-hive-connector-3.1/pom.xml index 2ae5b10e6a1c..ef700d8c0bcd 100644 --- a/paimon-hive/paimon-hive-connector-3.1/pom.xml +++ b/paimon-hive/paimon-hive-connector-3.1/pom.xml @@ -599,12 +599,6 @@ under the License. ${iceberg.version} test - - org.apache.flink - flink-metrics-dropwizard - ${iceberg.flink.dropwizard.version} - test - From 138021592edefe974d0cbd54159b00b1df2c0ef5 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Fri, 10 Jan 2025 12:00:20 +0800 Subject: [PATCH 05/13] [procedure] small fix --- .../apache/paimon/flink/action/MigrateTableAction.java | 8 ++++---- .../paimon/flink/action/MigrateTableActionFactory.java | 1 - 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java index 62412d87af82..098193a22dc2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java @@ -36,13 +36,13 @@ public class MigrateTableAction extends ActionBase { public MigrateTableAction( String connector, - String hiveTableFullName, + String sourceTableFullName, Map catalogConfig, String tableProperties, Integer parallelism) { super(catalogConfig); this.connector = connector; - this.sourceTableFullName = hiveTableFullName; + this.sourceTableFullName = sourceTableFullName; this.tableProperties = tableProperties; this.parallelism = parallelism; this.icebergProperties = null; @@ -50,14 +50,14 @@ public MigrateTableAction( public MigrateTableAction( String connector, - String hiveTableFullName, + String sourceTableFullName, Map catalogConfig, String tableProperties, Integer parallelism, String icebergProperties) { super(catalogConfig); this.connector = connector; - this.sourceTableFullName = hiveTableFullName; + this.sourceTableFullName = sourceTableFullName; this.tableProperties = tableProperties; this.parallelism = parallelism; this.icebergProperties = icebergProperties; diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java index c36ff09232e7..3834145fe35f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java @@ -30,7 +30,6 @@ public class MigrateTableActionFactory implements ActionFactory { private static final String OPTIONS = "options"; private static final String PARALLELISM = "parallelism"; - private static final String TARGET_TYPE = "target_type"; private static final String ICEBERG_OPTIONS = "iceberg_options"; @Override From 6d9a905d0c823525483083523245fbdae3018be4 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 20 Jan 2025 15:10:45 +0800 Subject: [PATCH 06/13] [procedure] use MigrateIcebergTableProcedure for migration # Conflicts: # paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory # Conflicts: # paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory --- .../MigrateIcebergTableProcedure.java | 92 +++++++ .../procedure/MigrateTableProcedure.java | 15 +- .../action/MigrateIcebergTableAction.java | 61 +++++ .../MigrateIcebergTableActionFactory.java | 70 ++++++ .../flink/action/MigrateTableAction.java | 29 +-- .../action/MigrateTableActionFactory.java | 14 +- .../MigrateIcebergTableProcedure.java | 90 +++++++ .../procedure/MigrateTableProcedure.java | 11 +- .../flink/utils/TableMigrationUtils.java | 54 ++-- .../org.apache.paimon.factories.Factory | 3 +- .../MigrateIcebergTableProcedureITCase.java | 230 ++++++++++++++++++ .../MigrateTableProcedureITCase.java | 172 ------------- 12 files changed, 576 insertions(+), 265 deletions(-) create mode 100644 paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java create mode 100644 paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java new file mode 100644 index 000000000000..0402d3e8961b --- /dev/null +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.utils.TableMigrationUtils; +import org.apache.paimon.migrate.Migrator; +import org.apache.paimon.utils.ParameterUtils; + +import org.apache.flink.table.procedure.ProcedureContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Migrate procedure to migrate iceberg table to paimon table. */ +public class MigrateIcebergTableProcedure extends ProcedureBase { + + private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class); + + private static final String PAIMON_SUFFIX = "_paimon_"; + + @Override + public String identifier() { + return "migrate_iceberg_table"; + } + + public String[] call( + ProcedureContext procedureContext, String sourceTablePath, String icebergProperties) + throws Exception { + + return call(procedureContext, sourceTablePath, icebergProperties, ""); + } + + public String[] call( + ProcedureContext procedureContext, + String sourceTablePath, + String icebergProperties, + String properties) + throws Exception { + + return call( + procedureContext, + sourceTablePath, + icebergProperties, + properties, + Runtime.getRuntime().availableProcessors()); + } + + public String[] call( + ProcedureContext procedureContext, + String sourceTablePath, + String icebergProperties, + String properties, + Integer parallelism) + throws Exception { + String targetTablePath = sourceTablePath + PAIMON_SUFFIX; + + Identifier sourceTableId = Identifier.fromString(sourceTablePath); + Identifier targetTableId = Identifier.fromString(targetTablePath); + + Migrator migrator = + TableMigrationUtils.getIcebergImporter( + catalog, + sourceTableId.getDatabaseName(), + sourceTableId.getObjectName(), + targetTableId.getDatabaseName(), + targetTableId.getObjectName(), + parallelism, + ParameterUtils.parseCommaSeparatedKeyValues(properties), + ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + LOG.info("create migrator success."); + migrator.executeMigrate(); + + migrator.renameTable(false); + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index 92a970e5b67d..8778b9d5e187 100644 --- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -67,18 +67,6 @@ public String[] call( String properties, Integer parallelism) throws Exception { - - return call(procedureContext, connector, sourceTablePath, properties, parallelism, ""); - } - - public String[] call( - ProcedureContext procedureContext, - String connector, - String sourceTablePath, - String properties, - Integer parallelism, - String icebergProperties) - throws Exception { String targetTablePath = sourceTablePath + PAIMON_SUFFIX; Identifier sourceTableId = 
Identifier.fromString(sourceTablePath); @@ -93,8 +81,7 @@ public String[] call( targetTableId.getDatabaseName(), targetTableId.getObjectName(), parallelism, - ParameterUtils.parseCommaSeparatedKeyValues(properties), - ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + ParameterUtils.parseCommaSeparatedKeyValues(properties)); LOG.info("create migrator success."); migrator.executeMigrate(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java new file mode 100644 index 000000000000..1b9fcb46a9e9 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableAction.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure; + +import org.apache.flink.table.procedure.DefaultProcedureContext; + +import java.util.Map; + +/** Migrate from iceberg table to paimon table. */ +public class MigrateIcebergTableAction extends ActionBase { + + private final String sourceTableFullName; + private final String tableProperties; + private final Integer parallelism; + + private final String icebergProperties; + + public MigrateIcebergTableAction( + String sourceTableFullName, + Map catalogConfig, + String icebergProperties, + String tableProperties, + Integer parallelism) { + super(catalogConfig); + this.sourceTableFullName = sourceTableFullName; + this.tableProperties = tableProperties; + this.parallelism = parallelism; + this.icebergProperties = icebergProperties; + } + + @Override + public void run() throws Exception { + MigrateIcebergTableProcedure migrateIcebergTableProcedure = + new MigrateIcebergTableProcedure(); + migrateIcebergTableProcedure.withCatalog(catalog); + migrateIcebergTableProcedure.call( + new DefaultProcedureContext(env), + sourceTableFullName, + icebergProperties, + tableProperties, + parallelism); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java new file mode 100644 index 000000000000..c85559d66b41 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateIcebergTableActionFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action; + +import java.util.Map; +import java.util.Optional; + +/** Action Factory for {@link MigrateIcebergTableAction}. */ +public class MigrateIcebergTableActionFactory implements ActionFactory { + + public static final String IDENTIFIER = "migrate_iceberg_table"; + + private static final String OPTIONS = "options"; + private static final String PARALLELISM = "parallelism"; + + private static final String ICEBERG_OPTIONS = "iceberg_options"; + + @Override + public String identifier() { + return IDENTIFIER; + } + + @Override + public Optional create(MultipleParameterToolAdapter params) { + + String sourceTable = params.get(TABLE); + Map catalogConfig = catalogConfigMap(params); + String tableConf = params.get(OPTIONS); + Integer parallelism = + params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM)); + + String icebergOptions = params.get(ICEBERG_OPTIONS); + + MigrateIcebergTableAction migrateIcebergTableAction = + new MigrateIcebergTableAction( + sourceTable, catalogConfig, icebergOptions, tableConf, parallelism); + return Optional.of(migrateIcebergTableAction); + } + + @Override + public void printHelp() { + System.out.println( + "Action \"migrate_iceberg_table\" runs a migrating job from iceberg to paimon."); + System.out.println(); + + System.out.println("Syntax:"); + System.out.println( + " migrate_iceberg_table" + + "--table " + + "--iceberg_options =[,=,...]" + + "[--catalog_conf ==,=,...]"); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java index 098193a22dc2..b12cb5f862fe 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableAction.java @@ -28,39 +28,21 @@ public class MigrateTableAction extends ActionBase { private final String connector; - private final String sourceTableFullName; + private final String hiveTableFullName; private final String tableProperties; private final Integer parallelism; - private final String icebergProperties; - public MigrateTableAction( String connector, - String sourceTableFullName, + String hiveTableFullName, Map catalogConfig, String tableProperties, Integer parallelism) { super(catalogConfig); this.connector = connector; - this.sourceTableFullName = sourceTableFullName; - this.tableProperties = tableProperties; - this.parallelism = parallelism; - this.icebergProperties = null; - } - - public MigrateTableAction( - String connector, - String sourceTableFullName, - Map catalogConfig, - String tableProperties, - Integer parallelism, - String icebergProperties) { - 
super(catalogConfig); - this.connector = connector; - this.sourceTableFullName = sourceTableFullName; + this.hiveTableFullName = hiveTableFullName; this.tableProperties = tableProperties; this.parallelism = parallelism; - this.icebergProperties = icebergProperties; } @Override @@ -70,9 +52,8 @@ public void run() throws Exception { migrateTableProcedure.call( new DefaultProcedureContext(env), connector, - sourceTableFullName, + hiveTableFullName, tableProperties, - parallelism, - icebergProperties); + parallelism); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java index 3834145fe35f..b2c96795a439 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MigrateTableActionFactory.java @@ -30,8 +30,6 @@ public class MigrateTableActionFactory implements ActionFactory { private static final String OPTIONS = "options"; private static final String PARALLELISM = "parallelism"; - private static final String ICEBERG_OPTIONS = "iceberg_options"; - @Override public String identifier() { return IDENTIFIER; @@ -43,19 +41,11 @@ public Optional create(MultipleParameterToolAdapter params) { String sourceHiveTable = params.get(TABLE); Map catalogConfig = catalogConfigMap(params); String tableConf = params.get(OPTIONS); - Integer parallelism = - params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM)); - - String icebergOptions = params.get(ICEBERG_OPTIONS); + Integer parallelism = Integer.parseInt(params.get(PARALLELISM)); MigrateTableAction migrateTableAction = new MigrateTableAction( - connector, - sourceHiveTable, - catalogConfig, - tableConf, - parallelism, - icebergOptions); + connector, sourceHiveTable, catalogConfig, tableConf, parallelism); return Optional.of(migrateTableAction); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java new file mode 100644 index 000000000000..f43d29ed4f17 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateIcebergTableProcedure.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.procedure; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.utils.TableMigrationUtils; +import org.apache.paimon.migrate.Migrator; +import org.apache.paimon.utils.ParameterUtils; + +import org.apache.flink.table.annotation.ArgumentHint; +import org.apache.flink.table.annotation.DataTypeHint; +import org.apache.flink.table.annotation.ProcedureHint; +import org.apache.flink.table.procedure.ProcedureContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Migrate procedure to migrate iceberg table to paimon table. */ +public class MigrateIcebergTableProcedure extends ProcedureBase { + private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class); + + private static final String PAIMON_SUFFIX = "_paimon_"; + + @Override + public String identifier() { + return "migrate_iceberg_table"; + } + + @ProcedureHint( + argument = { + @ArgumentHint(name = "source_table", type = @DataTypeHint("STRING")), + @ArgumentHint( + name = "iceberg_options", + type = @DataTypeHint("STRING"), + isOptional = true), + @ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true), + @ArgumentHint( + name = "parallelism", + type = @DataTypeHint("Integer"), + isOptional = true) + }) + public String[] call( + ProcedureContext procedureContext, + String sourceTablePath, + String icebergProperties, + String properties, + Integer parallelism) + throws Exception { + properties = notnull(properties); + icebergProperties = notnull(icebergProperties); + + String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; + + Identifier sourceTableId = Identifier.fromString(sourceTablePath); + Identifier targetTableId = Identifier.fromString(targetPaimonTablePath); + + Integer p = parallelism == null ? 
Runtime.getRuntime().availableProcessors() : parallelism; + + Migrator migrator = + TableMigrationUtils.getIcebergImporter( + catalog, + sourceTableId.getDatabaseName(), + sourceTableId.getObjectName(), + targetTableId.getDatabaseName(), + targetTableId.getObjectName(), + p, + ParameterUtils.parseCommaSeparatedKeyValues(properties), + ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + LOG.info("create migrator success."); + migrator.executeMigrate(); + + migrator.renameTable(false); + return new String[] {"Success"}; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java index 46f553e0cd26..32a2a16dc51d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/MigrateTableProcedure.java @@ -50,10 +50,6 @@ public String identifier() { @ArgumentHint( name = "parallelism", type = @DataTypeHint("Integer"), - isOptional = true), - @ArgumentHint( - name = "iceberg_options", - type = @DataTypeHint("STRING"), isOptional = true) }) public String[] call( @@ -61,11 +57,9 @@ public String[] call( String connector, String sourceTablePath, String properties, - Integer parallelism, - String icebergProperties) + Integer parallelism) throws Exception { properties = notnull(properties); - icebergProperties = notnull(icebergProperties); String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX; @@ -83,8 +77,7 @@ public String[] call( targetTableId.getDatabaseName(), targetTableId.getObjectName(), p, - ParameterUtils.parseCommaSeparatedKeyValues(properties), - ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties)); + ParameterUtils.parseCommaSeparatedKeyValues(properties)); LOG.info("create migrator success."); migrator.executeMigrate(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java index 68257417d585..4e7268c6f14e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableMigrationUtils.java @@ -26,7 +26,6 @@ import org.apache.paimon.migrate.Migrator; import org.apache.paimon.options.Options; -import java.util.Collections; import java.util.List; import java.util.Map; @@ -42,28 +41,6 @@ public static Migrator getImporter( String targetTableName, Integer parallelism, Map options) { - return getImporter( - connector, - catalog, - sourceDatabase, - sourceTableName, - targetDatabase, - targetTableName, - parallelism, - options, - Collections.emptyMap()); - } - - public static Migrator getImporter( - String connector, - Catalog catalog, - String sourceDatabase, - String sourceTableName, - String targetDatabase, - String targetTableName, - Integer parallelism, - Map options, - Map icebergOptions) { switch (connector) { case "hive": if (catalog instanceof CachingCatalog) { @@ -80,21 +57,32 @@ public static Migrator getImporter( targetTableName, parallelism, options); - case "iceberg": - Options icebergConf = new Options(icebergOptions); - return new IcebergMigrator( - catalog, - targetDatabase, - targetTableName, - sourceDatabase, - sourceTableName, 
- icebergConf, - parallelism); default: throw new UnsupportedOperationException("Don't support connector " + connector); } } + public static Migrator getIcebergImporter( + Catalog catalog, + String sourceDatabase, + String sourceTableName, + String targetDatabase, + String targetTableName, + Integer parallelism, + Map options, + Map icebergOptions) { + + Options icebergConf = new Options(icebergOptions); + return new IcebergMigrator( + catalog, + targetDatabase, + targetTableName, + sourceDatabase, + sourceTableName, + icebergConf, + parallelism); + } + public static List getImporters( String connector, Catalog catalog, diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory index 6f6becf85fc7..efaa25627d69 100644 --- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory +++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory @@ -85,4 +85,5 @@ org.apache.paimon.flink.procedure.CloneProcedure org.apache.paimon.flink.procedure.CompactManifestProcedure org.apache.paimon.flink.procedure.RefreshObjectTableProcedure org.apache.paimon.flink.procedure.RemoveUnexistingFilesProcedure -org.apache.paimon.flink.procedure.ClearConsumersProcedure \ No newline at end of file +org.apache.paimon.flink.procedure.ClearConsumersProcedure +org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java new file mode 100644 index 000000000000..b34da39d45c8 --- /dev/null +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.hive.procedure; + +import org.apache.paimon.flink.action.ActionITCaseBase; +import org.apache.paimon.flink.action.MigrateIcebergTableAction; +import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure; +import org.apache.paimon.hive.TestHiveMetastore; + +import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; + +import org.apache.flink.table.api.EnvironmentSettings; +import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.internal.TableEnvironmentImpl; +import org.apache.flink.types.Row; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Stream; + +/** Tests for {@link MigrateIcebergTableProcedure}. */ +public class MigrateIcebergTableProcedureITCase extends ActionITCaseBase { + + private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore(); + + private static final int PORT = 9084; + + @TempDir java.nio.file.Path iceTempDir; + @TempDir java.nio.file.Path paiTempDir; + + @BeforeEach + public void beforeEach() { + TEST_HIVE_METASTORE.start(PORT); + } + + @AfterEach + public void afterEach() throws Exception { + TEST_HIVE_METASTORE.stop(); + } + + private static Stream testIcebergArguments() { + // isPartitioned, icebergIsHive, paimonIsHive, isNamedArgument + return Stream.of( + Arguments.of(true, false, false, true), + Arguments.of(false, false, false, true), + Arguments.of(true, true, false, true), + Arguments.of(false, true, false, true), + Arguments.of(true, true, false, false), + Arguments.of(false, true, false, false), + Arguments.of(true, false, true, true), + Arguments.of(true, true, true, true)); + } + + @ParameterizedTest + @MethodSource("testIcebergArguments") + public void testMigrateIcebergTableProcedure( + boolean isPartitioned, + boolean icebergIsHive, + boolean paimonIsHive, + boolean isNamedArgument) + throws Exception { + TableEnvironment tEnv = + TableEnvironmentImpl.create( + EnvironmentSettings.newInstance().inBatchMode().build()); + + // create iceberg catalog, database, table, and insert some data to iceberg table + tEnv.executeSql(icebergCatalogDdl(icebergIsHive)); + + String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); + tEnv.executeSql("USE CATALOG my_iceberg"); + if (isPartitioned) { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", + icebergTable)); + } else { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", + icebergTable)); + } + tEnv.executeSql( + String.format( + "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", + icebergTable)) + .await(); + + tEnv.executeSql(paimonCatalogDdl(paimonIsHive)); + tEnv.executeSql("USE CATALOG my_paimon"); + + String icebergOptions = + icebergIsHive + ? 
"metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + + PORT + : "metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=" + iceTempDir; + if (isNamedArgument) { + tEnv.executeSql( + String.format( + "CALL sys.migrate_iceberg_table(source_table => 'default.%s', " + + "iceberg_options => '%s')", + icebergTable, icebergOptions)) + .await(); + } else { + tEnv.executeSql( + String.format( + "CALL sys.migrate_iceberg_table('default.%s','%s')", + icebergTable, icebergOptions)) + .await(); + } + + Assertions.assertThatList( + Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) + .containsExactlyInAnyOrderElementsOf( + ImmutableList.copyOf( + tEnv.executeSql( + String.format( + "SELECT * FROM `default`.`%s`", + icebergTable)) + .collect())); + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testMigrateIcebergTableAction(boolean isPartitioned) throws Exception { + TableEnvironment tEnv = + TableEnvironmentImpl.create( + EnvironmentSettings.newInstance().inBatchMode().build()); + + // create iceberg catalog, database, table, and insert some data to iceberg table + tEnv.executeSql(icebergCatalogDdl(true)); + + String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); + tEnv.executeSql("USE CATALOG my_iceberg"); + if (isPartitioned) { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", + icebergTable)); + } else { + tEnv.executeSql( + String.format( + "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", + icebergTable)); + } + tEnv.executeSql( + String.format( + "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", + icebergTable)) + .await(); + + String icebergOptions = + "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" + + PORT; + + Map catalogConf = new HashMap<>(); + catalogConf.put("warehouse", paiTempDir.toString()); + catalogConf.put("metastore", "hive"); + catalogConf.put("uri", "thrift://localhost:" + PORT); + catalogConf.put("cache-enabled", "false"); + + MigrateIcebergTableAction migrateIcebergTableAction = + new MigrateIcebergTableAction( + "default." + icebergTable, catalogConf, icebergOptions, "", 6); + migrateIcebergTableAction.run(); + + tEnv.executeSql(paimonCatalogDdl(true)); + tEnv.executeSql("USE CATALOG my_paimon"); + Assertions.assertThatList( + Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) + .containsExactlyInAnyOrderElementsOf( + ImmutableList.copyOf( + tEnv.executeSql( + String.format( + "SELECT * FROM `my_paimon`.`default`.`%s`", + icebergTable)) + .collect())); + } + + private String icebergCatalogDdl(boolean isHive) { + return isHive + ? String.format( + "CREATE CATALOG my_iceberg WITH " + + "( 'type' = 'iceberg', 'catalog-type' = 'hive', 'uri' = 'thrift://localhost:%s', " + + "'warehouse' = '%s', 'cache-enabled' = 'false')", + PORT, iceTempDir) + : String.format( + "CREATE CATALOG my_iceberg WITH " + + "( 'type' = 'iceberg', 'catalog-type' = 'hadoop'," + + "'warehouse' = '%s', 'cache-enabled' = 'false' )", + iceTempDir); + } + + private String paimonCatalogDdl(boolean isHive) { + return isHive + ? 
String.format( + "CREATE CATALOG my_paimon WITH " + + "( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:%s', " + + "'warehouse' = '%s', 'cache-enabled' = 'false' )", + PORT, iceTempDir) + : String.format( + "CREATE CATALOG my_paimon WITH ('type' = 'paimon', 'warehouse' = '%s')", + paiTempDir); + } +} diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java index 581a8169e8f7..8d6ded69dc99 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateTableProcedureITCase.java @@ -25,27 +25,22 @@ import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList; -import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.internal.TableEnvironmentImpl; import org.apache.flink.types.Row; import org.apache.hadoop.hive.conf.HiveConf; import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; -import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; -import java.util.UUID; import java.util.stream.Stream; /** Tests for {@link MigrateFileProcedure}. 
*/ @@ -55,9 +50,6 @@ public class MigrateTableProcedureITCase extends ActionITCaseBase { private static final int PORT = 9084; - @TempDir java.nio.file.Path iceTempDir; - @TempDir java.nio.file.Path paiTempDir; - @BeforeEach public void beforeEach() { TEST_HIVE_METASTORE.start(PORT); @@ -78,19 +70,6 @@ private static Stream testArguments() { Arguments.of("parquet", false)); } - private static Stream testIcebergArguments() { - // isPartitioned, icebergIsHive, paimonIsHive, isNamedArgument - return Stream.of( - Arguments.of(true, false, false, true), - Arguments.of(false, false, false, true), - Arguments.of(true, true, false, true), - Arguments.of(false, true, false, true), - Arguments.of(true, true, false, false), - Arguments.of(false, true, false, false), - Arguments.of(true, false, true, true), - Arguments.of(true, true, true, true)); - } - @ParameterizedTest @MethodSource("testArguments") public void testMigrateProcedure(String format, boolean isNamedArgument) throws Exception { @@ -224,157 +203,6 @@ public void testMigrateAction(String format) throws Exception { Assertions.assertThatList(r1).containsExactlyInAnyOrderElementsOf(r2); } - @ParameterizedTest - @MethodSource("testIcebergArguments") - public void testMigrateIcebergTableProcedure( - boolean isPartitioned, - boolean icebergIsHive, - boolean paimonIsHive, - boolean isNamedArgument) - throws Exception { - TableEnvironment tEnv = - TableEnvironmentImpl.create( - EnvironmentSettings.newInstance().inBatchMode().build()); - - // create iceberg catalog, database, table, and insert some data to iceberg table - tEnv.executeSql(icebergCatalogDdl(icebergIsHive)); - - String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); - tEnv.executeSql("USE CATALOG my_iceberg"); - if (isPartitioned) { - tEnv.executeSql( - String.format( - "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", - icebergTable)); - } else { - tEnv.executeSql( - String.format( - "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", - icebergTable)); - } - tEnv.executeSql( - String.format( - "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", - icebergTable)) - .await(); - - tEnv.executeSql(paimonCatalogDdl(paimonIsHive)); - tEnv.executeSql("USE CATALOG my_paimon"); - - String icebergOptions = - icebergIsHive - ? 
"metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" - + PORT - : "metadata.iceberg.storage=hadoop-catalog,iceberg_warehouse=" + iceTempDir; - if (isNamedArgument) { - tEnv.executeSql( - String.format( - "CALL sys.migrate_table(connector => 'iceberg', source_table => 'default.%s', " - + "iceberg_options => '%s')", - icebergTable, icebergOptions)) - .await(); - } else { - tEnv.executeSql( - String.format( - "CALL sys.migrate_table('iceberg', 'default.%s','',1,'%s')", - icebergTable, icebergOptions)) - .await(); - } - - Assertions.assertThatList( - Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) - .containsExactlyInAnyOrderElementsOf( - ImmutableList.copyOf( - tEnv.executeSql( - String.format( - "SELECT * FROM `default`.`%s`", - icebergTable)) - .collect())); - } - - @ParameterizedTest - @ValueSource(booleans = {false, true}) - public void testMigrateIcebergTableAction(boolean isPartitioned) throws Exception { - TableEnvironment tEnv = - TableEnvironmentImpl.create( - EnvironmentSettings.newInstance().inBatchMode().build()); - - // create iceberg catalog, database, table, and insert some data to iceberg table - tEnv.executeSql(icebergCatalogDdl(true)); - - String icebergTable = "iceberg_" + UUID.randomUUID().toString().replace("-", "_"); - tEnv.executeSql("USE CATALOG my_iceberg"); - if (isPartitioned) { - tEnv.executeSql( - String.format( - "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) PARTITIONED BY (id3)", - icebergTable)); - } else { - tEnv.executeSql( - String.format( - "CREATE TABLE `default`.`%s` (id string, id2 int, id3 int) WITH ('format-version'='2')", - icebergTable)); - } - tEnv.executeSql( - String.format( - "INSERT INTO `default`.`%s` VALUES ('a',1,1),('b',2,2),('c',3,3)", - icebergTable)) - .await(); - - String icebergOptions = - "metadata.iceberg.storage=hive-catalog, metadata.iceberg.uri=thrift://localhost:" - + PORT; - - Map catalogConf = new HashMap<>(); - catalogConf.put("warehouse", paiTempDir.toString()); - catalogConf.put("metastore", "hive"); - catalogConf.put("uri", "thrift://localhost:" + PORT); - catalogConf.put("cache-enabled", "false"); - - MigrateTableAction migrateTableAction = - new MigrateTableAction( - "iceberg", "default." + icebergTable, catalogConf, "", 6, icebergOptions); - migrateTableAction.run(); - - tEnv.executeSql(paimonCatalogDdl(true)); - tEnv.executeSql("USE CATALOG my_paimon"); - Assertions.assertThatList( - Arrays.asList(Row.of("a", 1, 1), Row.of("b", 2, 2), Row.of("c", 3, 3))) - .containsExactlyInAnyOrderElementsOf( - ImmutableList.copyOf( - tEnv.executeSql( - String.format( - "SELECT * FROM `my_paimon`.`default`.`%s`", - icebergTable)) - .collect())); - } - - private String icebergCatalogDdl(boolean isHive) { - return isHive - ? String.format( - "CREATE CATALOG my_iceberg WITH " - + "( 'type' = 'iceberg', 'catalog-type' = 'hive', 'uri' = 'thrift://localhost:%s', " - + "'warehouse' = '%s', 'cache-enabled' = 'false')", - PORT, iceTempDir) - : String.format( - "CREATE CATALOG my_iceberg WITH " - + "( 'type' = 'iceberg', 'catalog-type' = 'hadoop'," - + "'warehouse' = '%s', 'cache-enabled' = 'false' )", - iceTempDir); - } - - private String paimonCatalogDdl(boolean isHive) { - return isHive - ? 
String.format( - "CREATE CATALOG my_paimon WITH " - + "( 'type' = 'paimon', 'metastore' = 'hive', 'uri' = 'thrift://localhost:%s', " - + "'warehouse' = '%s', 'cache-enabled' = 'false' )", - PORT, iceTempDir) - : String.format( - "CREATE CATALOG my_paimon WITH ('type' = 'paimon', 'warehouse' = '%s')", - paiTempDir); - } - protected static String data(int i) { Random random = new Random(); StringBuilder stringBuilder = new StringBuilder(); From 2727667e83cb6f64af20431f661b76eea724c662 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 20 Jan 2025 16:17:58 +0800 Subject: [PATCH 07/13] [procedure][fix] change the port num for TestHiveMetastore in MigrateIcebergTableProcedureITCase --- .../hive/procedure/MigrateIcebergTableProcedureITCase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java index b34da39d45c8..d100131e0935 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java @@ -49,7 +49,7 @@ public class MigrateIcebergTableProcedureITCase extends ActionITCaseBase { private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore(); - private static final int PORT = 9084; + private static final int PORT = 9087; @TempDir java.nio.file.Path iceTempDir; @TempDir java.nio.file.Path paiTempDir; From 68fee7fbd543b704648442dc4fa54794439edc16 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 10 Feb 2025 17:23:11 +0800 Subject: [PATCH 08/13] [procedure][test] make testMigrateIcebergTableProcedure a random test --- .../MigrateIcebergTableProcedureITCase.java | 47 +++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java index d100131e0935..1875b08eba22 100644 --- a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java +++ b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/procedure/MigrateIcebergTableProcedureITCase.java @@ -32,21 +32,25 @@ import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.stream.Stream; +import java.util.concurrent.ThreadLocalRandom; /** Tests for {@link MigrateIcebergTableProcedure}. 
*/ public class MigrateIcebergTableProcedureITCase extends ActionITCaseBase { + private static final Logger LOG = + LoggerFactory.getLogger(MigrateIcebergTableProcedureITCase.class); + private static final TestHiveMetastore TEST_HIVE_METASTORE = new TestHiveMetastore(); private static final int PORT = 9087; @@ -64,31 +68,26 @@ public void afterEach() throws Exception { TEST_HIVE_METASTORE.stop(); } - private static Stream testIcebergArguments() { - // isPartitioned, icebergIsHive, paimonIsHive, isNamedArgument - return Stream.of( - Arguments.of(true, false, false, true), - Arguments.of(false, false, false, true), - Arguments.of(true, true, false, true), - Arguments.of(false, true, false, true), - Arguments.of(true, true, false, false), - Arguments.of(false, true, false, false), - Arguments.of(true, false, true, true), - Arguments.of(true, true, true, true)); - } - - @ParameterizedTest - @MethodSource("testIcebergArguments") - public void testMigrateIcebergTableProcedure( - boolean isPartitioned, - boolean icebergIsHive, - boolean paimonIsHive, - boolean isNamedArgument) - throws Exception { + @Test + public void testMigrateIcebergTableProcedure() throws Exception { TableEnvironment tEnv = TableEnvironmentImpl.create( EnvironmentSettings.newInstance().inBatchMode().build()); + ThreadLocalRandom random = ThreadLocalRandom.current(); + boolean isPartitioned = random.nextBoolean(); + boolean icebergIsHive = random.nextBoolean(); + boolean paimonIsHive = random.nextBoolean(); + boolean isNamedArgument = random.nextBoolean(); + + // Logging the random arguments for debugging + LOG.info( + "isPartitioned:{}, icebergIsHive:{}, paimonIsHive:{}, isNamedArgument:{}", + isPartitioned, + icebergIsHive, + paimonIsHive, + isNamedArgument); + // create iceberg catalog, database, table, and insert some data to iceberg table tEnv.executeSql(icebergCatalogDdl(icebergIsHive)); From 6b84657352d220cd9669c60caea5191d0c777871 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Tue, 11 Feb 2025 14:07:29 +0800 Subject: [PATCH 09/13] [core][hive] remove FileIO argument in constructor --- .../iceberg/migrate/IcebergMigrateHiveMetadata.java | 13 ++++++++----- .../migrate/IcebergMigrateHiveMetadataFactory.java | 6 ++---- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java index 95ae3f8c1be4..3c0d7da024a0 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadata.java @@ -18,6 +18,7 @@ package org.apache.paimon.iceberg.migrate; +import org.apache.paimon.catalog.CatalogContext; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.client.ClientPool; import org.apache.paimon.fs.FileIO; @@ -45,7 +46,8 @@ public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata { public static final String ICEBERG_TABLE_TYPE_VALUE = "iceberg"; private static final String ICEBERG_METADATA_LOCATION = "metadata_location"; - private final FileIO fileIO; + private FileIO fileIO; + private final Options icebergOptions; private final Identifier icebergIdentifier; private final ClientPool clients; @@ -54,10 +56,10 @@ public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata { private 
IcebergMetadata icebergMetadata; - public IcebergMigrateHiveMetadata( - Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) { - this.fileIO = fileIO; + public IcebergMigrateHiveMetadata(Identifier icebergIdentifier, Options icebergOptions) { + this.icebergIdentifier = icebergIdentifier; + this.icebergOptions = icebergOptions; String uri = icebergOptions.get(IcebergOptions.URI); String hiveConfDir = icebergOptions.get(IcebergOptions.HIVE_CONF_DIR); @@ -104,7 +106,6 @@ public IcebergMetadata icebergMetadata() { client.getTable( icebergIdentifier.getDatabaseName(), icebergIdentifier.getTableName())); - // TODO:Is this check necessary? // check whether it is an iceberg table String tableType = icebergHiveTable.getParameters().get(TABLE_TYPE_PROP); Preconditions.checkArgument( @@ -116,6 +117,8 @@ public IcebergMetadata icebergMetadata() { metadataLocation = icebergHiveTable.getParameters().get(ICEBERG_METADATA_LOCATION); LOG.info("iceberg latest metadata location: {}", metadataLocation); + fileIO = FileIO.get(new Path(metadataLocation), CatalogContext.create(icebergOptions)); + icebergMetadata = IcebergMetadata.fromPath(fileIO, new Path(metadataLocation)); return icebergMetadata; } catch (Exception e) { diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java index 184eb3134058..0a539cdec2d2 100644 --- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java +++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrateHiveMetadataFactory.java @@ -19,7 +19,6 @@ package org.apache.paimon.iceberg.migrate; import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.fs.FileIO; import org.apache.paimon.iceberg.IcebergOptions; import org.apache.paimon.options.Options; @@ -31,8 +30,7 @@ public String identifier() { } @Override - public IcebergMigrateHiveMetadata create( - Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) { - return new IcebergMigrateHiveMetadata(icebergIdentifier, fileIO, icebergOptions); + public IcebergMigrateHiveMetadata create(Identifier icebergIdentifier, Options icebergOptions) { + return new IcebergMigrateHiveMetadata(icebergIdentifier, icebergOptions); } } From ef138c38618626c2abf8d4128c58a59cc6730d56 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Fri, 10 Jan 2025 17:24:47 +0800 Subject: [PATCH 10/13] [core][wip] support iceberg schema evolution --- .../iceberg/manifest/IcebergDataFileMeta.java | 12 + .../iceberg/migrate/IcebergMigrator.java | 146 +++++- .../apache/paimon/migrate/FileMetaUtils.java | 67 ++- .../apache/paimon/schema/SchemaManager.java | 2 +- .../iceberg/migrate/IcebergMigrateTest.java | 419 +++++++++++++++--- 5 files changed, 551 insertions(+), 95 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java index d171962becad..cb78c3c646b5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/manifest/IcebergDataFileMeta.java @@ -83,6 +83,9 @@ public static Content fromId(int id) { private final InternalMap lowerBounds; private final InternalMap upperBounds; + // only used for iceberg 
migrate
+    private long schemaId = 0;
+
     IcebergDataFileMeta(
             Content content,
             String filePath,
@@ -201,6 +204,15 @@ public InternalMap upperBounds() {
         return upperBounds;
     }
 
+    public long schemaId() {
+        return schemaId;
+    }
+
+    public IcebergDataFileMeta withSchemaId(long schemaId) {
+        this.schemaId = schemaId;
+        return this;
+    }
+
     public static RowType schema(RowType partitionType) {
         List<DataField> fields = new ArrayList<>();
         fields.add(new DataField(134, "content", DataTypes.INT().notNull()));
diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
index 9e91fa2d18a8..16147cdb1e46 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -23,6 +23,7 @@
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.factories.FactoryException;
 import org.apache.paimon.factories.FactoryUtil;
+import org.apache.paimon.format.avro.SeekableInputStreamWrapper;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
@@ -43,19 +44,28 @@
 import org.apache.paimon.migrate.Migrator;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.table.sink.BatchTableCommit;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.types.DataField;
+import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.FileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
@@ -136,15 +146,28 @@ public IcebergMigrator(
 
     @Override
     public void executeMigrate() throws Exception {
-        Schema paimonSchema = icebergSchemaToPaimonSchema(icebergMetadata);
+        List<TableSchema> paimonSchemas = icebergSchemasToPaimonSchemas(icebergMetadata);
+        Preconditions.checkArgument(
+                !paimonSchemas.isEmpty(),
+                "Paimon schemas converted from the iceberg table should not be empty.");
 
         Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableName);
         paimonCatalog.createDatabase(paimonDatabaseName, true);
-        paimonCatalog.createTable(paimonIdentifier, paimonSchema, false);
+        TableSchema firstSchema = paimonSchemas.get(0);
+        Preconditions.checkArgument(firstSchema.id() == 0, "Unexpected: the first converted schema id is not 0.");
+        paimonCatalog.createTable(paimonIdentifier, paimonSchemas.get(0).toSchema(), false);
 
         try {
             FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier);
             FileIO fileIO = paimonTable.fileIO();
+            SchemaManager schemaManager = paimonTable.schemaManager();
+            // commit the remaining iceberg schemas (the first one was used to create the table)
+            for (int i = 1; i < paimonSchemas.size(); i++) {
+                LOG.info(
+                        "Committing new schema converted from iceberg, schema id: {}",
+                        paimonSchemas.get(i).id());
+                schemaManager.commit(paimonSchemas.get(i));
+            }
 
             IcebergManifestFile manifestFile =
IcebergManifestFile.create(paimonTable, icebergMetaPathFactory); @@ -157,25 +180,70 @@ public void executeMigrate() throws Exception { // check manifest file with 'DELETE' kind checkAndFilterManifestFiles(icebergManifestFileMetas); - // get all live iceberg entries - List icebergEntries = - icebergManifestFileMetas.stream() - .flatMap(fileMeta -> manifestFile.read(fileMeta).stream()) - .filter(IcebergManifestEntry::isLive) - .collect(Collectors.toList()); - if (icebergEntries.isEmpty()) { + Map> icebergEntries = new HashMap<>(); + for (IcebergManifestFileMeta icebergManifestFileMeta : icebergManifestFileMetas) { + long schemaId = + getSchemaIdFromIcebergManifestFile( + new Path(icebergManifestFileMeta.manifestPath())); + List entries = manifestFile.read(icebergManifestFileMeta); + icebergEntries + .computeIfAbsent(schemaId, v -> new ArrayList<>()) + .addAll( + entries.stream() + .filter(IcebergManifestEntry::isLive) + .collect(Collectors.toList())); + } + + // // get all live iceberg entries + // List icebergEntries = + // icebergManifestFileMetas.stream() + // .flatMap( + // fileMeta -> { + // List entries = + // manifestFile.read(fileMeta); + // entries.forEach( + // entry -> + // entry.setSchemaId( + // + // getSchemaIdFromIcebergManifestFile( + // new Path( + // + // fileMeta + // + // .manifestPath())))); + // return entries.stream(); + // }) + // .filter(IcebergManifestEntry::isLive) + // .collect(Collectors.toList()); + // if (icebergEntries.isEmpty()) { + // LOG.info( + // "No live manifest entry in iceberg table for snapshot {}, + // iceberg table meta path is {}.", + // icebergMetadata.currentSnapshotId(), + // icebergLatestMetadataLocation); + // return; + // } + + // List icebergDataFileMetas = + // icebergEntries.stream() + // .map(IcebergManifestEntry::file) + // .collect(Collectors.toList()); + + List icebergDataFileMetas = new ArrayList<>(); + for (Map.Entry> kv : icebergEntries.entrySet()) { + icebergDataFileMetas.addAll( + kv.getValue().stream() + .map(entry -> entry.file().withSchemaId(kv.getKey())) + .collect(Collectors.toList())); + } + + if (icebergDataFileMetas.isEmpty()) { LOG.info( - "No live manifest entry in iceberg table for snapshot {}, iceberg table meta path is {}.", + "No live iceberg data files in iceberg table for snapshot {}, iceberg table meta path is {}.", icebergMetadata.currentSnapshotId(), icebergLatestMetadataLocation); return; } - - List icebergDataFileMetas = - icebergEntries.stream() - .map(IcebergManifestEntry::file) - .collect(Collectors.toList()); - // Again, check if delete File exists checkAndFilterDataFiles(icebergDataFileMetas); @@ -246,10 +314,24 @@ public void renameTable(boolean ignoreIfNotExists) throws Exception { paimonCatalog.renameTable(targetTableId, sourceTableId, ignoreIfNotExists); } - public Schema icebergSchemaToPaimonSchema(IcebergMetadata icebergMetadata) { - // get iceberg current schema - IcebergSchema icebergSchema = - icebergMetadata.schemas().get(icebergMetadata.currentSchemaId()); + public List icebergSchemasToPaimonSchemas(IcebergMetadata icebergMetadata) { + return icebergMetadata.schemas().stream() + .map( + icebergSchema -> { + LOG.info( + "Convert iceberg schema to paimon schema, iceberg schema id: {}", + icebergSchema.schemaId()); + return TableSchema.create( + icebergSchema.schemaId(), + icebergSchemaToPaimonSchema(icebergSchema)); + }) + .collect(Collectors.toList()); + } + + public Schema icebergSchemaToPaimonSchema(IcebergSchema icebergSchema) { + // // get iceberg current schema + // IcebergSchema 
icebergSchema = + // icebergMetadata.schemas().get(icebergMetadata.currentSchemaId()); // get iceberg current partition spec int currentPartitionSpecId = icebergMetadata.defaultSpecId(); @@ -289,6 +371,26 @@ private void checkAndFilterDataFiles(List icebergDataFileMe } } + public long getSchemaIdFromIcebergManifestFile(Path manifestPath) { + try { + SeekableInput in = + new SeekableInputStreamWrapper( + paimonFileIO.newInputStream(manifestPath), + paimonFileIO.getFileSize(manifestPath)); + FileReader dataFileReader = + DataFileReader.openReader(in, new GenericDatumReader<>()); + String schema = ((DataFileReader) dataFileReader).getMetaString("schema"); + dataFileReader.close(); + return JsonSerdeUtil.fromJson(schema, IcebergSchema.class).schemaId(); + } catch (Exception e) { + throw new RuntimeException( + String.format( + "Can not get schema id in iceberg manifest file, file path is %s", + manifestPath.toString()), + e); + } + } + private static List construct( List icebergDataFileMetas, FileIO fileIO, @@ -318,7 +420,9 @@ private static DataFileMeta constructFileMeta( e); } String format = icebergDataFileMeta.fileFormat(); - return FileMetaUtils.constructFileMeta(format, status, fileIO, table, dir, rollback); + long schemaId = icebergDataFileMeta.schemaId(); + return FileMetaUtils.constructFileMeta( + format, status, fileIO, table, dir, rollback, schemaId); } private MigrateTask importUnPartitionedTable( diff --git a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java index 366f8afcfd38..405870d5fa03 100644 --- a/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java +++ b/paimon-core/src/main/java/org/apache/paimon/migrate/FileMetaUtils.java @@ -129,6 +129,47 @@ public static DataFileMeta constructFileMeta( } } + public static DataFileMeta constructFileMeta( + String format, + FileStatus fileStatus, + FileIO fileIO, + Table table, + Path dir, + Map rollback, + long schemaId) { + + try { + RowType rowTypeWithSchemaId = + ((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType(); + SimpleColStatsCollector.Factory[] factories = + StatsCollectorFactories.createStatsFactories( + ((FileStoreTable) table).coreOptions(), + rowTypeWithSchemaId.getFieldNames()); + + SimpleStatsExtractor simpleStatsExtractor = + FileFormat.fromIdentifier( + format, + ((FileStoreTable) table).coreOptions().toConfiguration()) + .createStatsExtractor(rowTypeWithSchemaId, factories) + .orElseThrow( + () -> + new RuntimeException( + "Can't get table stats extractor for format " + + format)); + Path newPath = renameFile(fileIO, fileStatus.getPath(), dir, format, rollback); + return constructFileMeta( + newPath.getName(), + fileStatus.getLen(), + newPath, + simpleStatsExtractor, + fileIO, + table, + schemaId); + } catch (IOException e) { + throw new RuntimeException("error when construct file meta", e); + } + } + // -----------------------------private method--------------------------------------------- private static Path renameFile( @@ -152,7 +193,29 @@ private static DataFileMeta constructFileMeta( FileIO fileIO, Table table) throws IOException { - SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(table.rowType()); + return constructFileMeta( + fileName, + fileSize, + path, + simpleStatsExtractor, + fileIO, + table, + ((FileStoreTable) table).schema().id()); + } + + private static DataFileMeta constructFileMeta( + String fileName, + long fileSize, + Path path, + 
SimpleStatsExtractor simpleStatsExtractor, + FileIO fileIO, + Table table, + long schemaId) + throws IOException { + RowType rowTypeWithSchemaId = + ((FileStoreTable) table).schemaManager().schema(schemaId).logicalRowType(); + + SimpleStatsConverter statsArraySerializer = new SimpleStatsConverter(rowTypeWithSchemaId); Pair fileInfo = simpleStatsExtractor.extractWithFileInfo(fileIO, path); @@ -165,7 +228,7 @@ private static DataFileMeta constructFileMeta( stats, 0, 0, - ((FileStoreTable) table).schema().id(), + schemaId, Collections.emptyList(), null, FileSource.APPEND, diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 753bc34d95ef..d15312fce1f7 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -769,7 +769,7 @@ protected void updateLastColumn(List newFields, String fieldName) } @VisibleForTesting - boolean commit(TableSchema newSchema) throws Exception { + public boolean commit(TableSchema newSchema) throws Exception { SchemaValidation.validateTableSchema(newSchema); SchemaValidation.validateFallbackBranch(this, newSchema); Path schemaPath = toSchemaPath(newSchema.id()); diff --git a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java index aadaca0c3854..455ecbd57ca5 100644 --- a/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/iceberg/migrate/IcebergMigrateTest.java @@ -64,8 +64,10 @@ import java.time.LocalDateTime; import java.time.OffsetDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.UUID; @@ -118,27 +120,7 @@ public void beforeEach() throws Exception { public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception { Table icebergTable = createIcebergTable(isPartitioned); String format = "parquet"; - List records1 = - Stream.of( - toIcebergRecord(1, 1, "20240101", "00"), - toIcebergRecord(2, 2, "20240101", "00")) - .collect(Collectors.toList()); - if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00"); - } else { - writeRecordsToIceberg(icebergTable, format, records1); - } - - List records2 = - Stream.of( - toIcebergRecord(1, 1, "20240101", "01"), - toIcebergRecord(2, 2, "20240101", "01")) - .collect(Collectors.toList()); - if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01"); - } else { - writeRecordsToIceberg(icebergTable, format, records2); - } + writeInitialData(icebergTable, format, isPartitioned); IcebergMigrator icebergMigrator = new IcebergMigrator( @@ -160,8 +142,11 @@ public void testMigrateOnlyAdd(boolean isPartitioned) throws Exception { .map(row -> String.format("Record(%s)", row)) .collect(Collectors.toList())) .hasSameElementsAs( - Stream.concat(records1.stream(), records2.stream()) - .map(GenericRecord::toString) + Stream.of( + "Record(1, 1, 20240101, 00)", + "Record(2, 2, 20240101, 00)", + "Record(1, 1, 20240101, 01)", + "Record(2, 2, 20240101, 01)") .collect(Collectors.toList())); // verify iceberg table has been deleted @@ -173,27 +158,7 @@ public void testMigrateOnlyAdd(boolean isPartitioned) throws 
Exception { public void testMigrateAddAndDelete(boolean isPartitioned) throws Exception { Table icebergTable = createIcebergTable(isPartitioned); String format = "parquet"; - List records1 = - Stream.of( - toIcebergRecord(1, 1, "20240101", "00"), - toIcebergRecord(2, 2, "20240101", "00")) - .collect(Collectors.toList()); - if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00"); - } else { - writeRecordsToIceberg(icebergTable, format, records1); - } - - List records2 = - Stream.of( - toIcebergRecord(1, 1, "20240101", "01"), - toIcebergRecord(2, 2, "20240101", "01")) - .collect(Collectors.toList()); - if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01"); - } else { - writeRecordsToIceberg(icebergTable, format, records2); - } + writeInitialData(icebergTable, format, isPartitioned); // the file written with records2 will be deleted and generate a delete manifest entry, not // a delete file @@ -218,8 +183,7 @@ public void testMigrateAddAndDelete(boolean isPartitioned) throws Exception { .map(row -> String.format("Record(%s)", row)) .collect(Collectors.toList())) .hasSameElementsAs( - records2.stream() - .map(GenericRecord::toString) + Stream.of("Record(1, 1, 20240101, 01)", "Record(2, 2, 20240101, 01)") .collect(Collectors.toList())); } @@ -324,45 +288,115 @@ public void testMigrateWithRandomIcebergData(boolean isPartitioned) throws Excep @ParameterizedTest(name = "isPartitioned = {0}") @ValueSource(booleans = {true, false}) - public void testMigrateWithSchemaEvolution(boolean isPartitioned) throws Exception { + public void testDeleteColumn(boolean isPartitioned) throws Exception { Table icebergTable = createIcebergTable(isPartitioned); String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); - // write base data - List records1 = + icebergTable.updateSchema().deleteColumn("v").commit(); + Schema newIceSchema = icebergTable.schema(); + List addedRecords = Stream.of( - toIcebergRecord(1, 1, "20240101", "00"), - toIcebergRecord(2, 2, "20240101", "00")) + toIcebergRecord(3, "20240101", "00", newIceSchema), + toIcebergRecord(4, "20240101", "00", newIceSchema)) .collect(Collectors.toList()); if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00"); + writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); } else { - writeRecordsToIceberg(icebergTable, format, records1); + writeRecordsToIceberg(icebergTable, format, addedRecords); } - List records2 = + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + assertThat( + paiResults.stream() + .map(row -> String.format("Record(%s)", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + Stream.of( + "Record(1, 20240101, 00)", + "Record(2, 20240101, 00)", + "Record(1, 20240101, 01)", + "Record(2, 20240101, 01)", + "Record(3, 20240101, 00)", + "Record(4, 20240101, 00)") + .collect(Collectors.toList())); + } + + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testRenameColumn(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + String format = "parquet"; + 
writeInitialData(icebergTable, format, isPartitioned); + + icebergTable.updateSchema().renameColumn("v", "v2").commit(); + Schema newIceSchema = icebergTable.schema(); + List addedRecords = Stream.of( - toIcebergRecord(1, 1, "20240101", "01"), - toIcebergRecord(2, 2, "20240101", "01")) + toIcebergRecord(newIceSchema, 3, 3, "20240101", "00"), + toIcebergRecord(newIceSchema, 4, 4, "20240101", "00")) .collect(Collectors.toList()); if (isPartitioned) { - writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01"); + writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); } else { - writeRecordsToIceberg(icebergTable, format, records2); + writeRecordsToIceberg(icebergTable, format, addedRecords); } - // TODO: currently only support schema evolution of deleting columns - testDeleteColumn(icebergTable, format, isPartitioned); + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + assertThat( + paiResults.stream() + .map(row -> String.format("Record(%s)", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + Stream.of( + "Record(1, 1, 20240101, 00)", + "Record(2, 2, 20240101, 00)", + "Record(1, 1, 20240101, 01)", + "Record(2, 2, 20240101, 01)", + "Record(3, 3, 20240101, 00)", + "Record(4, 4, 20240101, 00)") + .collect(Collectors.toList())); } - private void testDeleteColumn(Table icebergTable, String format, boolean isPartitioned) - throws Exception { - icebergTable.updateSchema().deleteColumn("v").commit(); + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testAddColumn(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); + + icebergTable.updateSchema().addColumn("v2", Types.IntegerType.get()).commit(); Schema newIceSchema = icebergTable.schema(); List addedRecords = Stream.of( - toIcebergRecord(3, "20240101", "00", newIceSchema), - toIcebergRecord(4, "20240101", "00", newIceSchema)) + toIcebergRecord(newIceSchema, 3, 3, "20240101", "00", 3), + toIcebergRecord(newIceSchema, 4, 4, "20240101", "00", 4)) .collect(Collectors.toList()); if (isPartitioned) { writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); @@ -390,15 +424,233 @@ private void testDeleteColumn(Table icebergTable, String format, boolean isParti .collect(Collectors.toList())) .hasSameElementsAs( Stream.of( - "Record(1, 20240101, 00)", - "Record(2, 20240101, 00)", - "Record(1, 20240101, 01)", - "Record(2, 20240101, 01)", - "Record(3, 20240101, 00)", - "Record(4, 20240101, 00)") + "Record(1, 1, 20240101, 00, NULL)", + "Record(2, 2, 20240101, 00, NULL)", + "Record(1, 1, 20240101, 01, NULL)", + "Record(2, 2, 20240101, 01, NULL)", + "Record(3, 3, 20240101, 00, 3)", + "Record(4, 4, 20240101, 00, 4)") + .collect(Collectors.toList())); + } + + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testReorderColumn(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); + + 
icebergTable.updateSchema().moveAfter("v", "hh").commit(); + Schema newIceSchema = icebergTable.schema(); + List addedRecords = + Stream.of( + toIcebergRecord(newIceSchema, 3, "20240101", "00", 3), + toIcebergRecord(newIceSchema, 4, "20240101", "00", 4)) + .collect(Collectors.toList()); + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); + } else { + writeRecordsToIceberg(icebergTable, format, addedRecords); + } + + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + assertThat( + paiResults.stream() + .map(row -> String.format("Record(%s)", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + Stream.of( + "Record(1, 20240101, 00, 1)", + "Record(2, 20240101, 00, 2)", + "Record(1, 20240101, 01, 1)", + "Record(2, 20240101, 01, 2)", + "Record(3, 20240101, 00, 3)", + "Record(4, 20240101, 00, 4)") .collect(Collectors.toList())); } + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testUpdateColumn(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + String format = "parquet"; + writeInitialData(icebergTable, format, isPartitioned); + + icebergTable.updateSchema().updateColumn("v", Types.LongType.get()).commit(); + Schema newIceSchema = icebergTable.schema(); + List addedRecords = + Stream.of( + toIcebergRecord(newIceSchema, 3, 3L, "20240101", "00"), + toIcebergRecord(newIceSchema, 4, 3L, "20240101", "00")) + .collect(Collectors.toList()); + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, addedRecords, "20240101", "00"); + } else { + writeRecordsToIceberg(icebergTable, format, addedRecords); + } + + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + assertThat( + paiResults.stream() + .map(row -> String.format("Record(%s)", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + Stream.of( + "Record(1, 1, 20240101, 00)", + "Record(2, 2, 20240101, 00)", + "Record(1, 1, 20240101, 01)", + "Record(2, 2, 20240101, 01)", + "Record(3, 3, 20240101, 00)", + "Record(4, 3, 20240101, 00)") + .collect(Collectors.toList())); + } + + @ParameterizedTest(name = "isPartitioned = {0}") + @ValueSource(booleans = {true, false}) + public void testMigrateWithRandomIcebergEvolution(boolean isPartitioned) throws Exception { + Table icebergTable = createIcebergTable(isPartitioned); + icebergTable.updateSchema().addColumn("v2", Types.IntegerType.get()).commit(); + String format = "parquet"; + List index = new LinkedList<>(Arrays.asList("k", "v", "dt", "hh", "v2")); + + int numRounds = 20; + int numRecords = 10; + List ops = new ArrayList<>(Arrays.asList(1, 2, 3, 4, 5)); + ThreadLocalRandom random = ThreadLocalRandom.current(); + boolean isTypeChange = false; + List> expectRecords = new ArrayList<>(); + + for (int i = 0; i < numRounds; i++) { + List records = new ArrayList<>(); + String dt = 
Integer.toString(random.nextInt(20240101, 20240104)); + String hh = Integer.toString(random.nextInt(3)); + + if ((i + 1) % 4 == 0 && !ops.isEmpty()) { + switch (ops.remove(random.nextInt(ops.size()))) { + case 1: + icebergTable + .updateSchema() + .addColumn("v3", Types.IntegerType.get()) + .commit(); + for (List record : expectRecords) { + record.add("NULL"); + } + index.add("v3"); + break; + case 2: + icebergTable.updateSchema().renameColumn("v", "vv").commit(); + break; + case 3: + icebergTable.updateSchema().deleteColumn("v2").commit(); + int v2Idx = index.indexOf("v2"); + for (List record : expectRecords) { + record.remove(v2Idx); + } + index.remove(v2Idx); + break; + case 4: + icebergTable.updateSchema().moveAfter("k", "hh").commit(); + int kIdx = index.indexOf("k"); + int hhIdx = index.indexOf("hh"); + for (List record : expectRecords) { + String k = record.remove(kIdx); + record.add(hhIdx, k); + } + index.remove(kIdx); + index.add(hhIdx, "k"); + break; + case 5: + icebergTable + .updateSchema() + .updateColumn("k", Types.LongType.get()) + .commit(); + isTypeChange = true; + break; + default: + throw new IllegalStateException("Unknown operation"); + } + } + for (int j = 0; j < numRecords; j++) { + List recordString = new ArrayList<>(); + GenericRecord record = GenericRecord.create(icebergTable.schema()); + for (int idx = 0; idx < index.size(); idx++) { + String field = index.get(idx); + if (field.equals("dt")) { + record.set(idx, dt); + recordString.add(dt); + } else if (field.equals("hh")) { + record.set(idx, hh); + recordString.add(hh); + } else { + int value = random.nextInt(100); + if (field.equals("k") && isTypeChange) { + record.set(idx, (long) value); + } else { + record.set(idx, value); + } + recordString.add(String.valueOf(value)); + } + } + records.add(record); + expectRecords.add(recordString); + } + + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, records, dt, hh); + } else { + writeRecordsToIceberg(icebergTable, format, records); + } + } + + IcebergMigrator icebergMigrator = + new IcebergMigrator( + paiCatalog, + paiDatabase, + paiTable, + iceDatabase, + iceTable, + new Options(icebergProperties), + 1); + icebergMigrator.executeMigrate(); + + FileStoreTable paimonTable = + (FileStoreTable) paiCatalog.getTable(Identifier.create(paiDatabase, paiTable)); + List paiResults = getPaimonResult(paimonTable); + System.out.println(); + assertThat( + paiResults.stream() + .map(row -> String.format("[%s]", row)) + .collect(Collectors.toList())) + .hasSameElementsAs( + expectRecords.stream().map(List::toString).collect(Collectors.toList())); + } + @Test public void testAllDataTypes() throws Exception { Schema iceAllTypesSchema = @@ -490,6 +742,31 @@ private Table createIcebergTable(boolean isPartitioned, Schema icebergSchema) { } } + private void writeInitialData(Table icebergTable, String format, boolean isPartitioned) + throws IOException { + List records1 = + Stream.of( + toIcebergRecord(1, 1, "20240101", "00"), + toIcebergRecord(2, 2, "20240101", "00")) + .collect(Collectors.toList()); + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, records1, "20240101", "00"); + } else { + writeRecordsToIceberg(icebergTable, format, records1); + } + + List records2 = + Stream.of( + toIcebergRecord(1, 1, "20240101", "01"), + toIcebergRecord(2, 2, "20240101", "01")) + .collect(Collectors.toList()); + if (isPartitioned) { + writeRecordsToIceberg(icebergTable, format, records2, "20240101", "01"); + } else { + writeRecordsToIceberg(icebergTable, format, 
records2); + } + } + private GenericRecord toIcebergRecord(Schema icebergSchema, Object... values) { GenericRecord record = GenericRecord.create(icebergSchema); for (int i = 0; i < values.length; i++) { From 18247a2762ee2febcfe2569a191ae216faf51beb Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Mon, 13 Jan 2025 14:35:19 +0800 Subject: [PATCH 11/13] [core] delete useless segments --- .../iceberg/migrate/IcebergMigrator.java | 42 ++----------------- 1 file changed, 3 insertions(+), 39 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java index 16147cdb1e46..990278d7dfd2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java +++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java @@ -155,7 +155,7 @@ public void executeMigrate() throws Exception { paimonCatalog.createDatabase(paimonDatabaseName, true); TableSchema firstSchema = paimonSchemas.get(0); Preconditions.checkArgument(firstSchema.id() == 0, "Unexpected, first schema id is not 0."); - paimonCatalog.createTable(paimonIdentifier, paimonSchemas.get(0).toSchema(), false); + paimonCatalog.createTable(paimonIdentifier, firstSchema.toSchema(), false); try { FileStoreTable paimonTable = (FileStoreTable) paimonCatalog.getTable(paimonIdentifier); @@ -194,42 +194,8 @@ public void executeMigrate() throws Exception { .collect(Collectors.toList())); } - // // get all live iceberg entries - // List icebergEntries = - // icebergManifestFileMetas.stream() - // .flatMap( - // fileMeta -> { - // List entries = - // manifestFile.read(fileMeta); - // entries.forEach( - // entry -> - // entry.setSchemaId( - // - // getSchemaIdFromIcebergManifestFile( - // new Path( - // - // fileMeta - // - // .manifestPath())))); - // return entries.stream(); - // }) - // .filter(IcebergManifestEntry::isLive) - // .collect(Collectors.toList()); - // if (icebergEntries.isEmpty()) { - // LOG.info( - // "No live manifest entry in iceberg table for snapshot {}, - // iceberg table meta path is {}.", - // icebergMetadata.currentSnapshotId(), - // icebergLatestMetadataLocation); - // return; - // } - - // List icebergDataFileMetas = - // icebergEntries.stream() - // .map(IcebergManifestEntry::file) - // .collect(Collectors.toList()); - List icebergDataFileMetas = new ArrayList<>(); + // write schema id to IcebergDataFileMeta for (Map.Entry> kv : icebergEntries.entrySet()) { icebergDataFileMetas.addAll( kv.getValue().stream() @@ -329,9 +295,6 @@ public List icebergSchemasToPaimonSchemas(IcebergMetadata icebergMe } public Schema icebergSchemaToPaimonSchema(IcebergSchema icebergSchema) { - // // get iceberg current schema - // IcebergSchema icebergSchema = - // icebergMetadata.schemas().get(icebergMetadata.currentSchemaId()); // get iceberg current partition spec int currentPartitionSpecId = icebergMetadata.defaultSpecId(); @@ -373,6 +336,7 @@ private void checkAndFilterDataFiles(List icebergDataFileMe public long getSchemaIdFromIcebergManifestFile(Path manifestPath) { try { + // read raw iceberg manifest file with avro format to get the schema SeekableInput in = new SeekableInputStreamWrapper( paimonFileIO.newInputStream(manifestPath), From c42678d35aa5f56a31d13c00ab90ba27cd4ec713 Mon Sep 17 00:00:00 2001 From: LsomeYeah Date: Tue, 14 Jan 2025 17:20:53 +0800 Subject: [PATCH 12/13] [core] change the way of getting schema id from manifest file for avoiding dependency conflicts --- 
 .../iceberg/migrate/IcebergMigrator.java      | 28 ++------------------
 1 file changed, 8 insertions(+), 20 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
index 990278d7dfd2..5ec8613642d8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -23,7 +23,6 @@
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.factories.FactoryException;
 import org.apache.paimon.factories.FactoryUtil;
-import org.apache.paimon.format.avro.SeekableInputStreamWrapper;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.FileStatus;
 import org.apache.paimon.fs.Path;
@@ -54,9 +53,7 @@
 import org.apache.paimon.utils.JsonSerdeUtil;
 import org.apache.paimon.utils.Preconditions;
 
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.file.FileReader;
-import org.apache.avro.file.SeekableInput;
+import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.slf4j.Logger;
@@ -335,23 +332,14 @@ private void checkAndFilterDataFiles(List icebergDataFileMe
     }
 
     public long getSchemaIdFromIcebergManifestFile(Path manifestPath) {
-        try {
-            // read raw iceberg manifest file with avro format to get the schema
-            SeekableInput in =
-                    new SeekableInputStreamWrapper(
-                            paimonFileIO.newInputStream(manifestPath),
-                            paimonFileIO.getFileSize(manifestPath));
-            FileReader<GenericRecord> dataFileReader =
-                    DataFileReader.openReader(in, new GenericDatumReader<>());
-            String schema = ((DataFileReader) dataFileReader).getMetaString("schema");
-            dataFileReader.close();
+
+        try (DataFileStream<GenericRecord> dataFileStream =
+                new DataFileStream<>(
+                        paimonFileIO.newInputStream(manifestPath), new GenericDatumReader<>())) {
+            String schema = dataFileStream.getMetaString("schema");
             return JsonSerdeUtil.fromJson(schema, IcebergSchema.class).schemaId();
-        } catch (Exception e) {
-            throw new RuntimeException(
-                    String.format(
-                            "Can not get schema id in iceberg manifest file, file path is %s",
-                            manifestPath.toString()),
-                    e);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
     }
 

From f08bcaa08d1f2fb8fccb12dc418dd4fe9e95f9d2 Mon Sep 17 00:00:00 2001
From: LsomeYeah
Date: Thu, 13 Feb 2025 19:27:21 +0800
Subject: [PATCH 13/13] resolve rebase conflicts

---
 .../org/apache/paimon/iceberg/migrate/IcebergMigrator.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
index 5ec8613642d8..e2f56c437538 100644
--- a/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/iceberg/migrate/IcebergMigrator.java
@@ -181,7 +181,7 @@ public void executeMigrate() throws Exception {
         for (IcebergManifestFileMeta icebergManifestFileMeta : icebergManifestFileMetas) {
             long schemaId =
                     getSchemaIdFromIcebergManifestFile(
-                            new Path(icebergManifestFileMeta.manifestPath()));
+                            new Path(icebergManifestFileMeta.manifestPath()), fileIO);
             List<IcebergManifestEntry> entries = manifestFile.read(icebergManifestFileMeta);
             icebergEntries
                     .computeIfAbsent(schemaId, v -> new ArrayList<>())
@@ -331,11 +331,11 @@ private void checkAndFilterDataFiles(List icebergDataFileMe
         }
     }
 
-    public long getSchemaIdFromIcebergManifestFile(Path manifestPath) {
+    public long getSchemaIdFromIcebergManifestFile(Path manifestPath, FileIO fileIO) {
 
         try (DataFileStream<GenericRecord> dataFileStream =
                 new DataFileStream<>(
-                        paimonFileIO.newInputStream(manifestPath), new GenericDatumReader<>())) {
+                        fileIO.newInputStream(manifestPath), new GenericDatumReader<>())) {
             String schema = dataFileStream.getMetaString("schema");
             return JsonSerdeUtil.fromJson(schema, IcebergSchema.class).schemaId();
         } catch (IOException e) {
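
For context on where the last two patches end up: an Iceberg manifest is an Avro file whose file metadata carries the manifest's schema as a JSON string under the "schema" key, and Avro's DataFileStream can read that metadata from a plain InputStream. That is what makes the SeekableInput/DataFileReader path, and with it the Paimon-specific SeekableInputStreamWrapper, unnecessary here. The snippet below is only an illustrative, self-contained sketch of that lookup; the class name and manifest path are invented, and it is not part of the patches above.

import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;

public class ManifestSchemaPeek {

    // Reads the "schema" metadata entry of an Iceberg Avro manifest file.
    // DataFileStream only needs an InputStream, so no SeekableInput wrapper is required.
    static String readSchemaJson(String manifestFile) throws IOException {
        try (InputStream in = new FileInputStream(manifestFile);
                DataFileStream<GenericRecord> stream =
                        new DataFileStream<>(in, new GenericDatumReader<>())) {
            return stream.getMetaString("schema");
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical local manifest path, for illustration only.
        System.out.println(readSchemaJson("/tmp/iceberg/metadata/manifest-000.avro"));
    }
}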