From f9562ec2be8a57a91cf436c807c833c4cb0275f4 Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Tue, 19 Dec 2023 11:56:52 +0800 Subject: [PATCH 1/2] upd --- .../pipeline_x/pipeline_x_fragment_context.cpp | 7 ++++--- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 14 ++++++-------- be/src/pipeline/pipeline_x/pipeline_x_task.h | 3 +-- be/src/runtime/fragment_mgr.cpp | 1 + 4 files changed, 12 insertions(+), 13 deletions(-) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 0ac4bc2bd872fb..624def16e4fc09 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -601,8 +601,7 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( auto prepare_and_set_parent_profile = [&](PipelineXTask* task, size_t pip_idx) { DCHECK(pipeline_id_to_profile[pip_idx]); - RETURN_IF_ERROR(task->prepare(get_task_runtime_state(task->task_id()), local_params, - request.fragment.output_sink)); + RETURN_IF_ERROR(task->prepare(local_params, request.fragment.output_sink)); return Status::OK(); }; @@ -828,7 +827,7 @@ Status PipelineXFragmentContext::_add_local_exchange( OperatorXPtr source_op; source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id)); RETURN_IF_ERROR(source_op->init(exchange_type)); - if (operator_xs.size() > 0) { + if (!operator_xs.empty()) { RETURN_IF_ERROR(operator_xs.front()->set_child(source_op)); } operator_xs.insert(operator_xs.begin(), source_op); @@ -878,6 +877,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN const DescriptorTbl& descs, OperatorXPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx) { + // We directly construct the operator from Thrift because the given array is in the order of preorder traversal. + // Therefore, here we need to use a stack-like structure. _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); std::stringstream error_msg; switch (tnode.node_type) { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 283b7851e4c55b..77e8788fe6d637 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -56,7 +56,7 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta _source(_operators.front()), _root(_operators.back()), _sink(pipeline->sink_shared_pointer()), - _le_state_map(le_state_map), + _le_state_map(std::move(le_state_map)), _task_idx(task_idx), _execution_dep(state->get_query_ctx()->get_execution_dependency()) { _pipeline_task_watcher.start(); @@ -67,15 +67,13 @@ PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeSta pipeline->incr_created_tasks(); } -Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams& local_params, - const TDataSink& tsink) { +Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink) { DCHECK(_sink); DCHECK(_cur_state == PipelineTaskState::NOT_READY) << get_state_name(_cur_state); _init_profile(); SCOPED_TIMER(_task_profile->total_time_counter()); SCOPED_CPU_TIMER(_task_cpu_timer); SCOPED_TIMER(_prepare_timer); - DCHECK_EQ(state, _state); { // set sink local state @@ -85,20 +83,20 @@ Status PipelineXTask::prepare(RuntimeState* state, const TPipelineInstanceParams get_downstream_dependency(), _le_state_map, tsink}; - RETURN_IF_ERROR(_sink->setup_local_state(state, info)); + RETURN_IF_ERROR(_sink->setup_local_state(_state, info)); } std::vector no_scan_ranges; auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, _operators.front()->node_id(), no_scan_ranges); - auto* parent_profile = state->get_sink_local_state(_sink->operator_id())->profile(); + auto* parent_profile = _state->get_sink_local_state(_sink->operator_id())->profile(); for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) { auto& op = _operators[op_idx]; auto& deps = get_upstream_dependency(op->operator_id()); LocalStateInfo info {parent_profile, scan_ranges, deps, _le_state_map, _task_idx, _source_dependency[op->operator_id()]}; - RETURN_IF_ERROR(op->setup_local_state(state, info)); - parent_profile = state->get_local_state(op->operator_id())->profile(); + RETURN_IF_ERROR(op->setup_local_state(_state, info)); + parent_profile = _state->get_local_state(op->operator_id())->profile(); } _block = doris::vectorized::Block::create_unique(); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index fc633339c3188e..f7b996f40a71e1 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -62,8 +62,7 @@ class PipelineXTask : public PipelineTask { return Status::InternalError("Should not reach here!"); } - Status prepare(RuntimeState* state, const TPipelineInstanceParams& local_params, - const TDataSink& tsink); + Status prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink); Status execute(bool* eos) override; diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 90ebbff37a26f2..c54786e5428408 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -220,6 +220,7 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { DCHECK(req.runtime_state != nullptr); if (req.query_statistics) { + // use to report 'insert into select' TQueryStatistics queryStatistics; DCHECK(req.query_statistics->collect_dml_statistics()); req.query_statistics->to_thrift(&queryStatistics); From 46cde955406cc610b63bbd175346e758c9ab56ad Mon Sep 17 00:00:00 2001 From: Mryange <2319153948@qq.com> Date: Tue, 19 Dec 2023 14:13:26 +0800 Subject: [PATCH 2/2] filter dep --- be/src/pipeline/exec/multi_cast_data_stream_source.cpp | 10 +++++++--- be/src/pipeline/exec/multi_cast_data_stream_source.h | 3 +++ be/src/pipeline/exec/scan_operator.cpp | 6 +++++- be/src/pipeline/exec/scan_operator.h | 4 ++++ be/src/pipeline/pipeline_x/operator.cpp | 6 +----- be/src/pipeline/pipeline_x/operator.h | 4 ++-- be/src/pipeline/pipeline_x/pipeline_x_task.cpp | 9 ++++++--- 7 files changed, 28 insertions(+), 14 deletions(-) diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp index 4c73f6ecb282fe..a4f3ff55a5c023 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp @@ -128,9 +128,13 @@ RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent), - vectorized::RuntimeFilterConsumer( - static_cast(parent)->dest_id_from_sink(), parent->runtime_filter_descs(), - static_cast(parent)->_row_desc(), _conjuncts) {}; + vectorized::RuntimeFilterConsumer(static_cast(parent)->dest_id_from_sink(), + parent->runtime_filter_descs(), + static_cast(parent)->_row_desc(), _conjuncts) { + _filter_dependency = std::make_shared( + parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", + state->get_query_ctx()); +}; Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(Base::init(state, info)); diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h index 86034a76ce7a3c..baeca2ca7b1a93 100644 --- a/be/src/pipeline/exec/multi_cast_data_stream_source.h +++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h @@ -120,8 +120,11 @@ class MultiCastDataStreamSourceLocalState final friend class MultiCastDataStreamerSourceOperatorX; + RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); } + private: vectorized::VExprContextSPtrs _output_expr_contexts; + std::shared_ptr _filter_dependency; }; class MultiCastDataStreamerSourceOperatorX final diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index d8d3958a7797a4..bdd50c03fff3a2 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -100,7 +100,11 @@ std::string ScanOperator::debug_string() const { template ScanLocalState::ScanLocalState(RuntimeState* state, OperatorXBase* parent) - : ScanLocalStateBase(state, parent) {} + : ScanLocalStateBase(state, parent) { + _filter_dependency = std::make_shared( + parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", + state->get_query_ctx()); +} template bool ScanLocalState::ready_to_read() { diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 603f3804aae7e3..51551105851769 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -211,6 +211,8 @@ class ScanLocalState : public ScanLocalStateBase { Dependency* dependency() override { return _scan_dependency.get(); } + RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }; + protected: template friend class ScanOperatorX; @@ -405,6 +407,8 @@ class ScanLocalState : public ScanLocalStateBase { std::atomic _eos = false; std::mutex _block_lock; + + std::shared_ptr _filter_dependency; }; template diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp index a466b8b3b677aa..076cdf4af0f307 100644 --- a/be/src/pipeline/pipeline_x/operator.cpp +++ b/be/src/pipeline/pipeline_x/operator.cpp @@ -322,11 +322,7 @@ PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB _state(state), _finish_dependency(new FinishDependency(parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY", - state->get_query_ctx())) { - _filter_dependency = std::make_shared( - parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY", - state->get_query_ctx()); -} + state->get_query_ctx())) {} template Status PipelineXLocalState::init(RuntimeState* state, LocalStateInfo& info) { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 783b15ac7eba19..08f8e3eaf45303 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -103,7 +103,8 @@ class PipelineXLocalStateBase { virtual Dependency* dependency() { return nullptr; } Dependency* finishdependency() { return _finish_dependency.get(); } - RuntimeFilterDependency* filterdependency() { return _filter_dependency.get(); } + // override in Scan MultiCastSink + virtual RuntimeFilterDependency* filterdependency() { return nullptr; } protected: friend class OperatorXBase; @@ -136,7 +137,6 @@ class PipelineXLocalStateBase { bool _closed = false; vectorized::Block _origin_block; std::shared_ptr _finish_dependency; - std::shared_ptr _filter_dependency; }; class OperatorXBase : public OperatorBase { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 77e8788fe6d637..965b91d3e4fc03 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -191,6 +191,7 @@ Status PipelineXTask::_open() { for (size_t i = 0; i < 2; i++) { auto st = local_state->open(_state); if (st.is()) { + DCHECK(_filter_dependency); _blocked_dep = _filter_dependency->is_blocked_by(this); if (_blocked_dep) { set_state(PipelineTaskState::BLOCKED_FOR_RF); @@ -375,9 +376,11 @@ std::string PipelineXTask::debug_string() { fmt::format_to(debug_string_buffer, "{}. {}\n", i, _write_dependencies->debug_string(1)); i++; - fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n"); - fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1)); - i++; + if (_filter_dependency) { + fmt::format_to(debug_string_buffer, "Runtime Filter Dependency Information: \n"); + fmt::format_to(debug_string_buffer, "{}. {}\n", i, _filter_dependency->debug_string(1)); + i++; + } fmt::format_to(debug_string_buffer, "Finish Dependency Information: \n"); for (size_t j = 0; j < _finish_dependencies.size(); j++, i++) {