From f9562ec2be8a57a91cf436c807c833c4cb0275f4 Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Tue, 19 Dec 2023 11:56:52 +0800 Subject: [PATCH 1/3] upd --- .../pipeline_x/pipeline_x_fragment_context.cpp | 7 ++++--- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 14 ++++++-------- be/src/pipeline/pipeline_x/pipeline_x_task.h | 3 +-- be/src/runtime/fragment_mgr.cpp | 1 + 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 0ac4bc2bd872fb..624def16e4fc09 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -601,8 +601,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( auto prepare_and_set_parent_profile = [&](PipelineXTask* task, size_t pip_idx) { DCHECK(pipeline_id_to_profile[pip_idx]); - RETURN_IF_ERROR(task->prepare(get_task_runtime_state(task->task_id()), local_params, - request.fragment.output_sink)); + RETURN_IF_ERROR(task->prepare(local_params, request.fragment.output_sink)); return Status::OK(); }; @@ -828,7 +827,7 @@ Status PipelineXFragmentContext::_add_local_exchange( OperatorXPtr source_op; source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id)); RETURN_IF_ERROR(source_op->init(exchange_type)); - if (operator_xs.size() > 0) { + if (!operator_xs.empty()) { RETURN_IF_ERROR(operator_xs.front()->set_child(source_op)); } operator_xs.insert(operator_xs.begin(), source_op); @@ -878,6 +877,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN const DescriptorTbl& descs, OperatorXPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx) { + // We directly construct the operator from Thrift because the given array is in the order of preorder traversal. + // Therefore, here we need to use a stack-like structure. _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); std::stringstream error_msg; switch (tnode.node_type) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 283b7851e4c55b..77e8788fe6d637 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -56,7 +56,7 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta _source(_operators.front()), _root(_operators.back()), _sink(pipeline->sink_shared_pointer()), - _le_state_map(le_state_map), + _le_state_map(std::move(le_state_map)), _task_idx(task_idx), _execution_dep(state->get_query_ctx()->get_execution_dependency()) { _pipeline_task_watcher.start(); @@ -67,15 +67,13 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta pipeline->incr_created_tasks(); } -Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams& local_params, - const TDataSink& tsink) { +Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink) { DCHECK(_sink); DCHECK(_cur_state == PipelineTaskState::NOT_READY) << get_state_name(_cur_state); _init_profile(); SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_prepare_timer); - DCHECK_EQ(state, _state); { // set sink local state @@ -85,20 +83,20 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams get_downstream_dependency(), _le_state_map, tsink}; - RETURN_IF_ERROR(_sink->setup_local_state(state, info)); + RETURN_IF_ERROR(_sink->setup_local_state(_state, info)); } std::vector no_scan_ranges; auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, _operators.front()->node_id(), no_scan_ranges); - auto* parent_profile = state->get_sink_local_state(_sink->operator_id())->profile(); + auto* parent_profile = _state->get_sink_local_state(_sink->operator_id())->profile(); for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { auto& op = _operators[op_idx]; auto& deps = get_upstream_dependency(op->operator_id()); LocalStateInfo info {parent_profile, scan_ranges, deps, _le_state_map, _task_idx, _source_dependency[op->operator_id()]}; - RETURN_IF_ERROR(op->setup_local_state(state, info)); - parent_profile = state->get_local_state(op->operator_id())->profile(); + RETURN_IF_ERROR(op->setup_local_state(_state, info)); + parent_profile = _state->get_local_state(op->operator_id())->profile(); } _block = doris::vectorized::Block::create_unique(); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index fc633339c3188e..f7b996f40a71e1 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -62,8 +62,7 @@ class PipelineXTask : public PipelineTask { return Status::InternalError("Should not reach here!"); } - Status prepare(RuntimeState* state, const TPipelineInstanceParams& local_params, - const TDataSink& tsink); + Status prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink); Status execute(bool* eos) override; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 90ebbff37a26f2..c54786e5428408 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -220,6 +220,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { DCHECK(req.runtime_state != nullptr); if (req.query_statistics) { + // use to report 'insert into select' TQueryStatistics queryStatistics; DCHECK(req.query_statistics->collect_dml_statistics()); req.query_statistics->to_thrift(&queryStatistics); From 46cde955406cc610b63bbd175346e758c9ab56ad Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Tue, 19 Dec 2023 14:13:26 +0800 Subject: [PATCH 2/3] filter dep --- be/src/pipeline/exec/multi_cast_data_stream_source.cpp | 10 +++++++--- be/src/pipeline/exec/multi_cast_data_stream_source.h | 3 +++ be/src/pipeline/exec/scan_operator.cpp | 6 +++++- be/src/pipeline/exec/scan_operator.h | 4 ++++ be/src/pipeline/pipeline_x/operator.cpp | 6 +----- be/src/pipeline/pipeline_x/operator.h | 4 ++-- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 9 ++++++--- 7 files changed, 28 insertions(+), 14 deletions(-) diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 4c73f6ecb282fe..a4f3ff55a5c023 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -128,9 +128,13 @@ RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent), - vectorized::RuntimeFilterConsumer( - static_cast(parent)->dest_id_from_sink(), parent->runtime_filter_descs(), - static_cast(parent)->_row_desc(), _conjuncts) {}; + vectorized::RuntimeFilterConsumer(static_cast(parent)->dest_id_from_sink(), + parent->runtime_filter_descs(), + static_cast(parent)->_row_desc(), _conjuncts) { + _filter_dependency = std::make_shared( + parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", + state->get_query_ctx()); +}; Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 86034a76ce7a3c..baeca2ca7b1a93 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -120,8 +120,11 @@ class MultiCastDataStreamSourceLocalState final friend class MultiCastDataStreamerSourceOperatorX; + RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); } + private: vectorized::VExprContextSPtrs _output_expr_contexts; + std::shared_ptr _filter_dependency; }; class MultiCastDataStreamerSourceOperatorX final diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index d8d3958a7797a4..bdd50c03fff3a2 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -100,7 +100,11 @@ std::string ScanOperator::debug_string() const { template ScanLocalState::ScanLocalState(RuntimeState* state, OperatorXBase* parent) - : ScanLocalStateBase(state, parent) {} + : ScanLocalStateBase(state, parent) { + _filter_dependency = std::make_shared( + parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", + state->get_query_ctx()); +} template bool ScanLocalState::ready_to_read() { diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 603f3804aae7e3..51551105851769 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -211,6 +211,8 @@ class ScanLocalState : public ScanLocalStateBase { Dependency* dependency() override { return _scan_dependency.get(); } + RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }; + protected: template friend class ScanOperatorX; @@ -405,6 +407,8 @@ class ScanLocalState : public ScanLocalStateBase { std::atomic _eos = false; std::mutex _block_lock; + + std::shared_ptr _filter_dependency; }; template diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index a466b8b3b677aa..076cdf4af0f307 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -322,11 +322,7 @@ PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB _state(state), _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", - state->get_query_ctx())) { - _filter_dependency = std::make_shared( - parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", - state->get_query_ctx()); -} + state->get_query_ctx())) {} template Status PipelineXLocalState::init(RuntimeState* state, LocalStateInfo& info) { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 783b15ac7eba19..08f8e3eaf45303 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -103,7 +103,8 @@ class PipelineXLocalStateBase { virtual Dependency* dependency() { return nullptr; } Dependency* finishdependency() { return _finish_dependency.get(); } - RuntimeFilterDependency* filterdependency() { return _filter_dependency.get(); } + // override in Scan MultiCastSink + virtual RuntimeFilterDependency* filterdependency() { return nullptr; } protected: friend class OperatorXBase; @@ -136,7 +137,6 @@ class PipelineXLocalStateBase { bool _closed = false; vectorized::Block _origin_block; std::shared_ptr _finish_dependency; - std::shared_ptr _filter_dependency; }; class OperatorXBase : public OperatorBase { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 77e8788fe6d637..965b91d3e4fc03 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -191,6 +191,7 @@ Status PipelineXTask::_open() { for (size_t i = 0; i < 2; i++) { auto st = local_state->open(_state); if (st.is()) { + DCHECK(_filter_dependency); _blocked_dep = _filter_dependency->is_blocked_by(this); if (_blocked_dep) { set_state(PipelineTaskState::BLOCKED_FOR_RF); @@ -375,9 +376,11 @@ std::string PipelineXTask::debug_string() { fmt::format_to(debug_string_buffer, "{}. {}\n", i, _write_dependencies->debug_string(1)); i++; - fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n"); - fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1)); - i++; + if (_filter_dependency) { + fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n"); + fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1)); + i++; + } fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n"); for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) { From f527015589f9c0c3e40692e8643fb4d021dac0a0 Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Tue, 19 Dec 2023 15:18:19 +0800 Subject: [PATCH 3/3] fin dep --- .../pipeline/exec/exchange_sink_operator.cpp | 1 + be/src/pipeline/exec/exchange_sink_operator.h | 9 ++++++++- be/src/pipeline/exec/scan_operator.cpp | 8 +++++++- be/src/pipeline/exec/scan_operator.h | 5 +++++ be/src/pipeline/pipeline_x/operator.cpp | 16 +++------------- be/src/pipeline/pipeline_x/operator.h | 19 +++++++++++++------ .../pipeline/pipeline_x/pipeline_x_task.cpp | 8 ++++++-- 7 files changed, 43 insertions(+), 23 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index 165576d1ab2980..1be45b7e514ebf 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -535,6 +535,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_close_timer); COUNTER_UPDATE(_wait_queue_timer, _queue_dependency->watcher_elapse_time()); + COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); if (_broadcast_dependency) { COUNTER_UPDATE(_wait_broadcast_buffer_timer, _broadcast_dependency->watcher_elapse_time()); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index bc91e5dc19d78d..a2327d44f59d5b 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -152,12 +152,17 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { : PipelineXSinkLocalState<>(parent, state), current_channel_idx(0), only_local_exchange(false), - _serializer(this) {} + _serializer(this) { + _finish_dependency = std::make_shared( + parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", + state->get_query_ctx()); + } Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; Dependency* dependency() override { return _exchange_sink_dependency.get(); } + Dependency* finishdependency() override { return _finish_dependency.get(); } Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1); void register_channels(pipeline::ExchangeSinkBuffer* buffer); Status get_next_available_buffer(vectorized::BroadcastPBlockHolder** holder); @@ -236,6 +241,8 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { std::vector> _local_channels_dependency; std::unique_ptr _partitioner; int _partition_count; + + std::shared_ptr _finish_dependency; }; class ExchangeSinkOperatorX final : public DataSinkOperatorX { diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index bdd50c03fff3a2..398765c8cc6149 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -101,6 +101,9 @@ std::string ScanOperator::debug_string() const { template ScanLocalState::ScanLocalState(RuntimeState* state, OperatorXBase* parent) : ScanLocalStateBase(state, parent) { + _finish_dependency = std::make_shared( + parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", + state->get_query_ctx()); _filter_dependency = std::make_shared( parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", state->get_query_ctx()); @@ -1315,6 +1318,9 @@ Status ScanLocalState::_init_profile() { _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT); + _wait_for_finish_dependency_timer = + ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency"); + return Status::OK(); } @@ -1446,7 +1452,7 @@ Status ScanLocalState::close(RuntimeState* state) { _scanner_ctx->clear_and_join(reinterpret_cast(this), state); } COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time()); - + COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); return PipelineXLocalState<>::close(state); } diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 51551105851769..78cb399427e787 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -171,6 +171,8 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::Runt RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr; // time of prefilter input block from scanner RuntimeProfile::Counter* _wait_for_eos_timer = nullptr; + + RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr; }; template @@ -212,6 +214,7 @@ class ScanLocalState : public ScanLocalStateBase { Dependency* dependency() override { return _scan_dependency.get(); } RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }; + Dependency* finishdependency() override { return _finish_dependency.get(); } protected: template @@ -409,6 +412,8 @@ class ScanLocalState : public ScanLocalStateBase { std::mutex _block_lock; std::shared_ptr _filter_dependency; + + std::shared_ptr _finish_dependency; }; template diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index 076cdf4af0f307..edfbd34035b4a9 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -308,21 +308,14 @@ Status OperatorX::setup_local_state(RuntimeState* state, LocalSt PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent, RuntimeState* state) - : _parent(parent), - _state(state), - _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(), - parent->get_name() + "_FINISH_DEPENDENCY", - state->get_query_ctx())) {} + : _parent(parent), _state(state) {} PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent) : _num_rows_returned(0), _rows_returned_counter(nullptr), _peak_memory_usage_counter(nullptr), _parent(parent), - _state(state), - _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(), - parent->get_name() + "_FINISH_DEPENDENCY", - state->get_query_ctx())) {} + _state(state) {} template Status PipelineXLocalState::init(RuntimeState* state, LocalStateInfo& info) { @@ -330,8 +323,6 @@ Status PipelineXLocalState::init(RuntimeState* state, LocalState _runtime_profile->set_metadata(_parent->node_id()); _runtime_profile->set_is_sink(false); info.parent_profile->add_child(_runtime_profile.get(), true, nullptr); - _wait_for_finish_dependency_timer = - ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency"); _dependency = (DependencyType*)info.dependency.get(); if constexpr (!std::is_same_v) { auto& deps = info.upstream_dependencies; @@ -382,7 +373,6 @@ Status PipelineXLocalState::close(RuntimeState* state) { if constexpr (!std::is_same_v) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); } - COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); if (_rows_returned_counter != nullptr) { COUNTER_SET(_rows_returned_counter, _num_rows_returned); } @@ -442,7 +432,6 @@ Status PipelineXSinkLocalState::close(RuntimeState* state, Statu if constexpr (!std::is_same_v) { COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); } - COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); if (_peak_memory_usage_counter) { _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); } @@ -532,6 +521,7 @@ Status AsyncWriterSink::close(RuntimeState* state, Status exec_s return Status::OK(); } COUNTER_SET(_wait_for_dependency_timer, _async_writer_dependency->watcher_elapse_time()); + COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); // if the init failed, the _writer may be nullptr. so here need check if (_writer) { if (_writer->need_normal_close()) { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 08f8e3eaf45303..d697970311de3e 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -102,7 +102,8 @@ class PipelineXLocalStateBase { virtual Dependency* dependency() { return nullptr; } - Dependency* finishdependency() { return _finish_dependency.get(); } + // override in Scan + virtual Dependency* finishdependency() { return nullptr; } // override in Scan MultiCastSink virtual RuntimeFilterDependency* filterdependency() { return nullptr; } @@ -122,7 +123,6 @@ class PipelineXLocalStateBase { RuntimeProfile::Counter* _blocks_returned_counter = nullptr; RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr; RuntimeProfile::Counter* _memory_used_counter = nullptr; - RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr; RuntimeProfile::Counter* _projection_timer = nullptr; RuntimeProfile::Counter* _exec_timer = nullptr; // Account for peak memory used by this node @@ -136,7 +136,6 @@ class PipelineXLocalStateBase { vectorized::VExprContextSPtrs _projections; bool _closed = false; vectorized::Block _origin_block; - std::shared_ptr _finish_dependency; }; class OperatorXBase : public OperatorBase { @@ -397,7 +396,8 @@ class PipelineXSinkLocalStateBase { RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; } virtual Dependency* dependency() { return nullptr; } - Dependency* finishdependency() { return _finish_dependency.get(); } + // override in exchange sink , AsyncWriterSink + virtual Dependency* finishdependency() { return nullptr; } protected: DataSinkOperatorXBase* _parent = nullptr; @@ -424,7 +424,6 @@ class PipelineXSinkLocalStateBase { RuntimeProfile::Counter* _exec_timer = nullptr; RuntimeProfile::Counter* _memory_used_counter = nullptr; RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr; - std::shared_ptr _finish_dependency; }; class DataSinkOperatorXBase : public OperatorBase { @@ -659,7 +658,11 @@ class AsyncWriterSink : public PipelineXSinkLocalState { public: using Base = PipelineXSinkLocalState; AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state) - : Base(parent, state), _async_writer_dependency(nullptr) {} + : Base(parent, state), _async_writer_dependency(nullptr) { + _finish_dependency = std::make_shared( + parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", + state->get_query_ctx()); + } Status init(RuntimeState* state, LocalSinkStateInfo& info) override; @@ -672,11 +675,15 @@ class AsyncWriterSink : public PipelineXSinkLocalState { Status try_close(RuntimeState* state, Status exec_status) override; + Dependency* finishdependency() override { return _finish_dependency.get(); } + protected: vectorized::VExprContextSPtrs _output_vexpr_ctxs; std::unique_ptr _writer; std::shared_ptr _async_writer_dependency; + + std::shared_ptr _finish_dependency; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 965b91d3e4fc03..be62fcac213f84 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -118,7 +118,9 @@ Status PipelineXTask::_extract_dependencies() { DCHECK(dep != nullptr); _read_dependencies.push_back(dep); auto* fin_dep = local_state->finishdependency(); - _finish_dependencies.push_back(fin_dep); + if (fin_dep) { + _finish_dependencies.push_back(fin_dep); + } } { auto result = _state->get_sink_local_state_result(_sink->operator_id()); @@ -130,7 +132,9 @@ Status PipelineXTask::_extract_dependencies() { DCHECK(dep != nullptr); _write_dependencies = dep; auto* fin_dep = local_state->finishdependency(); - _finish_dependencies.push_back(fin_dep); + if (fin_dep) { + _finish_dependencies.push_back(fin_dep); + } } { auto result = _state->get_local_state_result(_source->operator_id());