diff --git a/docs/.vuepress/sidebar/en.js b/docs/.vuepress/sidebar/en.js index 98da4723eb1fe9..8f26f169d6cb28 100644 --- a/docs/.vuepress/sidebar/en.js +++ b/docs/.vuepress/sidebar/en.js @@ -187,6 +187,7 @@ module.exports = [ "sql-mode", "time-zone", "variables", + "update", ], sidebarDepth: 1, }, @@ -534,6 +535,7 @@ module.exports = [ "STREAM LOAD", "alter-routine-load", "insert", + "UPDATE", ], }, { diff --git a/docs/.vuepress/sidebar/zh-CN.js b/docs/.vuepress/sidebar/zh-CN.js index c3f7bcd98a9ea2..3bfdf15f21debc 100644 --- a/docs/.vuepress/sidebar/zh-CN.js +++ b/docs/.vuepress/sidebar/zh-CN.js @@ -189,6 +189,7 @@ module.exports = [ "sql-mode", "time-zone", "variables", + "update", ], sidebarDepth: 1, }, @@ -538,6 +539,7 @@ module.exports = [ "STREAM LOAD", "alter-routine-load", "insert", + "UPDATE", ], }, { diff --git a/docs/en/administrator-guide/update.md b/docs/en/administrator-guide/update.md new file mode 100644 index 00000000000000..b3efc45ece75ca --- /dev/null +++ b/docs/en/administrator-guide/update.md @@ -0,0 +1,126 @@ +--- +{ + "title": "update", + "language": "en" +} +--- + + + +# Update + +If we need to modify or update the data in Doris, we can use the UPDATE command. + +## Applicable scenarios + ++ To modify the value of a row that meets certain conditions. ++ Point updates, small updates, where the rows to be updated are preferably a very small part of the entire table. ++ Only could be used in Unique table + +## Explanation of terms + +1. Unique model: A data model in the Doris system. When the user imports rows with the same Key, the Value of the latter overrides the existing Value, in the same sense as Unique in Mysql. + +## Fundamentals + +Use the query engine's own where filtering logic to filter the rows that need to be updated from the table to be updated. Then use the Unique model's own Value column replacement logic to change the rows to be updated and reinsert them into the table. This enables row-level updates. 
+ +### Example + +Suppose there is an order table in Doris, where order id is the Key column, order status, and order amount are the Value columns. The data state is as follows. + +| order id | order amount | order status | +|--|--|--| +| 1 | 100| Pending Payment | + +At this time, after the user clicks the payment, Doris system needs to change the order id to '1' order status to 'pending shipment', you need to use the Update function. + +``` +UPDATE order SET order status='To be shipped' WHERE order id=1; +``` + +After the user executes the UPDATE command, the system performs the following three steps. + ++ Step 1: Read the rows that satisfy WHERE order id=1 + (1, 100, 'pending payment') ++ Step 2: Change the order status of the row from 'Pending Payment' to 'Pending Shipping' + (1, 100, 'Pending shipment') ++ Step 3: Insert the updated row back into the table to achieve the updated effect. + | order id | order amount | order status | + | ---| ---| ---| + | 1 | 100| Pending Payment | + | 1 | 100 | Pending shipments | + Since the table order is a UNIQUE model, the rows with the same Key, after which the latter will take effect, so the final effect is as follows. + | order id | order amount | order status | + |--|--|--| + | 1 | 100 | Pending shipments | + +## Basic operations + +### UPDATE syntax + +```UPDATE table_name SET value=xxx WHERE condition;``` + ++ ``table_name``: the table to be updated, must be a UNIQUE model table to update. + ++ value=xxx: The column to be updated, the left side of the equation must be the value column of the table. The right side of the equation can be a constant or an expression transformation of a column in a table. + For example, if value = 1, then the value of the column to be updated will be 1. + For example, if value = value + 1, the value of the column to be updated is incremented by 1. + ++ condition: Only rows that satisfy the condition will be updated. condition must be an expression that results in a Boolean type. 
+ For example, if k1 = 1, only rows with a k1 column value of 1 will be updated. + For example, if k1 = k2, only rows with the same value in column k1 as in column k2 will be updated. + No support for unfilled condition, i.e., no support for full table updates. + +### Synchronization + +The Update syntax is a synchronization syntax in Doris. If the Update statement succeeds, the update succeeds and the data is visible. + +### Performance + +The performance of the Update statement is closely related to the number of rows to be updated and the retrieval efficiency of the condition. + ++ Number of rows to be updated: The more rows to be updated, the slower the Update statement will be. This is consistent with the principle of importing. + Doris updates are more suitable for occasional update scenarios, such as changing the values of individual rows. + Doris is not suitable for large batches of data changes. Large modifications can make Update statements take a long time to run. + ++ Condition retrieval efficiency: Doris Update implements the principle of reading the rows that satisfy the condition first, so if the condition retrieval efficiency is high, the Update will be faster. + The condition column should ideally be hit, indexed, or bucket clipped. This way Doris does not need to scan the entire table and can quickly locate the rows that need to be updated. This improves update efficiency. + It is strongly discouraged to include the UNIQUE model value column in the condition column. + +### Concurrency Control + +By default, multiple concurrent Update operations on the same table are not allowed at the same time. + +The main reason for this is that Doris currently supports row updates, which means that even if the user declares ``SET v2 = 1``, virtually all other Value columns will be overwritten (even though the values are not changed). 
+ +This presents a problem in that if two Update operations update the same row at the same time, the behavior may be indeterminate. That is, there may be dirty data. + +However, in practice, the concurrency limit can be turned on manually if the user himself can guarantee that even if concurrent updates are performed, they will not operate on the same row at the same time. This is done by modifying the FE configuration ``enable_concurrent_update``. When the configuration value is true, there is no limit on concurrent updates. + +## Risks of use + +Since Doris currently supports row updates and uses a two-step read-and-write operation, there is uncertainty about the outcome of an Update statement if it modifies the same row as another Import or Delete statement. + +Therefore, when using Doris, you must be careful to control the concurrency of Update statements and other DML statements on the *user side itself*. + +## Version + +Doris Version 0.15.x + diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW EXPORT.md b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW EXPORT.md index 0df0431f4e2cba..1cda8f848dbc44 100644 --- a/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW EXPORT.md +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/SHOW EXPORT.md @@ -28,15 +28,15 @@ under the License. ## Description This statement is used to show the execution of the specified export task Grammar: -SHOW EXPORT -[FROM both names] -[ -WHERE -[ID = your_job_id] -[STATE = ["PENDING"|"EXPORTING"|"FINISHED"|"CANCELLED"]] -] -[ORDER BY ...] -[LIMIT limit]; + SHOW EXPORT + [FROM db_name] + [ + WHERE + [ID = your_job_id] + [STATE = ["PENDING"|"EXPORTING"|"FINISHED"|"CANCELLED"]] + ] + [ORDER BY ...] + [LIMIT limit]; Explain: 1) If db_name is not specified, use the current default DB @@ -46,16 +46,16 @@ Explain: ## example 1. Show all export tasks of default DB -SHOW EXPORT; + SHOW EXPORT; 2. 
Show the export tasks of the specified db, sorted in descending order by StartTime -SHOW EXPORT FROM example_db ORDER BY StartTime DESC; + SHOW EXPORT FROM example_db ORDER BY StartTime DESC; 3. Show the export task of the specified db, state is "exporting" and sorted in descending order by StartTime -SHOW EXPORT FROM example_db WHERE STATE = "exporting" ORDER BY StartTime DESC; + SHOW EXPORT FROM example_db WHERE STATE = "exporting" ORDER BY StartTime DESC; 4. Show the export task of specifying dB and job_id -SHOW EXPORT FROM example_db WHERE ID = job_id; + SHOW EXPORT FROM example_db WHERE ID = job_id; ## keyword SHOW,EXPORT diff --git a/docs/en/sql-reference/sql-statements/Data Manipulation/UPDATE.md b/docs/en/sql-reference/sql-statements/Data Manipulation/UPDATE.md new file mode 100644 index 00000000000000..d087659e087450 --- /dev/null +++ b/docs/en/sql-reference/sql-statements/Data Manipulation/UPDATE.md @@ -0,0 +1,75 @@ +--- +{ + "title": "UPDATE", + "language": "en" +} +--- + + + +# UPDATE +## description +### Syntax + +``` +UPDATE table_name + SET assignment_list + WHERE expression + +value: + {expr | DEFAULT} + +assignment: + col_name = value + +assignment_list: + assignment [, assignment] ... +``` + +### Parameters + ++ table_name: The target table of the data to be updated. Can be in the form of 'db_name.table_name' ++ assignment_list: The target column to be updated. Can be in the form of 'col_name = value, col_name = value' ++ where expression: The condition to be updated is an expression that returns true or false + +### Note + +The current UPDATE statement only supports row updates on the Unique model, and there may be data conflicts caused by concurrent updates. +Currently Doris does not deal with such problems, and users are required to avoid such problems from the business side. + +## example + +The `test` table is a unique model table, which contains four columns: k1, k2, v1, v2. 
Among them, k1, k2 are keys, v1, v2 are values, and the aggregation method is Replace. + +1. Update the v1 column that satisfies the conditions k1 =1 and k2 = 2 in the'test' table to 1 + +``` +UPDATE test SET v1 = 1 WHERE k1=1 and k2=2; +``` + +2. Increment the v1 column of the column with k1=1 in the'test' table by 1 + +``` +UPDATE test SET v1 = v1+1 WHERE k1=1; +``` + +## keyword + + UPDATE diff --git a/docs/zh-CN/administrator-guide/update.md b/docs/zh-CN/administrator-guide/update.md new file mode 100644 index 00000000000000..c994c8a6c7e8ee --- /dev/null +++ b/docs/zh-CN/administrator-guide/update.md @@ -0,0 +1,126 @@ +--- +{ + "title": "更新", + "language": "zh-CN" +} +--- + + + +# 更新 + +如果我们需要修改或更新 Doris 中的数据,就可以使用 UPDATE 命令来操作。 + +## 适用场景 + ++ 对满足某些条件的行,修改他的取值。 ++ 点更新,小范围更新,待更新的行最好是整个表的非常小一部分。 ++ update 命令只能在 Unique 数据模型的表中操作。 + +## 名词解释 + +1. Unique 模型:Doris 系统中的一种数据模型。将列分为两类,Key 和 Value。当用户导入相同 Key 的行时,后者的 Value 会覆盖已有的 Value。与 Mysql 中的 Unique 含义一致。 + +## 基本原理 + +利用查询引擎自身的 where 过滤逻辑,从待更新表中筛选出需要被更新的行。再利用 Unique 模型自带的 Value 列新数据替换旧数据的逻辑,将待更新的行变更后,再重新插入到表中。从而实现行级别更新。 + +举例说明 + +假设 Doris 中存在一张订单表,其中 订单id 是 Key 列,订单状态,订单金额是 Value 列。数据状态如下: + +|订单id | 订单金额| 订单状态| +|---|---|---| +| 1 | 100| 待付款 | + +这时候,用户点击付款后,Doris 系统需要将订单id 为 '1' 的订单状态变更为 '待发货', 就需要用到 Update 功能。 + +``` +UPDATE order SET 订单状态='待发货' WHERE 订单id=1; +``` + +用户执行 UPDATE 命令后,系统会进行如下三步: + ++ 第一步:读取满足 WHERE 订单id=1 的行 + (1,100,'待付款') ++ 第二步:变更该行的订单状态,从'待付款'改为'待发货' + (1,100,'待发货') ++ 第三步:将更新后的行再插入回表中,从而达到更新的效果。 + |订单id | 订单金额| 订单状态| + |---|---|---| + | 1 | 100| 待付款 | + | 1 | 100 | 待发货 | + 由于表 order 是 UNIQUE 模型,所以相同 Key 的行,之后后者才会生效,所以最终效果如下: + |订单id | 订单金额| 订单状态| + |---|---|---| + | 1 | 100 | 待发货 | + +## 基本操作 + +### UPDATE 语法 + +```UPDATE table_name SET value=xxx WHERE condition;``` + ++ `table_name`: 待更新的表,必须是 UNIQUE 模型的表才能进行更新。 + ++ value=xxx: 待更新的列,等式左边必须是表的 value 列。等式右边可以是常量,也可以是某个表中某列的表达式变换。 + 比如 value = 1, 则待更新的列值会变为1。 + 比如 value = value +1, 则待更新的列值会自增1。 + ++ condition:只有满足 condition 的行才会被更新。condition 
必须是一个结果为 Boolean 类型的表达式。 + 比如 k1 = 1, 则只有当 k1 列值为1的行才会被更新。 + 比如 k1 = k2, 则只有 k1 列值和 k2 列一样的行才会被更新。 + 不支持不填写condition,也就是不支持全表更新。 + +### 同步 + +Update 语法在 Doris 中是一个同步语法,既 Update 语句成功,更新就成功了,数据可见。 + +### 性能 + +Update 语句的性能和待更新的行数,以及 condition 的检索效率密切相关。 + ++ 待更新的行数:待更新的行数越多,Update 语句的速度就会越慢。这和导入的原理是一致的。 + Doris 的更新比较合适偶发更新的场景,比如修改个别行的值。 + Doris 并不适合大批量的修改数据。大批量修改会使得 Update 语句运行时间很久。 + ++ condition 的检索效率:Doris 的 Update 实现原理是先将满足 condition 的行读取处理,所以如果 condition 的检索效率高,则 Update 的速度也会快。 + condition 列最好能命中,索引或者分区分桶裁剪。这样 Doris 就不需要扫全表,可以快速定位到需要更新的行。从而提升更新效率。 + 强烈不推荐 condition 列中包含 UNIQUE 模型的 value 列。 + +### 并发控制 + +默认情况下,并不允许同一时间对同一张表并发进行多个 Update 操作。 + +主要原因是,Doris 目前支持的是行更新,这意味着,即使用户声明的是 ```SET v2 = 1```,实际上,其他所有的 Value 列也会被覆盖一遍(尽管值没有变化)。 + +这就会存在一个问题,如果同时有两个 Update 操作对同一行进行更新,那么其行为可能是不确定的。也就是可能存在脏数据。 + +但在实际应用中,如果用户自己可以保证即使并发更新,也不会同时对同一行进行操作的话,就可以手动打开并发限制。通过修改 FE 配置 ```enable_concurrent_update```。当配置值为 true 时,则对更新并发无限制。 + +## 使用风险 + +由于 Doris 目前支持的是行更新,并且采用的是读取后再写入的两步操作,则如果 Update 语句和其他导入或 Delete 语句刚好修改的是同一行时,存在不确定的数据结果。 + +所以用户在使用的时候,一定要注意*用户侧自己*进行 Update 语句和其他 DML 语句的并发控制。 + +## 版本 + +Doris Version 0.15.x + diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW EXPORT.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW EXPORT.md index 91d9238028f5d6..d4ad10b549b299 100644 --- a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW EXPORT.md +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/SHOW EXPORT.md @@ -55,7 +55,7 @@ under the License. SHOW EXPORT FROM example_db WHERE STATE = "exporting" ORDER BY StartTime DESC; 4. 
展示指定db,指定job_id的导出任务 - SHOW EXPORT FROM example_db WHERE ID = job_id; + SHOW EXPORT FROM example_db WHERE ID = job_id; ## keyword SHOW,EXPORT diff --git a/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/UPDATE.md b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/UPDATE.md new file mode 100644 index 00000000000000..861cd4baf7f94e --- /dev/null +++ b/docs/zh-CN/sql-reference/sql-statements/Data Manipulation/UPDATE.md @@ -0,0 +1,75 @@ +--- +{ + "title": "UPDATE", + "language": "zh-CN" +} +--- + + + +# UPDATE +## description +### Syntax + +``` +UPDATE table_name + SET assignment_list + WHERE expression + +value: + {expr | DEFAULT} + +assignment: + col_name = value + +assignment_list: + assignment [, assignment] ... +``` + +### Parameters + ++ table_name: 待更新数据的目标表。可以是 'db_name.table_name' 形式 ++ assignment_list: 待更新的目标列,形如 'col_name = value, col_name = value' 格式 ++ where expression: 期望更新的条件,一个返回 true 或者 false 的表达式即可 + +### Note + +当前 UPDATE 语句仅支持在 Unique 模型上的行更新,存在并发更新导致的数据冲突可能。 +目前 Doris 并不处理这类问题,需要用户从业务侧规避这类问题。 + +## example + +`test` 表是一个 unique 模型的表,包含: k1, k2, v1, v2 四个列。其中 k1, k2 是 key,v1, v2 是value,聚合方式是 Replace。 + +1. 将 'test' 表中满足条件 k1 =1 , k2 =2 的 v1 列更新为 1 + +``` +UPDATE test SET v1 = 1 WHERE k1=1 and k2=2; +``` + +2. 
将 'test' 表中 k1=1 的列的 v1 列自增1 + +``` +UPDATE test SET v1 = v1+1 WHERE k1=1; +``` + +## keyword + + UPDATE diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index c3c3ed1407daae..fb6106586f0a99 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -239,7 +239,7 @@ terminal String KW_ADD, KW_ADMIN, KW_AFTER, KW_AGGREGATE, KW_ALL, KW_ALTER, KW_A KW_COLLATE, KW_COLLATION, KW_COLUMN, KW_COLON, KW_COLUMNS, KW_COMMENT, KW_COMMIT, KW_COMMITTED, KW_CONFIG, KW_CONNECTION, KW_CONNECTION_ID, KW_CONSISTENT, KW_CONVERT, KW_COUNT, KW_CREATE, KW_CROSS, KW_CUBE, KW_CURRENT, KW_CURRENT_USER, KW_DATA, KW_DATABASE, KW_DATABASES, KW_DATE, KW_DATETIME, KW_DAY, KW_DECIMAL, KW_DECOMMISSION, KW_DEFAULT, KW_DESC, KW_DESCRIBE, - KW_DELETE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE, + KW_DELETE, KW_UPDATE, KW_DISTINCT, KW_DISTINCTPC, KW_DISTINCTPCSA, KW_DISTRIBUTED, KW_DISTRIBUTION, KW_DYNAMIC, KW_BUCKETS, KW_DIV, KW_DOUBLE, KW_DROP, KW_DROPP, KW_DUPLICATE, KW_ELSE, KW_ENABLE, KW_ENCRYPTKEY, KW_ENCRYPTKEYS, KW_END, KW_ENGINE, KW_ENGINES, KW_ENTER, KW_ERRORS, KW_EVENTS, KW_EXCEPT, KW_EXCLUDE, KW_EXISTS, KW_EXPORT, KW_EXTERNAL, KW_EXTRACT, KW_FALSE, KW_FEATURE, KW_FOLLOWER, KW_FOLLOWING, KW_FREE, KW_FROM, KW_FILE, KW_FILTER, KW_FIRST, KW_FLOAT, KW_FOR, KW_FORCE, KW_FORMAT, KW_FRONTEND, KW_FRONTENDS, KW_FULL, KW_FUNCTION, KW_FUNCTIONS, @@ -331,6 +331,7 @@ nonterminal QueryStmt set_operation_with_order_by_or_limit; nonterminal InsertStmt insert_stmt; nonterminal InsertTarget insert_target; nonterminal InsertSource insert_source; +nonterminal UpdateStmt update_stmt; nonterminal BackupStmt backup_stmt; nonterminal AbstractBackupTableRefClause opt_backup_table_ref_list; @@ -686,6 +687,8 @@ stmt ::= {: RESULT = stmt; :} | insert_stmt : stmt {: RESULT = stmt; :} + | update_stmt : stmt + {: RESULT = stmt; 
:} | backup_stmt : stmt {: RESULT = stmt; :} | restore_stmt : stmt @@ -3117,6 +3120,14 @@ insert_source ::= :} ; +// update stmt +update_stmt ::= + KW_UPDATE table_name:tbl KW_SET expr_list:setExprs where_clause:whereClause + {: + RESULT = new UpdateStmt(tbl, setExprs, whereClause); + :} + ; + // backup stmt backup_stmt ::= KW_BACKUP KW_SNAPSHOT job_label:label diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java index 4230c188213fb8..7c7888c243c41c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java @@ -457,7 +457,7 @@ public TupleDescriptor registerTableRef(TableRef ref) throws AnalysisException { result.setAliases(aliases, ref.hasExplicitAlias()); // Register all legal aliases. - for (String alias: aliases) { + for (String alias : aliases) { // TODO(zc) // aliasMap_.put(alias, result); tupleByAlias.put(alias, result); @@ -467,6 +467,33 @@ public TupleDescriptor registerTableRef(TableRef ref) throws AnalysisException { return result; } + /** + * Create an new tuple descriptor for the given table, register all table columns. + * Using this method requires external table read locks in advance. + */ + public TupleDescriptor registerOlapTable(Table table, TableName tableName, List partitions) { + TableRef ref = new TableRef(tableName, null, partitions == null ? 
null : new PartitionNames(false, partitions)); + BaseTableRef tableRef = new BaseTableRef(ref, table, tableName); + TupleDescriptor result = globalState.descTbl.createTupleDescriptor(); + result.setTable(table); + result.setRef(tableRef); + result.setAliases(tableRef.getAliases(), ref.hasExplicitAlias()); + for (Column col : table.getBaseSchema(true)) { + SlotDescriptor slot = globalState.descTbl.addSlotDescriptor(result); + slot.setIsMaterialized(true); + slot.setColumn(col); + slot.setIsNullable(col.isAllowNull()); + String key = tableRef.aliases_[0] + "." + col.getName(); + slotRefMap.put(key, slot); + } + globalState.descTbl.computeStatAndMemLayout(); + tableRefMap_.put(result.getId(), ref); + for (String alias : tableRef.getAliases()) { + tupleByAlias.put(alias, result); + } + return result; + } + public List getAllTupleIds() { return new ArrayList<>(tableRefMap_.keySet()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java index b6b503f5e95b22..6b29f8d7e95b4d 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Expr.java @@ -20,6 +20,7 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Function; import org.apache.doris.catalog.FunctionSet; +import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.ScalarType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; @@ -1234,6 +1235,43 @@ public void checkReturnsBool(String name, boolean printExpr) throws AnalysisExce } } + public Expr checkTypeCompatibility(Type targetType) throws AnalysisException { + if (targetType.getPrimitiveType().equals(type.getPrimitiveType())) { + return this; + } + // bitmap must match exactly + if (targetType.getPrimitiveType() == PrimitiveType.BITMAP) { + throw new AnalysisException("bitmap column require the function return type is BITMAP"); 
+ } + // TargetTable's hll column must be hll_hash's result + if (targetType.getPrimitiveType() == PrimitiveType.HLL) { + checkHllCompatibility(); + return this; + } + Expr newExpr = castTo(targetType); + newExpr.checkValueValid(); + return newExpr; + } + + private void checkHllCompatibility() throws AnalysisException { + final String hllMismatchLog = "Column's type is HLL," + + " SelectList must contains HLL or hll_hash or hll_empty function's result"; + if (this instanceof SlotRef) { + final SlotRef slot = (SlotRef) this; + if (!slot.getType().equals(Type.HLL)) { + throw new AnalysisException(hllMismatchLog); + } + } else if (this instanceof FunctionCallExpr) { + final FunctionCallExpr functionExpr = (FunctionCallExpr) this; + if (!functionExpr.getFnName().getFunction().equalsIgnoreCase("hll_hash") && + !functionExpr.getFnName().getFunction().equalsIgnoreCase("hll_empty")) { + throw new AnalysisException(hllMismatchLog); + } + } else { + throw new AnalysisException(hllMismatchLog); + } + } + /** * Checks validity of cast, and * calls uncheckedCastTo() to diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java index 3b961bdb25c4c4..3a3cd91578bc07 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/InsertStmt.java @@ -18,7 +18,6 @@ package org.apache.doris.analysis; import org.apache.doris.alter.SchemaChangeHandler; -import org.apache.doris.catalog.AggregateType; import org.apache.doris.catalog.BrokerTable; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; @@ -29,7 +28,6 @@ import org.apache.doris.catalog.Partition; import org.apache.doris.catalog.PartitionType; import org.apache.doris.catalog.Table; -import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.DdlException; import 
org.apache.doris.common.ErrorCode; @@ -92,8 +90,7 @@ public class InsertStmt extends DdlStmt { private final TableName tblName; private final PartitionNames targetPartitionNames; // parsed from targetPartitionNames. - // if targetPartitionNames is not set, add all formal partitions' id of the table into it - private List targetPartitionIds = Lists.newArrayList(); + private List targetPartitionIds; private final List targetColumnNames; private QueryStmt queryStmt; private final List planHints; @@ -344,6 +341,7 @@ private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException { // partition if (targetPartitionNames != null) { + targetPartitionIds = Lists.newArrayList(); if (olapTable.getPartitionInfo().getType() == PartitionType.UNPARTITIONED) { ErrorReport.reportAnalysisException(ErrorCode.ERR_PARTITION_CLAUSE_NO_ALLOWED); } @@ -355,14 +353,6 @@ private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException { } targetPartitionIds.add(part.getId()); } - } else { - for (Partition partition : olapTable.getPartitions()) { - targetPartitionIds.add(partition.getId()); - } - if (targetPartitionIds.isEmpty()) { - ErrorReport.reportAnalysisException( - ErrorCode.ERR_EMPTY_PARTITION_IN_TABLE, targetTable.getName()); - } } // need a descriptor DescriptorTable descTable = analyzer.getDescTbl(); @@ -559,15 +549,8 @@ private void analyzeSubquery(Analyzer analyzer) throws UserException { // check compatibility for (int i = 0; i < targetColumns.size(); ++i) { Column column = targetColumns.get(i); - if (column.getType().isHllType()) { - Expr expr = queryStmt.getResultExprs().get(i); - checkHllCompatibility(column, expr); - } - - if (column.getAggregationType() == AggregateType.BITMAP_UNION) { - Expr expr = queryStmt.getResultExprs().get(i); - checkBitmapCompatibility(column, expr); - } + Expr expr = queryStmt.getResultExprs().get(i); + queryStmt.getResultExprs().set(i, expr.checkTypeCompatibility(column.getType())); } } @@ -647,11 +630,6 @@ private void 
analyzeRow(Analyzer analyzer, List targetColumns, List selectList = Expr.cloneList(queryStmt.getBaseTblResultExprs()); @@ -739,7 +678,7 @@ public void prepareExpressions() throws UserException { int numCols = targetColumns.size(); for (int i = 0; i < numCols; ++i) { Column col = targetColumns.get(i); - Expr expr = checkTypeCompatibility(col, selectList.get(i)); + Expr expr = selectList.get(i).checkTypeCompatibility(col.getType()); selectList.set(i, expr); exprByName.put(col.getName(), expr); } @@ -758,7 +697,8 @@ public void prepareExpressions() throws UserException { resultExprs.add(NullLiteral.create(col.getType())); } else { - resultExprs.add(checkTypeCompatibility(col, new StringLiteral(col.getDefaultValue()))); + StringLiteral defaultValueExpr = new StringLiteral(col.getDefaultValue()); + resultExprs.add(defaultValueExpr.checkTypeCompatibility(col.getType())); } } } @@ -814,7 +754,9 @@ public DataPartition getDataPartition() { @Override public void reset() { super.reset(); - targetPartitionIds.clear(); + if (targetPartitionIds != null) { + targetPartitionIds.clear(); + } queryStmt.reset(); resultExprs.clear(); exprByName.clear(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java index 65aee319977bbb..9a43c820defbaa 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/IntLiteral.java @@ -126,7 +126,6 @@ private void checkValueValid(long longValue, Type type) throws AnalysisException valid = false; break; } - if (!valid) { throw new AnalysisException("Number out of range[" + longValue + "]. 
type: " + type); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java index 3e0b2413b562ee..0e3fd0c7a8f0f7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/LargeIntLiteral.java @@ -20,7 +20,6 @@ import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; -import org.apache.doris.common.NotImplementedException; import org.apache.doris.common.io.Text; import org.apache.doris.thrift.TExprNode; import org.apache.doris.thrift.TExprNodeType; @@ -204,7 +203,7 @@ protected Expr uncheckedCastTo(Type targetType) throws AnalysisException { } @Override - public void swapSign() throws NotImplementedException { + public void swapSign() { // swapping sign does not change the type value = value.negate(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java new file mode 100644 index 00000000000000..4853b3fb73884b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/UpdateStmt.java @@ -0,0 +1,216 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.catalog.AggregateType; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.KeysType; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.Type; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; + +import com.google.common.base.Preconditions; + +import java.util.List; +import java.util.Set; +import java.util.TreeSet; + +/** + * UPDATE is a DML statement that modifies rows in a table. + * The current update syntax only supports updating the filtered data of a single table. + * + * UPDATE table_reference + * SET assignment_list + * [WHERE where_condition] + * + * value: + * {expr} + * + * assignment: + * col_name = value + * + * assignment_list: + * assignment [, assignment] ... 
+ */ +public class UpdateStmt extends DdlStmt { + + private TableName tableName; + private List setExprs; + private Expr whereExpr; + + // After analyzed + private Table targetTable; + private TupleDescriptor srcTupleDesc; + + public UpdateStmt(TableName tableName, List setExprs, Expr whereExpr) { + this.tableName = tableName; + this.setExprs = setExprs; + this.whereExpr = whereExpr; + } + + public TableName getTableName() { + return tableName; + } + + public List getSetExprs() { + return setExprs; + } + + public Expr getWhereExpr() { + return whereExpr; + } + + public Table getTargetTable() { + return targetTable; + } + + public TupleDescriptor getSrcTupleDesc() { + return srcTupleDesc; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { + super.analyze(analyzer); + analyzeTargetTable(analyzer); + analyzeSetExprs(analyzer); + analyzeWhereExpr(analyzer); + } + + private void analyzeTargetTable(Analyzer analyzer) throws AnalysisException { + // step1: analyze table name + tableName.analyze(analyzer); + // step2: resolve table name with catalog, only unique olap table could be update + String dbName = tableName.getDb(); + String targetTableName = tableName.getTbl(); + Preconditions.checkNotNull(dbName); + Preconditions.checkNotNull(targetTableName); + Database database = Catalog.getCurrentCatalog().getDb(dbName); + if (database == null) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_DB_ERROR, dbName); + } + targetTable = database.getTable(tableName.getTbl()); + if (targetTable == null) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_BAD_TABLE_ERROR, tableName.getTbl()); + } + if (targetTable.getType() != Table.TableType.OLAP + || ((OlapTable) targetTable).getKeysType() != KeysType.UNIQUE_KEYS) { + throw new AnalysisException("Only unique olap table could be updated."); + } + // step3: register tuple desc + targetTable.readLock(); + try { + srcTupleDesc = analyzer.registerOlapTable(targetTable, tableName, null); + } 
finally { + targetTable.readUnlock(); + } + } + + private void analyzeSetExprs(Analyzer analyzer) throws AnalysisException { + // step1: analyze set exprs + Set columnMappingNames = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); + // the column expr only support binary predicate which's child(0) must be a SloRef. + // the duplicate column name of SloRef is forbidden. + for (Expr setExpr : setExprs) { + if (!(setExpr instanceof BinaryPredicate)) { + throw new AnalysisException("Set function expr only support eq binary predicate. " + + "Expr: " + setExpr.toSql()); + } + BinaryPredicate predicate = (BinaryPredicate) setExpr; + if (predicate.getOp() != BinaryPredicate.Operator.EQ) { + throw new AnalysisException("Set function expr only support eq binary predicate. " + + "The predicate operator error, op: " + predicate.getOp()); + } + Expr lhs = predicate.getChild(0); + if (!(lhs instanceof SlotRef)) { + throw new AnalysisException("Set function expr only support eq binary predicate " + + "which's child(0) must be a column name. " + + "The child(0) expr error. expr: " + lhs.toSql()); + } + String column = ((SlotRef) lhs).getColumnName(); + if (!columnMappingNames.add(column)) { + throw new AnalysisException("Duplicate column setting: " + column); + } + } + // step2: resolve target columns with catalog, + // only value columns which belong to target table could be updated. + for (Expr setExpr : setExprs) { + Preconditions.checkState(setExpr instanceof BinaryPredicate); + // check target column + // 1. columns must belong to target table + // 2. 
only value columns could be updated + Expr lhs = setExpr.getChild(0); + if (!(lhs instanceof SlotRef)) { + throw new AnalysisException("The left side of the set expr must be the column name"); + } + lhs.analyze(analyzer); + if (((SlotRef) lhs).getColumn().getAggregationType() != AggregateType.REPLACE) { + throw new AnalysisException("Only value columns of unique table could be updated."); + } + // check set expr of target column + Expr rhs = setExpr.getChild(1); + checkLargeIntOverflow(rhs); + rhs.analyze(analyzer); + if (lhs.getType() != rhs.getType()) { + setExpr.setChild(1, rhs.checkTypeCompatibility(lhs.getType())); + } + } + } + + /* + The overflow detection of LargeInt needs to be verified again here. + The reason is: the first overflow detection(in constructor) cannot filter 2^127. + Therefore, a second verification is required here. + */ + private void checkLargeIntOverflow(Expr expr) throws AnalysisException { + if (expr instanceof LargeIntLiteral) { + expr.analyzeImpl(analyzer); + } + } + + private void analyzeWhereExpr(Analyzer analyzer) throws AnalysisException { + if (whereExpr == null) { + throw new AnalysisException("Where clause is required"); + } + whereExpr = analyzer.getExprRewriter().rewrite(whereExpr, analyzer); + whereExpr.analyze(analyzer); + if (!whereExpr.getType().equals(Type.BOOLEAN)) { + throw new AnalysisException("Where clause is not a valid statement return bool"); + } + analyzer.registerConjunct(whereExpr, srcTupleDesc.getId()); + } + + @Override + public String toSql() { + StringBuilder sb = new StringBuilder("UPDATE "); + sb.append(tableName.toSql()).append("\n"); + sb.append(" ").append("SET "); + for (Expr setExpr : setExprs) { + sb.append(setExpr.toSql()).append(", "); + } + sb.append("\n"); + if (whereExpr != null) { + sb.append(" ").append("WHERE ").append(whereExpr.toSql()); + } + return sb.toString(); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java index 3e2266e5e31aed..c53d9acb4de37f 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Catalog.java @@ -157,6 +157,7 @@ import org.apache.doris.load.routineload.RoutineLoadManager; import org.apache.doris.load.routineload.RoutineLoadScheduler; import org.apache.doris.load.routineload.RoutineLoadTaskScheduler; +import org.apache.doris.load.update.UpdateManager; import org.apache.doris.master.Checkpoint; import org.apache.doris.master.MetaHelper; import org.apache.doris.master.PartitionInMemoryInfoCollector; @@ -306,6 +307,7 @@ public class Catalog { private ConsistencyChecker consistencyChecker; private BackupHandler backupHandler; private PublishVersionDaemon publishVersionDaemon; + private UpdateManager updateManager; private DeleteHandler deleteHandler; private DbUsedDataQuotaInfoCollector dbUsedDataQuotaInfoCollector; private PartitionInMemoryInfoCollector partitionInMemoryInfoCollector; @@ -493,6 +495,7 @@ private Catalog(boolean isCheckpointCatalog) { this.backupHandler = new BackupHandler(this); this.metaDir = Config.meta_dir; this.publishVersionDaemon = new PublishVersionDaemon(); + this.updateManager = new UpdateManager(); this.deleteHandler = new DeleteHandler(); this.dbUsedDataQuotaInfoCollector = new DbUsedDataQuotaInfoCollector(); this.partitionInMemoryInfoCollector = new PartitionInMemoryInfoCollector(); @@ -4872,6 +4875,10 @@ public BackupHandler getBackupHandler() { return this.backupHandler; } + public UpdateManager getUpdateManager() { + return updateManager; + } + public DeleteHandler getDeleteHandler() { return this.deleteHandler; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java index fc2511bba4bad7..48d85b6136da1e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Column.java @@ -20,6 +20,8 @@ import org.apache.doris.alter.SchemaChangeHandler; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.StringLiteral; +import org.apache.doris.common.AnalysisException; import org.apache.doris.common.CaseSensibility; import org.apache.doris.common.DdlException; import org.apache.doris.common.FeMetaVersion; @@ -273,6 +275,16 @@ public String getDefaultValue() { return this.defaultValue; } + public Expr getDefaultValueExpr() throws AnalysisException { + StringLiteral defaultValueLiteral = new StringLiteral(defaultValue); + if (getDataType() == PrimitiveType.VARCHAR) { + return defaultValueLiteral; + } + Expr result = defaultValueLiteral.castTo(getType()); + result.checkValueValid(); + return result; + } + public void setStats(ColumnStats stats) { this.stats = stats; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java index 844c2b7bd96a44..d409acc18b1cb5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java @@ -826,6 +826,10 @@ public Set getPartitionNames() { return Sets.newHashSet(nameToPartition.keySet()); } + public List getPartitionIds() { + return getPartitions().stream().map(entity -> entity.getId()).collect(Collectors.toList()); + } + public Set getCopiedBfColumns() { if (bfColumns == null) { return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java index 3f0155e90b3785..e1012dd518d93e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java @@ -1450,4 +1450,7 @@ public class Config extends ConfigBase { */ @ConfField(mutable = false, masterOnly = true) public 
static int partition_in_memory_update_interval_secs = 300; + + @ConfField(masterOnly = true) + public static boolean enable_concurrent_update = false; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java index a07286bc76a485..30c0f453679124 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/ExportJob.java @@ -376,9 +376,7 @@ private ScanNode genScanNode() throws UserException { switch (exportTable.getType()) { case OLAP: scanNode = new OlapScanNode(new PlanNodeId(0), exportTupleDesc, "OlapScanNodeForExport"); - ((OlapScanNode) scanNode).setColumnFilters(Maps.newHashMap()); - ((OlapScanNode) scanNode).setIsPreAggregation(false, "This an export operation"); - ((OlapScanNode) scanNode).setCanTurnOnPreAggr(false); + ((OlapScanNode) scanNode).closePreAggregation("This an export operation"); ((OlapScanNode) scanNode).selectBestRollupByRollupSelector(analyzer); break; case ODBC: @@ -461,7 +459,7 @@ private void genCoordinators(List fragments, List nodes) ScanNode scanNode = nodes.get(i); TUniqueId queryId = new TUniqueId(uuid.getMostSignificantBits() + i, uuid.getLeastSignificantBits()); Coordinator coord = new Coordinator( - id, queryId, desc, Lists.newArrayList(fragment), Lists.newArrayList(scanNode), clusterName, + id, queryId, desc, Lists.newArrayList(fragment), Lists.newArrayList(scanNode), TimeUtils.DEFAULT_TIME_ZONE); coord.setExecMemoryLimit(getExecMemLimit()); this.coordList.add(coord); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index 2e0c7bb60e4274..1345d80a69079a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -116,7 +116,7 @@ protected void executeTask() 
throws Exception{ private void executeOnce() throws Exception { // New one query id, Coordinator curCoordinator = new Coordinator(callback.getCallbackId(), loadId, planner.getDescTable(), - planner.getFragments(), planner.getScanNodes(), db.getClusterName(), planner.getTimezone()); + planner.getFragments(), planner.getScanNodes(), planner.getTimezone()); curCoordinator.setQueryType(TQueryType.LOAD); curCoordinator.setExecMemoryLimit(execMemLimit); /* diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index 2d8d574e7d16a4..4d2dbe434d95c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -26,7 +26,6 @@ import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.OlapTable; -import org.apache.doris.catalog.Partition; import org.apache.doris.common.LoadException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.NotImplementedException; @@ -171,30 +170,19 @@ public String getTimezone() { } private List getAllPartitionIds() throws LoadException, MetaNotFoundException { - Set partitionIds = Sets.newHashSet(); + Set specifiedPartitionIds = Sets.newHashSet(); for (BrokerFileGroup brokerFileGroup : fileGroups) { if (brokerFileGroup.getPartitionIds() != null) { - partitionIds.addAll(brokerFileGroup.getPartitionIds()); + specifiedPartitionIds.addAll(brokerFileGroup.getPartitionIds()); } // all file group in fileGroups should have same partitions, so only need to get partition ids // from one of these file groups break; } - - if (partitionIds.isEmpty()) { - for (Partition partition : table.getPartitions()) { - partitionIds.add(partition.getId()); - } + if (specifiedPartitionIds.isEmpty()) { + return null; } - - // If this is a dynamic partitioned 
table, it will take some time to create the partition after the - // table is created, a exception needs to be thrown here - if (partitionIds.isEmpty()) { - throw new LoadException("data cannot be inserted into table with empty partition. " + - "Use `SHOW PARTITIONS FROM " + table.getName() + "` to see the currently partitions of this table. "); - } - - return Lists.newArrayList(partitionIds); + return Lists.newArrayList(specifiedPartitionIds); } // when retry load by reusing this plan in load process, the load_id should be changed diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateManager.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateManager.java new file mode 100644 index 00000000000000..3199df79286126 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateManager.java @@ -0,0 +1,87 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.update; + +import org.apache.doris.analysis.UpdateStmt; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.UserException; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +public class UpdateManager { + private final boolean enableConcurrentUpdate = Config.enable_concurrent_update; + private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock(); + private Map> tableIdToCurrentUpdate = Maps.newConcurrentMap(); + + private void writeLock() { + rwLock.writeLock().lock(); + } + + private void writeUnlock() { + rwLock.writeLock().unlock(); + } + + public void handleUpdate(UpdateStmt updateStmt) throws UserException { + UpdateStmtExecutor updateStmtExecutor = addUpdateExecutor(updateStmt); + try { + updateStmtExecutor.execute(); + } finally { + removeUpdateExecutor(updateStmtExecutor); + } + } + + private UpdateStmtExecutor addUpdateExecutor(UpdateStmt updateStmt) throws AnalysisException, DdlException { + writeLock(); + try { + List currentUpdateList = tableIdToCurrentUpdate.get(updateStmt.getTargetTable().getId()); + if (!enableConcurrentUpdate && currentUpdateList != null && currentUpdateList.size() > 0) { + throw new DdlException("There is an update operation in progress for the current table. 
" + + "Please try again later, or set enable_concurrent_update in fe.conf to true"); + } + UpdateStmtExecutor updateStmtExecutor = UpdateStmtExecutor.fromUpdateStmt(updateStmt); + if (currentUpdateList == null) { + currentUpdateList = Lists.newArrayList(); + tableIdToCurrentUpdate.put(updateStmtExecutor.getTargetTableId(), currentUpdateList); + } + currentUpdateList.add(updateStmtExecutor); + return updateStmtExecutor; + } finally { + writeUnlock(); + } + } + + private void removeUpdateExecutor(UpdateStmtExecutor updateStmtExecutor) { + writeLock(); + try { + List currentUpdateList = tableIdToCurrentUpdate.get(updateStmtExecutor.getTargetTableId()); + if (currentUpdateList == null) { + return; + } + currentUpdateList.remove(updateStmtExecutor); + } finally { + writeUnlock(); + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java new file mode 100644 index 00000000000000..0ffe906d2be630 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdatePlanner.java @@ -0,0 +1,191 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.load.update; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.DescriptorTable; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.NullLiteral; +import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.IdGenerator; +import org.apache.doris.common.UserException; +import org.apache.doris.planner.DataPartition; +import org.apache.doris.planner.OlapScanNode; +import org.apache.doris.planner.OlapTableSink; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanFragmentId; +import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.Planner; +import org.apache.doris.planner.ScanNode; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +import java.util.List; +import java.util.Map; + +import static org.apache.doris.alter.SchemaChangeHandler.SHADOW_NAME_PRFIX; + + +public class UpdatePlanner extends Planner { + + private final IdGenerator nodeIdGenerator_ = PlanNodeId.createGenerator(); + private final IdGenerator fragmentIdGenerator_ = + PlanFragmentId.createGenerator(); + + private long targetDBId; + private OlapTable targetTable; + private List setExprs; + private TupleDescriptor srcTupleDesc; + private Analyzer analyzer; + + private List scanNodeList = Lists.newArrayList(); + + public UpdatePlanner(long dbId, OlapTable targetTable, List setExprs, + TupleDescriptor srcTupleDesc, Analyzer analyzer) { + this.targetDBId = dbId; + this.targetTable = targetTable; + this.setExprs = setExprs; + this.srcTupleDesc = srcTupleDesc; + this.analyzer = analyzer; + } + + @Override + 
public List getScanNodes() { + return scanNodeList; + } + + public void plan(long txnId) throws UserException { + // 1. gen scan node + OlapScanNode olapScanNode = new OlapScanNode(nodeIdGenerator_.getNextId(), srcTupleDesc, "OlapScanNode"); + /* BEGIN: Temporary code, this part of the code needs to be refactored */ + olapScanNode.closePreAggregation("This an update operation"); + olapScanNode.useBaseIndexId(); + /* END */ + olapScanNode.init(analyzer); + olapScanNode.finalize(analyzer); + scanNodeList.add(olapScanNode); + // 2. gen olap table sink + OlapTableSink olapTableSink = new OlapTableSink(targetTable, computeTargetTupleDesc(), null); + olapTableSink.init(analyzer.getContext().queryId(), txnId, targetDBId, + analyzer.getContext().getSessionVariable().queryTimeoutS); + olapTableSink.complete(); + // 3. gen plan fragment + PlanFragment planFragment = new PlanFragment(fragmentIdGenerator_.getNextId(), olapScanNode, + DataPartition.RANDOM); + planFragment.setSink(olapTableSink); + planFragment.setOutputExprs(computeOutputExprs()); + planFragment.finalize(analyzer, false); + fragments.add(planFragment); + } + + private TupleDescriptor computeTargetTupleDesc() { + DescriptorTable descTable = analyzer.getDescTbl(); + TupleDescriptor targetTupleDesc = descTable.createTupleDescriptor(); + for (Column col : targetTable.getFullSchema()) { + SlotDescriptor slotDesc = descTable.addSlotDescriptor(targetTupleDesc); + slotDesc.setIsMaterialized(true); + slotDesc.setType(col.getType()); + slotDesc.setColumn(col); + if (col.isAllowNull()) { + slotDesc.setIsNullable(true); + } else { + slotDesc.setIsNullable(false); + } + } + targetTupleDesc.computeStatAndMemLayout(); + return targetTupleDesc; + } + + /** + * There are three Rules of output exprs: + * RuleA: columns that need to be updated, + * use the right child of a set expr + * base column: (k1, v1) + * update stmt: set v1=1 + * output expr: k1, 1(use 1 as output expr) + * RuleB: columns that do not need to be updated, + 
* just add the original value of column -> slot ref + * base column: (k1, v1) + * update stmt: set v1 = 1 + * output expr: k1(use k1 slot ref as output expr), 1 + * RuleC: the output columns is being added by the schema change job, + * need to add default value expr in output expr + * base column: (k1, v1) + * schema change job: add v2 column + * full column: (k1, v1, v2) + * output expr: k1, v1, default_value(v2) + */ + private List computeOutputExprs() throws AnalysisException { + Map columnNameToSetExpr = Maps.newHashMap(); + for (Expr setExpr : setExprs) { + Preconditions.checkState(setExpr instanceof BinaryPredicate); + Preconditions.checkState(setExpr.getChild(0) instanceof SlotRef); + SlotRef slotRef = (SlotRef) setExpr.getChild(0); + // pay attention to case ignore of column name + columnNameToSetExpr.put(slotRef.getColumnName().toLowerCase(), setExpr.getChild(1)); + } + Map columnNameToSrcSlotDesc = Maps.newHashMap(); + for (SlotDescriptor srcSlotDesc : srcTupleDesc.getSlots()) { + // pay attention to case ignore of column name + columnNameToSrcSlotDesc.put(srcSlotDesc.getColumn().getName().toLowerCase(), srcSlotDesc); + } + + // compute output expr + List outputExprs = Lists.newArrayList(); + for (int i = 0; i < targetTable.getFullSchema().size(); i++) { + Column column = targetTable.getFullSchema().get(i); + // pay attention to case ignore of column name + String originColumnName = (column.getName().startsWith(SHADOW_NAME_PRFIX) ? 
+ column.getName().substring(SHADOW_NAME_PRFIX.length()) : column.getName()) + .toLowerCase(); + Expr setExpr = columnNameToSetExpr.get(originColumnName); + SlotDescriptor srcSlotDesc = columnNameToSrcSlotDesc.get(originColumnName); + if (setExpr != null) { + // RuleA + outputExprs.add(setExpr); + } else if (srcSlotDesc != null) { + // RuleB + SlotRef slotRef = new SlotRef(srcSlotDesc); + outputExprs.add(slotRef); + } else { + // RuleC + Expr defaultExpr; + if (column.getDefaultValue() != null) { + defaultExpr = column.getDefaultValueExpr(); + } else { + if (column.isAllowNull()) { + defaultExpr = NullLiteral.create(column.getType()); + } else { + throw new AnalysisException("column has no source field, column=" + column.getName()); + } + } + defaultExpr.analyze(analyzer); + outputExprs.add(defaultExpr); + } + } + return outputExprs; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java new file mode 100644 index 00000000000000..1e8988d7b91319 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/load/update/UpdateStmtExecutor.java @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.load.update; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.UpdateStmt; +import org.apache.doris.catalog.Catalog; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.DdlException; +import org.apache.doris.common.DuplicatedRequestException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.LabelAlreadyUsedException; +import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.QuotaExceedException; +import org.apache.doris.common.UserException; +import org.apache.doris.common.util.DebugUtil; +import org.apache.doris.common.util.TimeUtils; +import org.apache.doris.metric.MetricRepo; +import org.apache.doris.qe.Coordinator; +import org.apache.doris.qe.QeProcessorImpl; +import org.apache.doris.service.FrontendOptions; +import org.apache.doris.task.LoadEtlTask; +import org.apache.doris.thrift.TQueryType; +import org.apache.doris.thrift.TUniqueId; +import org.apache.doris.transaction.BeginTransactionException; +import org.apache.doris.transaction.GlobalTransactionMgr; +import org.apache.doris.transaction.TabletCommitInfo; +import org.apache.doris.transaction.TransactionCommitFailedException; +import org.apache.doris.transaction.TransactionState.LoadJobSourceType; +import org.apache.doris.transaction.TransactionState.TxnCoordinator; +import org.apache.doris.transaction.TransactionState.TxnSourceType; +import org.apache.doris.transaction.TransactionStatus; + +import com.google.common.collect.Lists; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +public class UpdateStmtExecutor { + private 
static final Logger LOG = LogManager.getLogger(UpdateStmtExecutor.class); + + private OlapTable targetTable; + private Expr whereExpr; + private List setExprs; + private long dbId; + private TUniqueId queryId; + private int timeoutSecond; + private Analyzer analyzer; + private UpdatePlanner updatePlanner; + + private String label; + private long txnId; + private Coordinator coordinator; + private long effectRows; + + + public long getTargetTableId() { + return targetTable.getId(); + } + + public void execute() throws UserException { + // 0. empty set + // A where clause with a constant equal to false will not execute the update directly + // Example: update xxx set v1=0 where 1=2 + if (analyzer.hasEmptyResultSet()) { + QeProcessorImpl.INSTANCE.unregisterQuery(queryId); + analyzer.getContext().getState().setOk(); + return; + } + + // 1. begin txn + beginTxn(); + + // 2. plan + targetTable.readLock(); + try { + updatePlanner.plan(txnId); + } catch (Throwable e) { + LOG.warn("failed to plan update stmt, query id:{}", DebugUtil.printId(queryId), e); + Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, txnId, e.getMessage()); + QeProcessorImpl.INSTANCE.unregisterQuery(queryId); + throw new DdlException("failed to execute update stmt, query id:" + DebugUtil.printId(queryId), e); + } finally { + targetTable.readUnlock(); + } + + // 3. 
execute plan + try { + executePlan(); + } catch (DdlException e) { + LOG.warn("failed to execute update stmt, query id:{}", DebugUtil.printId(queryId), e); + Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, txnId, e.getMessage()); + throw e; + } catch (Throwable e) { + LOG.warn("failed to execute update stmt, query id:{}", DebugUtil.printId(queryId), e); + Catalog.getCurrentGlobalTransactionMgr().abortTransaction(dbId, txnId, e.getMessage()); + throw new DdlException("failed to execute update stmt, query id:" + DebugUtil.printId(queryId), e); + } finally { + QeProcessorImpl.INSTANCE.unregisterQuery(queryId); + } + + // 4. commit and publish + commitAndPublishTxn(); + } + + private void beginTxn() throws LabelAlreadyUsedException, AnalysisException, BeginTransactionException, + DuplicatedRequestException, QuotaExceedException, MetaNotFoundException { + LOG.info("begin transaction for update stmt, query id:{}", DebugUtil.printId(queryId)); + MetricRepo.COUNTER_LOAD_ADD.increase(1L); + label = "update_" + DebugUtil.printId(queryId); + txnId = Catalog.getCurrentGlobalTransactionMgr() + .beginTransaction(dbId, Lists.newArrayList(targetTable.getId()), label, + new TxnCoordinator(TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), + LoadJobSourceType.INSERT_STREAMING, timeoutSecond); + } + + // TODO(ML): Abstract the logic of executing the coordinator and retrying. 
+ // It makes stmt such as insert, load, update and export can be reused + private void executePlan() throws Exception { + LOG.info("begin execute update stmt, query id:{}", DebugUtil.printId(queryId)); + coordinator = new Coordinator(Catalog.getCurrentCatalog().getNextId(), queryId, analyzer.getDescTbl(), + updatePlanner.getFragments(), updatePlanner.getScanNodes(), TimeUtils.DEFAULT_TIME_ZONE); + coordinator.setQueryType(TQueryType.LOAD); + QeProcessorImpl.INSTANCE.registerQuery(queryId, coordinator); + analyzer.getContext().getExecutor().setCoord(coordinator); + + // execute + coordinator.setTimeout(timeoutSecond); + coordinator.exec(); + if (coordinator.join(timeoutSecond)) { + if (!coordinator.isDone()) { + coordinator.cancel(); + ErrorReport.reportDdlException(ErrorCode.ERR_EXECUTE_TIMEOUT); + } + if (!coordinator.getExecStatus().ok()) { + String errMsg = "update failed: " + coordinator.getExecStatus().getErrorMsg(); + LOG.warn(errMsg); + throw new DdlException(errMsg); + } + LOG.info("finish to execute update stmt, query id:{}", DebugUtil.printId(queryId)); + } else { + String errMsg = "coordinator could not finished before update timeout: " + + coordinator.getExecStatus().getErrorMsg(); + LOG.warn(errMsg); + throw new DdlException(errMsg); + } + + // counter + if (coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL) != null) { + effectRows = Long.valueOf(coordinator.getLoadCounters().get(LoadEtlTask.DPP_NORMAL_ALL)); + if (Long.valueOf(coordinator.getLoadCounters().get(LoadEtlTask.DPP_ABNORMAL_ALL)) != 0) { + throw new DdlException("update failed, some rows did not take effect"); + } + } + } + + private void commitAndPublishTxn() throws UserException { + GlobalTransactionMgr globalTransactionMgr = Catalog.getCurrentGlobalTransactionMgr(); + // situation1: no data is updated, abort transaction + if (effectRows == 0) { + LOG.info("abort transaction for update stmt, query id:{}, reason: {}", DebugUtil.printId(queryId), + 
TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG); + globalTransactionMgr.abortTransaction(dbId, txnId, TransactionCommitFailedException.NO_DATA_TO_LOAD_MSG); + StringBuilder sb = new StringBuilder(); + sb.append("{'label':'").append(label); + sb.append(", 'txnId':'").append(txnId).append("'"); + sb.append(", 'queryId':'").append(DebugUtil.printId(queryId)).append("'"); + sb.append("}"); + analyzer.getContext().getState().setOk(effectRows, 0, sb.toString()); + return; + } + TransactionStatus txnStatus; + // situation2: data is updated, commit and publish transaction + boolean isPublished; + try { + LOG.info("commit and publish transaction for update stmt, query id: {}", DebugUtil.printId(queryId)); + isPublished = globalTransactionMgr.commitAndPublishTransaction(Catalog.getCurrentCatalog().getDb(dbId), + Lists.newArrayList(targetTable), txnId, + TabletCommitInfo.fromThrift(coordinator.getCommitInfos()), + analyzer.getContext().getSessionVariable().getInsertVisibleTimeoutMs()); + } catch (Throwable e) { + // situation2.1: publish error, throw exception + String errMsg = "failed to commit and publish transaction for update stmt, query id:" + + DebugUtil.printId(queryId); + LOG.warn(errMsg, e); + globalTransactionMgr.abortTransaction(dbId, txnId, e.getMessage()); + throw new DdlException(errMsg, e); + } + String errMsg = null; + if (isPublished) { + // situation2.2: publish successful + txnStatus = TransactionStatus.VISIBLE; + MetricRepo.COUNTER_LOAD_FINISHED.increase(1L); + } else { + // situation2.3: be published later + txnStatus = TransactionStatus.COMMITTED; + errMsg = "transaction will be published later, data will be visible later"; + LOG.warn("transaction will be published later, query id: {}", DebugUtil.printId(queryId)); + } + + // set context + StringBuilder sb = new StringBuilder(); + sb.append("{'label':'").append(label).append("', 'status':'").append(txnStatus.name()).append("'"); + sb.append(", 'txnId':'").append(txnId).append("'"); + sb.append(", 
'queryId':'").append(DebugUtil.printId(queryId)).append("'"); + if (errMsg != null) { + sb.append(", 'err':'").append(errMsg).append("'"); + } + sb.append("}"); + analyzer.getContext().getState().setOk(effectRows, 0, sb.toString()); + } + + public static UpdateStmtExecutor fromUpdateStmt(UpdateStmt updateStmt) throws AnalysisException { + UpdateStmtExecutor updateStmtExecutor = new UpdateStmtExecutor(); + updateStmtExecutor.targetTable = (OlapTable) updateStmt.getTargetTable(); + updateStmtExecutor.whereExpr = updateStmt.getWhereExpr(); + updateStmtExecutor.setExprs = updateStmt.getSetExprs(); + Database database = Catalog.getCurrentCatalog().getDb(updateStmt.getTableName().getDb()); + if (database == null) { + String errMsg = "Database does not exists in update stmt, db:" + updateStmt.getTableName().getDb(); + LOG.info(errMsg); + throw new AnalysisException(errMsg); + } + updateStmtExecutor.dbId = database.getId(); + updateStmtExecutor.analyzer = updateStmt.getAnalyzer(); + updateStmtExecutor.queryId = updateStmtExecutor.analyzer.getContext().queryId(); + updateStmtExecutor.timeoutSecond = updateStmtExecutor.analyzer.getContext() + .getSessionVariable().getQueryTimeoutS(); + updateStmtExecutor.updatePlanner = new UpdatePlanner(updateStmtExecutor.dbId, updateStmtExecutor.targetTable, + updateStmt.getSetExprs(), updateStmt.getSrcTupleDesc(), + updateStmt.getAnalyzer()); + return updateStmtExecutor; + } + +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java index e8c2b71e3d157b..2ed1a4bde80859 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EsScanNode.java @@ -80,7 +80,7 @@ public EsScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { @Override public void init(Analyzer analyzer) throws UserException { super.init(analyzer); - + computeColumnFilter(); assignBackends(); 
computeStats(analyzer); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index a69c08d5143210..41f1de31378abb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -162,6 +162,11 @@ public void setCanTurnOnPreAggr(boolean canChangePreAggr) { this.canTurnOnPreAggr = canChangePreAggr; } + public void closePreAggregation(String reason) { + setIsPreAggregation(false, reason); + setCanTurnOnPreAggr(false); + } + public boolean getForceOpenPreAgg() { return forceOpenPreAgg; } @@ -174,6 +179,20 @@ public Collection getSelectedPartitionIds() { return selectedPartitionIds; } + /** + * The function is used to directly select the index id of the base table as the selectedIndexId. + * It makes sure that the olap scan node must scan the base data rather than scan the materialized view data. + * + * This function is mainly used to update stmt. + * Update stmt also needs to scan data like normal queries. + * But its syntax is different from ordinary queries, + * so planner cannot use the logic of query to automatically match the best index id. + * So, here it need to manually specify the index id to scan the base table directly. + */ + public void useBaseIndexId() { + this.selectedIndexId = olapTable.getBaseIndexId(); + } + /** * This method is mainly used to update scan range info in OlapScanNode by the new materialized selector. 
* Situation1: @@ -295,6 +314,7 @@ public void init(Analyzer analyzer) throws UserException { super.init(analyzer); filterDeletedRows(analyzer); + computeColumnFilter(); computePartitionInfo(); computeTupleState(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index 8ca0cca730a2a8..32683743b753bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -67,7 +67,6 @@ import com.google.common.collect.Multimap; import com.google.common.collect.Range; -import org.apache.commons.collections.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -82,7 +81,7 @@ public class OlapTableSink extends DataSink { // input variables private OlapTable dstTable; private TupleDescriptor tupleDescriptor; - // specified partition ids. this list should not be empty and should contains all related partition ids + // specified partition ids. 
private List partitionIds; // set after init called @@ -91,8 +90,6 @@ public class OlapTableSink extends DataSink { public OlapTableSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List partitionIds) { this.dstTable = dstTable; this.tupleDescriptor = tupleDescriptor; - Preconditions.checkState(!CollectionUtils.isEmpty(partitionIds), - "The specified partition ids is empty."); this.partitionIds = partitionIds; } @@ -106,6 +103,12 @@ public void init(TUniqueId loadId, long txnId, long dbId, long loadChannelTimeou tDataSink.setType(TDataSinkType.OLAP_TABLE_SINK); tDataSink.setOlapTableSink(tSink); + if (partitionIds == null) { + partitionIds = dstTable.getPartitionIds(); + if (partitionIds.isEmpty()) { + ErrorReport.reportAnalysisException(ErrorCode.ERR_EMPTY_PARTITION_IN_TABLE, dstTable.getName()); + } + } for (Long partitionId : partitionIds) { Partition part = dstTable.getPartition(partitionId); if (part == null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index dc53c44a81cdf1..445a9aaff89e21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -293,6 +293,13 @@ public void addConjuncts(List conjuncts) { this.conjuncts.addAll(conjuncts); } + public void addConjunct(Expr conjunct) { + if (conjuncts == null) { + conjuncts = Lists.newArrayList(); + } + conjuncts.add(conjunct); + } + public void setAssignedConjuncts(Set conjuncts) { assignedConjuncts = conjuncts; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java index 86b2fd2abdeb57..bb2efa4e7d1eb1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/Planner.java @@ -56,7 +56,7 @@ public class Planner { private boolean isBlockQuery = false; - 
private ArrayList fragments = Lists.newArrayList(); + protected ArrayList fragments = Lists.newArrayList(); private PlannerContext plannerContext; private SingleNodePlanner singleNodePlanner; diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java new file mode 100644 index 00000000000000..28abd58f528ba2 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PredicatePushDown.java @@ -0,0 +1,122 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.planner; + +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BinaryPredicate; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.JoinOperator; +import org.apache.doris.analysis.Predicate; +import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.TupleId; + +import org.apache.directory.api.util.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.List; + +public class PredicatePushDown { + private final static Logger LOG = LogManager.getLogger(PredicatePushDown.class); + + public static PlanNode visitScanNode(ScanNode scanNode, JoinOperator joinOp, Analyzer analyzer) { + switch (joinOp) { + case INNER_JOIN: + case LEFT_OUTER_JOIN: + predicateFromLeftSidePropagatesToRightSide(scanNode, analyzer); + break; + // TODO + default: + break; + } + return scanNode; + } + + private static void predicateFromLeftSidePropagatesToRightSide(ScanNode scanNode, Analyzer analyzer) { + List tupleIdList = scanNode.getTupleIds(); + if (tupleIdList.size() != 1) { + LOG.info("The predicate pushdown is not reflected " + + "because the scan node involves more than one tuple:{}", + Strings.listToString(tupleIdList)); + return; + } + TupleId rightSideTuple = tupleIdList.get(0); + List unassignedRightSideConjuncts = analyzer.getUnassignedConjuncts(scanNode); + List eqJoinPredicates = analyzer.getEqJoinConjuncts(rightSideTuple); + if (eqJoinPredicates != null) { + List allConjuncts = analyzer.getConjuncts(analyzer.getAllTupleIds()); + allConjuncts.removeAll(unassignedRightSideConjuncts); + for (Expr conjunct : allConjuncts) { + if (!Predicate.canPushDownPredicate(conjunct)) { + continue; + } + for (Expr eqJoinPredicate : eqJoinPredicates) { + // we can ensure slot is left node, because NormalizeBinaryPredicatesRule + SlotRef otherSlot = conjunct.getChild(0).unwrapSlotRef(); + + // ensure the 
children for eqJoinPredicate both be SlotRef + if (eqJoinPredicate.getChild(0).unwrapSlotRef() == null + || eqJoinPredicate.getChild(1).unwrapSlotRef() == null) { + continue; + } + + SlotRef leftSlot = eqJoinPredicate.getChild(0).unwrapSlotRef(); + SlotRef rightSlot = eqJoinPredicate.getChild(1).unwrapSlotRef(); + // ensure the type is match + if (!leftSlot.getDesc().getType().matchesType(rightSlot.getDesc().getType())) { + continue; + } + + // example: t1.id = t2.id and t1.id = 1 => t2.id =1 + if (otherSlot.isBound(leftSlot.getSlotId()) + && rightSlot.isBound(rightSideTuple)) { + Expr pushDownConjunct = rewritePredicate(analyzer, conjunct, rightSlot); + LOG.debug("pushDownConjunct: {}", pushDownConjunct); + scanNode.addConjunct(pushDownConjunct); + } else if (otherSlot.isBound(rightSlot.getSlotId()) + && leftSlot.isBound(rightSideTuple)) { + Expr pushDownConjunct = rewritePredicate(analyzer, conjunct, leftSlot); + LOG.debug("pushDownConjunct: {}", pushDownConjunct); + scanNode.addConjunct(pushDownConjunct); + } + } + } + } + } + + // Rewrite the oldPredicate with new leftChild + // For example: oldPredicate is t1.id = 1, leftChild is t2.id, will return t2.id = 1 + private static Expr rewritePredicate(Analyzer analyzer, Expr oldPredicate, Expr leftChild) { + if (oldPredicate instanceof BinaryPredicate) { + BinaryPredicate oldBP = (BinaryPredicate) oldPredicate; + BinaryPredicate bp = new BinaryPredicate(oldBP.getOp(), leftChild, oldBP.getChild(1)); + bp.analyzeNoThrow(analyzer); + return bp; + } + + if (oldPredicate instanceof InPredicate) { + InPredicate oldIP = (InPredicate) oldPredicate; + InPredicate ip = new InPredicate(leftChild, oldIP.getListChildren(), oldIP.isNotIn()); + ip.analyzeNoThrow(analyzer); + return ip; + } + + return oldPredicate; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 649d1c5f5347f5..a8bce22623aa16 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -18,15 +18,26 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.BinaryPredicate; import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.InPredicate; +import org.apache.doris.analysis.IsNullPredicate; +import org.apache.doris.analysis.LiteralExpr; +import org.apache.doris.analysis.NullLiteral; import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Column; import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.common.UserException; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TScanRangeLocations; import com.google.common.base.MoreObjects; +import com.google.common.collect.Maps; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; import java.util.List; import java.util.Map; @@ -35,8 +46,9 @@ * Representation of the common elements of all scan nodes. 
*/ abstract public class ScanNode extends PlanNode { + private final static Logger LOG = LogManager.getLogger(ScanNode.class); protected final TupleDescriptor desc; - protected Map columnFilters; + protected Map columnFilters = Maps.newHashMap(); protected String sortColumn = null; public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName) { @@ -65,10 +77,6 @@ protected static TNetworkAddress addressToTNetworkAddress(String address) { public TupleDescriptor getTupleDesc() { return desc; } - public void setColumnFilters(Map columnFilters) { - this.columnFilters = columnFilters; - } - public void setSortColumn(String column) { sortColumn = column; } @@ -96,6 +104,100 @@ protected Expr castToSlot(SlotDescriptor slotDesc, Expr expr) throws UserExcepti */ abstract public List getScanRangeLocations(long maxScanRangeLength); + // TODO(ML): move it into PrunerOptimizer + public void computeColumnFilter() { + for (Column column : desc.getTable().getBaseSchema()) { + SlotDescriptor slotDesc = desc.getColumnSlot(column.getName()); + if (null == slotDesc) { + continue; + } + PartitionColumnFilter keyFilter = createPartitionFilter(slotDesc, conjuncts); + if (null != keyFilter) { + columnFilters.put(column.getName(), keyFilter); + } + } + } + + private PartitionColumnFilter createPartitionFilter(SlotDescriptor desc, List conjuncts) { + PartitionColumnFilter partitionColumnFilter = null; + for (Expr expr : conjuncts) { + if (!expr.isBound(desc.getId())) { + continue; + } + if (expr instanceof BinaryPredicate) { + BinaryPredicate binPredicate = (BinaryPredicate) expr; + Expr slotBinding = binPredicate.getSlotBinding(desc.getId()); + if (slotBinding == null || !slotBinding.isConstant()) { + continue; + } + if (binPredicate.getOp() == BinaryPredicate.Operator.NE + || !(slotBinding instanceof LiteralExpr)) { + continue; + } + + if (null == partitionColumnFilter) { + partitionColumnFilter = new PartitionColumnFilter(); + } + LiteralExpr literal = (LiteralExpr) 
slotBinding; + BinaryPredicate.Operator op = binPredicate.getOp(); + if (!binPredicate.slotIsLeft()) { + op = op.commutative(); + } + switch (op) { + case EQ: + partitionColumnFilter.setLowerBound(literal, true); + partitionColumnFilter.setUpperBound(literal, true); + break; + case LE: + partitionColumnFilter.setUpperBound(literal, true); + partitionColumnFilter.lowerBoundInclusive = true; + break; + case LT: + partitionColumnFilter.setUpperBound(literal, false); + partitionColumnFilter.lowerBoundInclusive = true; + break; + case GE: + partitionColumnFilter.setLowerBound(literal, true); + break; + case GT: + partitionColumnFilter.setLowerBound(literal, false); + break; + default: + break; + } + } else if (expr instanceof InPredicate) { + InPredicate inPredicate = (InPredicate) expr; + if (!inPredicate.isLiteralChildren() || inPredicate.isNotIn()) { + continue; + } + if (!(inPredicate.getChild(0).unwrapExpr(false) instanceof SlotRef)) { + // If child(0) of the in predicate is not a SlotRef, + // then other children of in predicate should not be used as a condition for partition prune. + continue; + } + if (null == partitionColumnFilter) { + partitionColumnFilter = new PartitionColumnFilter(); + } + partitionColumnFilter.setInPredicate(inPredicate); + } else if (expr instanceof IsNullPredicate) { + IsNullPredicate isNullPredicate = (IsNullPredicate) expr; + if (!isNullPredicate.isSlotRefChildren() || isNullPredicate.isNotNull()) { + continue; + } + + // If we meet a IsNull predicate on partition column, then other predicates are useless + // eg: (xxxx) and (col is null), only the IsNull predicate has an effect on partition pruning. 
+ partitionColumnFilter = new PartitionColumnFilter(); + NullLiteral nullLiteral = new NullLiteral(); + partitionColumnFilter.setLowerBound(nullLiteral, true); + partitionColumnFilter.setUpperBound(nullLiteral, true); + break; + } + } + LOG.debug("partitionColumnFilter: {}", partitionColumnFilter); + return partitionColumnFilter; + } + @Override public String toString() { return MoreObjects.toStringHelper(this).add("tid", desc.getId().asInt()).add("tblName", diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java index f11fbf7ef88198..801ba0a0b47e37 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java @@ -1228,7 +1228,6 @@ public static PartitionColumnFilter createPartitionFilter(SlotDescriptor desc, L || !(slotBinding instanceof LiteralExpr)) { continue; } - if (null == partitionColumnFilter) { partitionColumnFilter = new PartitionColumnFilter(); } @@ -1696,65 +1695,8 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s break; } if (scanNode instanceof OlapScanNode || scanNode instanceof EsScanNode) { - Map columnFilters = Maps.newHashMap(); - List conjuncts = analyzer.getUnassignedConjuncts(scanNode); - - // push down join predicate - List pushDownConjuncts = Lists.newArrayList(); - TupleId tupleId = tblRef.getId(); - List eqJoinPredicates = analyzer.getEqJoinConjuncts(tupleId); - if (eqJoinPredicates != null) { - // only inner and left outer join - if ((tblRef.getJoinOp().isInnerJoin() || tblRef.getJoinOp().isLeftOuterJoin())) { - List allConjuncts = analyzer.getConjuncts(analyzer.getAllTupleIds()); - allConjuncts.removeAll(conjuncts); - for (Expr conjunct : allConjuncts) { - if (org.apache.doris.analysis.Predicate.canPushDownPredicate(conjunct)) { - for (Expr eqJoinPredicate : eqJoinPredicates) { - // we can ensure 
slot is left node, because NormalizeBinaryPredicatesRule - SlotRef otherSlot = conjunct.getChild(0).unwrapSlotRef(); - - // ensure the children for eqJoinPredicate both be SlotRef - if (eqJoinPredicate.getChild(0).unwrapSlotRef() == null || eqJoinPredicate.getChild(1).unwrapSlotRef() == null) { - continue; - } - - SlotRef leftSlot = eqJoinPredicate.getChild(0).unwrapSlotRef(); - SlotRef rightSlot = eqJoinPredicate.getChild(1).unwrapSlotRef(); - - // ensure the type is match - if (!leftSlot.getDesc().getType().matchesType(rightSlot.getDesc().getType())) { - continue; - } - - // example: t1.id = t2.id and t1.id = 1 => t2.id =1 - if (otherSlot.isBound(leftSlot.getSlotId()) && rightSlot.isBound(tupleId)) { - pushDownConjuncts.add(rewritePredicate(analyzer, conjunct, rightSlot)); - } else if (otherSlot.isBound(rightSlot.getSlotId()) && leftSlot.isBound(tupleId)) { - pushDownConjuncts.add(rewritePredicate(analyzer, conjunct, leftSlot)); - } - } - } - } - } - - LOG.debug("pushDownConjuncts: {}", pushDownConjuncts); - conjuncts.addAll(pushDownConjuncts); - } - - for (Column column : tblRef.getTable().getBaseSchema()) { - SlotDescriptor slotDesc = tblRef.getDesc().getColumnSlot(column.getName()); - if (null == slotDesc) { - continue; - } - PartitionColumnFilter keyFilter = createPartitionFilter(slotDesc, conjuncts); - if (null != keyFilter) { - columnFilters.put(column.getName(), keyFilter); - } - } - scanNode.setColumnFilters(columnFilters); + PredicatePushDown.visitScanNode(scanNode, tblRef.getJoinOp(), analyzer); scanNode.setSortColumn(tblRef.getSortColumn()); - scanNode.addConjuncts(pushDownConjuncts); } scanNodes.add(scanNode); @@ -1767,26 +1709,6 @@ private PlanNode createScanNode(Analyzer analyzer, TableRef tblRef, SelectStmt s return scanNode; } - // Rewrite the oldPredicate with new leftChild - // For example: oldPredicate is t1.id = 1, leftChild is t2.id, will return t2.id = 1 - private Expr rewritePredicate(Analyzer analyzer, Expr oldPredicate, Expr leftChild) { 
- if (oldPredicate instanceof BinaryPredicate) { - BinaryPredicate oldBP = (BinaryPredicate) oldPredicate; - BinaryPredicate bp = new BinaryPredicate(oldBP.getOp(), leftChild, oldBP.getChild(1)); - bp.analyzeNoThrow(analyzer); - return bp; - } - - if (oldPredicate instanceof InPredicate) { - InPredicate oldIP = (InPredicate) oldPredicate; - InPredicate ip = new InPredicate(leftChild, oldIP.getListChildren(), oldIP.isNotIn()); - ip.analyzeNoThrow(analyzer); - return ip; - } - - return oldPredicate; - } - /** * Return join conjuncts that can be used for hash table lookups. - for inner joins, those are equi-join predicates * in which one side is fully bound by lhsIds and the other by rhs' id; - for outer joins: same type of conjuncts as diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java index 3377330b3a92fa..f2323346211bc4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java @@ -211,7 +211,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId) throws UserException { } // get all specified partition ids. 
- // if no partition specified, return all partitions + // if no partition specified, return null private List getAllPartitionIds() throws DdlException, AnalysisException { List partitionIds = Lists.newArrayList(); @@ -224,46 +224,37 @@ private List getAllPartitionIds() throws DdlException, AnalysisException { } partitionIds.add(part.getId()); } - } else { - List conjuncts = scanNode.getConjuncts(); - if (destTable.getPartitionInfo().getType() != PartitionType.UNPARTITIONED && !conjuncts.isEmpty()) { - PartitionInfo partitionInfo = destTable.getPartitionInfo(); - Map itemById = partitionInfo.getIdToItem(false); - Map columnFilters = Maps.newHashMap(); - for (Column column : partitionInfo.getPartitionColumns()) { - SlotDescriptor slotDesc = tupleDesc.getColumnSlot(column.getName()); - if (null == slotDesc) { - continue; - } - PartitionColumnFilter keyFilter = SingleNodePlanner.createPartitionFilter(slotDesc, conjuncts); - if (null != keyFilter) { - columnFilters.put(column.getName(), keyFilter); - } - } - if (columnFilters.isEmpty()) { - partitionIds.addAll(itemById.keySet()); - } else { - PartitionPruner partitionPruner = null; - if (destTable.getPartitionInfo().getType() == PartitionType.RANGE) { - partitionPruner = new RangePartitionPruner(itemById, - partitionInfo.getPartitionColumns(), columnFilters); - } else if (destTable.getPartitionInfo().getType() == PartitionType.LIST) { - partitionPruner = new ListPartitionPruner(itemById, - partitionInfo.getPartitionColumns(), columnFilters); - } - partitionIds.addAll(partitionPruner.prune()); + return partitionIds; + } + List conjuncts = scanNode.getConjuncts(); + if (destTable.getPartitionInfo().getType() != PartitionType.UNPARTITIONED && !conjuncts.isEmpty()) { + PartitionInfo partitionInfo = destTable.getPartitionInfo(); + Map itemById = partitionInfo.getIdToItem(false); + Map columnFilters = Maps.newHashMap(); + for (Column column : partitionInfo.getPartitionColumns()) { + SlotDescriptor slotDesc = 
tupleDesc.getColumnSlot(column.getName()); + if (null == slotDesc) { + continue; } - } else { - for (Partition partition : destTable.getPartitions()) { - partitionIds.add(partition.getId()); + PartitionColumnFilter keyFilter = SingleNodePlanner.createPartitionFilter(slotDesc, conjuncts); + if (null != keyFilter) { + columnFilters.put(column.getName(), keyFilter); } } - - if (partitionIds.isEmpty()) { - ErrorReport.reportDdlException(ErrorCode.ERR_EMPTY_PARTITION_IN_TABLE, destTable.getName()); + if (columnFilters.isEmpty()) { + return null; } + PartitionPruner partitionPruner = null; + if (destTable.getPartitionInfo().getType() == PartitionType.RANGE) { + partitionPruner = new RangePartitionPruner(itemById, + partitionInfo.getPartitionColumns(), columnFilters); + } else if (destTable.getPartitionInfo().getType() == PartitionType.LIST) { + partitionPruner = new ListPartitionPruner(itemById, + partitionInfo.getPartitionColumns(), columnFilters); + } + partitionIds.addAll(partitionPruner.prune()); + return partitionIds; } - - return partitionIds; + return null; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java index 8f1dad822cdcaf..0aeadd8751cfc2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectContext.java @@ -369,6 +369,10 @@ public void setExecutor(StmtExecutor executor) { this.executor = executor; } + public StmtExecutor getExecutor() { + return executor; + } + public void cleanup() { mysqlChannel.close(); threadLocalInfo.remove(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index 0f567ab39a67db..6ce8906033aad7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -482,6 +482,7 
@@ public TMasterOpResult proxyExecute(TMasterOpRequest request) { // 0 for compatibility. int idx = request.isSetStmtIdx() ? request.getStmtIdx() : 0; executor = new StmtExecutor(ctx, new OriginStatement(request.getSql(), idx), true); + ctx.setExecutor(executor); TUniqueId queryId; // This query id will be set in ctx if (request.isSetQueryId()) { queryId = request.getQueryId(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java index 11375f420c5593..9cefa7d111d888 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java @@ -201,7 +201,6 @@ public class Coordinator { private TResourceInfo tResourceInfo; private boolean needReport; - private String clusterName; // parallel execute private final TUniqueId nextInstanceId; @@ -235,16 +234,15 @@ public Coordinator(ConnectContext context, Analyzer analyzer, Planner planner) { this.tResourceInfo = new TResourceInfo(context.getQualifiedUser(), context.getSessionVariable().getResourceGroup()); this.needReport = context.getSessionVariable().isReportSucc(); - this.clusterName = context.getClusterName(); this.nextInstanceId = new TUniqueId(); nextInstanceId.setHi(queryId.hi); nextInstanceId.setLo(queryId.lo + 1); this.assignedRuntimeFilters = analyzer.getAssignedRuntimeFilter(); } - // Used for broker load task/export task coordinator + // Used for broker load task/export task/update coordinator public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, - List fragments, List scanNodes, String cluster, String timezone) { + List fragments, List scanNodes, String timezone) { this.isBlockQuery = true; this.jobId = jobId; this.queryId = queryId; @@ -257,7 +255,6 @@ public Coordinator(Long jobId, TUniqueId queryId, DescriptorTable descTable, this.queryGlobals.setTimeZone(timezone); this.tResourceInfo = new TResourceInfo("", ""); 
this.needReport = true; - this.clusterName = cluster; this.nextInstanceId = new TUniqueId(); nextInstanceId.setHi(queryId.hi); nextInstanceId.setLo(queryId.lo + 1); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java index e3b082a8556259..1045caf5ad67ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java @@ -78,6 +78,7 @@ import org.apache.doris.analysis.SyncStmt; import org.apache.doris.analysis.TruncateTableStmt; import org.apache.doris.analysis.UninstallPluginStmt; +import org.apache.doris.analysis.UpdateStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.EncryptKeyHelper; import org.apache.doris.common.Config; @@ -163,6 +164,8 @@ public static void execute(Catalog catalog, DdlStmt ddlStmt) throws Exception { catalog.getRoutineLoadManager().stopRoutineLoadJob((StopRoutineLoadStmt) ddlStmt); } else if (ddlStmt instanceof AlterRoutineLoadStmt) { catalog.getRoutineLoadManager().alterRoutineLoadJob((AlterRoutineLoadStmt) ddlStmt); + } else if (ddlStmt instanceof UpdateStmt) { + catalog.getUpdateManager().handleUpdate((UpdateStmt) ddlStmt); } else if (ddlStmt instanceof DeleteStmt) { catalog.getDeleteHandler().process((DeleteStmt) ddlStmt); } else if (ddlStmt instanceof CreateUserStmt) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java index f753e4a0b57217..cf0376ab4a1d05 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/QueryState.java @@ -70,6 +70,9 @@ public void setEof() { } public void setOk() { + if (stateType == MysqlStateType.OK) { + return; + } setOk(0, 0, null); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 5938712360512b..4cf5033d1cef56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -181,6 +181,10 @@ public StmtExecutor(ConnectContext ctx, StatementBase parsedStmt) { this.isProxy = false; } + public void setCoord(Coordinator coord) { + this.coord = coord; + } + // At the end of query execution, we begin to add up profile private void initProfile(QueryPlannerProfile plannerProfile, boolean waiteBeReport) { long currentTimestamp = System.currentTimeMillis(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java index 28a77e3372f3ff..426d0beed5ab0e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java +++ b/fe/fe-core/src/main/java/org/apache/doris/transaction/TransactionState.java @@ -65,7 +65,7 @@ public int compare(TransactionState t1, TransactionState t2) { public enum LoadJobSourceType { FRONTEND(1), // old dpp load, mini load, insert stmt(not streaming type) use this type BACKEND_STREAMING(2), // streaming load use this type - INSERT_STREAMING(3), // insert stmt (streaming type) use this type + INSERT_STREAMING(3), // insert stmt (streaming type), update stmt use this type ROUTINE_LOAD_TASK(4), // routine load task use this type BATCH_LOAD_JOB(5); // load job v2 for broker load diff --git a/fe/fe-core/src/main/jflex/sql_scanner.flex b/fe/fe-core/src/main/jflex/sql_scanner.flex index 2fede3b0436081..a8c1d7ec3ccc84 100644 --- a/fe/fe-core/src/main/jflex/sql_scanner.flex +++ b/fe/fe-core/src/main/jflex/sql_scanner.flex @@ -382,6 +382,7 @@ import org.apache.doris.qe.SqlModeHelper; keywordMap.put("user", new Integer(SqlParserSymbols.KW_USER)); keywordMap.put("using", new Integer(SqlParserSymbols.KW_USING)); keywordMap.put("uninstall", new 
Integer(SqlParserSymbols.KW_UNINSTALL)); + keywordMap.put("update", new Integer(SqlParserSymbols.KW_UPDATE)); keywordMap.put("value", new Integer(SqlParserSymbols.KW_VALUE)); keywordMap.put("values", new Integer(SqlParserSymbols.KW_VALUES)); keywordMap.put("varchar", new Integer(SqlParserSymbols.KW_VARCHAR)); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java index cbd4b67112f3ec..095b882bd77582 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/InsertStmtTest.java @@ -32,11 +32,6 @@ import com.google.common.collect.Lists; -import org.junit.AfterClass; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - import java.io.StringReader; import java.util.ArrayList; import java.util.List; @@ -44,6 +39,10 @@ import mockit.Expectations; import mockit.Injectable; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; public class InsertStmtTest { private static String runningDir = "fe/mocked/DemoTest/" + UUID.randomUUID().toString() + "/"; @@ -259,7 +258,9 @@ public void testInsertSelect() throws Exception { List slots = Lists.newArrayList(); expr4.collect(SlotRef.class, slots); Assert.assertEquals(1, slots.size()); - Assert.assertEquals(queryStmtSubstitue.getResultExprs().get(0), slots.get(0)); + Assert.assertTrue(queryStmtSubstitue.getResultExprs().get(0) instanceof CastExpr); + CastExpr resultExpr0 = (CastExpr) queryStmtSubstitue.getResultExprs().get(0); + Assert.assertEquals(resultExpr0.getChild(0), slots.get(0)); Assert.assertTrue(queryStmtSubstitue.getResultExprs().get(5) instanceof FunctionCallExpr); FunctionCallExpr expr5 = (FunctionCallExpr) queryStmtSubstitue.getResultExprs().get(5); diff --git a/fe/fe-core/src/test/java/org/apache/doris/analysis/UpdateStmtTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/analysis/UpdateStmtTest.java new file mode 100644 index 00000000000000..06880215acf3f4 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/analysis/UpdateStmtTest.java @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +import org.apache.doris.common.UserException; + +import java.util.List; + +import com.clearspring.analytics.util.Lists; +import mockit.Expectations; +import mockit.Injectable; +import org.junit.Assert; +import org.junit.Test; + +public class UpdateStmtTest { + + @Test + public void testAnalyze(@Injectable Analyzer analyzer) { + TableName tableName = new TableName("db", "table"); + IntLiteral intLiteral = new IntLiteral(1); + SlotRef slotRef = new SlotRef(tableName, "c1"); + BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ, intLiteral, slotRef); + List setExprs = Lists.newArrayList(); + setExprs.add(binaryPredicate); + + new Expectations() { + { + analyzer.getClusterName(); + result = "default"; + } + }; + UpdateStmt updateStmt = new UpdateStmt(tableName, Lists.newArrayList(setExprs), null); + try { + updateStmt.analyze(analyzer); + Assert.fail(); + } catch (UserException e) { + System.out.println(e.getMessage()); + } + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java index 70926302c4ee17..5298042f56f884 100755 --- a/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/common/ThreadPoolManagerTest.java @@ -81,7 +81,5 @@ public void testNormal() throws InterruptedException { Assert.assertEquals(0, testFixedThreaddPool.getActiveCount()); Assert.assertEquals(0, testFixedThreaddPool.getQueue().size()); Assert.assertEquals(4, testFixedThreaddPool.getCompletedTaskCount()); - - } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/update/UpdateManagerTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/update/UpdateManagerTest.java new file mode 100644 index 00000000000000..d75f7c1b46e94a --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/load/update/UpdateManagerTest.java @@ -0,0 
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.load.update;

import org.apache.doris.analysis.UpdateStmt;
import org.apache.doris.common.Config;
import org.apache.doris.common.DdlException;
import org.apache.doris.common.jmockit.Deencapsulation;

import com.google.common.collect.Maps;

import java.util.List;
import java.util.Map;

import com.clearspring.analytics.util.Lists;
import mockit.Expectations;
import mockit.Injectable;
import org.junit.Assert;
import org.junit.Test;

/**
 * Unit test for {@link UpdateManager}.
 */
public class UpdateManagerTest {

    /**
     * With Config.enable_concurrent_update = false, registering a second update
     * executor for a table that already has one in flight must fail with a
     * DdlException.
     */
    @Test
    public void testDisableConcurrentUpdate(@Injectable UpdateStmt updateStmt,
                                            @Injectable UpdateStmtExecutor updateStmtExecutor) {
        Config.enable_concurrent_update = false;
        // Seed the manager with one in-flight update on table id 1.
        Map<Long, List<UpdateStmtExecutor>> tableIdToCurrentUpdate = Maps.newConcurrentMap();
        List<UpdateStmtExecutor> currentUpdate = Lists.newArrayList();
        currentUpdate.add(updateStmtExecutor);
        tableIdToCurrentUpdate.put(1L, currentUpdate);
        UpdateManager updateManager = new UpdateManager();
        Assert.assertFalse(Deencapsulation.getField(updateManager, "enableConcurrentUpdate"));
        Deencapsulation.setField(updateManager, "tableIdToCurrentUpdate", tableIdToCurrentUpdate);
        new Expectations() {
            {
                // The incoming statement targets the same table (id 1).
                updateStmt.getTargetTable().getId();
                result = 1;
            }
        };

        try {
            Deencapsulation.invoke(updateManager, "addUpdateExecutor", updateStmt);
            Assert.fail("addUpdateExecutor should fail while another update on the table is in flight");
        } catch (Exception e) {
            // Only DdlException is the expected rejection; anything else is a real failure.
            if (e instanceof DdlException) {
                System.out.println(e.getMessage());
            } else {
                throw e;
            }
        }
    }
}
package org.apache.doris.load.update;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.UpdateStmt;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.thrift.TUniqueId;
import org.apache.doris.transaction.GlobalTransactionMgr;

import java.util.List;

import com.clearspring.analytics.util.Lists;
import mockit.Expectations;
import mockit.Injectable;
import mockit.Mocked;
import org.junit.Assert;
import org.junit.Test;

/**
 * Unit test for {@link UpdateStmtExecutor}.
 */
public class UpdateStmtExecutorTest {

    /**
     * Smoke test: commitAndPublishTxn completes when effectRows is 0 and the
     * global transaction manager is mocked out.
     */
    @Test
    public void testCommitAndPublishTxn(@Injectable Analyzer analyzer,
                                        @Mocked GlobalTransactionMgr globalTransactionMgr) {
        UpdateStmtExecutor updateStmtExecutor = new UpdateStmtExecutor();
        Deencapsulation.setField(updateStmtExecutor, "effectRows", 0);
        Deencapsulation.setField(updateStmtExecutor, "analyzer", analyzer);
        Deencapsulation.invoke(updateStmtExecutor, "commitAndPublishTxn");
    }

    /**
     * fromUpdateStmt should copy the target table id, where clause, set exprs
     * and db id from the analyzed UpdateStmt into the executor.
     */
    @Test
    public void testFromUpdateStmt(@Injectable OlapTable olapTable,
                                   @Mocked Catalog catalog,
                                   @Injectable Database db,
                                   @Injectable Analyzer analyzer) throws AnalysisException {
        TableName tableName = new TableName("db", "test");
        // SET v1 = 1
        List<Expr> setExprs = Lists.newArrayList();
        SlotRef slotRef = new SlotRef(tableName, "v1");
        IntLiteral intLiteral = new IntLiteral(1);
        BinaryPredicate binaryPredicate = new BinaryPredicate(BinaryPredicate.Operator.EQ,
                slotRef, intLiteral);
        setExprs.add(binaryPredicate);
        // WHERE k1 = 1
        SlotRef keySlotRef = new SlotRef(tableName, "k1");
        Expr whereExpr = new BinaryPredicate(BinaryPredicate.Operator.EQ,
                keySlotRef, intLiteral);
        UpdateStmt updateStmt = new UpdateStmt(tableName, setExprs, whereExpr);
        Deencapsulation.setField(updateStmt, "targetTable", olapTable);
        Deencapsulation.setField(updateStmt, "analyzer", analyzer);
        new Expectations() {
            {
                catalog.getDb("db");
                result = db;
                db.getId();
                result = 1;
                analyzer.getContext().queryId();
                result = new TUniqueId(1, 2);
                analyzer.getContext().getSessionVariable().getQueryTimeoutS();
                result = 1000;
                olapTable.getId();
                result = 2;
            }
        };
        UpdateStmtExecutor executor = UpdateStmtExecutor.fromUpdateStmt(updateStmt);
        Assert.assertEquals(2L, executor.getTargetTableId());
        Assert.assertEquals(whereExpr, Deencapsulation.getField(executor, "whereExpr"));
        Assert.assertEquals(setExprs, Deencapsulation.getField(executor, "setExprs"));
        Assert.assertEquals(Long.valueOf(1L), Deencapsulation.getField(executor, "dbId"));
    }
}
which // may also start a Mocked Frontend @@ -446,7 +446,7 @@ public void testBitmapInsertInto() throws Exception { queryStr = "explain insert into test.bitmap_table select id, id from test.bitmap_table_2;"; String errorMsg = UtFrameUtils.getSQLPlanOrErrorMsg(connectContext, queryStr); - Assert.assertTrue(errorMsg.contains("bitmap column id2 require the function return type is BITMAP")); + Assert.assertTrue(errorMsg.contains("bitmap column require the function return type is BITMAP")); } private static void testBitmapQueryPlan(String sql, String result) throws Exception { diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/UpdatePlannerTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/UpdatePlannerTest.java new file mode 100644 index 00000000000000..d1e0e3c9218881 --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/UpdatePlannerTest.java @@ -0,0 +1,185 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
package org.apache.doris.planner;

import org.apache.doris.analysis.Analyzer;
import org.apache.doris.analysis.BinaryPredicate;
import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.IntLiteral;
import org.apache.doris.analysis.SlotDescriptor;
import org.apache.doris.analysis.SlotId;
import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableName;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.IdGenerator;
import org.apache.doris.common.jmockit.Deencapsulation;
import org.apache.doris.load.update.UpdatePlanner;

import java.util.List;

import com.clearspring.analytics.util.Lists;
import mockit.Expectations;
import mockit.Injectable;
import org.junit.Assert;
import org.junit.Test;

import static org.apache.doris.alter.SchemaChangeHandler.SHADOW_NAME_PRFIX;

/**
 * Unit test for {@link UpdatePlanner#computeOutputExprs}.
 */
public class UpdatePlannerTest {

    private final IdGenerator<TupleId> tupleIdGenerator_ = TupleId.createGenerator();
    private final IdGenerator<SlotId> slotIdGenerator_ = SlotId.createGenerator();

    /** Builds a source tuple descriptor containing one slot per given column. */
    private TupleDescriptor createSrcTupleDesc(Column... columns) {
        TupleDescriptor srcTupleDesc = new TupleDescriptor(tupleIdGenerator_.getNextId());
        for (Column column : columns) {
            SlotDescriptor slotDesc = new SlotDescriptor(slotIdGenerator_.getNextId(), srcTupleDesc);
            slotDesc.setColumn(column);
            srcTupleDesc.addSlot(slotDesc);
        }
        return srcTupleDesc;
    }

    /** Builds the single set expr "V1 = 1" (upper case to exercise case-insensitive matching). */
    private List<Expr> createSetExprs() {
        TableName tableName = new TableName(null, "test");
        SlotRef slotRef = new SlotRef(tableName, "V1");
        IntLiteral intLiteral = new IntLiteral(1);
        List<Expr> setExprs = Lists.newArrayList();
        setExprs.add(new BinaryPredicate(BinaryPredicate.Operator.EQ, slotRef, intLiteral));
        return setExprs;
    }

    /**
     * Full columns: k1, k2, v1, shadow column
     * Shadow column: SHADOW_NAME_PRFIX + v1
     * Set expr: v1=1
     * Expect output exprs: k1, k2, 1, 1
     * (the shadow column mirrors the set expr of the column it shadows)
     */
    @Test
    public void testComputeOutputExprsWithShadowColumnAndSetExpr(@Injectable OlapTable targetTable,
                                                                 @Injectable Column k1,
                                                                 @Injectable Column k2,
                                                                 @Injectable Column v1,
                                                                 @Injectable Column shadow_v1,
                                                                 @Injectable Analyzer analyzer) {
        List<Expr> setExprs = createSetExprs();
        TupleDescriptor srcTupleDesc = createSrcTupleDesc(k1, k2, v1);
        List<Column> fullSchema = Lists.newArrayList();
        fullSchema.add(k1);
        fullSchema.add(k2);
        fullSchema.add(v1);
        fullSchema.add(shadow_v1);

        new Expectations() {
            {
                targetTable.getFullSchema();
                result = fullSchema;
                k1.getName();
                result = "k1";
                k2.getName();
                result = "k2";
                v1.getName();
                result = "v1";
                shadow_v1.getName();
                result = SHADOW_NAME_PRFIX + "v1";
            }
        };

        UpdatePlanner updatePlanner = new UpdatePlanner(1, targetTable, setExprs, srcTupleDesc, analyzer);
        List<Expr> outputExpr = Deencapsulation.invoke(updatePlanner, "computeOutputExprs");
        Assert.assertEquals(4, outputExpr.size());
        // Key columns pass through as slot refs.
        Expr outputExpr1 = outputExpr.get(0);
        Assert.assertTrue(outputExpr1 instanceof SlotRef);
        Assert.assertEquals("k1", ((SlotRef) outputExpr1).getDesc().getColumn().getName());
        Expr outputExpr2 = outputExpr.get(1);
        Assert.assertTrue(outputExpr2 instanceof SlotRef);
        Assert.assertEquals("k2", ((SlotRef) outputExpr2).getDesc().getColumn().getName());
        // v1 and its shadow column both take the set-expr literal.
        Expr outputExpr3 = outputExpr.get(2);
        Assert.assertTrue(outputExpr3 instanceof IntLiteral);
        Assert.assertEquals(1, ((IntLiteral) outputExpr3).getValue());
        Expr outputExpr4 = outputExpr.get(3);
        Assert.assertTrue(outputExpr4 instanceof IntLiteral);
        Assert.assertEquals(1, ((IntLiteral) outputExpr4).getValue());
    }

    /**
     * Full columns: k1, k2, v1, v2 (new column added by schema change, not in the
     * source tuple). The new column's default value expr is expected in the output.
     * Expect output exprs: k1, k2, 1 (set expr), 1 (default of v2)
     */
    @Test
    public void testNewColumnBySchemaChange(@Injectable OlapTable targetTable,
                                            @Injectable Column k1,
                                            @Injectable Column k2,
                                            @Injectable Column v1,
                                            @Injectable Column new_v2,
                                            @Injectable Analyzer analyzer) throws AnalysisException {
        List<Expr> setExprs = createSetExprs();
        TupleDescriptor srcTupleDesc = createSrcTupleDesc(k1, k2, v1);
        List<Column> fullSchema = Lists.newArrayList();
        fullSchema.add(k1);
        fullSchema.add(k2);
        fullSchema.add(v1);
        fullSchema.add(new_v2);

        new Expectations() {
            {
                targetTable.getFullSchema();
                result = fullSchema;
                k1.getName();
                result = "k1";
                k2.getName();
                result = "k2";
                v1.getName();
                result = "v1";
                new_v2.getName();
                result = "v2";
                new_v2.getDefaultValue();
                result = "1";
                new_v2.getDefaultValueExpr();
                result = new IntLiteral(1);
            }
        };

        UpdatePlanner updatePlanner = new UpdatePlanner(1, targetTable, setExprs, srcTupleDesc, analyzer);
        List<Expr> outputExpr = Deencapsulation.invoke(updatePlanner, "computeOutputExprs");
        Assert.assertEquals(4, outputExpr.size());
        Expr outputExpr1 = outputExpr.get(0);
        Assert.assertTrue(outputExpr1 instanceof SlotRef);
        Assert.assertEquals("k1", ((SlotRef) outputExpr1).getDesc().getColumn().getName());
        Expr outputExpr2 = outputExpr.get(1);
        Assert.assertTrue(outputExpr2 instanceof SlotRef);
        Assert.assertEquals("k2", ((SlotRef) outputExpr2).getDesc().getColumn().getName());
        Expr outputExpr3 = outputExpr.get(2);
        Assert.assertTrue(outputExpr3 instanceof IntLiteral);
        Assert.assertEquals(1, ((IntLiteral) outputExpr3).getValue());
        // New column absent from the source tuple falls back to its default value.
        Expr outputExpr4 = outputExpr.get(3);
        Assert.assertTrue(outputExpr4 instanceof IntLiteral);
        Assert.assertEquals(1, ((IntLiteral) outputExpr4).getValue());
    }
}