Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
184 changes: 153 additions & 31 deletions be/src/exec/base_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,20 @@
#include "runtime/raw_value.h"
#include "runtime/runtime_state.h"
#include "runtime/tuple.h"
#include "vec/data_types/data_type_factory.hpp"

namespace doris {

BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: _state(state),
_params(params),
_ranges(ranges),
_broker_addresses(broker_addresses),
_next_range(0),
_counter(counter),
_src_tuple(nullptr),
_src_tuple_row(nullptr),
Expand Down Expand Up @@ -71,6 +77,22 @@ Status BaseScanner::open() {
_rows_read_counter = ADD_COUNTER(_profile, "RowsRead", TUnit::UNIT);
_read_timer = ADD_TIMER(_profile, "TotalRawReadTime(*)");
_materialize_timer = ADD_TIMER(_profile, "MaterializeTupleTime(*)");

DCHECK(!_ranges.empty());
const auto& range = _ranges[0];
_num_of_columns_from_file = range.__isset.num_of_columns_from_file
? implicit_cast<int>(range.num_of_columns_from_file)
: implicit_cast<int>(_src_slot_descs.size());

// check consistency
if (range.__isset.num_of_columns_from_file) {
int size = range.columns_from_path.size();
for (const auto& r : _ranges) {
if (r.columns_from_path.size() != size) {
return Status::InternalError("ranges have different number of columns.");
}
}
}
return Status::OK();
}

Expand Down Expand Up @@ -272,59 +294,135 @@ Status BaseScanner::_fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool) {
}
void* slot = dest_tuple->get_slot(slot_desc->tuple_offset());
RawValue::write(value, slot, slot_desc->type(), mem_pool);
continue;
}
_success = true;
return Status::OK();
}

