From 25c3c3717c1490c1ad037d6a73e3bdee07f6efca Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Fri, 22 Apr 2022 21:31:35 +0800 Subject: [PATCH 01/13] [fix](broker load) oom --- be/src/exec/json_scanner.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index 1e08eec5992072..9e824189deabdc 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -97,6 +97,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool SCOPED_TIMER(_materialize_timer); RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); *fill_tuple = _success; + free_expr_local_allocations(); break; // break always } if (_scanner_eof) { From d2054850209ee6ff699f118eb1a311569b091318 Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Fri, 22 Apr 2022 21:40:05 +0800 Subject: [PATCH 02/13] Update orc_scanner.cpp --- be/src/exec/orc_scanner.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index 254c7bb92a2893..08dc12cff51561 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -358,6 +358,7 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* SCOPED_TIMER(_materialize_timer); RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); *fill_tuple = _success; + free_expr_local_allocations(); break; } if (_scanner_eof) { From e6a03b707fe114dbd99a14939e20d852b599cc4f Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Fri, 22 Apr 2022 21:40:54 +0800 Subject: [PATCH 03/13] Update parquet_scanner.cpp --- be/src/exec/parquet_scanner.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 9a85a1253ad969..3b540321fe1828 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -82,6 +82,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo SCOPED_TIMER(_materialize_timer); RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); *fill_tuple = _success; + free_expr_local_allocations(); break; // break always } if (_scanner_eof) { From d3379ef03284e42a2623127d75677edd444d89c8 Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Fri, 22 Apr 2022 21:46:51 +0800 Subject: [PATCH 04/13] Update json_scanner.cpp --- be/src/exec/json_scanner.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index 9e824189deabdc..81e47435c9181c 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -96,8 +96,12 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - *fill_tuple = _success; - free_expr_local_allocations(); + if (_success) { + free_expr_local_allocations(); + *fill_tuple = true; + else { + *fill_tuple = false; + } break; // break always } if (_scanner_eof) { From 92a64909bb30820e948ef045d6815ea9d7dab695 Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Fri, 22 Apr 2022 21:48:32 +0800 Subject: [PATCH 05/13] Update orc_scanner.cpp --- be/src/exec/orc_scanner.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index 08dc12cff51561..1bfef3af13e375 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -357,8 +357,12 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - *fill_tuple = _success; - free_expr_local_allocations(); + if (_success) { + free_expr_local_allocations(); + *fill_tuple = true; + } else { + *fill_tuple = false; + } break; } if (_scanner_eof) { From 813698bfc5d1664d0e82a26e8033461f4fe7da4e Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Fri, 22 Apr 2022 21:50:07 +0800 Subject: [PATCH 06/13] Update parquet_scanner.cpp --- be/src/exec/parquet_scanner.cpp | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 3b540321fe1828..5ab0ca5204a316 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -81,8 +81,12 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - *fill_tuple = _success; - free_expr_local_allocations(); + if (_success) { + free_expr_local_allocations(); + *fill_tuple = true; + } else { + *fill_tuple = false; + } break; // break always } if (_scanner_eof) { From 224bb0183239242174120aae1e2bf602edfa6d08 Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Sun, 24 Apr 2022 22:11:57 +0800 Subject: [PATCH 07/13] Update base_scanner.h --- be/src/exec/base_scanner.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index bce0f4b8ca694f..bb19dc7edca1a9 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -64,6 +64,8 @@ class BaseScanner { const std::vector& columns_from_path); void free_expr_local_allocations(); + + void fill_tuple_post_process(); protected: RuntimeState* _state; const TBrokerScanRangeParams& _params; From 4f6ba3280bd63762458f3efec191495dd5a7a5fa Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Sun, 24 Apr 2022 22:15:10 +0800 Subject: [PATCH 08/13] Update base_scanner.cpp --- be/src/exec/base_scanner.cpp | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 06331023e833c4..090cd17976a71e 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -268,6 +268,15 @@ void BaseScanner::free_expr_local_allocations() { } } +void BaseScanner::fill_tuple_post_process() { + if (_success) { + free_expr_local_allocations(); + *fill_tuple = true; + } else { + *fill_tuple = false; + } +} + void BaseScanner::close() { if (!_pre_filter_ctxs.empty()) { Expr::close(_pre_filter_ctxs, _state); From 89cba1031487606235f3be7929acc40da89ce8aa Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Sun, 24 Apr 2022 22:16:27 +0800 Subject: [PATCH 09/13] Update broker_scanner.cpp --- be/src/exec/broker_scanner.cpp | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index c4f771faaaf858..2e1d32f61e0e48 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -115,12 +115,7 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, boo COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool)); - if (_success) { - free_expr_local_allocations(); - *fill_tuple = true; - } else { - *fill_tuple = false; - } + fill_tuple_post_process(); break; // break always } } From a19657d8af76089bd01382b6d870d37e5ab1c7b2 Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Sun, 24 Apr 2022 22:17:44 +0800 Subject: [PATCH 10/13] Update json_scanner.cpp --- be/src/exec/json_scanner.cpp | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index 81e47435c9181c..d347000a4b40f8 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -96,12 +96,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - if (_success) { - free_expr_local_allocations(); - *fill_tuple = true; - else { - *fill_tuple = false; - } + fill_tuple_post_process(); break; // break always } if (_scanner_eof) { From f1ee6410b8f6e4389ab1ae26b48a1fbc7991c6ec Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Sun, 24 Apr 2022 22:18:51 +0800 Subject: [PATCH 11/13] Update orc_scanner.cpp --- be/src/exec/orc_scanner.cpp | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index 1bfef3af13e375..150fafb05b2e46 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -357,12 +357,7 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - if (_success) { - free_expr_local_allocations(); - *fill_tuple = true; - } else { - *fill_tuple = false; - } + fill_tuple_post_process(); break; } if (_scanner_eof) { From 7c0ec15240a93e9642fe829cd65dbab21fe965be Mon Sep 17 00:00:00 2001 From: SleepyBear Date: Sun, 24 Apr 2022 22:19:58 +0800 Subject: [PATCH 12/13] Update parquet_scanner.cpp --- be/src/exec/parquet_scanner.cpp | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 5ab0ca5204a316..6a1b5d6e20a54f 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -81,12 +81,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - if (_success) { - free_expr_local_allocations(); - *fill_tuple = true; - } else { - *fill_tuple = false; - } + fill_tuple_post_process(); break; // break always } if (_scanner_eof) { From 0f75dc9935c51b67c9e1b38e765e07469a428352 Mon Sep 17 00:00:00 2001 From: morningman Date: Mon, 25 Apr 2022 00:28:49 +0800 Subject: [PATCH 13/13] refactor fill_dest_tuple --- be/src/exec/base_scanner.cpp | 22 ++++++++++++---------- be/src/exec/base_scanner.h | 6 ++++-- be/src/exec/broker_scanner.cpp | 7 +++---- be/src/exec/broker_scanner.h | 2 +- be/src/exec/json_scanner.cpp | 3 +-- be/src/exec/orc_scanner.cpp | 3 +-- be/src/exec/parquet_scanner.cpp | 3 +-- 7 files changed, 23 insertions(+), 23 deletions(-) diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 090cd17976a71e..796defc6d07b79 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -155,7 +155,18 @@ Status BaseScanner::init_expr_ctxes() { return Status::OK(); } -Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { +Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple) { + RETURN_IF_ERROR(_fill_dest_tuple(dest_tuple, mem_pool)); + if (_success) { + free_expr_local_allocations(); + *fill_tuple = true; + } else { + *fill_tuple = false; + } + return Status::OK(); +} + +Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { // filter src tuple by preceding filter first if (!ExecNode::eval_conjuncts(&_pre_filter_ctxs[0], _pre_filter_ctxs.size(), _src_tuple_row)) { _counter->num_rows_unselected++; @@ -268,15 +279,6 @@ void BaseScanner::free_expr_local_allocations() { } } -void BaseScanner::fill_tuple_post_process() { - if (_success) { - free_expr_local_allocations(); - *fill_tuple = true; - } else { - *fill_tuple = false; - } -} - void BaseScanner::close() { if (!_pre_filter_ctxs.empty()) { Expr::close(_pre_filter_ctxs, _state); diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index bb19dc7edca1a9..c01891c3818db4 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -58,14 +58,13 @@ class BaseScanner { // Close this scanner virtual void close() = 0; - Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool); + Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple); void fill_slots_of_columns_from_path(int start, const std::vector& columns_from_path); void free_expr_local_allocations(); - void fill_tuple_post_process(); protected: RuntimeState* _state; const TBrokerScanRangeParams& _params; @@ -108,6 +107,9 @@ class BaseScanner { // Used to record whether a row of data is successfully read. bool _success = false; bool _scanner_eof = false; + +private: + Status _fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool); }; } /* namespace doris */ diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 2e1d32f61e0e48..372cb1f0d4d513 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -114,8 +114,7 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, boo { COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool)); - fill_tuple_post_process(); + RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool, fill_tuple)); break; // break always } } @@ -464,14 +463,14 @@ bool is_null(const Slice& slice) { } // Convert one row to this tuple -Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool) { +Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple) { RETURN_IF_ERROR(_line_to_src_tuple(line)); if (!_success) { // If not success, which means we met an invalid row, return. return Status::OK(); } - return fill_dest_tuple(tuple, tuple_pool); + return fill_dest_tuple(tuple, tuple_pool, fill_tuple); } // Convert one row to this tuple diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index 059336da906130..23d1a81c02c5f2 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -86,7 +86,7 @@ class BrokerScanner : public BaseScanner { // Convert one row to one tuple // 'ptr' and 'len' is csv text line // output is tuple - Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool); + Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple); Status _line_to_src_tuple(const Slice& line); diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index d347000a4b40f8..945afde21db66b 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -95,8 +95,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool } COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - fill_tuple_post_process(); + RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple)); break; // break always } if (_scanner_eof) { diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index 150fafb05b2e46..fbefe1cb400c11 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -356,8 +356,7 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* } COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - fill_tuple_post_process(); + RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple)); break; } if (_scanner_eof) { diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 6a1b5d6e20a54f..3295dc4bc7767c 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -80,8 +80,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - fill_tuple_post_process(); + RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple)); break; // break always } if (_scanner_eof) {