diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 97440de3f09e4c..7c146c38a2b135 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -148,7 +148,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX { ? DataDistribution(ExchangeType::PASSTHROUGH) : DataSinkOperatorX::required_data_distribution(); } - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index cf2892eb7e6ceb..47a6ba234b5eb0 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -81,7 +81,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX create_shared_state() const = 0; [[nodiscard]] virtual DataDistribution required_data_distribution() const; - [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } - Status close(RuntimeState* state) override { return Status::InternalError("Should not reach here!"); } @@ -663,8 +664,6 @@ class OperatorXBase : public OperatorBase { [[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) = 0; - [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; } - Status close(RuntimeState* state) override; [[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const { diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 8cccc9f8faeba6..3aab11f62d883e 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -168,7 +168,7 @@ class PartitionedHashJoinProbeOperatorX final bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 1376964663f7f3..c768d7518b95c9 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -118,7 +118,7 @@ class PartitionedHashJoinSinkOperatorX bool require_shuffled_data_distribution() const override { return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; } - bool is_shuffled_hash_join() const override { + bool is_shuffled_operator() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 0bd6dd9096482c..8462472dd02671 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -63,7 +63,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { - return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } else if (_merge_by_exchange) { diff --git a/be/src/pipeline/exec/union_sink_operator.h b/be/src/pipeline/exec/union_sink_operator.h index 13dfb0ba6379cb..f939950143ae92 100644 --- a/be/src/pipeline/exec/union_sink_operator.h +++ b/be/src/pipeline/exec/union_sink_operator.h @@ -89,6 +89,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX { } } + bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } + private: int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; } diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index bf32e9a25c2454..2d112ebf2df579 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -95,6 +95,11 @@ class UnionSourceOperatorX final : public OperatorX { return Status::OK(); } [[nodiscard]] int get_child_count() const { return _child_size; } + bool require_shuffled_data_distribution() const override { + return _followed_by_shuffled_operator; + } + + bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } private: bool _has_data(RuntimeState* state) const { diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 54c49321389db0..be8c2c43210cd9 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -667,7 +667,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, const DescriptorTbl& descs, OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx, - const bool followed_by_shuffled_join) { + const bool followed_by_shuffled_operator) { // propagate error case if (*node_idx >= tnodes.size()) { return Status::InternalError( @@ -677,11 +677,11 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, const TPlanNode& tnode = tnodes[*node_idx]; int num_children = tnodes[*node_idx].num_children; - bool current_followed_by_shuffled_join = followed_by_shuffled_join; + bool current_followed_by_shuffled_operator = followed_by_shuffled_operator; OperatorPtr op = nullptr; RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe, parent == nullptr ? -1 : parent->node_id(), child_idx, - followed_by_shuffled_join)); + followed_by_shuffled_operator)); // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr)); if (parent != nullptr) { @@ -691,7 +691,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, *root = op; } /** - * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join. + * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators). * * For plan: * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2) @@ -704,15 +704,15 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, auto require_shuffled_data_distribution = cur_pipe->operators().empty() ? cur_pipe->sink()->require_shuffled_data_distribution() : op->require_shuffled_data_distribution(); - current_followed_by_shuffled_join = - (followed_by_shuffled_join || op->is_shuffled_hash_join()) && + current_followed_by_shuffled_operator = + (followed_by_shuffled_operator || op->is_shuffled_operator()) && require_shuffled_data_distribution; // rely on that tnodes is preorder of the plan for (int i = 0; i < num_children; i++) { ++*node_idx; RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr, - cur_pipe, i, current_followed_by_shuffled_join)); + cur_pipe, i, current_followed_by_shuffled_operator)); // we are expecting a child, but have used all nodes // this means we have been given a bad tree and must fail @@ -753,13 +753,13 @@ Status PipelineFragmentContext::_add_local_exchange_impl( * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment. * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`. */ - const bool followed_by_shuffled_join = operators.size() > idx - ? operators[idx]->followed_by_shuffled_join() - : cur_pipe->sink()->followed_by_shuffled_join(); + const bool followed_by_shuffled_operator = + operators.size() > idx ? operators[idx]->followed_by_shuffled_operator() + : cur_pipe->sink()->followed_by_shuffled_operator(); const bool should_disable_bucket_shuffle = bucket_seq_to_instance_idx.empty() && shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() && - followed_by_shuffled_join; + followed_by_shuffled_operator; sink.reset(new LocalExchangeSinkOperatorX( sink_id, local_exchange_id, should_disable_bucket_shuffle ? _total_instances : _num_instances, @@ -1199,7 +1199,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo const DescriptorTbl& descs, OperatorPtr& op, PipelinePtr& cur_pipe, int parent_idx, int child_idx, - const bool followed_by_shuffled_join) { + const bool followed_by_shuffled_operator) { // We directly construct the operator from Thrift because the given array is in the order of preorder traversal. // Therefore, here we need to use a stack-like structure. _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); @@ -1321,7 +1321,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); - op->set_followed_by_shuffled_join(false); + op->set_followed_by_shuffled_operator(false); _require_bucket_distribution = true; RETURN_IF_ERROR(new_pipe->add_operator(op)); RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); @@ -1329,7 +1329,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo } else { op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); - op->set_followed_by_shuffled_join(followed_by_shuffled_join); + op->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -1384,7 +1384,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1434,8 +1434,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } else { op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -1456,8 +1456,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); - sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join()); - op->set_followed_by_shuffled_join(op->is_shuffled_hash_join()); + sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator()); + op->set_followed_by_shuffled_operator(op->is_shuffled_operator()); } _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); @@ -1487,6 +1487,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo case TPlanNodeType::UNION_NODE: { int child_count = tnode.num_children; op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); RETURN_IF_ERROR(cur_pipe->add_operator(op)); const auto downstream_pipeline_id = cur_pipe->id(); @@ -1498,6 +1499,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); DataSinkOperatorPtr sink; sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), pool, tnode, descs)); + sink->set_followed_by_shuffled_operator(_require_bucket_distribution); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(build_side_pipe->set_sink(sink)); RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get())); @@ -1531,7 +1533,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1571,7 +1573,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo DataSinkOperatorPtr sink; sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); - sink->set_followed_by_shuffled_join(followed_by_shuffled_join); + sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1582,11 +1584,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo case TPlanNodeType::INTERSECT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::EXCEPT_NODE: { RETURN_IF_ERROR(_build_operators_for_set_operation_node( pool, tnode, descs, op, cur_pipe, parent_idx, child_idx)); + op->set_followed_by_shuffled_operator(_require_bucket_distribution); break; } case TPlanNodeType::REPEAT_NODE: { diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 36362e15813238..e216ef103766fd 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -136,10 +136,10 @@ class PipelineTask { bool is_finalized() const { return _finalized; } void clear_blocking_state(bool wake_up_by_downstream = false) { - _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream; _state->get_query_ctx()->get_execution_dependency()->set_always_ready(); // We use a lock to assure all dependencies are not deconstructed here. std::unique_lock lc(_dependency_lock); + _wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream; if (!_finalized) { _execution_dep->set_always_ready(); for (auto* dep : _filter_dependencies) {