diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 957a6ca8bd3efe..92afb4849b0bf0 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -886,8 +886,6 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState { void create_dependencies(int local_exchange_id) override { sink_deps.resize(source_deps.size()); - std::vector new_deps(sink_deps.size(), nullptr); - source_deps.swap(new_deps); for (size_t i = 0; i < source_deps.size(); i++) { source_deps[i] = std::make_shared(local_exchange_id, local_exchange_id, diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index 1ea7c6de6a8a77..c363a41d1c772e 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -58,20 +58,17 @@ Status PartitionSorter::append_block(Block* input_block) { Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc); DCHECK(input_block->columns() == sorted_block.columns()); RETURN_IF_ERROR(partial_sort(*input_block, sorted_block)); - RETURN_IF_ERROR(_state->add_sorted_block(sorted_block)); + _state->add_sorted_block(Block::create_shared(std::move(sorted_block))); return Status::OK(); } Status PartitionSorter::prepare_for_read() { - auto& cursors = _state->get_cursors(); auto& blocks = _state->get_sorted_block(); auto& priority_queue = _state->get_priority_queue(); for (auto& block : blocks) { - cursors.emplace_back(block, _sort_description); - } - for (auto& cursor : cursors) { - priority_queue.push(MergeSortCursor(&cursor)); + priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description)); } + blocks.clear(); return Status::OK(); } @@ -84,29 +81,30 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) { } Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) { - if (_state->get_sorted_block().empty()) { + if (_state->get_priority_queue().empty()) { + *eos = true; + } else if (_state->get_priority_queue().size() == 1 && _has_global_limit) { + block->swap(*_state->get_priority_queue().top().impl->block); + block->set_num_rows(_partition_inner_limit); *eos = true; } else { - if (_state->get_sorted_block().size() == 1 && _has_global_limit) { - auto& sorted_block = _state->get_sorted_block()[0]; - block->swap(sorted_block); - block->set_num_rows(_partition_inner_limit); - *eos = true; - } else { - RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size())); - } + RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size())); } return Status::OK(); } Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) { - const auto& sorted_block = _state->get_sorted_block()[0]; - size_t num_columns = sorted_block.columns(); + auto& priority_queue = _state->get_priority_queue(); + if (priority_queue.empty()) { + *eos = true; + return Status::OK(); + } + const auto& sorted_block = priority_queue.top().impl->block; + size_t num_columns = sorted_block->columns(); MutableBlock m_block = - VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block); + VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block); MutableColumns& merged_columns = m_block.mutable_columns(); size_t current_output_rows = 0; - auto& priority_queue = _state->get_priority_queue(); bool get_enough_data = false; while (!priority_queue.empty()) { @@ -121,7 +119,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int //1 row_number no need to check distinct, just output partition_inner_limit row if ((current_output_rows + _output_total_rows) < _partition_inner_limit) { for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); } } else { //rows has get enough @@ -155,7 +153,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int } } for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); } break; } @@ -180,7 +178,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int *_previous_row = current; } for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); } current_output_rows++; break; diff --git a/be/src/vec/common/sort/partition_sorter.h b/be/src/vec/common/sort/partition_sorter.h index 77dcb68371131c..01e009d200db8b 100644 --- a/be/src/vec/common/sort/partition_sorter.h +++ b/be/src/vec/common/sort/partition_sorter.h @@ -50,7 +50,7 @@ struct SortCursorCmp { SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {} void reset() { - impl = nullptr; + impl->reset(); row = 0; } bool compare_two_rows(const MergeSortCursor& rhs) const { @@ -67,7 +67,7 @@ struct SortCursorCmp { return true; } int row = 0; - MergeSortCursorImpl* impl = nullptr; + std::shared_ptr impl = nullptr; }; class PartitionSorter final : public Sorter { diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index eca7e15626b2eb..89f1c7d73f1c58 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -59,48 +59,44 @@ namespace doris::vectorized { void MergeSorterState::reset() { auto empty_queue = std::priority_queue(); priority_queue_.swap(empty_queue); - std::vector empty_cursors(0); - cursors_.swap(empty_cursors); - std::vector empty_blocks(0); + std::vector> empty_cursors(0); + std::vector> empty_blocks(0); sorted_blocks_.swap(empty_blocks); unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty()); in_mem_sorted_bocks_size_ = 0; } -Status MergeSorterState::add_sorted_block(Block& block) { - auto rows = block.rows(); +void MergeSorterState::add_sorted_block(std::shared_ptr block) { + auto rows = block->rows(); if (0 == rows) { - return Status::OK(); + return; } - in_mem_sorted_bocks_size_ += block.bytes(); - sorted_blocks_.emplace_back(std::move(block)); + in_mem_sorted_bocks_size_ += block->bytes(); + sorted_blocks_.emplace_back(block); num_rows_ += rows; - return Status::OK(); } Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) { for (auto& block : sorted_blocks_) { - cursors_.emplace_back(block, sort_description); - } - - if (sorted_blocks_.size() > 1) { - for (auto& cursor : cursors_) { - priority_queue_.emplace(&cursor); - } + priority_queue_.emplace( + MergeSortCursorImpl::create_shared(std::move(block), sort_description)); } + sorted_blocks_.clear(); return Status::OK(); } Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size, bool* eos) { - if (sorted_blocks_.empty()) { + DCHECK(sorted_blocks_.empty()); + DCHECK(unsorted_block_->empty()); + if (priority_queue_.empty()) { *eos = true; - } else if (sorted_blocks_.size() == 1) { + } else if (priority_queue_.size() == 1) { if (offset_ != 0) { - sorted_blocks_[0].skip_num_rows(offset_); + priority_queue_.top().impl->block->skip_num_rows(offset_); } - block->swap(sorted_blocks_[0]); + block->swap(*priority_queue_.top().impl->block); *eos = true; } else { RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos)); @@ -110,9 +106,14 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos) { - size_t num_columns = sorted_blocks_[0].columns(); + if (priority_queue_.empty()) { + *eos = true; + return Status::OK(); + } + size_t num_columns = priority_queue_.top().impl->block->columns(); - MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]); + MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block( + block, *priority_queue_.top().impl->block); MutableColumns& merged_columns = m_block.mutable_columns(); /// Take rows from queue in right order and push to 'merged'. @@ -123,7 +124,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized if (offset_ == 0) { for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); ++merged_rows; } else { offset_--; @@ -134,7 +135,9 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized priority_queue_.push(current); } - if (merged_rows == batch_size) break; + if (merged_rows == batch_size) { + break; + } } block->set_columns(std::move(merged_columns)); @@ -261,22 +264,22 @@ Status FullSorter::_do_sort() { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_state->num_rows() < _offset + _limit) { - static_cast(_state->add_sorted_block(desc_block)); - _block_priority_queue.emplace(_pool->add( - new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); + _state->add_sorted_block(Block::create_shared(std::move(desc_block))); + _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( + _state->last_sorted_block(), _sort_description)); } else { - auto tmp_cursor_impl = - std::make_unique(desc_block, _sort_description); - MergeSortBlockCursor block_cursor(tmp_cursor_impl.get()); + auto tmp_cursor_impl = MergeSortCursorImpl::create_shared( + Block::create_shared(std::move(desc_block)), _sort_description); + MergeSortBlockCursor block_cursor(tmp_cursor_impl); if (!block_cursor.totally_greater(_block_priority_queue.top())) { - static_cast(_state->add_sorted_block(desc_block)); - _block_priority_queue.emplace(_pool->add( - new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); + _state->add_sorted_block(tmp_cursor_impl->block); + _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( + _state->last_sorted_block(), _sort_description)); } } } else { // dispose normal sort logic - static_cast(_state->add_sorted_block(desc_block)); + _state->add_sorted_block(Block::create_shared(std::move(desc_block))); } return Status::OK(); } diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index 478e91c0783f1e..aa7d88dfbc2a3a 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -59,7 +59,7 @@ class MergeSorterState { ~MergeSorterState() = default; - Status add_sorted_block(Block& block); + void add_sorted_block(std::shared_ptr block); Status build_merge_tree(const SortDescription& sort_description); @@ -72,23 +72,19 @@ class MergeSorterState { uint64_t num_rows() const { return num_rows_; } - Block& last_sorted_block() { return sorted_blocks_.back(); } + std::shared_ptr last_sorted_block() { return sorted_blocks_.back(); } - std::vector& get_sorted_block() { return sorted_blocks_; } + std::vector>& get_sorted_block() { return sorted_blocks_; } std::priority_queue& get_priority_queue() { return priority_queue_; } - std::vector& get_cursors() { return cursors_; } void reset(); std::unique_ptr unsorted_block_; private: - int _calc_spill_blocks_to_merge() const; - Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos); std::priority_queue priority_queue_; - std::vector cursors_; - std::vector sorted_blocks_; + std::vector> sorted_blocks_; size_t in_mem_sorted_bocks_size_ = 0; uint64_t num_rows_ = 0; diff --git a/be/src/vec/common/sort/topn_sorter.cpp b/be/src/vec/common/sort/topn_sorter.cpp index 58c3cd2dd0cfad..1f24fb14c950a9 100644 --- a/be/src/vec/common/sort/topn_sorter.cpp +++ b/be/src/vec/common/sort/topn_sorter.cpp @@ -72,17 +72,16 @@ Status TopNSorter::_do_sort(Block* block) { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_state->num_rows() < _offset + _limit) { - RETURN_IF_ERROR(_state->add_sorted_block(sorted_block)); - _block_priority_queue.emplace(_pool->add( - new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); + _state->add_sorted_block(Block::create_shared(std::move(sorted_block))); + _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( + _state->last_sorted_block(), _sort_description)); } else { - auto tmp_cursor_impl = - std::make_unique(sorted_block, _sort_description); - MergeSortBlockCursor block_cursor(tmp_cursor_impl.get()); + auto tmp_cursor_impl = MergeSortCursorImpl::create_shared( + Block::create_shared(std::move(sorted_block)), _sort_description); + MergeSortBlockCursor block_cursor(tmp_cursor_impl); if (!block_cursor.totally_greater(_block_priority_queue.top())) { - RETURN_IF_ERROR(_state->add_sorted_block(sorted_block)); - _block_priority_queue.emplace(_pool->add( - new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); + _state->add_sorted_block(block_cursor.impl->block); + _block_priority_queue.emplace(tmp_cursor_impl); } } } else { diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index 7e703e5982d76b..d31767f46e461f 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -120,7 +120,8 @@ struct HeapSortCursorImpl { * It is used in priority queue. */ struct MergeSortCursorImpl { - ColumnRawPtrs all_columns; + ENABLE_FACTORY_CREATOR(MergeSortCursorImpl); + std::shared_ptr block; ColumnRawPtrs sort_columns; SortDescription desc; size_t sort_columns_size = 0; @@ -130,37 +131,30 @@ struct MergeSortCursorImpl { MergeSortCursorImpl() = default; virtual ~MergeSortCursorImpl() = default; - MergeSortCursorImpl(Block& block, const SortDescription& desc_) - : desc(desc_), sort_columns_size(desc.size()) { - reset(block); + MergeSortCursorImpl(std::shared_ptr block_, const SortDescription& desc_) + : block(block_), desc(desc_), sort_columns_size(desc.size()) { + reset(); } MergeSortCursorImpl(const SortDescription& desc_) - : desc(desc_), sort_columns_size(desc.size()) {} + : block(Block::create_shared()), desc(desc_), sort_columns_size(desc.size()) {} bool empty() const { return rows == 0; } /// Set the cursor to the beginning of the new block. - void reset(Block& block) { - all_columns.clear(); + void reset() { sort_columns.clear(); - auto columns = block.get_columns_and_convert(); - size_t num_columns = columns.size(); - - for (size_t j = 0; j < num_columns; ++j) { - all_columns.push_back(columns[j].get()); - } - + auto columns = block->get_columns_and_convert(); for (size_t j = 0, size = desc.size(); j < size; ++j) { auto& column_desc = desc[j]; size_t column_number = !column_desc.column_name.empty() - ? block.get_position_by_name(column_desc.column_name) + ? block->get_position_by_name(column_desc.column_name) : column_desc.column_number; sort_columns.push_back(columns[column_number].get()); } pos = 0; - rows = all_columns[0]->size(); + rows = block->rows(); } bool is_first() const { return pos == 0; } @@ -174,11 +168,13 @@ struct MergeSortCursorImpl { using BlockSupplier = std::function; struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { + ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl); BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier, const VExprContextSPtrs& ordering_expr, const std::vector& is_asc_order, const std::vector& nulls_first) : _ordering_expr(ordering_expr), _block_supplier(block_supplier) { + block = Block::create_shared(); sort_columns_size = ordering_expr.size(); desc.resize(ordering_expr.size()); @@ -198,21 +194,21 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { if (_is_eof) { return false; } - _block.clear(); + block->clear(); Status status; do { - status = _block_supplier(&_block, &_is_eof); - } while (_block.empty() && !_is_eof && status.ok()); + status = _block_supplier(block.get(), &_is_eof); + } while (block->empty() && !_is_eof && status.ok()); // If status not ok, upper callers could not detect whether it is eof or error. // So that fatal here, and should throw exception in the future. - if (status.ok() && !_block.empty()) { + if (status.ok() && !block->empty()) { if (_ordering_expr.size() > 0) { for (int i = 0; status.ok() && i < desc.size(); ++i) { // TODO yiguolei: throw exception if status not ok in the future - status = _ordering_expr[i]->execute(&_block, &desc[i].column_number); + status = _ordering_expr[i]->execute(block.get(), &desc[i].column_number); } } - MergeSortCursorImpl::reset(_block); + MergeSortCursorImpl::reset(); return status.ok(); } else if (!status.ok()) { throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, status.msg()); @@ -224,32 +220,21 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { if (_is_eof) { return nullptr; } - return &_block; - } - - size_t columns_num() const { return all_columns.size(); } - - Block create_empty_blocks() const { - size_t num_columns = columns_num(); - MutableColumns columns(num_columns); - for (size_t i = 0; i < num_columns; ++i) { - columns[i] = all_columns[i]->clone_empty(); - } - return _block.clone_with_columns(std::move(columns)); + return block.get(); } VExprContextSPtrs _ordering_expr; - Block _block; BlockSupplier _block_supplier {}; bool _is_eof = false; }; /// For easy copying. struct MergeSortCursor { - MergeSortCursorImpl* impl; + ENABLE_FACTORY_CREATOR(MergeSortCursor); + std::shared_ptr impl; - MergeSortCursor(MergeSortCursorImpl* impl_) : impl(impl_) {} - MergeSortCursorImpl* operator->() const { return impl; } + MergeSortCursor(std::shared_ptr impl_) : impl(impl_) {} + MergeSortCursorImpl* operator->() const { return impl.get(); } /// The specified row of this cursor is greater than the specified row of another cursor. int8_t greater_at(const MergeSortCursor& rhs, size_t lhs_pos, size_t rhs_pos) const { @@ -289,10 +274,11 @@ struct MergeSortCursor { /// For easy copying. struct MergeSortBlockCursor { - MergeSortCursorImpl* impl = nullptr; + ENABLE_FACTORY_CREATOR(MergeSortBlockCursor); + std::shared_ptr impl = nullptr; - MergeSortBlockCursor(MergeSortCursorImpl* impl_) : impl(impl_) {} - MergeSortCursorImpl* operator->() const { return impl; } + MergeSortBlockCursor(std::shared_ptr impl_) : impl(impl_) {} + MergeSortCursorImpl* operator->() const { return impl.get(); } /// The specified row of this cursor is greater than the specified row of another cursor. int8_t less_at(const MergeSortBlockCursor& rhs, int rows) const { diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index ef054190a3b45a..e2b2e9e25537f2 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -28,14 +28,6 @@ #include "vec/core/column_with_type_and_name.h" #include "vec/utils/util.hpp" -namespace doris { -namespace vectorized { -class VExprContext; -} // namespace vectorized -} // namespace doris - -using std::vector; - namespace doris::vectorized { VSortedRunMerger::VSortedRunMerger(const VExprContextSPtrs& ordering_expr, @@ -68,13 +60,14 @@ void VSortedRunMerger::init_timers(RuntimeProfile* profile) { _get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock"); } -Status VSortedRunMerger::prepare(const vector& input_runs) { +Status VSortedRunMerger::prepare(const std::vector& input_runs) { try { for (const auto& supplier : input_runs) { if (_use_sort_desc) { - _cursors.emplace_back(supplier, _desc); + _cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(supplier, _desc)); } else { - _cursors.emplace_back(supplier, _ordering_expr, _is_asc_order, _nulls_first); + _cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared( + supplier, _ordering_expr, _is_asc_order, _nulls_first)); } } } catch (const std::exception& e) { @@ -82,15 +75,8 @@ Status VSortedRunMerger::prepare(const vector& input_runs) { } for (auto& _cursor : _cursors) { - if (!_cursor._is_eof) { - _priority_queue.push(MergeSortCursor(&_cursor)); - } - } - - for (const auto& cursor : _cursors) { - if (!cursor._is_eof) { - _empty_block = cursor.create_empty_blocks(); - break; + if (!_cursor->_is_eof) { + _priority_queue.push(MergeSortCursor(_cursor)); } } @@ -139,7 +125,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } } else { if (current->block_ptr() != nullptr) { - for (int i = 0; i < current->all_columns.size(); i++) { + for (int i = 0; i < current->block->columns(); i++) { auto& column_with_type = current->block_ptr()->get_by_position(i); column_with_type.column = column_with_type.column->cut( current->pos, current->rows - current->pos); @@ -153,9 +139,9 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } } } else { - size_t num_columns = _empty_block.columns(); - MutableBlock m_block = - VectorizedUtils::build_mutable_mem_reuse_block(output_block, _empty_block); + size_t num_columns = _priority_queue.top().impl->block->columns(); + MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block( + output_block, *_priority_queue.top().impl->block); MutableColumns& merged_columns = m_block.mutable_columns(); if (num_columns != merged_columns.size()) { diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 8dd706cad16f72..f01e6794e6e9f5 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -30,9 +30,7 @@ #include "vec/core/sort_description.h" #include "vec/exprs/vexpr_fwd.h" -namespace doris { - -namespace vectorized { +namespace doris::vectorized { // VSortedRunMerger is used to merge multiple sorted runs of blocks. A run is a sorted // sequence of blocks, which are fetched from a BlockSupplier function object. @@ -74,14 +72,12 @@ class VSortedRunMerger { int64_t _limit = -1; size_t _offset = 0; - std::vector _cursors; + std::vector> _cursors; std::priority_queue _priority_queue; /// In pipeline engine, if a cursor needs to read one more block from supplier, /// we make it as a pending cursor until the supplier is readable. - MergeSortCursorImpl* _pending_cursor = nullptr; - - Block _empty_block; + std::shared_ptr _pending_cursor = nullptr; // Times calls to get_next(). RuntimeProfile::Counter* _get_next_timer = nullptr; @@ -101,5 +97,4 @@ class VSortedRunMerger { bool has_next_block(MergeSortCursor& current); }; -} // namespace vectorized -} // namespace doris +} // namespace doris::vectorized