From 2a120204546da36bdf6ef860eace9b371bb50a3f Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 29 May 2024 19:23:19 +0800 Subject: [PATCH 1/6] udpate --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 30 +++++++++---------- be/src/pipeline/exec/exchange_sink_buffer.h | 16 +++++++--- .../pipeline/exec/exchange_sink_operator.cpp | 2 +- be/src/pipeline/exec/exchange_sink_operator.h | 3 ++ be/src/pipeline/exec/operator.h | 8 +++++ .../local_exchange_source_operator.cpp | 3 ++ .../local_exchange/local_exchanger.cpp | 25 ++++++++++++++++ .../pipeline/local_exchange/local_exchanger.h | 4 +++ be/src/pipeline/pipeline_fragment_context.h | 8 +++++ be/src/pipeline/pipeline_task.h | 6 ++++ be/src/runtime/fragment_mgr.cpp | 3 ++ 11 files changed, 87 insertions(+), 21 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index d66e5ebd6801c2..aa8def05c76b68 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -84,7 +84,8 @@ std::shared_ptr BroadcastPBlockHolderQueue::pop() { namespace pipeline { ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, - int be_number, RuntimeState* state) + int be_number, RuntimeState* state, + ExchangeSinkLocalState* parent) : HasTaskExecutionCtx(state), _queue_capacity(0), _is_finishing(false), @@ -93,9 +94,8 @@ ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_ _sender_id(send_id), _be_number(be_number), _state(state), - _context(state->get_query_ctx()) {} - -ExchangeSinkBuffer::~ExchangeSinkBuffer() = default; + _context(state->get_query_ctx()), + _parent(parent) {} void ExchangeSinkBuffer::close() { // Could not clear the queue here, because there maybe a running rpc want to @@ -117,6 +117,12 @@ bool ExchangeSinkBuffer::can_write() const { } void ExchangeSinkBuffer::_set_ready_to_finish(bool all_done) { + if (all_done) { + auto weak_task_ctx = weak_task_exec_ctx(); + if (auto pip_ctx = weak_task_ctx.lock()) { + _parent->set_reach_limit(); + } + } if (_finish_dependency && _should_stop && all_done) { _finish_dependency->set_ready(); } @@ -213,8 +219,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { _instance_to_broadcast_package_queue[id]; if (_is_finishing) { - _rpc_channel_is_idle[id] = true; - _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1); + _turn_off_channel(id); return Status::OK(); } @@ -372,8 +377,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) { } broadcast_q.pop(); } else { - _rpc_channel_is_idle[id] = true; - _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1); + _turn_off_channel(id); } return Status::OK(); @@ -403,10 +407,7 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) { __builtin_unreachable(); } else { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); - if (!_rpc_channel_is_idle[id]) { - _rpc_channel_is_idle[id] = true; - _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1); - } + _turn_off_channel(id); } } @@ -419,10 +420,7 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) { void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); _instance_to_receiver_eof[id] = true; - if (!_rpc_channel_is_idle[id]) { - _rpc_channel_is_idle[id] = true; - _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1); - } + _turn_off_channel(id); std::queue> empty; swap(empty, _instance_to_broadcast_package_queue[id]); } diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 2955ff959deb84..85140229e98264 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -47,6 +47,7 @@ using InstanceLoId = int64_t; namespace pipeline { class Dependency; +class ExchangeSinkLocalState; } // namespace pipeline namespace vectorized { @@ -192,11 +193,11 @@ struct ExchangeRpcContext { }; // Each ExchangeSinkOperator have one ExchangeSinkBuffer -class ExchangeSinkBuffer : public HasTaskExecutionCtx { +class ExchangeSinkBuffer final : public HasTaskExecutionCtx { public: ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId dest_node_id, int send_id, int be_number, - RuntimeState* state); - ~ExchangeSinkBuffer(); + RuntimeState* state, ExchangeSinkLocalState* parent); + ~ExchangeSinkBuffer() = default; void register_sink(TUniqueId); Status add_block(TransmitInfo&& request); @@ -265,6 +266,12 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { inline void _failed(InstanceLoId id, const std::string& err); inline void _set_receiver_eof(InstanceLoId id); inline bool _is_receiver_eof(InstanceLoId id); + inline void _turn_off_channel(InstanceLoId id) { + if (!_rpc_channel_is_idle[id]) { + _rpc_channel_is_idle[id] = true; + _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1); + } + } void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time); int64_t get_sum_rpc_time(); @@ -272,7 +279,8 @@ class ExchangeSinkBuffer : public HasTaskExecutionCtx { std::shared_ptr _queue_dependency = nullptr; std::shared_ptr _finish_dependency = nullptr; std::shared_ptr _broadcast_dependency = nullptr; - std::atomic _should_stop {false}; + std::atomic _should_stop = false; + ExchangeSinkLocalState* _parent = nullptr; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 244184bc7a3877..9c51bf526c80cd 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -132,7 +132,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { id.set_hi(_state->query_id().hi); id.set_lo(_state->query_id().lo); _sink_buffer = std::make_unique(id, p._dest_node_id, _sender_id, - _state->be_number(), state); + _state->be_number(), state, this); register_channels(_sink_buffer.get()); _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 72fdbc3354d0fc..b5e508bcfcacf1 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -100,6 +100,8 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { RuntimeProfile::Counter* compress_timer() { return _compress_timer; } RuntimeProfile::Counter* uncompressed_bytes_counter() { return _uncompressed_bytes_counter; } [[nodiscard]] bool transfer_large_data_by_brpc() const; + bool eos() const override { return _reach_limit.load(); } + void set_reach_limit() { _reach_limit = true; }; [[nodiscard]] int sender_id() const { return _sender_id; } @@ -199,6 +201,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { // for external table sink hash partition std::unique_ptr _partition_function = nullptr; + std::atomic _reach_limit = false; }; class ExchangeSinkOperatorX final : public DataSinkOperatorX { diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index c6e713fe2f4b14..bd97ef1382e39d 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -312,6 +312,7 @@ class PipelineXSinkLocalStateBase { // idempotent (e.g. wait for runtime filters). virtual Status open(RuntimeState* state) = 0; virtual Status close(RuntimeState* state, Status exec_status) = 0; + [[nodiscard]] virtual bool eos() const { return false; } [[nodiscard]] virtual std::string debug_string(int indentation_level) const = 0; @@ -445,6 +446,13 @@ class DataSinkOperatorXBase : public OperatorBase { Status prepare(RuntimeState* state) override { return Status::OK(); } Status open(RuntimeState* state) override { return Status::OK(); } + [[nodiscard]] bool eos(RuntimeState* state) const { + auto result = state->get_sink_local_state_result(); + if (!result) { + return result.error(); + } + return result.value()->eos(); + } [[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* block, bool eos) = 0; diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp index ea9f7861dbfb23..64df8ed5dbd020 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp @@ -52,6 +52,9 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* state) { return Status::OK(); } + if (_exchanger) { + _exchanger->close(*this); + } if (_shared_state) { _shared_state->sub_running_source_operators(); } diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 973109483071f1..1c03b28415d815 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -39,6 +39,16 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block, return Status::OK(); } +void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) { + PartitionedBlock partitioned_block; + while (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) { + auto block_wrapper = partitioned_block.first; + local_state._shared_state->sub_mem_usage( + local_state._channel_id, block_wrapper->data_block.allocated_bytes(), false); + block_wrapper->unref(local_state._shared_state); + } +} + Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { PartitionedBlock partitioned_block; @@ -182,6 +192,14 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo return Status::OK(); } +void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) { + vectorized::Block next_block; + while (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + local_state._shared_state->sub_mem_usage(local_state._channel_id, + next_block.allocated_bytes()); + } +} + Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; @@ -255,6 +273,13 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block return Status::OK(); } +void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) { + vectorized::Block next_block; + while (_data_queue[local_state._channel_id].try_dequeue(next_block)) { + // do nothing + } +} + Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) { vectorized::Block next_block; diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h index 476f479e11ee80..bc07c80609460f 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.h +++ b/be/src/pipeline/local_exchange/local_exchanger.h @@ -48,6 +48,7 @@ class Exchanger { virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos, LocalExchangeSinkLocalState& local_state) = 0; virtual ExchangeType get_type() const = 0; + virtual void close(LocalExchangeSourceLocalState& local_state) {} protected: friend struct LocalExchangeSharedState; @@ -108,6 +109,7 @@ class ShuffleExchanger : public Exchanger { Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) override; + void close(LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::HASH_SHUFFLE; } protected: @@ -150,6 +152,7 @@ class PassthroughExchanger final : public Exchanger { Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; } + void close(LocalExchangeSourceLocalState& local_state) override; private: std::vector> _data_queue; @@ -188,6 +191,7 @@ class BroadcastExchanger final : public Exchanger { Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos, LocalExchangeSourceLocalState& local_state) override; ExchangeType get_type() const override { return ExchangeType::BROADCAST; } + void close(LocalExchangeSourceLocalState& local_state) override; private: std::vector> _data_queue; diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 8bc1eb2913905c..be113b8b6fa5a0 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -131,6 +131,14 @@ class PipelineFragmentContext : public TaskExecutionContext { } } + void clear_finished_tasks() { + for (size_t j = 0; j < _tasks.size(); j++) { + for (size_t i = 0; i < _tasks[j].size(); i++) { + _tasks[j][i]->stop_if_eos(); + } + } + }; + private: Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs, OperatorXPtr* root, PipelinePtr cur_pipe); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index bb6587eec28d7b..45e68d60ae627f 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -222,6 +222,12 @@ class PipelineTask { std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); } + void stop_if_eos() { + if (_sink->eos(_state)) { + clear_blocking_state(); + } + } + private: friend class RuntimeFilterDependency; bool _is_blocked(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 852179e98e9bab..69fff2951ef941 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1046,6 +1046,9 @@ void FragmentMgr::cancel_worker() { to_cancel.push_back(fragment_instance_itr.second->fragment_instance_id()); } } + for (auto& pipeline_itr : _pipeline_map) { + pipeline_itr.second->clear_finished_tasks(); + } for (auto it = _query_ctx_map.begin(); it != _query_ctx_map.end();) { if (auto q_ctx = it->second.lock()) { if (q_ctx->is_timeout(now) && q_ctx->enable_pipeline_x_exec()) { From 808962a1a927b1834239ea39976a821a43504952 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 30 May 2024 09:08:03 +0800 Subject: [PATCH 2/6] udpate --- be/src/pipeline/pipeline_task.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index d26b0fce387c75..78c5cf52c78bba 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -273,6 +273,7 @@ Status PipelineTask::execute(bool* eos) { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_TIMER(_exec_timer); SCOPED_ATTACH_TASK(_state); + _eos = _sink->eos(_state) || _eos; *eos = _eos; if (_eos) { // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here. @@ -334,14 +335,15 @@ Status PipelineTask::execute(bool* eos) { Status::Error("fault_inject pipeline_task executing failed"); return status; }); - // Pull block from operator chain - if (!_dry_run) { + // `_dry_run` means sink operator need no more data + // `_sink->eos(_state)` means sink operator should be finished + if (_dry_run || _sink->eos(_state)) { + *eos = true; + _eos = true; + } else { SCOPED_TIMER(_get_block_timer); _get_block_counter->update(1); RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_root->get_block_after_projects(_state, block, eos)); - } else { - *eos = true; - _eos = true; } if (_block->rows() != 0 || *eos) { From 1e2785624b7402b2fffe5c3d4844f315df821f47 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 30 May 2024 11:47:23 +0800 Subject: [PATCH 3/6] update --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 22 +++++++++++++------ be/src/pipeline/exec/exchange_sink_buffer.h | 7 +----- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index aa8def05c76b68..0a783bb856b413 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -117,12 +117,6 @@ bool ExchangeSinkBuffer::can_write() const { } void ExchangeSinkBuffer::_set_ready_to_finish(bool all_done) { - if (all_done) { - auto weak_task_ctx = weak_task_exec_ctx(); - if (auto pip_ctx = weak_task_ctx.lock()) { - _parent->set_reach_limit(); - } - } if (_finish_dependency && _should_stop && all_done) { _finish_dependency->set_ready(); } @@ -420,7 +414,7 @@ void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) { void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { std::unique_lock lock(*_instance_to_package_queue_mutex[id]); _instance_to_receiver_eof[id] = true; - _turn_off_channel(id); + _turn_off_channel(id, true); std::queue> empty; swap(empty, _instance_to_broadcast_package_queue[id]); } @@ -430,6 +424,20 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) { return _instance_to_receiver_eof[id]; } +void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id, bool cleanup) { + if (!_rpc_channel_is_idle[id]) { + _rpc_channel_is_idle[id] = true; + auto all_done = _busy_channels.fetch_sub(1) == 1; + _set_ready_to_finish(all_done); + if (cleanup && all_done) { + auto weak_task_ctx = weak_task_exec_ctx(); + if (auto pip_ctx = weak_task_ctx.lock()) { + _parent->set_reach_limit(); + } + } + } +} + void ExchangeSinkBuffer::get_max_min_rpc_time(int64_t* max_time, int64_t* min_time) { int64_t local_max_time = 0; int64_t local_min_time = INT64_MAX; diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h b/be/src/pipeline/exec/exchange_sink_buffer.h index 85140229e98264..8eed559e712a07 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.h +++ b/be/src/pipeline/exec/exchange_sink_buffer.h @@ -266,12 +266,7 @@ class ExchangeSinkBuffer final : public HasTaskExecutionCtx { inline void _failed(InstanceLoId id, const std::string& err); inline void _set_receiver_eof(InstanceLoId id); inline bool _is_receiver_eof(InstanceLoId id); - inline void _turn_off_channel(InstanceLoId id) { - if (!_rpc_channel_is_idle[id]) { - _rpc_channel_is_idle[id] = true; - _set_ready_to_finish(_busy_channels.fetch_sub(1) == 1); - } - } + inline void _turn_off_channel(InstanceLoId id, bool cleanup = false); void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time); int64_t get_sum_rpc_time(); From 42d646f8342c74a4a8c23e46bd42391bc0b2c381 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 31 May 2024 10:36:45 +0800 Subject: [PATCH 4/6] update --- be/src/pipeline/exec/exchange_sink_buffer.cpp | 3 ++- be/src/pipeline/exec/exchange_sink_operator.cpp | 7 +++++-- .../local_exchange/local_exchange_source_operator.cpp | 5 +++++ 3 files changed, 12 insertions(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp index 0a783bb856b413..8893db54cc5bf5 100644 --- a/be/src/pipeline/exec/exchange_sink_buffer.cpp +++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp @@ -408,7 +408,8 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) { void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) { _is_finishing = true; _context->cancel(Status::Cancelled(err)); - _ended(id); + std::unique_lock lock(*_instance_to_package_queue_mutex[id]); + _turn_off_channel(id, true); } void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) { diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 9c51bf526c80cd..67994c85cfa056 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -624,8 +624,11 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx( std::string ExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); - fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {}, _busy_channels = {})", - _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load()); + fmt::format_to(debug_string_buffer, + ", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), " + "_reach_limit: {}", + _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(), + _sink_buffer->_is_finishing.load(), _reach_limit.load()); return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp index 64df8ed5dbd020..1c8dff51a29b6c 100644 --- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp @@ -70,6 +70,11 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, _exchanger->_running_sink_operators, _exchanger->_running_source_operators); + size_t i = 0; + fmt::format_to(debug_string_buffer, ", MemTrackers: "); + for (auto* mem_tracker : _shared_state->mem_trackers) { + fmt::format_to(debug_string_buffer, "{}: {}, ", i, mem_tracker->consumption()); + } return fmt::to_string(debug_string_buffer); } From f11ccf19479ab88a3d922873c41a519ee03e8eb0 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 31 May 2024 15:00:51 +0800 Subject: [PATCH 5/6] update --- be/src/pipeline/exec/exchange_sink_operator.h | 2 +- be/src/pipeline/exec/operator.h | 6 +++--- be/src/pipeline/pipeline_fragment_context.h | 2 +- be/src/pipeline/pipeline_task.cpp | 2 +- be/src/pipeline/pipeline_task.h | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index b5e508bcfcacf1..fa72db6702fc90 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -100,7 +100,7 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { RuntimeProfile::Counter* compress_timer() { return _compress_timer; } RuntimeProfile::Counter* uncompressed_bytes_counter() { return _uncompressed_bytes_counter; } [[nodiscard]] bool transfer_large_data_by_brpc() const; - bool eos() const override { return _reach_limit.load(); } + bool is_finished() const override { return _reach_limit.load(); } void set_reach_limit() { _reach_limit = true; }; [[nodiscard]] int sender_id() const { return _sender_id; } diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index bd97ef1382e39d..41b1aabac25375 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -312,7 +312,7 @@ class PipelineXSinkLocalStateBase { // idempotent (e.g. wait for runtime filters). virtual Status open(RuntimeState* state) = 0; virtual Status close(RuntimeState* state, Status exec_status) = 0; - [[nodiscard]] virtual bool eos() const { return false; } + [[nodiscard]] virtual bool is_finished() const { return false; } [[nodiscard]] virtual std::string debug_string(int indentation_level) const = 0; @@ -446,12 +446,12 @@ class DataSinkOperatorXBase : public OperatorBase { Status prepare(RuntimeState* state) override { return Status::OK(); } Status open(RuntimeState* state) override { return Status::OK(); } - [[nodiscard]] bool eos(RuntimeState* state) const { + [[nodiscard]] bool is_finished(RuntimeState* state) const { auto result = state->get_sink_local_state_result(); if (!result) { return result.error(); } - return result.value()->eos(); + return result.value()->is_finished(); } [[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* block, bool eos) = 0; diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index be113b8b6fa5a0..94dd96731c2804 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -134,7 +134,7 @@ class PipelineFragmentContext : public TaskExecutionContext { void clear_finished_tasks() { for (size_t j = 0; j < _tasks.size(); j++) { for (size_t i = 0; i < _tasks[j].size(); i++) { - _tasks[j][i]->stop_if_eos(); + _tasks[j][i]->stop_if_finished(); } } }; diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 78c5cf52c78bba..7523fb6c487a69 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -273,7 +273,7 @@ Status PipelineTask::execute(bool* eos) { SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_TIMER(_exec_timer); SCOPED_ATTACH_TASK(_state); - _eos = _sink->eos(_state) || _eos; + _eos = _sink->is_finished(_state) || _eos; *eos = _eos; if (_eos) { // If task is waken up by finish dependency, `_eos` is set to true by last execution, and we should return here. diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 45e68d60ae627f..6bc65905be68a8 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -222,8 +222,8 @@ class PipelineTask { std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); } - void stop_if_eos() { - if (_sink->eos(_state)) { + void stop_if_finished() { + if (_sink->is_finished(_state)) { clear_blocking_state(); } } From a082eeac0f39e500062a0ff7abb1ca05bd806379 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 31 May 2024 15:49:59 +0800 Subject: [PATCH 6/6] update --- be/src/pipeline/pipeline_task.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 7523fb6c487a69..52a76828804acb 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -336,8 +336,8 @@ Status PipelineTask::execute(bool* eos) { return status; }); // `_dry_run` means sink operator need no more data - // `_sink->eos(_state)` means sink operator should be finished - if (_dry_run || _sink->eos(_state)) { + // `_sink->is_finished(_state)` means sink operator should be finished + if (_dry_run || _sink->is_finished(_state)) { *eos = true; _eos = true; } else {