From c84a1c47aec27119009b854e3beffea9a6974166 Mon Sep 17 00:00:00 2001 From: "liming.1018" Date: Mon, 6 Jan 2025 18:46:30 +0800 Subject: [PATCH] [flink][action] add '`' to the fields of merge into action to avoid exceptions when the field name is an SQL keyword. --- .../paimon/flink/action/MergeIntoAction.java | 36 ++++++++++++++++--- .../flink/action/MergeIntoActionITCase.java | 23 ++++++++++++ 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java index 1ecd23ea6246..7254999797c3 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/MergeIntoAction.java @@ -23,6 +23,7 @@ import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.types.DataField; import org.apache.paimon.types.DataType; +import org.apache.paimon.utils.StringUtils; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.Table; @@ -77,6 +78,8 @@ public class MergeIntoAction extends TableActionBase { private static final Logger LOG = LoggerFactory.getLogger(MergeIntoAction.class); + public static final String IDENTIFIER_QUOTE = "`"; + // primary keys of target table private final List primaryKeys; @@ -333,7 +336,7 @@ private Optional> getMatchedUpsertDataStream() { String query = String.format( "SELECT %s FROM %s INNER JOIN %s ON %s %s", - String.join(",", project), + String.join(",", normalizeFieldName(project)), escapedTargetName(), escapedSourceName(), mergeCondition, @@ -377,7 +380,7 @@ private Optional> getNotMatchedUpsertDataStream() { String query = String.format( "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s", - String.join(",", project), + String.join(",", normalizeFieldName(project)), escapedTargetName(), escapedSourceName(), mergeCondition, @@ -408,7 +411,7 @@ private Optional> getMatchedDeleteDataStream() { String query = String.format( "SELECT %s FROM %s INNER JOIN %s ON %s %s", - String.join(",", project), + String.join(",", normalizeFieldName(project)), escapedTargetName(), escapedSourceName(), mergeCondition, @@ -430,7 +433,7 @@ private Optional> getNotMatchedDeleteDataStream() { String query = String.format( "SELECT %s FROM %s WHERE NOT EXISTS (SELECT * FROM %s WHERE %s) %s", - String.join(",", targetFieldNames), + String.join(",", normalizeFieldName(targetFieldNames)), escapedTargetName(), escapedSourceName(), mergeCondition, @@ -519,4 +522,29 @@ private String escapedSourceName() { .map(s -> String.format("`%s`", s)) .collect(Collectors.joining(".")); } + + private List normalizeFieldName(List fieldNames) { + return fieldNames.stream().map(this::normalizeFieldName).collect(Collectors.toList()); + } + + private String normalizeFieldName(String fieldName) { + if (StringUtils.isNullOrWhitespaceOnly(fieldName) || fieldName.endsWith(IDENTIFIER_QUOTE)) { + return fieldName; + } + + String[] splitFieldNames = fieldName.split("\\."); + if (!targetFieldNames.contains(splitFieldNames[splitFieldNames.length - 1])) { + return fieldName; + } + + return String.join( + ".", + Arrays.stream(splitFieldNames) + .map( + part -> + part.endsWith(IDENTIFIER_QUOTE) + ? part + : IDENTIFIER_QUOTE + part + IDENTIFIER_QUOTE) + .toArray(String[]::new)); + } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java index 3907c0398532..25b4055465b4 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/MergeIntoActionITCase.java @@ -688,6 +688,29 @@ public void testNotMatchedBySourceDelete(boolean qualified, String invoker) thro } } + @Test + public void testSqlWithKeywordCase() throws Exception { + // drop table S + sEnv.executeSql("DROP TABLE T"); + sEnv.executeSql( + buildDdl( + "T", + Arrays.asList("k INT", "`language` STRING", "dt STRING"), + Arrays.asList("k", "dt"), + Collections.singletonList("dt"), + Collections.emptyMap())); + insertInto("T", "(1, 'v_1', '02-27')", "(13, 'v_13', '02-29')"); + + MergeIntoActionBuilder action = new MergeIntoActionBuilder(warehouse, database, "T"); + action.withSourceTable("S") + .withMergeCondition("T.k = S.k AND T.dt = S.dt") + .withMatchedDelete("S.k < 12"); + + List batchExpected = Arrays.asList(changelogRow("+I", 13, "v_13", "02-29")); + action.build().run(); + testBatchRead(buildSimpleQuery("T"), batchExpected); + } + private void validateActionRunResult( MergeIntoAction action, List streamingExpected, List batchExpected) throws Exception {