From cee39421ec478315ebadfc8892c25c7171cdef7a Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 26 Apr 2024 13:47:43 +0800 Subject: [PATCH 1/5] [Improvement](pipeline) Use hash shuffle for 1-phase Agg/Analytic operator #34122 --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 11 +++++++---- be/src/pipeline/exec/aggregation_sink_operator.h | 2 +- .../exec/partitioned_aggregation_sink_operator.cpp | 6 ++++-- .../exec/partitioned_aggregation_sink_operator.h | 2 +- .../pipeline_x/pipeline_x_fragment_context.cpp | 10 ++++++++-- .../pipeline/pipeline_x/pipeline_x_fragment_context.h | 2 ++ 6 files changed, 23 insertions(+), 10 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 730337561e812a..b323df041faf42 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -615,7 +615,7 @@ void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& p } AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _intermediate_tuple_desc(nullptr), @@ -628,9 +628,12 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla _limit(tnode.limit), _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) || (tnode.__isset.conjuncts && !tnode.conjuncts.empty())), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}), - _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), + _partition_exprs(require_bucket_distribution ? (tnode.__isset.distribute_expr_lists ?
tnode.distribute_expr_lists[0] + : std::vector {}) + : tnode.agg_node.grouping_exprs), + _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && + require_bucket_distribution), _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index b3ffa19d6db791..0c34acfd7dfe84 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -143,7 +143,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState { class AggSinkOperatorX final : public DataSinkOperatorX { public: AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); ~AggSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index 053e6dee0cba70..9c5c1d6a81cb28 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -124,9 +124,11 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile) PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, + bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id) { - _agg_sink_operator = std::make_unique(pool, operator_id, tnode, descs); + _agg_sink_operator = std::make_unique(pool, operator_id, tnode, descs, + require_bucket_distribution); } Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 016869374bd921..4ec6b89e6867c8 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -289,7 +289,7 @@ class PartitionedAggSinkLocalState class PartitionedAggSinkOperatorX : public DataSinkOperatorX { public: PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); ~PartitionedAggSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 7d90cebc8d22d2..991873b077a3d5 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -1067,14 +1067,16 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN DataSinkOperatorXPtr sink; if (_runtime_state->enable_agg_spill() && !tnode.agg_node.grouping_exprs.empty()) { sink.reset(new PartitionedAggSinkOperatorX(pool, next_sink_operator_id(), tnode, - descs)); + descs, _require_bucket_distribution)); } else { - sink.reset(new AggSinkOperatorX(pool, 
next_sink_operator_id(), tnode, descs)); + sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); } sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); } + _require_bucket_distribution = true; break; } case TPlanNodeType::HASH_JOIN_NODE: { @@ -1139,6 +1141,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); } + _require_bucket_distribution = true; break; } case TPlanNodeType::CROSS_JOIN_NODE: { @@ -1244,6 +1247,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); + _require_bucket_distribution = true; break; } case TPlanNodeType::INTERSECT_NODE: { @@ -1301,6 +1305,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN print_plan_node_type(tnode.node_type)); } + _require_bucket_distribution = true; + return Status::OK(); } // NOLINTEND(readability-function-cognitive-complexity) diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 55866400374d22..14e4b05d7e88ca 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -235,6 +235,8 @@ class PipelineXFragmentContext : public PipelineFragmentContext { // Total instance num running on all BEs int _total_instances = -1; + + bool _require_bucket_distribution = false; }; } // namespace pipeline From 4f9addab7fa5f41013ea32afacbfdba34fe1dbb2 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 28 May 2024 14:28:17 +0800 Subject: [PATCH 2/5] =?UTF-8?q?[improvement](pipeline)=20Use=20hash=20shuf?= =?UTF-8?q?fle=20local=20exchange=20if=20no=20require=E2=80=A6=20(#35454)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …d data distribution This is a follow-up for #34122. Currently, we use bucket shuffle local exchange to re-distribute data before a 'colocated' operator (see the sketch below).
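A minimal sketch of that current shape, in the style of the plans shown later in this series (the bucket count and operator names are illustrative only, not actual plan output):

```
ScanOperator (buckets=4) -> LocalExchangeSink (BUCKET_HASH_SHUFFLE) -> LocalExchangeSource (num_instance=4) -> AggSinkOperator (num_instance=4)
```

Here at most 4 instances consume the local exchange, regardless of how many cores the BE has.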
But if no colocate/bucket-shuffle join follows, bucket shuffle for this operator is not always suitable, because the parallelism will be restricted by the number of buckets. --- .../pipeline/exec/aggregation_sink_operator.cpp | 6 ++---- be/src/pipeline/exec/analytic_sink_operator.cpp | 6 ++++-- be/src/pipeline/exec/analytic_sink_operator.h | 2 +- .../distinct_streaming_aggregation_operator.cpp | 6 ++++-- .../distinct_streaming_aggregation_operator.h | 2 +- be/src/pipeline/exec/hashjoin_build_sink.h | 4 ++++ be/src/pipeline/exec/hashjoin_probe_operator.h | 4 ++++ be/src/pipeline/exec/operator.h | 3 ++- .../exec/partitioned_hash_join_probe_operator.h | 3 +++ .../exec/partitioned_hash_join_sink_operator.h | 4 ++++ be/src/pipeline/exec/sort_sink_operator.cpp | 6 ++++-- be/src/pipeline/exec/sort_sink_operator.h | 2 +- .../pipeline/exec/spill_sort_sink_operator.cpp | 6 ++++-- be/src/pipeline/exec/spill_sort_sink_operator.h | 2 +- .../pipeline_x/pipeline_x_fragment_context.cpp | 16 ++++++++++------ 15 files changed, 49 insertions(+), 23 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index b323df041faf42..2eeedf42372150 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -628,10 +628,8 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla _limit(tnode.limit), _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) || (tnode.__isset.conjuncts && !tnode.conjuncts.empty())), - _partition_exprs(require_bucket_distribution ? (tnode.__isset.distribute_expr_lists - ? tnode.distribute_expr_lists[0] - : std::vector {}) - : tnode.agg_node.grouping_exprs), + _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] + : std::vector {}), _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && require_bucket_distribution), _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index a1d3384edc6dde..b5a1835df991f6 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -193,12 +193,14 @@ vectorized::BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() { } AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, - const TPlanNode& tnode, const DescriptorTbl& descs) + const TPlanNode& tnode, const DescriptorTbl& descs, + bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id), _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id ? tnode.analytic_node.buffered_tuple_id : 0), - _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate), + _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate && + require_bucket_distribution), _partition_exprs(tnode.__isset.distribute_expr_lists ?
tnode.distribute_expr_lists[0] : std::vector {}) {} diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 3ae4a7b5cff5ca..b03d6ca709413d 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -86,7 +86,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState { public: AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", DataSinkOperatorX::_name); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index c33b436ba03984..ad8ce8ecbb2c29 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -374,7 +374,8 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct( DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, + bool require_bucket_distribution) : StatefulOperatorX(pool, tnode, operator_id, descs), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _intermediate_tuple_desc(nullptr), @@ -384,7 +385,8 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase), _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] : std::vector {}), - _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) { + _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && + require_bucket_distribution) { if (tnode.agg_node.__isset.use_streaming_preaggregation) { _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; if (_is_streaming_preagg) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index ca091f743bd8d9..aea9f8be65aa06 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -97,7 +97,7 @@ class DistinctStreamingAggOperatorX final : public StatefulOperatorX { public: DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 2dab03d5a1916c..d445e2f309c373 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -167,6 +167,10 @@ class HashJoinBuildSinkOperatorX final bool is_shuffled_hash_join() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } + bool require_data_distribution() const override { + return _join_distribution == TJoinDistributionType::COLOCATE || + _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE; + } private: friend class 
HashJoinBuildSinkLocalState; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 5cdfe9feeb75ec..264f177bcc9c4b 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -178,6 +178,10 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorXrequire_data_distribution(); + } private: Status _revoke_memory(RuntimeState* state); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 82fe5eacd9445b..2fae1f15bfa569 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -129,6 +129,10 @@ class PartitionedHashJoinSinkOperatorX _inner_probe_operator = probe_operator; } + bool require_data_distribution() const override { + return _inner_probe_operator->require_data_distribution(); + } + private: friend class PartitionedHashJoinSinkLocalState; diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index d89e54614d14c6..dc9483349a1da0 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -75,7 +75,7 @@ Status SortSinkLocalState::open(RuntimeState* state) { } SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id), _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0), _pool(pool), @@ -85,7 +85,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read), _merge_by_exchange(tnode.sort_node.merge_by_exchange), - _is_colocate(tnode.sort_node.__isset.is_colocate ? tnode.sort_node.is_colocate : false), + _is_colocate(tnode.sort_node.__isset.is_colocate + ? tnode.sort_node.is_colocate && require_bucket_distribution + : require_bucket_distribution), _is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort ? 
tnode.sort_node.is_analytic_sort : false), diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index ad9c23401b4c69..4271c626ff538a 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -74,7 +74,7 @@ class SortSinkLocalState : public PipelineXSinkLocalState { class SortSinkOperatorX final : public DataSinkOperatorX { public: SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", DataSinkOperatorX::_name); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index dfda2ff61e1022..92cd1f542d8b0c 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -112,9 +112,11 @@ Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) { } SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id, - const TPlanNode& tnode, const DescriptorTbl& descs) + const TPlanNode& tnode, const DescriptorTbl& descs, + bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id) { - _sort_sink_operator = std::make_unique(pool, operator_id, tnode, descs); + _sort_sink_operator = std::make_unique(pool, operator_id, tnode, descs, + require_bucket_distribution); } Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 9382edd6933978..5c50517f93f15f 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -64,7 +64,7 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorX::_name); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 991873b077a3d5..8b358bab99dd58 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -1042,7 +1042,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN request.query_options.__isset.enable_distinct_streaming_aggregation && request.query_options.enable_distinct_streaming_aggregation && !tnode.agg_node.grouping_exprs.empty()) { - op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); + op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, + _require_bucket_distribution)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation && @@ -1141,7 +1142,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); } - _require_bucket_distribution = true; + _require_bucket_distribution = + _require_bucket_distribution || op->require_data_distribution(); break; } case TPlanNodeType::CROSS_JOIN_NODE: { @@ -1204,9 +1206,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN DataSinkOperatorXPtr sink; if (_runtime_state->enable_sort_spill()) { - sink.reset(new 
SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); } else { - sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); } sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); @@ -1243,11 +1247,11 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); - _require_bucket_distribution = true; break; } case TPlanNodeType::INTERSECT_NODE: { From cf17cad4334c839f6a6edecd2ea708efa0a8aa25 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Tue, 4 Jun 2024 12:16:35 +0800 Subject: [PATCH 3/5] [Improvement](local shuffle) Use grouping expr for hash shuffling (#35716) Use grouping expr for hash shuffling --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 9 +++++---- be/src/pipeline/exec/aggregation_sink_operator.h | 7 +++++-- be/src/pipeline/exec/analytic_sink_operator.cpp | 9 +++++---- be/src/pipeline/exec/analytic_sink_operator.h | 5 ++++- .../exec/distinct_streaming_aggregation_operator.cpp | 9 +++++---- .../exec/distinct_streaming_aggregation_operator.h | 5 ++++- .../exec/partitioned_aggregation_sink_operator.h | 4 ++++ be/src/pipeline/exec/sort_sink_operator.cpp | 5 ++--- be/src/pipeline/exec/sort_sink_operator.h | 4 +++- be/src/pipeline/exec/spill_sort_sink_operator.h | 3 +++ .../pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 8 ++++++++ 11 files changed, 48 insertions(+), 20 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 2eeedf42372150..704c256737a644 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -628,10 +628,11 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla _limit(tnode.limit), _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) || (tnode.__isset.conjuncts && !tnode.conjuncts.empty())), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}), - _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && - require_bucket_distribution), + _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution + ? 
tnode.distribute_expr_lists[0] + : tnode.agg_node.grouping_exprs), + _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution), _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 0c34acfd7dfe84..3124a3981b47c7 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -164,9 +164,11 @@ class AggSinkOperatorX final : public DataSinkOperatorX { ? DataDistribution(ExchangeType::PASSTHROUGH) : DataSinkOperatorX::required_data_distribution(); } - return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) - : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); + return _is_colocate && _require_bucket_distribution + ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) + : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } + bool require_data_distribution() const override { return _is_colocate; } size_t get_revocable_mem_size(RuntimeState* state) const; vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) { @@ -213,6 +215,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX { const std::vector _partition_exprs; const bool _is_colocate; + const bool _require_bucket_distribution; RowDescriptor _agg_fn_output_row_descriptor; }; diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index b5a1835df991f6..5b4f5cee5cb016 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -199,10 +199,11 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id ? tnode.analytic_node.buffered_tuple_id : 0), - _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate && - require_bucket_distribution), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}) {} + _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution), + _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution + ? 
tnode.distribute_expr_lists[0] + : tnode.analytic_node.partition_exprs) {} Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index b03d6ca709413d..d974f68cefaf26 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -102,13 +102,15 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX::required_data_distribution(); } + bool require_data_distribution() const override { return true; } + private: Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr, vectorized::IColumn* dst_column, size_t length); @@ -125,6 +127,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX _num_agg_input; const bool _is_colocate; + const bool _require_bucket_distribution; const std::vector _partition_exprs; }; diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index ad8ce8ecbb2c29..16c0df07b4961b 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -383,10 +383,11 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i _output_tuple_desc(nullptr), _needs_finalize(tnode.agg_node.need_finalize), _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase), - _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] - : std::vector {}), - _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && - require_bucket_distribution) { + _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution + ? tnode.distribute_expr_lists[0] + : tnode.agg_node.grouping_exprs), + _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution) { if (tnode.agg_node.__isset.use_streaming_preaggregation) { _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; if (_is_streaming_preagg) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index aea9f8be65aa06..d0b0d963ead125 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -107,13 +107,15 @@ class DistinctStreamingAggOperatorX final DataDistribution required_data_distribution() const override { if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) { - return _is_colocate + return _is_colocate && _require_bucket_distribution ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } return StatefulOperatorX::required_data_distribution(); } + bool require_data_distribution() const override { return _is_colocate; } + private: friend class DistinctStreamingAggLocalState; TupleId _intermediate_tuple_id; @@ -125,6 +127,7 @@ class DistinctStreamingAggOperatorX final const bool _is_first_phase; const std::vector _partition_exprs; const bool _is_colocate; + const bool _require_bucket_distribution; // group by k1,k2 vectorized::VExprContextSPtrs _probe_expr_ctxs; std::vector _aggregate_evaluators; diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 4ec6b89e6867c8..d79ba6fd3d4f8f 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -308,6 +308,10 @@ class PartitionedAggSinkOperatorX : public DataSinkOperatorXrequired_data_distribution(); } + bool require_data_distribution() const override { + return _agg_sink_operator->require_data_distribution(); + } + Status set_child(OperatorXPtr child) override { RETURN_IF_ERROR(DataSinkOperatorX::set_child(child)); return _agg_sink_operator->set_child(child); diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index dc9483349a1da0..61c35427e57987 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -85,9 +85,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read), _merge_by_exchange(tnode.sort_node.merge_by_exchange), - _is_colocate(tnode.sort_node.__isset.is_colocate - ? tnode.sort_node.is_colocate && require_bucket_distribution - : require_bucket_distribution), + _is_colocate(tnode.sort_node.__isset.is_colocate && tnode.sort_node.is_colocate), + _require_bucket_distribution(require_bucket_distribution), _is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort ? tnode.sort_node.is_analytic_sort : false), diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index 4271c626ff538a..f29d9bbde0944c 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -87,7 +87,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { - return _is_colocate + return _is_colocate && _require_bucket_distribution ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } else if (_merge_by_exchange) { @@ -96,6 +96,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { } return DataSinkOperatorX::required_data_distribution(); } + bool require_data_distribution() const override { return _is_colocate; } bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT; } @@ -128,6 +129,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { const bool _use_two_phase_read; const bool _merge_by_exchange; const bool _is_colocate = false; + const bool _require_bucket_distribution = false; const bool _is_analytic_sort = false; const std::vector _partition_exprs; }; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 5c50517f93f15f..fae5fe3270f3a0 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -78,6 +78,9 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorXrequired_data_distribution(); } + bool require_data_distribution() const override { + return _sort_sink_operator->require_data_distribution(); + } Status set_child(OperatorXPtr child) override { RETURN_IF_ERROR(DataSinkOperatorX::set_child(child)); return _sort_sink_operator->set_child(child); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 8b358bab99dd58..5b6da20c82f72b 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -1045,6 +1045,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); + _require_bucket_distribution = + _require_bucket_distribution || op->require_data_distribution(); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation && !tnode.agg_node.grouping_exprs.empty()) { @@ -1073,6 +1075,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } + _require_bucket_distribution = + _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -1212,6 +1216,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } + _require_bucket_distribution = + _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -1249,6 +1255,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN DataSinkOperatorXPtr sink; sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); + _require_bucket_distribution = + _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); 
RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); From 1e8aed43367a95d837227747e77ab94ed65c6bc0 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 5 Jul 2024 09:55:14 +0800 Subject: [PATCH 4/5] [pipeline](datagen) Improve datagen operator parallelism (#37195) Now we use ``` DataGenOperator (num_instance=1) -> ResultSinkOperator(num_instance=1) ``` for loading/query tasks. This PR uses a local shuffle to improve its parallelism, and the plan becomes ``` DataGenOperator (num_instance=1) -> LocalExchangeSink (num_instance=1) -> LocalExchangeSource (num_instance=(cores / 2)) -> ResultSinkOperator(num_instance=(cores / 2)) ``` --- be/src/pipeline/exec/datagen_operator.cpp | 4 ++-- .../pipeline/pipeline_x/pipeline_x_fragment_context.cpp | 4 ++++ .../java/org/apache/doris/planner/DataGenScanNode.java | 9 +++++++++ regression-test/pipeline/p1/conf/regression-conf.groovy | 1 + .../suites/correctness_p0/test_assert_row_num.groovy | 2 +- .../suites/external_table_p0/tvf/test_numbers.groovy | 6 +++--- 6 files changed, 20 insertions(+), 6 deletions(-) diff --git a/be/src/pipeline/exec/datagen_operator.cpp b/be/src/pipeline/exec/datagen_operator.cpp index 4fbe21f71d5e32..1f84bbf145a488 100644 --- a/be/src/pipeline/exec/datagen_operator.cpp +++ b/be/src/pipeline/exec/datagen_operator.cpp @@ -97,8 +97,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) { // TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node. for (const auto& filter_desc : p._runtime_filter_descs) { IRuntimeFilter* runtime_filter = nullptr; - RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, false, p.node_id(), - &runtime_filter)); + RETURN_IF_ERROR(state->register_consumer_runtime_filter( + filter_desc, p.ignore_data_distribution(), p.node_id(), &runtime_filter)); runtime_filter->init_profile(_runtime_profile.get()); } return Status::OK(); } diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 5b6da20c82f72b..18eb9582a4bd47 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -1295,6 +1295,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN case TPlanNodeType::DATA_GEN_SCAN_NODE: { op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), descs)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); + if (request.__isset.parallel_instances) { + cur_pipe->set_num_tasks(request.parallel_instances); + op->set_ignore_data_distribution(); + } break; } case TPlanNodeType::SCHEMA_SCAN_NODE: { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java index 14a50160d63435..f4e6dc93130d5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java @@ -22,6 +22,7 @@ import org.apache.doris.common.NereidsException; import org.apache.doris.common.UserException; import org.apache.doris.datasource.ExternalScanNode; +import org.apache.doris.qe.ConnectContext; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.tablefunction.DataGenTableValuedFunction; import org.apache.doris.tablefunction.TableValuedFunctionTask; @@ -116,6 +117,14 @@ public boolean needToCheckColumnPriv() { // by
multi-processes or multi-threads. So we assign instance number to 1. @Override public int getNumInstances() { + if (ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) { + return ConnectContext.get().getSessionVariable().getParallelExecInstanceNum(); + } + return 1; + } + + @Override + public int getScanRangeNum() { return 1; } diff --git a/regression-test/pipeline/p1/conf/regression-conf.groovy b/regression-test/pipeline/p1/conf/regression-conf.groovy index 8f8458e47a60c3..2a0156e16b4d4c 100644 --- a/regression-test/pipeline/p1/conf/regression-conf.groovy +++ b/regression-test/pipeline/p1/conf/regression-conf.groovy @@ -60,6 +60,7 @@ excludeSuites = "000_the_start_sentinel_do_not_touch," + // keep this line as th "test_profile," + "test_refresh_mtmv," + "test_spark_load," + + "test_iot_auto_detect_concurrent," + "zzz_the_end_sentinel_do_not_touch" // keep this line as the last line // this dir will not be executed diff --git a/regression-test/suites/correctness_p0/test_assert_row_num.groovy b/regression-test/suites/correctness_p0/test_assert_row_num.groovy index 818213f56fee89..68e9740a321ab3 100644 --- a/regression-test/suites/correctness_p0/test_assert_row_num.groovy +++ b/regression-test/suites/correctness_p0/test_assert_row_num.groovy @@ -21,7 +21,7 @@ suite("test_assert_num_rows") { """ qt_sql_2 """ - SELECT * from numbers("number"="10") WHERE ( SELECT * FROM (SELECT 3) __DORIS_DUAL__ ) IS NOT NULL + SELECT * from numbers("number"="10") WHERE ( SELECT * FROM (SELECT 3) __DORIS_DUAL__ ) IS NOT NULL ORDER BY number """ sql """ DROP TABLE IF EXISTS table_9_undef_undef; diff --git a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy index 6f0f74f6433e2f..c0f2cafa403c5d 100644 --- a/regression-test/suites/external_table_p0/tvf/test_numbers.groovy +++ b/regression-test/suites/external_table_p0/tvf/test_numbers.groovy @@ -39,17 +39,17 @@ order_qt_inner_join1 """ select a.number as num1, b.number as num2 from numbers("number" = "10") a inner join numbers("number" = "10") b - on a.number=b.number; + on a.number=b.number ORDER BY a.number,b.number; """ order_qt_inner_join2 """ select a.number as num1, b.number as num2 from numbers("number" = "6") a inner join numbers("number" = "6") b - on a.number>b.number; + on a.number>b.number ORDER BY a.number,b.number; """ order_qt_inner_join3 """ select a.number as num1, b.number as num2 from numbers("number" = "10") a inner join numbers("number" = "10") b - on a.number=b.number and b.number%2 = 0; + on a.number=b.number and b.number%2 = 0 ORDER BY a.number,b.number; """ order_qt_left_join """ select a.number as num1, b.number as num2 From 0f85e9690358451a2be0677c5d211c2076a6f6bd Mon Sep 17 00:00:00 2001 From: Gabriel Date: Wed, 28 Aug 2024 15:17:13 +0800 Subject: [PATCH 5/5] [fix](local shuffle) Fix hash shuffle local exchanger --- .../local_exchange/local_exchanger.cpp | 29 +------------------ .../pipeline_x_fragment_context.cpp | 23 +++++++++++---- .../join/test_join_local_shuffle.groovy | 6 +++- 3 files changed, 23 insertions(+), 35 deletions(-) diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp index 1353e832e24d04..72e475f5461679 100644 --- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp @@ -196,35 +196,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const 
uint32_t* __rest new_block_wrapper->unref(local_state._shared_state); } } - } else if (bucket_seq_to_instance_idx.empty()) { - /** - * If type is `BUCKET_HASH_SHUFFLE` and `_bucket_seq_to_instance_idx` is empty, which - * means no scan operators is included in this fragment so we also need a `HASH_SHUFFLE` here. - */ - const auto& map = local_state._parent->cast() - ._shuffle_idx_to_instance_idx; - DCHECK(!map.empty()); - new_block_wrapper->ref(map.size()); - for (const auto& it : map) { - DCHECK(it.second >= 0 && it.second < _num_partitions) - << it.first << " : " << it.second << " " << _num_partitions; - uint32_t start = local_state._partition_rows_histogram[it.first]; - uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start; - if (size > 0) { - local_state._shared_state->add_mem_usage( - it.second, new_block_wrapper->data_block.allocated_bytes(), false); - - if (!_enqueue_data_and_set_ready(it.second, local_state, - {new_block_wrapper, {row_idx, start, size}})) { - local_state._shared_state->sub_mem_usage( - it.second, new_block_wrapper->data_block.allocated_bytes(), false); - new_block_wrapper->unref(local_state._shared_state); - } - } else { - new_block_wrapper->unref(local_state._shared_state); - } - } } else { + DCHECK(!bucket_seq_to_instance_idx.empty()); new_block_wrapper->ref(_num_partitions); for (size_t i = 0; i < _num_partitions; i++) { uint32_t start = local_state._partition_rows_histogram[i]; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 18eb9582a4bd47..a0c31384b2253b 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -800,15 +800,26 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( // 1. Create a new pipeline with local exchange sink. DataSinkOperatorXPtr sink; auto sink_id = next_sink_operator_id(); - const bool is_shuffled_hash_join = operator_xs.size() > idx - ? operator_xs[idx]->is_shuffled_hash_join() - : cur_pipe->sink_x()->is_shuffled_hash_join(); + /** + * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment. + * So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`. + */ + const bool should_disable_bucket_shuffle = + bucket_seq_to_instance_idx.empty() || + (operator_xs.size() > idx ? operator_xs[idx]->is_shuffled_hash_join() + : cur_pipe->sink_x()->is_shuffled_hash_join()); sink.reset(new LocalExchangeSinkOperatorX( - sink_id, local_exchange_id, is_shuffled_hash_join ? _total_instances : _num_instances, + sink_id, local_exchange_id, + should_disable_bucket_shuffle ? _total_instances : _num_instances, data_distribution.partition_exprs, bucket_seq_to_instance_idx)); + if (should_disable_bucket_shuffle && + data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) { + data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE; + } RETURN_IF_ERROR(new_pip->set_sink(sink)); RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets, - is_shuffled_hash_join, shuffle_idx_to_instance_idx)); + should_disable_bucket_shuffle, + shuffle_idx_to_instance_idx)); // 2. Create and initialize LocalExchangeSharedState. 
auto shared_state = LocalExchangeSharedState::create_shared(_num_instances); @@ -816,7 +827,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl( case ExchangeType::HASH_SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique( std::max(cur_pipe->num_tasks(), _num_instances), - is_shuffled_hash_join ? _total_instances : _num_instances, + should_disable_bucket_shuffle ? _total_instances : _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit ? _runtime_state->query_options().local_exchange_free_blocks_limit : 0); diff --git a/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy index c66131b57dcfc9..29fe192e2b5368 100644 --- a/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy +++ b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy @@ -16,6 +16,10 @@ // under the License. suite("test_join_local_shuffle", "query,p0") { + sql "DROP TABLE IF EXISTS test_join_local_shuffle_1;" + sql "DROP TABLE IF EXISTS test_join_local_shuffle_2;" + sql "DROP TABLE IF EXISTS test_join_local_shuffle_3;" + sql "DROP TABLE IF EXISTS test_join_local_shuffle_4;" sql "SET enable_nereids_planner=true" sql "SET enable_fallback_to_original_planner=false" sql """ @@ -72,7 +76,7 @@ suite("test_join_local_shuffle", "query,p0") { sql "insert into test_join_local_shuffle_2 values(2, 0);" sql "insert into test_join_local_shuffle_3 values(2, 0);" sql "insert into test_join_local_shuffle_4 values(0, 1);" - qt_sql " select /*+SET_VAR(disable_join_reorder=true,enable_local_shuffle=true) */ * from (select c1, max(c2) from (select b.c1 c1, b.c2 c2 from test_join_local_shuffle_3 a join [shuffle] test_join_local_shuffle_1 b on a.c2 = b.c1 join [broadcast] test_join_local_shuffle_4 c on b.c1 = c.c1) t1 group by c1) t, test_join_local_shuffle_2 where t.c1 = test_join_local_shuffle_2.c2; " + qt_sql " select /*+SET_VAR(disable_join_reorder=true,enable_local_shuffle=true) */ * from (select c1, max(c2) from (select b.c1 c1, b.c2 c2 from test_join_local_shuffle_3 a join [shuffle] test_join_local_shuffle_1 b on a.c2 = b.c1 join [broadcast] test_join_local_shuffle_4 c on b.c1 = c.c1) t1 group by c1) t join [shuffle] test_join_local_shuffle_2 on t.c1 = test_join_local_shuffle_2.c2; " sql "DROP TABLE IF EXISTS test_join_local_shuffle_1;" sql "DROP TABLE IF EXISTS test_join_local_shuffle_2;"