From a2c4976da520efc2f0da768f1b9be5ff6df526b3 Mon Sep 17 00:00:00 2001 From: laihui <1353307710@qq.com> Date: Sat, 10 Aug 2024 19:37:03 +0800 Subject: [PATCH] [fix](routine load) fix enclose and escape can not set in routine load job (#38402) --- .../doris/analysis/AlterRoutineLoadStmt.java | 8 +++++ .../doris/analysis/CreateRoutineLoadStmt.java | 30 ++++++++++++++++++- .../doris/analysis/DataDescription.java | 19 +++++++++++- .../org/apache/doris/analysis/LoadStmt.java | 4 +++ .../load/routineload/RoutineLoadJob.java | 9 +++++- 5 files changed, 67 insertions(+), 3 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java index 8493ab25661b08..21dc4dec651460 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterRoutineLoadStmt.java @@ -66,6 +66,8 @@ public class AlterRoutineLoadStmt extends DdlStmt { .add(CreateRoutineLoadStmt.PARTIAL_COLUMNS) .add(LoadStmt.STRICT_MODE) .add(LoadStmt.TIMEZONE) + .add(LoadStmt.KEY_ENCLOSE) + .add(LoadStmt.KEY_ESCAPE) .build(); private final LabelName labelName; @@ -242,6 +244,12 @@ private void checkJobProperties() throws UserException { analyzedJobProperties.put(CreateRoutineLoadStmt.PARTIAL_COLUMNS, String.valueOf(isPartialUpdate)); } + if (jobProperties.containsKey(LoadStmt.KEY_ENCLOSE)) { + analyzedJobProperties.put(LoadStmt.KEY_ENCLOSE, jobProperties.get(LoadStmt.KEY_ENCLOSE)); + } + if (jobProperties.containsKey(LoadStmt.KEY_ESCAPE)) { + analyzedJobProperties.put(LoadStmt.KEY_ESCAPE, jobProperties.get(LoadStmt.KEY_ESCAPE)); + } } private void checkDataSourceProperties() throws UserException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index c825b6d653936c..0e2ed01c6c290f 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -138,6 +138,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { .add(SEND_BATCH_PARALLELISM) .add(LOAD_TO_SINGLE_TABLET) .add(PARTIAL_COLUMNS) + .add(LoadStmt.KEY_ENCLOSE) + .add(LoadStmt.KEY_ESCAPE) .build(); private final LabelName labelName; @@ -174,7 +176,8 @@ public class CreateRoutineLoadStmt extends DdlStmt { private boolean stripOuterArray = false; private boolean numAsString = false; private boolean fuzzyParse = false; - + private byte enclose; + private byte escape; /** * support partial columns load(Only Unique Key Columns) */ @@ -302,6 +305,14 @@ public String getJsonPaths() { return jsonPaths; } + public byte getEnclose() { + return enclose; + } + + public byte getEscape() { + return escape; + } + public String getJsonRoot() { return jsonRoot; } @@ -486,6 +497,23 @@ private void checkJobProperties() throws UserException { RoutineLoadJob.DEFAULT_LOAD_TO_SINGLE_TABLET, LoadStmt.LOAD_TO_SINGLE_TABLET + " should be a boolean"); + String encloseStr = jobProperties.get(LoadStmt.KEY_ENCLOSE); + if (encloseStr != null) { + if (encloseStr.length() != 1) { + throw new AnalysisException("enclose must be single-char"); + } else { + enclose = encloseStr.getBytes(java.nio.charset.StandardCharsets.UTF_8)[0]; + } + } + String escapeStr = jobProperties.get(LoadStmt.KEY_ESCAPE); + if (escapeStr != null) { + if (escapeStr.length() != 1) { + throw new AnalysisException("escape must be single-char"); + } else { + escape = escapeStr.getBytes(java.nio.charset.StandardCharsets.UTF_8)[0]; + } + } + if (ConnectContext.get() != null) { timezone = ConnectContext.get().getSessionVariable().getTimeZone(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java index 618c80df5c0e32..f7944eca68e705 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -51,6 +51,7 @@ import org.apache.logging.log4j.Logger; import java.io.StringReader; +import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -862,7 +863,7 @@ private void analyzeMultiLoadColumns() throws AnalysisException { return; } String columnsSQL = "COLUMNS (" + columnDef + ")"; - SqlParser parser = new SqlParser(new SqlScanner(new StringReader(columnsSQL))); + SqlParser parser = new SqlParser(new org.apache.doris.analysis.SqlScanner(new StringReader(columnsSQL))); ImportColumnsStmt columnsStmt; try { columnsStmt = (ImportColumnsStmt) SqlParserUtils.getFirstStmt(parser); @@ -998,6 +999,22 @@ private void analyzeProperties() throws AnalysisException { if (analysisMap.containsKey(LoadStmt.KEY_SKIP_LINES)) { skipLines = Integer.parseInt(analysisMap.get(LoadStmt.KEY_SKIP_LINES)); } + if (analysisMap.containsKey(LoadStmt.KEY_ENCLOSE)) { + String encloseProp = analysisMap.get(LoadStmt.KEY_ENCLOSE); + if (encloseProp.length() == 1) { + enclose = encloseProp.getBytes(StandardCharsets.UTF_8)[0]; + } else { + throw new AnalysisException("enclose must be single-char"); + } + } + if (analysisMap.containsKey(LoadStmt.KEY_ESCAPE)) { + String escapeProp = analysisMap.get(LoadStmt.KEY_ESCAPE); + if (escapeProp.length() == 1) { + escape = escapeProp.getBytes(StandardCharsets.UTF_8)[0]; + } else { + throw new AnalysisException("escape must be single-char"); + } + } } private void checkLoadPriv(String fullDbName) throws AnalysisException { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java index fab011af528e9e..773d54ee56d018 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -125,6 +125,10 @@ public class LoadStmt extends DdlStmt { public static final 
String KEY_COMMENT = "comment"; + public static final String KEY_ENCLOSE = "enclose"; + + public static final String KEY_ESCAPE = "escape"; + private final LabelName label; private final List dataDescriptions; private final BrokerDesc brokerDesc; diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index 58eff713b4530a..57aa84e773137d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -350,7 +350,6 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { this.isPartialUpdate = true; } jobProperties.put(CreateRoutineLoadStmt.MAX_FILTER_RATIO_PROPERTY, String.valueOf(maxFilterRatio)); - if (Strings.isNullOrEmpty(stmt.getFormat()) || stmt.getFormat().equals("csv")) { jobProperties.put(PROPS_FORMAT, "csv"); } else if (stmt.getFormat().equals("json")) { @@ -384,6 +383,14 @@ protected void setOptional(CreateRoutineLoadStmt stmt) throws UserException { } else { jobProperties.put(PROPS_FUZZY_PARSE, "false"); } + if (String.valueOf(stmt.getEnclose()) != null) { + this.enclose = stmt.getEnclose(); + jobProperties.put(LoadStmt.KEY_ENCLOSE, String.valueOf(stmt.getEnclose())); + } + if (String.valueOf(stmt.getEscape()) != null) { + this.escape = stmt.getEscape(); + jobProperties.put(LoadStmt.KEY_ESCAPE, String.valueOf(stmt.getEscape())); + } } private void setRoutineLoadDesc(RoutineLoadDesc routineLoadDesc) {