diff --git a/docs/content/cdc-ingestion/kafka-cdc.md b/docs/content/cdc-ingestion/kafka-cdc.md index f57260275ea8..b037937c554f 100644 --- a/docs/content/cdc-ingestion/kafka-cdc.md +++ b/docs/content/cdc-ingestion/kafka-cdc.md @@ -198,6 +198,7 @@ To use this feature through `flink run`, run the following shell command. kafka_sync_database --warehouse \ --database \ + [--table_mapping <table-name>=<paimon-table-name>] \ [--table_prefix ] \ [--table_suffix ] \ [--including_tables ] \ diff --git a/docs/layouts/shortcodes/generated/kafka_sync_database.html b/docs/layouts/shortcodes/generated/kafka_sync_database.html index 888901991d69..6c90f1d7f7d8 100644 --- a/docs/layouts/shortcodes/generated/kafka_sync_database.html +++ b/docs/layouts/shortcodes/generated/kafka_sync_database.html @@ -37,6 +37,10 @@
--ignore_incompatible
It is default false, in this case, if MySQL table name exists in Paimon and their schema is incompatible,an exception will be thrown. You can specify it to true explicitly to ignore the incompatible tables and exception. + +
--table_mapping
+ The table name mapping between the source database and Paimon. For example, if you want to synchronize a source table named "test" to a Paimon table named "paimon_test", you can specify "--table_mapping test=paimon_test". Multiple mappings can be specified by repeating the "--table_mapping" option. "--table_mapping" takes priority over "--table_prefix" and "--table_suffix": a table that has a mapping gets the mapped name, and the prefix and suffix are not applied to it. +
--table_prefix
The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_". diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java index 8f96022bde35..83891c90b8e1 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/CdcActionCommonUtils.java @@ -56,6 +56,7 @@ public class CdcActionCommonUtils { public static final String PULSAR_CONF = "pulsar_conf"; public static final String TABLE_PREFIX = "table_prefix"; public static final String TABLE_SUFFIX = "table_suffix"; + public static final String TABLE_MAPPING = "table_mapping"; public static final String INCLUDING_TABLES = "including_tables"; public static final String EXCLUDING_TABLES = "excluding_tables"; public static final String TYPE_MAPPING = "type_mapping"; diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java index 4ab56bdcf118..ac3483ac23bf 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java @@ -52,6 +52,7 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase { protected MultiTablesSinkMode mode = COMBINED; protected String tablePrefix = ""; protected String tableSuffix = ""; + protected Map tableMapping = new HashMap<>(); protected String includingTables = ".*"; protected List partitionKeys = new ArrayList<>(); protected List 
primaryKeys = new ArrayList<>(); @@ -97,6 +98,13 @@ public SyncDatabaseActionBase withTableSuffix(@Nullable String tableSuffix) { return this; } + public SyncDatabaseActionBase withTableMapping(Map tableMapping) { + if (tableMapping != null) { + this.tableMapping = tableMapping; + } + return this; + } + public SyncDatabaseActionBase includingTables(@Nullable String includingTables) { if (includingTables != null) { this.includingTables = includingTables; @@ -155,7 +163,8 @@ protected EventParser.Factory buildEventParserFactory() Pattern excludingPattern = excludingTables == null ? null : Pattern.compile(excludingTables); TableNameConverter tableNameConverter = - new TableNameConverter(allowUpperCase, mergeShards, tablePrefix, tableSuffix); + new TableNameConverter( + allowUpperCase, mergeShards, tablePrefix, tableSuffix, tableMapping); Set createdTables; try { createdTables = new HashSet<>(catalog.listTables(database)); diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java index e7a386979d4e..2135f2a28112 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionFactoryBase.java @@ -29,6 +29,7 @@ import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.MULTIPLE_TABLE_PARTITION_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS; +import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_MAPPING; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX; import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX; 
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING; @@ -51,6 +52,7 @@ public Optional create(MultipleParameterToolAdapter params) { protected void withParams(MultipleParameterToolAdapter params, T action) { action.withTablePrefix(params.get(TABLE_PREFIX)) .withTableSuffix(params.get(TABLE_SUFFIX)) + .withTableMapping(optionalConfigMap(params, TABLE_MAPPING)) .includingTables(params.get(INCLUDING_TABLES)) .excludingTables(params.get(EXCLUDING_TABLES)) .withPartitionKeyMultiple( diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java index 67c70aa58cdb..4eca8b903ed1 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/TableNameConverter.java @@ -21,6 +21,8 @@ import org.apache.paimon.catalog.Identifier; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; /** Used to convert a MySQL source table name to corresponding Paimon table name. 
*/ public class TableNameConverter implements Serializable { @@ -31,20 +33,31 @@ public class TableNameConverter implements Serializable { private final boolean mergeShards; private final String prefix; private final String suffix; + private final Map tableMapping; public TableNameConverter(boolean caseSensitive) { - this(caseSensitive, true, "", ""); + this(caseSensitive, true, "", "", null); } public TableNameConverter( - boolean caseSensitive, boolean mergeShards, String prefix, String suffix) { + boolean caseSensitive, + boolean mergeShards, + String prefix, + String suffix, + Map tableMapping) { this.caseSensitive = caseSensitive; this.mergeShards = mergeShards; this.prefix = prefix; this.suffix = suffix; + this.tableMapping = lowerMapKey(tableMapping); } public String convert(String originName) { + if (tableMapping.containsKey(originName.toLowerCase())) { + String mappedName = tableMapping.get(originName.toLowerCase()); + return caseSensitive ? mappedName : mappedName.toLowerCase(); + } + String tableName = caseSensitive ? originName : originName.toLowerCase(); return prefix + tableName + suffix; } @@ -58,4 +71,18 @@ public String convert(Identifier originIdentifier) { + originIdentifier.getObjectName(); return convert(rawName); } + + private Map lowerMapKey(Map map) { + int size = map == null ? 
0 : map.size(); + Map lowerKeyMap = new HashMap<>(size); + if (size == 0) { + return lowerKeyMap; + } + + for (String key : map.keySet()) { + lowerKeyMap.put(key.toLowerCase(), map.get(key)); + } + + return lowerKeyMap; + } } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java index f8ea8cdc4438..235b3f9a3235 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseAction.java @@ -138,7 +138,8 @@ protected void beforeBuildingSourceSink() throws Exception { + ", or MySQL database does not exist."); TableNameConverter tableNameConverter = - new TableNameConverter(allowUpperCase, mergeShards, tablePrefix, tableSuffix); + new TableNameConverter( + allowUpperCase, mergeShards, tablePrefix, tableSuffix, tableMapping); for (JdbcTableInfo tableInfo : jdbcTableInfos) { Identifier identifier = Identifier.create( diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java new file mode 100644 index 000000000000..dfbe32e3d398 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/TableNameConverterTest.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +/** Tests for {@link TableNameConverter}. */ +public class TableNameConverterTest { + + @Test + public void testConvertTableName() { + Map tableMapping = new HashMap<>(1); + tableMapping.put("mapped_src", "mapped_TGT"); + TableNameConverter caseConverter = + new TableNameConverter(true, true, "pre_", "_pos", tableMapping); + Assert.assertEquals(caseConverter.convert("mapped_SRC"), "mapped_TGT"); + + Assert.assertEquals(caseConverter.convert("unmapped_src"), "pre_unmapped_src_pos"); + + TableNameConverter noCaseConverter = + new TableNameConverter(false, true, "pre_", "_pos", tableMapping); + Assert.assertEquals(noCaseConverter.convert("mapped_src"), "mapped_tgt"); + Assert.assertEquals(noCaseConverter.convert("unmapped_src"), "pre_unmapped_src_pos"); + } +}