From f8bb964da13505f188cf543a1aec808629da806d Mon Sep 17 00:00:00 2001 From: lihaopeng Date: Wed, 27 Apr 2022 11:31:30 +0800 Subject: [PATCH 1/2] [feature-wip](stream-load-vec) Support stream load in vectorized engine (#8709) Implement vectorized stream load. Added fe configuration option `enable_vectorized_load` to enable vectorized stream load. Co-authored-by: tengjp@outlook.com Co-authored-by: mrhhsg@gmail.com Co-authored-by: minghong.zhou@163.com Co-authored-by: HappenLee Co-authored-by: zhoubintao <35688959+zbtzbtzbt@users.noreply.github.com> --- be/src/exec/base_scanner.h | 10 + be/src/exec/broker_scan_node.cpp | 14 +- be/src/exec/broker_scan_node.h | 13 +- be/src/exec/broker_scanner.cpp | 8 +- be/src/exec/broker_scanner.h | 16 +- be/src/exec/exec_node.cpp | 8 +- be/src/exec/tablet_sink.cpp | 307 ++++---- be/src/exec/tablet_sink.h | 121 ++-- be/src/olap/compaction.cpp | 2 +- be/src/olap/delta_writer.cpp | 45 +- be/src/olap/delta_writer.h | 11 +- be/src/olap/memtable.cpp | 180 ++++- be/src/olap/memtable.h | 82 ++- be/src/olap/merger.cpp | 50 ++ be/src/olap/merger.h | 4 + be/src/olap/olap_define.h | 3 + be/src/olap/reader.cpp | 11 + be/src/olap/row_block2.cpp | 2 - be/src/olap/row_cursor_cell.h | 2 +- be/src/olap/rowset/beta_rowset_writer.cpp | 41 +- be/src/olap/rowset/beta_rowset_writer.h | 2 + be/src/olap/rowset/rowset_writer.h | 7 +- .../olap/rowset/segment_v2/column_writer.cpp | 21 + be/src/olap/rowset/segment_v2/column_writer.h | 6 + .../olap/rowset/segment_v2/segment_writer.cpp | 94 ++- .../olap/rowset/segment_v2/segment_writer.h | 21 +- be/src/runtime/load_channel.cpp | 63 +- be/src/runtime/load_channel.h | 74 +- be/src/runtime/load_channel_mgr.cpp | 65 +- be/src/runtime/load_channel_mgr.h | 77 +- be/src/runtime/tablets_channel.cpp | 80 +- be/src/runtime/tablets_channel.h | 108 ++- be/src/service/internal_service.cpp | 31 + be/src/service/internal_service.h | 5 + be/src/udf/udf.cpp | 2 +- be/src/vec/CMakeLists.txt | 3 + .../aggregate_function_reader.cpp 
| 41 +- .../aggregate_function_reader.h | 7 +- .../aggregate_function_simple_factory.cpp | 4 +- .../aggregate_function_window.cpp | 96 +-- .../aggregate_function_window.h | 69 +- be/src/vec/core/block.cpp | 18 + be/src/vec/core/block.h | 25 + be/src/vec/exec/vbroker_scan_node.cpp | 228 ++++++ be/src/vec/exec/vbroker_scan_node.h | 52 ++ be/src/vec/exec/vbroker_scanner.cpp | 293 ++++++++ be/src/vec/exec/vbroker_scanner.h | 39 + be/src/vec/exprs/vexpr_context.cpp | 1 + be/src/vec/olap/block_reader.cpp | 2 +- be/src/vec/olap/olap_data_convertor.cpp | 684 ++++++++++++++++++ be/src/vec/olap/olap_data_convertor.h | 210 ++++++ be/src/vec/olap/vcollect_iterator.cpp | 47 +- be/src/vec/olap/vcollect_iterator.h | 5 + be/src/vec/sink/vtablet_sink.cpp | 293 +++++++- be/src/vec/sink/vtablet_sink.h | 44 ++ be/test/CMakeLists.txt | 3 + be/test/olap/delta_writer_test.cpp | 244 +++++++ .../olap/rowset/segment_v2/segment_test.cpp | 6 +- be/test/tools/benchmark_tool.cpp | 2 +- be/test/vec/exec/vbroker_scan_node_test.cpp | 644 +++++++++++++++++ be/test/vec/exec/vbroker_scanner_test.cpp | 460 ++++++++++++ be/test/vec/exec/vtablet_sink_test.cpp | 607 ++++++++++++++++ be/test/vec/exprs/vexpr_test.cpp | 10 +- .../java/org/apache/doris/common/Config.java | 2 + .../doris/planner/StreamLoadPlanner.java | 6 + gensrc/proto/internal_service.proto | 33 + 66 files changed, 5146 insertions(+), 618 deletions(-) create mode 100644 be/src/vec/exec/vbroker_scan_node.cpp create mode 100644 be/src/vec/exec/vbroker_scan_node.h create mode 100644 be/src/vec/exec/vbroker_scanner.cpp create mode 100644 be/src/vec/exec/vbroker_scanner.h create mode 100644 be/src/vec/olap/olap_data_convertor.cpp create mode 100644 be/src/vec/olap/olap_data_convertor.h create mode 100644 be/test/vec/exec/vbroker_scan_node_test.cpp create mode 100644 be/test/vec/exec/vbroker_scanner_test.cpp create mode 100644 be/test/vec/exec/vtablet_sink_test.cpp diff --git a/be/src/exec/base_scanner.h b/be/src/exec/base_scanner.h index 
c01891c3818db4..338818914b340b 100644 --- a/be/src/exec/base_scanner.h +++ b/be/src/exec/base_scanner.h @@ -33,6 +33,11 @@ class MemTracker; class RuntimeState; class ExprContext; +namespace vectorized { +class IColumn; +using MutableColumnPtr = IColumn::MutablePtr; +} + // The counter will be passed to each scanner. // Note that this struct is not thread safe. // So if we support concurrent scan in the future, we need to modify this struct. @@ -56,6 +61,11 @@ class BaseScanner { // Get next tuple virtual Status get_next(Tuple* tuple, MemPool* tuple_pool, bool* eof, bool *fill_tuple) = 0; + // Get next block + virtual Status get_next(std::vector& columns, bool* eof) { + return Status::NotSupported("Not Implemented get block"); + } + // Close this scanner virtual void close() = 0; Status fill_dest_tuple(Tuple* dest_tuple, MemPool* mem_pool, bool* fill_tuple); diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp index e16410e0f5983f..ce450745a7e29a 100644 --- a/be/src/exec/broker_scan_node.cpp +++ b/be/src/exec/broker_scan_node.cpp @@ -21,7 +21,7 @@ #include #include "common/object_pool.h" -#include "exec/broker_scanner.h" +#include "vec/exec/vbroker_scanner.h" #include "exec/json_scanner.h" #include "exec/orc_scanner.h" #include "exec/parquet_scanner.h" @@ -238,9 +238,15 @@ std::unique_ptr BrokerScanNode::create_scanner(const TBrokerScanRan _pre_filter_texprs, counter); break; default: - scan = new BrokerScanner(_runtime_state, runtime_profile(), scan_range.params, - scan_range.ranges, scan_range.broker_addresses, - _pre_filter_texprs, counter); + if (_vectorized) { + scan = new vectorized::VBrokerScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, scan_range.broker_addresses, + _pre_filter_texprs, counter); + } else { + scan = new BrokerScanner(_runtime_state, runtime_profile(), scan_range.params, + scan_range.ranges, scan_range.broker_addresses, + _pre_filter_texprs, counter); + } } std::unique_ptr 
scanner(scan); return scanner; diff --git a/be/src/exec/broker_scan_node.h b/be/src/exec/broker_scan_node.h index c4494731d87b18..68adbf007c4433 100644 --- a/be/src/exec/broker_scan_node.h +++ b/be/src/exec/broker_scan_node.h @@ -65,7 +65,6 @@ class BrokerScanNode : public ScanNode { // Write debug string of this into out. virtual void debug_string(int indentation_level, std::stringstream* out) const override; -private: // Update process status to one failed status, // NOTE: Must hold the mutex of this scan node bool update_status(const Status& new_status) { @@ -76,8 +75,12 @@ class BrokerScanNode : public ScanNode { return false; } + std::unique_ptr create_scanner(const TBrokerScanRange& scan_range, + ScannerCounter* counter); + +private: // Create scanners to do scan job - Status start_scanners(); + virtual Status start_scanners(); // One scanner worker, This scanner will handle 'length' ranges start from start_idx void scanner_worker(int start_idx, int length); @@ -86,10 +89,8 @@ class BrokerScanNode : public ScanNode { Status scanner_scan(const TBrokerScanRange& scan_range, const std::vector& conjunct_ctxs, ScannerCounter* counter); - std::unique_ptr create_scanner(const TBrokerScanRange& scan_range, - ScannerCounter* counter); - -private: +protected: + bool _vectorized = false; TupleId _tuple_id; RuntimeState* _runtime_state; TupleDescriptor* _tuple_desc; diff --git a/be/src/exec/broker_scanner.cpp b/be/src/exec/broker_scanner.cpp index 646bcbc1f45b2a..f91a685c44f799 100644 --- a/be/src/exec/broker_scanner.cpp +++ b/be/src/exec/broker_scanner.cpp @@ -30,8 +30,6 @@ #include "exec/plain_binary_line_reader.h" #include "exec/plain_text_line_reader.h" #include "exec/s3_reader.h" -#include "exec/text_converter.h" -#include "exec/text_converter.hpp" #include "exprs/expr.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" @@ -82,10 +80,6 @@ BrokerScanner::~BrokerScanner() { Status BrokerScanner::open() { RETURN_IF_ERROR(BaseScanner::open()); // base 
default function - _text_converter.reset(new (std::nothrow) TextConverter('\\')); - if (_text_converter == nullptr) { - return Status::InternalError("No memory error."); - } return Status::OK(); } @@ -272,7 +266,7 @@ Status BrokerScanner::open_line_reader() { return Status::InternalError(ss.str()); } size += 1; - // not first range will always skip one line + // not first range will always skip one line _skip_lines = 1; } diff --git a/be/src/exec/broker_scanner.h b/be/src/exec/broker_scanner.h index 23d1a81c02c5f2..e56fce6bc63316 100644 --- a/be/src/exec/broker_scanner.h +++ b/be/src/exec/broker_scanner.h @@ -56,7 +56,7 @@ class BrokerScanner : public BaseScanner { const TBrokerScanRangeParams& params, const std::vector& ranges, const std::vector& broker_addresses, const std::vector& pre_filter_texprs, ScannerCounter* counter); - ~BrokerScanner(); + virtual ~BrokerScanner(); // Open this scanner, will initialize information need to Status open() override; @@ -67,12 +67,16 @@ class BrokerScanner : public BaseScanner { // Close this scanner void close() override; +protected: + // Read next buffer from reader + Status open_next_reader(); + + Status _line_to_src_tuple(const Slice& line); + private: Status open_file_reader(); Status create_decompressor(TFileFormatType::type type); Status open_line_reader(); - // Read next buffer from reader - Status open_next_reader(); // Split one text line to values void split_line(const Slice& line); @@ -88,14 +92,10 @@ class BrokerScanner : public BaseScanner { // output is tuple Status _convert_one_row(const Slice& line, Tuple* tuple, MemPool* tuple_pool, bool* fill_tuple); - Status _line_to_src_tuple(const Slice& line); - -private: +protected: const std::vector& _ranges; const std::vector& _broker_addresses; - std::unique_ptr _text_converter; - std::string _value_separator; std::string _line_delimiter; TFileFormatType::type _file_format_type; diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 
2ff50229dce7cb..1603324fcf1c28 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -82,6 +82,7 @@ #include "vec/exec/vsort_node.h" #include "vec/exec/vtable_function_node.h" #include "vec/exec/vunion_node.h" +#include "vec/exec/vbroker_scan_node.h" #include "vec/exprs/vexpr.h" namespace doris { @@ -392,6 +393,7 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN case TPlanNodeType::SELECT_NODE: case TPlanNodeType::REPEAT_NODE: case TPlanNodeType::TABLE_FUNCTION_NODE: + case TPlanNodeType::BROKER_SCAN_NODE: break; default: { const auto& i = _TPlanNodeType_VALUES_TO_NAMES.find(tnode.node_type); @@ -555,7 +557,11 @@ Status ExecNode::create_node(RuntimeState* state, ObjectPool* pool, const TPlanN return Status::OK(); case TPlanNodeType::BROKER_SCAN_NODE: - *node = pool->add(new BrokerScanNode(pool, tnode, descs)); + if (state->enable_vectorized_exec()) { + *node = pool->add(new vectorized::VBrokerScanNode(pool, tnode, descs)); + } else { + *node = pool->add(new BrokerScanNode(pool, tnode, descs)); + } return Status::OK(); case TPlanNodeType::REPEAT_NODE: diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp index 9928b7e26fda2d..b195591de0ef3f 100644 --- a/be/src/exec/tablet_sink.cpp +++ b/be/src/exec/tablet_sink.cpp @@ -41,6 +41,9 @@ #include "util/time.h" #include "util/uid_util.h" +#include "vec/core/block.h" +#include "vec/sink/vtablet_sink.h" + namespace doris { namespace stream_load { @@ -65,7 +68,9 @@ NodeChannel::~NodeChannel() noexcept { delete _add_batch_closure; _add_batch_closure = nullptr; } - _cur_add_batch_request.release_id(); + if (!_is_vectorized) { + _cur_add_batch_request.release_id(); + } } // if "_cancelled" is set to true, @@ -86,7 +91,6 @@ Status NodeChannel::init(RuntimeState* state) { _row_desc.reset(new RowDescriptor(_tuple_desc, false)); _batch_size = state->batch_size(); - _cur_batch.reset(new RowBatch(*_row_desc, _batch_size)); _stub = 
state->exec_env()->brpc_internal_client_cache()->get_client(_node_info.host, _node_info.brpc_port); @@ -97,12 +101,18 @@ Status NodeChannel::init(RuntimeState* state) { return Status::InternalError("get rpc stub failed"); } - // Initialize _cur_add_batch_request - _cur_add_batch_request.set_allocated_id(&_parent->_load_id); - _cur_add_batch_request.set_index_id(_index_channel->_index_id); - _cur_add_batch_request.set_sender_id(_parent->_sender_id); - _cur_add_batch_request.set_backend_id(_node_id); - _cur_add_batch_request.set_eos(false); + if (!_is_vectorized) { + _cur_batch.reset(new RowBatch(*_row_desc, _batch_size)); + + // Initialize _cur_add_batch_request + _cur_add_batch_request.set_allocated_id(&_parent->_load_id); + _cur_add_batch_request.set_index_id(_index_channel->_index_id); + _cur_add_batch_request.set_sender_id(_parent->_sender_id); + _cur_add_batch_request.set_backend_id(_node_id); + _cur_add_batch_request.set_eos(false); + + _name = fmt::format("NodeChannel[{}-{}]", _index_channel->_index_id, _node_id); + } _rpc_timeout_ms = state->query_options().query_timeout * 1000; _timeout_watch.start(); @@ -110,7 +120,6 @@ Status NodeChannel::init(RuntimeState* state) { _load_info = "load_id=" + print_id(_parent->_load_id) + ", txn_id=" + std::to_string(_parent->_txn_id); - _name = fmt::format("NodeChannel[{}-{}]", _index_channel->_index_id, _node_id); return Status::OK(); } @@ -132,6 +141,7 @@ void NodeChannel::open() { request.set_load_channel_timeout_s(_parent->_load_channel_timeout_s); request.set_is_high_priority(_parent->_is_high_priority); request.set_sender_ip(BackendOptions::get_localhost()); + request.set_is_vectorized(_is_vectorized); _open_closure = new RefCountClosure(); _open_closure->ref(); @@ -186,67 +196,69 @@ Status NodeChannel::open_wait() { return status; } - // add batch closure - _add_batch_closure = ReusableClosure::create(); - _add_batch_closure->addFailedHandler([this](bool is_last_rpc) { - std::lock_guard l(this->_closed_lock); - if 
(this->_is_closed) { - // if the node channel is closed, no need to call `mark_as_failed`, - // and notice that _index_channel may already be destroyed. - return; - } - // If rpc failed, mark all tablets on this node channel as failed - _index_channel->mark_as_failed(this->node_id(), this->host(), - _add_batch_closure->cntl.ErrorText(), -1); - Status st = _index_channel->check_intolerable_failure(); - if (!st.ok()) { - _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.get_error_msg())); - } else if (is_last_rpc) { - // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait - // will be blocked. - _add_batches_finished = true; - } - }); - - _add_batch_closure->addSuccessHandler([this](const PTabletWriterAddBatchResult& result, - bool is_last_rpc) { - std::lock_guard l(this->_closed_lock); - if (this->_is_closed) { - // if the node channel is closed, no need to call the following logic, - // and notice that _index_channel may already be destroyed. - return; - } - Status status(result.status()); - if (status.ok()) { - // if has error tablet, handle them first - for (auto& error : result.tablet_errors()) { - _index_channel->mark_as_failed(this->node_id(), this->host(), error.msg(), - error.tablet_id()); + if (!_is_vectorized) { + // add batch closure + _add_batch_closure = ReusableClosure::create(); + _add_batch_closure->addFailedHandler([this](bool is_last_rpc) { + std::lock_guard l(this->_closed_lock); + if (this->_is_closed) { + // if the node channel is closed, no need to call `mark_as_failed`, + // and notice that _index_channel may already be destroyed. 
+ return; } - + // If rpc failed, mark all tablets on this node channel as failed + _index_channel->mark_as_failed(this->node_id(), this->host(), + _add_batch_closure->cntl.ErrorText(), -1); Status st = _index_channel->check_intolerable_failure(); if (!st.ok()) { - _cancel_with_msg(st.get_error_msg()); + _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.get_error_msg())); } else if (is_last_rpc) { - for (auto& tablet : result.tablet_vec()) { - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet.tablet_id(); - commit_info.backendId = _node_id; - _tablet_commit_infos.emplace_back(std::move(commit_info)); - } + // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait + // will be blocked. _add_batches_finished = true; } - } else { - _cancel_with_msg(fmt::format("{}, add batch req success but status isn't ok, err: {}", - channel_info(), status.get_error_msg())); - } + }); - if (result.has_execution_time_us()) { - _add_batch_counter.add_batch_execution_time_us += result.execution_time_us(); - _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us(); - _add_batch_counter.add_batch_num++; - } - }); + _add_batch_closure->addSuccessHandler([this](const PTabletWriterAddBatchResult& result, + bool is_last_rpc) { + std::lock_guard l(this->_closed_lock); + if (this->_is_closed) { + // if the node channel is closed, no need to call the following logic, + // and notice that _index_channel may already be destroyed. 
+ return; + } + Status status(result.status()); + if (status.ok()) { + // if has error tablet, handle them first + for (auto& error : result.tablet_errors()) { + _index_channel->mark_as_failed(this->node_id(), this->host(), error.msg(), + error.tablet_id()); + } + + Status st = _index_channel->check_intolerable_failure(); + if (!st.ok()) { + _cancel_with_msg(st.get_error_msg()); + } else if (is_last_rpc) { + for (auto& tablet : result.tablet_vec()) { + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet.tablet_id(); + commit_info.backendId = _node_id; + _tablet_commit_infos.emplace_back(std::move(commit_info)); + } + _add_batches_finished = true; + } + } else { + _cancel_with_msg(fmt::format("{}, add batch req success but status isn't ok, err: {}", + channel_info(), status.get_error_msg())); + } + + if (result.has_execution_time_us()) { + _add_batch_counter.add_batch_execution_time_us += result.execution_time_us(); + _add_batch_counter.add_batch_wait_execution_time_us += result.wait_execution_time_us(); + _add_batch_counter.add_batch_num++; + } + }); + } return status; } @@ -300,59 +312,6 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) { return Status::OK(); } -// Used for vectorized engine. -// TODO(cmy): deprecated, need refactor -Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker); - // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed. - auto st = none_of({_cancelled, _eos_is_produced}); - if (!st.ok()) { - if (_cancelled) { - std::lock_guard l(_cancel_msg_lock); - return Status::InternalError("add row failed. " + _cancel_msg); - } else { - return st.clone_and_prepend("already stopped, can't add row. cancelled/eos: "); - } - } - - // We use OlapTableSink mem_tracker which has the same ancestor of _plan node, - // so in the ideal case, mem limit is a matter for _plan node. 
- // But there is still some unfinished things, we do mem limit here temporarily. - // _cancelled may be set by rpc callback, and it's possible that _cancelled might be set in any of the steps below. - // It's fine to do a fake add_row() and return OK, because we will check _cancelled in next add_row() or mark_close(). - while (!_cancelled && _pending_batches_num > 0 && - (_pending_batches_bytes > _max_pending_batches_bytes || - _parent->_mem_tracker->any_limit_exceeded())) { - SCOPED_ATOMIC_TIMER(&_mem_exceeded_block_ns); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - - auto row_no = _cur_batch->add_row(); - if (row_no == RowBatch::INVALID_ROW_INDEX) { - { - SCOPED_ATOMIC_TIMER(&_queue_push_lock_ns); - std::lock_guard l(_pending_batches_lock); - _pending_batches_bytes += _cur_batch->tuple_data_pool()->total_reserved_bytes(); - //To simplify the add_row logic, postpone adding batch into req until the time of sending req - _pending_batches.emplace(std::move(_cur_batch), _cur_add_batch_request); - _pending_batches_num++; - } - - _cur_batch.reset(new RowBatch(*_row_desc, _batch_size)); - _cur_add_batch_request.clear_tablet_ids(); - - row_no = _cur_batch->add_row(); - } - DCHECK_NE(row_no, RowBatch::INVALID_ROW_INDEX); - - _cur_batch->get_row(row_no)->set_tuple( - 0, block_row.first->deep_copy_tuple(*_tuple_desc, _cur_batch->tuple_data_pool(), - block_row.second, 0, true)); - _cur_batch->commit_last_row(); - _cur_add_batch_request.add_tablet_ids(tablet_id); - return Status::OK(); -} - void NodeChannel::mark_close() { SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker); auto st = none_of({_cancelled, _eos_is_produced}); @@ -377,6 +336,11 @@ void NodeChannel::mark_close() { return; } +void NodeChannel::_close_check() { + std::lock_guard lg(_pending_batches_lock); + CHECK(_pending_batches.empty()) << name(); + CHECK(_cur_batch == nullptr) << name(); +} Status NodeChannel::close_wait(RuntimeState* state) { 
SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker); // set _is_closed to true finally @@ -403,11 +367,7 @@ Status NodeChannel::close_wait(RuntimeState* state) { _close_time_ms = UnixMillis() - _close_time_ms; if (_add_batches_finished) { - { - std::lock_guard lg(_pending_batches_lock); - CHECK(_pending_batches.empty()) << name(); - CHECK(_cur_batch == nullptr) << name(); - } + _close_check(); state->tablet_commit_infos().insert(state->tablet_commit_infos().end(), std::make_move_iterator(_tablet_commit_infos.begin()), std::make_move_iterator(_tablet_commit_infos.end())); @@ -588,8 +548,6 @@ void NodeChannel::clear_all_batches() { _cur_batch.reset(); } -IndexChannel::~IndexChannel() {} - Status IndexChannel::init(RuntimeState* state, const std::vector& tablets) { SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker); for (auto& tablet : tablets) { @@ -606,7 +564,11 @@ Status IndexChannel::init(RuntimeState* state, const std::vector_pool. // Because the deconstruction of NodeChannel may take a long time to wait rpc finish. // but the ObjectPool will hold a spin lock to delete objects. - channel = std::make_shared(_parent, this, node_id); + if (!_is_vectorized) { + channel = std::make_shared(_parent, this, node_id); + } else { + channel = std::make_shared(_parent, this, node_id); + } _node_channels.emplace(node_id, channel); } else { channel = it->second; @@ -623,35 +585,6 @@ Status IndexChannel::init(RuntimeState* state, const std::vectortotal_time_counter()); SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); - // Prepare the exprs to run. - RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _input_row_desc, _expr_mem_tracker)); + if (!_is_vectorized) { + // Prepare the exprs to run. 
+ RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _input_row_desc, _expr_mem_tracker)); + } // get table's tuple descriptor _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id); @@ -783,27 +723,31 @@ Status OlapTableSink::prepare(RuntimeState* state) { LOG(WARNING) << "unknown destination tuple descriptor, id=" << _tuple_desc_id; return Status::InternalError("unknown destination tuple descriptor"); } - if (!_output_expr_ctxs.empty()) { - if (_output_expr_ctxs.size() != _output_tuple_desc->slots().size()) { - LOG(WARNING) << "number of exprs is not same with slots, num_exprs=" - << _output_expr_ctxs.size() - << ", num_slots=" << _output_tuple_desc->slots().size(); - return Status::InternalError("number of exprs is not same with slots"); - } - for (int i = 0; i < _output_expr_ctxs.size(); ++i) { - if (!is_type_compatible(_output_expr_ctxs[i]->root()->type().type, - _output_tuple_desc->slots()[i]->type().type)) { - LOG(WARNING) << "type of exprs is not match slot's, expr_type=" - << _output_expr_ctxs[i]->root()->type().type - << ", slot_type=" << _output_tuple_desc->slots()[i]->type().type - << ", slot_name=" << _output_tuple_desc->slots()[i]->col_name(); - return Status::InternalError("expr's type is not same with slot's"); + + _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false)); + + if (!_is_vectorized) { + if (!_output_expr_ctxs.empty()) { + if (_output_expr_ctxs.size() != _output_tuple_desc->slots().size()) { + LOG(WARNING) << "number of exprs is not same with slots, num_exprs=" + << _output_expr_ctxs.size() + << ", num_slots=" << _output_tuple_desc->slots().size(); + return Status::InternalError("number of exprs is not same with slots"); + } + for (int i = 0; i < _output_expr_ctxs.size(); ++i) { + if (!is_type_compatible(_output_expr_ctxs[i]->root()->type().type, + _output_tuple_desc->slots()[i]->type().type)) { + LOG(WARNING) << "type of exprs is not match slot's, expr_type=" + << 
_output_expr_ctxs[i]->root()->type().type + << ", slot_type=" << _output_tuple_desc->slots()[i]->type().type + << ", slot_name=" << _output_tuple_desc->slots()[i]->col_name(); + return Status::InternalError("expr's type is not same with slot's"); + } } } - } - _output_row_desc = _pool->add(new RowDescriptor(_output_tuple_desc, false)); - _output_batch.reset(new RowBatch(*_output_row_desc, state->batch_size())); + _output_batch.reset(new RowBatch(*_output_row_desc, state->batch_size())); + } _max_decimalv2_val.resize(_output_tuple_desc->slots().size()); _min_decimalv2_val.resize(_output_tuple_desc->slots().size()); @@ -865,9 +809,8 @@ Status OlapTableSink::prepare(RuntimeState* state) { tablets.emplace_back(std::move(tablet_with_partition)); } } - auto channel = std::make_shared(this, index->index_id); - RETURN_IF_ERROR(channel->init(state, tablets)); - _channels.emplace_back(channel); + _channels.emplace_back(new IndexChannel(this, index->index_id, _is_vectorized)); + RETURN_IF_ERROR(_channels.back()->init(state, tablets)); } return Status::OK(); @@ -877,8 +820,10 @@ Status OlapTableSink::open(RuntimeState* state) { SCOPED_TIMER(_profile->total_time_counter()); SCOPED_TIMER(_open_timer); SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); - // Prepare the exprs to run. - RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state)); + if (!_is_vectorized) { + // Prepare the exprs to run. 
+ RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state)); + } for (auto index_channel : _channels) { index_channel->for_each_node_channel( diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h index 1a902e834bff09..84f069dce7becd 100644 --- a/be/src/exec/tablet_sink.h +++ b/be/src/exec/tablet_sink.h @@ -54,6 +54,10 @@ class TupleDescriptor; class ExprContext; class TExpr; +namespace vectorized { +class Block; +class MutableBlock; +} namespace stream_load { class OlapTableSink; @@ -87,18 +91,18 @@ struct AddBatchCounter { // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. // Delete this point is safe, don't worry about RPC callback will run after ReusableClosure deleted. template -class ReusableClosure : public google::protobuf::Closure { +class ReusableClosure final: public google::protobuf::Closure { public: ReusableClosure() : cid(INVALID_BTHREAD_ID) {} - ~ReusableClosure() { + ~ReusableClosure() override { // shouldn't delete when Run() is calling or going to be called, wait for current Run() done. 
join(); } static ReusableClosure* create() { return new ReusableClosure(); } - void addFailedHandler(std::function fn) { failed_handler = fn; } - void addSuccessHandler(std::function fn) { success_handler = fn; } + void addFailedHandler(const std::function& fn) { failed_handler = fn; } + void addSuccessHandler(const std::function& fn) { success_handler = fn; } void join() { // We rely on in_flight to assure one rpc is running, @@ -166,25 +170,27 @@ class IndexChannel; class NodeChannel { public: NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int64_t node_id); - ~NodeChannel() noexcept; + virtual ~NodeChannel() noexcept; // called before open, used to add tablet located in this backend void add_tablet(const TTabletWithPartition& tablet) { _all_tablets.emplace_back(tablet); } - Status init(RuntimeState* state); + virtual Status init(RuntimeState* state); // we use open/open_wait to parallel void open(); - Status open_wait(); + virtual Status open_wait(); Status add_row(Tuple* tuple, int64_t tablet_id); - - Status add_row(BlockRow& block_row, int64_t tablet_id); + virtual Status add_row(const BlockRow& block_row, int64_t tablet_id) { + LOG(FATAL) << "add block row to NodeChannel not supported"; + return Status::OK(); + } // two ways to stop channel: // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response. // 2. just cancel() - void mark_close(); + virtual void mark_close(); Status close_wait(RuntimeState* state); void cancel(const std::string& cancel_msg); @@ -194,8 +200,8 @@ class NodeChannel { // 1: running, haven't reach eos. // only allow 1 rpc in flight // plz make sure, this func should be called after open_wait(). 
- int try_send_and_fetch_status(RuntimeState* state, - std::unique_ptr& thread_pool_token); + virtual int try_send_and_fetch_status(RuntimeState* state, + std::unique_ptr& thread_pool_token); void try_send_batch(RuntimeState* state); @@ -223,15 +229,21 @@ class NodeChannel { void clear_all_batches(); + virtual void clear_all_blocks() { + LOG(FATAL) << "NodeChannel::clear_all_blocks not supported"; + } + std::string channel_info() const { return fmt::format("{}, {}, node={}:{}", _name, _load_info, _node_info.host, _node_info.brpc_port); } -private: +protected: void _cancel_with_msg(const std::string& msg); + virtual void _close_check(); -private: +protected: + bool _is_vectorized = false; OlapTableSink* _parent = nullptr; IndexChannel* _index_channel = nullptr; int64_t _node_id = -1; @@ -248,6 +260,9 @@ class NodeChannel { int64_t _next_packet_seq = 0; MonotonicStopWatch _timeout_watch; + // the timestamp when this node channel be marked closed and finished closed + uint64_t _close_time_ms = 0; + // user cancel or get some errors std::atomic _cancelled {false}; SpinLock _cancel_msg_lock; @@ -257,26 +272,21 @@ class NodeChannel { std::atomic _send_finished {false}; // add batches finished means the last rpc has be response, used to check whether this channel can be closed - std::atomic _add_batches_finished {false}; + std::atomic _add_batches_finished {false}; // reuse for vectorized bool _eos_is_produced {false}; // only for restricting producer behaviors std::unique_ptr _row_desc; int _batch_size = 0; - std::unique_ptr _cur_batch; - PTabletWriterAddBatchRequest _cur_add_batch_request; - std::mutex _pending_batches_lock; - using AddBatchReq = std::pair, PTabletWriterAddBatchRequest>; - std::queue _pending_batches; - std::atomic _pending_batches_num {0}; // limit _pending_batches size std::atomic _pending_batches_bytes {0}; size_t _max_pending_batches_bytes {10 * 1024 * 1024}; + std::mutex _pending_batches_lock; // reuse for vectorized + std::atomic 
_pending_batches_num {0}; // reuse for vectorized std::shared_ptr _stub = nullptr; RefCountClosure* _open_closure = nullptr; - ReusableClosure* _add_batch_closure = nullptr; std::vector _all_tablets; std::vector _tablet_commit_infos; @@ -287,18 +297,6 @@ class NodeChannel { std::atomic _queue_push_lock_ns {0}; std::atomic _actual_consume_ns {0}; - // buffer for saving serialized row batch data. - // In the non-attachment approach, we need to use two PRowBatch structures alternately - // so that when one PRowBatch is sent, the other PRowBatch can be used for the serialization of the next RowBatch. - // This is not necessary with the attachment approach, because the memory structures - // are already copied into attachment memory before sending, and will wait for - // the previous RPC to be fully completed before the next copy. - std::string _tuple_data_buffer; - std::string* _tuple_data_buffer_ptr = nullptr; - - // the timestamp when this node channel be marked closed and finished closed - uint64_t _close_time_ms = 0; - // lock to protect _is_closed. // The methods in the IndexChannel are called back in the RpcClosure in the NodeChannel. // However, this rpc callback may occur after the whole task is finished (e.g. due to network latency), @@ -309,20 +307,36 @@ class NodeChannel { // The IndexChannel is definitely accessible until the NodeChannel is closed. std::mutex _closed_lock; bool _is_closed = false; + +private: + // buffer for saving serialized row batch data. + // In the non-attachment approach, we need to use two PRowBatch structures alternately + // so that when one PRowBatch is sent, the other PRowBatch can be used for the serialization of the next RowBatch. + // This is not necessary with the attachment approach, because the memory structures + // are already copied into attachment memory before sending, and will wait for + // the previous RPC to be fully completed before the next copy. 
+ std::string _tuple_data_buffer; + std::string* _tuple_data_buffer_ptr = nullptr; + + std::unique_ptr _cur_batch; + PTabletWriterAddBatchRequest _cur_add_batch_request; + using AddBatchReq = std::pair, PTabletWriterAddBatchRequest>; + std::queue _pending_batches; + ReusableClosure* _add_batch_closure = nullptr; }; class IndexChannel { public: - IndexChannel(OlapTableSink* parent, int64_t index_id) : _parent(parent), _index_id(index_id) { + IndexChannel(OlapTableSink* parent, int64_t index_id, bool is_vec) : + _parent(parent), _index_id(index_id), _is_vectorized(is_vec) { _index_channel_tracker = MemTracker::create_tracker(-1, "IndexChannel"); } - ~IndexChannel(); + ~IndexChannel() = default; Status init(RuntimeState* state, const std::vector& tablets); - void add_row(Tuple* tuple, int64_t tablet_id); - - void add_row(BlockRow& block_row, int64_t tablet_id); + template + void add_row(const Row& tuple, int64_t tablet_id); void for_each_node_channel( const std::function&)>& func) { @@ -343,9 +357,11 @@ class IndexChannel { private: friend class NodeChannel; + friend class VNodeChannel; OlapTableSink* _parent; int64_t _index_id; + bool _is_vectorized = false; // from backend channel to tablet_id // ATTN: must be placed before `_node_channels` and `_channels_by_tablet`. 
@@ -370,6 +386,21 @@ class IndexChannel { std::shared_ptr _index_channel_tracker; }; +template +void IndexChannel::add_row(const Row& tuple, int64_t tablet_id) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker); + auto it = _channels_by_tablet.find(tablet_id); + DCHECK(it != _channels_by_tablet.end()) << "unknown tablet, tablet_id=" << tablet_id; + for (const auto& channel : it->second) { + // if this node channel is already failed, this add_row will be skipped + auto st = channel->add_row(tuple, tablet_id); + if (!st.ok()) { + mark_as_failed(channel->node_id(), channel->host(), st.get_error_msg(), tablet_id); + // continue add row to other node, the error will be checked for every batch outside + } + } +} + // Write data to Olap Table. // When OlapTableSink::open() called, there will be a consumer thread running in the background. // When you call OlapTableSink::send(), you will be the producer who products pending batches. @@ -414,8 +445,11 @@ class OlapTableSink : public DataSink { protected: friend class NodeChannel; + friend class VNodeChannel; friend class IndexChannel; + bool _is_vectorized = false; + std::shared_ptr _mem_tracker; ObjectPool* _pool; @@ -430,8 +464,6 @@ class OlapTableSink : public DataSink { // this is tuple descriptor of destination OLAP table TupleDescriptor* _output_tuple_desc = nullptr; RowDescriptor* _output_row_desc = nullptr; - std::vector _output_expr_ctxs; - std::unique_ptr _output_batch; bool _need_validate_data = false; @@ -444,7 +476,6 @@ class OlapTableSink : public DataSink { // TODO(zc): think about cache this data std::shared_ptr _schema; - OlapTablePartitionParam* _partition = nullptr; OlapTableLocationParam* _location = nullptr; DorisNodesInfo* _nodes_info = nullptr; @@ -470,7 +501,6 @@ class OlapTableSink : public DataSink { int64_t _convert_batch_ns = 0; int64_t _validate_data_ns = 0; int64_t _send_data_ns = 0; - int64_t _serialize_batch_ns = 0; int64_t _number_input_rows = 0; int64_t _number_output_rows = 
0; int64_t _number_filtered_rows = 0; @@ -513,6 +543,11 @@ class OlapTableSink : public DataSink { // only compute tablet index in the corresponding partition once for the whole time in olap table sink enum FindTabletMode { FIND_TABLET_EVERY_ROW, FIND_TABLET_EVERY_BATCH, FIND_TABLET_EVERY_SINK }; FindTabletMode findTabletMode = FindTabletMode::FIND_TABLET_EVERY_ROW; + +private: + OlapTablePartitionParam* _partition = nullptr; + std::vector _output_expr_ctxs; + std::unique_ptr _output_batch; }; } // namespace stream_load diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 4f90a0e3dbb96f..852ec5cae74baa 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -86,7 +86,7 @@ Status Compaction::do_compaction_impl(int64_t permits) { // 2. write merged rows to output rowset // The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool Merger::Statistics stats; - auto res = Merger::merge_rowsets(_tablet, compaction_type(), _input_rs_readers, + auto res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers, _output_rs_writer.get(), &stats); if (!res.ok()) { LOG(WARNING) << "fail to do " << compaction_name() << ". 
res=" << res diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 14b442c7388319..70eb6a7b8e7b15 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -29,19 +29,20 @@ namespace doris { -Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer) { - *writer = new DeltaWriter(req, StorageEngine::instance()); +Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, bool is_vec) { + *writer = new DeltaWriter(req, StorageEngine::instance(), is_vec); return Status::OK(); } -DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine) +DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool is_vec) : _req(*req), _tablet(nullptr), _cur_rowset(nullptr), _rowset_writer(nullptr), _tablet_schema(nullptr), _delta_written_success(false), - _storage_engine(storage_engine) {} + _storage_engine(storage_engine), + _is_vec(is_vec) {} DeltaWriter::~DeltaWriter() { if (_is_init && !_delta_written_success) { @@ -195,6 +196,40 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector& row return Status::OK(); } +Status DeltaWriter::write(const vectorized::Block* block, const std::vector& row_idxs) { + if (UNLIKELY(row_idxs.empty())) { + return Status::OK(); + } + std::lock_guard l(_lock); + if (!_is_init && !_is_cancelled) { + RETURN_NOT_OK(init()); + } + + if (_is_cancelled) { + return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED); + } + + int start = 0, end = 0; + const size_t num_rows = row_idxs.size(); + for (; start < num_rows;) { + auto count = end + 1 - start; + if (end == num_rows - 1 || (row_idxs[end + 1] - row_idxs[start]) != count) { + _mem_table->insert(block, row_idxs[start], count); + start += count; + end = start; + } else { + end++; + } + } + + if (_mem_table->memory_usage() >= config::write_buffer_size) { + RETURN_NOT_OK(_flush_memtable_async()); + _reset_mem_table(); + } + + return Status::OK(); +} + Status 
DeltaWriter::_flush_memtable_async() { if (++_segment_counter > config::max_segment_num_per_rowset) { return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS); @@ -252,7 +287,7 @@ Status DeltaWriter::wait_flush() { void DeltaWriter::_reset_mem_table() { _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema, _req.slots, _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(), - _mem_tracker)); + _mem_tracker, _is_vec)); } Status DeltaWriter::close() { diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 62bad10fc19ebd..b937e4f9e31d0b 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -53,7 +53,7 @@ struct WriteRequest { // This class is NOT thread-safe, external synchronization is required. class DeltaWriter { public: - static Status open(WriteRequest* req, DeltaWriter** writer); + static Status open(WriteRequest* req, DeltaWriter** writer, bool is_vec = false); ~DeltaWriter(); @@ -61,6 +61,8 @@ class DeltaWriter { Status write(Tuple* tuple); Status write(const RowBatch* row_batch, const std::vector& row_idxs); + Status write(const vectorized::Block* block, const std::vector& row_idxs); + // flush the last memtable to flush queue, must call it before close_wait() Status close(); // wait for all memtables to be flushed. 
@@ -88,7 +90,7 @@ class DeltaWriter { int64_t tablet_id() { return _tablet->tablet_id(); } private: - DeltaWriter(WriteRequest* req, StorageEngine* storage_engine); + DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool is_vec); // push a full memtable to flush executor Status _flush_memtable_async(); @@ -97,13 +99,13 @@ class DeltaWriter { void _reset_mem_table(); -private: bool _is_init = false; bool _is_cancelled = false; WriteRequest _req; TabletSharedPtr _tablet; RowsetSharedPtr _cur_rowset; std::unique_ptr _rowset_writer; + // TODO: Recheck the lifttime of _mem_table, Look only should use unique_ptr std::shared_ptr _mem_table; std::unique_ptr _schema; const TabletSchema* _tablet_schema; @@ -117,6 +119,9 @@ class DeltaWriter { int64_t _segment_counter = 0; std::mutex _lock; + + // use in vectorized load + bool _is_vec; }; } // namespace doris diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp index 9ff9cc3b48d581..993f0582ace59d 100644 --- a/be/src/olap/memtable.cpp +++ b/be/src/olap/memtable.cpp @@ -19,20 +19,22 @@ #include "common/logging.h" #include "olap/row.h" -#include "olap/row_cursor.h" #include "olap/rowset/column_data_writer.h" #include "olap/rowset/rowset_writer.h" #include "olap/schema.h" #include "runtime/tuple.h" -#include "util/debug_util.h" #include "util/doris_metrics.h" +#include "vec/core/field.h" +#include "vec/aggregate_functions/aggregate_function_simple_factory.h" +#include "vec/aggregate_functions/aggregate_function_reader.h" namespace doris { MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, KeysType keys_type, RowsetWriter* rowset_writer, - const std::shared_ptr& parent_tracker) + const std::shared_ptr& parent_tracker, + bool support_vec) : _tablet_id(tablet_id), _schema(schema), _tablet_schema(tablet_schema), @@ -42,19 +44,53 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet 
_buffer_mem_pool(new MemPool(_mem_tracker.get())), _table_mem_pool(new MemPool(_mem_tracker.get())), _schema_size(_schema->schema_size()), - _rowset_writer(rowset_writer) { - if (tablet_schema->sort_type() == SortType::ZORDER) { - _row_comparator = - std::make_shared(_schema, tablet_schema->sort_col_num()); - } else { - _row_comparator = std::make_shared(_schema); + _rowset_writer(rowset_writer), + _is_first_insertion(true), + _agg_functions(schema->num_columns()), + _mem_usage(0){ + if (support_vec) { + _skip_list = nullptr; + _vec_row_comparator = std::make_shared(_schema); + // TODO: Support ZOrderComparator in the future + _vec_skip_list = new VecTable(_vec_row_comparator.get(), _table_mem_pool.get(), + _keys_type == KeysType::DUP_KEYS); + }else{ + _vec_skip_list = nullptr; + if (tablet_schema->sort_type() == SortType::ZORDER) { + _row_comparator = + std::make_shared(_schema, tablet_schema->sort_col_num()); + } else { + _row_comparator = std::make_shared(_schema); + } + _skip_list = new Table(_row_comparator.get(), _table_mem_pool.get(), + _keys_type == KeysType::DUP_KEYS); + } +} + +void MemTable::_init_agg_functions(const vectorized::Block* block) { + for (uint32_t cid = _schema->num_key_columns(); cid < _schema->num_columns(); ++cid) { + FieldAggregationMethod agg_method = _tablet_schema->column(cid).aggregation(); + std::string agg_name = + TabletColumn::get_string_by_aggregation_type(agg_method) + vectorized::AGG_LOAD_SUFFIX; + std::transform(agg_name.begin(), agg_name.end(), agg_name.begin(), [](unsigned char c) { return std::tolower(c); }); + + // create aggregate function + vectorized::DataTypes argument_types{block->get_data_type(cid)}; + vectorized::AggregateFunctionPtr function = vectorized::AggregateFunctionSimpleFactory::instance().get( + agg_name, argument_types, {}, argument_types.back()->is_nullable()); + + DCHECK(function != nullptr); + _agg_functions[cid] = function; } - _skip_list = new Table(_row_comparator.get(), _table_mem_pool.get(), - 
_keys_type == KeysType::DUP_KEYS); } MemTable::~MemTable() { delete _skip_list; + delete _vec_skip_list; + + std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), + std::default_delete()); + _mem_tracker->release(_mem_usage); } MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : _schema(schema) {} @@ -65,6 +101,62 @@ int MemTable::RowCursorComparator::operator()(const char* left, const char* righ return compare_row(lhs_row, rhs_row); } +int MemTable::RowInBlockComparator::operator()(const RowInBlock* left, const RowInBlock* right) const{ + return _pblock->compare_at(left->_row_pos, right->_row_pos, + _schema->num_key_columns(), + *_pblock, -1); +} + +void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num_rows) { + if (_is_first_insertion) { + _is_first_insertion = false; + auto cloneBlock = block->clone_without_columns(); + _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); + _vec_row_comparator->set_block(&_input_mutable_block); + _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock); + if (_keys_type != KeysType::DUP_KEYS){ + _init_agg_functions(block); + } + } + size_t cursor_in_mutableblock = _input_mutable_block.rows(); + size_t oldsize = _input_mutable_block.allocated_bytes(); + _input_mutable_block.add_rows(block, row_pos, num_rows); + size_t newsize = _input_mutable_block.allocated_bytes(); + _mem_usage += newsize - oldsize; + _mem_tracker->consume(newsize - oldsize); + + for(int i = 0; i < num_rows; i++){ + _row_in_blocks.emplace_back(new RowInBlock{cursor_in_mutableblock + i}); + _insert_one_row_from_block(_row_in_blocks.back()); + } +} + +void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) { + _rows++; + bool overwritten = false; + if (_keys_type == KeysType::DUP_KEYS) { + // TODO: dup keys only need sort opertaion. Rethink skiplist is the beat way to sort columns? 
+ _vec_skip_list->Insert(row_in_block, &overwritten); + DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList"; + return; + } + + bool is_exist = _vec_skip_list->Find(row_in_block, &_vec_hint); + if (is_exist){ + _aggregate_two_row_in_block(row_in_block, _vec_hint.curr->key); + } else { + row_in_block->init_agg_places(_agg_functions, _schema->num_key_columns()); + for (auto cid = _schema->num_key_columns(); cid < _schema->num_columns(); cid++){ + auto col_ptr = _input_mutable_block.mutable_columns()[cid].get(); + auto place = row_in_block->_agg_places[cid]; + _agg_functions[cid]->add(place, const_cast(&col_ptr), + row_in_block->_row_pos, nullptr); + } + + _vec_skip_list->InsertWithHint(row_in_block, is_exist, &_vec_hint); + } +} + void MemTable::insert(const Tuple* tuple) { _rows++; bool overwritten = false; @@ -124,12 +216,65 @@ void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_ } } +void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist){ + if (_tablet_schema->has_sequence_col()) { + auto sequence_idx = _tablet_schema->sequence_col_idx(); + auto res = _input_mutable_block.compare_at(row_in_skiplist->_row_pos, new_row->_row_pos, sequence_idx, _input_mutable_block, -1); + // dst sequence column larger than src, don't need to update + if (res > 0){ + return; + } + } + // dst is non-sequence row, or dst sequence is smaller + for (uint32_t cid = _schema->num_key_columns(); cid < _schema->num_columns(); ++cid) { + auto place = row_in_skiplist->_agg_places[cid]; + auto col_ptr = _input_mutable_block.mutable_columns()[cid].get(); + _agg_functions[cid]->add(place, const_cast(&col_ptr), + new_row->_row_pos, nullptr); + } + +} +vectorized::Block MemTable::_collect_vskiplist_results() { + VecTable::Iterator it(_vec_skip_list); + vectorized::Block in_block = _input_mutable_block.to_block(); + // TODO: should try to insert data by column, not by row. 
to opt the the code + if (_keys_type == KeysType::DUP_KEYS){ + for (it.SeekToFirst(); it.Valid(); it.Next()) { + _output_mutable_block.add_row(&in_block, it.key()->_row_pos); + } + } else { + for (it.SeekToFirst(); it.Valid(); it.Next()) { + auto& block_data = in_block.get_columns_with_type_and_name(); + // move key columns + for (size_t i = 0; i < _schema->num_key_columns(); ++i) { + _output_mutable_block.get_column_by_position(i)->insert_from(*block_data[i].column.get(), it.key()->_row_pos); + } + // get value columns from agg_places + for (size_t i = _schema->num_key_columns(); i < _schema->num_columns(); ++i) { + auto function = _agg_functions[i]; + function->insert_result_into(it.key()->_agg_places[i] , *(_output_mutable_block.get_column_by_position(i))); + function->destroy(it.key()->_agg_places[i]); + } + } + } + return _output_mutable_block.to_block(); +} + Status MemTable::flush() { VLOG_CRITICAL << "begin to flush memtable for tablet: " << _tablet_id << ", memsize: " << memory_usage() << ", rows: " << _rows; int64_t duration_ns = 0; - { - SCOPED_RAW_TIMER(&duration_ns); + RETURN_NOT_OK(_do_flush(duration_ns)); + DorisMetrics::instance()->memtable_flush_total->increment(1); + DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); + VLOG_CRITICAL << "after flush memtable for tablet: " << _tablet_id + << ", flushsize: " << _flush_size; + return Status::OK(); +} + +Status MemTable::_do_flush(int64_t& duration_ns) { + SCOPED_RAW_TIMER(&duration_ns); + if (_skip_list) { Status st = _rowset_writer->flush_single_memtable(this, &_flush_size); if (st == Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED)) { // For alpha rowset, we do not implement "flush_single_memtable". 
@@ -145,11 +290,12 @@ Status MemTable::flush() { } else { RETURN_NOT_OK(st); } + } else { + vectorized::Block block = _collect_vskiplist_results(); + RETURN_NOT_OK(_rowset_writer->add_block(&block)); + _flush_size = block.allocated_bytes(); + RETURN_NOT_OK(_rowset_writer->flush()); } - DorisMetrics::instance()->memtable_flush_total->increment(1); - DorisMetrics::instance()->memtable_flush_duration_us->increment(duration_ns / 1000); - VLOG_CRITICAL << "after flush memtable for tablet: " << _tablet_id - << ", flushsize: " << _flush_size; return Status::OK(); } diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h index 3fad1ac0734b7b..b034368f63873b 100644 --- a/be/src/olap/memtable.h +++ b/be/src/olap/memtable.h @@ -24,6 +24,9 @@ #include "olap/skiplist.h" #include "runtime/mem_tracker.h" #include "util/tuple_row_zorder_compare.h" +#include "vec/core/block.h" +#include "vec/common/string_ref.h" +#include "vec/aggregate_functions/aggregate_function.h" namespace doris { @@ -40,13 +43,18 @@ class MemTable { MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema, const std::vector* slot_descs, TupleDescriptor* tuple_desc, KeysType keys_type, RowsetWriter* rowset_writer, - const std::shared_ptr& parent_tracker); + const std::shared_ptr& parent_tracker, + bool support_vec = false); ~MemTable(); int64_t tablet_id() const { return _tablet_id; } size_t memory_usage() const { return _mem_tracker->consumption(); } - std::shared_ptr mem_tracker() { return _mem_tracker; } + std::shared_ptr& mem_tracker() { return _mem_tracker; } + void insert(const Tuple* tuple); + // insert tuple from (row_pos) to (row_pos+num_rows) + void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows); + /// Flush Status flush(); Status close(); @@ -54,18 +62,62 @@ class MemTable { int64_t flush_size() const { return _flush_size; } private: + Status _do_flush(int64_t& duration_ns); + class RowCursorComparator : public RowComparator { public: 
RowCursorComparator(const Schema* schema); - virtual int operator()(const char* left, const char* right) const; + int operator()(const char* left, const char* right) const; + + private: + const Schema* _schema; + }; + + // row pos in _input_mutable_block + struct RowInBlock { + size_t _row_pos; + std::vector _agg_places; + explicit RowInBlock(size_t i) : _row_pos(i) {} + + void init_agg_places(std::vector& agg_functions, + int key_column_count) { + _agg_places.resize(agg_functions.size()); + for(int cid = 0; cid < agg_functions.size(); cid++) { + if (cid < key_column_count) { + _agg_places[cid] = nullptr; + } else { + auto function = agg_functions[cid]; + size_t place_size = function->size_of_data(); + _agg_places[cid] = new char[place_size]; + function->create(_agg_places[cid]); + } + } + } + + ~RowInBlock() { + for (auto agg_place : _agg_places) { + delete [] agg_place; + } + } + }; + class RowInBlockComparator { + public: + RowInBlockComparator(const Schema* schema) : _schema(schema) {}; + // call set_block before operator(). 
+ // only first time insert block to create _input_mutable_block, + // so can not Comparator of construct to set pblock + void set_block(vectorized::MutableBlock* pblock) {_pblock = pblock;} + int operator()(const RowInBlock* left, const RowInBlock* right) const; private: const Schema* _schema; + vectorized::MutableBlock* _pblock;// 对应Memtable::_input_mutable_block }; private: typedef SkipList Table; typedef Table::key_type TableKey; + typedef SkipList VecTable; public: /// The iterator of memtable, so that the data in this memtable @@ -73,7 +125,7 @@ class MemTable { class Iterator { public: Iterator(MemTable* mem_table); - ~Iterator() {} + ~Iterator() = default; void seek_to_first(); bool valid(); @@ -85,9 +137,13 @@ class MemTable { Table::Iterator _it; }; + private: void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool); void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist); + // for vectorized + void _insert_one_row_from_block(RowInBlock* row_in_block); + void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist); int64_t _tablet_id; Schema* _schema; @@ -96,7 +152,11 @@ class MemTable { const std::vector* _slot_descs; KeysType _keys_type; + // TODO: change to unique_ptr of comparator std::shared_ptr _row_comparator; + + std::shared_ptr _vec_row_comparator; + std::shared_ptr _mem_tracker; // This is a buffer, to hold the memory referenced by the rows that have not // been inserted into the SkipList @@ -115,6 +175,9 @@ class MemTable { Table* _skip_list; Table::Hint _hint; + VecTable* _vec_skip_list; + VecTable::Hint _vec_hint; + RowsetWriter* _rowset_writer; // the data size flushed on disk of this memtable @@ -124,8 +187,19 @@ class MemTable { // in unique or aggragate key model. 
int64_t _rows = 0; + //for vectorized + vectorized::MutableBlock _input_mutable_block; + vectorized::MutableBlock _output_mutable_block; + vectorized::Block _collect_vskiplist_results(); + bool _is_first_insertion; + + void _init_agg_functions(const vectorized::Block* block); + std::vector _agg_functions; + std::vector _row_in_blocks; + size_t _mem_usage; }; // class MemTable + inline std::ostream& operator<<(std::ostream& os, const MemTable& table) { os << "MemTable(addr=" << &table << ", tablet=" << table.tablet_id() << ", mem=" << table.memory_usage(); diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index e28a3b4a4ed317..b7d50babb6eccc 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -22,6 +22,7 @@ #include "olap/olap_define.h" #include "olap/tuple_reader.h" +#include "vec/olap/block_reader.h" #include "olap/row_cursor.h" #include "olap/tablet.h" #include "util/trace.h" @@ -87,4 +88,53 @@ Status Merger::merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, return Status::OK(); } +Status Merger::vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, + const std::vector& src_rowset_readers, + RowsetWriter* dst_rowset_writer, Statistics* stats_output) { + TRACE_COUNTER_SCOPE_LATENCY_US("merge_rowsets_latency_us"); + + vectorized::BlockReader reader; + TabletReader::ReaderParams reader_params; + reader_params.tablet = tablet; + reader_params.reader_type = reader_type; + reader_params.rs_readers = src_rowset_readers; + reader_params.version = dst_rowset_writer->version(); + + const auto& schema = tablet->tablet_schema(); + reader_params.return_columns.resize(schema.num_columns()); + std::iota(reader_params.return_columns.begin(), reader_params.return_columns.end(), 0); + reader_params.origin_return_columns = &reader_params.return_columns; + RETURN_NOT_OK(reader.init(reader_params)); + + vectorized::Block block = schema.create_block(reader_params.return_columns); + size_t output_rows = 0; + while (true) { + bool eof = 
false; + // Read one block from block reader + RETURN_NOT_OK_LOG( + reader.next_block_with_aggregation(&block, nullptr, nullptr, &eof), + "failed to read next block when merging rowsets of tablet " + tablet->full_name()); + if (eof) { + break; + } + RETURN_NOT_OK_LOG( + dst_rowset_writer->add_block(&block), + "failed to write block when merging rowsets of tablet " + tablet->full_name()); + output_rows += block.rows(); + block.clear_column_data(); + } + + if (stats_output != nullptr) { + stats_output->output_rows = output_rows; + stats_output->merged_rows = reader.merged_rows(); + stats_output->filtered_rows = reader.filtered_rows(); + } + + RETURN_NOT_OK_LOG( + dst_rowset_writer->flush(), + "failed to flush rowset when merging rowsets of tablet " + tablet->full_name()); + + return Status::OK(); +} + } // namespace doris diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h index efd46271e1f7b4..e8ae558505e133 100644 --- a/be/src/olap/merger.h +++ b/be/src/olap/merger.h @@ -38,6 +38,10 @@ class Merger { static Status merge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, const std::vector& src_rowset_readers, RowsetWriter* dst_rowset_writer, Statistics* stats_output); + + static Status vmerge_rowsets(TabletSharedPtr tablet, ReaderType reader_type, + const std::vector& src_rowset_readers, + RowsetWriter* dst_rowset_writer, Statistics* stats_output); }; } // namespace doris diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h index d24695607b5120..3e1559ad16b2d7 100644 --- a/be/src/olap/olap_define.h +++ b/be/src/olap/olap_define.h @@ -56,6 +56,9 @@ static const uint16_t OLAP_VARCHAR_MAX_LENGTH = 65535; // the max length supported for string type 2GB static const uint32_t OLAP_STRING_MAX_LENGTH = 2147483647; +// the max length supported for vec string type 1MB +static constexpr size_t MAX_SIZE_OF_VEC_STRING = 1024 * 1024; + // the max length supported for array static const uint16_t OLAP_ARRAY_MAX_LENGTH = 65535; diff --git 
a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 05a1104c777855..90b47bce637bfe 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -310,6 +310,17 @@ Status TabletReader::_init_return_columns(const ReaderParams& read_params) { } } VLOG_NOTICE << "return column is empty, using full column as default."; + } else if ((read_params.reader_type == READER_CUMULATIVE_COMPACTION || + read_params.reader_type == READER_BASE_COMPACTION) && + !read_params.return_columns.empty()) { + _return_columns = read_params.return_columns; + for (auto id : read_params.return_columns) { + if (_tablet->tablet_schema().column(id).is_key()) { + _key_cids.push_back(id); + } else { + _value_cids.push_back(id); + } + } } else if (read_params.reader_type == READER_CHECKSUM) { _return_columns = read_params.return_columns; for (auto id : read_params.return_columns) { diff --git a/be/src/olap/row_block2.cpp b/be/src/olap/row_block2.cpp index 83bb249566eb2b..d7bf6783e883c1 100644 --- a/be/src/olap/row_block2.cpp +++ b/be/src/olap/row_block2.cpp @@ -355,8 +355,6 @@ Status RowBlockV2::_copy_data_to_column(int cid, Status RowBlockV2::_append_data_to_column(const ColumnVectorBatch* batch, size_t start, uint32_t len, doris::vectorized::MutableColumnPtr& origin_column) { - constexpr auto MAX_SIZE_OF_VEC_STRING = 1024l * 1024; - auto* column = origin_column.get(); uint32_t selected_size = len; bool nullable_mark_array[selected_size]; diff --git a/be/src/olap/row_cursor_cell.h b/be/src/olap/row_cursor_cell.h index ffe78b030d5acd..10ef938fce1857 100644 --- a/be/src/olap/row_cursor_cell.h +++ b/be/src/olap/row_cursor_cell.h @@ -33,4 +33,4 @@ struct RowCursorCell { void* _ptr; }; -} // namespace doris +} // namespace doris \ No newline at end of file diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 43b7c610041a2a..16ee1be6dcfe23 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ 
-38,10 +38,6 @@ namespace doris { -// TODO(lingbin): Should be a conf that can be dynamically adjusted, or a member in the context -const uint32_t MAX_SEGMENT_SIZE = static_cast(OLAP_MAX_COLUMN_SEGMENT_FILE_SIZE * - OLAP_COLUMN_FILE_SEGMENT_SIZE_SCALE); - BetaRowsetWriter::BetaRowsetWriter() : _rowset_meta(nullptr), _num_segment(0), @@ -99,6 +95,41 @@ Status BetaRowsetWriter::init(const RowsetWriterContext& rowset_writer_context) return Status::OK(); } +Status BetaRowsetWriter::add_block(const vectorized::Block* block) { + if (block->rows() == 0) { + return Status::OK(); + } + if (UNLIKELY(_segment_writer == nullptr)) { + RETURN_NOT_OK(_create_segment_writer(&_segment_writer)); + } + size_t block_size_in_bytes = block->bytes(); + size_t block_row_num = block->rows(); + size_t row_avg_size_in_bytes = std::max((size_t)1, block_size_in_bytes / block_row_num); + size_t row_offset = 0; + + do { + auto max_row_add = _segment_writer->max_row_to_add(row_avg_size_in_bytes); + if (UNLIKELY(max_row_add < 1)) { + // no space for another signle row, need flush now + RETURN_NOT_OK(_flush_segment_writer(&_segment_writer)); + RETURN_NOT_OK(_create_segment_writer(&_segment_writer)); + max_row_add = _segment_writer->max_row_to_add(row_avg_size_in_bytes); + DCHECK(max_row_add > 0); + } + + size_t input_row_num = std::min(block_row_num - row_offset, size_t(max_row_add)); + auto s = _segment_writer->append_block(block, row_offset, input_row_num); + if (UNLIKELY(!s.ok())) { + LOG(WARNING) << "failed to append block: " << s.to_string(); + return Status::OLAPInternalError(OLAP_ERR_WRITER_DATA_WRITE_ERROR); + } + row_offset += input_row_num; + } while (row_offset < block_row_num); + + _num_rows_written += block_row_num; + return Status::OK(); +} + template Status BetaRowsetWriter::_add_row(const RowType& row) { if (PREDICT_FALSE(_segment_writer == nullptr)) { @@ -270,7 +301,7 @@ Status BetaRowsetWriter::_create_segment_writer(std::unique_ptrreset(new segment_v2::SegmentWriter(wblock.get(), 
_num_segment, _context.tablet_schema, - _context.data_dir, writer_options)); + _context.data_dir, _context.max_rows_per_segment, writer_options)); { std::lock_guard l(_lock); _wblocks.push_back(std::move(wblock)); diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 8f9b54b51e890a..0bce1af5ccd90a 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -43,6 +43,8 @@ class BetaRowsetWriter : public RowsetWriter { // For Memtable::flush() Status add_row(const ContiguousRow& row) override { return _add_row(row); } + Status add_block(const vectorized::Block* block) override; + // add rowset by create hard link Status add_rowset(RowsetSharedPtr rowset) override; diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index 6fb290e3dc7a0b..d23a4777140e06 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -24,6 +24,7 @@ #include "olap/column_mapping.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_writer_context.h" +#include "vec/core/block.h" namespace doris { @@ -43,6 +44,10 @@ class RowsetWriter { virtual Status add_row(const RowCursor& row) = 0; virtual Status add_row(const ContiguousRow& row) = 0; + virtual Status add_block(const vectorized::Block* block) { + return Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED); + } + // Precondition: the input `rowset` should have the same type of the rowset we're building virtual Status add_rowset(RowsetSharedPtr rowset) = 0; @@ -59,7 +64,7 @@ class RowsetWriter { virtual Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) { return Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED); } - + // finish building and return pointer to the built rowset (guaranteed to be inited). 
// return nullptr when failed virtual RowsetSharedPtr build() = 0; diff --git a/be/src/olap/rowset/segment_v2/column_writer.cpp b/be/src/olap/rowset/segment_v2/column_writer.cpp index 234c182b029185..87770acdb12ba3 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.cpp +++ b/be/src/olap/rowset/segment_v2/column_writer.cpp @@ -191,6 +191,27 @@ Status ColumnWriter::append_nullable(const uint8_t* is_null_bits, const void* da return Status::OK(); } +Status ColumnWriter::append(const uint8_t* nullmap, const void* data, size_t num_rows) { + assert(data && num_rows > 0); + if (nullmap) { + size_t bitmap_size = BitmapSize(num_rows); + if (_null_bitmap.size() < bitmap_size) { + _null_bitmap.resize(bitmap_size); + } + uint8_t* bitmap_data = _null_bitmap.data(); + memset(bitmap_data, 0, bitmap_size); + for (size_t i = 0; i < num_rows; ++i) { + if (nullmap[i]) { + BitmapSet(bitmap_data, i); + } + } + return append_nullable(bitmap_data, data, num_rows); + } else { + const uint8_t* ptr = (const uint8_t*)data; + return append_data(&ptr, num_rows); + } +} + /////////////////////////////////////////////////////////////////////////////////// ScalarColumnWriter::ScalarColumnWriter(const ColumnWriterOptions& opts, diff --git a/be/src/olap/rowset/segment_v2/column_writer.h b/be/src/olap/rowset/segment_v2/column_writer.h index c5daddc6ceb59a..8b7cb60b32525e 100644 --- a/be/src/olap/rowset/segment_v2/column_writer.h +++ b/be/src/olap/rowset/segment_v2/column_writer.h @@ -101,6 +101,8 @@ class ColumnWriter { return append_nullable(&nullmap, data, 1); } + Status append(const uint8_t* nullmap, const void* data, size_t num_rows); + Status append_nullable(const uint8_t* nullmap, const void* data, size_t num_rows); virtual Status append_nulls(size_t num_rows) = 0; @@ -141,6 +143,10 @@ class ColumnWriter { private: std::unique_ptr _field; bool _is_nullable; + std::vector _null_bitmap; + +protected: + std::shared_ptr _mem_tracker; }; class FlushPageCallback { diff --git 
a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index a0c3ebb80fbb80..5eebb30a886970 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -38,16 +38,24 @@ const char* k_segment_magic = "D0R1"; const uint32_t k_segment_magic_length = 4; SegmentWriter::SegmentWriter(fs::WritableBlock* wblock, uint32_t segment_id, - const TabletSchema* tablet_schema, - DataDir* data_dir, const SegmentWriterOptions& opts) + const TabletSchema* tablet_schema, DataDir* data_dir, + uint32_t max_row_per_segment, const SegmentWriterOptions& opts) : _segment_id(segment_id), _tablet_schema(tablet_schema), _data_dir(data_dir), + _max_row_per_segment(max_row_per_segment), _opts(opts), _wblock(wblock), _mem_tracker( - MemTracker::create_virtual_tracker(-1, "SegmentWriter:Segment-" + std::to_string(segment_id))) { + MemTracker::create_virtual_tracker(-1, "SegmentWriter:Segment-" + std::to_string(segment_id))), + _olap_data_convertor(tablet_schema) { CHECK_NOTNULL(_wblock); + size_t num_short_key_column = _tablet_schema->num_short_key_columns(); + for (size_t cid = 0; cid < num_short_key_column; ++cid) { + const auto& column = _tablet_schema->column(cid); + _short_key_coders.push_back(get_key_coder(column.type())); + _short_key_index_size.push_back(column.index_length()); + } } SegmentWriter::~SegmentWriter() { @@ -102,6 +110,86 @@ Status SegmentWriter::init(uint32_t write_mbytes_per_sec __attribute__((unused)) return Status::OK(); } +Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos, + size_t num_rows) { + assert(block && num_rows > 0 && row_pos + num_rows <= block->rows() && + block->columns() == _column_writers.size()); + _olap_data_convertor.set_source_content(block, row_pos, num_rows); + + // find all row pos for short key indexes + std::vector short_key_pos; + if (UNLIKELY(_short_key_row_pos == 0)) { + short_key_pos.push_back(0); + } + while 
(_short_key_row_pos + _opts.num_rows_per_block < _row_count + num_rows) { + _short_key_row_pos += _opts.num_rows_per_block; + short_key_pos.push_back(_short_key_row_pos - _row_count); + } + + // convert column data from engine format to storage layer format + std::vector short_key_columns; + size_t num_key_columns = _tablet_schema->num_short_key_columns(); + for (size_t cid = 0; cid < _column_writers.size(); ++cid) { + auto converted_result = _olap_data_convertor.convert_column_data(cid); + if (converted_result.first != Status::OK()) { + return converted_result.first; + } + if (cid < num_key_columns) { + short_key_columns.push_back(converted_result.second); + } + _column_writers[cid]->append(converted_result.second->get_nullmap(), + converted_result.second->get_data(), num_rows); + } + + // create short key indexes + std::vector key_column_fields; + for (const auto pos : short_key_pos) { + for (const auto& column : short_key_columns) { + key_column_fields.push_back(column->get_data_at(pos)); + } + std::string encoded_key = encode_short_keys(key_column_fields); + RETURN_IF_ERROR(_index_builder->add_item(encoded_key)); + key_column_fields.clear(); + } + + _row_count += num_rows; + _olap_data_convertor.clear_source_content(); + return Status::OK(); +} + +int64_t SegmentWriter::max_row_to_add(size_t row_avg_size_in_bytes) { + int64_t size_rows = ((int64_t)MAX_SEGMENT_SIZE - (int64_t)estimate_segment_size()) / row_avg_size_in_bytes; + int64_t count_rows = (int64_t)_max_row_per_segment - _row_count; + + return std::min(size_rows, count_rows); +} + + +std::string SegmentWriter::encode_short_keys( + const std::vector key_column_fields, bool null_first) { + size_t num_key_columns = _tablet_schema->num_short_key_columns(); + assert(key_column_fields.size() == num_key_columns && + _short_key_coders.size() == num_key_columns && + _short_key_index_size.size() == num_key_columns); + + std::string encoded_keys; + for (size_t cid = 0; cid < num_key_columns; ++cid) { + auto field = 
key_column_fields[cid]; + if (UNLIKELY(!field)) { + if (null_first) { + encoded_keys.push_back(KEY_NULL_FIRST_MARKER); + } else { + encoded_keys.push_back(KEY_NULL_LAST_MARKER); + } + continue; + } + encoded_keys.push_back(KEY_NORMAL_MARKER); + _short_key_coders[cid]->encode_ascending(field, _short_key_index_size[cid], + &encoded_keys); + } + return encoded_keys; +} + template Status SegmentWriter::append_row(const RowType& row) { for (size_t cid = 0; cid < _column_writers.size(); ++cid) { diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index baa91198d05b0f..cc047e19c96f26 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -25,9 +25,14 @@ #include "common/status.h" // Status #include "gen_cpp/segment_v2.pb.h" #include "gutil/macros.h" +#include "vec/core/block.h" +#include "vec/olap/olap_data_convertor.h" namespace doris { +// TODO(lingbin): Should be a conf that can be dynamically adjusted, or a member in the context +const uint32_t MAX_SEGMENT_SIZE = static_cast(OLAP_MAX_COLUMN_SEGMENT_FILE_SIZE * + OLAP_COLUMN_FILE_SEGMENT_SIZE_SCALE); class DataDir; class MemTracker; class RowBlock; @@ -35,6 +40,7 @@ class RowCursor; class TabletSchema; class TabletColumn; class ShortKeyIndexBuilder; +class KeyCoder; namespace fs { class WritableBlock; @@ -55,7 +61,7 @@ class SegmentWriter { public: explicit SegmentWriter(fs::WritableBlock* block, uint32_t segment_id, const TabletSchema* tablet_schema, - DataDir* data_dir, + DataDir* data_dir, uint32_t max_row_per_segment, const SegmentWriterOptions& opts); ~SegmentWriter(); @@ -64,6 +70,10 @@ class SegmentWriter { template Status append_row(const RowType& row); + Status append_block(const vectorized::Block* block, size_t row_pos, size_t num_rows); + + int64_t max_row_to_add(size_t row_avg_size_in_bytes); + uint64_t estimate_segment_size(); uint32_t num_rows_written() { return _row_count; } @@ -83,10 
+93,14 @@ class SegmentWriter { Status _write_footer(); Status _write_raw_data(const std::vector& slices); + std::string encode_short_keys(const std::vector key_column_fields, + bool null_first = true); + private: uint32_t _segment_id; const TabletSchema* _tablet_schema; DataDir* _data_dir; + uint32_t _max_row_per_segment; SegmentWriterOptions _opts; // Not owned. owned by RowsetWriter @@ -97,6 +111,11 @@ class SegmentWriter { std::vector> _column_writers; std::shared_ptr _mem_tracker; uint32_t _row_count = 0; + + vectorized::OlapBlockDataConvertor _olap_data_convertor; + std::vector< const KeyCoder* > _short_key_coders; + std::vector< uint16_t > _short_key_index_size; + size_t _short_key_row_pos = 0; }; } // namespace segment_v2 diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 766e00919d0520..eb8756c81aa5af 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -25,9 +25,9 @@ namespace doris { LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s, - bool is_high_priority, const std::string& sender_ip) + bool is_high_priority, const std::string& sender_ip, bool is_vec) : _load_id(load_id), _timeout_s(timeout_s), _is_high_priority(is_high_priority), - _sender_ip(sender_ip) { + _sender_ip(sender_ip), _is_vec(is_vec) { _mem_tracker = MemTracker::create_tracker( mem_limit, "LoadChannel:" + _load_id.to_string(), nullptr, MemTrackerLevel::TASK); // _last_updated_time should be set before being inserted to @@ -39,7 +39,7 @@ LoadChannel::LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t tim LoadChannel::~LoadChannel() { LOG(INFO) << "load channel removed. 
mem peak usage=" << _mem_tracker->peak_consumption() << ", info=" << _mem_tracker->debug_string() << ", load_id=" << _load_id - << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip; + << ", is high priority=" << _is_high_priority << ", sender_ip=" << _sender_ip << ", is_vec=" << _is_vec; } Status LoadChannel::open(const PTabletWriterOpenRequest& params) { @@ -54,7 +54,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) { } else { // create a new tablets channel TabletsChannelKey key(params.id(), index_id); - channel.reset(new TabletsChannel(key, _is_high_priority)); + channel.reset(new TabletsChannel(key, _is_high_priority, _is_vec)); _tablets_channels.insert({index_id, channel}); } } @@ -66,50 +66,23 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) { return Status::OK(); } -Status LoadChannel::add_batch(const PTabletWriterAddBatchRequest& request, - PTabletWriterAddBatchResult* response) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); - int64_t index_id = request.index_id(); - // 1. 
get tablets channel - std::shared_ptr channel; - { - std::lock_guard l(_lock); - auto it = _tablets_channels.find(index_id); - if (it == _tablets_channels.end()) { - if (_finished_channel_ids.find(index_id) != _finished_channel_ids.end()) { - // this channel is already finished, just return OK - return Status::OK(); - } - std::stringstream ss; - ss << "load channel " << _load_id << " add batch with unknown index id: " << index_id; - return Status::InternalError(ss.str()); +Status LoadChannel::_get_tablets_channel(std::shared_ptr& channel, bool& is_finished, const int64_t index_id) { + std::lock_guard l(_lock); + auto it = _tablets_channels.find(index_id); + if (it == _tablets_channels.end()) { + if (_finished_channel_ids.find(index_id) != _finished_channel_ids.end()) { + // this channel is already finished, just return OK + is_finished = true; + return Status::OK(); } - channel = it->second; + std::stringstream ss; + ss << "load channel " << _load_id << " add batch with unknown index id: " << index_id; + return Status::InternalError(ss.str()); } - // 2. check if mem consumption exceed limit - handle_mem_exceed_limit(false); - - // 3. add batch to tablets channel - if (request.has_row_batch()) { - RETURN_IF_ERROR(channel->add_batch(request, response)); - } - - // 4. 
handle eos - Status st; - if (request.has_eos() && request.eos()) { - bool finished = false; - RETURN_IF_ERROR(channel->close(request.sender_id(), request.backend_id(), - &finished, request.partition_ids(), - response->mutable_tablet_vec())); - if (finished) { - std::lock_guard l(_lock); - _tablets_channels.erase(index_id); - _finished_channel_ids.emplace(index_id); - } - } - _last_updated_time.store(time(nullptr)); - return st; + is_finished = false; + channel = it->second; + return Status::OK(); } void LoadChannel::handle_mem_exceed_limit(bool force) { diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index ba0ad3033498c8..37ee8453c912e4 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -27,27 +27,29 @@ #include "gen_cpp/Types_types.h" #include "gen_cpp/internal_service.pb.h" #include "runtime/mem_tracker.h" +#include "runtime/tablets_channel.h" +#include "runtime/thread_context.h" #include "util/uid_util.h" namespace doris { class Cache; -class TabletsChannel; // A LoadChannel manages tablets channels for all indexes // corresponding to a certain load job class LoadChannel { public: LoadChannel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s, - bool is_high_priority, const std::string& sender_ip); + bool is_high_priority, const std::string& sender_ip, bool is_vec); ~LoadChannel(); // open a new load channel if not exist Status open(const PTabletWriterOpenRequest& request); // this batch must belong to a index in one transaction - Status add_batch(const PTabletWriterAddBatchRequest& request, - PTabletWriterAddBatchResult* response); + template + Status add_batch(const TabletWriterAddRequest& request, + TabletWriterAddResult* response); // return true if this load channel has been opened and all tablets channels are closed then. 
bool is_finished(); @@ -70,6 +72,29 @@ class LoadChannel { bool is_high_priority() const { return _is_high_priority; } +protected: + Status _get_tablets_channel(std::shared_ptr& channel, + bool& is_finished, + const int64_t index_id); + + template + Status _handle_eos(std::shared_ptr& channel, + const Request& request, + Response* response) { + bool finished = false; + auto index_id = request.index_id(); + RETURN_IF_ERROR(channel->close(request.sender_id(), request.backend_id(), + &finished, request.partition_ids(), + response->mutable_tablet_vec())); + if (finished) { + std::lock_guard l(_lock); + _tablets_channels.erase(index_id); + _finished_channel_ids.emplace(index_id); + } + return Status::OK(); + } + + private: // when mem consumption exceeds limit, should call this method to find the channel // that consumes the largest memory(, and then we can reduce its memory usage). @@ -99,8 +124,49 @@ class LoadChannel { // the ip where tablet sink locate std::string _sender_ip = ""; + + // true if this load is vectorized + bool _is_vec = false; }; +template +Status LoadChannel::add_batch(const TabletWriterAddRequest& request, + TabletWriterAddResult* response) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + int64_t index_id = request.index_id(); + // 1. get tablets channel + std::shared_ptr channel; + bool is_finished; + Status st = _get_tablets_channel(channel, is_finished, index_id); + if (!st.ok() || is_finished) { + return st; + } + + // 2. check if mem consumption exceed limit + handle_mem_exceed_limit(false); + + // 3. add batch to tablets channel + if constexpr (std::is_same_v) { + if (request.has_row_batch()) { + RETURN_IF_ERROR(channel->add_batch(request, response)); + } + } else { + if (request.has_block()) { + RETURN_IF_ERROR(channel->add_batch(request, response)); + } + } + + // 4. 
handle eos + if (request.has_eos() && request.eos()) { + st = _handle_eos(channel, request, response); + if (!st.ok()) { + return st; + } + } + _last_updated_time.store(time(nullptr)); + return st; +} + inline std::ostream& operator<<(std::ostream& os, const LoadChannel& load_channel) { os << "LoadChannel(id=" << load_channel.load_id() << ", mem=" << load_channel.mem_consumption() << ", last_update_time=" << static_cast(load_channel.last_updated_time()) diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 9de0736301b7db..35f7c3b82fc76b 100644 --- a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -18,7 +18,6 @@ #include "runtime/load_channel_mgr.h" #include "gutil/strings/substitute.h" -#include "olap/lru_cache.h" #include "runtime/load_channel.h" #include "runtime/mem_tracker.h" #include "runtime/thread_context.h" @@ -95,6 +94,11 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) { return Status::OK(); } +LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s, + bool is_high_priority, const std::string& sender_ip, bool is_vec) { + return new LoadChannel(load_id, mem_limit, timeout_s, is_high_priority, sender_ip, is_vec); +} + Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); UniqueId load_id(params.id()); @@ -115,8 +119,8 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { int64_t job_timeout_s = calc_job_timeout_s(timeout_in_req_s); bool is_high_priority = (params.has_is_high_priority() && params.is_high_priority()); - channel.reset(new LoadChannel(load_id, job_max_memory, job_timeout_s, is_high_priority, - params.sender_ip())); + channel.reset(_create_load_channel(load_id, job_max_memory, job_timeout_s, is_high_priority, + params.sender_ip(), params.is_vectorized())); _load_channels.insert({load_id, channel}); } } @@ -127,55 +131,16 @@ 
Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) { static void dummy_deleter(const CacheKey& key, void* value) {} -Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request, - PTabletWriterAddBatchResult* response) { - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); - UniqueId load_id(request.id()); - // 1. get load channel - std::shared_ptr channel; +void LoadChannelMgr::_finish_load_channel(const UniqueId load_id) { + VLOG_NOTICE << "removing load channel " << load_id << " because it's finished"; { std::lock_guard l(_lock); - auto it = _load_channels.find(load_id); - if (it == _load_channels.end()) { - auto handle = _last_success_channel->lookup(load_id.to_string()); - // success only when eos be true - if (handle != nullptr) { - _last_success_channel->release(handle); - if (request.has_eos() && request.eos()) { - return Status::OK(); - } - } - return Status::InternalError(strings::Substitute( - "fail to add batch in load channel. unknown load_id=$0", load_id.to_string())); - } - channel = it->second; + _load_channels.erase(load_id); + auto handle = + _last_success_channel->insert(load_id.to_string(), nullptr, 1, dummy_deleter); + _last_success_channel->release(handle); } - - if (!channel->is_high_priority()) { - // 2. check if mem consumption exceed limit - // If this is a high priority load task, do not handle this. - // because this may block for a while, which may lead to rpc timeout. - _handle_mem_exceed_limit(); - } - - // 3. add batch to load channel - // batch may not exist in request(eg: eos request without batch), - // this case will be handled in load channel's add batch method. - RETURN_IF_ERROR(channel->add_batch(request, response)); - - // 4. 
handle finish - if (channel->is_finished()) { - VLOG_NOTICE << "removing load channel " << load_id << " because it's finished"; - { - std::lock_guard l(_lock); - _load_channels.erase(load_id); - auto handle = - _last_success_channel->insert(load_id.to_string(), nullptr, 1, dummy_deleter); - _last_success_channel->release(handle); - } - VLOG_CRITICAL << "removed load channel " << load_id; - } - return Status::OK(); + VLOG_CRITICAL << "removed load channel " << load_id; } void LoadChannelMgr::_handle_mem_exceed_limit() { @@ -223,7 +188,7 @@ Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) { } } - if (cancelled_channel.get() != nullptr) { + if (cancelled_channel != nullptr) { cancelled_channel->cancel(); LOG(INFO) << "load channel has been cancelled: " << load_id; } diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h index 1da0ec75a5fc05..292f3776e8d28a 100644 --- a/be/src/runtime/load_channel_mgr.h +++ b/be/src/runtime/load_channel_mgr.h @@ -28,15 +28,17 @@ #include "gen_cpp/Types_types.h" #include "gen_cpp/internal_service.pb.h" #include "gutil/ref_counted.h" +#include "runtime/load_channel.h" #include "runtime/tablets_channel.h" +#include "runtime/thread_context.h" #include "util/countdown_latch.h" #include "util/thread.h" #include "util/uid_util.h" +#include "olap/lru_cache.h" namespace doris { class Cache; -class LoadChannel; // LoadChannelMgr -> LoadChannel -> TabletsChannel -> DeltaWriter // All dispatched load data for this backend is routed from this class @@ -50,20 +52,29 @@ class LoadChannelMgr { // open a new load channel if not exist Status open(const PTabletWriterOpenRequest& request); - Status add_batch(const PTabletWriterAddBatchRequest& request, - PTabletWriterAddBatchResult* response); + template + Status add_batch(const TabletWriterAddRequest& request, + TabletWriterAddResult* response); // cancel all tablet stream for 'load_id' load Status cancel(const PTabletWriterCancelRequest& request); 
private: + static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t mem_limit, int64_t timeout_s, + bool is_high_priority, const std::string& sender_ip, bool is_vec); + + template + Status _get_load_channel(std::shared_ptr& channel, bool& is_eof, + const UniqueId& load_id, const Request& request); + + void _finish_load_channel(UniqueId load_id); // check if the total load mem consumption exceeds limit. // If yes, it will pick a load channel to try to reduce memory consumption. void _handle_mem_exceed_limit(); Status _start_bg_worker(); -private: +protected: // lock protect the load channel map std::mutex _lock; // load id -> load channel @@ -79,4 +90,62 @@ class LoadChannelMgr { Status _start_load_channels_clean(); }; +template +Status LoadChannelMgr::_get_load_channel(std::shared_ptr& channel, + bool& is_eof, + const UniqueId& load_id, + const Request& request) { + is_eof = false; + std::lock_guard l(_lock); + auto it = _load_channels.find(load_id); + if (it == _load_channels.end()) { + auto handle = _last_success_channel->lookup(load_id.to_string()); + // success only when eos be true + if (handle != nullptr) { + _last_success_channel->release(handle); + if (request.has_eos() && request.eos()) { + is_eof = true; + return Status::OK(); + } + } + return Status::InternalError(strings::Substitute( + "fail to add batch in load channel. unknown load_id=$0", load_id.to_string())); + } + channel = it->second; + return Status::OK(); +} + +template +Status LoadChannelMgr::add_batch(const TabletWriterAddRequest& request, + TabletWriterAddResult* response) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + UniqueId load_id(request.id()); + // 1. get load channel + std::shared_ptr channel; + bool is_eof; + auto status = _get_load_channel(channel, is_eof, load_id, request); + if (!status.ok() || is_eof) { + return status; + } + + if (!channel->is_high_priority()) { + // 2. 
check if mem consumption exceed limit + // If this is a high priority load task, do not handle this. + // because this may block for a while, which may lead to rpc timeout. + _handle_mem_exceed_limit(); + } + + // 3. add batch to load channel + // batch may not exist in request(eg: eos request without batch), + // this case will be handled in load channel's add batch method. + RETURN_IF_ERROR(channel->add_batch(request, response)); + + // 4. handle finish + if (channel->is_finished()) { + _finish_load_channel(load_id); + } + return Status::OK(); +} + + } // namespace doris diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 6ae7721cf9ae28..e8ad30d83f16d1 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -18,8 +18,6 @@ #include "runtime/tablets_channel.h" #include "exec/tablet_info.h" -#include "gutil/strings/substitute.h" -#include "olap/delta_writer.h" #include "olap/memtable.h" #include "runtime/row_batch.h" #include "runtime/tuple_row.h" @@ -32,8 +30,8 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT); std::atomic TabletsChannel::_s_tablet_writer_count; -TabletsChannel::TabletsChannel(const TabletsChannelKey& key, bool is_high_priority) - : _key(key), _state(kInitialized), _closed_senders(64), _is_high_priority(is_high_priority) { +TabletsChannel::TabletsChannel(const TabletsChannelKey& key, bool is_high_priority, bool is_vec) + : _key(key), _state(kInitialized), _closed_senders(64), _is_high_priority(is_high_priority), _is_vec(is_vec) { _mem_tracker = MemTracker::create_tracker(-1, "TabletsChannel:" + std::to_string(key.index_id)); static std::once_flag once_flag; std::call_once(once_flag, [] { @@ -76,78 +74,6 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& request) { return Status::OK(); } -Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& request, - PTabletWriterAddBatchResult* response) { - 
DCHECK(request.tablet_ids_size() == request.row_batch().num_rows()); - SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); - int64_t cur_seq; - { - std::lock_guard l(_lock); - if (_state != kOpened) { - return _state == kFinished - ? _close_status - : Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1", - _key.to_string(), _state)); - } - cur_seq = _next_seqs[request.sender_id()]; - // check packet - if (request.packet_seq() < cur_seq) { - LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq - << ", recept_seq=" << request.packet_seq(); - return Status::OK(); - } else if (request.packet_seq() > cur_seq) { - LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq - << ", recept_seq=" << request.packet_seq(); - return Status::InternalError("lost data packet"); - } - } - - RowBatch row_batch(*_row_desc, request.row_batch()); - std::unordered_map /* row index */> tablet_to_rowidxs; - for (int i = 0; i < request.tablet_ids_size(); ++i) { - int64_t tablet_id = request.tablet_ids(i); - if (_broken_tablets.find(tablet_id) != _broken_tablets.end()) { - // skip broken tablets - continue; - } - auto it = tablet_to_rowidxs.find(tablet_id); - if (it == tablet_to_rowidxs.end()) { - tablet_to_rowidxs.emplace(tablet_id, std::initializer_list{ i }); - } else { - it->second.emplace_back(i); - } - } - - google::protobuf::RepeatedPtrField* tablet_errors = response->mutable_tablet_errors(); - for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) { - auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first); - if (tablet_writer_it == _tablet_writers.end()) { - return Status::InternalError( - strings::Substitute("unknown tablet to append data, tablet=$0", tablet_to_rowidxs_it.first)); - } - - Status st = tablet_writer_it->second->write(&row_batch, tablet_to_rowidxs_it.second); - if (!st.ok()) { - auto err_msg = strings::Substitute( - "tablet writer write failed, tablet_id=$0, txn_id=$1, err=$2", - tablet_to_rowidxs_it.first, 
_txn_id, st.to_string()); - LOG(WARNING) << err_msg; - PTabletError* error = tablet_errors->Add(); - error->set_tablet_id(tablet_to_rowidxs_it.first); - error->set_msg(err_msg); - _broken_tablets.insert(tablet_to_rowidxs_it.first); - // continue write to other tablet. - // the error will return back to sender. - } - } - - { - std::lock_guard l(_lock); - _next_seqs[request.sender_id()] = cur_seq + 1; - } - return Status::OK(); -} - Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished, const google::protobuf::RepeatedField& partition_ids, google::protobuf::RepeatedPtrField* tablet_vec) { @@ -288,7 +214,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request wrequest.is_high_priority = _is_high_priority; DeltaWriter* writer = nullptr; - auto st = DeltaWriter::open(&wrequest, &writer); + auto st = DeltaWriter::open(&wrequest, &writer, _is_vec); if (!st.ok()) { std::stringstream ss; ss << "open delta writer failed, tablet_id=" << tablet.tablet_id() diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 226b2b76db05f5..1d0fe3c5fa14aa 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +#pragma once + #include #include #include @@ -25,9 +27,14 @@ #include "gen_cpp/internal_service.pb.h" #include "runtime/descriptors.h" #include "runtime/mem_tracker.h" +#include "runtime/thread_context.h" #include "util/bitmap.h" #include "util/priority_thread_pool.hpp" #include "util/uid_util.h" +#include "gutil/strings/substitute.h" + +#include "vec/core/block.h" +#include "olap/delta_writer.h" namespace doris { @@ -54,14 +61,15 @@ class OlapTableSchemaParam; // Write channel for a particular (load, index). 
class TabletsChannel { public: - TabletsChannel(const TabletsChannelKey& key, bool is_high_priority); + TabletsChannel(const TabletsChannelKey& key, bool is_high_priority, bool is_vec); ~TabletsChannel(); Status open(const PTabletWriterOpenRequest& request); // no-op when this channel has been closed or cancelled - Status add_batch(const PTabletWriterAddBatchRequest& request, PTabletWriterAddBatchResult* response); + template + Status add_batch(const TabletWriterAddRequest& request, TabletWriterAddResult* response); // Mark sender with 'sender_id' as closed. // If all senders are closed, close this channel, set '*finished' to true, update 'tablet_vec' @@ -83,10 +91,12 @@ class TabletsChannel { int64_t mem_consumption() const { return _mem_tracker->consumption(); } private: + template + Status _get_current_seq(int64_t& cur_seq, const Request& request); + // open all writer Status _open_all_writers(const PTabletWriterOpenRequest& request); -private: // id of this load channel TabletsChannelKey _key; @@ -104,6 +114,7 @@ class TabletsChannel { int64_t _txn_id = -1; int64_t _index_id = -1; OlapTableSchemaParam* _schema = nullptr; + TupleDescriptor* _tuple_desc = nullptr; // row_desc used to construct RowDescriptor* _row_desc = nullptr; @@ -130,6 +141,97 @@ class TabletsChannel { static std::atomic _s_tablet_writer_count; bool _is_high_priority = false; + + bool _is_vec = false; }; +template +Status TabletsChannel::_get_current_seq(int64_t& cur_seq, const Request& request) { + std::lock_guard l(_lock); + if (_state != kOpened) { + return _state == kFinished + ? 
_close_status + : Status::InternalError(strings::Substitute("TabletsChannel $0 state: $1", + _key.to_string(), _state)); + } + cur_seq = _next_seqs[request.sender_id()]; + // check packet + if (request.packet_seq() > cur_seq) { + LOG(WARNING) << "lost data packet, expect_seq=" << cur_seq + << ", recept_seq=" << request.packet_seq(); + return Status::InternalError("lost data packet"); + } + return Status::OK(); +} + +template +Status TabletsChannel::add_batch(const TabletWriterAddRequest& request, + TabletWriterAddResult* response) { + SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker); + int64_t cur_seq = 0; + + auto status = _get_current_seq(cur_seq, request); + if (UNLIKELY(!status.ok())) { + return status; + } + + if (request.packet_seq() < cur_seq) { + LOG(INFO) << "packet has already recept before, expect_seq=" << cur_seq + << ", recept_seq=" << request.packet_seq(); + return Status::OK(); + } + + std::unordered_map /* row index */> tablet_to_rowidxs; + for (int i = 0; i < request.tablet_ids_size(); ++i) { + int64_t tablet_id = request.tablet_ids(i); + if (_broken_tablets.find(tablet_id) != _broken_tablets.end()) { + // skip broken tablets + continue; + } + auto it = tablet_to_rowidxs.find(tablet_id); + if (it == tablet_to_rowidxs.end()) { + tablet_to_rowidxs.emplace(tablet_id, std::initializer_list{ i }); + } else { + it->second.emplace_back(i); + } + } + + auto get_send_data = [&] () { + if constexpr (std::is_same_v) { + return RowBatch(*_row_desc, request.row_batch()); + } else { + return vectorized::Block(request.block()); + } + }; + + auto send_data = get_send_data(); + google::protobuf::RepeatedPtrField* tablet_errors = response->mutable_tablet_errors(); + for (const auto& tablet_to_rowidxs_it : tablet_to_rowidxs) { + auto tablet_writer_it = _tablet_writers.find(tablet_to_rowidxs_it.first); + if (tablet_writer_it == _tablet_writers.end()) { + return Status::InternalError( + strings::Substitute("unknown tablet to append data, tablet=$0", 
tablet_to_rowidxs_it.first)); + } + + Status st = tablet_writer_it->second->write(&send_data, tablet_to_rowidxs_it.second); + if (!st.ok()) { + auto err_msg = strings::Substitute( + "tablet writer write failed, tablet_id=$0, txn_id=$1, err=$2", + tablet_to_rowidxs_it.first, _txn_id, st.code()); + LOG(WARNING) << err_msg; + PTabletError* error = tablet_errors->Add(); + error->set_tablet_id(tablet_to_rowidxs_it.first); + error->set_msg(err_msg); + _broken_tablets.insert(tablet_to_rowidxs_it.first); + // continue write to other tablet. + // the error will return back to sender. + } + } + + { + std::lock_guard l(_lock); + _next_seqs[request.sender_id()] = cur_seq + 1; + } + return Status::OK(); +} } // namespace doris diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 5080b1c21825b2..9fa45b9a66ebee 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -123,6 +123,37 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController st.to_protobuf(response->mutable_status()); } +template +void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* cntl_base, + const PTabletWriterAddBlockRequest* request, + PTabletWriterAddBlockResult* response, + google::protobuf::Closure* done) { + VLOG_RPC << "tablet writer add block, id=" << request->id() + << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id() + << ", current_queued_size=" << _tablet_worker_pool.get_queue_size(); + int64_t submit_task_time_ns = MonotonicNanos(); + _tablet_worker_pool.offer([cntl_base, request, response, done, submit_task_time_ns, this]() { + int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns; + brpc::ClosureGuard closure_guard(done); + int64_t execution_time_ns = 0; + { + SCOPED_RAW_TIMER(&execution_time_ns); + brpc::Controller* cntl = static_cast(cntl_base); + attachment_transfer_request_block(request, cntl); + auto st = 
_exec_env->load_channel_mgr()->add_batch(*request, response); + if (!st.ok()) { + LOG(WARNING) << "tablet writer add block failed, message=" << st.get_error_msg() + << ", id=" << request->id() << ", index_id=" << request->index_id() + << ", sender_id=" << request->sender_id() + << ", backend id=" << request->backend_id(); + } + st.to_protobuf(response->mutable_status()); + } + response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO); + response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO); + }); +} + template void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController* cntl_base, const PTabletWriterAddBatchRequest* request, diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h index c4073bf86ed282..ce4913701d0bff 100644 --- a/be/src/service/internal_service.h +++ b/be/src/service/internal_service.h @@ -64,6 +64,11 @@ class PInternalServiceImpl : public T { PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) override; + void tablet_writer_add_block(google::protobuf::RpcController* controller, + const PTabletWriterAddBlockRequest* request, + PTabletWriterAddBlockResult* response, + google::protobuf::Closure* done) override; + void tablet_writer_cancel(google::protobuf::RpcController* controller, const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, diff --git a/be/src/udf/udf.cpp b/be/src/udf/udf.cpp index b343155cffba00..f7491d4c5ecc28 100644 --- a/be/src/udf/udf.cpp +++ b/be/src/udf/udf.cpp @@ -178,7 +178,7 @@ doris_udf::FunctionContext* FunctionContextImpl::create_context( const doris_udf::FunctionContext::TypeDesc& return_type, const std::vector& arg_types, int varargs_buffer_size, bool debug) { - doris_udf::FunctionContext* ctx = new doris_udf::FunctionContext(); + auto* ctx = new doris_udf::FunctionContext(); ctx->_impl->_state = state; ctx->_impl->_pool = new FreePool(pool); ctx->_impl->_intermediate_type = intermediate_type; 
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt index 63f07d14082ffb..7555e9d0ca9222 100644 --- a/be/src/vec/CMakeLists.txt +++ b/be/src/vec/CMakeLists.txt @@ -99,6 +99,8 @@ set(VEC_FILES exec/vassert_num_rows_node.cpp exec/vrepeat_node.cpp exec/vtable_function_node.cpp + exec/vbroker_scan_node.cpp + exec/vbroker_scanner.cpp exec/join/vhash_join_node.cpp exprs/vectorized_agg_fn.cpp exprs/vectorized_fn_call.cpp @@ -177,6 +179,7 @@ set(VEC_FILES olap/vgeneric_iterators.cpp olap/vcollect_iterator.cpp olap/block_reader.cpp + olap/olap_data_convertor.cpp sink/mysql_result_writer.cpp sink/result_sink.cpp sink/vdata_stream_sender.cpp diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp index ce78397794fb4e..f90515fd5e09dc 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_reader.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_reader.cpp @@ -20,29 +20,38 @@ namespace doris::vectorized { // auto spread at nullable condition, null value do not participate aggregate -void register_aggregate_function_reader(AggregateFunctionSimpleFactory& factory) { +void register_aggregate_function_reader_load(AggregateFunctionSimpleFactory& factory) { // add a suffix to the function name here to distinguish special functions of agg reader - auto register_function_reader = [&](const std::string& name, - const AggregateFunctionCreator& creator) { - factory.register_function(name + agg_reader_suffix, creator, false); + auto register_function = [&](const std::string& name, + const AggregateFunctionCreator& creator) { + factory.register_function(name + AGG_READER_SUFFIX, creator, false); + factory.register_function(name + AGG_LOAD_SUFFIX, creator, false); }; - register_function_reader("sum", create_aggregate_function_sum_reader); - register_function_reader("max", create_aggregate_function_max); - register_function_reader("min", create_aggregate_function_min); - 
register_function_reader("replace_if_not_null", create_aggregate_function_replace_if_not_null); - register_function_reader("bitmap_union", create_aggregate_function_bitmap_union); - register_function_reader("hll_union", create_aggregate_function_HLL_union); + register_function("sum", create_aggregate_function_sum_reader); + register_function("max", create_aggregate_function_max); + register_function("min", create_aggregate_function_min); + register_function("bitmap_union", create_aggregate_function_bitmap_union); + register_function("hll_union", create_aggregate_function_HLL_union); } -void register_aggregate_function_reader_no_spread(AggregateFunctionSimpleFactory& factory) { - auto register_function_reader = [&](const std::string& name, - const AggregateFunctionCreator& creator, bool nullable) { - factory.register_function(name + agg_reader_suffix, creator, nullable); +// only the replace function does a different agg operation in load/reader. +// because Doris can ensure that the data is globally ordered in reader, but cannot in load +// 1. reader, get the first value of input data. +// 2. load, get the last value of input data. 
+void register_aggregate_function_replace_reader_load(AggregateFunctionSimpleFactory& factory) { + auto register_function = [&](const std::string& name, const std::string& suffix, + const AggregateFunctionCreator& creator, bool nullable) { + factory.register_function(name + suffix, creator, nullable); }; - register_function_reader("replace", create_aggregate_function_replace, false); - register_function_reader("replace", create_aggregate_function_replace_nullable, true); + register_function("replace", AGG_READER_SUFFIX, create_aggregate_function_first, false); + register_function("replace", AGG_READER_SUFFIX, create_aggregate_function_first, true); + register_function("replace", AGG_LOAD_SUFFIX, create_aggregate_function_last, false); + register_function("replace", AGG_LOAD_SUFFIX, create_aggregate_function_last, true); + + register_function("replace_if_not_null", AGG_READER_SUFFIX, create_aggregate_function_first, false); + register_function("replace_if_not_null", AGG_LOAD_SUFFIX, create_aggregate_function_last, false); } } // namespace doris::vectorized diff --git a/be/src/vec/aggregate_functions/aggregate_function_reader.h b/be/src/vec/aggregate_functions/aggregate_function_reader.h index f44be5ee574168..86fea6f079b855 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_reader.h +++ b/be/src/vec/aggregate_functions/aggregate_function_reader.h @@ -26,10 +26,11 @@ namespace doris::vectorized { -static const std::string agg_reader_suffix = "_reader"; +static auto constexpr AGG_READER_SUFFIX = "_reader"; +static auto constexpr AGG_LOAD_SUFFIX = "_load"; -void register_aggregate_function_reader(AggregateFunctionSimpleFactory& factory); +void register_aggregate_function_reader_load(AggregateFunctionSimpleFactory& factory); -void register_aggregate_function_reader_no_spread(AggregateFunctionSimpleFactory& factory); +void register_aggregate_function_replace_reader_load(AggregateFunctionSimpleFactory& factory); } // namespace doris::vectorized diff --git 
a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp index 6315fd6600c3fc..4931958276605b 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp @@ -58,7 +58,7 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() { register_aggregate_function_uniq(instance); register_aggregate_function_bitmap(instance); register_aggregate_function_combinator_distinct(instance); - register_aggregate_function_reader(instance); // register aggregate function for agg reader + register_aggregate_function_reader_load(instance); // register aggregate function for agg reader register_aggregate_function_window_rank(instance); register_aggregate_function_stddev_variance_pop(instance); register_aggregate_function_topn(instance); @@ -72,7 +72,7 @@ AggregateFunctionSimpleFactory& AggregateFunctionSimpleFactory::instance() { register_aggregate_function_combinator_null(instance); register_aggregate_function_stddev_variance_samp(instance); - register_aggregate_function_reader_no_spread(instance); + register_aggregate_function_replace_reader_load(instance); register_aggregate_function_window_lead_lag(instance); register_aggregate_function_HLL_union_agg(instance); register_aggregate_function_percentile_approx(instance); diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.cpp b/be/src/vec/aggregate_functions/aggregate_function_window.cpp index b96f241e513c47..53a4c4931c4f18 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_window.cpp +++ b/be/src/vec/aggregate_functions/aggregate_function_window.cpp @@ -23,7 +23,7 @@ #include "common/logging.h" #include "vec/aggregate_functions/aggregate_function_simple_factory.h" #include "vec/aggregate_functions/factory_helpers.h" -#include "vec/aggregate_functions/helpers.h" + namespace doris::vectorized { 
AggregateFunctionPtr create_aggregate_function_dense_rank(const std::string& name, @@ -53,44 +53,6 @@ AggregateFunctionPtr create_aggregate_function_row_number(const std::string& nam return std::make_shared(argument_types); } -template