Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/content/cdc-ingestion/kafka-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ To use this feature through `flink run`, run the following shell command.
--warehouse <warehouse-path> \
--database <database-name> \
[--table_mapping <table-name>=<paimon-table-name>] \
[--table_prefix_db <paimon-table-prefix-by-db>] \
[--table_prefix <paimon-table-prefix>] \
[--table_suffix_db <paimon-table-suffix-by-db>] \
[--table_suffix <paimon-table-suffix>] \
[--including_tables <table-name|name-regular-expr>] \
[--excluding_tables <table-name|name-regular-expr>] \
Expand Down
12 changes: 10 additions & 2 deletions docs/layouts/shortcodes/generated/kafka_sync_database.html
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,21 @@
<td><h5>--table_mapping</h5></td>
<td>The table name mapping between source database and Paimon. For example, if you want to synchronize a source table named "test" to a Paimon table named "paimon_test", you can specify "--table_mapping test=paimon_test". Multiple mappings can be specified with multiple "--table_mapping" options. "--table_mapping" has higher priority than "--table_prefix_db", "--table_suffix_db", "--table_prefix" and "--table_suffix".</td>
</tr>
<tr>
<td><h5>--table_prefix_db</h5></td>
<td>The prefix of the Paimon tables to be synchronized from the specified db. For example, if you want to prefix the tables from db1 with "ods_db1_", you can specify "--table_prefix_db db1=ods_db1_". "--table_prefix_db" has higher priority than "--table_prefix".</td>
</tr>
<tr>
<td><h5>--table_prefix</h5></td>
<td>The prefix of all Paimon tables to be synchronized. For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_".</td>
<td>The prefix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_prefix_db". For example, if you want all synchronized tables to have "ods_" as prefix, you can specify "--table_prefix ods_".</td>
</tr>
<tr>
<td><h5>--table_suffix_db</h5></td>
<td>The suffix of the Paimon tables to be synchronized from the specified db. The usage is the same as "--table_prefix_db".</td>
</tr>
<tr>
<td><h5>--table_suffix</h5></td>
<td>The suffix of all Paimon tables to be synchronized. The usage is same as "--table_prefix".</td>
<td>The suffix of all Paimon tables to be synchronized except those specified by "--table_mapping" or "--table_suffix_db". The usage is the same as "--table_prefix".</td>
</tr>
<tr>
<td><h5>--including_tables</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public class CdcActionCommonUtils {
public static final String PULSAR_CONF = "pulsar_conf";
public static final String TABLE_PREFIX = "table_prefix";
public static final String TABLE_SUFFIX = "table_suffix";
public static final String TABLE_PREFIX_DB = "table_prefix_db";
public static final String TABLE_SUFFIX_DB = "table_suffix_db";
public static final String TABLE_MAPPING = "table_mapping";
public static final String INCLUDING_TABLES = "including_tables";
public static final String EXCLUDING_TABLES = "excluding_tables";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ public abstract class SyncDatabaseActionBase extends SynchronizationActionBase {
protected String tablePrefix = "";
protected String tableSuffix = "";
protected Map<String, String> tableMapping = new HashMap<>();
protected Map<String, String> dbPrefix = new HashMap<>();
protected Map<String, String> dbSuffix = new HashMap<>();
protected String includingTables = ".*";
protected List<String> partitionKeys = new ArrayList<>();
protected List<String> primaryKeys = new ArrayList<>();
Expand Down Expand Up @@ -98,6 +100,30 @@ public SyncDatabaseActionBase withTableSuffix(@Nullable String tableSuffix) {
return this;
}

/**
 * Sets the table prefixes that apply per source database.
 *
 * <p>The keys of the given map are stored lower-cased; the values are kept as they are. Passing
 * {@code null} leaves the currently configured prefixes untouched.
 *
 * @param dbPrefix mapping from database name to table prefix, may be {@code null}
 * @return this action, for call chaining
 */
public SyncDatabaseActionBase withDbPrefix(Map<String, String> dbPrefix) {
    if (dbPrefix == null) {
        return this;
    }

    // Build the normalized copy first so the field is only replaced once it is complete.
    Map<String, String> lowerCased = new HashMap<>();
    for (Map.Entry<String, String> entry : dbPrefix.entrySet()) {
        lowerCased.put(entry.getKey().toLowerCase(), entry.getValue());
    }
    this.dbPrefix = lowerCased;
    return this;
}

/**
 * Sets the table suffixes that apply per source database.
 *
 * <p>The keys of the given map are stored lower-cased; the values are kept as they are. Passing
 * {@code null} leaves the currently configured suffixes untouched.
 *
 * @param dbSuffix mapping from database name to table suffix, may be {@code null}
 * @return this action, for call chaining
 */
public SyncDatabaseActionBase withDbSuffix(Map<String, String> dbSuffix) {
    if (dbSuffix == null) {
        return this;
    }

    // Build the normalized copy first so the field is only replaced once it is complete.
    Map<String, String> lowerCased = new HashMap<>();
    for (Map.Entry<String, String> entry : dbSuffix.entrySet()) {
        lowerCased.put(entry.getKey().toLowerCase(), entry.getValue());
    }
    this.dbSuffix = lowerCased;
    return this;
}

public SyncDatabaseActionBase withTableMapping(Map<String, String> tableMapping) {
if (tableMapping != null) {
this.tableMapping = tableMapping;
Expand Down Expand Up @@ -164,7 +190,13 @@ protected EventParser.Factory<RichCdcMultiplexRecord> buildEventParserFactory()
excludingTables == null ? null : Pattern.compile(excludingTables);
TableNameConverter tableNameConverter =
new TableNameConverter(
allowUpperCase, mergeShards, tablePrefix, tableSuffix, tableMapping);
allowUpperCase,
mergeShards,
dbPrefix,
dbSuffix,
tablePrefix,
tableSuffix,
tableMapping);
Set<String> createdTables;
try {
createdTables = new HashSet<>(catalog.listTables(database));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PRIMARY_KEYS;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_MAPPING;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_PREFIX_DB;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TABLE_SUFFIX_DB;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.TYPE_MAPPING;

/** Base {@link ActionFactory} for synchronizing into database. */
Expand All @@ -52,6 +54,8 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
protected void withParams(MultipleParameterToolAdapter params, T action) {
action.withTablePrefix(params.get(TABLE_PREFIX))
.withTableSuffix(params.get(TABLE_SUFFIX))
.withDbPrefix(optionalConfigMap(params, TABLE_PREFIX_DB))
.withDbSuffix(optionalConfigMap(params, TABLE_SUFFIX_DB))
.withTableMapping(optionalConfigMap(params, TABLE_MAPPING))
.includingTables(params.get(INCLUDING_TABLES))
.excludingTables(params.get(EXCLUDING_TABLES))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class TableNameConverter implements Serializable {

private final boolean caseSensitive;
private final boolean mergeShards;
private final Map<String, String> dbPrefix;
private final Map<String, String> dbSuffix;
private final String prefix;
private final String suffix;
private final Map<String, String> tableMapping;
Expand All @@ -45,21 +47,54 @@ public TableNameConverter(
String prefix,
String suffix,
Map<String, String> tableMapping) {
this(
caseSensitive,
mergeShards,
new HashMap<>(),
new HashMap<>(),
prefix,
suffix,
tableMapping);
}

/**
 * Creates a converter that supports database-specific prefixes and suffixes in addition to the
 * global ones.
 *
 * @param caseSensitive whether converted names keep their original case; when {@code false}
 *     they are lower-cased
 * @param mergeShards stored as-is; used when converting an identifier
 * @param dbPrefix table prefix per source database name
 * @param dbSuffix table suffix per source database name
 * @param prefix global table prefix, used when no database-specific prefix matches
 * @param suffix global table suffix, used when no database-specific suffix matches
 * @param tableMapping explicit source-table to Paimon-table name mapping
 */
public TableNameConverter(
        boolean caseSensitive,
        boolean mergeShards,
        Map<String, String> dbPrefix,
        Map<String, String> dbSuffix,
        String prefix,
        String suffix,
        Map<String, String> tableMapping) {
    this.caseSensitive = caseSensitive;
    this.mergeShards = mergeShards;
    // convert(...) looks these maps up with a lower-cased database name, so their keys are
    // normalized with the same helper that is already applied to tableMapping. Without this,
    // mixed-case keys passed directly to this constructor would never match.
    // NOTE(review): assumes lowerMapKey returns an empty map for null input, as the existing
    // null tableMapping usage suggests — confirm.
    this.dbPrefix = lowerMapKey(dbPrefix);
    this.dbSuffix = lowerMapKey(dbSuffix);
    this.prefix = prefix;
    this.suffix = suffix;
    this.tableMapping = lowerMapKey(tableMapping);
}

public String convert(String originName) {
if (tableMapping.containsKey(originName.toLowerCase())) {
String mappedName = tableMapping.get(originName.toLowerCase());
public String convert(String originDbName, String originTblName) {
// top priority: table mapping
if (tableMapping.containsKey(originTblName.toLowerCase())) {
String mappedName = tableMapping.get(originTblName.toLowerCase());
return caseSensitive ? mappedName : mappedName.toLowerCase();
}

String tableName = caseSensitive ? originName : originName.toLowerCase();
return prefix + tableName + suffix;
String tblPrefix = prefix;
String tblSuffix = suffix;

// second priority: prefix and postfix specified by db
if (dbPrefix.containsKey(originDbName.toLowerCase())) {
tblPrefix = dbPrefix.get(originDbName.toLowerCase());
}
if (dbSuffix.containsKey(originDbName.toLowerCase())) {
tblSuffix = dbSuffix.get(originDbName.toLowerCase());
}

// third priority: normal prefix and suffix
String tableName = caseSensitive ? originTblName : originTblName.toLowerCase();
return tblPrefix + tableName + tblSuffix;
}

public String convert(Identifier originIdentifier) {
Expand All @@ -69,7 +104,7 @@ public String convert(Identifier originIdentifier) {
: originIdentifier.getDatabaseName()
+ "_"
+ originIdentifier.getObjectName();
return convert(rawName);
return convert(originIdentifier.getDatabaseName(), rawName);
}

private Map<String, String> lowerMapKey(Map<String, String> map) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ protected void beforeBuildingSourceSink() throws Exception {
for (JdbcTableInfo tableInfo : jdbcTableInfos) {
Identifier identifier =
Identifier.create(
database, tableNameConverter.convert(tableInfo.toPaimonTableName()));
database,
tableNameConverter.convert("", tableInfo.toPaimonTableName()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

database?

Copy link
Contributor Author

@JackeyLee007 JackeyLee007 Dec 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I changed nothing here except supplying a default value for the first argument of the "convert" method.
In this context, the prefix and suffix are not specified per database, so it's OK to pass an empty string as the argument value.

FileStoreTable table;
Schema fromMySql =
CdcActionCommonUtils.buildPaimonSchema(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,47 @@ public void testConvertTableName() {
tableMapping.put("mapped_src", "mapped_TGT");
TableNameConverter caseConverter =
new TableNameConverter(true, true, "pre_", "_pos", tableMapping);
Assert.assertEquals(caseConverter.convert("mapped_SRC"), "mapped_TGT");
Assert.assertEquals(caseConverter.convert("", "mapped_SRC"), "mapped_TGT");

Assert.assertEquals(caseConverter.convert("unmapped_src"), "pre_unmapped_src_pos");
Assert.assertEquals(caseConverter.convert("", "unmapped_src"), "pre_unmapped_src_pos");

TableNameConverter noCaseConverter =
new TableNameConverter(false, true, "pre_", "_pos", tableMapping);
Assert.assertEquals(noCaseConverter.convert("mapped_src"), "mapped_tgt");
Assert.assertEquals(noCaseConverter.convert("unmapped_src"), "pre_unmapped_src_pos");
Assert.assertEquals(noCaseConverter.convert("", "mapped_src"), "mapped_tgt");
Assert.assertEquals(noCaseConverter.convert("", "unmapped_src"), "pre_unmapped_src_pos");
}

/**
 * Checks that a database-specific prefix or suffix replaces the global one, and that prefix and
 * suffix are resolved independently of each other.
 */
@Test
public void testConvertTableNameByDBPrefix_Suffix() {
    Map<String, String> prefixByDb = new HashMap<>(2);
    prefixByDb.put("db_with_prefix", "db_pref_");
    prefixByDb.put("db_with_prefix_suffix", "db_pref_");

    Map<String, String> suffixByDb = new HashMap<>(2);
    suffixByDb.put("db_with_suffix", "_db_suff");
    suffixByDb.put("db_with_prefix_suffix", "_db_suff");

    TableNameConverter converter =
            new TableNameConverter(false, true, prefixByDb, suffixByDb, "pre_", "_suf", null);

    String table = "table_name";

    // Only the prefix is overridden for this database: db prefix + global suffix.
    String prefixOnly = converter.convert("db_with_prefix", table);
    Assert.assertEquals("db_pref_table_name_suf", prefixOnly);

    // Only the suffix is overridden for this database: global prefix + db suffix.
    String suffixOnly = converter.convert("db_with_suffix", table);
    Assert.assertEquals("pre_table_name_db_suff", suffixOnly);

    // Both are overridden for this database: db prefix + db suffix.
    String both = converter.convert("db_with_prefix_suffix", table);
    Assert.assertEquals("db_pref_table_name_db_suff", both);

    // Nothing is configured for this database: global prefix + global suffix.
    String neither = converter.convert("db_without_prefix_suffix", table);
    Assert.assertEquals("pre_table_name_suf", neither);
}
}
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,7 @@ under the License.
<exclude>release/**</exclude>
<!-- antlr grammar files -->
<exclude>paimon-common/src/main/antlr4/**</exclude>
<exclude>paimon-core/src/test/resources/compatibility/**</exclude>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why add this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise, the rat plugin always reports a license warning when building locally, as follows:
[WARNING] Files with unapproved licenses:
paimon-core/src/test/resources/compatibility/datasplit-v1
paimon-core/src/test/resources/compatibility/datasplit-v2

</excludes>
</configuration>
</plugin>
Expand Down
Loading