diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 79f5b5af083137..ff5cd9baf4f272 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -581,10 +581,11 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla _limit(tnode.limit), _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) || (tnode.__isset.conjuncts && !tnode.conjuncts.empty())), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}), - _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && - require_bucket_distribution), + _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution + ? tnode.distribute_expr_lists[0] + : tnode.agg_node.grouping_exprs), + _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution), _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index d48debc2d8398b..ad8f0d7539baeb 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -146,9 +146,11 @@ class AggSinkOperatorX final : public DataSinkOperatorX { ? DataDistribution(ExchangeType::PASSTHROUGH) : DataSinkOperatorX::required_data_distribution(); } - return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + return _is_colocate && _require_bucket_distribution + ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) + : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } + bool require_data_distribution() const override { return _is_colocate; } size_t get_revocable_mem_size(RuntimeState* state) const; vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) { @@ -195,6 +197,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX { const std::vector _partition_exprs; const bool _is_colocate; + const bool _require_bucket_distribution; RowDescriptor _agg_fn_output_row_descriptor; }; diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 44481bbb9c677b..d027ba2f8b50df 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -197,10 +197,11 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id ? tnode.analytic_node.buffered_tuple_id : 0), - _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate && - require_bucket_distribution), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}) {} + _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution), + _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution + ? tnode.distribute_expr_lists[0] + : tnode.analytic_node.partition_exprs) {} Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 46e7ffac3e8495..774160ee9abe89 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -83,13 +83,15 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX::required_data_distribution(); } + bool require_data_distribution() const override { return true; } + private: Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr, vectorized::IColumn* dst_column, size_t length); @@ -106,6 +108,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX _num_agg_input; const bool _is_colocate; + const bool _require_bucket_distribution; const std::vector _partition_exprs; }; diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 05137837473b70..f0cb738a3036fb 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -330,10 +330,11 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i _output_tuple_id(tnode.agg_node.output_tuple_id), _needs_finalize(tnode.agg_node.need_finalize), _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}), - _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && - require_bucket_distribution) { + _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution + ? tnode.distribute_expr_lists[0] + : tnode.agg_node.grouping_exprs), + _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution) { if (tnode.agg_node.__isset.use_streaming_preaggregation) { _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; if (_is_streaming_preagg) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 1256210952dad5..e81664466786ab 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -106,13 +106,15 @@ class DistinctStreamingAggOperatorX final DataDistribution required_data_distribution() const override { if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) { - return _is_colocate + return _is_colocate && _require_bucket_distribution ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } return StatefulOperatorX::required_data_distribution(); } + bool require_data_distribution() const override { return _is_colocate; } + private: friend class DistinctStreamingAggLocalState; TupleId _intermediate_tuple_id; @@ -124,6 +126,7 @@ class DistinctStreamingAggOperatorX final const bool _is_first_phase; const std::vector _partition_exprs; const bool _is_colocate; + const bool _require_bucket_distribution; // group by k1,k2 vectorized::VExprContextSPtrs _probe_expr_ctxs; std::vector _aggregate_evaluators; diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 5badc4916eb429..af3b0fa40770e9 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -308,6 +308,10 @@ class PartitionedAggSinkOperatorX : public DataSinkOperatorXrequired_data_distribution(); } + bool require_data_distribution() const override { + return _agg_sink_operator->require_data_distribution(); + } + Status set_child(OperatorXPtr child) override { RETURN_IF_ERROR(DataSinkOperatorX::set_child(child)); return _agg_sink_operator->set_child(child); diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 282596032374ed..f2224383f86ef6 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -82,9 +82,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read), _merge_by_exchange(tnode.sort_node.merge_by_exchange), - _is_colocate(tnode.sort_node.__isset.is_colocate - ? tnode.sort_node.is_colocate && require_bucket_distribution - : require_bucket_distribution), + _is_colocate(tnode.sort_node.__isset.is_colocate && tnode.sort_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution), _is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort ? tnode.sort_node.is_analytic_sort : false), diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 5f5fce881e2e0c..fa59b1715dc83c 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -66,7 +66,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { - return _is_colocate + return _is_colocate && _require_bucket_distribution ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } else if (_merge_by_exchange) { @@ -75,6 +75,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { } return DataSinkOperatorX::required_data_distribution(); } + bool require_data_distribution() const override { return _is_colocate; } bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT; } @@ -106,6 +107,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { const bool _use_two_phase_read; const bool _merge_by_exchange; const bool _is_colocate = false; + const bool _require_bucket_distribution = false; const bool _is_analytic_sort = false; const std::vector _partition_exprs; }; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 978682d4bdee30..5347f22d11f2c3 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -77,6 +77,9 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorXrequired_data_distribution(); } + bool require_data_distribution() const override { + return _sort_sink_operator->require_data_distribution(); + } Status set_child(OperatorXPtr child) override { RETURN_IF_ERROR(DataSinkOperatorX::set_child(child)); return _sort_sink_operator->set_child(child); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index dbfdaba6d91350..1da6f12736603c 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1175,6 +1175,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo !tnode.agg_node.grouping_exprs.empty()) { op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); + _require_bucket_distribution = + _require_bucket_distribution || op->require_data_distribution(); RETURN_IF_ERROR(cur_pipe->add_operator(op)); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation && @@ -1204,6 +1206,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } + _require_bucket_distribution = + _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -1342,6 +1346,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } + _require_bucket_distribution = + _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -1379,6 +1385,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo DataSinkOperatorXPtr sink; sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); + _require_bucket_distribution = + _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));