diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h
index 3124a3981b47c7..de1f26057ff185 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -164,11 +164,12 @@ class AggSinkOperatorX final : public DataSinkOperatorX {
                            ? DataDistribution(ExchangeType::PASSTHROUGH)
                            : DataSinkOperatorX::required_data_distribution();
         }
-        return _is_colocate && _require_bucket_distribution
+        return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
                        ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                        : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
     }
     bool require_data_distribution() const override { return _is_colocate; }
+    bool require_shuffled_data_distribution() const override { return !_probe_expr_ctxs.empty(); }
     size_t get_revocable_mem_size(RuntimeState* state) const;
 
     vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) {
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h
index d974f68cefaf26..eb65414206c5b4 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -102,7 +102,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorXrequire_data_distribution(); }
+    bool require_shuffled_data_distribution() const override {
+        return _agg_sink_operator->require_shuffled_data_distribution();
+    }
 
     Status set_child(OperatorXPtr child) override {
         RETURN_IF_ERROR(DataSinkOperatorX::set_child(child));
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index b10c514b2f4d42..b19537741a8e68 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -169,6 +169,9 @@ class PartitionedHashJoinProbeOperatorX final
                                           _distribution_partition_exprs));
     }
 
+    bool require_shuffled_data_distribution() const override {
+        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
+    }
     bool is_shuffled_hash_join() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 2fae1f15bfa569..f3b36086799b20 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -119,6 +119,9 @@ class PartitionedHashJoinSinkOperatorX
                                         _distribution_partition_exprs);
     }
 
+    bool require_shuffled_data_distribution() const override {
+        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
+    }
     bool is_shuffled_hash_join() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index 9f80f03966b1f1..7d8bc006892a43 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -132,6 +132,8 @@ class SetProbeSinkOperatorX final : public DataSinkOperatorX create_shared_state() const override { return nullptr; }
 
 private:
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index 2a6bb63c02e815..16d54cabaafe23 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -124,6 +124,7 @@ class SetSinkOperatorX final : public DataSinkOperatorX
diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h
index f29d9bbde0944c..ae4823c55d0f91 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -87,7 +87,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX {
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
     DataDistribution required_data_distribution() const override {
         if (_is_analytic_sort) {
-            return _is_colocate && _require_bucket_distribution
+            return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
                            ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         } else if (_merge_by_exchange) {
@@ -96,6 +96,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX {
         }
         return DataSinkOperatorX::required_data_distribution();
     }
+    bool require_shuffled_data_distribution() const override { return _is_analytic_sort; }
     bool require_data_distribution() const override { return _is_colocate; }
 
     bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT; }
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index 74cfc50175c8e1..bf3e21f5357e52 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -69,6 +69,40 @@ std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) con
     return fmt::to_string(debug_string_buffer);
 }
 
+Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets,
+                                        const bool should_disable_bucket_shuffle,
+                                        const std::map& shuffle_idx_to_instance_idx) {
+    _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
+    _type = type;
+    if (_type == ExchangeType::HASH_SHUFFLE) {
+        // For shuffle join, if data distribution has been broken by previous operator, we
+        // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
+        // we should use map shuffle idx to instance idx because all instances will be
+        // distributed to all BEs. Otherwise, we should use shuffle idx directly.
+        if (should_disable_bucket_shuffle) {
+            std::for_each(shuffle_idx_to_instance_idx.begin(), shuffle_idx_to_instance_idx.end(),
+                          [&](const auto& item) {
+                              DCHECK(item.first != -1);
+                              _shuffle_idx_to_instance_idx.push_back({item.first, item.second});
+                          });
+        } else {
+            _shuffle_idx_to_instance_idx.resize(_num_partitions);
+            for (int i = 0; i < _num_partitions; i++) {
+                _shuffle_idx_to_instance_idx[i] = {i, i};
+            }
+        }
+        _partitioner.reset(new vectorized::Crc32HashPartitioner(
+                _num_partitions));
+        RETURN_IF_ERROR(_partitioner->init(_texprs));
+    } else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
+        _partitioner.reset(
+                new vectorized::Crc32HashPartitioner(num_buckets));
+        RETURN_IF_ERROR(_partitioner->init(_texprs));
+    }
+
+    return Status::OK();
+}
+
 Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
                                         bool eos) {
     auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 2ee5c3af004c67..f1d60fc03c4360 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -95,38 +95,8 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX& shuffle_idx_to_instance_idx) override {
-        _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
-        _type = type;
-        if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) {
-            // For shuffle join, if data distribution has been broken by previous operator, we
-            // should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
-            // we should use map shuffle idx to instance idx because all instances will be
-            // distributed to all BEs. Otherwise, we should use shuffle idx directly.
-            if (is_shuffled_hash_join) {
-                std::for_each(shuffle_idx_to_instance_idx.begin(),
-                              shuffle_idx_to_instance_idx.end(), [&](const auto& item) {
-                                  DCHECK(item.first != -1);
-                                  _shuffle_idx_to_instance_idx.push_back({item.first, item.second});
-                              });
-            } else {
-                _shuffle_idx_to_instance_idx.resize(_num_partitions);
-                for (int i = 0; i < _num_partitions; i++) {
-                    _shuffle_idx_to_instance_idx[i] = {i, i};
-                }
-            }
-            _partitioner.reset(
-                    _type == ExchangeType::HASH_SHUFFLE || _bucket_seq_to_instance_idx.empty()
-                            ? new vectorized::Crc32HashPartitioner(
-                                      _num_partitions)
-                            : new vectorized::Crc32HashPartitioner(
-                                      num_buckets));
-            RETURN_IF_ERROR(_partitioner->init(_texprs));
-        }
-
-        return Status::OK();
-    }
+    Status init(ExchangeType type, const int num_buckets, const bool should_disable_bucket_shuffle,
+                const std::map& shuffle_idx_to_instance_idx) override;
 
     Status prepare(RuntimeState* state) override {
         if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) {
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 1353e832e24d04..72e475f5461679 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -196,35 +196,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
                 new_block_wrapper->unref(local_state._shared_state);
             }
         }
