Skip to content
13 changes: 12 additions & 1 deletion be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,18 @@ Status BaseScanner::init_expr_ctxes() {
return Status::OK();
}

Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
Status BaseScanner::fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple) {
RETURN_IF_ERROR(_fill_dest_tuple(dest_tuple, mem_pool));
if (_success) {
free_expr_local_allocations();
*fill_tuple = true;
} else {
*fill_tuple = false;
}
return Status::OK();
}

Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
// filter src tuple by preceding filter first
if (!ExecNode::eval_conjuncts(&_pre_filter_ctxs[0], _pre_filter_ctxs.size(), _src_tuple_row)) {
_counter->num_rows_unselected++;
Expand Down
6 changes: 5 additions & 1 deletion be/src/exec/base_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ class BaseScanner {

// Close this scanner
virtual void close() = 0;
Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);

void fill_slots_of_columns_from_path(int start,
const std::vector<std::string>& columns_from_path);

void free_expr_local_allocations();

protected:
RuntimeState* _state;
const TBrokerScanRangeParams& _params;
Expand Down Expand Up @@ -106,6 +107,9 @@ class BaseScanner {
// Used to record whether a row of data is successfully read.
bool _success = false;
bool _scanner_eof = false;

private:
Status _fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
};

} /* namespace doris */
Expand Down
12 changes: 3 additions & 9 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,7 @@ Status BrokerScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, boo
{
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool));
if (_success) {
free_expr_local_allocations();
*fill_tuple = true;
} else {
*fill_tuple = false;
}
RETURN_IF_ERROR(_convert_one_row(Slice(ptr, size), tuple, tuple_pool, fill_tuple));
break; // break always
}
}
Expand Down Expand Up @@ -469,14 +463,14 @@ bool is_null(const Slice& slice) {
}

// Convert one row to this tuple
Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool) {
Status BrokerScanner::_convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple) {
RETURN_IF_ERROR(_line_to_src_tuple(line));
if (!_success) {
// If not success, which means we met an invalid row, return.
return Status::OK();
}

return fill_dest_tuple(tuple, tuple_pool);
return fill_dest_tuple(tuple, tuple_pool, fill_tuple);
}

// Convert one row to this tuple
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class BrokerScanner : public BaseScanner {
// Convert one row to one tuple
// 'ptr' and 'len' is csv text line
// output is tuple
Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool);
Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple);

Status _line_to_src_tuple(const Slice& line);

Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ Status JsonScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool
}
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
*fill_tuple = _success;
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
break; // break always
}
if (_scanner_eof) {
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/orc_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -356,8 +356,7 @@ Status ORCScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool*
}
COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
*fill_tuple = _success;
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
break;
}
if (_scanner_eof) {
Expand Down
3 changes: 1 addition & 2 deletions be/src/exec/parquet_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ Status ParquetScanner::get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bo

COUNTER_UPDATE(_rows_read_counter, 1);
SCOPED_TIMER(_materialize_timer);
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool));
*fill_tuple = _success;
RETURN_IF_ERROR(fill_dest_tuple(tuple, tuple_pool, fill_tuple));
break; // break always
}
if (_scanner_eof) {
Expand Down