diff --git a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md index 4c21c217980115..bdcb4026f947f1 100644 --- a/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md +++ b/docs/documentation/cn/administrator-guide/load-data/broker-load-manual.md @@ -142,6 +142,10 @@ Label 的另一个作用,是防止用户重复导入相同的数据。**强烈 在 ```data_desc``` 中可以指定待导入表的 partition 信息,如果待导入数据不属于指定的 partition 则不会被导入。同时,不在指定 Partition 的数据会被认为是错误数据。 ++ set column mapping + + 在 ```data_desc``` 中的 SET 语句负责设置列函数变换,这里的列函数变换支持所有查询的等值表达式变换。如果原始数据的列和表中的列不一一对应,就需要用到这个属性。 + #### 导入作业参数 导入作业参数主要指的是 Broker load 创建导入语句中的属于 ```opt_properties```部分的参数。导入作业参数是作用于整个导入作业的。 diff --git a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md index b6b445a1b3ceeb..977320a0d03c4a 100644 --- a/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md +++ b/docs/documentation/cn/sql-reference/sql-statements/Data Manipulation/BROKER LOAD.md @@ -75,38 +75,10 @@ SET: - 如果指定此参数,可以将源文件某一列按照函数进行转化,然后将转化后的结果导入到table中。 - 目前支持的函数有: - - strftime(fmt, column) 日期转换函数 - fmt: 日期格式,形如%Y%m%d%H%i%S (年月日时分秒) - column: column_list中的列,即输入文件中的列。存储内容应为数字型的时间戳。 - 如果没有column_list,则按照palo表的列顺序默认输入文件的列。 - 注意:数字型的时间戳单位为秒。 - - time_format(output_fmt, input_fmt, column) 日期格式转化 - output_fmt: 转化后的日期格式,形如%Y%m%d%H%i%S (年月日时分秒) - input_fmt: 转化前column列的日期格式,形如%Y%m%d%H%i%S (年月日时分秒) - column: column_list中的列,即输入文件中的列。存储内容应为input_fmt格式的日期字符串。 - 如果没有column_list,则按照palo表的列顺序默认输入文件的列。 - - alignment_timestamp(precision, column) 将时间戳对齐到指定精度 - precision: year|month|day|hour - column: column_list中的列,即输入文件中的列。存储内容应为数字型的时间戳。 - 如果没有column_list,则按照palo表的列顺序默认输入文件的列。 - 注意:对齐精度为year、month的时候,只支持20050101~20191231范围内的时间戳。 - - default_value(value) 设置某一列导入的默认值 - 不指定则使用建表时列的默认值 - - md5sum(column1, column2, ...) 将指定的导入列的值求md5sum,返回32位16进制字符串 - - replace_value(old_value[, new_value]) 将导入文件中指定的old_value替换为new_value - new_value如不指定则使用建表时列的默认值 - - hll_hash(column) 用于将表或数据里面的某一列转化成HLL列的数据结构 - - now() 设置某一列导入的数据为导入执行的时间点。该列必须为 DATE/DATETIME 类型。 + 如果指定此参数,可以将源文件某一列按照函数进行转化,然后将转化后的结果导入到table中。语法为 `column_name` = expression。举几个例子帮助理解。 + 例1: 表中有3个列“c1, c2, c3", 源文件中前两列依次对应(c1,c2),后两列之和对应c3;那么需要指定 columns (c1,c2,tmp_c3,tmp_c4) SET (c3=tmp_c3+tmp_c4); + 例2: 表中有3个列“year, month, day"三个列,源文件中只有一个时间列,为”2018-06-01 01:02:03“格式。 + 那么可以指定 columns(tmp_time) set (year = year(tmp_time), month=month(tmp_time), day=day(tmp_time)) 完成导入。 3. broker_name @@ -274,28 +246,20 @@ 6. 
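To make the new `SET (column_name = expression)` mapping concrete, here is a minimal, self-contained Java sketch of the semantics described by 例1 and 例2 above (illustrative only, not Doris code; the class name and sample row values are invented for the example):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Illustrative only: mimics `columns(c1, c2, tmp_c3, tmp_c4) SET (c3 = tmp_c3 + tmp_c4)` (例1)
// and `columns(tmp_time) SET (year = year(tmp_time), month = month(tmp_time), day = day(tmp_time))` (例2)
// on a single CSV row. Sample values are invented.
public class ColumnMappingSketch {
    public static void main(String[] args) {
        // 例1: the source row has four fields, the target table has columns c1, c2, c3
        String[] row1 = "a,b,3,4".split(",");
        String c1 = row1[0];
        String c2 = row1[1];
        long c3 = Long.parseLong(row1[2]) + Long.parseLong(row1[3]); // c3 = tmp_c3 + tmp_c4
        System.out.println(c1 + "," + c2 + "," + c3);                // prints: a,b,7

        // 例2: the source row has one datetime field, the target table has year, month, day
        String tmpTime = "2018-06-01 01:02:03";
        LocalDateTime t = LocalDateTime.parse(tmpTime, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        System.out.printf("%d,%d,%d%n", t.getYear(), t.getMonthValue(), t.getDayOfMonth()); // prints: 2018,6,1
    }
}
```

In other words, the temporary columns named in `columns(...)` are just untyped source slots, and each `SET` target column is computed from them before the row is written to the table.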
从 BOS 导入一批数据,指定分区, 并对导入文件的列做一些转化,如下: 表结构为: - k1 datetime - k2 date - k3 bigint - k4 varchar(20) - k5 varchar(64) - k6 int + k1 varchar(20) + k2 int 假设数据文件只有一行数据: - 1537002087,2018-08-09 11:12:13,1537002087,-,1 + Adele,1,1 数据文件中各列,对应导入语句中指定的各列: - tmp_k1, tmp_k2, tmp_k3, k6, v1 + k1,tmp_k2,tmp_k3 转换如下: - 1) k1:将 tmp_k1 时间戳列转化为 datetime 类型的数据 - 2) k2:将 tmp_k2 datetime 类型的数据转化为 date 的数据 - 3) k3:将 tmp_k3 时间戳列转化为天级别时间戳 - 4) k4:指定导入默认值为1 - 5) k5:将 tmp_k1、tmp_k2、tmp_k3 列计算 md5 值 - 6) k6:将导入文件中的 - 值替换为 10 + 1) k1: 不变换 + 2) k2:是 tmp_k2 和 tmp_k3 数据之和 LOAD LABEL example_db.label6 ( @@ -303,14 +267,9 @@ INTO TABLE `my_table` PARTITION (p1, p2) COLUMNS TERMINATED BY "," - (tmp_k1, tmp_k2, tmp_k3, k6, v1) + (k1, tmp_k2, tmp_k3) SET ( - k1 = strftime("%Y-%m-%d %H:%i:%S", tmp_k1), - k2 = time_format("%Y-%m-%d %H:%i:%S", "%Y-%m-%d", tmp_k2), - k3 = alignment_timestamp("day", tmp_k3), - k4 = default_value("1"), - k5 = md5sum(tmp_k1, tmp_k2, tmp_k3), - k6 = replace_value("-", "10") + k2 = tmp_k2 + tmp_k3 ) ) WITH BROKER my_bos_broker diff --git a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java index 0ec39039ab14af..37f38a58e06515 100644 --- a/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/CreateRoutineLoadStmt.java @@ -220,7 +220,7 @@ public void analyze(Analyzer analyzer) throws UserException { // check name FeNameFormat.checkCommonName(NAME_TYPE, name); // check load properties include column separator etc. - checkLoadProperties(analyzer); + checkLoadProperties(); // check routine load job properties include desired concurrent number etc. checkJobProperties(); // check data source properties @@ -236,7 +236,7 @@ public void checkDBTable(Analyzer analyzer) throws AnalysisException { } } - public void checkLoadProperties(Analyzer analyzer) throws UserException { + public void checkLoadProperties() throws UserException { if (loadPropertyList == null) { return; } diff --git a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java index 981ba82567e785..0623a38ea30928 100644 --- a/fe/src/main/java/org/apache/doris/analysis/DataDescription.java +++ b/fe/src/main/java/org/apache/doris/analysis/DataDescription.java @@ -17,12 +17,6 @@ package org.apache.doris.analysis; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Strings; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; import org.apache.doris.analysis.BinaryPredicate.Operator; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; @@ -34,12 +28,21 @@ import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TNetworkAddress; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Strings; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; // used to describe data info which is needed to import. 
// // data_desc: @@ -49,15 +52,27 @@ // [PARTITION (p1, p2)] // [COLUMNS TERMINATED BY separator] // [FORMAT AS format] -// [(col1, ...)] -// [SET (k1=f1(xx), k2=f2(xx))] +// [(tmp_col1, tmp_col2, col3, ...)] +// [SET (k1=f1(xx), k2=f2(xxx))] + +/** + * The transform of columns should be added after the keyword named COLUMNS. + * The transform after the keyword named SET is the old way, which only supports the hadoop functions. + * This old way of transform will be removed gradually. + */ public class DataDescription { private static final Logger LOG = LogManager.getLogger(DataDescription.class); public static String FUNCTION_HASH_HLL = "hll_hash"; + private static final List hadoopSupportFunctionName = Arrays.asList("strftime", "time_format", + "alignment_timestamp", + "default_value", "md5sum", + "replace_value", "now", + "hll_hash"); private final String tableName; private final List partitionNames; private final List filePaths; - private final List columnNames; + // the column name list of data desc + private final List columns; private final ColumnSeparator columnSeparator; private final String fileFormat; private final boolean isNegative; @@ -67,32 +82,21 @@ public class DataDescription { private TNetworkAddress beAddr; private String lineDelimiter; - private Map>> columnToFunction; - private Map parsedExprMap; + // This param only includes the hadoop functions which need to be checked in the future. + // For hadoop load, this param is also used for persistence. + private Map>> columnToHadoopFunction; + /** + * Merged from columns and columnMappingList + * ImportColumnDesc: column name to expr or null + **/ + private List parsedColumnExprList; - private boolean isPullLoad = false; - - public DataDescription(String tableName, - List partitionNames, - List filePaths, - List columnNames, - ColumnSeparator columnSeparator, - boolean isNegative, - List columnMappingList) { - this.tableName = tableName; - this.partitionNames = partitionNames; - this.filePaths = filePaths; - this.columnNames = columnNames; - this.columnSeparator = columnSeparator; - this.fileFormat = null; - this.isNegative = isNegative; - this.columnMappingList = columnMappingList; - } + private boolean isHadoopLoad = false; public DataDescription(String tableName, List partitionNames, List filePaths, - List columnNames, + List columns, ColumnSeparator columnSeparator, String fileFormat, boolean isNegative, @@ -100,7 +104,7 @@ public DataDescription(String tableName, this.tableName = tableName; this.partitionNames = partitionNames; this.filePaths = filePaths; - this.columnNames = columnNames; + this.columns = columns; this.columnSeparator = columnSeparator; this.fileFormat = fileFormat; this.isNegative = isNegative; @@ -119,8 +123,12 @@ public List getFilePaths() { return filePaths; } + // only return the column names of SlotRef in columns public List getColumnNames() { - return columnNames; + if (columns == null || columns.isEmpty()) { + return null; + } + return columns; } public String getFileFormat() { @@ -159,127 +167,124 @@ public void addColumnMapping(String functionName, Pair> pai if (Strings.isNullOrEmpty(functionName) || pair == null) { return; } - if (columnToFunction == null) { - columnToFunction = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + if (columnToHadoopFunction == null) { + columnToHadoopFunction = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); } - columnToFunction.put(functionName, pair); + columnToHadoopFunction.put(functionName, pair); } - public Map>> getColumnMapping() { - if (columnMappingList
== null && columnToFunction == null) { - return null; - } - - return columnToFunction; + public Map>> getColumnToHadoopFunction() { + return columnToHadoopFunction; } - public List getColumnMappingList() { - return columnMappingList; + public List getParsedColumnExprList() { + return parsedColumnExprList; } - public Map getParsedExprMap() { - return parsedExprMap; + public void setIsHadoopLoad(boolean isHadoopLoad) { + this.isHadoopLoad = isHadoopLoad; } - public void setIsPullLoad(boolean isPullLoad) { - this.isPullLoad = isPullLoad; + public boolean isHadoopLoad() { + return isHadoopLoad; } - public boolean isPullLoad() { - return isPullLoad; - } - - private void checkColumnInfo() throws AnalysisException { - if (columnNames == null || columnNames.isEmpty()) { + /** + * Analyze parsedExprMap and columnToHadoopFunction from columns and columnMappingList + * Example: columns (col1, tmp_col2, tmp_col3) set (col2=tmp_col2+1, col3=strftime("%Y-%m-%d %H:%M:%S", tmp_col3)) + * Result: parsedExprMap = {"col1": null, "tmp_col2": null, "tmp_col3": null, + * "col2": "tmp_col2+1", "col3": "strftime("%Y-%m-%d %H:%M:%S", tmp_col3)"} + */ + private void analyzeColumns() throws AnalysisException { + if (columns == null || columns.isEmpty()) { return; } - Set columnSet = Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER); - for (String col : columnNames) { - if (!columnSet.add(col)) { - ErrorReport.reportAnalysisException(ErrorCode.ERR_DUP_FIELDNAME, col); + // merge columns exprs from columns and columnMappingList + // used to check duplicated column name + Set columnNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + parsedColumnExprList = Lists.newArrayList(); + // Step1: analyze columns + for (String columnName : columns) { + if (!columnNames.add(columnName)) { + throw new AnalysisException("Duplicate column : " + columnName); } + ImportColumnDesc importColumnDesc = new ImportColumnDesc(columnName, null); + parsedColumnExprList.add(importColumnDesc); } - } - private void checkColumnMapping() throws AnalysisException { + if (columnMappingList == null || columnMappingList.isEmpty()) { return; } + // Step2: analyze column mapping + // the column expr only support the SlotRef or eq binary predicate which's child(0) must be a SloRef. + // the duplicate column name of SloRef is forbidden. + columnToHadoopFunction = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); + for (Expr columnExpr : columnMappingList) { - columnToFunction = Maps.newTreeMap(String.CASE_INSENSITIVE_ORDER); - parsedExprMap = Maps.newHashMap(); - for (Expr expr : columnMappingList) { - if (!(expr instanceof BinaryPredicate)) { - throw new AnalysisException("Mapping function expr error. expr: " + expr.toSql()); + if (!(columnExpr instanceof BinaryPredicate)) { + throw new AnalysisException("Mapping function expr only support the column or eq binary predicate. " + + "Expr: " + columnExpr.toSql()); } - - BinaryPredicate predicate = (BinaryPredicate) expr; + BinaryPredicate predicate = (BinaryPredicate) columnExpr; if (predicate.getOp() != Operator.EQ) { - throw new AnalysisException("Mapping function operator error. op: " + predicate.getOp()); + throw new AnalysisException("Mapping function expr only support the column or eq binary predicate. " + + "The mapping operator error, op: " + predicate.getOp()); } - Expr child0 = predicate.getChild(0); if (!(child0 instanceof SlotRef)) { - throw new AnalysisException("Mapping column error. 
column: " + child0.toSql()); + throw new AnalysisException("Mapping function expr only support the column or eq binary predicate. " + + "The mapping column error. column: " + child0.toSql()); } - String column = ((SlotRef) child0).getColumnName(); - if (columnToFunction.containsKey(column)) { + if (!columnNames.add(column)) { throw new AnalysisException("Duplicate column mapping: " + column); } - - // we support function and column reference to change a column name + // hadoop load only supports the FunctionCallExpr Expr child1 = predicate.getChild(1); - if (!(child1 instanceof FunctionCallExpr)) { - if (isPullLoad && child1 instanceof SlotRef) { - // we only support SlotRef in pull load - } else { - throw new AnalysisException("Mapping function error, function: " + child1.toSql()); - } - } - - if (!child1.supportSerializable()) { - throw new AnalysisException("Expr do not support serializable." + child1.toSql()); + if (isHadoopLoad && !(child1 instanceof FunctionCallExpr)) { + throw new AnalysisException("Hadoop load only supports the designated function. " + + "The error mapping function is:" + child1.toSql()); } + ImportColumnDesc importColumnDesc = new ImportColumnDesc(column, child1); + parsedColumnExprList.add(importColumnDesc); + analyzeColumnToHadoopFunction(column, child1); - parsedExprMap.put(column, child1); - - if (!(child1 instanceof FunctionCallExpr)) { - // only just for pass later check - columnToFunction.put(column, Pair.create("__slot_ref", Lists.newArrayList())); - continue; - } + } + } - FunctionCallExpr functionCallExpr = (FunctionCallExpr) child1; - String functionName = functionCallExpr.getFnName().getFunction(); - List paramExprs = functionCallExpr.getParams().exprs(); - List args = Lists.newArrayList(); - for (Expr paramExpr : paramExprs) { - if (paramExpr instanceof SlotRef) { - SlotRef slot = (SlotRef) paramExpr; - args.add(slot.getColumnName()); - } else if (paramExpr instanceof StringLiteral) { - StringLiteral literal = (StringLiteral) paramExpr; - args.add(literal.getValue()); - } else if (paramExpr instanceof NullLiteral) { - args.add(null); - } else { - if (isPullLoad) { - continue; - } else { - throw new AnalysisException("Mapping function args error, arg: " + paramExpr.toSql()); - } + private void analyzeColumnToHadoopFunction(String columnName, Expr child1) throws AnalysisException { + FunctionCallExpr functionCallExpr = (FunctionCallExpr) child1; + String functionName = functionCallExpr.getFnName().getFunction(); + if (!hadoopSupportFunctionName.contains(functionName.toLowerCase())) { + return; + } + List paramExprs = functionCallExpr.getParams().exprs(); + List args = Lists.newArrayList(); + for (Expr paramExpr : paramExprs) { + if (paramExpr instanceof SlotRef) { + SlotRef slot = (SlotRef) paramExpr; + args.add(slot.getColumnName()); + } else if (paramExpr instanceof StringLiteral) { + StringLiteral literal = (StringLiteral) paramExpr; + args.add(literal.getValue()); + } else if (paramExpr instanceof NullLiteral) { + args.add(null); + } else { + if (isHadoopLoad) { + throw new AnalysisException("Mapping function args error, arg: " + paramExpr.toSql()); } + continue; } - - Pair> functionPair = new Pair>(functionName, args); - columnToFunction.put(column, functionPair); } + + Pair> functionPair = new Pair>(functionName, args); + columnToHadoopFunction.put(columnName, functionPair); } public static void validateMappingFunction(String functionName, List args, Map columnNameMap, - Column mappingColumn, boolean isPullLoad) throws AnalysisException { + Column 
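The merge performed by `analyzeColumns()` above can be summarized with a small standalone sketch (simplified under stated assumptions: mapping expressions are kept as plain strings instead of analyzed `Expr` trees, and `ImportColumnDesc` is modeled as a `String[]` pair; the class and sample names are invented):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Simplified, standalone model of DataDescription.analyzeColumns(): merge the COLUMNS list and
// the SET (col = expr) clauses into one ordered (name, expr) list, rejecting duplicate names.
// Expressions are plain strings here; Doris keeps them as parsed Expr trees and additionally
// requires each SET clause to be an EQ binary predicate whose left child is a SlotRef.
public class AnalyzeColumnsSketch {

    static List<String[]> merge(List<String> columns, Map<String, String> setClauses) {
        Set<String> seen = new TreeSet<>(String.CASE_INSENSITIVE_ORDER);
        List<String[]> parsedColumnExprList = new ArrayList<>();
        for (String col : columns) {                                  // step 1: plain source columns
            if (!seen.add(col)) {
                throw new IllegalArgumentException("Duplicate column : " + col);
            }
            parsedColumnExprList.add(new String[] {col, null});
        }
        for (Map.Entry<String, String> e : setClauses.entrySet()) {   // step 2: mapped target columns
            if (!seen.add(e.getKey())) {
                throw new IllegalArgumentException("Duplicate column mapping: " + e.getKey());
            }
            parsedColumnExprList.add(new String[] {e.getKey(), e.getValue()});
        }
        return parsedColumnExprList;
    }

    public static void main(String[] args) {
        // columns (col1, tmp_col2, tmp_col3) SET (col2 = tmp_col2 + 1)
        Map<String, String> set = new LinkedHashMap<>();
        set.put("col2", "tmp_col2 + 1");
        for (String[] desc : merge(Arrays.asList("col1", "tmp_col2", "tmp_col3"), set)) {
            System.out.println(desc[0] + " -> " + desc[1]);
        }
    }
}
```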
mappingColumn, boolean isHadoopLoad) throws AnalysisException { if (functionName.equalsIgnoreCase("alignment_timestamp")) { validateAlignmentTimestamp(args, columnNameMap); } else if (functionName.equalsIgnoreCase("strftime")) { @@ -297,9 +302,7 @@ public static void validateMappingFunction(String functionName, List arg } else if (functionName.equalsIgnoreCase("now")) { validateNowFunction(mappingColumn); } else { - if (isPullLoad) { - return; - } else { + if (isHadoopLoad) { throw new AnalysisException("Unknown function: " + functionName); } } @@ -434,7 +437,7 @@ private static void validateNowFunction(Column mappingColumn) throws AnalysisExc } } - public void analyze(String fullDbName) throws AnalysisException { + private void checkLoadPriv(String fullDbName) throws AnalysisException { if (Strings.isNullOrEmpty(tableName)) { throw new AnalysisException("No table name in load statement."); } @@ -446,7 +449,14 @@ public void analyze(String fullDbName) throws AnalysisException { ConnectContext.get().getQualifiedUser(), ConnectContext.get().getRemoteIP(), tableName); } + } + + public void analyze(String fullDbName) throws AnalysisException { + checkLoadPriv(fullDbName); + analyzeWithoutCheckPriv(); + } + public void analyzeWithoutCheckPriv() throws AnalysisException { if (filePaths == null || filePaths.isEmpty()) { throw new AnalysisException("No file path in load statement."); } @@ -458,8 +468,7 @@ public void analyze(String fullDbName) throws AnalysisException { columnSeparator.analyze(); } - checkColumnInfo(); - checkColumnMapping(); + analyzeColumns(); } public String toSql() { @@ -482,9 +491,9 @@ public String apply(String s) { if (columnSeparator != null) { sb.append(" COLUMNS TERMINATED BY ").append(columnSeparator.toSql()); } - if (columnNames != null && !columnNames.isEmpty()) { + if (columns != null && !columns.isEmpty()) { sb.append(" ("); - Joiner.on(", ").appendTo(sb, columnNames).append(")"); + Joiner.on(", ").appendTo(sb, columns).append(")"); } if (columnMappingList != null && !columnMappingList.isEmpty()) { sb.append(" SET ("); diff --git a/fe/src/main/java/org/apache/doris/analysis/Expr.java b/fe/src/main/java/org/apache/doris/analysis/Expr.java index fa78d078c0aa60..f9771babeaa45b 100644 --- a/fe/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/src/main/java/org/apache/doris/analysis/Expr.java @@ -1512,6 +1512,12 @@ public static void writeTo(Expr expr, DataOutput output) throws IOException { expr.write(output); } + /** + * The expr result may be null + * @param in + * @return + * @throws IOException + */ public static Expr readIn(DataInput in) throws IOException { int code = in.readInt(); ExprSerCode exprSerCode = ExprSerCode.fromCode(code); diff --git a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java index 3cc97c12ec9b99..99a2923549bc1a 100644 --- a/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java +++ b/fe/src/main/java/org/apache/doris/analysis/LoadStmt.java @@ -17,10 +17,6 @@ package org.apache.doris.analysis; -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import org.apache.doris.common.UserException; @@ -28,6 +24,11 @@ import org.apache.doris.load.Load; import org.apache.doris.qe.ConnectContext; +import com.google.common.base.Function; +import 
com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -214,8 +215,8 @@ public void analyze(Analyzer analyzer) throws UserException { throw new AnalysisException("No data file in load statement."); } for (DataDescription dataDescription : dataDescriptions) { - if (brokerDesc != null) { - dataDescription.setIsPullLoad(true); + if (brokerDesc == null) { + dataDescription.setIsHadoopLoad(true); } dataDescription.analyze(label.getDbName()); } diff --git a/fe/src/main/java/org/apache/doris/catalog/Catalog.java b/fe/src/main/java/org/apache/doris/catalog/Catalog.java index b2ce8282efabcf..0cc9aa638ae7e4 100644 --- a/fe/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/src/main/java/org/apache/doris/catalog/Catalog.java @@ -1083,7 +1083,7 @@ private void transferToMaster() throws IOException { LoadChecker.startAll(); // New load scheduler - loadManager.submitJobs(); + loadManager.prepareJobs(); loadJobScheduler.start(); loadTimeoutChecker.start(); diff --git a/fe/src/main/java/org/apache/doris/common/FeConstants.java b/fe/src/main/java/org/apache/doris/common/FeConstants.java index 084a5b642cc9ab..95683e20ac3693 100644 --- a/fe/src/main/java/org/apache/doris/common/FeConstants.java +++ b/fe/src/main/java/org/apache/doris/common/FeConstants.java @@ -35,5 +35,5 @@ public class FeConstants { // general model // Current meta data version. Use this version to write journals and image - public static int meta_version = FeMetaVersion.VERSION_57; + public static int meta_version = FeMetaVersion.VERSION_58; } diff --git a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java index 62acd73006cdd5..b5bdb2a42d9edd 100644 --- a/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -124,4 +124,6 @@ public final class FeMetaVersion { public static final int VERSION_56 = 56; // for base index using different id public static final int VERSION_57 = 57; + // broker load support function, persist origin stmt in broker load + public static final int VERSION_58 = 58; } diff --git a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java index 075134e3cd1f77..baf25b0cbe1d17 100644 --- a/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java +++ b/fe/src/main/java/org/apache/doris/load/BrokerFileGroup.java @@ -17,11 +17,10 @@ package org.apache.doris.load; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; import org.apache.doris.analysis.ColumnSeparator; import org.apache.doris.analysis.DataDescription; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.catalog.BrokerTable; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -33,6 +32,10 @@ import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -60,11 +63,14 @@ public class BrokerFileGroup implements Writable { private String fileFormat; private boolean isNegative; private List partitionIds; + // this is a compatible param which only 
happens before the function of broker has been supported. private List fileFieldNames; private List filePaths; - // This column need expression to get column + // this is a compatible param which only happens before the function of broker has been supported. private Map exprColumnMap; + // this param will be recreated by data desc when the log replay + private List columnExprList; // Used for recovery from edit log private BrokerFileGroup() { @@ -82,7 +88,8 @@ public BrokerFileGroup(BrokerTable table) throws AnalysisException { public BrokerFileGroup(DataDescription dataDescription) { this.dataDescription = dataDescription; - exprColumnMap = dataDescription.getParsedExprMap(); + this.exprColumnMap = null; + this.columnExprList = dataDescription.getParsedColumnExprList(); } // NOTE: DBLock will be held @@ -116,11 +123,6 @@ public void parse(Database db) throws DdlException { } } - // fileFieldNames - if (dataDescription.getColumnNames() != null) { - fileFieldNames = Lists.newArrayList(dataDescription.getColumnNames()); - } - // column valueSeparator = dataDescription.getColumnSeparator(); if (valueSeparator == null) { @@ -163,10 +165,6 @@ public boolean isNegative() { return isNegative; } - public List getFileFieldNames() { - return fileFieldNames; - } - public List getPartitionIds() { return partitionIds; } @@ -175,8 +173,8 @@ public List getFilePaths() { return filePaths; } - public Map getExprColumnMap() { - return exprColumnMap; + public List getColumnExprList() { + return columnExprList; } @Override @@ -257,7 +255,7 @@ public void write(DataOutput out) throws IOException { for (String path : filePaths) { Text.writeString(out, path); } - // expr column map + // expr column map will be null after broker load supports function if (exprColumnMap == null) { out.writeInt(0); } else { @@ -328,6 +326,23 @@ public void readFields(DataInput in) throws IOException { fileFormat = Text.readString(in); } } + + // There are no columnExprList in the previous load job which is created before function is supported. + // The columnExprList could not be analyzed without origin stmt in the previous load job. + // So, the columnExprList need to be merged in here. 
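The compatibility comment above is easier to follow with a short sketch of the reconstruction it describes: an image written before this change carries only `fileFieldNames` and `exprColumnMap`, so `columnExprList` has to be rebuilt from them after deserialization. This is a simplified standalone model, with `String` standing in for `Expr` and invented sample values, not the actual `readFields` code (which follows below):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, standalone model of the compatibility tail of BrokerFileGroup.readFields():
// images written before this change only carry fileFieldNames and exprColumnMap, so the new
// columnExprList is rebuilt from them after deserialization. String stands in for Expr, and
// the sample values are invented.
public class ReadFieldsCompatSketch {
    public static void main(String[] args) {
        List<String> fileFieldNames = Arrays.asList("k1", "tmp_k2", "tmp_k3"); // read from old image
        Map<String, String> exprColumnMap = new LinkedHashMap<>();             // read from old image
        exprColumnMap.put("k2", "tmp_k2 + tmp_k3");

        List<String[]> columnExprList = new ArrayList<>();
        for (String name : fileFieldNames) {                           // plain source columns first
            columnExprList.add(new String[] {name, null});
        }
        for (Map.Entry<String, String> e : exprColumnMap.entrySet()) { // then the mapped columns
            columnExprList.add(new String[] {e.getKey(), e.getValue()});
        }
        for (String[] desc : columnExprList) {
            System.out.println(desc[0] + " -> " + desc[1]);
        }
    }
}
```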
+ if (fileFieldNames == null || fileFieldNames.isEmpty()) { + return; + } + columnExprList = Lists.newArrayList(); + for (String columnName : fileFieldNames) { + columnExprList.add(new ImportColumnDesc(columnName, null)); + } + if (exprColumnMap == null || exprColumnMap.isEmpty()) { + return; + } + for (Map.Entry columnExpr : exprColumnMap.entrySet()) { + columnExprList.add(new ImportColumnDesc(columnExpr.getKey(), columnExpr.getValue())); + } } public static BrokerFileGroup read(DataInput in) throws IOException { diff --git a/fe/src/main/java/org/apache/doris/load/Load.java b/fe/src/main/java/org/apache/doris/load/Load.java index 536e8dd14a4974..5aa11dd32a6f71 100644 --- a/fe/src/main/java/org/apache/doris/load/Load.java +++ b/fe/src/main/java/org/apache/doris/load/Load.java @@ -299,8 +299,9 @@ public boolean addLoadJob(TMiniLoadRequest request) throws DdlException { formatType = params.get(LoadStmt.KEY_IN_PARAM_FORMAT_TYPE); } - DataDescription dataDescription = new DataDescription(tableName, partitionNames, filePaths, columnNames, - columnSeparator, formatType, false, null); + DataDescription dataDescription = new DataDescription(tableName, partitionNames, filePaths, + columnNames, + columnSeparator, formatType, false, null); dataDescription.setLineDelimiter(lineDelimiter); dataDescription.setBeAddr(beAddr); // parse hll param pair @@ -363,56 +364,6 @@ public void addLoadJob(LoadStmt stmt, EtlJobType etlJobType, long timestamp) thr addLoadJob(job, db); } - // for insert select from or create as stmt - public void addLoadJob(String label, String dbName, - long tableId, Map indexIdToSchemaHash, - long transactionId, - List fileList, long timestamp) throws DdlException { - // get db and table - Database db = Catalog.getInstance().getDb(dbName); - if (db == null) { - throw new DdlException("Database[" + dbName + "] does not exist"); - } - - OlapTable table = null; - db.readLock(); - try { - table = (OlapTable) db.getTable(tableId); - } finally { - db.readUnlock(); - } - if (table == null) { - throw new DdlException("Table[" + tableId + "] does not exist"); - } - - // create job - DataDescription desc = new DataDescription(table.getName(), null, Lists.newArrayList(""), - null, null, false, null); - LoadStmt stmt = new LoadStmt(new LabelName(dbName, label), Lists.newArrayList(desc), null, null, null); - LoadJob job = createLoadJob(stmt, EtlJobType.INSERT, db, timestamp); - - // add schema hash - for (Map.Entry entry : indexIdToSchemaHash.entrySet()) { - job.getTableLoadInfo(tableId).addIndexSchemaHash(entry.getKey(), entry.getValue()); - } - - // file size use -1 temporarily - Map fileMap = Maps.newHashMap(); - for (String filePath : fileList) { - fileMap.put(filePath, -1L); - } - - // update job info to etl finish - EtlStatus status = job.getEtlJobStatus(); - status.setState(TEtlState.FINISHED); - status.setFileMap(fileMap); - job.setState(JobState.ETL); - job.setTransactionId(transactionId); - - // add load job - addLoadJob(job, db); - } - // This is the final step of all addLoadJob() methods private void addLoadJob(LoadJob job, Database db) throws DdlException { // check cluster capacity @@ -722,7 +673,7 @@ public static void checkAndCreateSource(Database db, DataDescription dataDescrip source.setColumnNames(columnNames); // check default value - Map>> assignColumnToFunction = dataDescription.getColumnMapping(); + Map>> assignColumnToFunction = dataDescription.getColumnToHadoopFunction(); for (Column column : tableSchema) { String columnName = column.getName(); if 
(columnNames.contains(columnName)) { @@ -784,7 +735,7 @@ public static void checkAndCreateSource(Database db, DataDescription dataDescrip Pair> function = entry.getValue(); try { DataDescription.validateMappingFunction(function.first, function.second, columnNameMap, - mappingColumn, dataDescription.isPullLoad()); + mappingColumn, dataDescription.isHadoopLoad()); } catch (AnalysisException e) { throw new DdlException(e.getMessage()); } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 3f7d7044ff90bc..cd0e091b0ed2b8 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -21,6 +21,8 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.DataDescription; import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.SqlScanner; import org.apache.doris.catalog.AuthorizationInfo; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -29,9 +31,11 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.Config; import org.apache.doris.common.DdlException; +import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.LabelAlreadyUsedException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.UserException; +import org.apache.doris.common.io.Text; import org.apache.doris.common.util.LogBuilder; import org.apache.doris.common.util.LogKey; import org.apache.doris.load.BrokerFileGroup; @@ -54,6 +58,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.StringReader; import java.util.List; import java.util.Map; import java.util.Set; @@ -72,6 +77,10 @@ public class BrokerLoadJob extends LoadJob { // input params private List dataDescriptions = Lists.newArrayList(); private BrokerDesc brokerDesc; + // this param is used to persist the expr of columns + // the origin stmt is persisted instead of columns expr + // the expr of columns will be reanalyze when the log is replayed + private String originStmt; // include broker desc and data desc private PullLoadSourceInfo dataSourceInfo = new PullLoadSourceInfo(); @@ -83,17 +92,19 @@ public BrokerLoadJob() { this.jobType = EtlJobType.BROKER; } - public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, List dataDescriptions) + public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, List dataDescriptions, + String originStmt) throws MetaNotFoundException { super(dbId, label); this.timeoutSecond = Config.broker_load_default_timeout_second; this.dataDescriptions = dataDescriptions; this.brokerDesc = brokerDesc; + this.originStmt = originStmt; this.jobType = EtlJobType.BROKER; this.authorizationInfo = gatherAuthInfo(); } - public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException { + public static BrokerLoadJob fromLoadStmt(LoadStmt stmt, String originStmt) throws DdlException { // get db id String dbName = stmt.getLabel().getDbName(); Database db = Catalog.getCurrentCatalog().getDb(stmt.getLabel().getDbName()); @@ -106,7 +117,8 @@ public static BrokerLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException { // create job try { BrokerLoadJob brokerLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(), - stmt.getBrokerDesc(), stmt.getDataDescriptions()); + 
stmt.getBrokerDesc(), stmt.getDataDescriptions(), + originStmt); brokerLoadJob.setJobProperties(stmt.getProperties()); brokerLoadJob.setDataSourceInfo(db, stmt.getDataDescriptions()); return brokerLoadJob; @@ -237,6 +249,38 @@ public void onTaskFailed(long taskId, FailMsg failMsg) { logFinalOperation(); } + /** + * If the db or table could not be found, the Broker load job will be cancelled. + */ + @Override + public void analyze() { + if (originStmt == null) { + return; + } + // Reset dataSourceInfo, it will be re-created in analyze + dataSourceInfo = new PullLoadSourceInfo(); + SqlParser parser = new SqlParser(new SqlScanner(new StringReader(originStmt))); + LoadStmt stmt = null; + try { + stmt = (LoadStmt) parser.parse().value; + for (DataDescription dataDescription : stmt.getDataDescriptions()) { + dataDescription.analyzeWithoutCheckPriv(); + } + Database db = Catalog.getCurrentCatalog().getDb(dbId); + if (db == null) { + throw new DdlException("Database[" + dbId + "] does not exist"); + } + setDataSourceInfo(db, stmt.getDataDescriptions()); + } catch (Exception e) { + LOG.info(new LogBuilder(LogKey.LOAD_JOB, id) + .add("origin_stmt", originStmt) + .add("msg", "The failure happens in analyze, the load job will be cancelled with error:" + + e.getMessage()) + .build(), e); + cancelJobWithoutCheck(new FailMsg(FailMsg.CancelType.LOAD_RUN_FAIL, e.getMessage()), false); + } + } + /** * step1: divide job into loading task * step2: init the plan of task @@ -441,6 +485,7 @@ public void write(DataOutput out) throws IOException { super.write(out); brokerDesc.write(out); dataSourceInfo.write(out); + Text.writeString(out, originStmt); } @Override @@ -448,6 +493,13 @@ public void readFields(DataInput in) throws IOException { super.readFields(in); brokerDesc = BrokerDesc.read(in); dataSourceInfo.readFields(in); + + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_58) { + originStmt = Text.readString(in); + } + // The origin stmt is not analyzed here. + // The reason is that it would throw MetaNotFoundException when the tableId could not be found by tableName. + // The origin stmt will be analyzed after the replay is completed. } } diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java index be262fbb854296..4aba2d2a6722d6 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/BrokerLoadPendingTask.java @@ -45,6 +45,7 @@ public BrokerLoadPendingTask(BrokerLoadJob loadTaskCallback, Map> tableToBrokerFileList, BrokerDesc brokerDesc) { super(loadTaskCallback); + this.retryTime = 3; this.attachment = new BrokerPendingTaskAttachment(signature); this.tableToBrokerFileList = tableToBrokerFileList; this.brokerDesc = brokerDesc; diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java index 41f7f35ab07ef4..bf637998b4ad74 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadJob.java @@ -773,6 +773,13 @@ public void onTaskFinished(TaskAttachment attachment) { public void onTaskFailed(long taskId, FailMsg failMsg) { } + // This analyze will be invoked after the replay is finished. + // The edit log of LoadJob saves the origin param which is not analyzed.
+ // So, the re-analyze must be invoked between the replay is finished and LoadJobScheduler is started. + // Only, the PENDING load job need to be analyzed. + public void analyze() { + } + @Override public boolean equals(Object obj) { if (obj == this) { diff --git a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java index df88a0c0051451..c6022a514dbd74 100644 --- a/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java +++ b/fe/src/main/java/org/apache/doris/load/loadv2/LoadManager.java @@ -90,7 +90,7 @@ public LoadManager(LoadJobScheduler loadJobScheduler) { * @param stmt * @throws DdlException */ - public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException { + public void createLoadJobFromStmt(LoadStmt stmt, String originStmt) throws DdlException { Database database = checkDb(stmt.getLabel().getDbName()); long dbId = database.getId(); LoadJob loadJob = null; @@ -104,7 +104,7 @@ public void createLoadJobFromStmt(LoadStmt stmt) throws DdlException { throw new DdlException("There are more then " + Config.desired_max_waiting_jobs + " load jobs in waiting queue, " + "please retry later."); } - loadJob = BrokerLoadJob.fromLoadStmt(stmt); + loadJob = BrokerLoadJob.fromLoadStmt(stmt, originStmt); createLoadJob(loadJob); } finally { writeUnlock(); @@ -315,12 +315,6 @@ public void replayEndLoadJob(LoadJobFinalOperation operation) { .build()); } - public List getLoadJobByState(JobState jobState) { - return idToLoadJob.values().stream() - .filter(entity -> entity.getState() == jobState) - .collect(Collectors.toList()); - } - public int getLoadJobNum(JobState jobState, long dbId) { readLock(); try { @@ -454,13 +448,22 @@ public void getLoadJobInfo(Load.JobInfo info) throws DdlException { } } - public void submitJobs() { + public void prepareJobs() { + analyzeLoadJobs(); + submitJobs(); + } + + private void submitJobs() { loadJobScheduler.submitJob(idToLoadJob.values().stream().filter( loadJob -> loadJob.state == JobState.PENDING).collect(Collectors.toList())); } - private Map getIdToLoadJobs() { - return idToLoadJob; + private void analyzeLoadJobs() { + for (LoadJob loadJob : idToLoadJob.values()) { + if (loadJob.getState() == JobState.PENDING) { + loadJob.analyze(); + } + } } private Database checkDb(String dbName) throws DdlException { diff --git a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java index ccf187956543d4..fd9b79cd85218d 100644 --- a/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java +++ b/fe/src/main/java/org/apache/doris/load/routineload/RoutineLoadJob.java @@ -1165,7 +1165,7 @@ public void readFields(DataInput in) throws IOException { CreateRoutineLoadStmt stmt = null; try { stmt = (CreateRoutineLoadStmt) parser.parse().value; - stmt.checkLoadProperties(null); + stmt.checkLoadProperties(); setRoutineLoadDesc(stmt.getRoutineLoadDesc()); } catch (Exception e) { throw new IOException("error happens when parsing create routine load stmt: " + origStmt, e); diff --git a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java index 1463a0d8149295..3cda72adf7425d 100644 --- a/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java +++ b/fe/src/main/java/org/apache/doris/planner/BrokerScanNode.java @@ -26,6 +26,7 @@ import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.FunctionName; 
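Taken together, the `BrokerLoadJob` and `LoadManager` changes above implement a persist-then-reanalyze pattern: the raw statement text is written at the end of the job image, read back only when the image is new enough (`VERSION_58`), and re-analyzed after replay, because tables may not be resolvable while the journal is still replaying. A rough standalone sketch of that pattern (using `DataOutputStream.writeUTF` instead of Doris's `Text.writeString` purely to keep it dependency-free; the version constant only mirrors `FeMetaVersion.VERSION_58`, and the sample statement is invented):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch of the persist-then-reanalyze pattern: write the raw statement text into the job
// image, read it back only when the journal version is new enough, and defer re-analysis
// until after replay. Uses writeUTF/readUTF instead of Doris's Text helpers; the version
// constant mirrors FeMetaVersion.VERSION_58 and the sample statement is invented.
public class OriginStmtPersistSketch {
    static final int VERSION_58 = 58;

    public static void main(String[] args) throws IOException {
        String originStmt = "LOAD LABEL example_db.label6 (...) WITH BROKER my_bos_broker";

        // write side: new images always append the statement text
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buf)) {
            out.writeUTF(originStmt);
        }

        // read side: only consume the extra field when the image is new enough
        int journalVersion = 58;
        String replayed = null;
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()))) {
            if (journalVersion >= VERSION_58) {
                replayed = in.readUTF();
            }
        }

        // deferred step: after replay finishes, PENDING jobs re-parse and re-analyze the statement
        if (replayed != null) {
            System.out.println("re-analyze after replay: " + replayed);
        }
    }
}
```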
import org.apache.doris.analysis.FunctionParams; +import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.IntLiteral; import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotDescriptor; @@ -77,7 +78,6 @@ import java.util.List; import java.util.Map; import java.util.Random; -import java.util.stream.Collectors; // Broker scan node public class BrokerScanNode extends ScanNode { @@ -221,108 +221,6 @@ private void createPartitionInfos() throws AnalysisException { (OlapTable) targetTable, exprByName, null, partitionExprs); } - private void parseExprMap(Map exprMap) throws UserException { - if (exprMap == null) { - return; - } - for (Map.Entry entry : exprMap.entrySet()) { - String colName = entry.getKey(); - Expr originExpr = entry.getValue(); - - Column column = targetTable.getColumn(colName); - if (column == null) { - throw new UserException("Unknown column(" + colName + ")"); - } - - // To compatible with older load version - if (originExpr instanceof FunctionCallExpr) { - FunctionCallExpr funcExpr = (FunctionCallExpr) originExpr; - String funcName = funcExpr.getFnName().getFunction(); - - if (funcName.equalsIgnoreCase("replace_value")) { - List exprs = Lists.newArrayList(); - SlotRef slotRef = new SlotRef(null, entry.getKey()); - // We will convert this to IF(`col` != child0, `col`, child1), - // because we need the if return type equal to `col`, we use NE - // - exprs.add(new BinaryPredicate(BinaryPredicate.Operator.NE, slotRef, funcExpr.getChild(0))); - exprs.add(slotRef); - if (funcExpr.hasChild(1)) { - exprs.add(funcExpr.getChild(1)); - } else { - if (column.getDefaultValue() != null) { - exprs.add(new StringLiteral(column.getDefaultValue())); - } else { - if (column.isAllowNull()) { - exprs.add(NullLiteral.create(Type.VARCHAR)); - } else { - throw new UserException("Column(" + colName + ") has no default value."); - } - } - } - FunctionCallExpr newFn = new FunctionCallExpr("if", exprs); - entry.setValue(newFn); - } else if (funcName.equalsIgnoreCase("strftime")) { - FunctionName fromUnixName = new FunctionName("FROM_UNIXTIME"); - List fromUnixArgs = Lists.newArrayList(funcExpr.getChild(1)); - FunctionCallExpr fromUnixFunc = new FunctionCallExpr( - fromUnixName, new FunctionParams(false, fromUnixArgs)); - - entry.setValue(fromUnixFunc); - } else if (funcName.equalsIgnoreCase("time_format")) { - FunctionName strToDateName = new FunctionName("STR_TO_DATE"); - List strToDateExprs = Lists.newArrayList(funcExpr.getChild(2), funcExpr.getChild(1)); - FunctionCallExpr strToDateFuncExpr = new FunctionCallExpr( - strToDateName, new FunctionParams(false, strToDateExprs)); - - FunctionName dateFormatName = new FunctionName("DATE_FORMAT"); - List dateFormatArgs = Lists.newArrayList(strToDateFuncExpr, funcExpr.getChild(0)); - FunctionCallExpr dateFormatFunc = new FunctionCallExpr( - dateFormatName, new FunctionParams(false, dateFormatArgs)); - - entry.setValue(dateFormatFunc); - } else if (funcName.equalsIgnoreCase("alignment_timestamp")) { - FunctionName fromUnixName = new FunctionName("FROM_UNIXTIME"); - List fromUnixArgs = Lists.newArrayList(funcExpr.getChild(1)); - FunctionCallExpr fromUnixFunc = new FunctionCallExpr( - fromUnixName, new FunctionParams(false, fromUnixArgs)); - - StringLiteral precision = (StringLiteral) funcExpr.getChild(0); - StringLiteral format; - if (precision.getStringValue().equalsIgnoreCase("year")) { - format = new StringLiteral("%Y-01-01 00:00:00"); - } else if (precision.getStringValue().equalsIgnoreCase("month")) { 
- format = new StringLiteral("%Y-%m-01 00:00:00"); - } else if (precision.getStringValue().equalsIgnoreCase("day")) { - format = new StringLiteral("%Y-%m-%d 00:00:00"); - } else if (precision.getStringValue().equalsIgnoreCase("hour")) { - format = new StringLiteral("%Y-%m-%d %H:00:00"); - } else { - throw new UserException("Unknown precision(" + precision.getStringValue() + ")"); - } - FunctionName dateFormatName = new FunctionName("DATE_FORMAT"); - List dateFormatArgs = Lists.newArrayList(fromUnixFunc, format); - FunctionCallExpr dateFormatFunc = new FunctionCallExpr( - dateFormatName, new FunctionParams(false, dateFormatArgs)); - - FunctionName unixTimeName = new FunctionName("UNIX_TIMESTAMP"); - List unixTimeArgs = Lists.newArrayList(); - unixTimeArgs.add(dateFormatFunc); - FunctionCallExpr unixTimeFunc = new FunctionCallExpr( - unixTimeName, new FunctionParams(false, unixTimeArgs)); - - entry.setValue(unixTimeFunc); - } else if (funcName.equalsIgnoreCase("default_value")) { - entry.setValue(funcExpr.getChild(0)); - } else if (funcName.equalsIgnoreCase("now")) { - FunctionName nowFunctionName = new FunctionName("NOW"); - FunctionCallExpr newFunc = new FunctionCallExpr(nowFunctionName, new FunctionParams(null)); - entry.setValue(newFunc); - } - } - } - } - // Called from init, construct source tuple information private void initParams(ParamCreateContext context) throws AnalysisException, UserException { TBrokerScanRangeParams params = new TBrokerScanRangeParams(); @@ -341,75 +239,208 @@ private void initParams(ParamCreateContext context) throws AnalysisException, Us } params.setProperties(brokerDesc.getProperties()); + initColumns(context); + } - // We must create a new map here, because we will change this map later. - // But fileGroup will be persisted later, so we keep it unchanged. - if (fileGroup.getExprColumnMap() != null) { - context.exprMap = Maps.newHashMap(fileGroup.getExprColumnMap()); - } else { - context.exprMap = null; - } - parseExprMap(context.exprMap); + /** + * This method is used to calculate the slotDescByName and exprMap. + * The expr in exprMap is analyzed in this function. + * The smap of slot which belongs to expr will be analyzed by src desc. + * slotDescByName: the single slot from columns in load stmt + * exprMap: the expr from column mapping in load stmt. + * @param context + * @throws UserException + */ + private void initColumns(ParamCreateContext context) throws UserException { + // This tuple descriptor is used for origin file + TupleDescriptor srcTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); + context.tupleDescriptor = srcTupleDesc; + Map slotDescByName = Maps.newHashMap(); + context.slotDescByName = slotDescByName; - // Generate expr - List fileFieldNames = fileGroup.getFileFieldNames(); - if (fileFieldNames == null) { - fileFieldNames = Lists.newArrayList(); + TBrokerScanRangeParams params = context.params; + // there are no columns transform + List originColumnNameToExprList = context.fileGroup.getColumnExprList(); + if (originColumnNameToExprList == null || originColumnNameToExprList.isEmpty()) { for (Column column : targetTable.getBaseSchema()) { - fileFieldNames.add(column.getName()); + SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc); + slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); + slotDesc.setIsMaterialized(true); + // ISSUE A: src slot should be nullable even if the column is not nullable. + // because src slot is what we read from file, not represent to real column value. 
+ // If column is not nullable, error will be thrown when filling the dest slot, + // which is not nullable + slotDesc.setIsNullable(true); + slotDescByName.put(column.getName(), slotDesc); + params.addToSrc_slot_ids(slotDesc.getId().asInt()); } - } else { - // change fileFiledName to real column name(case match) - fileFieldNames = fileFieldNames.stream().map( - f -> targetTable.getColumn(f) == null ? f : targetTable.getColumn(f).getName()).collect( - Collectors.toList()); + params.setSrc_tuple_id(srcTupleDesc.getId().asInt()); + return; } - // This tuple descriptor is used for file of - TupleDescriptor srcTupleDesc = analyzer.getDescTbl().createTupleDescriptor(); - context.tupleDescriptor = srcTupleDesc; - - Map slotDescByName = Maps.newHashMap(); - context.slotDescByName = slotDescByName; - for (String fieldName : fileFieldNames) { - SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc); - slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); - slotDesc.setIsMaterialized(true); - slotDesc.setIsNullable(false); - slotDesc.setColumn(new Column(fieldName, PrimitiveType.VARCHAR)); - slotDescByName.put(fieldName, slotDesc); + // there are columns expr which belong to load + Map columnNameToExpr = Maps.newHashMap(); + context.exprMap = columnNameToExpr; + for (ImportColumnDesc originColumnNameToExpr : originColumnNameToExprList) { + // make column name case match with real column name + String columnName = originColumnNameToExpr.getColumnName(); + Expr columnExpr = originColumnNameToExpr.getExpr(); + String realColName = targetTable.getColumn(columnName) == null ? columnName + : targetTable.getColumn(columnName).getName(); + if (columnExpr != null) { + columnExpr = transformHadoopFunctionExpr(columnName, columnExpr); + columnNameToExpr.put(realColName, columnExpr); + } else { + SlotDescriptor slotDesc = analyzer.getDescTbl().addSlotDescriptor(srcTupleDesc); + slotDesc.setType(ScalarType.createType(PrimitiveType.VARCHAR)); + slotDesc.setIsMaterialized(true); + // same as ISSUE A + slotDesc.setIsNullable(true); + params.addToSrc_slot_ids(slotDesc.getId().asInt()); + slotDescByName.put(realColName, slotDesc); + } + } + // analyze all exprs + for (Map.Entry entry : columnNameToExpr.entrySet()) { + ExprSubstitutionMap smap = new ExprSubstitutionMap(); + List slots = Lists.newArrayList(); + entry.getValue().collect(SlotRef.class, slots); + for (SlotRef slot : slots) { + SlotDescriptor slotDesc = slotDescByName.get(slot.getColumnName()); + if (slotDesc == null) { + throw new UserException("unknown reference column, column=" + entry.getKey() + + ", reference=" + slot.getColumnName()); + } + smap.getLhs().add(slot); + smap.getRhs().add(new SlotRef(slotDesc)); + } + Expr expr = entry.getValue().clone(smap); + expr.analyze(analyzer); + + // check if contain aggregation + List funcs = Lists.newArrayList(); + expr.collect(FunctionCallExpr.class, funcs); + for (FunctionCallExpr fn : funcs) { + if (fn.isAggregateFunction()) { + throw new AnalysisException("Don't support aggregation function in load expression"); + } + } - params.addToSrc_slot_ids(slotDesc.getId().asInt()); + columnNameToExpr.put(entry.getKey(), expr); } params.setSrc_tuple_id(srcTupleDesc.getId().asInt()); + } - private void finalizeParams(ParamCreateContext context) throws UserException, AnalysisException { - Map slotDescByName = context.slotDescByName; - Map exprMap = context.exprMap; - Map destSidToSrcSidWithoutTrans = Maps.newHashMap(); - // Analyze expr map - if (exprMap != null) { - for (Map.Entry 
entry : exprMap.entrySet()) { - ExprSubstitutionMap smap = new ExprSubstitutionMap(); - - List slots = Lists.newArrayList(); - entry.getValue().collect(SlotRef.class, slots); - - for (SlotRef slot : slots) { - SlotDescriptor slotDesc = slotDescByName.get(slot.getColumnName()); - if (slotDesc == null) { - throw new UserException("Unknown slot"); + /** + * This method is used to transform hadoop function. + * The hadoop function includes: replace_value, strftime, time_format, alignment_timestamp, default_value, now. + * It rewrites those function with real function name and param. + * For the other function, the expr only go through this function and the origin expr is returned. + * @param columnName + * @param originExpr + * @return + * @throws UserException + */ + private Expr transformHadoopFunctionExpr(String columnName, Expr originExpr) throws UserException { + Column column = targetTable.getColumn(columnName); + if (column == null) { + throw new UserException("Unknown column(" + columnName + ")"); + } + + // To compatible with older load version + if (originExpr instanceof FunctionCallExpr) { + FunctionCallExpr funcExpr = (FunctionCallExpr) originExpr; + String funcName = funcExpr.getFnName().getFunction(); + + if (funcName.equalsIgnoreCase("replace_value")) { + List exprs = Lists.newArrayList(); + SlotRef slotRef = new SlotRef(null, columnName); + // We will convert this to IF(`col` != child0, `col`, child1), + // because we need the if return type equal to `col`, we use NE + // + exprs.add(new BinaryPredicate(BinaryPredicate.Operator.NE, slotRef, funcExpr.getChild(0))); + exprs.add(slotRef); + if (funcExpr.hasChild(1)) { + exprs.add(funcExpr.getChild(1)); + } else { + if (column.getDefaultValue() != null) { + exprs.add(new StringLiteral(column.getDefaultValue())); + } else { + if (column.isAllowNull()) { + exprs.add(NullLiteral.create(Type.VARCHAR)); + } else { + throw new UserException("Column(" + columnName + ") has no default value."); + } } - smap.getLhs().add(slot); - smap.getRhs().add(new SlotRef(slotDesc)); } - - Expr expr = entry.getValue().clone(smap); - expr.analyze(analyzer); - exprMap.put(entry.getKey(), expr); + FunctionCallExpr newFn = new FunctionCallExpr("if", exprs); + return newFn; + } else if (funcName.equalsIgnoreCase("strftime")) { + FunctionName fromUnixName = new FunctionName("FROM_UNIXTIME"); + List fromUnixArgs = Lists.newArrayList(funcExpr.getChild(1)); + FunctionCallExpr fromUnixFunc = new FunctionCallExpr( + fromUnixName, new FunctionParams(false, fromUnixArgs)); + + return fromUnixFunc; + } else if (funcName.equalsIgnoreCase("time_format")) { + FunctionName strToDateName = new FunctionName("STR_TO_DATE"); + List strToDateExprs = Lists.newArrayList(funcExpr.getChild(2), funcExpr.getChild(1)); + FunctionCallExpr strToDateFuncExpr = new FunctionCallExpr( + strToDateName, new FunctionParams(false, strToDateExprs)); + + FunctionName dateFormatName = new FunctionName("DATE_FORMAT"); + List dateFormatArgs = Lists.newArrayList(strToDateFuncExpr, funcExpr.getChild(0)); + FunctionCallExpr dateFormatFunc = new FunctionCallExpr( + dateFormatName, new FunctionParams(false, dateFormatArgs)); + + return dateFormatFunc; + } else if (funcName.equalsIgnoreCase("alignment_timestamp")) { + FunctionName fromUnixName = new FunctionName("FROM_UNIXTIME"); + List fromUnixArgs = Lists.newArrayList(funcExpr.getChild(1)); + FunctionCallExpr fromUnixFunc = new FunctionCallExpr( + fromUnixName, new FunctionParams(false, fromUnixArgs)); + + StringLiteral precision = (StringLiteral) 
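As a reading aid for the long `transformHadoopFunctionExpr()` method shown here, the following block summarizes, as data, which builtin expression each legacy hadoop-load function is rewritten into, based on the branches in this diff (the right-hand sides are shorthand descriptions of the generated expressions, not planner code):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Reading aid: what each legacy hadoop-load function is rewritten into by
// transformHadoopFunctionExpr(), paraphrased from the branches in this diff.
// The right-hand sides are shorthand descriptions, not generated code.
public class HadoopFunctionRewrites {
    public static void main(String[] args) {
        Map<String, String> rewrites = new LinkedHashMap<>();
        rewrites.put("replace_value(old[, new])",
                "if(`col` != old, `col`, new) -- falls back to the column default (or NULL) when new is omitted");
        rewrites.put("strftime(fmt, col)", "from_unixtime(col)");
        rewrites.put("time_format(out_fmt, in_fmt, col)", "date_format(str_to_date(col, in_fmt), out_fmt)");
        rewrites.put("alignment_timestamp(precision, col)",
                "unix_timestamp(date_format(from_unixtime(col), pattern for year|month|day|hour))");
        rewrites.put("default_value(v)", "v");
        rewrites.put("now()", "now()");
        rewrites.forEach((legacy, rewritten) -> System.out.println(legacy + "  =>  " + rewritten));
    }
}
```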
funcExpr.getChild(0); + StringLiteral format; + if (precision.getStringValue().equalsIgnoreCase("year")) { + format = new StringLiteral("%Y-01-01 00:00:00"); + } else if (precision.getStringValue().equalsIgnoreCase("month")) { + format = new StringLiteral("%Y-%m-01 00:00:00"); + } else if (precision.getStringValue().equalsIgnoreCase("day")) { + format = new StringLiteral("%Y-%m-%d 00:00:00"); + } else if (precision.getStringValue().equalsIgnoreCase("hour")) { + format = new StringLiteral("%Y-%m-%d %H:00:00"); + } else { + throw new UserException("Unknown precision(" + precision.getStringValue() + ")"); + } + FunctionName dateFormatName = new FunctionName("DATE_FORMAT"); + List dateFormatArgs = Lists.newArrayList(fromUnixFunc, format); + FunctionCallExpr dateFormatFunc = new FunctionCallExpr( + dateFormatName, new FunctionParams(false, dateFormatArgs)); + + FunctionName unixTimeName = new FunctionName("UNIX_TIMESTAMP"); + List unixTimeArgs = Lists.newArrayList(); + unixTimeArgs.add(dateFormatFunc); + FunctionCallExpr unixTimeFunc = new FunctionCallExpr( + unixTimeName, new FunctionParams(false, unixTimeArgs)); + + return unixTimeFunc; + } else if (funcName.equalsIgnoreCase("default_value")) { + return funcExpr.getChild(0); + } else if (funcName.equalsIgnoreCase("now")) { + FunctionName nowFunctionName = new FunctionName("NOW"); + FunctionCallExpr newFunc = new FunctionCallExpr(nowFunctionName, new FunctionParams(null)); + return newFunc; } } + return originExpr; + } + + private void finalizeParams(ParamCreateContext context) throws UserException, AnalysisException { + Map slotDescByName = context.slotDescByName; + Map exprMap = context.exprMap; + Map destSidToSrcSidWithoutTrans = Maps.newHashMap(); boolean isNegative = context.fileGroup.isNegative(); for (SlotDescriptor destSlotDesc : desc.getSlots()) { @@ -444,6 +475,21 @@ private void finalizeParams(ParamCreateContext context) throws UserException, An } } + // check hll_hash + if (destSlotDesc.getType().getPrimitiveType() == PrimitiveType.HLL) { + if (!(expr instanceof FunctionCallExpr)) { + throw new AnalysisException("HLL column must use hll_hash function, like " + + destSlotDesc.getColumn().getName() + "=hll_hash(xxx)"); + } + FunctionCallExpr fn = (FunctionCallExpr) expr; + if (!fn.getFnName().getFunction().equalsIgnoreCase("hll_hash")) { + throw new AnalysisException("HLL column must use hll_hash function, like " + + destSlotDesc.getColumn().getName() + "=hll_hash(xxx)"); + } + expr.setType(Type.HLL); + } + + // analyze negative if (isNegative && destSlotDesc.getColumn().getAggregationType() == AggregateType.SUM) { expr = new ArithmeticExpr(ArithmeticExpr.Operator.MULTIPLY, expr, new IntLiteral(-1)); expr.analyze(analyzer); diff --git a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java index 9ad683fcf933a0..e6364302e63bec 100644 --- a/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -118,7 +118,7 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt, String origStmt) th if (loadStmt.getVersion().equals(Load.VERSION) || loadStmt.getBrokerDesc() == null) { catalog.getLoadManager().createLoadJobV1FromStmt(loadStmt, jobType, System.currentTimeMillis()); } else { - catalog.getLoadManager().createLoadJobFromStmt(loadStmt); + catalog.getLoadManager().createLoadJobFromStmt(loadStmt, origStmt); } } else if (ddlStmt instanceof CancelLoadStmt) { if (catalog.getLoadInstance().isLabelExist( diff --git 
a/fe/src/main/java/org/apache/doris/qe/MultiLoadMgr.java b/fe/src/main/java/org/apache/doris/qe/MultiLoadMgr.java index 3e7792666f0cde..36f2cb57c8b5d5 100644 --- a/fe/src/main/java/org/apache/doris/qe/MultiLoadMgr.java +++ b/fe/src/main/java/org/apache/doris/qe/MultiLoadMgr.java @@ -382,7 +382,7 @@ public DataDescription toDataDesc() throws DdlException { } DataDescription dataDescription = new DataDescription( - tbl, null, files, columns, columnSeparator, false, null); + tbl, null, files, columns, columnSeparator, null, false, null); dataDescription.setBeAddr(address); return dataDescription; diff --git a/fe/src/test/java/org/apache/doris/analysis/CreateUserStmtTest.java b/fe/src/test/java/org/apache/doris/analysis/CreateUserStmtTest.java index a21ffde426bf9d..70135a8112bc7d 100644 --- a/fe/src/test/java/org/apache/doris/analysis/CreateUserStmtTest.java +++ b/fe/src/test/java/org/apache/doris/analysis/CreateUserStmtTest.java @@ -19,6 +19,8 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.common.UserException; +import org.apache.doris.mysql.privilege.PaloAuth; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; import org.junit.Assert; @@ -27,6 +29,7 @@ import mockit.Expectations; import mockit.Injectable; +import mockit.Mocked; public class CreateUserStmtTest { @@ -42,12 +45,16 @@ public void setUp() { } @Test - public void testToString(@Injectable Analyzer analyzer) throws UserException, AnalysisException { + public void testToString(@Injectable Analyzer analyzer, + @Mocked PaloAuth paloAuth) throws UserException, AnalysisException { new Expectations() { { analyzer.getClusterName(); result = "testCluster"; + paloAuth.checkHasPriv((ConnectContext) any, PrivPredicate.GRANT, PaloAuth.PrivLevel.GLOBAL, PaloAuth + .PrivLevel.DATABASE); + result = true; } }; diff --git a/fe/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java b/fe/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java index 89d232c3d5399f..a01cac0c26f2b0 100644 --- a/fe/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java +++ b/fe/src/test/java/org/apache/doris/analysis/DataDescriptionTest.java @@ -31,6 +31,9 @@ import java.util.List; +import mockit.Deencapsulation; +import mockit.Expectations; +import mockit.Injectable; import mockit.Mocked; import mockit.internal.startup.Startup; @@ -54,20 +57,22 @@ public void setUp() { @Test public void testNormal() throws AnalysisException { DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), - null, null, false, null); + null, null, null, false, null); desc.analyze("testDb"); Assert.assertEquals("DATA INFILE ('abc.txt') INTO TABLE testTable", desc.toString()); - desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), null, null, true, null); + desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), null, null, null, + true, null); desc.analyze("testDb"); Assert.assertEquals("DATA INFILE ('abc.txt') NEGATIVE INTO TABLE testTable", desc.toString()); - desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt", "bcd.txt"), null, null, true, null); + desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt", "bcd.txt"), null, + null, null, true, null); desc.analyze("testDb"); Assert.assertEquals("DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable", desc.toString()); desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), - 
Lists.newArrayList("col1", "col2"), null, true, null); + Lists.newArrayList("col1", "col2"), null, null, true, null); desc.analyze("testDb"); Assert.assertEquals("DATA INFILE ('abc.txt') NEGATIVE INTO TABLE testTable (col1, col2)", desc.toString()); Assert.assertEquals("testTable", desc.getTableName()); @@ -77,7 +82,8 @@ public void testNormal() throws AnalysisException { Assert.assertNull(desc.getColumnSeparator()); desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt", "bcd.txt"), - Lists.newArrayList("col1", "col2"), new ColumnSeparator("\t"), true, null); + Lists.newArrayList("col1", "col2"), new ColumnSeparator("\t"), + null, true, null); desc.analyze("testDb"); Assert.assertEquals("DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable" + " COLUMNS TERMINATED BY '\t' (col1, col2)", @@ -85,15 +91,17 @@ public void testNormal() throws AnalysisException { // hive \x01 column separator desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt", "bcd.txt"), - Lists.newArrayList("col1", "col2"), new ColumnSeparator("\\x01"), true, null); + Lists.newArrayList("col1", "col2"), new ColumnSeparator("\\x01"), + null, true, null); desc.analyze("testDb"); Assert.assertEquals("DATA INFILE ('abc.txt', 'bcd.txt') NEGATIVE INTO TABLE testTable" + " COLUMNS TERMINATED BY '\\x01' (col1, col2)", desc.toString()); // with partition - desc = new DataDescription("testTable", Lists.newArrayList("p1", "p2"), Lists.newArrayList("abc.txt"), - null, null, false, null); + desc = new DataDescription("testTable", Lists.newArrayList("p1", "p2"), + Lists.newArrayList("abc.txt"), + null, null, null, false, null); desc.analyze("testDb"); Assert.assertEquals("DATA INFILE ('abc.txt') INTO TABLE testTable PARTITION (p1, p2)", desc.toString()); @@ -103,8 +111,10 @@ public void testNormal() throws AnalysisException { params.add(new SlotRef(null, "k2")); BinaryPredicate predicate = new BinaryPredicate(Operator.EQ, new SlotRef(null, "k1"), new FunctionCallExpr("alignment_timestamp", params)); - desc = new DataDescription("testTable", Lists.newArrayList("p1", "p2"), Lists.newArrayList("abc.txt"), - Lists.newArrayList("k2", "k3"), null, false, Lists.newArrayList((Expr) predicate)); + desc = new DataDescription("testTable", Lists.newArrayList("p1", "p2"), + Lists.newArrayList("abc.txt"), + Lists.newArrayList("k2", "k3"), null, null, false, Lists + .newArrayList((Expr) predicate)); desc.analyze("testDb"); String sql = "DATA INFILE ('abc.txt') INTO TABLE testTable PARTITION (p1, p2) (k2, k3)" + " SET (`k1` = alignment_timestamp('day', `k2`))"; @@ -116,8 +126,10 @@ public void testNormal() throws AnalysisException { params.add(new StringLiteral("10")); predicate = new BinaryPredicate(Operator.EQ, new SlotRef(null, "k1"), new FunctionCallExpr("replace_value", params)); - desc = new DataDescription("testTable", Lists.newArrayList("p1", "p2"), Lists.newArrayList("abc.txt"), - Lists.newArrayList("k2", "k3"), null, false, Lists.newArrayList((Expr) predicate)); + desc = new DataDescription("testTable", Lists.newArrayList("p1", "p2"), + Lists.newArrayList("abc.txt"), + Lists.newArrayList("k2", "k3"), null, null, + false, Lists.newArrayList((Expr) predicate)); desc.analyze("testDb"); sql = "DATA INFILE ('abc.txt') INTO TABLE testTable PARTITION (p1, p2) (k2, k3)" + " SET (`k1` = replace_value('-', '10'))"; @@ -129,8 +141,10 @@ public void testNormal() throws AnalysisException { params.add(new NullLiteral()); predicate = new BinaryPredicate(Operator.EQ, new SlotRef(null, "k1"), new 
FunctionCallExpr("replace_value", params)); - desc = new DataDescription("testTable", Lists.newArrayList("p1", "p2"), Lists.newArrayList("abc.txt"), - Lists.newArrayList("k2", "k3"), null, false, Lists.newArrayList((Expr) predicate)); + desc = new DataDescription("testTable", Lists.newArrayList("p1", "p2"), + Lists.newArrayList("abc.txt"), + Lists.newArrayList("k2", "k3"), null, null, false, Lists + .newArrayList((Expr) predicate)); desc.analyze("testDb"); sql = "DATA INFILE ('abc.txt') INTO TABLE testTable PARTITION (p1, p2) (k2, k3)" + " SET (`k1` = replace_value('', NULL))"; @@ -140,20 +154,84 @@ public void testNormal() throws AnalysisException { @Test(expected = AnalysisException.class) public void testNoTable() throws AnalysisException { DataDescription desc = new DataDescription("", null, Lists.newArrayList("abc.txt"), - null, null, false, null); + null, null, null, false, null); desc.analyze("testDb"); } @Test(expected = AnalysisException.class) public void testNoFile() throws AnalysisException { - DataDescription desc = new DataDescription("testTable", null, null, null, null, false, null); + DataDescription desc = new DataDescription("testTable", null, null, null, null, null, false, null); desc.analyze("testDb"); } @Test(expected = AnalysisException.class) public void testDupCol() throws AnalysisException { DataDescription desc = new DataDescription("testTable", null, Lists.newArrayList("abc.txt"), - Lists.newArrayList("col1", "col1"), null, false, null); + Lists.newArrayList("col1", "col1"), null, null, false, null); desc.analyze("testDb"); } + + @Test + public void testAnalyzeColumnsWithDuplicatedColumn(@Injectable SlotRef column1, + @Injectable SlotRef column2) { + List columns = Lists.newArrayList(); + String duplicatedColumnName = "id"; + columns.add(duplicatedColumnName); + columns.add(duplicatedColumnName); + + DataDescription dataDescription = new DataDescription(null, null, null, columns, null, null, false, null); + try { + Deencapsulation.invoke(dataDescription, "analyzeColumns"); + Assert.fail(); + } catch (Exception e) { + if (!(e instanceof AnalysisException)) { + Assert.fail(); + } + } + } + + @Test + public void testAnalyzeColumnsWithDuplicatedColumnMapping(@Injectable BinaryPredicate columnMapping1, + @Injectable BinaryPredicate columnMapping2, + @Injectable SlotRef column1, + @Injectable SlotRef column2, + @Injectable FunctionCallExpr expr1, + @Injectable FunctionCallExpr expr2, + @Injectable FunctionName functionName) { + List columns = Lists.newArrayList(); + columns.add("tmp_col1"); + columns.add("tmp_col2"); + List columnMappingList = Lists.newArrayList(); + columnMappingList.add(columnMapping1); + columnMappingList.add(columnMapping2); + String duplicatedColumnName = "id"; + new Expectations() { + { + columnMapping1.getChild(0); + result = column1; + columnMapping2.getChild(0); + result = column2; + columnMapping1.getChild(1); + result = expr1; + expr1.getFnName(); + result = functionName; + functionName.getFunction(); + result = "test"; + column1.getColumnName(); + result = duplicatedColumnName; + column2.getColumnName(); + result = duplicatedColumnName; + } + }; + DataDescription dataDescription = new DataDescription(null, null, null, columns, null, null, false, + columnMappingList); + try { + Deencapsulation.invoke(dataDescription, "analyzeColumns"); + Assert.fail(); + } catch (Exception e) { + if (!(e instanceof AnalysisException)) { + Assert.fail(); + } + } + } } diff --git a/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java 
b/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java index 91d331f86bfdbd..cde97c6163e9f6 100644 --- a/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java +++ b/fe/src/test/java/org/apache/doris/analysis/LoadStmtTest.java @@ -31,6 +31,8 @@ import java.util.List; +import mockit.Expectations; +import mockit.Injectable; import mockit.Mocked; import mockit.NonStrictExpectations; import mockit.internal.startup.Startup; @@ -69,15 +71,21 @@ public void setUp() { } @Test - public void testNormal() throws UserException, AnalysisException { - desc.analyze(EasyMock.anyString()); - EasyMock.expectLastCall().anyTimes(); - EasyMock.replay(desc); + public void testNormal(@Injectable DataDescription desc) throws UserException, AnalysisException { + List dataDescriptionList = Lists.newArrayList(); + dataDescriptionList.add(desc); + + new Expectations(){ + { + desc.toSql(); + result = "XXX"; + } + }; - LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptions, null, null, null); + LoadStmt stmt = new LoadStmt(new LabelName("testDb", "testLabel"), dataDescriptionList, null, null, null); stmt.analyze(analyzer); Assert.assertEquals("testCluster:testDb", stmt.getLabel().getDbName()); - Assert.assertEquals(dataDescriptions, stmt.getDataDescriptions()); + Assert.assertEquals(dataDescriptionList, stmt.getDataDescriptions()); Assert.assertNull(stmt.getProperties()); Assert.assertEquals("LOAD LABEL `testCluster:testDb`.`testLabel`\n" diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index e114aef9fb506e..bd0ae6232c318c 100644 --- a/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -62,7 +62,8 @@ public void testFromLoadStmt(@Injectable LoadStmt loadStmt, @Injectable DataDescription dataDescription, @Mocked Catalog catalog, @Injectable Database database, - @Injectable BrokerDesc brokerDesc) { + @Injectable BrokerDesc brokerDesc, + @Injectable String originStmt) { List dataDescriptionList = Lists.newArrayList(); dataDescriptionList.add(dataDescription); @@ -88,7 +89,7 @@ public void testFromLoadStmt(@Injectable LoadStmt loadStmt, }; try { - BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt); + BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, originStmt); Assert.fail(); } catch (DdlException e) { System.out.println("could not find table named " + tableName); @@ -102,7 +103,8 @@ public void testFromLoadStmt(@Injectable LoadStmt loadStmt, @Injectable LabelName labelName, @Injectable Database database, @Injectable OlapTable olapTable, - @Mocked Catalog catalog) { + @Mocked Catalog catalog, + @Injectable String originStmt) { String label = "label"; long dbId = 1; @@ -130,8 +132,6 @@ public void testFromLoadStmt(@Injectable LoadStmt loadStmt, result = olapTable; dataDescription.getPartitionNames(); result = null; - dataDescription.getColumnNames(); - result = null; database.getId(); result = dbId; } @@ -146,7 +146,7 @@ public void checkAndCreateSource(Database db, DataDescription dataDescription, }; try { - BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt); + BrokerLoadJob brokerLoadJob = BrokerLoadJob.fromLoadStmt(loadStmt, originStmt); Assert.assertEquals(Long.valueOf(dbId), Deencapsulation.getField(brokerLoadJob, "dbId")); Assert.assertEquals(label, Deencapsulation.getField(brokerLoadJob, "label")); 
            Assert.assertEquals(JobState.PENDING, Deencapsulation.getField(brokerLoadJob, "state"));
diff --git a/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java b/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
index 6581394d4b1b2f..6c5d67597caa8c 100644
--- a/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
+++ b/fe/src/test/java/org/apache/doris/load/loadv2/LoadManagerTest.java
@@ -51,7 +51,7 @@ public class LoadManagerTest {
     private LoadManager loadManager;
-    private static final String methodName = "getIdToLoadJobs";
+    private final String fieldName = "idToLoadJob";

     @Before
     public void setUp() throws Exception {
@@ -123,8 +123,8 @@ public void testSerializationNormal(@Mocked Catalog catalog,

         LoadManager newLoadManager = deserializeFromFile(file);

-        Map loadJobs = Deencapsulation.invoke(loadManager, methodName);
-        Map newLoadJobs = Deencapsulation.invoke(newLoadManager, methodName);
+        Map loadJobs = Deencapsulation.getField(loadManager, fieldName);
+        Map newLoadJobs = Deencapsulation.getField(newLoadManager, fieldName);
         Assert.assertEquals(loadJobs, newLoadJobs);
     }

@@ -155,7 +155,7 @@ public void testSerializationWithJobRemoved(@Mocked MetaContext metaContext,
         File file = serializeToFile(loadManager);
         LoadManager newLoadManager = deserializeFromFile(file);

-        Map newLoadJobs = Deencapsulation.invoke(newLoadManager, methodName);
+        Map newLoadJobs = Deencapsulation.getField(newLoadManager, fieldName);
         Assert.assertEquals(0, newLoadJobs.size());
     }
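As context for the compatibility shim above: `transformHadoopFunctionExpr` keeps the legacy Hadoop-load column functions working by rewriting each of them into an ordinary scalar expression during analysis. The comment-only sketch below summarizes what each branch of that method produces; `dst`, `src` and the format strings are illustrative placeholders, not identifiers taken from this patch.

```
-- Sketch of the rewrites performed by transformHadoopFunctionExpr (dst/src are placeholders):
--   dst = replace_value('-', '10')                =>  IF(`dst` != '-', `dst`, '10')
--        (when new_value is omitted, the column default is used, or NULL if the column is nullable)
--   dst = strftime('%Y-%m-%d %H:%i:%S', src)      =>  FROM_UNIXTIME(src)
--   dst = time_format('%Y%m%d', '%Y-%m-%d', src)  =>  DATE_FORMAT(STR_TO_DATE(src, '%Y-%m-%d'), '%Y%m%d')
--   dst = alignment_timestamp('day', src)         =>  UNIX_TIMESTAMP(DATE_FORMAT(FROM_UNIXTIME(src), '%Y-%m-%d 00:00:00'))
--   dst = default_value('1')                      =>  '1'
--   dst = now()                                   =>  NOW()
```

Note that the strftime branch only forwards the column argument to FROM_UNIXTIME; the format argument is not used in the rewritten expression.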
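Similarly, the hll_hash check added to `finalizeParams` means a destination column of HLL type can only be populated through `hll_hash(...)`; any other mapping is rejected with the "HLL column must use hll_hash function" error. A minimal, hypothetical `data_desc` fragment (the table, file path and column names are assumptions for illustration only):

```
DATA INFILE("hdfs://host:port/input/file.csv")
INTO TABLE `page_uv`
COLUMNS TERMINATED BY ","
(dt, user_id)
SET (
    uv = hll_hash(user_id)   -- required for the HLL column `uv`; a plain `uv = user_id` fails analysis
)
```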