-    } else if (bucket_seq_to_instance_idx.empty()) {
-        /**
-         * If type is `BUCKET_HASH_SHUFFLE` and `_bucket_seq_to_instance_idx` is empty, which
-         * means no scan operators is included in this fragment so we also need a `HASH_SHUFFLE` here.
-         */
-        const auto& map = local_state._parent->cast()
-                                  ._shuffle_idx_to_instance_idx;
-        DCHECK(!map.empty());
-        new_block_wrapper->ref(map.size());
-        for (const auto& it : map) {
-            DCHECK(it.second >= 0 && it.second < _num_partitions)
-                    << it.first << " : " << it.second << " " << _num_partitions;
-            uint32_t start = local_state._partition_rows_histogram[it.first];
-            uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start;
-            if (size > 0) {
-                local_state._shared_state->add_mem_usage(
-                        it.second, new_block_wrapper->data_block.allocated_bytes(), false);
-
-                if (!_enqueue_data_and_set_ready(it.second, local_state,
-                                                 {new_block_wrapper, {row_idx, start, size}})) {
-                    local_state._shared_state->sub_mem_usage(
-                            it.second, new_block_wrapper->data_block.allocated_bytes(), false);
-                    new_block_wrapper->unref(local_state._shared_state);
-                }
-            } else {
-                new_block_wrapper->unref(local_state._shared_state);
-            }
-        }
     } else {
+        DCHECK(!bucket_seq_to_instance_idx.empty());
         new_block_wrapper->ref(_num_partitions);
         for (size_t i = 0; i < _num_partitions; i++) {
             uint32_t start = local_state._partition_rows_histogram[i];
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 18eb9582a4bd47..f4c2e1fbc7c25b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -714,7 +714,7 @@ Status PipelineXFragmentContext::_build_pipelines(ObjectPool* pool,
     cur_pipe->_name.append(std::to_string(cur_pipe->id()));
 
     RETURN_IF_ERROR(_create_tree_helper(pool, request.fragment.plan.nodes, request, descs, nullptr,
-                                        &node_idx, root, cur_pipe, 0));
+                                        &node_idx, root, cur_pipe, 0, false));
 
     if (node_idx + 1 != request.fragment.plan.nodes.size()) {
         // TODO: print thrift msg for diagnostic purposes.
@@ -725,13 +725,11 @@ Status PipelineXFragmentContext::_build_pipelines(ObjectPool* pool,
     return Status::OK();
 }
 
-Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
-                                                     const std::vector& tnodes,
-                                                     const doris::TPipelineFragmentParams& request,
-                                                     const DescriptorTbl& descs,
-                                                     OperatorXPtr parent, int* node_idx,
-                                                     OperatorXPtr* root, PipelinePtr& cur_pipe,
-                                                     int child_idx) {
+Status PipelineXFragmentContext::_create_tree_helper(
+        ObjectPool* pool, const std::vector& tnodes,
+        const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs,
+        OperatorXPtr parent, int* node_idx, OperatorXPtr* root, PipelinePtr& cur_pipe,
+        int child_idx, const bool followed_by_shuffled_join) {
     // propagate error case
     if (*node_idx >= tnodes.size()) {
         // TODO: print thrift msg
@@ -742,9 +740,11 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
     const TPlanNode& tnode = tnodes[*node_idx];
 
     int num_children = tnodes[*node_idx].num_children;
+    bool current_followed_by_shuffled_join = followed_by_shuffled_join;
     OperatorXPtr op = nullptr;
     RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe,
-                                     parent == nullptr ? -1 : parent->node_id(), child_idx));
+                                     parent == nullptr ? -1 : parent->node_id(), child_idx,
+                                     followed_by_shuffled_join));
 
     // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
     if (parent != nullptr) {
@@ -754,6 +754,25 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
         *root = op;
     }
 
