diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index de1f26057ff185..e082d803bcb238 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -164,7 +164,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX { ? DataDistribution(ExchangeType::PASSTHROUGH) : DataSinkOperatorX::required_data_distribution(); } - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index eb65414206c5b4..0269ba15be04da 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -102,7 +102,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorXquery_id().hi); id.set_lo(_state->query_id().lo); - _sink_buffer = std::make_unique>( - id, p._dest_node_id, _sender_id, _state->be_number(), state, this); - register_channels(_sink_buffer.get()); - _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), - "ExchangeSinkQueueDependency", true); - _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); + if (!only_local_exchange) { + _sink_buffer = std::make_unique>( + id, p._dest_node_id, _sender_id, _state->be_number(), state, this); + register_channels(_sink_buffer.get()); + _queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), + "ExchangeSinkQueueDependency", true); + _sink_buffer->set_dependency(_queue_dependency, _finish_dependency); + _finish_dependency->block(); + } + if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) && 
!only_local_exchange) { _broadcast_dependency = Dependency::create_shared( @@ -298,7 +302,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) { fmt::format("Crc32HashPartitioner({})", _partition_count)); } - _finish_dependency->block(); if (_part_type == TPartitionType::HASH_PARTITIONED || _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED || _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) { @@ -592,8 +595,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block final_st = st; } } - local_state._sink_buffer->set_should_stop(); - return final_st; + if (local_state._sink_buffer) { + local_state._sink_buffer->set_should_stop(); + } } return final_st; } @@ -679,11 +683,14 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx( std::string ExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level)); - fmt::format_to(debug_string_buffer, - ", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), " - "_reach_limit: {}", - _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(), - _sink_buffer->_is_finishing.load(), _reach_limit.load()); + if (_sink_buffer) { + fmt::format_to( + debug_string_buffer, + ", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), " + "_reach_limit: {}", + _sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(), + _sink_buffer->_is_finishing.load(), _reach_limit.load()); + } return fmt::to_string(debug_string_buffer); } diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h index 5a7b8bf4201c27..a8abdcd71f9fa3 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.h +++ b/be/src/pipeline/exec/exchange_sink_operator.h @@ -96,7 +96,9 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> { std::vector dependencies() 
const override { std::vector dep_vec; - dep_vec.push_back(_queue_dependency.get()); + if (_queue_dependency) { + dep_vec.push_back(_queue_dependency.get()); + } if (_broadcast_dependency) { dep_vec.push_back(_broadcast_dependency.get()); } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 33e64017d5066b..8f13b7dae4aabb 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -122,6 +122,9 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) { } Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) { + if (_closed) { + return Status::OK(); + } auto p = _parent->cast(); Defer defer {[&]() { if (_should_build_hash_table && p._shared_hashtable_controller) { @@ -129,8 +132,8 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu } }}; - if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) { - return Status::OK(); + if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled() || !_eos) { + return Base::close(state, exec_status); } auto* block = _shared_state->build_block.get(); uint64_t hash_table_size = block ? 
block->rows() : 0; @@ -148,7 +151,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu SCOPED_TIMER(_publish_runtime_filter_timer); RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table)); - return Status::OK(); + return Base::close(state, exec_status); } bool HashJoinBuildSinkLocalState::build_unique() const { @@ -519,6 +522,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block* SCOPED_TIMER(local_state.exec_time_counter()); COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); + local_state._eos = eos; if (local_state._should_build_hash_table) { // If eos or have already met a null value using short-circuit strategy, we do not need to pull // data from probe side. diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index dd9383987ec7ef..3b55dc5e44d3c1 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -167,7 +167,7 @@ class HashJoinBuildSinkOperatorX final bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } bool require_data_distribution() const override { diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index f4b1a2c491f133..3c75a8d10dea8b 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -178,7 +178,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX _runtime_state; - bool _eos = false; std::shared_ptr _finish_dependency; // temp structures during spilling diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index b19537741a8e68..04320177a20540 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -172,7 +172,7 @@ class PartitionedHashJoinProbeOperatorX final bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index f3b36086799b20..2b69c1433695d0 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -122,7 +122,7 @@ class PartitionedHashJoinSinkOperatorX bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } diff --git a/be/src/pipeline/exec/schema_scan_operator.cpp b/be/src/pipeline/exec/schema_scan_operator.cpp index d5353655ab070a..73e54d52be25ca 100644 --- a/be/src/pipeline/exec/schema_scan_operator.cpp +++ b/be/src/pipeline/exec/schema_scan_operator.cpp @@ -285,6 +285,9 @@ Status SchemaScanOperatorX::get_block(RuntimeState* state, vectorized::Block* bl } while (block->rows() == 0 && !*eos); local_state.reached_limit(block, eos); + if (*eos) { + local_state._finish_dependency->set_always_ready(); + } return Status::OK(); } diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index ae4823c55d0f91..d13c7abc44bb9a 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ 
-87,7 +87,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } else if (_merge_by_exchange) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index fae5fe3270f3a0..7e8b7b36aab593 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -55,7 +55,6 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState _finish_dependency; }; diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 97b704078c63ec..b2c1c414314c25 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -125,6 +125,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX { } } + bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } + private: int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; } diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 60530521ec0a82..0c29374e690960 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -135,6 +135,11 @@ class UnionSourceOperatorX final : public OperatorX { return Status::OK(); } [[nodiscard]] int get_child_count() const { return _child_size; } 
+ bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } private: bool _has_data(RuntimeState* state) const { diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 8b2123bcce20d4..2f69cca7cc1631 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -22,6 +22,7 @@ #include #include "pipeline/exec/operator.h" +#include "pipeline/pipeline_task.h" namespace doris::pipeline { @@ -99,4 +100,12 @@ Status Pipeline::set_sink(DataSinkOperatorXPtr& sink) { return Status::OK(); } -} // namespace doris::pipeline \ No newline at end of file +void Pipeline::make_all_runnable() { + for (auto* task : _tasks) { + if (task) { + task->clear_blocking_state(true); + } + } +} + +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 4693f8343fde88..db93e8dfe357ee 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -50,6 +50,7 @@ class Pipeline : public std::enable_shared_from_this { std::weak_ptr context) : _pipeline_id(pipeline_id), _context(std::move(context)), _num_tasks(num_tasks) { _init_profile(); + _tasks.resize(_num_tasks, nullptr); } void add_dependency(std::shared_ptr& pipeline) { @@ -155,14 +156,24 @@ class Pipeline : public std::enable_shared_from_this { void set_children(std::shared_ptr child) { _children.push_back(child); } void set_children(std::vector> children) { _children = children; } - void incr_created_tasks() { _num_tasks_created++; } + void incr_created_tasks(int i, PipelineTask* task) { + _num_tasks_created++; + _num_tasks_running++; + DCHECK_LT(i, _tasks.size()); + _tasks[i] = task; + } + + void make_all_runnable(); + void set_num_tasks(int num_tasks) { _num_tasks = num_tasks; + _tasks.resize(_num_tasks, nullptr); for (auto& op : operatorXs) { op->set_parallel_tasks(_num_tasks); } } int num_tasks() 
const { return _num_tasks; } + bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; } std::string debug_string() { fmt::memory_buffer debug_string_buffer; @@ -243,6 +254,10 @@ class Pipeline : public std::enable_shared_from_this { int _num_tasks = 1; // How many tasks are already created? std::atomic _num_tasks_created = 0; + // How many tasks are already created and not finished? + std::atomic _num_tasks_running = 0; + // Tasks in this pipeline. + std::vector _tasks; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 550e60c210c080..e514a5d6b975ac 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -813,7 +813,7 @@ void PipelineFragmentContext::close_if_prepare_failed(Status /*st*/) { DCHECK(!task->is_pending_finish()); WARN_IF_ERROR(task->close(Status::OK()), fmt::format("Query {} closed since prepare failed", print_id(_query_id))); - close_a_pipeline(); + close_a_pipeline(task->pipeline_id()); } } @@ -960,7 +960,7 @@ void PipelineFragmentContext::_close_fragment_instance() { std::dynamic_pointer_cast(shared_from_this())); } -void PipelineFragmentContext::close_a_pipeline() { +void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) { std::lock_guard l(_task_mutex); g_pipeline_tasks_count << -1; ++_closed_tasks; diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index f60cb80359ab32..ce9b10be4ffac0 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -110,7 +110,7 @@ class PipelineFragmentContext : public TaskExecutionContext { [[nodiscard]] int get_fragment_id() const { return _fragment_id; } - void close_a_pipeline(); + virtual void close_a_pipeline(PipelineId pipeline_id); virtual void clear_finished_tasks() {} diff --git a/be/src/pipeline/pipeline_task.h 
b/be/src/pipeline/pipeline_task.h index 47f96c5f8fa64f..661b16c99ed88e 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -291,6 +291,10 @@ class PipelineTask { std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); } + PipelineId pipeline_id() const { return _pipeline->id(); } + + virtual void clear_blocking_state(bool wake_up_by_downstream = false) {} + protected: void _finish_p_dependency() { for (const auto& p : _pipeline->_parents) { diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp index 78c2761dcc783e..87706ce68fb917 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp @@ -47,26 +47,14 @@ Status LocalExchangeSinkLocalState::open(RuntimeState* state) { return Status::OK(); } -Status LocalExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) { - if (_closed) { - return Status::OK(); - } - RETURN_IF_ERROR(Base::close(state, exec_status)); - if (exec_status.ok()) { - DCHECK(_release_count) << "Do not finish correctly! 
" << debug_string(0); - } - return Status::OK(); -} - std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) const { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, " - "_running_sink_operators: {}, _running_source_operators: {}, _release_count: {}", + "_running_sink_operators: {}, _running_source_operators: {}", Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions, _exchanger->_num_senders, _exchanger->_num_sources, - _exchanger->_running_sink_operators, _exchanger->_running_source_operators, - _release_count); + _exchanger->_running_sink_operators, _exchanger->_running_source_operators); return fmt::to_string(debug_string_buffer); } @@ -113,13 +101,11 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* // If all exchange sources ended due to limit reached, current task should also finish if (local_state._exchanger->_running_source_operators == 0) { - local_state._release_count = true; local_state._shared_state->sub_running_sink_operators(); return Status::EndOfFile("receiver eof"); } if (eos) { local_state._shared_state->sub_running_sink_operators(); - local_state._release_count = true; } return Status::OK(); diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h index f1d60fc03c4360..49d320c0537a03 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h @@ -38,7 +38,6 @@ class LocalExchangeSinkLocalState final : public PipelineXSinkLocalState _eos = false; //NOTICE: now add a faker profile, because sometimes the profile record is useless //so we want remove some counters and timers, eg: in join node, if it's broadcast_join //and shared hash table, some counter/timer 
about build hash table is useless, @@ -627,8 +626,6 @@ class DataSinkOperatorXBase : public OperatorBase { : DataDistribution(ExchangeType::NOOP); } - [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } - Status close(RuntimeState* state) override { return Status::InternalError("Should not reach here!"); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index ac527ed8e69888..f4b52d14d8f592 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -127,6 +127,8 @@ PipelineXFragmentContext::~PipelineXFragmentContext() { } else { _call_back(nullptr, &st); } + _dag.clear(); + _pip_id_to_pipeline.clear(); _runtime_state.reset(); _runtime_filter_states.clear(); _runtime_filter_mgr_map.clear(); @@ -525,6 +527,7 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks( _task_runtime_states.resize(_pipelines.size()); for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { _task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks()); + _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get(); } auto& pipeline_id_to_profile = _runtime_state->pipeline_id_to_profile(); DCHECK(pipeline_id_to_profile.empty()); @@ -638,6 +641,7 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks( task_runtime_state.get(), ctx, pipeline_id_to_profile[pip_idx].get(), get_local_exchange_state(pipeline), i); + pipeline->incr_created_tasks(i, task.get()); pipeline_id_to_task.insert({pipeline->id(), task.get()}); _tasks[i].emplace_back(std::move(task)); } @@ -737,7 +741,6 @@ Status PipelineXFragmentContext::_build_pipeline_x_tasks( } } _pipeline_parent_map.clear(); - _dag.clear(); _op_id_to_le_state.clear(); return Status::OK(); @@ -771,7 +774,7 @@ Status PipelineXFragmentContext::_create_tree_helper( ObjectPool* pool, const std::vector& tnodes, const 
doris::TPipelineFragmentParams& request, const DescriptorTbl& descs, OperatorXPtr parent, int* node_idx, OperatorXPtr* root, PipelinePtr& cur_pipe, - int child_idx, const bool followed_by_shuffled_join) { + int child_idx, const bool followed_by_shuffled_operator) { // propagate error case if (*node_idx >= tnodes.size()) { // TODO: print thrift msg @@ -782,11 +785,11 @@ Status PipelineXFragmentContext::_create_tree_helper( const TPlanNode& tnode = tnodes[*node_idx]; int num_children = tnodes[*node_idx].num_children; - bool current_followed_by_shuffled_join = followed_by_shuffled_join; + bool current_followed_by_shuffled_operator = followed_by_shuffled_operator; OperatorXPtr op = nullptr; RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe, parent == nullptr ? -1 : parent->node_id(), child_idx, - followed_by_shuffled_join)); + followed_by_shuffled_operator)); // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr)); if (parent != nullptr) { @@ -797,7 +800,7 @@ Status PipelineXFragmentContext::_create_tree_helper( } /** - * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join. + * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators). * * For plan: * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2) @@ -811,8 +814,8 @@ Status PipelineXFragmentContext::_create_tree_helper( cur_pipe->operator_xs().empty() ? 
cur_pipe->sink_x()->require_shuffled_data_distribution() : op->require_shuffled_data_distribution(); - current_followed_by_shuffled_join = - (followed_by_shuffled_join || op->is_shuffled_hash_join()) && + current_followed_by_shuffled_operator = + (followed_by_shuffled_operator || op->is_shuffled_operator()) && require_shuffled_data_distribution; cur_pipe->_name.push_back('-'); @@ -823,7 +826,7 @@ Status PipelineXFragmentContext::_create_tree_helper( for (int i = 0; i < num_children; i++) { ++*node_idx; RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr, - cur_pipe, i, current_followed_by_shuffled_join)); + cur_pipe, i, current_followed_by_shuffled_operator)); // we are expecting a child, but have used all nodes // this means we have been given a bad tree and must fail @@ -865,13 +868,13 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment. * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`. */ - const bool followed_by_shuffled_join = - operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_join() - : cur_pipe->sink_x()->followed_by_shuffled_join(); + const bool followed_by_shuffled_operator = + operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_operator() + : cur_pipe->sink_x()->followed_by_shuffled_operator(); const bool should_disable_bucket_shuffle = bucket_seq_to_instance_idx.empty() && shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() && - followed_by_shuffled_join; + followed_by_shuffled_operator; sink.reset(new LocalExchangeSinkOperatorX( sink_id, local_exchange_id, should_disable_bucket_shuffle ? 
_total_instances : _num_instances, @@ -1047,7 +1050,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN const DescriptorTbl& descs, OperatorXPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx, - const bool followed_by_shuffled_join) { + const bool followed_by_shuffled_operator) { // We directly construct the operator from Thrift because the given array is in the order of preorder traversal. // Therefore, here we need to use a stack-like structure. _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); @@ -1121,7 +1124,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); - op->set_followed_by_shuffled_join(followed_by_shuffled_join); + op->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && @@ -1152,7 +1155,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1203,8 +1206,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + 
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } else { op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -1225,8 +1228,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); @@ -1256,6 +1259,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN case TPlanNodeType::UNION_NODE: { int child_count = tnode.num_children; op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -1298,7 +1302,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1338,7 +1342,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN DataSinkOperatorXPtr sink; sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, 
_require_bucket_distribution)); - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1349,11 +1353,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN case TPlanNodeType::INTERSECT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::EXCEPT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::REPEAT_NODE: { @@ -1495,7 +1501,7 @@ void PipelineXFragmentContext::close_if_prepare_failed(Status st) { for (auto& t : task) { DCHECK(!t->is_pending_finish()); WARN_IF_ERROR(t->close(Status::OK()), "close_if_prepare_failed failed: "); - close_a_pipeline(); + close_a_pipeline(t->pipeline_id()); } } _query_ctx->cancel(st.to_string(), st, _fragment_id); @@ -1584,4 +1590,17 @@ std::string PipelineXFragmentContext::debug_string() { return fmt::to_string(debug_string_buffer); } + +void PipelineXFragmentContext::close_a_pipeline(PipelineId pipeline_id) { + // Once every task of this pipeline has been closed, its upstream pipelines are no longer needed, so make their tasks runnable here; then delegate to the base class (NOT this override, which would recurse forever) for the per-task bookkeeping. + DCHECK(_pip_id_to_pipeline.contains(pipeline_id)); + if (_pip_id_to_pipeline[pipeline_id]->close_task()) { + if (_dag.contains(pipeline_id)) { + for (auto dep : _dag[pipeline_id]) { + _pip_id_to_pipeline[dep]->make_all_runnable(); + } + } + } + PipelineFragmentContext::close_a_pipeline(pipeline_id); +} } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 8a09f64ac6b902..60ac22a429e7cc 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -116,6 +116,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext { std::string debug_string() override; + void close_a_pipeline(PipelineId pipeline_id) override; + private: void _close_fragment_instance() override; Status _build_pipeline_x_tasks(const doris::TPipelineFragmentParams& request, @@ -222,6 +224,7 @@ class PipelineXFragmentContext : public PipelineFragmentContext { std::map, std::shared_ptr>> _op_id_to_le_state; + std::map _pip_id_to_pipeline; // UniqueId -> runtime mgr std::map> _runtime_filter_mgr_map; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp index 00464e59f21711..b8965e6b21def9 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp @@ -66,7 +66,6 @@ PipelineXTask::PipelineXTask( if (shared_state) { _sink_shared_state = shared_state; } - pipeline->incr_created_tasks(); } Status PipelineXTask::prepare(const TPipelineInstanceParams& local_params, const TDataSink& tsink, @@ -248,7 +247,7 @@ Status PipelineXTask::execute(bool* eos) { cpu_qs->add_cpu_nanos(delta_cpu_time); } }}; - *eos = _sink->is_finished(_state); + *eos = _sink->is_finished(_state) || _wake_up_by_downstream || is_final_state(_cur_state); if (*eos) { return Status::OK(); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h b/be/src/pipeline/pipeline_x/pipeline_x_task.h index 47746b76fb0194..23c2c46469232d 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_task.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h @@ -138,10 +138,11 @@ class PipelineXTask : public PipelineTask { int task_id() const { return _index; }; - void clear_blocking_state() { + void clear_blocking_state(bool wake_up_by_downstream = false) 
override { _state->get_query_ctx()->get_execution_dependency()->set_always_ready(); // We use a lock to assure all dependencies are not deconstructed here. std::unique_lock lc(_dependency_lock); + _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream; if (!_finished) { _execution_dep->set_always_ready(); for (auto* dep : _filter_dependencies) { @@ -252,6 +253,7 @@ class PipelineXTask : public PipelineTask { std::atomic _finished {false}; std::mutex _dependency_lock; + std::atomic _wake_up_by_downstream = false; }; } // namespace doris::pipeline diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index de697469575bc9..93cf2d9dd993b6 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -258,7 +258,7 @@ void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status task->set_close_pipeline_time(); task->finalize(); task->set_running(false); - task->fragment_context()->close_a_pipeline(); + task->fragment_context()->close_a_pipeline(task->pipeline_id()); } void TaskScheduler::_do_work(size_t index) { diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 24baf9b6c971e4..bc3416d053aff8 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -166,7 +166,6 @@ Status RuntimeFilterMgr::get_local_merge_producer_filters( } *local_merge_filters = &iter->second; DCHECK(!iter->second.filters.empty()); - DCHECK_GT(iter->second.merge_time, 0); return Status::OK(); }