From 00fa574559edea14eadc6d7a97ad289066e44bc4 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 13 Aug 2024 17:48:33 +0800 Subject: [PATCH 1/9] [Improvement](sort) Free sort blocks if this block is exhausted --- be/src/pipeline/dependency.h | 2 - be/src/vec/common/sort/partition_sorter.cpp | 43 ++++++------ be/src/vec/common/sort/partition_sorter.h | 4 +- be/src/vec/common/sort/sorter.cpp | 64 +++++++++++------- be/src/vec/common/sort/sorter.h | 16 ++--- be/src/vec/common/sort/topn_sorter.cpp | 22 +++--- be/src/vec/core/sort_cursor.h | 75 +++++++++++++-------- be/src/vec/runtime/vsorted_run_merger.cpp | 15 +++-- be/src/vec/runtime/vsorted_run_merger.h | 4 +- 9 files changed, 138 insertions(+), 107 deletions(-) diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h index 957a6ca8bd3efe..92afb4849b0bf0 100644 --- a/be/src/pipeline/dependency.h +++ b/be/src/pipeline/dependency.h @@ -886,8 +886,6 @@ struct LocalMergeExchangeSharedState : public LocalExchangeSharedState { void create_dependencies(int local_exchange_id) override { sink_deps.resize(source_deps.size()); - std::vector new_deps(sink_deps.size(), nullptr); - source_deps.swap(new_deps); for (size_t i = 0; i < source_deps.size(); i++) { source_deps[i] = std::make_shared(local_exchange_id, local_exchange_id, diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index 1ea7c6de6a8a77..c8ba646c4ba671 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -58,7 +58,7 @@ Status PartitionSorter::append_block(Block* input_block) { Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc); DCHECK(input_block->columns() == sorted_block.columns()); RETURN_IF_ERROR(partial_sort(*input_block, sorted_block)); - RETURN_IF_ERROR(_state->add_sorted_block(sorted_block)); + RETURN_IF_ERROR(_state->add_sorted_block(std::move(sorted_block))); return Status::OK(); } @@ -67,46 +67,49 @@ Status PartitionSorter::prepare_for_read() { auto& blocks = _state->get_sorted_block(); auto& priority_queue = _state->get_priority_queue(); for (auto& block : blocks) { - cursors.emplace_back(block, _sort_description); + cursors.emplace_back(MergeSortCursorImpl::create_shared(block, _sort_description)); } for (auto& cursor : cursors) { - priority_queue.push(MergeSortCursor(&cursor)); + priority_queue.push(std::move(cursor)); } + cursors.clear(); return Status::OK(); } // have done sorter and get topn records, so could reset those state to init void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) { - std::priority_queue empty_queue; + std::priority_queue> empty_queue; std::swap(_block_priority_queue, empty_queue); _state = MergeSorterState::create_unique(_row_desc, _offset, _limit, runtime_state, nullptr); _previous_row->reset(); } Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) { - if (_state->get_sorted_block().empty()) { + if (_state->get_sorted_block().empty() && _state->get_priority_queue().empty()) { + *eos = true; + } else if (_state->get_cursors().size() == 1 && _has_global_limit) { + auto& cursor = _state->get_cursors()[0]; + block->swap(*cursor->block); + block->set_num_rows(_partition_inner_limit); *eos = true; } else { - if (_state->get_sorted_block().size() == 1 && _has_global_limit) { - auto& sorted_block = _state->get_sorted_block()[0]; - block->swap(sorted_block); - block->set_num_rows(_partition_inner_limit); - *eos = true; - } else { - RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size())); - } + RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size())); } return Status::OK(); } Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) { - const auto& sorted_block = _state->get_sorted_block()[0]; - size_t num_columns = sorted_block.columns(); + auto& priority_queue = _state->get_priority_queue(); + if (priority_queue.empty()) { + *eos = true; + return Status::OK(); + } + const auto& sorted_block = priority_queue.top().impl->block; + size_t num_columns = sorted_block->columns(); MutableBlock m_block = - VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block); + VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block); MutableColumns& merged_columns = m_block.mutable_columns(); size_t current_output_rows = 0; - auto& priority_queue = _state->get_priority_queue(); bool get_enough_data = false; while (!priority_queue.empty()) { @@ -121,7 +124,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int //1 row_number no need to check distinct, just output partition_inner_limit row if ((current_output_rows + _output_total_rows) < _partition_inner_limit) { for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); } } else { //rows has get enough @@ -155,7 +158,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int } } for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); } break; } @@ -180,7 +183,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int *_previous_row = current; } for (size_t i = 0; i < num_columns; ++i) { - merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); } current_output_rows++; break; diff --git a/be/src/vec/common/sort/partition_sorter.h b/be/src/vec/common/sort/partition_sorter.h index 77dcb68371131c..01e009d200db8b 100644 --- a/be/src/vec/common/sort/partition_sorter.h +++ b/be/src/vec/common/sort/partition_sorter.h @@ -50,7 +50,7 @@ struct SortCursorCmp { SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {} void reset() { - impl = nullptr; + impl->reset(); row = 0; } bool compare_two_rows(const MergeSortCursor& rhs) const { @@ -67,7 +67,7 @@ struct SortCursorCmp { return true; } int row = 0; - MergeSortCursorImpl* impl = nullptr; + std::shared_ptr impl = nullptr; }; class PartitionSorter final : public Sorter { diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index eca7e15626b2eb..64defc208fb5a2 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -59,34 +59,37 @@ namespace doris::vectorized { void MergeSorterState::reset() { auto empty_queue = std::priority_queue(); priority_queue_.swap(empty_queue); - std::vector empty_cursors(0); + std::vector> empty_cursors(0); cursors_.swap(empty_cursors); - std::vector empty_blocks(0); + std::vector> empty_blocks(0); sorted_blocks_.swap(empty_blocks); unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty()); in_mem_sorted_bocks_size_ = 0; } -Status MergeSorterState::add_sorted_block(Block& block) { +Status MergeSorterState::add_sorted_block(Block&& block) { auto rows = block.rows(); if (0 == rows) { return Status::OK(); } in_mem_sorted_bocks_size_ += block.bytes(); - sorted_blocks_.emplace_back(std::move(block)); + sorted_blocks_.emplace_back(Block::create_shared(std::move(block))); num_rows_ += rows; return Status::OK(); } Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) { for (auto& block : sorted_blocks_) { - cursors_.emplace_back(block, sort_description); + cursors_.emplace_back( + MergeSortCursorImpl::create_shared(std::move(block), sort_description)); } - if (sorted_blocks_.size() > 1) { + sorted_blocks_.clear(); + if (cursors_.size() > 1) { for (auto& cursor : cursors_) { - priority_queue_.emplace(&cursor); + priority_queue_.emplace(std::move(cursor)); } + cursors_.clear(); } return Status::OK(); @@ -94,13 +97,13 @@ Status MergeSorterState::build_merge_tree(const SortDescription& sort_descriptio Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size, bool* eos) { - if (sorted_blocks_.empty()) { + if (cursors_.empty() && priority_queue_.empty()) { *eos = true; - } else if (sorted_blocks_.size() == 1) { + } else if (cursors_.size() == 1) { if (offset_ != 0) { - sorted_blocks_[0].skip_num_rows(offset_); + cursors_[0]->block->skip_num_rows(offset_); } - block->swap(sorted_blocks_[0]); + block->swap(*cursors_[0]->block); *eos = true; } else { RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos)); @@ -110,9 +113,14 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos) { - size_t num_columns = sorted_blocks_[0].columns(); + if (priority_queue_.empty()) { + *eos = true; + return Status::OK(); + } + size_t num_columns = priority_queue_.top().impl->block->columns(); - MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]); + MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block( + block, *priority_queue_.top().impl->block); MutableColumns& merged_columns = m_block.mutable_columns(); /// Take rows from queue in right order and push to 'merged'. @@ -123,7 +131,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized if (offset_ == 0) { for (size_t i = 0; i < num_columns; ++i) - merged_columns[i]->insert_from(*current->all_columns[i], current->pos); + merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); ++merged_rows; } else { offset_--; @@ -134,7 +142,9 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized priority_queue_.push(current); } - if (merged_rows == batch_size) break; + if (merged_rows == batch_size) { + break; + } } block->set_columns(std::move(merged_columns)); @@ -261,22 +271,24 @@ Status FullSorter::_do_sort() { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_state->num_rows() < _offset + _limit) { - static_cast(_state->add_sorted_block(desc_block)); - _block_priority_queue.emplace(_pool->add( - new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); + static_cast(_state->add_sorted_block(std::move(desc_block))); + _block_priority_queue.emplace( + MergeSortBlockCursor::create_shared(MergeSortCursorImpl::create_shared( + _state->last_sorted_block(), _sort_description))); } else { - auto tmp_cursor_impl = - std::make_unique(desc_block, _sort_description); - MergeSortBlockCursor block_cursor(tmp_cursor_impl.get()); - if (!block_cursor.totally_greater(_block_priority_queue.top())) { - static_cast(_state->add_sorted_block(desc_block)); - _block_priority_queue.emplace(_pool->add( - new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); + auto tmp_cursor_impl = MergeSortCursorImpl::create_shared( + Block::create_shared(std::move(desc_block)), _sort_description); + MergeSortBlockCursor block_cursor(tmp_cursor_impl); + if (!block_cursor.totally_greater(*_block_priority_queue.top())) { + static_cast(_state->add_sorted_block(std::move(*tmp_cursor_impl->block))); + _block_priority_queue.emplace( + MergeSortBlockCursor::create_shared(MergeSortCursorImpl::create_shared( + _state->last_sorted_block(), _sort_description))); } } } else { // dispose normal sort logic - static_cast(_state->add_sorted_block(desc_block)); + static_cast(_state->add_sorted_block(std::move(desc_block))); } return Status::OK(); } diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index 478e91c0783f1e..abad3b8b2aa7a2 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -59,7 +59,7 @@ class MergeSorterState { ~MergeSorterState() = default; - Status add_sorted_block(Block& block); + Status add_sorted_block(Block&& block); Status build_merge_tree(const SortDescription& sort_description); @@ -72,23 +72,21 @@ class MergeSorterState { uint64_t num_rows() const { return num_rows_; } - Block& last_sorted_block() { return sorted_blocks_.back(); } + std::shared_ptr last_sorted_block() { return sorted_blocks_.back(); } - std::vector& get_sorted_block() { return sorted_blocks_; } + std::vector>& get_sorted_block() { return sorted_blocks_; } std::priority_queue& get_priority_queue() { return priority_queue_; } - std::vector& get_cursors() { return cursors_; } + std::vector>& get_cursors() { return cursors_; } void reset(); std::unique_ptr unsorted_block_; private: - int _calc_spill_blocks_to_merge() const; - Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos); std::priority_queue priority_queue_; - std::vector cursors_; - std::vector sorted_blocks_; + std::vector> cursors_; + std::vector> sorted_blocks_; size_t in_mem_sorted_bocks_size_ = 0; uint64_t num_rows_ = 0; @@ -153,7 +151,7 @@ class Sorter { RuntimeProfile::Counter* _partial_sort_timer = nullptr; RuntimeProfile::Counter* _merge_block_timer = nullptr; - std::priority_queue _block_priority_queue; + std::priority_queue> _block_priority_queue; bool _materialize_sort_exprs; }; diff --git a/be/src/vec/common/sort/topn_sorter.cpp b/be/src/vec/common/sort/topn_sorter.cpp index 58c3cd2dd0cfad..94367ae242687d 100644 --- a/be/src/vec/common/sort/topn_sorter.cpp +++ b/be/src/vec/common/sort/topn_sorter.cpp @@ -72,17 +72,19 @@ Status TopNSorter::_do_sort(Block* block) { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_state->num_rows() < _offset + _limit) { - RETURN_IF_ERROR(_state->add_sorted_block(sorted_block)); - _block_priority_queue.emplace(_pool->add( - new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); + RETURN_IF_ERROR(_state->add_sorted_block(std::move(sorted_block))); + _block_priority_queue.emplace( + MergeSortBlockCursor::create_shared(MergeSortCursorImpl::create_shared( + _state->last_sorted_block(), _sort_description))); } else { - auto tmp_cursor_impl = - std::make_unique(sorted_block, _sort_description); - MergeSortBlockCursor block_cursor(tmp_cursor_impl.get()); - if (!block_cursor.totally_greater(_block_priority_queue.top())) { - RETURN_IF_ERROR(_state->add_sorted_block(sorted_block)); - _block_priority_queue.emplace(_pool->add( - new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description))); + auto tmp_cursor_impl = MergeSortCursorImpl::create_shared( + Block::create_shared(std::move(sorted_block)), _sort_description); + MergeSortBlockCursor block_cursor(tmp_cursor_impl); + if (!block_cursor.totally_greater(*_block_priority_queue.top())) { + RETURN_IF_ERROR(_state->add_sorted_block(std::move(*block_cursor.impl->block))); + _block_priority_queue.emplace(MergeSortBlockCursor::create_shared( + MergeSortCursorImpl::create_shared(MergeSortCursorImpl( + _state->last_sorted_block(), _sort_description)))); } } } else { diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index 7e703e5982d76b..1838727f23a4b7 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -120,7 +120,8 @@ struct HeapSortCursorImpl { * It is used in priority queue. */ struct MergeSortCursorImpl { - ColumnRawPtrs all_columns; + ENABLE_FACTORY_CREATOR(MergeSortCursorImpl); + std::shared_ptr block; ColumnRawPtrs sort_columns; SortDescription desc; size_t sort_columns_size = 0; @@ -130,9 +131,9 @@ struct MergeSortCursorImpl { MergeSortCursorImpl() = default; virtual ~MergeSortCursorImpl() = default; - MergeSortCursorImpl(Block& block, const SortDescription& desc_) - : desc(desc_), sort_columns_size(desc.size()) { - reset(block); + MergeSortCursorImpl(std::shared_ptr block_, const SortDescription& desc_) + : block(block_), desc(desc_), sort_columns_size(desc.size()) { + reset(); } MergeSortCursorImpl(const SortDescription& desc_) @@ -140,27 +141,38 @@ struct MergeSortCursorImpl { bool empty() const { return rows == 0; } /// Set the cursor to the beginning of the new block. - void reset(Block& block) { - all_columns.clear(); + void reset() { + block->clear(); sort_columns.clear(); - auto columns = block.get_columns_and_convert(); - size_t num_columns = columns.size(); - - for (size_t j = 0; j < num_columns; ++j) { - all_columns.push_back(columns[j].get()); + auto columns = block->get_columns_and_convert(); + for (size_t j = 0, size = desc.size(); j < size; ++j) { + auto& column_desc = desc[j]; + size_t column_number = !column_desc.column_name.empty() + ? block->get_position_by_name(column_desc.column_name) + : column_desc.column_number; + sort_columns.push_back(columns[column_number].get()); } + pos = 0; + rows = block->rows(); + } + + void reset(std::shared_ptr block_) { + block = block_; + sort_columns.clear(); + + auto columns = block->get_columns_and_convert(); for (size_t j = 0, size = desc.size(); j < size; ++j) { auto& column_desc = desc[j]; size_t column_number = !column_desc.column_name.empty() - ? block.get_position_by_name(column_desc.column_name) + ? block->get_position_by_name(column_desc.column_name) : column_desc.column_number; sort_columns.push_back(columns[column_number].get()); } pos = 0; - rows = all_columns[0]->size(); + rows = block->rows(); } bool is_first() const { return pos == 0; } @@ -174,11 +186,13 @@ struct MergeSortCursorImpl { using BlockSupplier = std::function; struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { + ENABLE_FACTORY_CREATOR(BlockSupplierSortCursorImpl); BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier, const VExprContextSPtrs& ordering_expr, const std::vector& is_asc_order, const std::vector& nulls_first) : _ordering_expr(ordering_expr), _block_supplier(block_supplier) { + _block = Block::create_shared(); sort_columns_size = ordering_expr.size(); desc.resize(ordering_expr.size()); @@ -191,6 +205,7 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier, const SortDescription& desc_) : MergeSortCursorImpl(desc_), _block_supplier(block_supplier) { + _block = Block::create_shared(); _is_eof = !has_next_block(); } @@ -198,18 +213,18 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { if (_is_eof) { return false; } - _block.clear(); + _block->clear(); Status status; do { - status = _block_supplier(&_block, &_is_eof); - } while (_block.empty() && !_is_eof && status.ok()); + status = _block_supplier(_block.get(), &_is_eof); + } while (_block->empty() && !_is_eof && status.ok()); // If status not ok, upper callers could not detect whether it is eof or error. // So that fatal here, and should throw exception in the future. - if (status.ok() && !_block.empty()) { + if (status.ok() && !_block->empty()) { if (_ordering_expr.size() > 0) { for (int i = 0; status.ok() && i < desc.size(); ++i) { // TODO yiguolei: throw exception if status not ok in the future - status = _ordering_expr[i]->execute(&_block, &desc[i].column_number); + status = _ordering_expr[i]->execute(_block.get(), &desc[i].column_number); } } MergeSortCursorImpl::reset(_block); @@ -224,32 +239,33 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { if (_is_eof) { return nullptr; } - return &_block; + return _block.get(); } - size_t columns_num() const { return all_columns.size(); } + size_t columns_num() const { return block->columns(); } Block create_empty_blocks() const { size_t num_columns = columns_num(); MutableColumns columns(num_columns); for (size_t i = 0; i < num_columns; ++i) { - columns[i] = all_columns[i]->clone_empty(); + columns[i] = block->get_columns()[i]->clone_empty(); } - return _block.clone_with_columns(std::move(columns)); + return _block->clone_with_columns(std::move(columns)); } VExprContextSPtrs _ordering_expr; - Block _block; + std::shared_ptr _block; BlockSupplier _block_supplier {}; bool _is_eof = false; }; /// For easy copying. struct MergeSortCursor { - MergeSortCursorImpl* impl; + ENABLE_FACTORY_CREATOR(MergeSortCursor); + std::shared_ptr impl; - MergeSortCursor(MergeSortCursorImpl* impl_) : impl(impl_) {} - MergeSortCursorImpl* operator->() const { return impl; } + MergeSortCursor(std::shared_ptr impl_) : impl(impl_) {} + MergeSortCursorImpl* operator->() const { return impl.get(); } /// The specified row of this cursor is greater than the specified row of another cursor. int8_t greater_at(const MergeSortCursor& rhs, size_t lhs_pos, size_t rhs_pos) const { @@ -289,10 +305,11 @@ struct MergeSortCursor { /// For easy copying. struct MergeSortBlockCursor { - MergeSortCursorImpl* impl = nullptr; + ENABLE_FACTORY_CREATOR(MergeSortBlockCursor); + std::shared_ptr impl = nullptr; - MergeSortBlockCursor(MergeSortCursorImpl* impl_) : impl(impl_) {} - MergeSortCursorImpl* operator->() const { return impl; } + MergeSortBlockCursor(std::shared_ptr impl_) : impl(impl_) {} + MergeSortCursorImpl* operator->() const { return impl.get(); } /// The specified row of this cursor is greater than the specified row of another cursor. int8_t less_at(const MergeSortBlockCursor& rhs, int rows) const { diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index ef054190a3b45a..d736c7f954fa19 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -72,9 +72,10 @@ Status VSortedRunMerger::prepare(const vector& input_runs) { try { for (const auto& supplier : input_runs) { if (_use_sort_desc) { - _cursors.emplace_back(supplier, _desc); + _cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared(supplier, _desc)); } else { - _cursors.emplace_back(supplier, _ordering_expr, _is_asc_order, _nulls_first); + _cursors.emplace_back(BlockSupplierSortCursorImpl::create_shared( + supplier, _ordering_expr, _is_asc_order, _nulls_first)); } } } catch (const std::exception& e) { @@ -82,14 +83,14 @@ Status VSortedRunMerger::prepare(const vector& input_runs) { } for (auto& _cursor : _cursors) { - if (!_cursor._is_eof) { - _priority_queue.push(MergeSortCursor(&_cursor)); + if (!_cursor->_is_eof) { + _priority_queue.push(MergeSortCursor(_cursor)); } } for (const auto& cursor : _cursors) { - if (!cursor._is_eof) { - _empty_block = cursor.create_empty_blocks(); + if (!cursor->_is_eof) { + _empty_block = cursor->create_empty_blocks(); break; } } @@ -139,7 +140,7 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } } else { if (current->block_ptr() != nullptr) { - for (int i = 0; i < current->all_columns.size(); i++) { + for (int i = 0; i < current->block->columns(); i++) { auto& column_with_type = current->block_ptr()->get_by_position(i); column_with_type.column = column_with_type.column->cut( current->pos, current->rows - current->pos); diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 8dd706cad16f72..56fb4c026cb9dd 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -74,12 +74,12 @@ class VSortedRunMerger { int64_t _limit = -1; size_t _offset = 0; - std::vector _cursors; + std::vector> _cursors; std::priority_queue _priority_queue; /// In pipeline engine, if a cursor needs to read one more block from supplier, /// we make it as a pending cursor until the supplier is readable. - MergeSortCursorImpl* _pending_cursor = nullptr; + std::shared_ptr _pending_cursor = nullptr; Block _empty_block; From 0d45355a53c8873206c08aa8bd6ec2d57912ea4b Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 13 Aug 2024 18:47:12 +0800 Subject: [PATCH 2/9] update --- be/src/vec/core/sort_cursor.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index 1838727f23a4b7..56ca4375e5fbd9 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -137,12 +137,11 @@ struct MergeSortCursorImpl { } MergeSortCursorImpl(const SortDescription& desc_) - : desc(desc_), sort_columns_size(desc.size()) {} + : block(Block::create_shared()), desc(desc_), sort_columns_size(desc.size()) {} bool empty() const { return rows == 0; } /// Set the cursor to the beginning of the new block. void reset() { - block->clear(); sort_columns.clear(); auto columns = block->get_columns_and_convert(); From 4ecf0596dd89f35b98cef1c13eb8568337d1fbc2 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 14 Aug 2024 15:48:46 +0800 Subject: [PATCH 3/9] update --- be/src/vec/common/sort/partition_sorter.cpp | 15 +++++---------- be/src/vec/common/sort/sorter.cpp | 18 +++++------------- be/src/vec/common/sort/sorter.h | 2 -- 3 files changed, 10 insertions(+), 25 deletions(-) diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index c8ba646c4ba671..903e6f68555b75 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -63,16 +63,12 @@ Status PartitionSorter::append_block(Block* input_block) { } Status PartitionSorter::prepare_for_read() { - auto& cursors = _state->get_cursors(); auto& blocks = _state->get_sorted_block(); auto& priority_queue = _state->get_priority_queue(); for (auto& block : blocks) { - cursors.emplace_back(MergeSortCursorImpl::create_shared(block, _sort_description)); + priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description)); } - for (auto& cursor : cursors) { - priority_queue.push(std::move(cursor)); - } - cursors.clear(); + blocks.clear(); return Status::OK(); } @@ -85,11 +81,10 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) { } Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) { - if (_state->get_sorted_block().empty() && _state->get_priority_queue().empty()) { + if (_state->get_priority_queue().empty()) { *eos = true; - } else if (_state->get_cursors().size() == 1 && _has_global_limit) { - auto& cursor = _state->get_cursors()[0]; - block->swap(*cursor->block); + } else if (_state->get_priority_queue().size() == 1 && _has_global_limit) { + block->swap(*_state->get_priority_queue().top().impl->block); block->set_num_rows(_partition_inner_limit); *eos = true; } else { diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 64defc208fb5a2..57ef61777dda0c 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -60,7 +60,6 @@ void MergeSorterState::reset() { auto empty_queue = std::priority_queue(); priority_queue_.swap(empty_queue); std::vector> empty_cursors(0); - cursors_.swap(empty_cursors); std::vector> empty_blocks(0); sorted_blocks_.swap(empty_blocks); unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty()); @@ -80,30 +79,23 @@ Status MergeSorterState::add_sorted_block(Block&& block) { Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) { for (auto& block : sorted_blocks_) { - cursors_.emplace_back( + priority_queue_.emplace( MergeSortCursorImpl::create_shared(std::move(block), sort_description)); } sorted_blocks_.clear(); - if (cursors_.size() > 1) { - for (auto& cursor : cursors_) { - priority_queue_.emplace(std::move(cursor)); - } - cursors_.clear(); - } - return Status::OK(); } Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size, bool* eos) { - if (cursors_.empty() && priority_queue_.empty()) { + if (priority_queue_.empty()) { *eos = true; - } else if (cursors_.size() == 1) { + } else if (priority_queue_.size() == 1) { if (offset_ != 0) { - cursors_[0]->block->skip_num_rows(offset_); + priority_queue_.top().impl->block->skip_num_rows(offset_); } - block->swap(*cursors_[0]->block); + block->swap(*priority_queue_.top().impl->block); *eos = true; } else { RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos)); diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index abad3b8b2aa7a2..1cd1be743d43e2 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -76,7 +76,6 @@ class MergeSorterState { std::vector>& get_sorted_block() { return sorted_blocks_; } std::priority_queue& get_priority_queue() { return priority_queue_; } - std::vector>& get_cursors() { return cursors_; } void reset(); std::unique_ptr unsorted_block_; @@ -85,7 +84,6 @@ class MergeSorterState { Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos); std::priority_queue priority_queue_; - std::vector> cursors_; std::vector> sorted_blocks_; size_t in_mem_sorted_bocks_size_ = 0; uint64_t num_rows_ = 0; From 214f478e9031121dee818a8aac94629e5c5cda00 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 14 Aug 2024 19:58:04 +0800 Subject: [PATCH 4/9] update --- be/src/vec/common/sort/sorter.cpp | 2 ++ be/src/vec/runtime/vsorted_run_merger.cpp | 10 +--------- be/src/vec/runtime/vsorted_run_merger.h | 5 +---- 3 files changed, 4 insertions(+), 13 deletions(-) diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 57ef61777dda0c..ab3650b64d41d2 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -89,6 +89,8 @@ Status MergeSorterState::build_merge_tree(const SortDescription& sort_descriptio Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size, bool* eos) { + DCHECK(sorted_blocks_.empty()); + DCHECK(unsorted_block_->empty()); if (priority_queue_.empty()) { *eos = true; } else if (priority_queue_.size() == 1) { diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index d736c7f954fa19..eca499f4e7c8d5 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -28,14 +28,6 @@ #include "vec/core/column_with_type_and_name.h" #include "vec/utils/util.hpp" -namespace doris { -namespace vectorized { -class VExprContext; -} // namespace vectorized -} // namespace doris - -using std::vector; - namespace doris::vectorized { VSortedRunMerger::VSortedRunMerger(const VExprContextSPtrs& ordering_expr, @@ -68,7 +60,7 @@ void VSortedRunMerger::init_timers(RuntimeProfile* profile) { _get_next_block_timer = ADD_TIMER(profile, "MergeGetNextBlock"); } -Status VSortedRunMerger::prepare(const vector& input_runs) { +Status VSortedRunMerger::prepare(const std::vector& input_runs) { try { for (const auto& supplier : input_runs) { if (_use_sort_desc) { diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index 56fb4c026cb9dd..d1b292ac9ef29b 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -30,9 +30,7 @@ #include "vec/core/sort_description.h" #include "vec/exprs/vexpr_fwd.h" -namespace doris { - -namespace vectorized { +namespace doris::vectorized { // VSortedRunMerger is used to merge multiple sorted runs of blocks. A run is a sorted // sequence of blocks, which are fetched from a BlockSupplier function object. @@ -101,5 +99,4 @@ class VSortedRunMerger { bool has_next_block(MergeSortCursor& current); }; -} // namespace vectorized } // namespace doris From 21602d52245886e0bd34226733dc5e3f300d463d Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 14 Aug 2024 23:01:33 +0800 Subject: [PATCH 5/9] update --- be/src/vec/common/sort/partition_sorter.cpp | 4 +-- be/src/vec/common/sort/sorter.cpp | 27 +++++++++---------- be/src/vec/common/sort/sorter.h | 4 +-- be/src/vec/common/sort/topn_sorter.cpp | 16 +++++------- be/src/vec/core/sort_cursor.h | 29 ++++++--------------- be/src/vec/runtime/vsorted_run_merger.cpp | 13 +++------ be/src/vec/runtime/vsorted_run_merger.h | 4 +-- 7 files changed, 36 insertions(+), 61 deletions(-) diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index 903e6f68555b75..193e8f3b0194fc 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -58,7 +58,7 @@ Status PartitionSorter::append_block(Block* input_block) { Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc); DCHECK(input_block->columns() == sorted_block.columns()); RETURN_IF_ERROR(partial_sort(*input_block, sorted_block)); - RETURN_IF_ERROR(_state->add_sorted_block(std::move(sorted_block))); + RETURN_IF_ERROR(_state->add_sorted_block(Block::create_shared(std::move(sorted_block)))); return Status::OK(); } @@ -74,7 +74,7 @@ Status PartitionSorter::prepare_for_read() { // have done sorter and get topn records, so could reset those state to init void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) { - std::priority_queue> empty_queue; + std::priority_queue empty_queue; std::swap(_block_priority_queue, empty_queue); _state = MergeSorterState::create_unique(_row_desc, _offset, _limit, runtime_state, nullptr); _previous_row->reset(); diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index ab3650b64d41d2..4f793bbe1bea9b 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -66,13 +66,13 @@ void MergeSorterState::reset() { in_mem_sorted_bocks_size_ = 0; } -Status MergeSorterState::add_sorted_block(Block&& block) { - auto rows = block.rows(); +Status MergeSorterState::add_sorted_block(std::shared_ptr block) { + auto rows = block->rows(); if (0 == rows) { return Status::OK(); } - in_mem_sorted_bocks_size_ += block.bytes(); - sorted_blocks_.emplace_back(Block::create_shared(std::move(block))); + in_mem_sorted_bocks_size_ += block->bytes(); + sorted_blocks_.emplace_back(block); num_rows_ += rows; return Status::OK(); } @@ -265,24 +265,23 @@ Status FullSorter::_do_sort() { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_state->num_rows() < _offset + _limit) { - static_cast(_state->add_sorted_block(std::move(desc_block))); - _block_priority_queue.emplace( - MergeSortBlockCursor::create_shared(MergeSortCursorImpl::create_shared( - _state->last_sorted_block(), _sort_description))); + static_cast( + _state->add_sorted_block(Block::create_shared(std::move(desc_block)))); + _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( + _state->last_sorted_block(), _sort_description)); } else { auto tmp_cursor_impl = MergeSortCursorImpl::create_shared( Block::create_shared(std::move(desc_block)), _sort_description); MergeSortBlockCursor block_cursor(tmp_cursor_impl); - if (!block_cursor.totally_greater(*_block_priority_queue.top())) { - static_cast(_state->add_sorted_block(std::move(*tmp_cursor_impl->block))); - _block_priority_queue.emplace( - MergeSortBlockCursor::create_shared(MergeSortCursorImpl::create_shared( - _state->last_sorted_block(), _sort_description))); + if (!block_cursor.totally_greater(_block_priority_queue.top())) { + static_cast(_state->add_sorted_block(tmp_cursor_impl->block)); + _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( + _state->last_sorted_block(), _sort_description)); } } } else { // dispose normal sort logic - static_cast(_state->add_sorted_block(std::move(desc_block))); + static_cast(_state->add_sorted_block(Block::create_shared(std::move(desc_block)))); } return Status::OK(); } diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index 1cd1be743d43e2..1350760275eeee 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -59,7 +59,7 @@ class MergeSorterState { ~MergeSorterState() = default; - Status add_sorted_block(Block&& block); + Status add_sorted_block(std::shared_ptr block); Status build_merge_tree(const SortDescription& sort_description); @@ -149,7 +149,7 @@ class Sorter { RuntimeProfile::Counter* _partial_sort_timer = nullptr; RuntimeProfile::Counter* _merge_block_timer = nullptr; - std::priority_queue> _block_priority_queue; + std::priority_queue _block_priority_queue; bool _materialize_sort_exprs; }; diff --git a/be/src/vec/common/sort/topn_sorter.cpp b/be/src/vec/common/sort/topn_sorter.cpp index 94367ae242687d..9cd8b8ccea4efa 100644 --- a/be/src/vec/common/sort/topn_sorter.cpp +++ b/be/src/vec/common/sort/topn_sorter.cpp @@ -72,19 +72,17 @@ Status TopNSorter::_do_sort(Block* block) { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_state->num_rows() < _offset + _limit) { - RETURN_IF_ERROR(_state->add_sorted_block(std::move(sorted_block))); - _block_priority_queue.emplace( - MergeSortBlockCursor::create_shared(MergeSortCursorImpl::create_shared( - _state->last_sorted_block(), _sort_description))); + RETURN_IF_ERROR( + _state->add_sorted_block(Block::create_shared(std::move(sorted_block)))); + _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( + _state->last_sorted_block(), _sort_description)); } else { auto tmp_cursor_impl = MergeSortCursorImpl::create_shared( Block::create_shared(std::move(sorted_block)), _sort_description); MergeSortBlockCursor block_cursor(tmp_cursor_impl); - if (!block_cursor.totally_greater(*_block_priority_queue.top())) { - RETURN_IF_ERROR(_state->add_sorted_block(std::move(*block_cursor.impl->block))); - _block_priority_queue.emplace(MergeSortBlockCursor::create_shared( - MergeSortCursorImpl::create_shared(MergeSortCursorImpl( - _state->last_sorted_block(), _sort_description)))); + if (!block_cursor.totally_greater(_block_priority_queue.top())) { + RETURN_IF_ERROR(_state->add_sorted_block(block_cursor.impl->block)); + _block_priority_queue.emplace(tmp_cursor_impl); } } } else { diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index 56ca4375e5fbd9..67bf93e0e52735 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -191,7 +191,7 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { const std::vector& is_asc_order, const std::vector& nulls_first) : _ordering_expr(ordering_expr), _block_supplier(block_supplier) { - _block = Block::create_shared(); + block = Block::create_shared(); sort_columns_size = ordering_expr.size(); desc.resize(ordering_expr.size()); @@ -204,7 +204,6 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier, const SortDescription& desc_) : MergeSortCursorImpl(desc_), _block_supplier(block_supplier) { - _block = Block::create_shared(); _is_eof = !has_next_block(); } @@ -212,21 +211,21 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { if (_is_eof) { return false; } - _block->clear(); + block->clear(); Status status; do { - status = _block_supplier(_block.get(), &_is_eof); - } while (_block->empty() && !_is_eof && status.ok()); + status = _block_supplier(block.get(), &_is_eof); + } while (block->empty() && !_is_eof && status.ok()); // If status not ok, upper callers could not detect whether it is eof or error. // So that fatal here, and should throw exception in the future. - if (status.ok() && !_block->empty()) { + if (status.ok() && !block->empty()) { if (_ordering_expr.size() > 0) { for (int i = 0; status.ok() && i < desc.size(); ++i) { // TODO yiguolei: throw exception if status not ok in the future - status = _ordering_expr[i]->execute(_block.get(), &desc[i].column_number); + status = _ordering_expr[i]->execute(block.get(), &desc[i].column_number); } } - MergeSortCursorImpl::reset(_block); + MergeSortCursorImpl::reset(); return status.ok(); } else if (!status.ok()) { throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, status.msg()); @@ -238,22 +237,10 @@ struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl { if (_is_eof) { return nullptr; } - return _block.get(); - } - - size_t columns_num() const { return block->columns(); } - - Block create_empty_blocks() const { - size_t num_columns = columns_num(); - MutableColumns columns(num_columns); - for (size_t i = 0; i < num_columns; ++i) { - columns[i] = block->get_columns()[i]->clone_empty(); - } - return _block->clone_with_columns(std::move(columns)); + return block.get(); } VExprContextSPtrs _ordering_expr; - std::shared_ptr _block; BlockSupplier _block_supplier {}; bool _is_eof = false; }; diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp b/be/src/vec/runtime/vsorted_run_merger.cpp index eca499f4e7c8d5..e2b2e9e25537f2 100644 --- a/be/src/vec/runtime/vsorted_run_merger.cpp +++ b/be/src/vec/runtime/vsorted_run_merger.cpp @@ -80,13 +80,6 @@ Status VSortedRunMerger::prepare(const std::vector& input_runs) { } } - for (const auto& cursor : _cursors) { - if (!cursor->_is_eof) { - _empty_block = cursor->create_empty_blocks(); - break; - } - } - return Status::OK(); } @@ -146,9 +139,9 @@ Status VSortedRunMerger::get_next(Block* output_block, bool* eos) { } } } else { - size_t num_columns = _empty_block.columns(); - MutableBlock m_block = - VectorizedUtils::build_mutable_mem_reuse_block(output_block, _empty_block); + size_t num_columns = _priority_queue.top().impl->block->columns(); + MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block( + output_block, *_priority_queue.top().impl->block); MutableColumns& merged_columns = m_block.mutable_columns(); if (num_columns != merged_columns.size()) { diff --git a/be/src/vec/runtime/vsorted_run_merger.h b/be/src/vec/runtime/vsorted_run_merger.h index d1b292ac9ef29b..f01e6794e6e9f5 100644 --- a/be/src/vec/runtime/vsorted_run_merger.h +++ b/be/src/vec/runtime/vsorted_run_merger.h @@ -79,8 +79,6 @@ class VSortedRunMerger { /// we make it as a pending cursor until the supplier is readable. std::shared_ptr _pending_cursor = nullptr; - Block _empty_block; - // Times calls to get_next(). RuntimeProfile::Counter* _get_next_timer = nullptr; @@ -99,4 +97,4 @@ class VSortedRunMerger { bool has_next_block(MergeSortCursor& current); }; -} // namespace doris +} // namespace doris::vectorized From 4839df8e2bb9e4da5fef9b579f7e6fc0b58d9c43 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 14 Aug 2024 23:39:34 +0800 Subject: [PATCH 6/9] update --- cloud/src/recycler/hdfs_accessor.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cloud/src/recycler/hdfs_accessor.h b/cloud/src/recycler/hdfs_accessor.h index bfbe02aadc9029..2cb6692743dc62 100644 --- a/cloud/src/recycler/hdfs_accessor.h +++ b/cloud/src/recycler/hdfs_accessor.h @@ -23,6 +23,8 @@ #include // IWYU pragma: export #endif +#include + #include "recycler/storage_vault_accessor.h" namespace doris::cloud { From 42018665a56a7e45e7b19e109a99fd051cabb249 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 15 Aug 2024 09:01:31 +0800 Subject: [PATCH 7/9] update --- be/src/vec/core/sort_cursor.h | 17 ----------------- cloud/src/recycler/hdfs_accessor.h | 2 -- 2 files changed, 19 deletions(-) diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h index 67bf93e0e52735..d31767f46e461f 100644 --- a/be/src/vec/core/sort_cursor.h +++ b/be/src/vec/core/sort_cursor.h @@ -157,23 +157,6 @@ struct MergeSortCursorImpl { rows = block->rows(); } - void reset(std::shared_ptr block_) { - block = block_; - sort_columns.clear(); - - auto columns = block->get_columns_and_convert(); - for (size_t j = 0, size = desc.size(); j < size; ++j) { - auto& column_desc = desc[j]; - size_t column_number = !column_desc.column_name.empty() - ? block->get_position_by_name(column_desc.column_name) - : column_desc.column_number; - sort_columns.push_back(columns[column_number].get()); - } - - pos = 0; - rows = block->rows(); - } - bool is_first() const { return pos == 0; } bool is_last() const { return pos + 1 >= rows; } void next() { ++pos; } diff --git a/cloud/src/recycler/hdfs_accessor.h b/cloud/src/recycler/hdfs_accessor.h index 2cb6692743dc62..fe68f8cc5d09b4 100644 --- a/cloud/src/recycler/hdfs_accessor.h +++ b/cloud/src/recycler/hdfs_accessor.h @@ -25,8 +25,6 @@ #include -#include "recycler/storage_vault_accessor.h" - namespace doris::cloud { class HdfsVaultInfo; From e325d681221231f837fea29f71fd0b35fca84b86 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 15 Aug 2024 11:14:26 +0800 Subject: [PATCH 8/9] update --- cloud/src/recycler/hdfs_accessor.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cloud/src/recycler/hdfs_accessor.h b/cloud/src/recycler/hdfs_accessor.h index fe68f8cc5d09b4..bfbe02aadc9029 100644 --- a/cloud/src/recycler/hdfs_accessor.h +++ b/cloud/src/recycler/hdfs_accessor.h @@ -23,7 +23,7 @@ #include // IWYU pragma: export #endif -#include +#include "recycler/storage_vault_accessor.h" namespace doris::cloud { From 68508a60079a7aa5df17b3b3ea88b87956180ef8 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 16 Aug 2024 22:59:54 +0800 Subject: [PATCH 9/9] update --- be/src/vec/common/sort/partition_sorter.cpp | 2 +- be/src/vec/common/sort/sorter.cpp | 12 +++++------- be/src/vec/common/sort/sorter.h | 2 +- be/src/vec/common/sort/topn_sorter.cpp | 5 ++--- 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/be/src/vec/common/sort/partition_sorter.cpp b/be/src/vec/common/sort/partition_sorter.cpp index 193e8f3b0194fc..c363a41d1c772e 100644 --- a/be/src/vec/common/sort/partition_sorter.cpp +++ b/be/src/vec/common/sort/partition_sorter.cpp @@ -58,7 +58,7 @@ Status PartitionSorter::append_block(Block* input_block) { Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc); DCHECK(input_block->columns() == sorted_block.columns()); RETURN_IF_ERROR(partial_sort(*input_block, sorted_block)); - RETURN_IF_ERROR(_state->add_sorted_block(Block::create_shared(std::move(sorted_block)))); + _state->add_sorted_block(Block::create_shared(std::move(sorted_block))); return Status::OK(); } diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 4f793bbe1bea9b..89f1c7d73f1c58 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -66,15 +66,14 @@ void MergeSorterState::reset() { in_mem_sorted_bocks_size_ = 0; } -Status MergeSorterState::add_sorted_block(std::shared_ptr block) { +void MergeSorterState::add_sorted_block(std::shared_ptr block) { auto rows = block->rows(); if (0 == rows) { - return Status::OK(); + return; } in_mem_sorted_bocks_size_ += block->bytes(); sorted_blocks_.emplace_back(block); num_rows_ += rows; - return Status::OK(); } Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) { @@ -265,8 +264,7 @@ Status FullSorter::_do_sort() { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_state->num_rows() < _offset + _limit) { - static_cast( - _state->add_sorted_block(Block::create_shared(std::move(desc_block)))); + _state->add_sorted_block(Block::create_shared(std::move(desc_block))); _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( _state->last_sorted_block(), _sort_description)); } else { @@ -274,14 +272,14 @@ Status FullSorter::_do_sort() { Block::create_shared(std::move(desc_block)), _sort_description); MergeSortBlockCursor block_cursor(tmp_cursor_impl); if (!block_cursor.totally_greater(_block_priority_queue.top())) { - static_cast(_state->add_sorted_block(tmp_cursor_impl->block)); + _state->add_sorted_block(tmp_cursor_impl->block); _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( _state->last_sorted_block(), _sort_description)); } } } else { // dispose normal sort logic - static_cast(_state->add_sorted_block(Block::create_shared(std::move(desc_block)))); + _state->add_sorted_block(Block::create_shared(std::move(desc_block))); } return Status::OK(); } diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index 1350760275eeee..aa7d88dfbc2a3a 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -59,7 +59,7 @@ class MergeSorterState { ~MergeSorterState() = default; - Status add_sorted_block(std::shared_ptr block); + void add_sorted_block(std::shared_ptr block); Status build_merge_tree(const SortDescription& sort_description); diff --git a/be/src/vec/common/sort/topn_sorter.cpp b/be/src/vec/common/sort/topn_sorter.cpp index 9cd8b8ccea4efa..1f24fb14c950a9 100644 --- a/be/src/vec/common/sort/topn_sorter.cpp +++ b/be/src/vec/common/sort/topn_sorter.cpp @@ -72,8 +72,7 @@ Status TopNSorter::_do_sort(Block* block) { // if one block totally greater the heap top of _block_priority_queue // we can throw the block data directly. if (_state->num_rows() < _offset + _limit) { - RETURN_IF_ERROR( - _state->add_sorted_block(Block::create_shared(std::move(sorted_block)))); + _state->add_sorted_block(Block::create_shared(std::move(sorted_block))); _block_priority_queue.emplace(MergeSortCursorImpl::create_shared( _state->last_sorted_block(), _sort_description)); } else { @@ -81,7 +80,7 @@ Status TopNSorter::_do_sort(Block* block) { Block::create_shared(std::move(sorted_block)), _sort_description); MergeSortBlockCursor block_cursor(tmp_cursor_impl); if (!block_cursor.totally_greater(_block_priority_queue.top())) { - RETURN_IF_ERROR(_state->add_sorted_block(block_cursor.impl->block)); + _state->add_sorted_block(block_cursor.impl->block); _block_priority_queue.emplace(tmp_cursor_impl); } }