From 1a15f533098eec6708121004ef835d111dc374f3 Mon Sep 17 00:00:00 2001 From: wangxixu Date: Thu, 13 Aug 2020 22:09:31 +0800 Subject: [PATCH 01/15] udf: replace function --- be/src/exprs/string_functions.cpp | 19 ++++++++ be/src/exprs/string_functions.h | 3 ++ be/test/exprs/string_functions_test.cpp | 13 ++++++ .../string-functions/str_replace.md | 46 +++++++++++++++++++ .../string-functions/str_replace.md | 46 +++++++++++++++++++ gensrc/script/doris_builtins_functions.py | 2 + 6 files changed, 129 insertions(+) create mode 100644 docs/en/sql-reference/sql-functions/string-functions/str_replace.md create mode 100644 docs/zh-CN/sql-reference/sql-functions/string-functions/str_replace.md diff --git a/be/src/exprs/string_functions.cpp b/be/src/exprs/string_functions.cpp index 998794c8392345..ecf7695f45ce12 100644 --- a/be/src/exprs/string_functions.cpp +++ b/be/src/exprs/string_functions.cpp @@ -998,4 +998,23 @@ StringVal StringFunctions::split_part(FunctionContext* context, const StringVal& int len = (find[field.val - 1] == -1 ? content.len : find[field.val - 1]) - start_pos; return StringVal(content.ptr + start_pos, len); } + +StringVal StringFunctions::str_replace(FunctionContext *context, const StringVal &origStr, const StringVal &oldStr, const StringVal &newStr) { + if (origStr.is_null || oldStr.is_null || newStr.is_null) { + return origStr; + } + std::string orig_str = std::string(reinterpret_cast(origStr.ptr), origStr.len); + std::string old_str = std::string(reinterpret_cast(oldStr.ptr), oldStr.len); + std::string new_str = std::string(reinterpret_cast(newStr.ptr), newStr.len); + std::string::size_type pos = 0; + std::string::size_type oldLen = old_str.size(); + std::string::size_type newLen = new_str.size(); + while(pos = orig_str.find(old_str, pos)) + { + if(pos == std::string::npos) break; + orig_str.replace(pos, oldLen, new_str); + pos += newLen; + } + return AnyValUtil::from_string_temp(context, orig_str); +} } diff --git a/be/src/exprs/string_functions.h b/be/src/exprs/string_functions.h index 13b96bcf5feb61..e60969bde5aa7c 100644 --- a/be/src/exprs/string_functions.h +++ b/be/src/exprs/string_functions.h @@ -190,6 +190,9 @@ class StringFunctions { static StringVal split_part(FunctionContext* context, const StringVal& content, const StringVal& delimiter, const IntVal& field); + + static StringVal str_replace(FunctionContext *context, const StringVal &origStr, + const StringVal &oldStr, const StringVal &newStr); }; } diff --git a/be/test/exprs/string_functions_test.cpp b/be/test/exprs/string_functions_test.cpp index 05d484c67cd1bd..339f89f4342cb7 100644 --- a/be/test/exprs/string_functions_test.cpp +++ b/be/test/exprs/string_functions_test.cpp @@ -494,6 +494,19 @@ TEST_F(StringFunctionsTest, rpad) { ASSERT_EQ(StringVal("呵呵hih"), StringFunctions::rpad(ctx, StringVal("呵呵"), IntVal(5), StringVal("hi"))); } + +TEST_F(StringFunctionsTest, str_replace) { + ASSERT_EQ(StringVal("http://www.baidu.com:8080"), + StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("9090"), StringVal("8080"))); + + ASSERT_EQ(StringVal("http://www.baidu.com:9090"), + StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("9070"), StringVal("8080"))); + ASSERT_EQ(StringVal("http://www.baidu.com:9090"), + StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal(""), StringVal("8080"))); + ASSERT_EQ(StringVal("http://www.baidu.com:"), + StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("9090"), StringVal(""))); +} + } // namespace doris int main(int argc, char** argv) { diff --git a/docs/en/sql-reference/sql-functions/string-functions/str_replace.md b/docs/en/sql-reference/sql-functions/string-functions/str_replace.md new file mode 100644 index 00000000000000..f9a28cccece692 --- /dev/null +++ b/docs/en/sql-reference/sql-functions/string-functions/str_replace.md @@ -0,0 +1,46 @@ +--- +{ + "title": "str_replace", + "language": "zh-CN" +} +--- + + + +# str_replace +## description +### Syntax + +`VARCHAR STR_REPLACE (VARCHAR str, VARCHAR old, VARCHAR new)` + +replace all old substring with new substring in str + +## example + +``` +mysql> select str_replace("http://www.baidu.com:9090", "9090", ""); ++------------------------------------------------------+ +| str_replace('http://www.baidu.com:9090', '9090', '') | ++------------------------------------------------------+ +| http://www.baidu.com: | ++------------------------------------------------------+ +``` +##keyword +STR_REPLACE diff --git a/docs/zh-CN/sql-reference/sql-functions/string-functions/str_replace.md b/docs/zh-CN/sql-reference/sql-functions/string-functions/str_replace.md new file mode 100644 index 00000000000000..cf0bd769badfe4 --- /dev/null +++ b/docs/zh-CN/sql-reference/sql-functions/string-functions/str_replace.md @@ -0,0 +1,46 @@ +--- +{ + "title": "str_replace", + "language": "zh-CN" +} +--- + + + +# str_replace +## description +### Syntax + +`VARCHAR STR_REPLACE (VARCHAR str, VARCHAR old, VARCHAR new)` + +将str字符串中的old子串全部替换为new串 + +## example + +``` +mysql> select str_replace("http://www.baidu.com:9090", "9090", ""); ++------------------------------------------------------+ +| str_replace('http://www.baidu.com:9090', '9090', '') | ++------------------------------------------------------+ +| http://www.baidu.com: | ++------------------------------------------------------+ +``` +##keyword +STR_REPLACE diff --git a/gensrc/script/doris_builtins_functions.py b/gensrc/script/doris_builtins_functions.py index 5534f1b2b0407f..c966b7d8f9a9cb 100755 --- a/gensrc/script/doris_builtins_functions.py +++ b/gensrc/script/doris_builtins_functions.py @@ -605,6 +605,8 @@ '15FunctionContextENS2_18FunctionStateScopeE'], [['concat'], 'VARCHAR', ['VARCHAR', '...'], '_ZN5doris15StringFunctions6concatEPN9doris_udf15FunctionContextEiPKNS1_9StringValE'], + [['str_replace'], 'VARCHAR', ['VARCHAR', 'VARCHAR', 'VARCHAR'], + '_ZN5doris15StringFunctions11str_replaceEPN9doris_udf15FunctionContextERKNS1_9StringValES6_S6_'], [['concat_ws'], 'VARCHAR', ['VARCHAR', 'VARCHAR', '...'], '_ZN5doris15StringFunctions9concat_wsEPN9doris_udf' '15FunctionContextERKNS1_9StringValEiPS5_'], From 391158f52190755b5fc621c31bf703835c0a7b3b Mon Sep 17 00:00:00 2001 From: wangxixu Date: Fri, 14 Aug 2020 10:11:25 +0800 Subject: [PATCH 02/15] udf: replace function --- be/src/exprs/string_functions.cpp | 2 +- be/test/exprs/string_functions_test.cpp | 22 +++++++++++++++++++ .../string-functions/str_replace.md | 2 +- .../string-functions/str_replace.md | 2 +- 4 files changed, 25 insertions(+), 3 deletions(-) diff --git a/be/src/exprs/string_functions.cpp b/be/src/exprs/string_functions.cpp index ecf7695f45ce12..06c6d31aa75aa9 100644 --- a/be/src/exprs/string_functions.cpp +++ b/be/src/exprs/string_functions.cpp @@ -1001,7 +1001,7 @@ StringVal StringFunctions::split_part(FunctionContext* context, const StringVal& StringVal StringFunctions::str_replace(FunctionContext *context, const StringVal &origStr, const StringVal &oldStr, const StringVal &newStr) { if (origStr.is_null || oldStr.is_null || newStr.is_null) { - return origStr; + return StringVal::null(); } std::string orig_str = std::string(reinterpret_cast(origStr.ptr), origStr.len); std::string old_str = std::string(reinterpret_cast(oldStr.ptr), oldStr.len); diff --git a/be/test/exprs/string_functions_test.cpp b/be/test/exprs/string_functions_test.cpp index 339f89f4342cb7..09185505859138 100644 --- a/be/test/exprs/string_functions_test.cpp +++ b/be/test/exprs/string_functions_test.cpp @@ -496,15 +496,37 @@ TEST_F(StringFunctionsTest, rpad) { } TEST_F(StringFunctionsTest, str_replace) { + //exist substring ASSERT_EQ(StringVal("http://www.baidu.com:8080"), StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("9090"), StringVal("8080"))); + //not exist substring ASSERT_EQ(StringVal("http://www.baidu.com:9090"), StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("9070"), StringVal("8080"))); + + //old substring is empty ASSERT_EQ(StringVal("http://www.baidu.com:9090"), StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal(""), StringVal("8080"))); + + //new substring is empty ASSERT_EQ(StringVal("http://www.baidu.com:"), StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("9090"), StringVal(""))); + + //origin string is null + ASSERT_EQ(StringVal::null(), + StringFunctions::str_replace(ctx, StringVal::null(), StringVal("hello"), StringVal("8080"))); + + //old substring is null + ASSERT_EQ(StringVal::null(), + StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal::null(), StringVal("8080"))); + + //new substring is null + ASSERT_EQ(StringVal::null(), + StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("hello"), StringVal::null())); + + //substring contains Chinese character + ASSERT_EQ(StringVal("http://华夏zhongguo:9090"), + StringFunctions::str_replace(ctx, StringVal("http://中国hello:9090"), StringVal("中国hello"), StringVal("华夏zhongguo"))); } } // namespace doris diff --git a/docs/en/sql-reference/sql-functions/string-functions/str_replace.md b/docs/en/sql-reference/sql-functions/string-functions/str_replace.md index f9a28cccece692..dfbd8221b9f72a 100644 --- a/docs/en/sql-reference/sql-functions/string-functions/str_replace.md +++ b/docs/en/sql-reference/sql-functions/string-functions/str_replace.md @@ -42,5 +42,5 @@ mysql> select str_replace("http://www.baidu.com:9090", "9090", ""); | http://www.baidu.com: | +------------------------------------------------------+ ``` -##keyword +## keyword STR_REPLACE diff --git a/docs/zh-CN/sql-reference/sql-functions/string-functions/str_replace.md b/docs/zh-CN/sql-reference/sql-functions/string-functions/str_replace.md index cf0bd769badfe4..2180cfa0670741 100644 --- a/docs/zh-CN/sql-reference/sql-functions/string-functions/str_replace.md +++ b/docs/zh-CN/sql-reference/sql-functions/string-functions/str_replace.md @@ -42,5 +42,5 @@ mysql> select str_replace("http://www.baidu.com:9090", "9090", ""); | http://www.baidu.com: | +------------------------------------------------------+ ``` -##keyword +## keyword STR_REPLACE From 5eb52e1e5742d2e9f7aae5ac8da81545ae5f3df2 Mon Sep 17 00:00:00 2001 From: wangxixu Date: Fri, 14 Aug 2020 11:58:30 +0800 Subject: [PATCH 03/15] udf: replace function --- be/src/exprs/string_functions.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/exprs/string_functions.cpp b/be/src/exprs/string_functions.cpp index 06c6d31aa75aa9..deca61f9c3bc87 100644 --- a/be/src/exprs/string_functions.cpp +++ b/be/src/exprs/string_functions.cpp @@ -1009,7 +1009,7 @@ StringVal StringFunctions::str_replace(FunctionContext *context, const StringVal std::string::size_type pos = 0; std::string::size_type oldLen = old_str.size(); std::string::size_type newLen = new_str.size(); - while(pos = orig_str.find(old_str, pos)) + while ((pos = orig_str.find(old_str, pos))) { if(pos == std::string::npos) break; orig_str.replace(pos, oldLen, new_str); From 86f841f29fa4ee19773d6c5922465194a2887cf6 Mon Sep 17 00:00:00 2001 From: wangxixu Date: Tue, 18 Aug 2020 11:56:14 +0800 Subject: [PATCH 04/15] udf: replace function --- be/src/exprs/string_functions.cpp | 2 +- be/src/exprs/string_functions.h | 2 +- be/test/exprs/string_functions_test.cpp | 18 +++++++++--------- fe/fe-core/src/main/cup/sql_parser.cup | 2 ++ gensrc/script/doris_builtins_functions.py | 2 +- 5 files changed, 14 insertions(+), 12 deletions(-) diff --git a/be/src/exprs/string_functions.cpp b/be/src/exprs/string_functions.cpp index deca61f9c3bc87..1679eca54b9ddd 100644 --- a/be/src/exprs/string_functions.cpp +++ b/be/src/exprs/string_functions.cpp @@ -999,7 +999,7 @@ StringVal StringFunctions::split_part(FunctionContext* context, const StringVal& return StringVal(content.ptr + start_pos, len); } -StringVal StringFunctions::str_replace(FunctionContext *context, const StringVal &origStr, const StringVal &oldStr, const StringVal &newStr) { +StringVal StringFunctions::replace(FunctionContext *context, const StringVal &origStr, const StringVal &oldStr, const StringVal &newStr) { if (origStr.is_null || oldStr.is_null || newStr.is_null) { return StringVal::null(); } diff --git a/be/src/exprs/string_functions.h b/be/src/exprs/string_functions.h index e60969bde5aa7c..f1a4df9e373f51 100644 --- a/be/src/exprs/string_functions.h +++ b/be/src/exprs/string_functions.h @@ -191,7 +191,7 @@ class StringFunctions { static StringVal split_part(FunctionContext* context, const StringVal& content, const StringVal& delimiter, const IntVal& field); - static StringVal str_replace(FunctionContext *context, const StringVal &origStr, + static StringVal replace(FunctionContext *context, const StringVal &origStr, const StringVal &oldStr, const StringVal &newStr); }; } diff --git a/be/test/exprs/string_functions_test.cpp b/be/test/exprs/string_functions_test.cpp index 09185505859138..3a6e351f60f990 100644 --- a/be/test/exprs/string_functions_test.cpp +++ b/be/test/exprs/string_functions_test.cpp @@ -495,38 +495,38 @@ TEST_F(StringFunctionsTest, rpad) { StringFunctions::rpad(ctx, StringVal("呵呵"), IntVal(5), StringVal("hi"))); } -TEST_F(StringFunctionsTest, str_replace) { +TEST_F(StringFunctionsTest, replace) { //exist substring ASSERT_EQ(StringVal("http://www.baidu.com:8080"), - StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("9090"), StringVal("8080"))); + StringFunctions::replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("9090"), StringVal("8080"))); //not exist substring ASSERT_EQ(StringVal("http://www.baidu.com:9090"), - StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("9070"), StringVal("8080"))); + StringFunctions::replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("9070"), StringVal("8080"))); //old substring is empty ASSERT_EQ(StringVal("http://www.baidu.com:9090"), - StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal(""), StringVal("8080"))); + StringFunctions::replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal(""), StringVal("8080"))); //new substring is empty ASSERT_EQ(StringVal("http://www.baidu.com:"), - StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("9090"), StringVal(""))); + StringFunctions::replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("9090"), StringVal(""))); //origin string is null ASSERT_EQ(StringVal::null(), - StringFunctions::str_replace(ctx, StringVal::null(), StringVal("hello"), StringVal("8080"))); + StringFunctions::replace(ctx, StringVal::null(), StringVal("hello"), StringVal("8080"))); //old substring is null ASSERT_EQ(StringVal::null(), - StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal::null(), StringVal("8080"))); + StringFunctions::replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal::null(), StringVal("8080"))); //new substring is null ASSERT_EQ(StringVal::null(), - StringFunctions::str_replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("hello"), StringVal::null())); + StringFunctions::replace(ctx, StringVal("http://www.baidu.com:9090"), StringVal("hello"), StringVal::null())); //substring contains Chinese character ASSERT_EQ(StringVal("http://华夏zhongguo:9090"), - StringFunctions::str_replace(ctx, StringVal("http://中国hello:9090"), StringVal("中国hello"), StringVal("华夏zhongguo"))); + StringFunctions::replace(ctx, StringVal("http://中国hello:9090"), StringVal("中国hello"), StringVal("华夏zhongguo"))); } } // namespace doris diff --git a/fe/fe-core/src/main/cup/sql_parser.cup b/fe/fe-core/src/main/cup/sql_parser.cup index a2d8294915cc21..7ba56810734001 100644 --- a/fe/fe-core/src/main/cup/sql_parser.cup +++ b/fe/fe-core/src/main/cup/sql_parser.cup @@ -4593,6 +4593,8 @@ keyword ::= {: RESULT = id; :} | KW_REPEATABLE:id {: RESULT = id; :} + | KW_REPLACE:id + {: RESULT = id; :} | KW_REPLACE_IF_NOT_NULL:id {: RESULT = id; :} | KW_REPOSITORY:id diff --git a/gensrc/script/doris_builtins_functions.py b/gensrc/script/doris_builtins_functions.py index c966b7d8f9a9cb..ed26fae15f87aa 100755 --- a/gensrc/script/doris_builtins_functions.py +++ b/gensrc/script/doris_builtins_functions.py @@ -605,7 +605,7 @@ '15FunctionContextENS2_18FunctionStateScopeE'], [['concat'], 'VARCHAR', ['VARCHAR', '...'], '_ZN5doris15StringFunctions6concatEPN9doris_udf15FunctionContextEiPKNS1_9StringValE'], - [['str_replace'], 'VARCHAR', ['VARCHAR', 'VARCHAR', 'VARCHAR'], + [['replace'], 'VARCHAR', ['VARCHAR', 'VARCHAR', 'VARCHAR'], '_ZN5doris15StringFunctions11str_replaceEPN9doris_udf15FunctionContextERKNS1_9StringValES6_S6_'], [['concat_ws'], 'VARCHAR', ['VARCHAR', 'VARCHAR', '...'], '_ZN5doris15StringFunctions9concat_wsEPN9doris_udf' From ade1afa75c94a178a6801ba1324acf25ad8b16fe Mon Sep 17 00:00:00 2001 From: wangxixu Date: Tue, 18 Aug 2020 11:59:59 +0800 Subject: [PATCH 05/15] udf: replace function --- .../string-functions/{str_replace.md => replace.md} | 10 +++++----- .../string-functions/{str_replace.md => replace.md} | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) rename docs/en/sql-reference/sql-functions/string-functions/{str_replace.md => replace.md} (83%) rename docs/zh-CN/sql-reference/sql-functions/string-functions/{str_replace.md => replace.md} (82%) diff --git a/docs/en/sql-reference/sql-functions/string-functions/str_replace.md b/docs/en/sql-reference/sql-functions/string-functions/replace.md similarity index 83% rename from docs/en/sql-reference/sql-functions/string-functions/str_replace.md rename to docs/en/sql-reference/sql-functions/string-functions/replace.md index dfbd8221b9f72a..6f86b18a679d3d 100644 --- a/docs/en/sql-reference/sql-functions/string-functions/str_replace.md +++ b/docs/en/sql-reference/sql-functions/string-functions/replace.md @@ -1,6 +1,6 @@ --- { - "title": "str_replace", + "title": "replace", "language": "zh-CN" } --- @@ -24,20 +24,20 @@ specific language governing permissions and limitations under the License. --> -# str_replace +# replace ## description ### Syntax -`VARCHAR STR_REPLACE (VARCHAR str, VARCHAR old, VARCHAR new)` +`VARCHAR REPLACE (VARCHAR str, VARCHAR old, VARCHAR new)` replace all old substring with new substring in str ## example ``` -mysql> select str_replace("http://www.baidu.com:9090", "9090", ""); +mysql> select replace("http://www.baidu.com:9090", "9090", ""); +------------------------------------------------------+ -| str_replace('http://www.baidu.com:9090', '9090', '') | +| replace('http://www.baidu.com:9090', '9090', '') | +------------------------------------------------------+ | http://www.baidu.com: | +------------------------------------------------------+ diff --git a/docs/zh-CN/sql-reference/sql-functions/string-functions/str_replace.md b/docs/zh-CN/sql-reference/sql-functions/string-functions/replace.md similarity index 82% rename from docs/zh-CN/sql-reference/sql-functions/string-functions/str_replace.md rename to docs/zh-CN/sql-reference/sql-functions/string-functions/replace.md index 2180cfa0670741..dda2625df07d90 100644 --- a/docs/zh-CN/sql-reference/sql-functions/string-functions/str_replace.md +++ b/docs/zh-CN/sql-reference/sql-functions/string-functions/replace.md @@ -1,6 +1,6 @@ --- { - "title": "str_replace", + "title": "replace", "language": "zh-CN" } --- @@ -24,23 +24,23 @@ specific language governing permissions and limitations under the License. --> -# str_replace +# replace ## description ### Syntax -`VARCHAR STR_REPLACE (VARCHAR str, VARCHAR old, VARCHAR new)` +`VARCHAR REPLACE (VARCHAR str, VARCHAR old, VARCHAR new)` 将str字符串中的old子串全部替换为new串 ## example ``` -mysql> select str_replace("http://www.baidu.com:9090", "9090", ""); +mysql> select replace("http://www.baidu.com:9090", "9090", ""); +------------------------------------------------------+ -| str_replace('http://www.baidu.com:9090', '9090', '') | +| replace('http://www.baidu.com:9090', '9090', '') | +------------------------------------------------------+ | http://www.baidu.com: | +------------------------------------------------------+ ``` ## keyword -STR_REPLACE +REPLACE From 1d95a491115064c9753694ad4d45c2f72b8d2645 Mon Sep 17 00:00:00 2001 From: wangxixu Date: Tue, 18 Aug 2020 16:01:45 +0800 Subject: [PATCH 06/15] udf: replace function --- docs/en/sql-reference/sql-functions/string-functions/replace.md | 2 +- gensrc/script/doris_builtins_functions.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/sql-reference/sql-functions/string-functions/replace.md b/docs/en/sql-reference/sql-functions/string-functions/replace.md index 6f86b18a679d3d..d5111fa43c8077 100644 --- a/docs/en/sql-reference/sql-functions/string-functions/replace.md +++ b/docs/en/sql-reference/sql-functions/string-functions/replace.md @@ -43,4 +43,4 @@ mysql> select replace("http://www.baidu.com:9090", "9090", ""); +------------------------------------------------------+ ``` ## keyword -STR_REPLACE +REPLACE diff --git a/gensrc/script/doris_builtins_functions.py b/gensrc/script/doris_builtins_functions.py index ed26fae15f87aa..b960ba6afbbba0 100755 --- a/gensrc/script/doris_builtins_functions.py +++ b/gensrc/script/doris_builtins_functions.py @@ -606,7 +606,7 @@ [['concat'], 'VARCHAR', ['VARCHAR', '...'], '_ZN5doris15StringFunctions6concatEPN9doris_udf15FunctionContextEiPKNS1_9StringValE'], [['replace'], 'VARCHAR', ['VARCHAR', 'VARCHAR', 'VARCHAR'], - '_ZN5doris15StringFunctions11str_replaceEPN9doris_udf15FunctionContextERKNS1_9StringValES6_S6_'], + '_ZN5doris15StringFunctions7replaceEPN9doris_udf15FunctionContextERKNS1_9StringValES6_S6_'], [['concat_ws'], 'VARCHAR', ['VARCHAR', 'VARCHAR', '...'], '_ZN5doris15StringFunctions9concat_wsEPN9doris_udf' '15FunctionContextERKNS1_9StringValEiPS5_'], From 51f81969401aa58ea262086cc796e900e68cd62b Mon Sep 17 00:00:00 2001 From: wangxixu Date: Sat, 7 Nov 2020 21:21:59 +0800 Subject: [PATCH 07/15] support using udf when loading data --- .../doris/load/loadv2/BrokerLoadJob.java | 3 ++- .../doris/load/loadv2/LoadLoadingTask.java | 6 ++++-- .../doris/load/loadv2/LoadingTaskPlanner.java | 19 ++++++++++--------- .../org/apache/doris/qe/OriginStatement.java | 10 ++++++++++ .../org/apache/doris/qe/StmtExecutor.java | 1 + 5 files changed, 27 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 9ca4c77303820b..508206af005959 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -186,7 +186,8 @@ brokerFileGroups, getDeadlineMs(), execMemLimit, strictMode, transactionId, this, timezone, timeoutSecond); UUID uuid = UUID.randomUUID(); TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); - task.init(loadId, attachment.getFileStatusByTable(aggKey), attachment.getFileNumByTable(aggKey)); + task.init(loadId, attachment.getFileStatusByTable(aggKey), + attachment.getFileNumByTable(aggKey), this.getOriginStmt().getUserInfo()); idToTasks.put(task.getSignature(), task); // idToTasks contains previous LoadPendingTasks, so idToTasks is just used to save all tasks. // use newLoadingTasks to save new created loading tasks and submit them later. diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java index caa6081e7073e7..b08f36622fd509 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadLoadingTask.java @@ -18,6 +18,7 @@ package org.apache.doris.load.loadv2; import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; import org.apache.doris.common.LoadException; @@ -83,9 +84,10 @@ public LoadLoadingTask(Database db, OlapTable table, this.timeoutS = timeoutS; } - public void init(TUniqueId loadId, List> fileStatusList, int fileNum) throws UserException { + public void init(TUniqueId loadId, List> fileStatusList, int fileNum, UserIdentity userInfo) throws UserException { this.loadId = loadId; - planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups, strictMode, timezone, this.timeoutS); + planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, + brokerDesc, fileGroups, strictMode, timezone, this.timeoutS, userInfo); planner.plan(loadId, fileStatusList, fileNum); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index ab46d8caeb71cd..92089e50461bb8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.DescriptorTable; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.OlapTable; @@ -32,6 +33,7 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.load.BrokerFileGroup; +import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.planner.BrokerScanNode; import org.apache.doris.planner.DataPartition; import org.apache.doris.planner.OlapTableSink; @@ -79,7 +81,8 @@ public class LoadingTaskPlanner { public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table, BrokerDesc brokerDesc, List brokerFileGroups, - boolean strictMode, String timezone, long timeoutS) { + boolean strictMode, String timezone, long timeoutS, + UserIdentity userInfo) { this.loadJobId = loadJobId; this.txnId = txnId; this.dbId = dbId; @@ -89,14 +92,12 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table this.strictMode = strictMode; this.analyzer.setTimezone(timezone); this.timeoutS = timeoutS; - - /* - * TODO(cmy): UDF currently belongs to a database. Therefore, before using UDF, - * we need to check whether the user has corresponding permissions on this database. - * But here we have lost user information and therefore cannot check permissions. - * So here we first prohibit users from using UDF in load. If necessary, improve it later. - */ - this.analyzer.setUDFAllowed(false); + if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(userInfo, + Catalog.getCurrentCatalog().getDb(dbId).getFullName(), PrivPredicate.SELECT)) { + this.analyzer.setUDFAllowed(true); + } else { + this.analyzer.setUDFAllowed(false); + } } public void plan(TUniqueId loadId, List> fileStatusesList, int filesAdded) diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/OriginStatement.java b/fe/fe-core/src/main/java/org/apache/doris/qe/OriginStatement.java index c943d84a9626e5..5389fb8200e323 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/OriginStatement.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/OriginStatement.java @@ -17,6 +17,7 @@ package org.apache.doris.qe; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonUtils; @@ -39,6 +40,7 @@ public class OriginStatement implements Writable { // the idx of the specified statement in "originStmt", start from 0. @SerializedName(value = "idx") public final int idx; + public UserIdentity userInfo; public OriginStatement(String originStmt, int idx) { this.originStmt = originStmt; @@ -55,4 +57,12 @@ public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); Text.writeString(out, json); } + + public UserIdentity getUserInfo() { + return userInfo; + } + + public void setUserInfo(UserIdentity userInfo) { + this.userInfo = userInfo; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 6cd135faa009ed..3db3de0f657a6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -1049,6 +1049,7 @@ private void handleExplainStmt(String result) throws IOException { private void handleDdlStmt() { try { + parsedStmt.getOrigStmt().setUserInfo(context.getCurrentUserIdentity()); DdlExecutor.execute(context.getCatalog(), (DdlStmt) parsedStmt); context.getState().setOk(); } catch (QueryStateException e) { From 9a3ae2df22d9a0782515c4b8a69a4d70b89d1f24 Mon Sep 17 00:00:00 2001 From: wangxixu Date: Sun, 8 Nov 2020 20:38:55 +0800 Subject: [PATCH 08/15] update --- .../java/org/apache/doris/load/loadv2/BulkLoadJob.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 04f3d938fc0d30..203ecdbf5f13b2 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -332,4 +332,11 @@ public void readFields(DataInput in) throws IOException { } } + public OriginStatement getOriginStmt() { + return originStmt; + } + + public void setOriginStmt(OriginStatement originStmt) { + this.originStmt = originStmt; + } } From 6db8e749dbdf3fc619127ae083d4679dfa61420f Mon Sep 17 00:00:00 2001 From: wangxixu Date: Mon, 9 Nov 2020 19:40:47 +0800 Subject: [PATCH 09/15] support udf when loading data --- .../apache/doris/analysis/StatementBase.java | 10 +++++++++ .../doris/load/loadv2/BrokerLoadJob.java | 8 ++++--- .../apache/doris/load/loadv2/BulkLoadJob.java | 21 +++++++++++++++---- .../doris/load/loadv2/LoadingTaskPlanner.java | 3 ++- .../doris/load/loadv2/SparkLoadJob.java | 6 ++++-- .../org/apache/doris/qe/ConnectProcessor.java | 1 + .../org/apache/doris/qe/OriginStatement.java | 10 --------- .../org/apache/doris/qe/StmtExecutor.java | 2 +- 8 files changed, 40 insertions(+), 21 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/StatementBase.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/StatementBase.java index 7f787bba68f3a7..da84fc5c4c3c5b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StatementBase.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StatementBase.java @@ -51,6 +51,8 @@ public abstract class StatementBase implements ParseNode { private OriginStatement origStmt; + private UserIdentity userInfo; + protected StatementBase() { } /** @@ -169,6 +171,14 @@ public OriginStatement getOrigStmt() { return origStmt; } + public UserIdentity getUserInfo() { + return userInfo; + } + + public void setUserInfo(UserIdentity userInfo) { + this.userInfo = userInfo; + } + /** * Resets the internal analysis state of this node. * For easier maintenance, class members that need to be reset are grouped into diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java index 508206af005959..7abbd0e1dae6cc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BrokerLoadJob.java @@ -18,6 +18,7 @@ package org.apache.doris.load.loadv2; import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; @@ -69,9 +70,10 @@ public BrokerLoadJob() { this.jobType = EtlJobType.BROKER; } - public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, OriginStatement originStmt) + public BrokerLoadJob(long dbId, String label, BrokerDesc brokerDesc, + OriginStatement originStmt, UserIdentity userInfo) throws MetaNotFoundException { - super(dbId, label, originStmt); + super(dbId, label, originStmt, userInfo); this.timeoutSecond = Config.broker_load_default_timeout_second; this.brokerDesc = brokerDesc; this.jobType = EtlJobType.BROKER; @@ -187,7 +189,7 @@ brokerFileGroups, getDeadlineMs(), execMemLimit, UUID uuid = UUID.randomUUID(); TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); task.init(loadId, attachment.getFileStatusByTable(aggKey), - attachment.getFileNumByTable(aggKey), this.getOriginStmt().getUserInfo()); + attachment.getFileNumByTable(aggKey), getUserInfo()); idToTasks.put(task.getSignature(), task); // idToTasks contains previous LoadPendingTasks, so idToTasks is just used to save all tasks. // use newLoadingTasks to save new created loading tasks and submit them later. diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index 203ecdbf5f13b2..dba531afdf94f8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.LoadStmt; import org.apache.doris.analysis.SqlParser; import org.apache.doris.analysis.SqlScanner; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.AuthorizationInfo; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; @@ -72,6 +73,8 @@ public abstract class BulkLoadJob extends LoadJob { // the expr of columns will be reanalyze when the log is replayed private OriginStatement originStmt; + private UserIdentity userInfo; + // include broker desc and data desc protected BrokerFileGroupAggInfo fileGroupAggInfo = new BrokerFileGroupAggInfo(); protected List commitInfos = Lists.newArrayList(); @@ -85,10 +88,11 @@ public BulkLoadJob() { super(); } - public BulkLoadJob(long dbId, String label, OriginStatement originStmt) throws MetaNotFoundException { + public BulkLoadJob(long dbId, String label, OriginStatement originStmt, UserIdentity userInfo) throws MetaNotFoundException { super(dbId, label); this.originStmt = originStmt; this.authorizationInfo = gatherAuthInfo(); + this.userInfo = userInfo; if (ConnectContext.get() != null) { SessionVariable var = ConnectContext.get().getSessionVariable(); @@ -112,11 +116,11 @@ public static BulkLoadJob fromLoadStmt(LoadStmt stmt) throws DdlException { switch (stmt.getEtlJobType()) { case BROKER: bulkLoadJob = new BrokerLoadJob(db.getId(), stmt.getLabel().getLabelName(), - stmt.getBrokerDesc(), stmt.getOrigStmt()); + stmt.getBrokerDesc(), stmt.getOrigStmt(), stmt.getUserInfo()); break; case SPARK: bulkLoadJob = new SparkLoadJob(db.getId(), stmt.getLabel().getLabelName(), - stmt.getResourceDesc(), stmt.getOrigStmt()); + stmt.getResourceDesc(), stmt.getOrigStmt(), stmt.getUserInfo()); break; case MINI: case DELETE: @@ -290,6 +294,7 @@ public void write(DataOutput out) throws IOException { super.write(out); brokerDesc.write(out); originStmt.write(out); + userInfo.write(out); out.writeInt(sessionVariables.size()); for (Map.Entry entry : sessionVariables.entrySet()) { @@ -318,7 +323,7 @@ public void readFields(DataInput in) throws IOException { // The origin stmt does not be analyzed in here. // The reason is that it will thrown MetaNotFoundException when the tableId could not be found by tableName. // The origin stmt will be analyzed after the replay is completed. - + userInfo = UserIdentity.read(in); if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_66) { int size = in.readInt(); for (int i = 0; i < size; i++) { @@ -339,4 +344,12 @@ public OriginStatement getOriginStmt() { public void setOriginStmt(OriginStatement originStmt) { this.originStmt = originStmt; } + + public UserIdentity getUserInfo() { + return userInfo; + } + + public void setUserInfo(UserIdentity userInfo) { + this.userInfo = userInfo; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java index 92089e50461bb8..c728701f2bdd28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/LoadingTaskPlanner.java @@ -67,7 +67,7 @@ public class LoadingTaskPlanner { private final List fileGroups; private final boolean strictMode; private final long timeoutS; // timeout of load job, in second - + private UserIdentity userInfo; // Something useful // ConnectContext here is just a dummy object to avoid some NPE problem, like ctx.getDatabase() private Analyzer analyzer = new Analyzer(Catalog.getCurrentCatalog(), new ConnectContext()); @@ -92,6 +92,7 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table this.strictMode = strictMode; this.analyzer.setTimezone(timezone); this.timeoutS = timeoutS; + this.userInfo = userInfo; if (Catalog.getCurrentCatalog().getAuth().checkDbPriv(userInfo, Catalog.getCurrentCatalog().getDb(dbId).getFullName(), PrivPredicate.SELECT)) { this.analyzer.setUDFAllowed(true); diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java index 9297295748a407..3d3f7b34b07b56 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLoadJob.java @@ -25,6 +25,7 @@ import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -145,9 +146,10 @@ public SparkLoadJob() { jobType = EtlJobType.SPARK; } - public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc, OriginStatement originStmt) + public SparkLoadJob(long dbId, String label, ResourceDesc resourceDesc, + OriginStatement originStmt, UserIdentity userInfo) throws MetaNotFoundException { - super(dbId, label, originStmt); + super(dbId, label, originStmt, userInfo); this.resourceDesc = resourceDesc; timeoutSecond = Config.spark_load_default_timeout_second; jobType = EtlJobType.SPARK; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java index b7a905920fa7aa..e2908c98b6e9ce 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/ConnectProcessor.java @@ -186,6 +186,7 @@ private void handleQuery() { } parsedStmt = stmts.get(i); parsedStmt.setOrigStmt(new OriginStatement(originStmt, i)); + parsedStmt.setUserInfo(ctx.getCurrentUserIdentity()); executor = new StmtExecutor(ctx, parsedStmt); ctx.setExecutor(executor); executor.execute(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/OriginStatement.java b/fe/fe-core/src/main/java/org/apache/doris/qe/OriginStatement.java index 5389fb8200e323..c943d84a9626e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/OriginStatement.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/OriginStatement.java @@ -17,7 +17,6 @@ package org.apache.doris.qe; -import org.apache.doris.analysis.UserIdentity; import org.apache.doris.common.io.Text; import org.apache.doris.common.io.Writable; import org.apache.doris.persist.gson.GsonUtils; @@ -40,7 +39,6 @@ public class OriginStatement implements Writable { // the idx of the specified statement in "originStmt", start from 0. @SerializedName(value = "idx") public final int idx; - public UserIdentity userInfo; public OriginStatement(String originStmt, int idx) { this.originStmt = originStmt; @@ -57,12 +55,4 @@ public void write(DataOutput out) throws IOException { String json = GsonUtils.GSON.toJson(this); Text.writeString(out, json); } - - public UserIdentity getUserInfo() { - return userInfo; - } - - public void setUserInfo(UserIdentity userInfo) { - this.userInfo = userInfo; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 3db3de0f657a6c..b9ac363144f342 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -404,6 +404,7 @@ public void analyze(TQueryOptions tQueryOptions) throws UserException { try { parsedStmt = SqlParserUtils.getStmt(parser, originStmt.idx); parsedStmt.setOrigStmt(originStmt); + parsedStmt.setUserInfo(context.getCurrentUserIdentity()); } catch (Error e) { LOG.info("error happened when parsing stmt {}, id: {}", originStmt, context.getStmtId(), e); throw new AnalysisException("sql parsing error, please check your sql"); @@ -1049,7 +1050,6 @@ private void handleExplainStmt(String result) throws IOException { private void handleDdlStmt() { try { - parsedStmt.getOrigStmt().setUserInfo(context.getCurrentUserIdentity()); DdlExecutor.execute(context.getCatalog(), (DdlStmt) parsedStmt); context.getState().setOk(); } catch (QueryStateException e) { From 0f008491c48795f84ca50817f5af7c4016b1f611 Mon Sep 17 00:00:00 2001 From: wangxixu Date: Mon, 9 Nov 2020 19:44:11 +0800 Subject: [PATCH 10/15] support udf when loading data --- .../apache/doris/load/loadv2/SparkLoadJobTest.java | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index 7f263d7a1615a0..0f7c5f2abaff95 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -21,11 +21,7 @@ import mockit.Injectable; import mockit.Mocked; -import org.apache.doris.analysis.BrokerDesc; -import org.apache.doris.analysis.DataDescription; -import org.apache.doris.analysis.LabelName; -import org.apache.doris.analysis.LoadStmt; -import org.apache.doris.analysis.ResourceDesc; +import org.apache.doris.analysis.*; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; @@ -217,7 +213,7 @@ public void testExecute(@Mocked Catalog catalog, @Mocked SparkLoadPendingTask pe }; ResourceDesc resourceDesc = new ResourceDesc(resourceName, Maps.newHashMap()); - SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0)); + SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0")); job.execute(); // check transaction id and id to tasks @@ -228,7 +224,7 @@ public void testExecute(@Mocked Catalog catalog, @Mocked SparkLoadPendingTask pe @Test public void testOnPendingTaskFinished(@Mocked Catalog catalog, @Injectable String originStmt) throws MetaNotFoundException { ResourceDesc resourceDesc = new ResourceDesc(resourceName, Maps.newHashMap()); - SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0)); + SparkLoadJob job = new SparkLoadJob(dbId, label, resourceDesc, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0")); SparkPendingTaskAttachment attachment = new SparkPendingTaskAttachment(pendingTaskId); attachment.setAppId(appId); attachment.setOutputPath(etlOutputPath); @@ -247,7 +243,7 @@ private SparkLoadJob getEtlStateJob(String originStmt) throws MetaNotFoundExcept sparkConfigs.put("spark.master", "yarn"); sparkConfigs.put("spark.submit.deployMode", "cluster"); sparkConfigs.put("spark.hadoop.yarn.resourcemanager.address", "127.0.0.1:9999"); - SparkLoadJob job = new SparkLoadJob(dbId, label, null, new OriginStatement(originStmt, 0)); + SparkLoadJob job = new SparkLoadJob(dbId, label, null, new OriginStatement(originStmt, 0), new UserIdentity("root", "0.0.0.0")); job.state = JobState.ETL; job.maxFilterRatio = 0.15; job.transactionId = transactionId; From 080a9142b864fd8d0c00d3d1baf6f1a44862de53 Mon Sep 17 00:00:00 2001 From: wangxixu Date: Mon, 9 Nov 2020 19:47:15 +0800 Subject: [PATCH 11/15] support udf when loading data --- .../org/apache/doris/load/loadv2/SparkLoadJobTest.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java index 0f7c5f2abaff95..69538b88640647 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLoadJobTest.java @@ -21,7 +21,12 @@ import mockit.Injectable; import mockit.Mocked; -import org.apache.doris.analysis.*; +import org.apache.doris.analysis.BrokerDesc; +import org.apache.doris.analysis.DataDescription; +import org.apache.doris.analysis.LabelName; +import org.apache.doris.analysis.LoadStmt; +import org.apache.doris.analysis.ResourceDesc; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Column; import org.apache.doris.catalog.Database; From eabea85532c6a021b97d770d0a7ee33cb7df3350 Mon Sep 17 00:00:00 2001 From: wangxixu Date: Mon, 9 Nov 2020 19:50:08 +0800 Subject: [PATCH 12/15] support udf when loading data --- .../java/org/apache/doris/load/loadv2/BulkLoadJob.java | 8 -------- 1 file changed, 8 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index a2118f9135936d..b8ee0cf578fdc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -343,14 +343,6 @@ public void readFields(DataInput in) throws IOException { } } - public OriginStatement getOriginStmt() { - return originStmt; - } - - public void setOriginStmt(OriginStatement originStmt) { - this.originStmt = originStmt; - } - public UserIdentity getUserInfo() { return userInfo; } From 134b0365750cbe64a26baabd28b7e103cf52c8c9 Mon Sep 17 00:00:00 2001 From: wangxixu Date: Tue, 10 Nov 2020 17:27:55 +0800 Subject: [PATCH 13/15] update --- .../doris/load/loadv2/BrokerLoadJobTest.java | 56 ++++++++++++++++++- .../apache/doris/qe/ConnectProcessorTest.java | 19 +++++++ .../org/apache/doris/qe/StmtExecutorTest.java | 22 ++++---- 3 files changed, 84 insertions(+), 13 deletions(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java index 4b0286c0f68316..10a573223fd233 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/BrokerLoadJobTest.java @@ -19,14 +19,17 @@ import org.apache.doris.analysis.BrokerDesc; import org.apache.doris.analysis.DataDescription; -import org.apache.doris.analysis.ImportColumnDesc; import org.apache.doris.analysis.LabelName; import org.apache.doris.analysis.LoadStmt; -import org.apache.doris.analysis.SlotRef; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.UserIdentity; import org.apache.doris.catalog.Catalog; import org.apache.doris.catalog.Database; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Table; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.BrokerTable; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; import org.apache.doris.common.jmockit.Deencapsulation; @@ -38,6 +41,9 @@ import org.apache.doris.load.Load; import org.apache.doris.load.Source; import org.apache.doris.metric.MetricRepo; +import org.apache.doris.planner.BrokerScanNode; +import org.apache.doris.planner.OlapTableSink; +import org.apache.doris.planner.PlanFragment; import org.apache.doris.task.MasterTaskExecutor; import org.apache.doris.transaction.TransactionState; @@ -52,12 +58,16 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.UUID; import mockit.Expectations; import mockit.Injectable; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; +import org.apache.doris.thrift.TUniqueId; public class BrokerLoadJobTest { @@ -323,6 +333,48 @@ public void testPendingTaskOnFinished(@Injectable BrokerPendingTaskAttachment at Assert.assertEquals(3, idToTasks.size()); } + @Test + public void testPendingTaskOnFinishedWithUserInfo(@Mocked BrokerPendingTaskAttachment attachment, + @Mocked Catalog catalog, + @Injectable BrokerDesc brokerDesc, + @Injectable LoadTaskCallback callback, + @Injectable Database database, + @Injectable FileGroupAggKey aggKey, + @Mocked OlapTable olapTable, + @Mocked PlanFragment sinkFragment, + @Mocked OlapTableSink olapTableSink, + @Mocked BrokerScanNode scanNode) throws Exception{ + List schema = new ArrayList<>(); + schema.add(new Column("a", PrimitiveType.BIGINT)); + Map properties = new HashMap<>(); + properties.put("broker_name", "test"); + properties.put("path", "hdfs://www.test.com"); + BrokerTable brokerTable = new BrokerTable(123L, "test", schema, properties); + BrokerFileGroup brokerFileGroup = new BrokerFileGroup(brokerTable); + List partitionIds = new ArrayList<>(); + partitionIds.add(123L); + Deencapsulation.setField(brokerFileGroup, "partitionIds", partitionIds); + List fileGroups = Lists.newArrayList(); + fileGroups.add(brokerFileGroup); + UUID uuid = UUID.randomUUID(); + TUniqueId loadId = new TUniqueId(uuid.getMostSignificantBits(), uuid.getLeastSignificantBits()); + LoadLoadingTask task = new LoadLoadingTask(database, olapTable,brokerDesc, fileGroups, + 100, 100,false, 100, callback, "", 100); + try { + UserIdentity userInfo = new UserIdentity("root", "localhost"); + userInfo.setIsAnalyzed(); + task.init(loadId, + attachment.getFileStatusByTable(aggKey), + attachment.getFileNumByTable(aggKey), + userInfo); + LoadingTaskPlanner planner = Deencapsulation.getField(task, "planner"); + Analyzer al = Deencapsulation.getField(planner, "analyzer"); + Assert.assertFalse(al.isUDFAllowed()); + } catch (Exception e) { + e.printStackTrace(); + } + } + @Test public void testLoadingTaskOnFinishedWithUnfinishedTask(@Injectable BrokerLoadingTaskAttachment attachment, @Injectable LoadTask loadTask1, diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java index 678751b6088114..11bf6f202e465e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/ConnectProcessorTest.java @@ -312,6 +312,25 @@ public void testQuery(@Mocked StmtExecutor executor) throws Exception { Assert.assertEquals(MysqlCommand.COM_QUERY, myContext.getCommand()); } + @Test + public void testQueryWithUserInfo(@Mocked StmtExecutor executor) throws Exception { + ConnectContext ctx = initMockContext(mockChannel(queryPacket), AccessTestUtil.fetchAdminCatalog()); + + ConnectProcessor processor = new ConnectProcessor(ctx); + + // Mock statement executor + new Expectations() { + { + executor.getQueryStatisticsForAuditLog(); + minTimes = 0; + result = statistics; + } + }; + processor.processOnce(); + StmtExecutor er = Deencapsulation.getField(processor, "executor"); + Assert.assertTrue(er.getParsedStmt().getUserInfo() != null); + } + @Test public void testQueryFail(@Mocked StmtExecutor executor) throws Exception { ConnectContext ctx = initMockContext(mockChannel(queryPacket), AccessTestUtil.fetchAdminCatalog()); diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java index c87bbf6cbc3aa7..28d65669786203 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java @@ -17,18 +17,8 @@ package org.apache.doris.qe; -import org.apache.doris.analysis.AccessTestUtil; -import org.apache.doris.analysis.Analyzer; -import org.apache.doris.analysis.DdlStmt; -import org.apache.doris.analysis.Expr; -import org.apache.doris.analysis.KillStmt; -import org.apache.doris.analysis.QueryStmt; -import org.apache.doris.analysis.RedirectStatus; -import org.apache.doris.analysis.SetStmt; -import org.apache.doris.analysis.ShowAuthorStmt; -import org.apache.doris.analysis.ShowStmt; +import org.apache.doris.analysis.*; import org.apache.doris.analysis.SqlParser; -import org.apache.doris.analysis.UseStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.DdlException; import org.apache.doris.common.jmockit.Deencapsulation; @@ -526,6 +516,16 @@ public void testSet(@Mocked SetStmt setStmt, @Mocked SqlParser parser, @Mocked S Assert.assertEquals(QueryState.MysqlStateType.OK, state.getStateType()); } + @Test + public void testStmtWithUserInfo(@Mocked StatementBase stmt, @Mocked ConnectContext context) throws Exception { + StmtExecutor stmtExecutor = new StmtExecutor(ctx, stmt); + Deencapsulation.setField(stmtExecutor, "parsedStmt", null); + Deencapsulation.setField(stmtExecutor, "originStmt", new OriginStatement("show databases;", 1)); + stmtExecutor.execute(); + StatementBase newstmt = (StatementBase)Deencapsulation.getField(stmtExecutor, "parsedStmt"); + Assert.assertTrue(newstmt.getUserInfo() != null); + } + @Test public void testSetFail(@Mocked SetStmt setStmt, @Mocked SqlParser parser, @Mocked SetExecutor executor) throws Exception { new Expectations() { From 74c37133c9007afa2a3d4af8130c63a633a43469 Mon Sep 17 00:00:00 2001 From: wangxixu Date: Tue, 10 Nov 2020 17:59:14 +0800 Subject: [PATCH 14/15] support udf --- .../java/org/apache/doris/qe/StmtExecutorTest.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java index 28d65669786203..37329c4fce59c6 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/qe/StmtExecutorTest.java @@ -17,8 +17,19 @@ package org.apache.doris.qe; -import org.apache.doris.analysis.*; +import org.apache.doris.analysis.AccessTestUtil; +import org.apache.doris.analysis.Analyzer; +import org.apache.doris.analysis.DdlStmt; +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.KillStmt; +import org.apache.doris.analysis.QueryStmt; +import org.apache.doris.analysis.RedirectStatus; +import org.apache.doris.analysis.SetStmt; +import org.apache.doris.analysis.ShowAuthorStmt; +import org.apache.doris.analysis.ShowStmt; import org.apache.doris.analysis.SqlParser; +import org.apache.doris.analysis.StatementBase; +import org.apache.doris.analysis.UseStmt; import org.apache.doris.catalog.Catalog; import org.apache.doris.common.DdlException; import org.apache.doris.common.jmockit.Deencapsulation; From e3d94f4ddfe1dba02372483a4666c482d4c79b2a Mon Sep 17 00:00:00 2001 From: wangxixu Date: Wed, 11 Nov 2020 14:05:37 +0800 Subject: [PATCH 15/15] update --- .../main/java/org/apache/doris/common/FeMetaVersion.java | 4 +++- .../java/org/apache/doris/load/loadv2/BulkLoadJob.java | 7 ++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java index 0eaf61ec649562..e3ea0901aa8a3b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java +++ b/fe/fe-core/src/main/java/org/apache/doris/common/FeMetaVersion.java @@ -196,6 +196,8 @@ public final class FeMetaVersion { public static final int VERSION_91 = 91; // for mysql external table support resource public static final int VERSION_92 = 92; + //jira: 4863 for load job support udf + public static final int VERSION_93 = 93; // note: when increment meta version, should assign the latest version to VERSION_CURRENT - public static final int VERSION_CURRENT = VERSION_92; + public static final int VERSION_CURRENT = VERSION_93; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java index b8ee0cf578fdc7..d9edca1b67a084 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/BulkLoadJob.java @@ -329,7 +329,12 @@ public void readFields(DataInput in) throws IOException { // The origin stmt does not be analyzed in here. // The reason is that it will thrown MetaNotFoundException when the tableId could not be found by tableName. // The origin stmt will be analyzed after the replay is completed. - userInfo = UserIdentity.read(in); + + if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_93) { + userInfo = UserIdentity.read(in); + } else { + userInfo = new UserIdentity("",""); + } if (Catalog.getCurrentCatalogJournalVersion() >= FeMetaVersion.VERSION_66) { int size = in.readInt(); for (int i = 0; i < size; i++) {