+    /**
+     * `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join.
+     *
+     * For plan:
+     * LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
+     * Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
+     * We must ensure the data distribution of `LocalExchange(id=0)` is the same as `Exchange(id=3)`.
+     *
+     * If an operator is followed by a local exchange without shuffle (e.g. passthrough), a
+     * shuffled local exchanger will be used before the join, so it is not considered followed by a shuffled join.
+     */
+    auto require_shuffled_data_distribution =
+            cur_pipe->operator_xs().empty()
+                    ? cur_pipe->sink_x()->require_shuffled_data_distribution()
+                    : op->require_shuffled_data_distribution();
+    current_followed_by_shuffled_join =
+            (followed_by_shuffled_join || op->is_shuffled_hash_join()) &&
+            require_shuffled_data_distribution;
+
     cur_pipe->_name.push_back('-');
     cur_pipe->_name.append(std::to_string(op->id()));
     cur_pipe->_name.append(op->get_name());
@@ -762,7 +781,7 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
     for (int i = 0; i < num_children; i++) {
         ++*node_idx;
         RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr,
-                                            cur_pipe, i));
+                                            cur_pipe, i, current_followed_by_shuffled_join));
 
         // we are expecting a child, but have used all nodes
         // this means we have been given a bad tree and must fail
@@ -800,15 +819,29 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
     // 1. Create a new pipeline with local exchange sink.
     DataSinkOperatorXPtr sink;
     auto sink_id = next_sink_operator_id();
-    const bool is_shuffled_hash_join = operator_xs.size() > idx
-                                               ? operator_xs[idx]->is_shuffled_hash_join()
-                                               : cur_pipe->sink_x()->is_shuffled_hash_join();
+    /**
+     * `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
+     * So co-located operators (e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
+     */
+    const bool followed_by_shuffled_join =
+            operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_join()
+                                     : cur_pipe->sink_x()->followed_by_shuffled_join();
+    const bool should_disable_bucket_shuffle =
+            bucket_seq_to_instance_idx.empty() &&
+            shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() &&
+            followed_by_shuffled_join;
     sink.reset(new LocalExchangeSinkOperatorX(
-            sink_id, local_exchange_id, is_shuffled_hash_join ? _total_instances : _num_instances,
+            sink_id, local_exchange_id,
+            should_disable_bucket_shuffle ? _total_instances : _num_instances,
             data_distribution.partition_exprs, bucket_seq_to_instance_idx));
+    if (should_disable_bucket_shuffle &&
+        data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
+        data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE;
+    }
     RETURN_IF_ERROR(new_pip->set_sink(sink));
     RETURN_IF_ERROR(new_pip->sink_x()->init(data_distribution.distribution_type, num_buckets,
-                                            is_shuffled_hash_join, shuffle_idx_to_instance_idx));
+                                            should_disable_bucket_shuffle,
+                                            shuffle_idx_to_instance_idx));
 
     // 2. Create and initialize LocalExchangeSharedState.
     auto shared_state = LocalExchangeSharedState::create_shared(_num_instances);
@@ -816,7 +849,7 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
     case ExchangeType::HASH_SHUFFLE:
         shared_state->exchanger = ShuffleExchanger::create_unique(
                 std::max(cur_pipe->num_tasks(), _num_instances),
-                is_shuffled_hash_join ? _total_instances : _num_instances,
+                should_disable_bucket_shuffle ? _total_instances : _num_instances,
                 _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                         ? _runtime_state->query_options().local_exchange_free_blocks_limit
                         : 0);
@@ -971,7 +1004,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
                                                   const doris::TPipelineFragmentParams& request,
                                                   const DescriptorTbl& descs, OperatorXPtr& op,
                                                   PipelinePtr& cur_pipe, int parent_idx,