Status BaseScanner::filter_block(vectorized::Block* temp_block, size_t slot_num) {
Status BaseScanner::_filter_src_block() {
auto origin_column_num = _src_block.columns();
// filter block
if (!_vpre_filter_ctxs.empty()) {
for (auto _vpre_filter_ctx : _vpre_filter_ctxs) {
auto old_rows = temp_block->rows();
RETURN_IF_ERROR(
vectorized::VExprContext::filter_block(_vpre_filter_ctx, temp_block, slot_num));
_counter->num_rows_unselected += old_rows - temp_block->rows();
auto old_rows = _src_block.rows();
RETURN_IF_ERROR(vectorized::VExprContext::filter_block(_vpre_filter_ctx, &_src_block,
origin_column_num));
_counter->num_rows_unselected += old_rows - _src_block.rows();
}
}
return Status::OK();
}

Status BaseScanner::execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block) {
Status BaseScanner::_materialize_dest_block(vectorized::Block* dest_block) {
// Do vectorized expr here
Status status;
if (!_dest_vexpr_ctx.empty()) {
*output_block = vectorized::VExprContext::get_output_block_after_execute_exprs(
_dest_vexpr_ctx, *temp_block, status);
if (UNLIKELY(output_block->rows() == 0)) {
return status;
int ctx_idx = 0;
size_t rows = _src_block.rows();
auto filter_column = vectorized::ColumnUInt8::create(rows, 1);
auto& filter_map = filter_column->get_data();

for (auto slot_desc : _dest_tuple_desc->slots()) {
if (!slot_desc->is_materialized()) {
continue;
}
int dest_index = ctx_idx++;

auto* ctx = _dest_vexpr_ctx[dest_index];
int result_column_id = -1;
// PT1 => dest primitive type
RETURN_IF_ERROR(ctx->execute(&_src_block, &result_column_id));
auto column_ptr = _src_block.get_by_position(result_column_id).column;

// because of src_slot_desc is always be nullable, so the column_ptr after do dest_expr
// is likely to be nullable
if (LIKELY(column_ptr->is_nullable())) {
auto nullable_column =
reinterpret_cast<const vectorized::ColumnNullable*>(column_ptr.get());
for (int i = 0; i < rows; ++i) {
if (filter_map[i] && nullable_column->is_null_at(i)) {
if (_strict_mode && (_src_slot_descs_order_by_dest[dest_index]) &&
!_src_block.get_by_position(dest_index).column->is_null_at(i)) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block.dump_one_line(i, _num_of_columns_from_file);
},
[&]() -> std::string {
auto raw_value =
_src_block.get_by_position(ctx_idx).column->get_data_at(
i);
std::string raw_string = raw_value.to_string();
fmt::memory_buffer error_msg;
fmt::format_to(error_msg,
"column({}) value is incorrect while strict "
"mode is {}, "
"src value is {}",
slot_desc->col_name(), _strict_mode, raw_string);
return fmt::to_string(error_msg);
},
&_scanner_eof));
filter_map[i] = false;
} else if (!slot_desc->is_nullable()) {
RETURN_IF_ERROR(_state->append_error_msg_to_file(
[&]() -> std::string {
return _src_block.dump_one_line(i, _num_of_columns_from_file);
},
[&]() -> std::string {
fmt::memory_buffer error_msg;
fmt::format_to(error_msg,
"column({}) values is null while columns is not "
"nullable",
slot_desc->col_name());
return fmt::to_string(error_msg);
},
&_scanner_eof));
filter_map[i] = false;
}
}
}
if (!slot_desc->is_nullable()) column_ptr = nullable_column->get_nested_column_ptr();
} else if (slot_desc->is_nullable()) {
column_ptr = vectorized::make_nullable(column_ptr);
}
dest_block->insert(vectorized::ColumnWithTypeAndName(
std::move(column_ptr), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
}

// after do the dest block insert operation, clear _src_block to remove the reference of origin column
_src_block.clear();

size_t dest_size = dest_block->columns();
// do filter
dest_block->insert(vectorized::ColumnWithTypeAndName(
std::move(filter_column), std::make_shared<vectorized::DataTypeUInt8>(),
"filter column"));
RETURN_IF_ERROR(vectorized::Block::filter_block(dest_block, dest_size, dest_size));
_counter->num_rows_filtered += rows - dest_block->rows();

return Status::OK();
}

Status BaseScanner::fill_dest_block(vectorized::Block* dest_block,
std::vector<vectorized::MutableColumnPtr>& columns) {
if (columns.empty() || columns[0]->size() == 0) {
return Status::OK();
}

std::unique_ptr<vectorized::Block> temp_block(new vectorized::Block());
auto n_columns = 0;
for (const auto slot_desc : _src_slot_descs) {
temp_block->insert(vectorized::ColumnWithTypeAndName(std::move(columns[n_columns++]),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
// TODO: opt the reuse of src_block or dest_block column. some case we have to
// shallow copy the column of src_block to dest block
Status BaseScanner::_init_src_block() {
DCHECK(_src_block.columns() == 0);
for (auto i = 0; i < _num_of_columns_from_file; ++i) {
SlotDescriptor* slot_desc = _src_slot_descs[i];
if (slot_desc == nullptr) {
continue;
}
auto data_type = slot_desc->get_data_type_ptr();
_src_block.insert(vectorized::ColumnWithTypeAndName(
data_type->create_column(), slot_desc->get_data_type_ptr(), slot_desc->col_name()));
}

RETURN_IF_ERROR(BaseScanner::filter_block(temp_block.get(), _dest_tuple_desc->slots().size()));
return Status::OK();
}

if (_dest_vexpr_ctx.empty()) {
*dest_block = *temp_block;
} else {
RETURN_IF_ERROR(BaseScanner::execute_exprs(dest_block, temp_block.get()));
Status BaseScanner::_fill_dest_block(vectorized::Block* dest_block, bool* eof) {
*eof = _scanner_eof;
_fill_columns_from_path();
if (LIKELY(_src_block.rows() > 0)) {
RETURN_IF_ERROR(BaseScanner::_filter_src_block());
RETURN_IF_ERROR(BaseScanner::_materialize_dest_block(dest_block));
}

return Status::OK();
Expand All @@ -337,7 +435,7 @@ void BaseScanner::fill_slots_of_columns_from_path(
auto slot_desc = _src_slot_descs.at(i + start);
_src_tuple->set_not_null(slot_desc->null_indicator_offset());
void* slot = _src_tuple->get_slot(slot_desc->tuple_offset());
StringValue* str_slot = reinterpret_cast<StringValue*>(slot);
auto* str_slot = reinterpret_cast<StringValue*>(slot);
const std::string& column_from_path = columns_from_path[i];
str_slot->ptr = const_cast<char*>(column_from_path.c_str());
str_slot->len = column_from_path.size();
Expand All @@ -360,4 +458,28 @@ void BaseScanner::close() {
}
}

void BaseScanner::_fill_columns_from_path() {
const TBrokerRangeDesc& range = _ranges.at(_next_range - 1);
if (range.__isset.num_of_columns_from_file) {
size_t start = range.num_of_columns_from_file;
size_t rows = _src_block.rows();

for (size_t i = 0; i < range.columns_from_path.size(); ++i) {
auto slot_desc = _src_slot_descs.at(i + start);
if (slot_desc == nullptr) continue;
auto is_nullable = slot_desc->is_nullable();
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(TYPE_VARCHAR,
is_nullable);
auto data_column = data_type->create_column();
const std::string& column_from_path = range.columns_from_path[i];
for (size_t j = 0; j < rows; ++j) {
data_column->insert_data(const_cast<char*>(column_from_path.c_str()),
column_from_path.size());
}
_src_block.insert(vectorized::ColumnWithTypeAndName(std::move(data_column), data_type,
slot_desc->col_name()));
}
}
}

} // namespace doris
30 changes: 20 additions & 10 deletions be/src/exec/base_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ struct ScannerCounter {
class BaseScanner {
public:
BaseScanner(RuntimeState* state, RuntimeProfile* profile, const TBrokerScanRangeParams& params,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter);

virtual ~BaseScanner() {
Expr::close(_dest_expr_ctx, _state);
if (_state->enable_vectorized_exec()) {
Expand All @@ -77,21 +80,22 @@ class BaseScanner {
virtual void close() = 0;
Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple);

Status fill_dest_block(vectorized::Block* dest_block,
std::vector<vectorized::MutableColumnPtr>& columns);

void fill_slots_of_columns_from_path(int start,
const std::vector<std::string>& columns_from_path);

void free_expr_local_allocations();

Status filter_block(vectorized::Block* temp_block, size_t slot_num);

Status execute_exprs(vectorized::Block* output_block, vectorized::Block* temp_block);

protected:
Status _fill_dest_block(vectorized::Block* dest_block, bool* eof);
virtual Status _init_src_block();

RuntimeState* _state;
const TBrokerScanRangeParams& _params;

//const TBrokerScanRangeParams& _params;
const std::vector<TBrokerRangeDesc>& _ranges;
const std::vector<TNetworkAddress>& _broker_addresses;
int _next_range;
// used for process stat
ScannerCounter* _counter;

Expand All @@ -109,9 +113,6 @@ class BaseScanner {
// Dest tuple descriptor and dest expr context
const TupleDescriptor* _dest_tuple_desc;
std::vector<ExprContext*> _dest_expr_ctx;
// for vectorized
std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
// the map values of dest slot id to src slot desc
// if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
Expand All @@ -135,7 +136,16 @@ class BaseScanner {
bool _success = false;
bool _scanner_eof = false;

// for vectorized load
std::vector<vectorized::VExprContext*> _dest_vexpr_ctx;
std::vector<vectorized::VExprContext*> _vpre_filter_ctxs;
vectorized::Block _src_block;
int _num_of_columns_from_file;

private:
Status _filter_src_block();
void _fill_columns_from_path();
Status _materialize_dest_block(vectorized::Block* output_block);
Status _fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool);
};

Expand Down
5 changes: 1 addition & 4 deletions be/src/exec/broker_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,10 @@ BrokerScanner::BrokerScanner(RuntimeState* state, RuntimeProfile* profile,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: BaseScanner(state, profile, params, pre_filter_texprs, counter),
_ranges(ranges),
_broker_addresses(broker_addresses),
: BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
_cur_file_reader(nullptr),
_cur_line_reader(nullptr),
_cur_decompressor(nullptr),
_next_range(0),
_cur_line_reader_eof(false),
_skip_lines(0) {
if (params.__isset.column_separator_length && params.column_separator_length > 1) {
Expand Down
4 changes: 0 additions & 4 deletions be/src/exec/broker_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,6 @@ class BrokerScanner : public BaseScanner {
Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple);

protected:
const std::vector<TBrokerRangeDesc>& _ranges;
const std::vector<TNetworkAddress>& _broker_addresses;

std::string _value_separator;
std::string _line_delimiter;
TFileFormatType::type _file_format_type;
Expand All @@ -113,7 +110,6 @@ class BrokerScanner : public BaseScanner {
FileReader* _cur_file_reader;
LineReader* _cur_line_reader;
Decompressor* _cur_decompressor;
int _next_range;
bool _cur_line_reader_eof;

// When we fetch range start from 0, header_type="csv_with_names" skip first line
Expand Down
5 changes: 1 addition & 4 deletions be/src/exec/json_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,10 @@ JsonScanner::JsonScanner(RuntimeState* state, RuntimeProfile* profile,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: BaseScanner(state, profile, params, pre_filter_texprs, counter),
_ranges(ranges),
_broker_addresses(broker_addresses),
: BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
_cur_file_reader(nullptr),
_cur_line_reader(nullptr),
_cur_json_reader(nullptr),
_next_range(0),
_cur_reader_eof(false),
_read_json_by_line(false) {
if (params.__isset.line_delimiter_length && params.line_delimiter_length > 1) {
Expand Down
4 changes: 0 additions & 4 deletions be/src/exec/json_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ class JsonScanner : public BaseScanner {
bool& num_as_string, bool& fuzzy_parse);

protected:
const std::vector<TBrokerRangeDesc>& _ranges;
const std::vector<TNetworkAddress>& _broker_addresses;

std::string _jsonpath;
std::string _jsonpath_file;

Expand All @@ -91,7 +88,6 @@ class JsonScanner : public BaseScanner {
FileReader* _cur_file_reader;
LineReader* _cur_line_reader;
JsonReader* _cur_json_reader;
int _next_range;
bool _cur_reader_eof;
bool _read_json_by_line;

Expand Down
5 changes: 1 addition & 4 deletions be/src/exec/orc_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,11 +120,8 @@ ORCScanner::ORCScanner(RuntimeState* state, RuntimeProfile* profile,
const std::vector<TBrokerRangeDesc>& ranges,
const std::vector<TNetworkAddress>& broker_addresses,
const std::vector<TExpr>& pre_filter_texprs, ScannerCounter* counter)
: BaseScanner(state, profile, params, pre_filter_texprs, counter),
_ranges(ranges),
_broker_addresses(broker_addresses),
: BaseScanner(state, profile, params, ranges, broker_addresses, pre_filter_texprs, counter),
// _splittable(params.splittable),
_next_range(0),
_cur_file_eof(true),
_total_groups(0),
_current_group(0),
Expand Down
Loading