From 196722efee23b8ee368aaafb16076bc62f37451d Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 8 Oct 2024 10:33:15 +0800 Subject: [PATCH 1/5] =?UTF-8?q?[fix](pipeline)=20Make=20all=20upstream=20t?= =?UTF-8?q?asks=20runnable=20if=20all=20tasks=20finishe=E2=80=A6=20(#41292?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consider 3 pipelines in this fragment (... -> join -> shuffle) : pipeline 0 : `... -> local exchange sink` pipeline 1 : `... -> join build (INNER JOIN)` pipeline 2 : `local exchange source -> join probe (INNER JOIN) -> data stream sender ` Assume the join build returns 0 rows: the join probe can then finish as soon as the join build finishes and does not need to wait for the `local exchange sink` to finish. In this case, if pipeline 0 is blocked by a dependency for a long time, pipeline 2 should notify pipeline 0 to finish. --- be/src/pipeline/exec/hashjoin_build_sink.cpp | 10 +++++++--- .../partitioned_aggregation_sink_operator.h | 1 - .../pipeline/exec/spill_sort_sink_operator.h | 1 - be/src/pipeline/pipeline.cpp | 11 +++++++++- be/src/pipeline/pipeline.h | 17 +++++++++++++++- be/src/pipeline/pipeline_fragment_context.cpp | 4 ++-- be/src/pipeline/pipeline_fragment_context.h | 2 +- be/src/pipeline/pipeline_task.h | 4 ++++ be/src/pipeline/pipeline_x/operator.h | 1 + .../pipeline_x_fragment_context.cpp | 20 +++++++++++++++++-- .../pipeline_x/pipeline_x_fragment_context.h | 3 +++ .../pipeline/pipeline_x/pipeline_x_task.cpp | 3 +-- be/src/pipeline/pipeline_x/pipeline_x_task.h | 4 +++- be/src/pipeline/task_scheduler.cpp | 2 +- be/src/runtime/runtime_filter_mgr.cpp | 1 - 15 files changed, 67 insertions(+), 17 deletions(-) diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 33e64017d5066b..8f13b7dae4aabb 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -122,6 +122,9 @@ Status 
HashJoinBuildSinkLocalState::open(RuntimeState* state) { } Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } auto p = _parent->cast(); Defer defer {[&]() { if (_should_build_hash_table && p._shared_hashtable_controller) { @@ -129,8 +132,8 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu } }}; - if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) { - return Status::OK(); + if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) { + return Base::close(state, exec_status); } auto* block = _shared_state->build_block.get(); uint64_t hash_table_size = block ? block->rows() : 0; @@ -148,7 +151,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu SCOPED_TIMER(_publish_runtime_filter_timer); RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table)); - return Status::OK(); + return Base::close(state, exec_status); } bool HashJoinBuildSinkLocalState::build_unique() const { @@ -519,6 +522,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + local_state._eos = eos; if (local_state._should_build_hash_table) { // If eos or have already met a null value using short-circuit strategy, we do not need to pull // data from probe side. 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 8046c68ca453fa..dacebe5239064e 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -258,7 +258,6 @@ class PartitionedAggSinkLocalState std::unique_ptr _runtime_state; - bool _eos = false; std::shared_ptr _finish_dependency; // temp structures during spilling diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index fae5fe3270f3a0..7e8b7b36aab593 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -55,7 +55,6 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState _finish_dependency; }; diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 8b2123bcce20d4..2f69cca7cc1631 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -22,6 +22,7 @@ #include #include "pipeline/exec/operator.h" +#include "pipeline/pipeline_task.h" namespace doris::pipeline { @@ -99,4 +100,12 @@ Status Pipeline::set_sink(DataSinkOperatorXPtr& sink) { return Status::OK(); } -} // namespace doris::pipeline \ No newline at end of file +void Pipeline::make_all_runnable() { + for (auto* task : _tasks) { + if (task) { + task->clear_blocking_state(true); + } + } +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 4693f8343fde88..db93e8dfe357ee 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -50,6 +50,7 @@ class Pipeline : public std::enable_shared_from_this { std::weak_ptr context) : _pipeline_id(pipeline_id), _context(std::move(context)), _num_tasks(num_tasks) { _init_profile(); + _tasks.resize(_num_tasks, nullptr); } void add_dependency(std::shared_ptr& pipeline) { @@ -155,14 +156,24 @@ class 
Pipeline : public std::enable_shared_from_this { void set_children(std::shared_ptr child) { _children.push_back(child); } void set_children(std::vector> children) { _children = children; } - void incr_created_tasks() { _num_tasks_created++; } + void incr_created_tasks(int i, PipelineTask* task) { + _num_tasks_created++; + _num_tasks_running++; + DCHECK_LT(i, _tasks.size()); + _tasks[i] = task; + } + + void make_all_runnable(); + void set_num_tasks(int num_tasks) { _num_tasks = num_tasks; + _tasks.resize(_num_tasks, nullptr); for (auto& op : operatorXs) { op->set_parallel_tasks(_num_tasks); } } int num_tasks() const { return _num_tasks; } + bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; } std::string debug_string() { fmt::memory_buffer debug_string_buffer; @@ -243,6 +254,10 @@ class Pipeline : public std::enable_shared_from_this { int _num_tasks = 1; // How many tasks are already created? std::atomic _num_tasks_created = 0; + // How many tasks are already created and not finished? + std::atomic _num_tasks_running = 0; + // Tasks in this pipeline. 
+ std::vector _tasks; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 550e60c210c080..e514a5d6b975ac 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -813,7 +813,7 @@ void PipelineFragmentContext::close_if_prepare_failed(Status /*st*/) { DCHECK(!task->is_pending_finish()); WARN_IF_ERROR(task->close(Status::OK()), fmt::format("Query {} closed since prepare failed", print_id(_query_id))); - close_a_pipeline(); + close_a_pipeline(task->pipeline_id()); } } @@ -960,7 +960,7 @@ void PipelineFragmentContext::_close_fragment_instance() { std::dynamic_pointer_cast(shared_from_this())); } -void PipelineFragmentContext::close_a_pipeline() { +void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) { std::lock_guard l(_task_mutex); g_pipeline_tasks_count << -1; ++_closed_tasks; diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index f60cb80359ab32..ce9b10be4ffac0 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -110,7 +110,7 @@ class PipelineFragmentContext : public TaskExecutionContext { [[nodiscard]] int get_fragment_id() const { return _fragment_id; } - void close_a_pipeline(); + virtual void close_a_pipeline(PipelineId pipeline_id); virtual void clear_finished_tasks() {} diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 47f96c5f8fa64f..661b16c99ed88e 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -291,6 +291,10 @@ class PipelineTask { std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); } + PipelineId pipeline_id() const { return _pipeline->id(); } + + virtual void clear_blocking_state(bool wake_up_by_downstream = false) {} + protected: void _finish_p_dependency() { for 
(const auto& p : _pipeline->_parents) { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index 72f47b576f6e15..a9f37d63831e3e 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -536,6 +536,7 @@ class PipelineXSinkLocalStateBase { // Set to true after close() has been called. subclasses should check and set this in // close(). bool _closed = false; + std::atomic _eos = false; //NOTICE: now add a faker profile, because sometimes the profile record is useless //so we want remove some counters and timers, eg: in join node, if it's broadcast_join //and shared hash table, some counter/timer about build hash table is useless, diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index ac527ed8e69888..1a940405f46b52 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -127,6 +127,8 @@ PipelineXFragmentContext::~PipelineXFragmentContext() { } else { _call_back(nullptr, &st); } + _dag.clear(); + _pip_id_to_pipeline.clear(); _runtime_state.reset(); _runtime_filter_states.clear(); _runtime_filter_mgr_map.clear(); @@ -525,6 +527,7 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks( _task_runtime_states.resize(_pipelines.size()); for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks()); + _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get(); } auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile(); DCHECK(pipeline_id_to_profile.empty()); @@ -638,6 +641,7 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks( task_runtime_state.get(), ctx, pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline), i); + pipeline->incr_created_tasks(i, task.get()); 
pipeline_id_to_task.insert({pipeline->id(), task.get()}); _tasks[i].emplace_back(std::move(task)); } @@ -737,7 +741,6 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks( } } _pipeline_parent_map.clear(); - _dag.clear(); _op_id_to_le_state.clear(); return Status::OK(); @@ -1495,7 +1498,7 @@ void PipelineXFragmentContext::close_if_prepare_failed(Status st) { for (auto& t : task) { DCHECK(!t->is_pending_finish()); WARN_IF_ERROR(t->close(Status::OK()), "close_if_prepare_failed failed: "); - close_a_pipeline(); + close_a_pipeline(t->pipeline_id()); } } _query_ctx->cancel(st.to_string(), st, _fragment_id); @@ -1584,4 +1587,17 @@ std::string PipelineXFragmentContext::debug_string() { return fmt::to_string(debug_string_buffer); } + +void PipelineXFragmentContext::close_a_pipeline(PipelineId pipeline_id) { + // If all tasks of this pipeline have been closed, upstream tasks are never needed, so we just make them runnable here + DCHECK(_pip_id_to_pipeline.contains(pipeline_id)); + if (_pip_id_to_pipeline[pipeline_id]->close_task()) { + if (_dag.contains(pipeline_id)) { + for (auto dep : _dag[pipeline_id]) { + _pip_id_to_pipeline[dep]->make_all_runnable(); + } + } + } + PipelineFragmentContext::close_a_pipeline(pipeline_id); +} } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 8a09f64ac6b902..60ac22a429e7cc 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -116,6 +116,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext { std::string debug_string() override; + void close_a_pipeline(PipelineId pipeline_id) override; + private: void _close_fragment_instance() override; Status _build_pipeline_x_tasks(const doris::TPipelineFragmentParams& request, @@ -222,6 +224,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext { std::map, 
std::shared_ptr>> _op_id_to_le_state; + std::map _pip_id_to_pipeline; // UniqueId -> runtime mgr std::map> _runtime_filter_mgr_map; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 00464e59f21711..b8965e6b21def9 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -66,7 +66,6 @@ PipelineXTask::PipelineXTask( if (shared_state) { _sink_shared_state = shared_state; } - pipeline->incr_created_tasks(); } Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink, @@ -248,7 +247,7 @@ Status PipelineXTask::execute(bool* eos) { cpu_qs->add_cpu_nanos(delta_cpu_time); } }}; - *eos = _sink->is_finished(_state); + *eos = _sink->is_finished(_state) || _wake_up_by_downstream || is_final_state(_cur_state); if (*eos) { return Status::OK(); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 47746b76fb0194..fa8889450dc507 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -138,7 +138,8 @@ class PipelineXTask : public PipelineTask { int task_id() const { return _index; }; - void clear_blocking_state() { + void clear_blocking_state(bool wake_up_by_downstream = false) override { + _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream; _state->get_query_ctx()->get_execution_dependency()->set_always_ready(); // We use a lock to assure all dependencies are not deconstructed here. 
std::unique_lock lc(_dependency_lock); @@ -252,6 +253,7 @@ class PipelineXTask : public PipelineTask { std::atomic _finished {false}; std::mutex _dependency_lock; + std::atomic _wake_up_by_downstream = false; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index de697469575bc9..93cf2d9dd993b6 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -258,7 +258,7 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status task->set_close_pipeline_time(); task->finalize(); task->set_running(false); - task->fragment_context()->close_a_pipeline(); + task->fragment_context()->close_a_pipeline(task->pipeline_id()); } void TaskScheduler::_do_work(size_t index) { diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 24baf9b6c971e4..bc3416d053aff8 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -166,7 +166,6 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters( } *local_merge_filters = &iter->second; DCHECK(!iter->second.filters.empty()); - DCHECK_GT(iter->second.merge_time, 0); return Status::OK(); } From 3b6edc52d7fb2b4b6507686fea9dd6dbf1ad72b7 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 27 Sep 2024 17:23:19 +0800 Subject: [PATCH 2/5] [fix](shuffle) Fix remaining tasks if all tasks are running on single BE (#41350) --- .../pipeline/exec/exchange_sink_operator.cpp | 35 +++++++++++-------- be/src/pipeline/exec/exchange_sink_operator.h | 4 ++- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index e4150b4f7ac68e..236ebe7a5aac1d 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -187,13 +187,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { PUniqueId 
id; id.set_hi(_state->query_id().hi); id.set_lo(_state->query_id().lo); - _sink_buffer = std::make_unique>( - id, p._dest_node_id, _sender_id, _state->be_number(), state, this); - register_channels(_sink_buffer.get()); - _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), - "ExchangeSinkQueueDependency", true); - _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); + if (!only_local_exchange) { + _sink_buffer = std::make_unique>( + id, p._dest_node_id, _sender_id, _state->be_number(), state, this); + register_channels(_sink_buffer.get()); + _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "ExchangeSinkQueueDependency", true); + _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); + _finish_dependency->block(); + } + if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && !only_local_exchange) { _broadcast_dependency = Dependency::create_shared( @@ -298,7 +302,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { fmt::format("Crc32HashPartitioner({})", _partition_count)); } - _finish_dependency->block(); if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { @@ -592,8 +595,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block final_st = st; } } - local_state._sink_buffer->set_should_stop(); - return final_st; + if (local_state._sink_buffer) { + local_state._sink_buffer->set_should_stop(); + } } return final_st; } @@ -679,11 +683,14 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx( std::string ExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); - fmt::format_to(debug_string_buffer, - ", Sink Buffer: (_should_stop = {}, 
_busy_channels = {}, _is_finishing = {}), " - "_reach_limit: {}", - _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(), - _sink_buffer->_is_finishing.load(), _reach_limit.load()); + if (_sink_buffer) { + fmt::format_to( + debug_string_buffer, + ", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), " + "_reach_limit: {}", + _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(), + _sink_buffer->_is_finishing.load(), _reach_limit.load()); + } return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 5a7b8bf4201c27..a8abdcd71f9fa3 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -96,7 +96,9 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { std::vector dependencies() const override { std::vector dep_vec; - dep_vec.push_back(_queue_dependency.get()); + if (_queue_dependency) { + dep_vec.push_back(_queue_dependency.get()); + } if (_broadcast_dependency) { dep_vec.push_back(_broadcast_dependency.get()); } From 17fd5a87da61037399be4d35da64b758708c0bde Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 9 Oct 2024 11:21:27 +0800 Subject: [PATCH 3/5] [local exchange](fix) Fix correctness caused by local exchange (#41555) For plan `local exchange (hash shuffle) -> union -> colocated agg`, we must ensure that the local exchange uses the same hash algorithm as MPP shuffling. This problem is covered by our test cases but can only be reproduced on multiple BEs, so no case is added in this PR. 
--- .../pipeline/exec/aggregation_sink_operator.h | 2 +- be/src/pipeline/exec/analytic_sink_operator.h | 2 +- .../distinct_streaming_aggregation_operator.h | 2 +- be/src/pipeline/exec/hashjoin_build_sink.h | 2 +- .../pipeline/exec/hashjoin_probe_operator.h | 2 +- be/src/pipeline/exec/operator.h | 11 +++-- .../partitioned_hash_join_probe_operator.h | 2 +- .../partitioned_hash_join_sink_operator.h | 2 +- be/src/pipeline/exec/sort_sink_operator.h | 2 +- be/src/pipeline/exec/union_sink_operator.h | 6 +++ be/src/pipeline/exec/union_source_operator.h | 5 +++ be/src/pipeline/pipeline_x/operator.h | 4 -- .../pipeline_x_fragment_context.cpp | 43 ++++++++++--------- be/src/pipeline/pipeline_x/pipeline_x_task.h | 2 +- 14 files changed, 50 insertions(+), 37 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index de1f26057ff185..e082d803bcb238 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -164,7 +164,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX { ? DataDistribution(ExchangeType::PASSTHROUGH) : DataSinkOperatorX::required_data_distribution(); } - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index eb65414206c5b4..0269ba15be04da 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -102,7 +102,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX { Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } else if (_merge_by_exchange) { diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 97b704078c63ec..b2c1c414314c25 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -125,6 +125,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX { } } + bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } + private: int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; } diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 60530521ec0a82..0c29374e690960 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -135,6 +135,11 @@ class UnionSourceOperatorX final : public OperatorX { return Status::OK(); } [[nodiscard]] int get_child_count() 
const { return _child_size; } + bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } private: bool _has_data(RuntimeState* state) const { diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h index a9f37d63831e3e..aed7d069c1af32 100644 --- a/be/src/pipeline/pipeline_x/operator.h +++ b/be/src/pipeline/pipeline_x/operator.h @@ -231,8 +231,6 @@ class OperatorXBase : public OperatorBase { [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; } - [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } - bool can_read() override { LOG(FATAL) << "should not reach here!"; return false; @@ -628,8 +626,6 @@ class DataSinkOperatorXBase : public OperatorBase { : DataDistribution(ExchangeType::NOOP); } - [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } - Status close(RuntimeState* state) override { return Status::InternalError("Should not reach here!"); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 1a940405f46b52..f4b52d14d8f592 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -774,7 +774,7 @@ Status PipelineXFragmentContext::_create_tree_helper( ObjectPool* pool, const std::vector& tnodes, const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs, OperatorXPtr parent, int* node_idx, OperatorXPtr* root, PipelinePtr& cur_pipe, - int child_idx, const bool followed_by_shuffled_join) { + int child_idx, const bool followed_by_shuffled_operator) { // propagate error case if (*node_idx >= tnodes.size()) { // TODO: print thrift msg @@ -785,11 +785,11 @@ Status PipelineXFragmentContext::_create_tree_helper( const TPlanNode& tnode = 
tnodes[*node_idx]; int num_children = tnodes[*node_idx].num_children; - bool current_followed_by_shuffled_join = followed_by_shuffled_join; + bool current_followed_by_shuffled_operator = followed_by_shuffled_operator; OperatorXPtr op = nullptr; RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe, parent == nullptr ? -1 : parent->node_id(), child_idx, - followed_by_shuffled_join)); + followed_by_shuffled_operator)); // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr)); if (parent != nullptr) { @@ -800,7 +800,7 @@ Status PipelineXFragmentContext::_create_tree_helper( } /** - * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join. + * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators). * * For plan: * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2) @@ -814,8 +814,8 @@ Status PipelineXFragmentContext::_create_tree_helper( cur_pipe->operator_xs().empty() ? 
cur_pipe->sink_x()->require_shuffled_data_distribution() : op->require_shuffled_data_distribution(); - current_followed_by_shuffled_join = - (followed_by_shuffled_join || op->is_shuffled_hash_join()) && + current_followed_by_shuffled_operator = + (followed_by_shuffled_operator || op->is_shuffled_operator()) && require_shuffled_data_distribution; cur_pipe->_name.push_back('-'); @@ -826,7 +826,7 @@ Status PipelineXFragmentContext::_create_tree_helper( for (int i = 0; i < num_children; i++) { ++*node_idx; RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr, - cur_pipe, i, current_followed_by_shuffled_join)); + cur_pipe, i, current_followed_by_shuffled_operator)); // we are expecting a child, but have used all nodes // this means we have been given a bad tree and must fail @@ -868,13 +868,13 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment. * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`. */ - const bool followed_by_shuffled_join = - operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_join() - : cur_pipe->sink_x()->followed_by_shuffled_join(); + const bool followed_by_shuffled_operator = + operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_operator() + : cur_pipe->sink_x()->followed_by_shuffled_operator(); const bool should_disable_bucket_shuffle = bucket_seq_to_instance_idx.empty() && shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() && - followed_by_shuffled_join; + followed_by_shuffled_operator; sink.reset(new LocalExchangeSinkOperatorX( sink_id, local_exchange_id, should_disable_bucket_shuffle ? 
_total_instances : _num_instances, @@ -1050,7 +1050,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN const DescriptorTbl& descs, OperatorXPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx, - const bool followed_by_shuffled_join) { + const bool followed_by_shuffled_operator) { // We directly construct the operator from Thrift because the given array is in the order of preorder traversal. // Therefore, here we need to use a stack-like structure. _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); @@ -1124,7 +1124,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); - op->set_followed_by_shuffled_join(followed_by_shuffled_join); + op->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && @@ -1155,7 +1155,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1206,8 +1206,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + 
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } else { op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -1228,8 +1228,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); @@ -1259,6 +1259,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN case TPlanNodeType::UNION_NODE: { int child_count = tnode.num_children; op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -1301,7 +1302,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1341,7 +1342,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN DataSinkOperatorXPtr sink; sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, 
_require_bucket_distribution)); - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1352,11 +1353,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN case TPlanNodeType::INTERSECT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::EXCEPT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::REPEAT_NODE: { diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index fa8889450dc507..23c2c46469232d 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -139,10 +139,10 @@ class PipelineXTask : public PipelineTask { int task_id() const { return _index; }; void clear_blocking_state(bool wake_up_by_downstream = false) override { - _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream; _state->get_query_ctx()->get_execution_dependency()->set_always_ready(); // We use a lock to assure all dependencies are not deconstructed here. 
std::unique_lock lc(_dependency_lock); + _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream; if (!_finished) { _execution_dep->set_always_ready(); for (auto* dep : _filter_dependencies) { From 355463cfef2b17b0ec4ae62086995e06849f1c87 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 9 Oct 2024 17:22:12 +0800 Subject: [PATCH 4/5] [minor](log) Delete useless logics checking (#41589) --- .../local_exchange_sink_operator.cpp | 18 ++---------------- .../local_exchange_sink_operator.h | 2 -- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp index 78c2761dcc783e..87706ce68fb917 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -47,26 +47,14 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) { return Status::OK(); } -Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { - if (_closed) { - return Status::OK(); - } - RETURN_IF_ERROR(Base::close(state, exec_status)); - if (exec_status.ok()) { - DCHECK(_release_count) << "Do not finish correctly! 
" << debug_string(0); - } - return Status::OK(); -} - std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, " - "_running_sink_operators: {}, _running_source_operators: {}, _release_count: {}", + "_running_sink_operators: {}, _running_source_operators: {}", Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, - _exchanger->_running_sink_operators, _exchanger->_running_source_operators, - _release_count); + _exchanger->_running_sink_operators, _exchanger->_running_source_operators); return fmt::to_string(debug_string_buffer); } @@ -113,13 +101,11 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* // If all exchange sources ended due to limit reached, current task should also finish if (local_state._exchanger->_running_source_operators == 0) { - local_state._release_count = true; local_state._shared_state->sub_running_sink_operators(); return Status::EndOfFile("receiver eof"); } if (eos) { local_state._shared_state->sub_running_sink_operators(); - local_state._release_count = true; } return Status::OK(); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index f1d60fc03c4360..49d320c0537a03 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -38,7 +38,6 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState Date: Wed, 9 Oct 2024 17:21:56 +0800 Subject: [PATCH 5/5] [fix](schema scan) Finish schema scanner if limitation is reached (#41592) --- be/src/pipeline/exec/schema_scan_operator.cpp | 3 +++ 1 file changed, 3 insertions(+) diff 
--git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index d5353655ab070a..73e54d52be25ca 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -285,6 +285,9 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl } while (block->rows() == 0 && !*eos); local_state.reached_limit(block, eos); + if (*eos) { + local_state._finish_dependency->set_always_ready(); + } return Status::OK(); }