-                                                  int child_idx) {
+                                                  int child_idx,
+                                                  const bool followed_by_shuffled_join) {
     // We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
     // Therefore, here we need to use a stack-like structure.
     _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
@@ -1045,6 +1079,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
                                                        _require_bucket_distribution));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            op->set_followed_by_shuffled_join(followed_by_shuffled_join);
             _require_bucket_distribution =
                     _require_bucket_distribution || op->require_data_distribution();
         } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
@@ -1075,6 +1110,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
                                             _require_bucket_distribution));
         }
+        sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
        _require_bucket_distribution =
                _require_bucket_distribution || sink->require_data_distribution();
        sink->set_dests_id({op->operator_id()});
@@ -1216,6 +1252,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
                                              _require_bucket_distribution));
         }
+        sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
        _require_bucket_distribution =
                _require_bucket_distribution || sink->require_data_distribution();
        sink->set_dests_id({op->operator_id()});
@@ -1255,6 +1292,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         DataSinkOperatorXPtr sink;
         sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
                                              _require_bucket_distribution));
+        sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
        _require_bucket_distribution =
                _require_bucket_distribution || sink->require_data_distribution();
        sink->set_dests_id({op->operator_id()});
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 14e4b05d7e88ca..a695f73d28333a 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -141,12 +141,13 @@ class PipelineXFragmentContext : public PipelineFragmentContext {
     Status _create_tree_helper(ObjectPool* pool, const std::vector& tnodes,
                                const doris::TPipelineFragmentParams& request,
                                const DescriptorTbl& descs, OperatorXPtr parent, int* node_idx,
-                               OperatorXPtr* root, PipelinePtr& cur_pipe, int child_idx);
+                               OperatorXPtr* root, PipelinePtr& cur_pipe, int child_idx,
+                               const bool followed_by_shuffled_join);
 
     Status _create_operator(ObjectPool* pool, const TPlanNode& tnode,
                             const doris::TPipelineFragmentParams& request,
                             const DescriptorTbl& descs, OperatorXPtr& op, PipelinePtr& cur_pipe,
-                            int parent_idx, int child_idx);
+                            int parent_idx, int child_idx, const bool followed_by_shuffled_join);
 
     template
     Status _build_operators_for_set_operation_node(ObjectPool* pool, const TPlanNode& tnode,
                                                    const DescriptorTbl& descs, OperatorXPtr& op,
diff --git a/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy
index c66131b57dcfc9..29fe192e2b5368 100644
--- a/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy
+++ b/regression-test/suites/nereids_p0/join/test_join_local_shuffle.groovy
@@ -16,6 +16,10 @@
 // under the License.
 
 suite("test_join_local_shuffle", "query,p0") {
+    sql "DROP TABLE IF EXISTS test_join_local_shuffle_1;"
+    sql "DROP TABLE IF EXISTS test_join_local_shuffle_2;"
+    sql "DROP TABLE IF EXISTS test_join_local_shuffle_3;"
+    sql "DROP TABLE IF EXISTS test_join_local_shuffle_4;"
     sql "SET enable_nereids_planner=true"
     sql "SET enable_fallback_to_original_planner=false"
     sql """
@@ -72,7 +76,7 @@ suite("test_join_local_shuffle", "query,p0") {
     sql "insert into test_join_local_shuffle_2 values(2, 0);"
     sql "insert into test_join_local_shuffle_3 values(2, 0);"
     sql "insert into test_join_local_shuffle_4 values(0, 1);"
-    qt_sql " select /*+SET_VAR(disable_join_reorder=true,enable_local_shuffle=true) */ * from (select c1, max(c2) from (select b.c1 c1, b.c2 c2 from test_join_local_shuffle_3 a join [shuffle] test_join_local_shuffle_1 b on a.c2 = b.c1 join [broadcast] test_join_local_shuffle_4 c on b.c1 = c.c1) t1 group by c1) t, test_join_local_shuffle_2 where t.c1 = test_join_local_shuffle_2.c2; "
+    qt_sql " select /*+SET_VAR(disable_join_reorder=true,enable_local_shuffle=true) */ * from (select c1, max(c2) from (select b.c1 c1, b.c2 c2 from test_join_local_shuffle_3 a join [shuffle] test_join_local_shuffle_1 b on a.c2 = b.c1 join [broadcast] test_join_local_shuffle_4 c on b.c1 = c.c1) t1 group by c1) t join [shuffle] test_join_local_shuffle_2 on t.c1 = test_join_local_shuffle_2.c2; "
 
     sql "DROP TABLE IF EXISTS test_join_local_shuffle_1;"
     sql "DROP TABLE IF EXISTS test_join_local_shuffle_2;"