From 03ef19d823bb5324025db498e7edbd071f09f2e9 Mon Sep 17 00:00:00 2001 From: xiepengcheng01 <798789323@qq.com> Date: Wed, 11 May 2022 19:34:57 +0800 Subject: [PATCH 1/5] add vpre_filter_expr for vectorized to improve performance --- be/src/exec/base_scanner.cpp | 47 ++++++++++--------- be/src/exec/base_scanner.h | 8 +++- be/src/exec/broker_scan_node.cpp | 6 ++- be/src/exec/broker_scan_node.h | 2 + be/src/exec/broker_scanner.cpp | 21 +++++++++ be/src/exec/broker_scanner.h | 8 ++++ be/src/vec/exec/vbroker_scanner.cpp | 4 +- be/src/vec/exec/vbroker_scanner.h | 2 +- be/test/vec/exec/vbroker_scanner_test.cpp | 2 +- .../apache/doris/planner/LoadScanNode.java | 5 ++ .../org/apache/doris/planner/PlanNode.java | 7 +++ gensrc/thrift/PlanNodes.thrift | 1 + 12 files changed, 84 insertions(+), 29 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index e06b52de2fe5ac..570929fd883a0d 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -66,6 +66,13 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile, _scanner_eof(false) { } +BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile, + const TBrokerScanRangeParams& params, const TExpr& vpre_filter_texpr, + ScannerCounter* counter) + : BaseScanner(state, profile, params, std::vector(), counter) { + _vpre_filter_texpr = vpre_filter_texpr; +} + Status BaseScanner::open() { RETURN_IF_ERROR(init_expr_ctxes()); if (_params.__isset.strict_mode) { @@ -129,18 +136,18 @@ Status BaseScanner::init_expr_ctxes() { // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor if (!_pre_filter_texprs.empty()) { - if (_state->enable_vectorized_exec()) { - RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees( - _state->obj_pool(), _pre_filter_texprs, &_vpre_filter_ctxs)); - RETURN_IF_ERROR(vectorized::VExpr::prepare(_vpre_filter_ctxs, _state, *_row_desc, - _mem_tracker)); - RETURN_IF_ERROR(vectorized::VExpr::open(_vpre_filter_ctxs, _state)); - } else { - RETURN_IF_ERROR(Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, - &_pre_filter_ctxs)); - RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker)); - RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state)); - } + RETURN_IF_ERROR( + Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, &_pre_filter_ctxs)); + RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker)); + RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state)); + } + + if (_state->enable_vectorized_exec() && !_vpre_filter_texpr.nodes.empty()) { + _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*); + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_state->obj_pool(), _vpre_filter_texpr, + _vpre_filter_ctx_ptr.get())); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc, _mem_tracker)); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state)); } // Construct dest slots information @@ -302,14 +309,10 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { Status BaseScanner::_filter_src_block() { auto origin_column_num = _src_block.columns(); // filter block - if (!_vpre_filter_ctxs.empty()) { - for (auto _vpre_filter_ctx : _vpre_filter_ctxs) { - auto old_rows = _src_block.rows(); - RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, &_src_block, - origin_column_num)); - _counter->num_rows_unselected += old_rows - _src_block.rows(); - } - } + auto old_rows = _src_block.rows(); + RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx_ptr, &_src_block, + origin_column_num)); + _counter->num_rows_unselected += old_rows - _src_block.rows(); return Status::OK(); } @@ -453,8 +456,8 @@ void BaseScanner::close() { Expr::close(_pre_filter_ctxs, _state); } - if (_state->enable_vectorized_exec() && !_vpre_filter_ctxs.empty()) { - vectorized::VExpr::close(_vpre_filter_ctxs, _state); + if (_vpre_filter_ctx_ptr) { + (*_vpre_filter_ctx_ptr)->close(_state); } } diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 1c2ce211b5709f..da253cf4daeff3 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -57,12 +57,15 @@ class BaseScanner { const std::vector& broker_addresses, const std::vector& pre_filter_texprs, ScannerCounter* counter); + BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, + const TExpr& vpre_filter_texpr, ScannerCounter* counter); + virtual ~BaseScanner() { Expr::close(_dest_expr_ctx, _state); if (_state->enable_vectorized_exec()) { vectorized::VExpr::close(_dest_vexpr_ctx, _state); } - }; + } virtual Status init_expr_ctxes(); // Open this scanner, will initialize information need to @@ -138,7 +141,8 @@ class BaseScanner { // for vectorized load std::vector _dest_vexpr_ctx; - std::vector _vpre_filter_ctxs; + TExpr _vpre_filter_texpr; + std::unique_ptr _vpre_filter_ctx_ptr; vectorized::Block _src_block; int _num_of_columns_from_file; diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index f2f3742cf89795..90517aa6cd2ee4 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -57,6 +57,10 @@ Status BrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) { _pre_filter_texprs = broker_scan_node.pre_filter_exprs; } + if (broker_scan_node.__isset.vpre_filter_expr) { + _vpre_filter_texpr = broker_scan_node.vpre_filter_expr; + } + return Status::OK(); } @@ -256,7 +260,7 @@ std::unique_ptr BrokerScanNode::create_scanner(const TBrokerScanRan if (_vectorized) { scan = new vectorized::VBrokerScanner( _runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, - scan_range.broker_addresses, _pre_filter_texprs, counter); + scan_range.broker_addresses, _vpre_filter_texpr, counter); } else { scan = new BrokerScanner(_runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, scan_range.broker_addresses, diff --git a/be/src/exec/broker_scan_node.h b/be/src/exec/broker_scan_node.h index e8fb99b24be051..c4e24f1ffb72a8 100644 --- a/be/src/exec/broker_scan_node.h +++ b/be/src/exec/broker_scan_node.h @@ -122,6 +122,8 @@ class BrokerScanNode : public ScanNode { // Because the row descriptor used for these exprs is `src_row_desc`, // which is initialized in XXXScanner. std::vector _pre_filter_texprs; + // for vectorized + TExpr _vpre_filter_texpr; RuntimeProfile::Counter* _wait_scanner_timer; }; diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 2c21fd3d543f6b..e9ec9d9decc4eb 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -54,6 +54,27 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile, _cur_decompressor(nullptr), _cur_line_reader_eof(false), _skip_lines(0) { + init(params); +} + +BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const TExpr& vpre_filter_texpr, ScannerCounter* counter) + : BaseScanner(state, profile, params, vpre_filter_texpr, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _cur_line_reader(nullptr), + _cur_decompressor(nullptr), + _next_range(0), + _cur_line_reader_eof(false), + _skip_lines(0) { + init(params); +} + +void BrokerScanner::init(const TBrokerScanRangeParams& params) { if (params.__isset.column_separator_length && params.column_separator_length > 1) { _value_separator = params.column_separator_str; _value_separator_length = params.column_separator_length; diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index f10ce6851866c4..e067abbb89cc1b 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -56,8 +56,16 @@ class BrokerScanner : public BaseScanner { const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, const std::vector& pre_filter_texprs, ScannerCounter* counter); + + BrokerScanner(RuntimeState* state, RuntimeProfile* profile, + const TBrokerScanRangeParams& params, const std::vector& ranges, + const std::vector& broker_addresses, + const TExpr& vpre_filter_texprs, ScannerCounter* counter); + ~BrokerScanner() override; + void init(const TBrokerScanRangeParams& params); + // Open this scanner, will initialize information need to Status open() override; diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp index 3006dba78854de..e77737c16dd161 100644 --- a/be/src/vec/exec/vbroker_scanner.cpp +++ b/be/src/vec/exec/vbroker_scanner.cpp @@ -38,8 +38,8 @@ VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, ScannerCounter* counter) - : BrokerScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, + const TExpr& vpre_filter_texpr, ScannerCounter* counter) + : BrokerScanner(state, profile, params, ranges, broker_addresses, vpre_filter_texpr, counter) { _text_converter.reset(new (std::nothrow) TextConverter('\\')); } diff --git a/be/src/vec/exec/vbroker_scanner.h b/be/src/vec/exec/vbroker_scanner.h index 11d8b494fadbe9..02ee3442d378bd 100644 --- a/be/src/vec/exec/vbroker_scanner.h +++ b/be/src/vec/exec/vbroker_scanner.h @@ -26,7 +26,7 @@ class VBrokerScanner final : public BrokerScanner { const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, ScannerCounter* counter); + const TExpr& vpre_filter_texpr, ScannerCounter* counter); ~VBrokerScanner(); virtual Status get_next(doris::Tuple* tuple, MemPool* tuple_pool, bool* eof, diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp index 713aefc4a7a58f..a76a474da18c9d 100644 --- a/be/test/vec/exec/vbroker_scanner_test.cpp +++ b/be/test/vec/exec/vbroker_scanner_test.cpp @@ -74,7 +74,7 @@ class VBrokerScannerTest : public testing::Test { DescriptorTbl* _desc_tbl; std::vector _addresses; ScannerCounter _counter; - std::vector _pre_filter; + const TExpr _pre_filter; }; void VBrokerScannerTest::init_desc_table() { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java index 9ca69c819cdde0..3608707b90a11c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java @@ -217,6 +217,11 @@ protected void toThrift(TPlanNode planNode) { brokerScanNode.addToPreFilterExprs(e.treeToThrift()); } } + + if (vpreFilterConjunct != null) { + brokerScanNode.setVpreFilterExpr(vpreFilterConjunct.treeToThrift()); + } + planNode.setBrokerScanNode(brokerScanNode); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index d43747ed11479e..f1bef3089673a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -109,6 +109,8 @@ abstract public class PlanNode extends TreeNode { // 4. Filter data by using "conjuncts". protected List preFilterConjuncts = Lists.newArrayList(); + protected Expr vpreFilterConjunct = null; + // Fragment that this PlanNode is executed in. Valid only after this PlanNode has been // assigned to a fragment. Set and maintained by enclosing PlanFragment. protected PlanFragment fragment; @@ -904,6 +906,11 @@ public void convertToVectoriezd() { initCompoundPredicate(vconjunct); } + if (!preFilterConjuncts.isEmpty()) { + vpreFilterConjunct = convertConjunctsToAndCompoundPredicate(preFilterConjuncts); + initCompoundPredicate(vpreFilterConjunct); + } + for (PlanNode child : children) { child.convertToVectoriezd(); } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 94168a90743727..49d2fa4e276c7d 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -253,6 +253,7 @@ struct TBrokerScanNode { 2: optional list partition_exprs 3: optional list partition_infos 4: optional list pre_filter_exprs + 5: optional Exprs.TExpr vpre_filter_expr } struct TEsScanNode { From 3aa56490379ff1a8d248a9d0c55ef1daa01cfb40 Mon Sep 17 00:00:00 2001 From: xiepengcheng01 <798789323@qq.com> Date: Thu, 12 May 2022 17:27:55 +0800 Subject: [PATCH 2/5] add ut and format code --- be/test/vec/exec/vbroker_scanner_test.cpp | 110 +++++++++++++++++++--- gensrc/thrift/PlanNodes.thrift | 3 +- 2 files changed, 98 insertions(+), 15 deletions(-) diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp index a76a474da18c9d..50d74a335f5a11 100644 --- a/be/test/vec/exec/vbroker_scanner_test.cpp +++ b/be/test/vec/exec/vbroker_scanner_test.cpp @@ -74,7 +74,7 @@ class VBrokerScannerTest : public testing::Test { DescriptorTbl* _desc_tbl; std::vector _addresses; ScannerCounter _counter; - const TExpr _pre_filter; + TExpr _vpre_filter; }; void VBrokerScannerTest::init_desc_table() { @@ -343,6 +343,51 @@ void VBrokerScannerTest::init_params() { _params.expr_of_dest_slot.emplace(i + 1, expr); _params.src_slot_ids.push_back(4 + i); } + + // init pre_filter expr: k1 < '8' + TExpr filter_expr; + { + TExprNode expr_node; + expr_node.__set_node_type(TExprNodeType::BINARY_PRED); + expr_node.type = gen_type_desc(TPrimitiveType::BOOLEAN); + expr_node.__set_num_children(2); + expr_node.__isset.opcode = true; + expr_node.__set_opcode(TExprOpcode::LT); + expr_node.__isset.vector_opcode = true; + expr_node.__set_vector_opcode(TExprOpcode::LT); + expr_node.__isset.fn = true; + expr_node.fn.name.function_name = "lt"; + expr_node.fn.binary_type = TFunctionBinaryType::BUILTIN; + expr_node.fn.ret_type = int_type; + expr_node.fn.has_var_args = false; + filter_expr.nodes.push_back(expr_node); + } + { + TExprNode expr_node; + expr_node.__set_node_type(TExprNodeType::SLOT_REF); + expr_node.type = varchar_type; + expr_node.__set_num_children(0); + expr_node.__isset.slot_ref = true; + TSlotRef slot_ref; + slot_ref.__set_slot_id(4); + slot_ref.__set_tuple_id(1); + expr_node.__set_slot_ref(slot_ref); + expr_node.__isset.output_column = true; + expr_node.__set_output_column(0); + filter_expr.nodes.push_back(expr_node); + } + { + TExprNode expr_node; + expr_node.__set_node_type(TExprNodeType::STRING_LITERAL); + expr_node.type = varchar_type; + expr_node.__set_num_children(0); + expr_node.__isset.string_literal = true; + TStringLiteral string_literal; + string_literal.__set_value("8"); + expr_node.__set_string_literal(string_literal); + filter_expr.nodes.push_back(expr_node); + } + _vpre_filter = filter_expr; // _params.__isset.expr_of_dest_slot = true; _params.__set_dest_tuple_id(_dst_tuple_id); _params.__set_src_tuple_id(_src_tuple_id); @@ -363,18 +408,55 @@ TEST_F(VBrokerScannerTest, normal) { range.file_type = TFileType::FILE_LOCAL; range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; ranges.push_back(range); + TExpr expr; + VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, expr, &_counter); + auto st = scanner.open(); + ASSERT_TRUE(st.ok()); - VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter, + vectorized::Block block; + bool eof = false; + st = scanner.get_next(&block, &eof); + ASSERT_TRUE(st.ok()); + ASSERT_TRUE(eof); + auto columns = block.get_columns_with_type_and_name(); + ASSERT_EQ(columns.size(), 3); + ASSERT_EQ(columns[0].to_string(0), "1"); + ASSERT_EQ(columns[0].to_string(1), "4"); + ASSERT_EQ(columns[0].to_string(2), "8"); + + ASSERT_EQ(columns[1].to_string(0), "2"); + ASSERT_EQ(columns[1].to_string(1), "5"); + ASSERT_EQ(columns[1].to_string(2), "9"); + + ASSERT_EQ(columns[2].to_string(0), "3"); + ASSERT_EQ(columns[2].to_string(1), "6"); + ASSERT_EQ(columns[2].to_string(2), "10"); +} + +TEST_F(VBrokerScannerTest, normal_with_pre_filter) { + std::vector ranges; + TBrokerRangeDesc range; + range.path = "./be/test/exec/test_data/broker_scanner/normal.csv"; + range.start_offset = 0; + range.size = -1; + range.splittable = true; + range.file_type = TFileType::FILE_LOCAL; + range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; + ranges.push_back(range); + // pre_filter expr: k1 < '8' + VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _vpre_filter, &_counter); auto st = scanner.open(); ASSERT_TRUE(st.ok()); - std::unique_ptr block(new vectorized::Block()); + vectorized::Block block; bool eof = false; - st = scanner.get_next(block.get(), &eof); + // end of file + st = scanner.get_next(&block, &eof); ASSERT_TRUE(st.ok()); ASSERT_TRUE(eof); - auto columns = block->get_columns(); + ASSERT_EQ(2, block.rows()); + auto columns = block.get_columns_with_type_and_name(); ASSERT_EQ(columns.size(), 3); ASSERT_EQ(columns[0]->get_int(0), 1); @@ -406,19 +488,20 @@ TEST_F(VBrokerScannerTest, normal2) { range.start_offset = 0; range.size = 4; ranges.push_back(range); - - VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter, - &_counter); + TExpr expr; + VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, expr, &_counter); auto st = scanner.open(); ASSERT_TRUE(st.ok()); - std::unique_ptr block(new vectorized::Block()); + vectorized::Block block; bool eof = false; - st = scanner.get_next(block.get(), &eof); + st = scanner.get_next(&block, &eof); ASSERT_TRUE(st.ok()); ASSERT_TRUE(eof); - auto columns = block->get_columns(); + auto columns = block.get_columns_with_type_and_name(); ASSERT_EQ(columns.size(), 3); + ASSERT_EQ(columns[0].to_string(0), "1"); + ASSERT_EQ(columns[0].to_string(1), "3"); ASSERT_EQ(columns[0]->get_int(0), 1); ASSERT_EQ(columns[0]->get_int(1), 3); @@ -440,9 +523,8 @@ TEST_F(VBrokerScannerTest, normal5) { range.file_type = TFileType::FILE_LOCAL; range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; ranges.push_back(range); - - VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter, - &_counter); + TExpr expr; + VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, expr, &_counter); auto st = scanner.open(); ASSERT_TRUE(st.ok()); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 49d2fa4e276c7d..539f0de518273e 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -252,7 +252,8 @@ struct TBrokerScanNode { // Partition info used to process partition select in broker load 2: optional list partition_exprs 3: optional list partition_infos - 4: optional list pre_filter_exprs + 4: optional list pre_filter_exprs + // Use in vec exec engine 5: optional Exprs.TExpr vpre_filter_expr } From 81b6e0323f7493f61f8eb6c2c01b53425e185ee6 Mon Sep 17 00:00:00 2001 From: xiepengcheng01 <798789323@qq.com> Date: Tue, 17 May 2022 17:39:58 +0800 Subject: [PATCH 3/5] update --- be/src/exec/broker_scan_node.cpp | 4 ++-- be/src/exec/json_scanner.cpp | 21 +++++++++++++++++++++ be/src/exec/json_scanner.h | 8 ++++++++ be/src/exec/parquet_scanner.cpp | 12 ++++++++++++ be/src/exec/parquet_scanner.h | 6 ++++++ be/src/vec/exec/vjson_scanner.cpp | 4 ++-- be/src/vec/exec/vjson_scanner.h | 2 +- be/src/vec/exec/vparquet_scanner.cpp | 5 ++--- be/src/vec/exec/vparquet_scanner.h | 2 +- 9 files changed, 55 insertions(+), 9 deletions(-) diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index 90517aa6cd2ee4..de0ae3a33a8a93 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -233,7 +233,7 @@ std::unique_ptr BrokerScanNode::create_scanner(const TBrokerScanRan if (_vectorized) { scan = new vectorized::VParquetScanner( _runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, - scan_range.broker_addresses, _pre_filter_texprs, counter); + scan_range.broker_addresses, _vpre_filter_texpr, counter); } else { scan = new ParquetScanner(_runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, scan_range.broker_addresses, @@ -249,7 +249,7 @@ std::unique_ptr BrokerScanNode::create_scanner(const TBrokerScanRan if (_vectorized) { scan = new vectorized::VJsonScanner( _runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, - scan_range.broker_addresses, _pre_filter_texprs, counter); + scan_range.broker_addresses, _vpre_filter_texpr, counter); } else { scan = new JsonScanner(_runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, scan_range.broker_addresses, diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index 0be3d4c089eb5b..8a537d9750934f 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -46,6 +46,27 @@ JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile, _cur_json_reader(nullptr), _cur_reader_eof(false), _read_json_by_line(false) { + init_line_delimiter(params); +} + +JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const TExpr& vpre_filter_texpr, ScannerCounter* counter) + : BaseScanner(state, profile, params, vpre_filter_texpr, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _cur_line_reader(nullptr), + _cur_json_reader(nullptr), + _next_range(0), + _cur_reader_eof(false), + _read_json_by_line(false) { + init_line_delimiter(params); +} + +void JsonScanner::init_line_delimiter(const TBrokerScanRangeParams& params) { if (params.__isset.line_delimiter_length && params.line_delimiter_length > 1) { _line_delimiter = params.line_delimiter_str; _line_delimiter_length = params.line_delimiter_length; diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h index ab2f479e60999c..d8f207380b265b 100644 --- a/be/src/exec/json_scanner.h +++ b/be/src/exec/json_scanner.h @@ -56,8 +56,16 @@ class JsonScanner : public BaseScanner { const std::vector& ranges, const std::vector& broker_addresses, const std::vector& pre_filter_texprs, ScannerCounter* counter); + + JsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const TExpr& vpre_filter_texpr, ScannerCounter* counter); + ~JsonScanner(); + void init_line_delimiter(const TBrokerScanRangeParams& params); + // Open this scanner, will initialize information needed Status open() override; diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index c6cb02e8c285e8..d5bd68917bad2f 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -43,6 +43,18 @@ ParquetScanner::ParquetScanner(RuntimeState* state, RuntimeProfile* profile, _cur_file_reader(nullptr), _cur_file_eof(false) {} +ParquetScanner::ParquetScanner(RuntimeState* state, RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const TExpr& vpre_filter_texpr, ScannerCounter* counter) + : BaseScanner(state, profile, params, vpre_filter_texpr, counter), + _ranges(ranges), + _broker_addresses(broker_addresses), + _cur_file_reader(nullptr), + _next_range(0), + _cur_file_eof(false) {} + ParquetScanner::~ParquetScanner() { close(); } diff --git a/be/src/exec/parquet_scanner.h b/be/src/exec/parquet_scanner.h index d66802dd9504de..babfa2a6e27115 100644 --- a/be/src/exec/parquet_scanner.h +++ b/be/src/exec/parquet_scanner.h @@ -54,6 +54,12 @@ class ParquetScanner : public BaseScanner { const std::vector& broker_addresses, const std::vector& pre_filter_texprs, ScannerCounter* counter); + ParquetScanner(RuntimeState* state, RuntimeProfile* profile, + const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const TExpr& vpre_filter_texpr, ScannerCounter* counter); + ~ParquetScanner() override; // Open this scanner, will initialize information need to diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp index e456e6dfc89713..84a742db41fcf0 100644 --- a/be/src/vec/exec/vjson_scanner.cpp +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -40,8 +40,8 @@ VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, ScannerCounter* counter) - : JsonScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), + const TExpr& vpre_filter_texpr, ScannerCounter* counter) + : JsonScanner(state, profile, params, ranges, broker_addresses, vpre_filter_texpr, counter), _cur_vjson_reader(nullptr) {} Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) { diff --git a/be/src/vec/exec/vjson_scanner.h b/be/src/vec/exec/vjson_scanner.h index 591ddc2ac24d01..ff3bb12ce447dc 100644 --- a/be/src/vec/exec/vjson_scanner.h +++ b/be/src/vec/exec/vjson_scanner.h @@ -46,7 +46,7 @@ class VJsonScanner : public JsonScanner { VJsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, ScannerCounter* counter); + const TExpr& vpre_filter_texpr, ScannerCounter* counter); Status get_next(doris::Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) override { diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp index 037bc1502804a7..0b9254628719f8 100644 --- a/be/src/vec/exec/vparquet_scanner.cpp +++ b/be/src/vec/exec/vparquet_scanner.cpp @@ -31,9 +31,8 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, - ScannerCounter* counter) - : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, + const TExpr& vpre_filter_texpr, ScannerCounter* counter) + : ParquetScanner(state, profile, params, ranges, broker_addresses, vpre_filter_texpr, counter), _batch(nullptr), _arrow_batch_cur_idx(0) {} diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vparquet_scanner.h index 72ac2809896348..9ecf926fd9288a 100644 --- a/be/src/vec/exec/vparquet_scanner.h +++ b/be/src/vec/exec/vparquet_scanner.h @@ -42,7 +42,7 @@ class VParquetScanner : public ParquetScanner { const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const std::vector& pre_filter_texprs, ScannerCounter* counter); + const TExpr& vpre_filter_texpr, ScannerCounter* counter); ~VParquetScanner() override; From 23bb5bc6755d190839f8ee30518acbb0e7e44fab Mon Sep 17 00:00:00 2001 From: xiepengcheng01 <798789323@qq.com> Date: Fri, 20 May 2022 14:32:46 +0800 Subject: [PATCH 4/5] rebase code --- be/src/exec/base_scanner.cpp | 9 +++-- be/src/exec/base_scanner.h | 2 ++ be/src/exec/broker_scanner.cpp | 5 +-- be/src/exec/json_scanner.cpp | 5 +-- be/src/exec/parquet_scanner.cpp | 5 +-- be/test/vec/exec/vbroker_scanner_test.cpp | 42 ++++++++++------------- 6 files changed, 29 insertions(+), 39 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 570929fd883a0d..68436953e7b516 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -67,9 +67,12 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile, } BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, const TExpr& vpre_filter_texpr, - ScannerCounter* counter) - : BaseScanner(state, profile, params, std::vector(), counter) { + const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, + const TExpr& vpre_filter_texpr, ScannerCounter* counter) + : BaseScanner(state, profile, params, ranges, broker_addresses, std::vector(), + counter) { _vpre_filter_texpr = vpre_filter_texpr; } diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index da253cf4daeff3..79d6cd3b90a9db 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -58,6 +58,8 @@ class BaseScanner { const std::vector& pre_filter_texprs, ScannerCounter* counter); BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, + const std::vector& ranges, + const std::vector& broker_addresses, const TExpr& vpre_filter_texpr, ScannerCounter* counter); virtual ~BaseScanner() { diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index e9ec9d9decc4eb..a84f9914843917 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -62,13 +62,10 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile, const std::vector& ranges, const std::vector& broker_addresses, const TExpr& vpre_filter_texpr, ScannerCounter* counter) - : BaseScanner(state, profile, params, vpre_filter_texpr, counter), - _ranges(ranges), - _broker_addresses(broker_addresses), + : BaseScanner(state, profile, params, ranges, broker_addresses, vpre_filter_texpr, counter), _cur_file_reader(nullptr), _cur_line_reader(nullptr), _cur_decompressor(nullptr), - _next_range(0), _cur_line_reader_eof(false), _skip_lines(0) { init(params); diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index 8a537d9750934f..b2e02fce5aa072 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -54,13 +54,10 @@ JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile, const std::vector& ranges, const std::vector& broker_addresses, const TExpr& vpre_filter_texpr, ScannerCounter* counter) - : BaseScanner(state, profile, params, vpre_filter_texpr, counter), - _ranges(ranges), - _broker_addresses(broker_addresses), + : BaseScanner(state, profile, params, ranges, broker_addresses, vpre_filter_texpr, counter), _cur_file_reader(nullptr), _cur_line_reader(nullptr), _cur_json_reader(nullptr), - _next_range(0), _cur_reader_eof(false), _read_json_by_line(false) { init_line_delimiter(params); diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index d5bd68917bad2f..842e54589cfa7d 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -48,11 +48,8 @@ ParquetScanner::ParquetScanner(RuntimeState* state, RuntimeProfile* profile, const std::vector& ranges, const std::vector& broker_addresses, const TExpr& vpre_filter_texpr, ScannerCounter* counter) - : BaseScanner(state, profile, params, vpre_filter_texpr, counter), - _ranges(ranges), - _broker_addresses(broker_addresses), + : BaseScanner(state, profile, params, ranges, broker_addresses, vpre_filter_texpr, counter), _cur_file_reader(nullptr), - _next_range(0), _cur_file_eof(false) {} ParquetScanner::~ParquetScanner() { diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp index 50d74a335f5a11..281a44dfedb919 100644 --- a/be/test/vec/exec/vbroker_scanner_test.cpp +++ b/be/test/vec/exec/vbroker_scanner_test.cpp @@ -413,24 +413,24 @@ TEST_F(VBrokerScannerTest, normal) { auto st = scanner.open(); ASSERT_TRUE(st.ok()); - vectorized::Block block; + std::unique_ptr block(new vectorized::Block()); bool eof = false; - st = scanner.get_next(&block, &eof); + st = scanner.get_next(block.get(), &eof); ASSERT_TRUE(st.ok()); ASSERT_TRUE(eof); - auto columns = block.get_columns_with_type_and_name(); + auto columns = block->get_columns(); ASSERT_EQ(columns.size(), 3); - ASSERT_EQ(columns[0].to_string(0), "1"); - ASSERT_EQ(columns[0].to_string(1), "4"); - ASSERT_EQ(columns[0].to_string(2), "8"); + ASSERT_EQ(columns[0]->get_int(0), 1); + ASSERT_EQ(columns[0]->get_int(1), 4); + ASSERT_EQ(columns[0]->get_int(2), 8); - ASSERT_EQ(columns[1].to_string(0), "2"); - ASSERT_EQ(columns[1].to_string(1), "5"); - ASSERT_EQ(columns[1].to_string(2), "9"); + ASSERT_EQ(columns[1]->get_int(0), 2); + ASSERT_EQ(columns[1]->get_int(1), 5); + ASSERT_EQ(columns[1]->get_int(2), 9); - ASSERT_EQ(columns[2].to_string(0), "3"); - ASSERT_EQ(columns[2].to_string(1), "6"); - ASSERT_EQ(columns[2].to_string(2), "10"); + ASSERT_EQ(columns[2]->get_int(0), 3); + ASSERT_EQ(columns[2]->get_int(1), 6); + ASSERT_EQ(columns[2]->get_int(2), 10); } TEST_F(VBrokerScannerTest, normal_with_pre_filter) { @@ -449,27 +449,23 @@ TEST_F(VBrokerScannerTest, normal_with_pre_filter) { auto st = scanner.open(); ASSERT_TRUE(st.ok()); - vectorized::Block block; + std::unique_ptr block(new vectorized::Block()); bool eof = false; // end of file - st = scanner.get_next(&block, &eof); + st = scanner.get_next(block.get(), &eof); ASSERT_TRUE(st.ok()); ASSERT_TRUE(eof); - ASSERT_EQ(2, block.rows()); - auto columns = block.get_columns_with_type_and_name(); + auto columns = block->get_columns(); ASSERT_EQ(columns.size(), 3); ASSERT_EQ(columns[0]->get_int(0), 1); ASSERT_EQ(columns[0]->get_int(1), 4); - ASSERT_EQ(columns[0]->get_int(2), 8); ASSERT_EQ(columns[1]->get_int(0), 2); ASSERT_EQ(columns[1]->get_int(1), 5); - ASSERT_EQ(columns[1]->get_int(2), 9); ASSERT_EQ(columns[2]->get_int(0), 3); ASSERT_EQ(columns[2]->get_int(1), 6); - ASSERT_EQ(columns[2]->get_int(2), 10); } TEST_F(VBrokerScannerTest, normal2) { @@ -493,15 +489,13 @@ TEST_F(VBrokerScannerTest, normal2) { auto st = scanner.open(); ASSERT_TRUE(st.ok()); - vectorized::Block block; + std::unique_ptr block(new vectorized::Block()); bool eof = false; - st = scanner.get_next(&block, &eof); + st = scanner.get_next(block.get(), &eof); ASSERT_TRUE(st.ok()); ASSERT_TRUE(eof); - auto columns = block.get_columns_with_type_and_name(); + auto columns = block->get_columns(); ASSERT_EQ(columns.size(), 3); - ASSERT_EQ(columns[0].to_string(0), "1"); - ASSERT_EQ(columns[0].to_string(1), "3"); ASSERT_EQ(columns[0]->get_int(0), 1); ASSERT_EQ(columns[0]->get_int(1), 3); From 4ef9d1fd532e34ce77fe8d7a2d7fc985b1715e36 Mon Sep 17 00:00:00 2001 From: xiepengcheng01 <798789323@qq.com> Date: Sat, 21 May 2022 16:47:08 +0800 Subject: [PATCH 5/5] refactor code --- be/src/exec/base_scanner.cpp | 36 ++--- be/src/exec/base_scanner.h | 6 - be/src/exec/broker_scan_node.cpp | 10 +- be/src/exec/broker_scan_node.h | 2 - be/src/exec/broker_scanner.cpp | 18 --- be/src/exec/broker_scanner.h | 8 -- be/src/exec/json_scanner.cpp | 18 --- be/src/exec/json_scanner.h | 8 -- be/src/exec/parquet_scanner.cpp | 9 -- be/src/exec/parquet_scanner.h | 6 - be/src/vec/exec/vbroker_scanner.cpp | 4 +- be/src/vec/exec/vbroker_scanner.h | 2 +- be/src/vec/exec/vjson_scanner.cpp | 4 +- be/src/vec/exec/vjson_scanner.h | 2 +- be/src/vec/exec/vparquet_scanner.cpp | 5 +- be/src/vec/exec/vparquet_scanner.h | 2 +- be/test/vec/exec/vbroker_scanner_test.cpp | 127 ++++++++++-------- .../apache/doris/planner/LoadScanNode.java | 14 +- gensrc/thrift/PlanNodes.thrift | 2 - 19 files changed, 107 insertions(+), 176 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 68436953e7b516..005e64c703c7c2 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -66,16 +66,6 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile, _scanner_eof(false) { } -BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const TExpr& vpre_filter_texpr, ScannerCounter* counter) - : BaseScanner(state, profile, params, ranges, broker_addresses, std::vector(), - counter) { - _vpre_filter_texpr = vpre_filter_texpr; -} - Status BaseScanner::open() { RETURN_IF_ERROR(init_expr_ctxes()); if (_params.__isset.strict_mode) { @@ -139,18 +129,20 @@ Status BaseScanner::init_expr_ctxes() { // preceding filter expr should be initialized by using `_row_desc`, which is the source row descriptor if (!_pre_filter_texprs.empty()) { - RETURN_IF_ERROR( - Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, &_pre_filter_ctxs)); - RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker)); - RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state)); - } - - if (_state->enable_vectorized_exec() && !_vpre_filter_texpr.nodes.empty()) { - _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*); - RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_state->obj_pool(), _vpre_filter_texpr, - _vpre_filter_ctx_ptr.get())); - RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc, _mem_tracker)); - RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state)); + if (_state->enable_vectorized_exec()) { + // for vectorized, preceding filter exprs should be compounded to one passed from fe. + DCHECK(_pre_filter_texprs.size() == 1); + _vpre_filter_ctx_ptr.reset(new doris::vectorized::VExprContext*); + RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree( + _state->obj_pool(), _pre_filter_texprs[0], _vpre_filter_ctx_ptr.get())); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->prepare(_state, *_row_desc, _mem_tracker)); + RETURN_IF_ERROR((*_vpre_filter_ctx_ptr)->open(_state)); + } else { + RETURN_IF_ERROR(Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, + &_pre_filter_ctxs)); + RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker)); + RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state)); + } } // Construct dest slots information diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 79d6cd3b90a9db..fe3e088d4e607a 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -57,11 +57,6 @@ class BaseScanner { const std::vector& broker_addresses, const std::vector& pre_filter_texprs, ScannerCounter* counter); - BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const TExpr& vpre_filter_texpr, ScannerCounter* counter); - virtual ~BaseScanner() { Expr::close(_dest_expr_ctx, _state); if (_state->enable_vectorized_exec()) { @@ -143,7 +138,6 @@ class BaseScanner { // for vectorized load std::vector _dest_vexpr_ctx; - TExpr _vpre_filter_texpr; std::unique_ptr _vpre_filter_ctx_ptr; vectorized::Block _src_block; int _num_of_columns_from_file; diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index de0ae3a33a8a93..f2f3742cf89795 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -57,10 +57,6 @@ Status BrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) { _pre_filter_texprs = broker_scan_node.pre_filter_exprs; } - if (broker_scan_node.__isset.vpre_filter_expr) { - _vpre_filter_texpr = broker_scan_node.vpre_filter_expr; - } - return Status::OK(); } @@ -233,7 +229,7 @@ std::unique_ptr BrokerScanNode::create_scanner(const TBrokerScanRan if (_vectorized) { scan = new vectorized::VParquetScanner( _runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, - scan_range.broker_addresses, _vpre_filter_texpr, counter); + scan_range.broker_addresses, _pre_filter_texprs, counter); } else { scan = new ParquetScanner(_runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, scan_range.broker_addresses, @@ -249,7 +245,7 @@ std::unique_ptr BrokerScanNode::create_scanner(const TBrokerScanRan if (_vectorized) { scan = new vectorized::VJsonScanner( _runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, - scan_range.broker_addresses, _vpre_filter_texpr, counter); + scan_range.broker_addresses, _pre_filter_texprs, counter); } else { scan = new JsonScanner(_runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, scan_range.broker_addresses, @@ -260,7 +256,7 @@ std::unique_ptr BrokerScanNode::create_scanner(const TBrokerScanRan if (_vectorized) { scan = new vectorized::VBrokerScanner( _runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, - scan_range.broker_addresses, _vpre_filter_texpr, counter); + scan_range.broker_addresses, _pre_filter_texprs, counter); } else { scan = new BrokerScanner(_runtime_state, runtime_profile(), scan_range.params, scan_range.ranges, scan_range.broker_addresses, diff --git a/be/src/exec/broker_scan_node.h b/be/src/exec/broker_scan_node.h index c4e24f1ffb72a8..e8fb99b24be051 100644 --- a/be/src/exec/broker_scan_node.h +++ b/be/src/exec/broker_scan_node.h @@ -122,8 +122,6 @@ class BrokerScanNode : public ScanNode { // Because the row descriptor used for these exprs is `src_row_desc`, // which is initialized in XXXScanner. std::vector _pre_filter_texprs; - // for vectorized - TExpr _vpre_filter_texpr; RuntimeProfile::Counter* _wait_scanner_timer; }; diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index a84f9914843917..2c21fd3d543f6b 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -54,24 +54,6 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile, _cur_decompressor(nullptr), _cur_line_reader_eof(false), _skip_lines(0) { - init(params); -} - -BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const TExpr& vpre_filter_texpr, ScannerCounter* counter) - : BaseScanner(state, profile, params, ranges, broker_addresses, vpre_filter_texpr, counter), - _cur_file_reader(nullptr), - _cur_line_reader(nullptr), - _cur_decompressor(nullptr), - _cur_line_reader_eof(false), - _skip_lines(0) { - init(params); -} - -void BrokerScanner::init(const TBrokerScanRangeParams& params) { if (params.__isset.column_separator_length && params.column_separator_length > 1) { _value_separator = params.column_separator_str; _value_separator_length = params.column_separator_length; diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index e067abbb89cc1b..f10ce6851866c4 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -56,16 +56,8 @@ class BrokerScanner : public BaseScanner { const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, const std::vector& pre_filter_texprs, ScannerCounter* counter); - - BrokerScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, const std::vector& ranges, - const std::vector& broker_addresses, - const TExpr& vpre_filter_texprs, ScannerCounter* counter); - ~BrokerScanner() override; - void init(const TBrokerScanRangeParams& params); - // Open this scanner, will initialize information need to Status open() override; diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index b2e02fce5aa072..0be3d4c089eb5b 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -46,24 +46,6 @@ JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile, _cur_json_reader(nullptr), _cur_reader_eof(false), _read_json_by_line(false) { - init_line_delimiter(params); -} - -JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const TExpr& vpre_filter_texpr, ScannerCounter* counter) - : BaseScanner(state, profile, params, ranges, broker_addresses, vpre_filter_texpr, counter), - _cur_file_reader(nullptr), - _cur_line_reader(nullptr), - _cur_json_reader(nullptr), - _cur_reader_eof(false), - _read_json_by_line(false) { - init_line_delimiter(params); -} - -void JsonScanner::init_line_delimiter(const TBrokerScanRangeParams& params) { if (params.__isset.line_delimiter_length && params.line_delimiter_length > 1) { _line_delimiter = params.line_delimiter_str; _line_delimiter_length = params.line_delimiter_length; diff --git a/be/src/exec/json_scanner.h b/be/src/exec/json_scanner.h index d8f207380b265b..ab2f479e60999c 100644 --- a/be/src/exec/json_scanner.h +++ b/be/src/exec/json_scanner.h @@ -56,16 +56,8 @@ class JsonScanner : public BaseScanner { const std::vector& ranges, const std::vector& broker_addresses, const std::vector& pre_filter_texprs, ScannerCounter* counter); - - JsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const TExpr& vpre_filter_texpr, ScannerCounter* counter); - ~JsonScanner(); - void init_line_delimiter(const TBrokerScanRangeParams& params); - // Open this scanner, will initialize information needed Status open() override; diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 842e54589cfa7d..c6cb02e8c285e8 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -43,15 +43,6 @@ ParquetScanner::ParquetScanner(RuntimeState* state, RuntimeProfile* profile, _cur_file_reader(nullptr), _cur_file_eof(false) {} -ParquetScanner::ParquetScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const TExpr& vpre_filter_texpr, ScannerCounter* counter) - : BaseScanner(state, profile, params, ranges, broker_addresses, vpre_filter_texpr, counter), - _cur_file_reader(nullptr), - _cur_file_eof(false) {} - ParquetScanner::~ParquetScanner() { close(); } diff --git a/be/src/exec/parquet_scanner.h b/be/src/exec/parquet_scanner.h index babfa2a6e27115..d66802dd9504de 100644 --- a/be/src/exec/parquet_scanner.h +++ b/be/src/exec/parquet_scanner.h @@ -54,12 +54,6 @@ class ParquetScanner : public BaseScanner { const std::vector& broker_addresses, const std::vector& pre_filter_texprs, ScannerCounter* counter); - ParquetScanner(RuntimeState* state, RuntimeProfile* profile, - const TBrokerScanRangeParams& params, - const std::vector& ranges, - const std::vector& broker_addresses, - const TExpr& vpre_filter_texpr, ScannerCounter* counter); - ~ParquetScanner() override; // Open this scanner, will initialize information need to diff --git a/be/src/vec/exec/vbroker_scanner.cpp b/be/src/vec/exec/vbroker_scanner.cpp index e77737c16dd161..3006dba78854de 100644 --- a/be/src/vec/exec/vbroker_scanner.cpp +++ b/be/src/vec/exec/vbroker_scanner.cpp @@ -38,8 +38,8 @@ VBrokerScanner::VBrokerScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const TExpr& vpre_filter_texpr, ScannerCounter* counter) - : BrokerScanner(state, profile, params, ranges, broker_addresses, vpre_filter_texpr, + const std::vector& pre_filter_texprs, ScannerCounter* counter) + : BrokerScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter) { _text_converter.reset(new (std::nothrow) TextConverter('\\')); } diff --git a/be/src/vec/exec/vbroker_scanner.h b/be/src/vec/exec/vbroker_scanner.h index 02ee3442d378bd..11d8b494fadbe9 100644 --- a/be/src/vec/exec/vbroker_scanner.h +++ b/be/src/vec/exec/vbroker_scanner.h @@ -26,7 +26,7 @@ class VBrokerScanner final : public BrokerScanner { const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const TExpr& vpre_filter_texpr, ScannerCounter* counter); + const std::vector& pre_filter_texprs, ScannerCounter* counter); ~VBrokerScanner(); virtual Status get_next(doris::Tuple* tuple, MemPool* tuple_pool, bool* eof, diff --git a/be/src/vec/exec/vjson_scanner.cpp b/be/src/vec/exec/vjson_scanner.cpp index 84a742db41fcf0..e456e6dfc89713 100644 --- a/be/src/vec/exec/vjson_scanner.cpp +++ b/be/src/vec/exec/vjson_scanner.cpp @@ -40,8 +40,8 @@ VJsonScanner::VJsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const TExpr& vpre_filter_texpr, ScannerCounter* counter) - : JsonScanner(state, profile, params, ranges, broker_addresses, vpre_filter_texpr, counter), + const std::vector& pre_filter_texprs, ScannerCounter* counter) + : JsonScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), _cur_vjson_reader(nullptr) {} Status VJsonScanner::get_next(vectorized::Block* output_block, bool* eof) { diff --git a/be/src/vec/exec/vjson_scanner.h b/be/src/vec/exec/vjson_scanner.h index ff3bb12ce447dc..591ddc2ac24d01 100644 --- a/be/src/vec/exec/vjson_scanner.h +++ b/be/src/vec/exec/vjson_scanner.h @@ -46,7 +46,7 @@ class VJsonScanner : public JsonScanner { VJsonScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const TExpr& vpre_filter_texpr, ScannerCounter* counter); + const std::vector& pre_filter_texprs, ScannerCounter* counter); Status get_next(doris::Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* fill_tuple) override { diff --git a/be/src/vec/exec/vparquet_scanner.cpp b/be/src/vec/exec/vparquet_scanner.cpp index 0b9254628719f8..037bc1502804a7 100644 --- a/be/src/vec/exec/vparquet_scanner.cpp +++ b/be/src/vec/exec/vparquet_scanner.cpp @@ -31,8 +31,9 @@ VParquetScanner::VParquetScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const TExpr& vpre_filter_texpr, ScannerCounter* counter) - : ParquetScanner(state, profile, params, ranges, broker_addresses, vpre_filter_texpr, + const std::vector& pre_filter_texprs, + ScannerCounter* counter) + : ParquetScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter), _batch(nullptr), _arrow_batch_cur_idx(0) {} diff --git a/be/src/vec/exec/vparquet_scanner.h b/be/src/vec/exec/vparquet_scanner.h index 9ecf926fd9288a..72ac2809896348 100644 --- a/be/src/vec/exec/vparquet_scanner.h +++ b/be/src/vec/exec/vparquet_scanner.h @@ -42,7 +42,7 @@ class VParquetScanner : public ParquetScanner { const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, - const TExpr& vpre_filter_texpr, ScannerCounter* counter); + const std::vector& pre_filter_texprs, ScannerCounter* counter); ~VParquetScanner() override; diff --git a/be/test/vec/exec/vbroker_scanner_test.cpp b/be/test/vec/exec/vbroker_scanner_test.cpp index 281a44dfedb919..428d82343c56a9 100644 --- a/be/test/vec/exec/vbroker_scanner_test.cpp +++ b/be/test/vec/exec/vbroker_scanner_test.cpp @@ -74,7 +74,7 @@ class VBrokerScannerTest : public testing::Test { DescriptorTbl* _desc_tbl; std::vector _addresses; ScannerCounter _counter; - TExpr _vpre_filter; + std::vector _pre_filter; }; void VBrokerScannerTest::init_desc_table() { @@ -343,51 +343,6 @@ void VBrokerScannerTest::init_params() { _params.expr_of_dest_slot.emplace(i + 1, expr); _params.src_slot_ids.push_back(4 + i); } - - // init pre_filter expr: k1 < '8' - TExpr filter_expr; - { - TExprNode expr_node; - expr_node.__set_node_type(TExprNodeType::BINARY_PRED); - expr_node.type = gen_type_desc(TPrimitiveType::BOOLEAN); - expr_node.__set_num_children(2); - expr_node.__isset.opcode = true; - expr_node.__set_opcode(TExprOpcode::LT); - expr_node.__isset.vector_opcode = true; - expr_node.__set_vector_opcode(TExprOpcode::LT); - expr_node.__isset.fn = true; - expr_node.fn.name.function_name = "lt"; - expr_node.fn.binary_type = TFunctionBinaryType::BUILTIN; - expr_node.fn.ret_type = int_type; - expr_node.fn.has_var_args = false; - filter_expr.nodes.push_back(expr_node); - } - { - TExprNode expr_node; - expr_node.__set_node_type(TExprNodeType::SLOT_REF); - expr_node.type = varchar_type; - expr_node.__set_num_children(0); - expr_node.__isset.slot_ref = true; - TSlotRef slot_ref; - slot_ref.__set_slot_id(4); - slot_ref.__set_tuple_id(1); - expr_node.__set_slot_ref(slot_ref); - expr_node.__isset.output_column = true; - expr_node.__set_output_column(0); - filter_expr.nodes.push_back(expr_node); - } - { - TExprNode expr_node; - expr_node.__set_node_type(TExprNodeType::STRING_LITERAL); - expr_node.type = varchar_type; - expr_node.__set_num_children(0); - expr_node.__isset.string_literal = true; - TStringLiteral string_literal; - string_literal.__set_value("8"); - expr_node.__set_string_literal(string_literal); - filter_expr.nodes.push_back(expr_node); - } - _vpre_filter = filter_expr; // _params.__isset.expr_of_dest_slot = true; _params.__set_dest_tuple_id(_dst_tuple_id); _params.__set_src_tuple_id(_src_tuple_id); @@ -408,8 +363,8 @@ TEST_F(VBrokerScannerTest, normal) { range.file_type = TFileType::FILE_LOCAL; range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; ranges.push_back(range); - TExpr expr; - VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, expr, &_counter); + VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter, + &_counter); auto st = scanner.open(); ASSERT_TRUE(st.ok()); @@ -443,8 +398,72 @@ TEST_F(VBrokerScannerTest, normal_with_pre_filter) { range.file_type = TFileType::FILE_LOCAL; range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; ranges.push_back(range); - // pre_filter expr: k1 < '8' - VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _vpre_filter, + + // init pre_filter expr: k1 < '8' + TTypeDesc int_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::INT); + node.__set_scalar_type(scalar_type); + int_type.types.push_back(node); + } + TTypeDesc varchar_type; + { + TTypeNode node; + node.__set_type(TTypeNodeType::SCALAR); + TScalarType scalar_type; + scalar_type.__set_type(TPrimitiveType::VARCHAR); + scalar_type.__set_len(5000); + node.__set_scalar_type(scalar_type); + varchar_type.types.push_back(node); + } + + TExpr filter_expr; + { + TExprNode expr_node; + expr_node.__set_node_type(TExprNodeType::BINARY_PRED); + expr_node.type = gen_type_desc(TPrimitiveType::BOOLEAN); + expr_node.__set_num_children(2); + expr_node.__isset.opcode = true; + expr_node.__set_opcode(TExprOpcode::LT); + expr_node.__isset.vector_opcode = true; + expr_node.__set_vector_opcode(TExprOpcode::LT); + expr_node.__isset.fn = true; + expr_node.fn.name.function_name = "lt"; + expr_node.fn.binary_type = TFunctionBinaryType::BUILTIN; + expr_node.fn.ret_type = int_type; + expr_node.fn.has_var_args = false; + filter_expr.nodes.push_back(expr_node); + } + { + TExprNode expr_node; + expr_node.__set_node_type(TExprNodeType::SLOT_REF); + expr_node.type = varchar_type; + expr_node.__set_num_children(0); + expr_node.__isset.slot_ref = true; + TSlotRef slot_ref; + slot_ref.__set_slot_id(4); + slot_ref.__set_tuple_id(1); + expr_node.__set_slot_ref(slot_ref); + expr_node.__isset.output_column = true; + expr_node.__set_output_column(0); + filter_expr.nodes.push_back(expr_node); + } + { + TExprNode expr_node; + expr_node.__set_node_type(TExprNodeType::STRING_LITERAL); + expr_node.type = varchar_type; + expr_node.__set_num_children(0); + expr_node.__isset.string_literal = true; + TStringLiteral string_literal; + string_literal.__set_value("8"); + expr_node.__set_string_literal(string_literal); + filter_expr.nodes.push_back(expr_node); + } + _pre_filter.push_back(filter_expr); + VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter, &_counter); auto st = scanner.open(); ASSERT_TRUE(st.ok()); @@ -484,8 +503,8 @@ TEST_F(VBrokerScannerTest, normal2) { range.start_offset = 0; range.size = 4; ranges.push_back(range); - TExpr expr; - VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, expr, &_counter); + VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter, + &_counter); auto st = scanner.open(); ASSERT_TRUE(st.ok()); @@ -517,8 +536,8 @@ TEST_F(VBrokerScannerTest, normal5) { range.file_type = TFileType::FILE_LOCAL; range.format_type = TFileFormatType::FORMAT_CSV_PLAIN; ranges.push_back(range); - TExpr expr; - VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, expr, &_counter); + VBrokerScanner scanner(&_runtime_state, _profile, _params, ranges, _addresses, _pre_filter, + &_counter); auto st = scanner.open(); ASSERT_TRUE(st.ok()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java index 3608707b90a11c..9f063a47ed2f0f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LoadScanNode.java @@ -34,6 +34,7 @@ import org.apache.doris.catalog.PrimitiveType; import org.apache.doris.catalog.Type; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.UserException; import org.apache.doris.load.loadv2.LoadTask; import org.apache.doris.rewrite.ExprRewriter; @@ -213,15 +214,14 @@ protected void toThrift(TPlanNode planNode) { planNode.setNodeType(TPlanNodeType.BROKER_SCAN_NODE); TBrokerScanNode brokerScanNode = new TBrokerScanNode(desc.getId().asInt()); if (!preFilterConjuncts.isEmpty()) { - for (Expr e : preFilterConjuncts) { - brokerScanNode.addToPreFilterExprs(e.treeToThrift()); + if (Config.enable_vectorized_load && vpreFilterConjunct != null) { + brokerScanNode.addToPreFilterExprs(vpreFilterConjunct.treeToThrift()); + } else { + for (Expr e : preFilterConjuncts) { + brokerScanNode.addToPreFilterExprs(e.treeToThrift()); + } } } - - if (vpreFilterConjunct != null) { - brokerScanNode.setVpreFilterExpr(vpreFilterConjunct.treeToThrift()); - } - planNode.setBrokerScanNode(brokerScanNode); } } diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 539f0de518273e..d4d37e11ac554c 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -253,8 +253,6 @@ struct TBrokerScanNode { 2: optional list partition_exprs 3: optional list partition_infos 4: optional list pre_filter_exprs - // Use in vec exec engine - 5: optional Exprs.TExpr vpre_filter_expr } struct TEsScanNode {