diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp index 06331023e833c4..796defc6d07b79 100644 --- a/be/src/exec/base_scanner.cpp +++ b/be/src/exec/base_scanner.cpp @@ -155,7 +155,18 @@ Status BaseScanner::init_expr_ctxes() { return Status::OK(); } -Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { +Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple) { + RETURN_IF_ERROR(_fill_dest_tuple(dest_tuple, mem_pool)); + if (_success) { + free_expr_local_allocations(); + *fill_tuple = true; + } else { + *fill_tuple = false; + } + return Status::OK(); +} + +Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) { // filter src tuple by preceding filter first if (!ExecNode::eval_conjuncts(&_pre_filter_ctxs[0], _pre_filter_ctxs.size(), _src_tuple_row)) { _counter->num_rows_unselected++; diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index bce0f4b8ca694f..c01891c3818db4 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -58,12 +58,13 @@ class BaseScanner { // Close this scanner virtual void close() = 0; - Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool); + Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple); void fill_slots_of_columns_from_path(int start, const std::vector& columns_from_path); void free_expr_local_allocations(); + protected: RuntimeState* _state; const TBrokerScanRangeParams& _params; @@ -106,6 +107,9 @@ class BaseScanner { // Used to record whether a row of data is successfully read. bool _success = false; bool _scanner_eof = false; + +private: + Status _fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool); }; } /* namespace doris */ diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index c4f771faaaf858..372cb1f0d4d513 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -114,13 +114,7 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, boo { COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool)); - if (_success) { - free_expr_local_allocations(); - *fill_tuple = true; - } else { - *fill_tuple = false; - } + RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool, fill_tuple)); break; // break always } } @@ -469,14 +463,14 @@ bool is_null(const Slice& slice) { } // Convert one row to this tuple -Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool) { +Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple) { RETURN_IF_ERROR(_line_to_src_tuple(line)); if (!_success) { // If not success, which means we met an invalid row, return. return Status::OK(); } - return fill_dest_tuple(tuple, tuple_pool); + return fill_dest_tuple(tuple, tuple_pool, fill_tuple); } // Convert one row to this tuple diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index 059336da906130..23d1a81c02c5f2 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -86,7 +86,7 @@ class BrokerScanner : public BaseScanner { // Convert one row to one tuple // 'ptr' and 'len' is csv text line // output is tuple - Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool); + Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple); Status _line_to_src_tuple(const Slice& line); diff --git a/be/src/exec/json_scanner.cpp b/be/src/exec/json_scanner.cpp index 1e08eec5992072..945afde21db66b 100644 --- a/be/src/exec/json_scanner.cpp +++ b/be/src/exec/json_scanner.cpp @@ -95,8 +95,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool } COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - *fill_tuple = _success; + RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple)); break; // break always } if (_scanner_eof) { diff --git a/be/src/exec/orc_scanner.cpp b/be/src/exec/orc_scanner.cpp index 254c7bb92a2893..fbefe1cb400c11 100644 --- a/be/src/exec/orc_scanner.cpp +++ b/be/src/exec/orc_scanner.cpp @@ -356,8 +356,7 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool* } COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - *fill_tuple = _success; + RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple)); break; } if (_scanner_eof) { diff --git a/be/src/exec/parquet_scanner.cpp b/be/src/exec/parquet_scanner.cpp index 9a85a1253ad969..3295dc4bc7767c 100644 --- a/be/src/exec/parquet_scanner.cpp +++ b/be/src/exec/parquet_scanner.cpp @@ -80,8 +80,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo COUNTER_UPDATE(_rows_read_counter, 1); SCOPED_TIMER(_materialize_timer); - RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool)); - *fill_tuple = _success; + RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple)); break; // break always } if (_scanner_eof) {