From 91de107b847795c3a5194d7b47ec35dbfc90f242 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 31 May 2024 15:28:00 +0800 Subject: [PATCH 1/4] [Improvement](local shuffle) Use grouping expr for hash shuffling --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 11 +++++++---- be/src/pipeline/exec/aggregation_sink_operator.h | 9 +++++++-- be/src/pipeline/exec/analytic_sink_operator.cpp | 11 +++++++---- be/src/pipeline/exec/analytic_sink_operator.h | 5 ++++- .../exec/distinct_streaming_aggregation_operator.cpp | 11 +++++++---- .../exec/distinct_streaming_aggregation_operator.h | 5 ++++- .../exec/partitioned_aggregation_sink_operator.h | 4 ++++ be/src/pipeline/exec/sort_sink_operator.cpp | 5 ++--- be/src/pipeline/exec/sort_sink_operator.h | 4 +++- be/src/pipeline/exec/spill_sort_sink_operator.h | 3 +++ be/src/pipeline/pipeline_fragment_context.cpp | 8 ++++++++ 11 files changed, 56 insertions(+), 20 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 79f5b5af083137..5376b12a981091 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -581,10 +581,13 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla _limit(tnode.limit), _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) || (tnode.__isset.conjuncts && !tnode.conjuncts.empty())), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}), - _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && - require_bucket_distribution), + _partition_exprs(tnode.__isset.distribute_expr_lists && + tnode.agg_node.__isset.is_colocate && + tnode.agg_node.is_colocate && require_bucket_distribution + ? 
tnode.distribute_expr_lists[0] + : tnode.agg_node.grouping_exprs), + _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution), _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index d48debc2d8398b..fe2be1a2afa06b 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -146,8 +146,12 @@ class AggSinkOperatorX final : public DataSinkOperatorX { ? DataDistribution(ExchangeType::PASSTHROUGH) : DataSinkOperatorX::required_data_distribution(); } - return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + return _is_colocate && _require_bucket_distribution + ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) + : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + } + bool require_data_distribution() const override { + return _is_colocate; } size_t get_revocable_mem_size(RuntimeState* state) const; @@ -195,6 +199,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX { const std::vector _partition_exprs; const bool _is_colocate; + const bool _require_bucket_distribution; RowDescriptor _agg_fn_output_row_descriptor; }; diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 44481bbb9c677b..112a1bb7429183 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -197,10 +197,13 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id ? 
tnode.analytic_node.buffered_tuple_id : 0), - _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate && - require_bucket_distribution), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}) {} + _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution), + _partition_exprs( + tnode.__isset.distribute_expr_lists && tnode.analytic_node.__isset.is_colocate && + tnode.analytic_node.is_colocate && require_bucket_distribution + ? tnode.distribute_expr_lists[0] + : tnode.analytic_node.partition_exprs) {} Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 46e7ffac3e8495..9d2f47e499f1e7 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -83,13 +83,15 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX::required_data_distribution(); } + bool require_data_distribution() const override { return _is_colocate; } + private: Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr, vectorized::IColumn* dst_column, size_t length); @@ -106,6 +108,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX _num_agg_input; const bool _is_colocate; + const bool _require_bucket_distribution; const std::vector _partition_exprs; }; diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 05137837473b70..36a4fa6e1e7a6b 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -330,10 +330,13 @@ 
DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i _output_tuple_id(tnode.agg_node.output_tuple_id), _needs_finalize(tnode.agg_node.need_finalize), _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}), - _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && - require_bucket_distribution) { + _partition_exprs(tnode.__isset.distribute_expr_lists && + tnode.agg_node.__isset.is_colocate && + tnode.agg_node.is_colocate && require_bucket_distribution + ? tnode.distribute_expr_lists[0] + : tnode.agg_node.grouping_exprs), + _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution) { if (tnode.agg_node.__isset.use_streaming_preaggregation) { _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; if (_is_streaming_preagg) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 1256210952dad5..e81664466786ab 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -106,13 +106,15 @@ class DistinctStreamingAggOperatorX final DataDistribution required_data_distribution() const override { if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) { - return _is_colocate + return _is_colocate && _require_bucket_distribution ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } return StatefulOperatorX::required_data_distribution(); } + bool require_data_distribution() const override { return _is_colocate; } + private: friend class DistinctStreamingAggLocalState; TupleId _intermediate_tuple_id; @@ -124,6 +126,7 @@ class DistinctStreamingAggOperatorX final const bool _is_first_phase; const std::vector _partition_exprs; const bool _is_colocate; + const bool _require_bucket_distribution; // group by k1,k2 vectorized::VExprContextSPtrs _probe_expr_ctxs; std::vector _aggregate_evaluators; diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 5badc4916eb429..af3b0fa40770e9 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -308,6 +308,10 @@ class PartitionedAggSinkOperatorX : public DataSinkOperatorXrequired_data_distribution(); } + bool require_data_distribution() const override { + return _agg_sink_operator->require_data_distribution(); + } + Status set_child(OperatorXPtr child) override { RETURN_IF_ERROR(DataSinkOperatorX::set_child(child)); return _agg_sink_operator->set_child(child); diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 282596032374ed..f2224383f86ef6 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -82,9 +82,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read), _merge_by_exchange(tnode.sort_node.merge_by_exchange), - _is_colocate(tnode.sort_node.__isset.is_colocate - ? 
tnode.sort_node.is_colocate && require_bucket_distribution - : require_bucket_distribution), + _is_colocate(tnode.sort_node.__isset.is_colocate && tnode.sort_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution), _is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort ? tnode.sort_node.is_analytic_sort : false), diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 5f5fce881e2e0c..fa59b1715dc83c 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -66,7 +66,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { - return _is_colocate + return _is_colocate && _require_bucket_distribution ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } else if (_merge_by_exchange) { @@ -75,6 +75,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { } return DataSinkOperatorX::required_data_distribution(); } + bool require_data_distribution() const override { return _is_colocate; } bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT; } @@ -106,6 +107,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { const bool _use_two_phase_read; const bool _merge_by_exchange; const bool _is_colocate = false; + const bool _require_bucket_distribution = false; const bool _is_analytic_sort = false; const std::vector _partition_exprs; }; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 978682d4bdee30..5347f22d11f2c3 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -77,6 +77,9 @@ class SpillSortSinkOperatorX final : public 
DataSinkOperatorXrequired_data_distribution(); } + bool require_data_distribution() const override { + return _sort_sink_operator->require_data_distribution(); + } Status set_child(OperatorXPtr child) override { RETURN_IF_ERROR(DataSinkOperatorX::set_child(child)); return _sort_sink_operator->set_child(child); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index dbfdaba6d91350..1da6f12736603c 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1175,6 +1175,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo !tnode.agg_node.grouping_exprs.empty()) { op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); + _require_bucket_distribution = + _require_bucket_distribution || op->require_data_distribution(); RETURN_IF_ERROR(cur_pipe->add_operator(op)); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation && @@ -1204,6 +1206,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } + _require_bucket_distribution = + _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -1342,6 +1346,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } + _require_bucket_distribution = + _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); 
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -1379,6 +1385,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo DataSinkOperatorXPtr sink; sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); + _require_bucket_distribution = + _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); From e12533f1476aecd972e7c79cd1098524836c0d12 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 31 May 2024 15:32:19 +0800 Subject: [PATCH 2/4] update --- be/src/pipeline/exec/aggregation_sink_operator.h | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index fe2be1a2afa06b..ad8f0d7539baeb 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -150,9 +150,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX { ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } - bool require_data_distribution() const override { - return _is_colocate; - } + bool require_data_distribution() const override { return _is_colocate; } size_t get_revocable_mem_size(RuntimeState* state) const; vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) { From d5ee32e7e996ddae1df0bf673e6db97e25501e3e Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 31 May 2024 16:40:00 +0800 Subject: [PATCH 3/4] update --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 4 +--- be/src/pipeline/exec/analytic_sink_operator.cpp | 8 +++----- .../exec/distinct_streaming_aggregation_operator.cpp | 4 +--- 3 files changed, 5 insertions(+), 11 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 5376b12a981091..ff5cd9baf4f272 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -581,9 +581,7 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla _limit(tnode.limit), _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) || (tnode.__isset.conjuncts && !tnode.conjuncts.empty())), - _partition_exprs(tnode.__isset.distribute_expr_lists && - tnode.agg_node.__isset.is_colocate && - tnode.agg_node.is_colocate && require_bucket_distribution + _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution ?
tnode.distribute_expr_lists[0] : tnode.agg_node.grouping_exprs), _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 112a1bb7429183..d027ba2f8b50df 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -199,11 +199,9 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, : 0), _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate), _require_bucket_distribution(require_bucket_distribution), - _partition_exprs( - tnode.__isset.distribute_expr_lists && tnode.analytic_node.__isset.is_colocate && - tnode.analytic_node.is_colocate && require_bucket_distribution - ? tnode.distribute_expr_lists[0] - : tnode.analytic_node.partition_exprs) {} + _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution + ? 
tnode.distribute_expr_lists[0] + : tnode.analytic_node.partition_exprs) {} Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 36a4fa6e1e7a6b..f0cb738a3036fb 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -330,9 +330,7 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i _output_tuple_id(tnode.agg_node.output_tuple_id), _needs_finalize(tnode.agg_node.need_finalize), _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase), - _partition_exprs(tnode.__isset.distribute_expr_lists && - tnode.agg_node.__isset.is_colocate && - tnode.agg_node.is_colocate && require_bucket_distribution + _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution ? 
tnode.distribute_expr_lists[0] : tnode.agg_node.grouping_exprs), _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), From efaf5d854066c8b9c58812379755ae65f8f5ca97 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 31 May 2024 17:30:53 +0800 Subject: [PATCH 4/4] update --- be/src/pipeline/exec/analytic_sink_operator.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 9d2f47e499f1e7..774160ee9abe89 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -90,7 +90,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX::required_data_distribution(); } - bool require_data_distribution() const override { return _is_colocate; } + bool require_data_distribution() const override { return true; } private: Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,