17 changes: 17 additions & 0 deletions be/src/exec/base_scanner.cpp
@@ -110,6 +110,11 @@ Status BaseScanner::init_expr_ctxes() {
Expr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, &_pre_filter_ctxs));
RETURN_IF_ERROR(Expr::prepare(_pre_filter_ctxs, _state, *_row_desc, _mem_tracker));
RETURN_IF_ERROR(Expr::open(_pre_filter_ctxs, _state));
// vec
RETURN_IF_ERROR(
vectorized::VExpr::create_expr_trees(_state->obj_pool(), _pre_filter_texprs, &_pre_filter_vctxs));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_pre_filter_vctxs, _state, *_row_desc, _mem_tracker));
RETURN_IF_ERROR(vectorized::VExpr::open(_pre_filter_vctxs, _state));
}

// Construct dest slots information
@@ -137,6 +142,12 @@ Status BaseScanner::init_expr_ctxes() {
RETURN_IF_ERROR(ctx->prepare(_state, *_row_desc.get(), _mem_tracker));
RETURN_IF_ERROR(ctx->open(_state));
_dest_expr_ctx.emplace_back(ctx);
// vec
vectorized::VExprContext* vctx = nullptr;
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(_state->obj_pool(), it->second, &vctx));
RETURN_IF_ERROR(vctx->prepare(_state, *_row_desc.get(), _mem_tracker));
RETURN_IF_ERROR(vctx->open(_state));
_dest_vexpr_ctxs.emplace_back(vctx);
if (has_slot_id_map) {
auto it = _params.dest_sid_to_src_sid_without_trans.find(slot_desc->id());
if (it == std::end(_params.dest_sid_to_src_sid_without_trans)) {
@@ -272,6 +283,12 @@ void BaseScanner::close() {
if (!_pre_filter_ctxs.empty()) {
Expr::close(_pre_filter_ctxs, _state);
}
if (!_pre_filter_vctxs.empty()) {
vectorized::VExpr::close(_pre_filter_vctxs, _state);
}
if (!_dest_vexpr_ctxs.empty()) {
vectorized::VExpr::close(_dest_vexpr_ctxs, _state);
}
}

} // namespace doris
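The vectorized contexts added above (_pre_filter_vctxs, _dest_vexpr_ctxs) mirror the lifecycle of the existing row-based ExprContexts: they are built from the same TExpr lists, prepared and opened in init_expr_ctxes(), and closed in close(). A hedged sketch of how a block-based scanner could apply the pre-filters follows; eval_vectorized_pre_filters is a hypothetical helper, and only VExprContext::execute() returning the id of its result column is assumed from the vectorized expression API:

Status eval_vectorized_pre_filters(vectorized::Block* block) {
    // One context per pre-filter conjunct; each execute() appends its result column to the block.
    for (vectorized::VExprContext* vctx : _pre_filter_vctxs) {
        int result_column_id = -1;
        RETURN_IF_ERROR(vctx->execute(block, &result_column_id));
        // The column at result_column_id holds the predicate result; rows that
        // evaluate to false would be dropped from the block here (the filtering
        // helper itself is omitted in this sketch).
    }
    return Status::OK();
}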
6 changes: 6 additions & 0 deletions be/src/exec/base_scanner.h
@@ -20,6 +20,7 @@

#include "common/status.h"
#include "exprs/expr.h"
#include "vec/exprs/vexpr.h"
#include "runtime/tuple.h"
#include "util/runtime_profile.h"

@@ -34,6 +35,7 @@ class RuntimeState;
class ExprContext;

namespace vectorized {
class VExprContext;
class IColumn;
using MutableColumnPtr = IColumn::MutablePtr;
}
@@ -94,6 +96,8 @@ class BaseScanner {
// Dest tuple descriptor and dest expr context
const TupleDescriptor* _dest_tuple_desc;
std::vector<ExprContext*> _dest_expr_ctx;
// for vec
std::vector<vectorized::VExprContext*> _dest_vexpr_ctxs;
// the map values of dest slot id to src slot desc
// if there is not key of dest slot id in dest_sid_to_src_sid_without_trans, it will be set to nullptr
std::vector<SlotDescriptor*> _src_slot_descs_order_by_dest;
@@ -103,6 +107,8 @@
// and will be converted to `_pre_filter_ctxs` when scanner is open.
const std::vector<TExpr> _pre_filter_texprs;
std::vector<ExprContext*> _pre_filter_ctxs;
// for vec
std::vector<vectorized::VExprContext*> _pre_filter_vctxs;

bool _strict_mode;

13 changes: 10 additions & 3 deletions be/src/exec/broker_scan_node.cpp
@@ -22,6 +22,7 @@

#include "common/object_pool.h"
#include "vec/exec/vbroker_scanner.h"
#include "vec/exec/vparquet_scanner.h"
#include "exec/json_scanner.h"
#include "exec/orc_scanner.h"
#include "exec/parquet_scanner.h"
@@ -223,9 +224,15 @@ std::unique_ptr<BaseScanner> BrokerScanNode::create_scanner(const TBrokerScanRan
BaseScanner* scan = nullptr;
switch (scan_range.ranges[0].format_type) {
case TFileFormatType::FORMAT_PARQUET:
scan = new ParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
scan_range.ranges, scan_range.broker_addresses,
_pre_filter_texprs, counter);
if (_vectorized) {
scan = new vectorized::VParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
scan_range.ranges, scan_range.broker_addresses,
_pre_filter_texprs, counter);
} else {
scan = new ParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
scan_range.ranges, scan_range.broker_addresses,
_pre_filter_texprs, counter);
}
break;
case TFileFormatType::FORMAT_ORC:
scan = new ORCScanner(_runtime_state, runtime_profile(), scan_range.params,
18 changes: 18 additions & 0 deletions be/src/exec/parquet_reader.cpp
@@ -219,6 +219,24 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
return Status::OK();
}

Status ParquetReaderWrap::next_batch(std::shared_ptr<arrow::RecordBatch>* batch,
const std::vector<SlotDescriptor*>& tuple_slot_descs,
bool* eof) {
if (_batch->num_rows() == 0 ||
_current_line_of_batch != 0 ||
_current_line_of_group != 0) {
RETURN_IF_ERROR(read_record_batch(tuple_slot_descs, eof));
}
*batch = get_batch();
return Status::OK();
}

const std::shared_ptr<arrow::RecordBatch>& ParquetReaderWrap::get_batch() {
_current_line_of_batch += _batch->num_rows();
_current_line_of_group += _batch->num_rows();
return _batch;
}

Status ParquetReaderWrap::handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array,
uint8_t* buf, int32_t* wbytes) {
const auto type = std::static_pointer_cast<arrow::TimestampType>(ts_array->type());
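next_batch() refreshes the cached Arrow batch through read_record_batch() only when the current one is empty or has already been handed out, and get_batch() advances the per-batch and per-row-group line counters as a side effect. A hedged caller-side sketch of the intended loop; reader, slot_descs, and the conversion step are illustrative names, not part of this diff:

bool eof = false;
while (!eof) {
    std::shared_ptr<arrow::RecordBatch> batch;
    RETURN_IF_ERROR(reader->next_batch(&batch, slot_descs, &eof));
    if (batch != nullptr && batch->num_rows() > 0) {
        // Convert the Arrow columns into Doris vectorized columns here,
        // e.g. with the arrow_column_to_doris_column utility added in this PR.
    }
}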
4 changes: 4 additions & 0 deletions be/src/exec/parquet_reader.h
@@ -79,13 +79,17 @@ class ParquetReaderWrap {
Status size(int64_t* size);
Status init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
const std::string& timezone);
Status next_batch(std::shared_ptr<arrow::RecordBatch>* batch,
const std::vector<SlotDescriptor*>& tuple_slot_descs,
bool* eof);

private:
void fill_slot(Tuple* tuple, SlotDescriptor* slot_desc, MemPool* mem_pool, const uint8_t* value,
int32_t len);
Status column_indices(const std::vector<SlotDescriptor*>& tuple_slot_descs);
Status set_field_null(Tuple* tuple, const SlotDescriptor* slot_desc);
Status read_record_batch(const std::vector<SlotDescriptor*>& tuple_slot_descs, bool* eof);
const std::shared_ptr<arrow::RecordBatch>& get_batch();
Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf,
int32_t* wbtyes);

4 changes: 2 additions & 2 deletions be/src/exec/parquet_scanner.h
@@ -65,11 +65,11 @@ class ParquetScanner : public BaseScanner {
// Close this scanner
virtual void close();

private:
protected:
// Read next buffer from reader
Status open_next_reader();

private:
protected:
//const TBrokerScanRangeParams& _params;
const std::vector<TBrokerRangeDesc>& _ranges;
const std::vector<TNetworkAddress>& _broker_addresses;
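Relaxing these members from private to protected is what lets the new vectorized scanner derive from ParquetScanner and reuse its reader management (open_next_reader(), _ranges, _broker_addresses). A rough shape of that relationship is sketched below; the actual interface lives in be/src/vec/exec/vparquet_scanner.h and is not reproduced in this diff:

namespace doris {
namespace vectorized {

// Sketch only: the real class is added elsewhere in this PR.
class VParquetScanner : public ParquetScanner {
public:
    // Same construction arguments as ParquetScanner (assumed), plus a
    // block-based read path that calls the now-protected open_next_reader()
    // and consumes Arrow batches instead of materializing Tuples.
};

} // namespace vectorized
} // namespace doris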
36 changes: 19 additions & 17 deletions be/src/http/action/stream_load.cpp
@@ -77,27 +77,29 @@ static TFileFormatType::type parse_format(const std::string& format_str,
return parse_format("CSV", compress_type);
}
TFileFormatType::type format_type = TFileFormatType::FORMAT_UNKNOWN;
if (boost::iequals(format_str, "CSV")) {
if (iequal(format_str, "CSV")) {
if (compress_type.empty()) {
format_type = TFileFormatType::FORMAT_CSV_PLAIN;
}
if (boost::iequals(compress_type, "GZ")) {
if (iequal(compress_type, "GZ")) {
format_type = TFileFormatType::FORMAT_CSV_GZ;
} else if (boost::iequals(compress_type, "LZO")) {
} else if (iequal(compress_type, "LZO")) {
format_type = TFileFormatType::FORMAT_CSV_LZO;
} else if (boost::iequals(compress_type, "BZ2")) {
} else if (iequal(compress_type, "BZ2")) {
format_type = TFileFormatType::FORMAT_CSV_BZ2;
} else if (boost::iequals(compress_type, "LZ4FRAME")) {
} else if (iequal(compress_type, "LZ4FRAME")) {
format_type = TFileFormatType::FORMAT_CSV_LZ4FRAME;
} else if (boost::iequals(compress_type, "LZOP")) {
} else if (iequal(compress_type, "LZOP")) {
format_type = TFileFormatType::FORMAT_CSV_LZOP;
} else if (boost::iequals(compress_type, "DEFLATE")) {
} else if (iequal(compress_type, "DEFLATE")) {
format_type = TFileFormatType::FORMAT_CSV_DEFLATE;
}
} else if (boost::iequals(format_str, "JSON")) {
} else if (iequal(format_str, "JSON")) {
if (compress_type.empty()) {
format_type = TFileFormatType::FORMAT_JSON;
}
} else if (iequal(format_str, "PARQUET")) {
format_type = TFileFormatType::FORMAT_PARQUET;
}
return format_type;
}
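With the new branch, a stream load that declares format=parquet (matched case-insensitively by iequal) is routed to FORMAT_PARQUET and therefore to the Parquet scanner. Expected mappings, shown only as an illustration of the code above:

// parse_format("parquet", "")  -> TFileFormatType::FORMAT_PARQUET   (iequal ignores case)
// parse_format("PARQUET", "")  -> TFileFormatType::FORMAT_PARQUET
// parse_format("csv", "gz")    -> TFileFormatType::FORMAT_CSV_GZ    (unchanged behavior)
// parse_format("avro", "")     -> TFileFormatType::FORMAT_UNKNOWN   (unrecognized formats fall through)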
@@ -262,7 +264,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
}

// get format of this put
if (!http_req->header(HTTP_COMPRESS_TYPE).empty() && boost::iequals(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
if (!http_req->header(HTTP_COMPRESS_TYPE).empty() && iequal(http_req->header(HTTP_FORMAT_KEY), "JSON")) {
return Status::InternalError("compress data of JSON format is not supported.");
}
ctx->format =
@@ -283,7 +285,7 @@ Status StreamLoadAction::_on_header(HttpRequest* http_req, StreamLoadContext* ct
size_t json_max_body_bytes = config::streaming_load_json_max_mb * 1024 * 1024;
bool read_json_by_line = false;
if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
if (boost::iequals(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
read_json_by_line = true;
}
}
@@ -431,9 +433,9 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_negative(false);
}
if (!http_req->header(HTTP_STRICT_MODE).empty()) {
if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "false")) {
if (iequal(http_req->header(HTTP_STRICT_MODE), "false")) {
request.__set_strictMode(false);
} else if (boost::iequals(http_req->header(HTTP_STRICT_MODE), "true")) {
} else if (iequal(http_req->header(HTTP_STRICT_MODE), "true")) {
request.__set_strictMode(true);
} else {
return Status::InvalidArgument("Invalid strict mode format. Must be bool type");
@@ -456,7 +458,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_json_root(http_req->header(HTTP_JSONROOT));
}
if (!http_req->header(HTTP_STRIP_OUTER_ARRAY).empty()) {
if (boost::iequals(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
if (iequal(http_req->header(HTTP_STRIP_OUTER_ARRAY), "true")) {
request.__set_strip_outer_array(true);
} else {
request.__set_strip_outer_array(false);
@@ -465,7 +467,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_strip_outer_array(false);
}
if (!http_req->header(HTTP_NUM_AS_STRING).empty()) {
if (boost::iequals(http_req->header(HTTP_NUM_AS_STRING), "true")) {
if (iequal(http_req->header(HTTP_NUM_AS_STRING), "true")) {
request.__set_num_as_string(true);
} else {
request.__set_num_as_string(false);
@@ -474,7 +476,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
request.__set_num_as_string(false);
}
if (!http_req->header(HTTP_FUZZY_PARSE).empty()) {
if (boost::iequals(http_req->header(HTTP_FUZZY_PARSE), "true")) {
if (iequal(http_req->header(HTTP_FUZZY_PARSE), "true")) {
request.__set_fuzzy_parse(true);
} else {
request.__set_fuzzy_parse(false);
@@ -484,7 +486,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
}

if (!http_req->header(HTTP_READ_JSON_BY_LINE).empty()) {
if (boost::iequals(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
if (iequal(http_req->header(HTTP_READ_JSON_BY_LINE), "true")) {
request.__set_read_json_by_line(true);
} else {
request.__set_read_json_by_line(false);
@@ -507,7 +509,7 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req, StreamLoadContext*
}

if (!http_req->header(HTTP_LOAD_TO_SINGLE_TABLET).empty()) {
if (boost::iequals(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), "true")) {
if (iequal(http_req->header(HTTP_LOAD_TO_SINGLE_TABLET), "true")) {
request.__set_load_to_single_tablet(true);
} else {
request.__set_load_to_single_tablet(false);
1 change: 1 addition & 0 deletions be/src/olap/rowset/segment_v2/rle_page.h
@@ -104,6 +104,7 @@ class RlePageBuilder : public PageBuilder {

void reset() override {
_count = 0;
_finished = false;
_rle_encoder->Clear();
_rle_encoder->Reserve(RLE_PAGE_HEADER_SIZE, 0);
}
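Clearing _finished in reset() matters because a page builder is reused for consecutive pages: after finish() produces a page, the next reset()/add() cycle previously saw a stale finished flag. A hedged sketch of that reuse sequence; the add()/finish() signatures follow the general PageBuilder interface in segment_v2 and are assumed here:

// Hedged sketch of builder reuse across pages (signatures assumed, not shown in this diff):
RlePageBuilder<OLAP_FIELD_TYPE_INT> builder(options);
builder.add(reinterpret_cast<const uint8_t*>(values), &count);            // fill the first page
auto first_page = builder.finish();                                       // marks the builder finished
builder.reset();            // with this fix, _finished is cleared along with the encoder state
builder.add(reinterpret_cast<const uint8_t*>(more_values), &more_count);  // builder is safely reusable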
2 changes: 1 addition & 1 deletion be/src/runtime/descriptors.cpp
@@ -99,7 +99,7 @@ vectorized::DataTypePtr SlotDescriptor::get_data_type_ptr() const {

std::string SlotDescriptor::debug_string() const {
std::stringstream out;
out << "Slot(id=" << _id << " type=" << _type << " col=" << _col_pos
out << "Slot(id=" << _id << " type=" << _type << " col=" << _col_pos << " col_name=" << _col_name
<< " offset=" << _tuple_offset << " null=" << _null_indicator_offset.debug_string() << ")";
return out.str();
}
6 changes: 6 additions & 0 deletions be/src/vec/CMakeLists.txt
@@ -100,6 +100,7 @@ set(VEC_FILES
exec/vtable_function_node.cpp
exec/vbroker_scan_node.cpp
exec/vbroker_scanner.cpp
exec/vparquet_scanner.cpp
exec/join/vhash_join_node.cpp
exprs/vectorized_agg_fn.cpp
exprs/vectorized_fn_call.cpp
@@ -189,6 +190,11 @@ set(VEC_FILES
runtime/vdata_stream_recvr.cpp
runtime/vdata_stream_mgr.cpp
runtime/vpartition_info.cpp
runtime/vsorted_run_merger.cpp
runtime/vload_channel.cpp
[Contributor review comment on runtime/vload_channel.cpp] Is this code not added in this PR?
runtime/vload_channel_mgr.cpp
runtime/vtablets_channel.cpp
utils/arrow_column_to_doris_column.cpp
runtime/vsorted_run_merger.cpp)

add_library(Vec STATIC