Skip to content
Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;

import org.apache.flink.table.procedure.ProcedureContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Migrate procedure to migrate iceberg table to paimon table. */
public class MigrateIcebergTableProcedure extends ProcedureBase {

private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class);

private static final String PAIMON_SUFFIX = "_paimon_";

@Override
public String identifier() {
return "migrate_iceberg_table";
}

public String[] call(
ProcedureContext procedureContext, String sourceTablePath, String icebergProperties)
throws Exception {

return call(procedureContext, sourceTablePath, icebergProperties, "");
}

public String[] call(
ProcedureContext procedureContext,
String sourceTablePath,
String icebergProperties,
String properties)
throws Exception {

return call(
procedureContext,
sourceTablePath,
icebergProperties,
properties,
Runtime.getRuntime().availableProcessors());
}

public String[] call(
ProcedureContext procedureContext,
String sourceTablePath,
String icebergProperties,
String properties,
Integer parallelism)
throws Exception {
String targetTablePath = sourceTablePath + PAIMON_SUFFIX;

Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetTablePath);

Migrator migrator =
TableMigrationUtils.getIcebergImporter(
catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
parallelism,
ParameterUtils.parseCommaSeparatedKeyValues(properties),
ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties));
LOG.info("create migrator success.");
migrator.executeMigrate();

migrator.renameTable(false);
return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;

import org.apache.flink.table.procedure.ProcedureContext;
Expand Down Expand Up @@ -50,25 +51,13 @@ public String[] call(
String sourceTablePath,
String properties)
throws Exception {
String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;

Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);

TableMigrationUtils.getImporter(
connector,
catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
Runtime.getRuntime().availableProcessors(),
ParameterUtils.parseCommaSeparatedKeyValues(properties))
.executeMigrate();

LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
catalog.renameTable(targetTableId, sourceTableId, false);
return new String[] {"Success"};
return call(
procedureContext,
connector,
sourceTablePath,
properties,
Runtime.getRuntime().availableProcessors());
}

public String[] call(
Expand All @@ -78,24 +67,25 @@ public String[] call(
String properties,
Integer parallelism)
throws Exception {
String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;
String targetTablePath = sourceTablePath + PAIMON_SUFFIX;

Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);
Identifier targetTableId = Identifier.fromString(targetTablePath);

TableMigrationUtils.getImporter(
Migrator migrator =
TableMigrationUtils.getImporter(
connector,
catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
parallelism,
ParameterUtils.parseCommaSeparatedKeyValues(properties))
.executeMigrate();
ParameterUtils.parseCommaSeparatedKeyValues(properties));
LOG.info("create migrator success.");
migrator.executeMigrate();

LOG.info("Last step: rename " + targetTableId + " to " + sourceTableId);
catalog.renameTable(targetTableId, sourceTableId, false);
migrator.renameTable(false);
return new String[] {"Success"};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.flink.procedure.MigrateIcebergTableProcedure;

import org.apache.flink.table.procedure.DefaultProcedureContext;

import java.util.Map;

/** Migrate from iceberg table to paimon table. */
public class MigrateIcebergTableAction extends ActionBase {

private final String sourceTableFullName;
private final String tableProperties;
private final Integer parallelism;

private final String icebergProperties;

public MigrateIcebergTableAction(
String sourceTableFullName,
Map<String, String> catalogConfig,
String icebergProperties,
String tableProperties,
Integer parallelism) {
super(catalogConfig);
this.sourceTableFullName = sourceTableFullName;
this.tableProperties = tableProperties;
this.parallelism = parallelism;
this.icebergProperties = icebergProperties;
}

@Override
public void run() throws Exception {
MigrateIcebergTableProcedure migrateIcebergTableProcedure =
new MigrateIcebergTableProcedure();
migrateIcebergTableProcedure.withCatalog(catalog);
migrateIcebergTableProcedure.call(
new DefaultProcedureContext(env),
sourceTableFullName,
icebergProperties,
tableProperties,
parallelism);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import java.util.Map;
import java.util.Optional;

/** Action Factory for {@link MigrateIcebergTableAction}. */
public class MigrateIcebergTableActionFactory implements ActionFactory {

public static final String IDENTIFIER = "migrate_iceberg_table";

private static final String OPTIONS = "options";
private static final String PARALLELISM = "parallelism";

private static final String ICEBERG_OPTIONS = "iceberg_options";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {

String sourceTable = params.get(TABLE);
Map<String, String> catalogConfig = catalogConfigMap(params);
String tableConf = params.get(OPTIONS);
Integer parallelism =
params.get(PARALLELISM) == null ? null : Integer.parseInt(params.get(PARALLELISM));

String icebergOptions = params.get(ICEBERG_OPTIONS);

MigrateIcebergTableAction migrateIcebergTableAction =
new MigrateIcebergTableAction(
sourceTable, catalogConfig, icebergOptions, tableConf, parallelism);
return Optional.of(migrateIcebergTableAction);
}

@Override
public void printHelp() {
System.out.println(
"Action \"migrate_iceberg_table\" runs a migrating job from iceberg to paimon.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" migrate_iceberg_table"
+ "--table <database.table_name> "
+ "--iceberg_options <key>=<value>[,<key>=<value>,...]"
+ "[--catalog_conf <key>=<value] "
+ "[--options <key>=<value>,<key>=<value>,...]");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.utils.TableMigrationUtils;
import org.apache.paimon.migrate.Migrator;
import org.apache.paimon.utils.ParameterUtils;

import org.apache.flink.table.annotation.ArgumentHint;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.ProcedureHint;
import org.apache.flink.table.procedure.ProcedureContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Migrate procedure to migrate iceberg table to paimon table. */
public class MigrateIcebergTableProcedure extends ProcedureBase {
private static final Logger LOG = LoggerFactory.getLogger(MigrateIcebergTableProcedure.class);

private static final String PAIMON_SUFFIX = "_paimon_";

@Override
public String identifier() {
return "migrate_iceberg_table";
}

@ProcedureHint(
argument = {
@ArgumentHint(name = "source_table", type = @DataTypeHint("STRING")),
@ArgumentHint(
name = "iceberg_options",
type = @DataTypeHint("STRING"),
isOptional = true),
@ArgumentHint(name = "options", type = @DataTypeHint("STRING"), isOptional = true),
@ArgumentHint(
name = "parallelism",
type = @DataTypeHint("Integer"),
isOptional = true)
})
public String[] call(
ProcedureContext procedureContext,
String sourceTablePath,
String icebergProperties,
String properties,
Integer parallelism)
throws Exception {
properties = notnull(properties);
icebergProperties = notnull(icebergProperties);

String targetPaimonTablePath = sourceTablePath + PAIMON_SUFFIX;

Identifier sourceTableId = Identifier.fromString(sourceTablePath);
Identifier targetTableId = Identifier.fromString(targetPaimonTablePath);

Integer p = parallelism == null ? Runtime.getRuntime().availableProcessors() : parallelism;

Migrator migrator =
TableMigrationUtils.getIcebergImporter(
catalog,
sourceTableId.getDatabaseName(),
sourceTableId.getObjectName(),
targetTableId.getDatabaseName(),
targetTableId.getObjectName(),
p,
ParameterUtils.parseCommaSeparatedKeyValues(properties),
ParameterUtils.parseCommaSeparatedKeyValues(icebergProperties));
LOG.info("create migrator success.");
migrator.executeMigrate();

migrator.renameTable(false);
return new String[] {"Success"};
}
}
Loading
Loading