From d4a9a7f2e1ffcb3b9bc7dd1d65fa1ac897ed2395 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 29 Aug 2024 10:53:57 +0800 Subject: [PATCH] update --- .../pipeline/exec/aggregation_sink_operator.h | 3 +- be/src/pipeline/exec/analytic_sink_operator.h | 5 +- .../distinct_streaming_aggregation_operator.h | 5 +- be/src/pipeline/exec/hashjoin_build_sink.h | 3 + .../pipeline/exec/hashjoin_probe_operator.h | 3 + be/src/pipeline/exec/operator.h | 6 ++ .../partitioned_aggregation_sink_operator.h | 3 + .../partitioned_hash_join_probe_operator.h | 3 + .../partitioned_hash_join_sink_operator.h | 3 + .../pipeline/exec/set_probe_sink_operator.h | 2 + be/src/pipeline/exec/set_sink_operator.h | 1 + be/src/pipeline/exec/sort_sink_operator.h | 3 +- .../local_exchange_sink_operator.cpp | 14 +++-- .../local_exchange_sink_operator.h | 2 +- .../local_exchange/local_exchanger.cpp | 22 +------ be/src/pipeline/pipeline_fragment_context.cpp | 63 +++++++++++++++---- be/src/pipeline/pipeline_fragment_context.h | 5 +- .../join/test_join_local_shuffle.groovy | 6 +- 18 files changed, 106 insertions(+), 46 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 579b9eda1a6e9f..f7b225311a3c06 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -149,11 +149,12 @@ class AggSinkOperatorX final : public DataSinkOperatorX { ? DataDistribution(ExchangeType::PASSTHROUGH) : DataSinkOperatorX::required_data_distribution(); } - return _is_colocate && _require_bucket_distribution + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } bool require_data_distribution() const override { return _is_colocate; } + bool require_shuffled_data_distribution() const override { return !_probe_expr_ctxs.empty(); } size_t get_revocable_mem_size(RuntimeState* state) const; AggregatedDataVariants* get_agg_data(RuntimeState* state) { diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 47080b823803b3..6d713996b9cd72 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -82,7 +82,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorXrequire_data_distribution(); } + bool require_shuffled_data_distribution() const override { + return _agg_sink_operator->require_shuffled_data_distribution(); + } Status set_child(OperatorXPtr child) override { RETURN_IF_ERROR(DataSinkOperatorX::set_child(child)); diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 6ee718a3354b27..a63ddb3e69d784 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -166,6 +166,9 @@ class PartitionedHashJoinProbeOperatorX final _distribution_partition_exprs)); } + bool require_shuffled_data_distribution() const override { + return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; + } bool is_shuffled_hash_join() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index 1592c29cdb05de..252c53be12d057 100644 --- 
a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -116,6 +116,9 @@ class PartitionedHashJoinSinkOperatorX _distribution_partition_exprs); } + bool require_shuffled_data_distribution() const override { + return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN; + } bool is_shuffled_hash_join() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h index 93a862fa1cbe39..f21d58425814c8 100644 --- a/be/src/pipeline/exec/set_probe_sink_operator.h +++ b/be/src/pipeline/exec/set_probe_sink_operator.h @@ -98,6 +98,8 @@ class SetProbeSinkOperatorX final : public DataSinkOperatorX create_shared_state() const override { return nullptr; } private: diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h index 09a1fa09e7ccbf..ac0757e4467ac0 100644 --- a/be/src/pipeline/exec/set_sink_operator.h +++ b/be/src/pipeline/exec/set_sink_operator.h @@ -95,6 +95,7 @@ class SetSinkOperatorX final : public DataSinkOperatorX diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index b842a56f2adc31..3188bfe3990084 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -64,7 +64,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; DataDistribution required_data_distribution() const override { if (_is_analytic_sort) { - return _is_colocate && _require_bucket_distribution + return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); } else if (_merge_by_exchange) { @@ -73,6 +73,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX { } return DataSinkOperatorX::required_data_distribution(); } + bool require_shuffled_data_distribution() const override { return _is_analytic_sort; } bool require_data_distribution() const override { return _is_colocate; } size_t get_revocable_mem_size(RuntimeState* state) const; diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp index f0a51696075925..98b1a719a49da7 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp @@ -36,16 +36,16 @@ std::vector LocalExchangeSinkLocalState::dependencies() const { } Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets, - const bool is_shuffled_hash_join, + const bool should_disable_bucket_shuffle, const std::map& shuffle_idx_to_instance_idx) { _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")"; _type = type; - if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) { + if (_type == ExchangeType::HASH_SHUFFLE) { // For shuffle join, if data distribution has been broken by previous operator, we // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned, // we should use map shuffle idx to instance idx because all instances will be // distributed to all BEs. Otherwise, we should use shuffle idx directly. 
- if (is_shuffled_hash_join) { + if (should_disable_bucket_shuffle) { std::for_each(shuffle_idx_to_instance_idx.begin(), shuffle_idx_to_instance_idx.end(), [&](const auto& item) { DCHECK(item.first != -1); @@ -58,9 +58,11 @@ Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets } } _partitioner.reset(new vectorized::Crc32HashPartitioner( - _type == ExchangeType::HASH_SHUFFLE || _bucket_seq_to_instance_idx.empty() - ? _num_partitions - : num_buckets)); + _num_partitions)); + RETURN_IF_ERROR(_partitioner->init(_texprs)); + } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) { + _partitioner.reset( + new vectorized::Crc32HashPartitioner(num_buckets)); RETURN_IF_ERROR(_partitioner->init(_texprs)); } return Status::OK(); diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h index faa48d209f4b1e..e0e7688307c386 100644 --- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h +++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h @@ -102,7 +102,7 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX& shuffle_idx_to_instance_idx) override; Status prepare(RuntimeState* state) override; diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp index 1bcd9f34ba854c..f4630f328bb70d 100644 --- a/be/src/pipeline/local_exchange/local_exchanger.cpp +++ b/be/src/pipeline/local_exchange/local_exchanger.cpp @@ -239,28 +239,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest new_block_wrapper->unref(local_state._shared_state, local_state._channel_id); } } - } else if (bucket_seq_to_instance_idx.empty()) { - /** - * If type is `BUCKET_HASH_SHUFFLE` and `_bucket_seq_to_instance_idx` is empty, which - * means no scan operators is included in this fragment so we also need a `HASH_SHUFFLE` here. 
- */ - const auto& map = local_state._parent->cast() - ._shuffle_idx_to_instance_idx; - DCHECK(!map.empty()); - new_block_wrapper->ref(map.size()); - for (const auto& it : map) { - DCHECK(it.second >= 0 && it.second < _num_partitions) - << it.first << " : " << it.second << " " << _num_partitions; - uint32_t start = local_state._partition_rows_histogram[it.first]; - uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start; - if (size > 0) { - _enqueue_data_and_set_ready(it.second, local_state, - {new_block_wrapper, {row_idx, start, size}}); - } else { - new_block_wrapper->unref(local_state._shared_state, local_state._channel_id); - } - } } else { + DCHECK(!bucket_seq_to_instance_idx.empty()); new_block_wrapper->ref(_num_partitions); for (size_t i = 0; i < _num_partitions; i++) { uint32_t start = local_state._partition_rows_histogram[i]; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index c3942e8286e8e3..3b94b258d31073 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -616,7 +616,7 @@ Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, int node_idx = 0; RETURN_IF_ERROR(_create_tree_helper(pool, request.fragment.plan.nodes, request, descs, nullptr, - &node_idx, root, cur_pipe, 0)); + &node_idx, root, cur_pipe, 0, false)); if (node_idx + 1 != request.fragment.plan.nodes.size()) { return Status::InternalError( @@ -630,7 +630,8 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs, OperatorXPtr parent, int* node_idx, OperatorXPtr* root, - PipelinePtr& cur_pipe, int child_idx) { + PipelinePtr& cur_pipe, int child_idx, + const bool followed_by_shuffled_join) { // propagate error case if (*node_idx >= tnodes.size()) { return Status::InternalError( @@ -640,9 +641,11 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, const TPlanNode& tnode = tnodes[*node_idx]; int num_children = tnodes[*node_idx].num_children; + bool current_followed_by_shuffled_join = followed_by_shuffled_join; OperatorXPtr op = nullptr; RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe, - parent == nullptr ? -1 : parent->node_id(), child_idx)); + parent == nullptr ? -1 : parent->node_id(), child_idx, + followed_by_shuffled_join)); // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr)); if (parent != nullptr) { @@ -651,12 +654,30 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool, } else { *root = op; } + /** + * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join. + * + * For plan: + * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2) + * Exchange(id=3) -> ShuffledHashJoinBuild(id=2) + * We must ensure the data distribution of `LocalExchange(id=0)` is the same as that of `Exchange(id=3)`. + * + * If an operator is followed by a local exchange without shuffle (e.g. passthrough), a + * shuffled local exchanger will be used before the join, so it is not treated as followed by a shuffled join. + */ + auto require_shuffled_data_distribution = + cur_pipe->operator_xs().empty() + ?
cur_pipe->sink_x()->require_shuffled_data_distribution() : op->require_shuffled_data_distribution(); current_followed_by_shuffled_join = (followed_by_shuffled_join || op->is_shuffled_hash_join()) && require_shuffled_data_distribution; // rely on that tnodes is preorder of the plan for (int i = 0; i < num_children; i++) { ++*node_idx; RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr, - cur_pipe, i)); + cur_pipe, i, current_followed_by_shuffled_join)); // we are expecting a child, but have used all nodes // this means we have been given a bad tree and must fail @@ -692,15 +713,30 @@ Status PipelineFragmentContext::_add_local_exchange_impl( // 1. Create a new pipeline with local exchange sink. DataSinkOperatorXPtr sink; auto sink_id = next_sink_operator_id(); - const bool is_shuffled_hash_join = operator_xs.size() > idx - ? operator_xs[idx]->is_shuffled_hash_join() - : cur_pipe->sink_x()->is_shuffled_hash_join(); + + /** + * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment. + * So co-located operators (e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`. + */ + const bool followed_by_shuffled_join = operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_join() : cur_pipe->sink_x()->followed_by_shuffled_join(); + const bool should_disable_bucket_shuffle = + bucket_seq_to_instance_idx.empty() && + shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() && + followed_by_shuffled_join; sink.reset(new LocalExchangeSinkOperatorX( - sink_id, local_exchange_id, is_shuffled_hash_join ? _total_instances : _num_instances, + sink_id, local_exchange_id, + should_disable_bucket_shuffle ? _total_instances : _num_instances, data_distribution.partition_exprs, bucket_seq_to_instance_idx)); + if (should_disable_bucket_shuffle && + data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) { + data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE; + } RETURN_IF_ERROR(new_pip->set_sink(sink)); RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets, - is_shuffled_hash_join, shuffle_idx_to_instance_idx)); + should_disable_bucket_shuffle, + shuffle_idx_to_instance_idx)); // 2. Create and initialize LocalExchangeSharedState. std::shared_ptr shared_state = @@ -711,7 +747,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl( case ExchangeType::HASH_SHUFFLE: shared_state->exchanger = ShuffleExchanger::create_unique( std::max(cur_pipe->num_tasks(), _num_instances), - is_shuffled_hash_join ? _total_instances : _num_instances, + should_disable_bucket_shuffle ? _total_instances : _num_instances, _runtime_state->query_options().__isset.local_exchange_free_blocks_limit ? _runtime_state->query_options().local_exchange_free_blocks_limit : 0); @@ -1123,7 +1159,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs, OperatorXPtr& op, PipelinePtr& cur_pipe, int parent_idx, - int child_idx) { + int child_idx, + const bool followed_by_shuffled_join) { // We directly construct the operator from Thrift because the given array is in the order of preorder traversal. // Therefore, here we need to use a stack-like structure.
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); @@ -1214,6 +1251,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt) { op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, _require_bucket_distribution)); + op->set_followed_by_shuffled_join(followed_by_shuffled_join); _require_bucket_distribution = _require_bucket_distribution || op->require_data_distribution(); RETURN_IF_ERROR(cur_pipe->add_operator(op)); @@ -1245,6 +1283,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } + sink->set_followed_by_shuffled_join(followed_by_shuffled_join); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1387,6 +1426,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); } + sink->set_followed_by_shuffled_join(followed_by_shuffled_join); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); @@ -1426,6 +1466,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo DataSinkOperatorXPtr sink; sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, _require_bucket_distribution)); + sink->set_followed_by_shuffled_join(followed_by_shuffled_join); _require_bucket_distribution = _require_bucket_distribution || sink->require_data_distribution(); sink->set_dests_id({op->operator_id()}); diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 7597c3ce9b55d0..06c882674412ee 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -145,12 +145,13 @@ class PipelineFragmentContext : public TaskExecutionContext { Status _create_tree_helper(ObjectPool* pool, const std::vector& tnodes, const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs, OperatorXPtr parent, int* node_idx, - OperatorXPtr* root, PipelinePtr& cur_pipe, int child_idx); + OperatorXPtr* root, PipelinePtr& cur_pipe, int child_idx, + const bool followed_by_shuffled_join); Status _create_operator(ObjectPool* pool, const TPlanNode& tnode, const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs, OperatorXPtr& op, PipelinePtr& cur_pipe, - int parent_idx, int child_idx); + int parent_idx, int child_idx, const bool followed_by_shuffled_join); template Status _build_operators_for_set_operation_node(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorXPtr& op, diff --git a/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy index c66131b57dcfc9..29fe192e2b5368 100644 --- a/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy +++ b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy @@ -16,6 +16,10 @@ // under the License. 
suite("test_join_local_shuffle", "query,p0") { + sql "DROP TABLE IF EXISTS test_join_local_shuffle_1;" + sql "DROP TABLE IF EXISTS test_join_local_shuffle_2;" + sql "DROP TABLE IF EXISTS test_join_local_shuffle_3;" + sql "DROP TABLE IF EXISTS test_join_local_shuffle_4;" sql "SET enable_nereids_planner=true" sql "SET enable_fallback_to_original_planner=false" sql """ @@ -72,7 +76,7 @@ suite("test_join_local_shuffle", "query,p0") { sql "insert into test_join_local_shuffle_2 values(2, 0);" sql "insert into test_join_local_shuffle_3 values(2, 0);" sql "insert into test_join_local_shuffle_4 values(0, 1);" - qt_sql " select /*+SET_VAR(disable_join_reorder=true,enable_local_shuffle=true) */ * from (select c1, max(c2) from (select b.c1 c1, b.c2 c2 from test_join_local_shuffle_3 a join [shuffle] test_join_local_shuffle_1 b on a.c2 = b.c1 join [broadcast] test_join_local_shuffle_4 c on b.c1 = c.c1) t1 group by c1) t, test_join_local_shuffle_2 where t.c1 = test_join_local_shuffle_2.c2; " + qt_sql " select /*+SET_VAR(disable_join_reorder=true,enable_local_shuffle=true) */ * from (select c1, max(c2) from (select b.c1 c1, b.c2 c2 from test_join_local_shuffle_3 a join [shuffle] test_join_local_shuffle_1 b on a.c2 = b.c1 join [broadcast] test_join_local_shuffle_4 c on b.c1 = c.c1) t1 group by c1) t join [shuffle] test_join_local_shuffle_2 on t.c1 = test_join_local_shuffle_2.c2; " sql "DROP TABLE IF EXISTS test_join_local_shuffle_1;" sql "DROP TABLE IF EXISTS test_join_local_shuffle_2;"