From ea4526a8b2710567de3ccde1f644196ec458b378 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 25 Apr 2024 18:49:01 +0800 Subject: [PATCH] update --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 11 +++++++---- be/src/pipeline/exec/aggregation_sink_operator.h | 2 +- .../exec/partitioned_aggregation_sink_operator.cpp | 6 ++++-- .../exec/partitioned_aggregation_sink_operator.h | 2 +- .../pipeline_x/pipeline_x_fragment_context.cpp | 10 ++++++++-- .../pipeline/pipeline_x/pipeline_x_fragment_context.h | 2 ++ 6 files changed, 23 insertions(+), 10 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index fd88b0d15218b0..6c9d27e2a2b063 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -616,7 +616,7 @@ void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& p } AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _intermediate_tuple_desc(nullptr), @@ -629,9 +629,12 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla _limit(tnode.limit), _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) || (tnode.__isset.conjuncts && !tnode.conjuncts.empty())), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}), - _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), + _partition_exprs(require_bucket_distribution ? (tnode.__isset.distribute_expr_lists + ? tnode.distribute_expr_lists[0] + : std::vector {}) + : tnode.agg_node.grouping_exprs), + _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && + require_bucket_distribution), _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index b3ffa19d6db791..0c34acfd7dfe84 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -143,7 +143,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState { class AggSinkOperatorX final : public DataSinkOperatorX { public: AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); ~AggSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 55a0650dc1ff1c..1c6eeccf07a50f 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -122,9 +122,11 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile) PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, + bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id) { - _agg_sink_operator = std::make_unique(pool, operator_id, tnode, descs); + _agg_sink_operator = std::make_unique(pool, operator_id, tnode, descs, + require_bucket_distribution); } Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 7ec582905d00cf..48c42b74b79b82 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -308,7 +308,7 @@ class PartitionedAggSinkLocalState class PartitionedAggSinkOperatorX : public DataSinkOperatorX { public: PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); ~PartitionedAggSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 5fd59d2955a8ae..ee0ca7c6197a3c 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -1024,14 +1024,16 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN DataSinkOperatorXPtr sink; if (_runtime_state->enable_agg_spill() && !tnode.agg_node.grouping_exprs.empty()) { sink.reset(new PartitionedAggSinkOperatorX(pool, next_sink_operator_id(), tnode, - descs)); + descs, _require_bucket_distribution)); } else { - sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); } sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); } + _require_bucket_distribution = true; break; } case TPlanNodeType::HASH_JOIN_NODE: { @@ -1096,6 +1098,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); } + _require_bucket_distribution = true; break; } case TPlanNodeType::CROSS_JOIN_NODE: { @@ -1201,6 +1204,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); + _require_bucket_distribution = true; break; } case TPlanNodeType::INTERSECT_NODE: { @@ -1258,6 +1262,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN print_plan_node_type(tnode.node_type)); } + _require_bucket_distribution = true; + return Status::OK(); } // NOLINTEND(readability-function-cognitive-complexity) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 31febc0d8aaf4d..c87f8f4f784051 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -239,6 +239,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext { // Total instance num running on all BEs int _total_instances = -1; + + bool _require_bucket_distribution = false; }; } // namespace pipeline