From 8ce113ce81549a749b73df3d15458edb82b2849a Mon Sep 17 00:00:00 2001 From: Gabriel Date: Thu, 21 Dec 2023 14:52:18 +0800 Subject: [PATCH] [pipelineX](fix) Fix TPCH Q2 --- .../exec/exchange_source_operator.cpp | 8 ++--- .../pipeline/exec/exchange_source_operator.h | 10 +++++-- be/src/pipeline/pipeline.h | 30 ++++++++++++++----- be/src/pipeline/pipeline_x/dependency.h | 19 ++++-------- .../pipeline_x_fragment_context.cpp | 2 +- 5 files changed, 39 insertions(+), 30 deletions(-) diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp b/be/src/pipeline/exec/exchange_source_operator.cpp index 99c9e8e9fc2666..255cb151410fda 100644 --- a/be/src/pipeline/exec/exchange_source_operator.cpp +++ b/be/src/pipeline/exec/exchange_source_operator.cpp @@ -109,11 +109,9 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo : OperatorX(pool, tnode, operator_id, descs), _num_senders(num_senders), _is_merging(tnode.exchange_node.__isset.sort_info), - _is_hash_partition( - tnode.exchange_node.__isset.partition_type && - (tnode.exchange_node.partition_type == TPartitionType::HASH_PARTITIONED || - tnode.exchange_node.partition_type == - TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED)), + _partition_type(tnode.exchange_node.__isset.partition_type + ? tnode.exchange_node.partition_type + : TPartitionType::UNPARTITIONED), _input_row_desc(descs, tnode.exchange_node.input_row_tuples, std::vector(tnode.nullable_tuples.begin(), tnode.nullable_tuples.begin() + diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h index 9e8f21a0bcc4c5..221a43779a11fb 100644 --- a/be/src/pipeline/exec/exchange_source_operator.h +++ b/be/src/pipeline/exec/exchange_source_operator.h @@ -118,17 +118,21 @@ class ExchangeSourceOperatorX final : public OperatorX { } DataDistribution get_local_exchange_type() const override { - if (!_is_hash_partition || OperatorX::ignore_data_distribution()) { + if (OperatorX::ignore_data_distribution()) { return {ExchangeType::NOOP}; } - return {ExchangeType::HASH_SHUFFLE}; + return _partition_type == TPartitionType::HASH_PARTITIONED + ? DataDistribution(ExchangeType::HASH_SHUFFLE) + : _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED + ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE) + : DataDistribution(ExchangeType::NOOP); } private: friend class ExchangeLocalState; const int _num_senders; const bool _is_merging; - const bool _is_hash_partition; + const TPartitionType::type _partition_type; RowDescriptor _input_row_desc; std::shared_ptr _sub_plan_query_statistics_recvr; diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 70f2a273151ae3..305676856a0372 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -122,14 +122,30 @@ class Pipeline : public std::enable_shared_from_this { return _collect_query_statistics_with_every_batch; } - bool need_to_local_shuffle(const DataDistribution target_data_distribution) const { - if (target_data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE || - target_data_distribution.distribution_type == ExchangeType::HASH_SHUFFLE) { - // If `_data_distribution` of this pipeline does not match the `target_data_distribution`, - // we should do local shuffle. - return target_data_distribution.operator!=(_data_distribution); + static bool is_hash_exchange(ExchangeType idx) { + return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE; + } + + bool need_to_local_exchange(const DataDistribution target_data_distribution) const { + if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE && + target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) { + return true; + } else if (operatorXs.front()->ignore_data_hash_distribution()) { + if (_data_distribution.distribution_type == + target_data_distribution.distribution_type && + (_data_distribution.partition_exprs.empty() || + target_data_distribution.partition_exprs.empty())) { + return true; + } + return _data_distribution.distribution_type != + target_data_distribution.distribution_type || + _data_distribution.partition_exprs != target_data_distribution.partition_exprs; + } else { + return _data_distribution.distribution_type != + target_data_distribution.distribution_type && + !(is_hash_exchange(_data_distribution.distribution_type) && + is_hash_exchange(target_data_distribution.distribution_type)); } - return true; } void init_data_distribution() { set_data_distribution(operatorXs.front()->get_local_exchange_type()); diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h index db7be6f4691d72..5b57f48dae8401 100644 --- a/be/src/pipeline/pipeline_x/dependency.h +++ b/be/src/pipeline/pipeline_x/dependency.h @@ -605,22 +605,13 @@ struct DataDistribution { DataDistribution(const DataDistribution& other) : distribution_type(other.distribution_type), partition_exprs(other.partition_exprs) {} bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; } - bool operator==(const DataDistribution& other) const { - if (distribution_type == other.distribution_type && - (distribution_type == ExchangeType::HASH_SHUFFLE || - distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) && - (partition_exprs.empty() || other.partition_exprs.empty())) { - return true; - } - return distribution_type == other.distribution_type && - partition_exprs == other.partition_exprs; - } - DataDistribution operator=(const DataDistribution& other) const { - return DataDistribution(other.distribution_type, other.partition_exprs); + DataDistribution& operator=(const DataDistribution& other) { + distribution_type = other.distribution_type; + partition_exprs = other.partition_exprs; + return *this; } - bool operator!=(const DataDistribution& other) const { return !operator==(other); } ExchangeType distribution_type; - const std::vector partition_exprs; + std::vector partition_exprs; }; class Exchanger; diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 09faf87a09a50d..fe7388735ed413 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -726,7 +726,7 @@ Status PipelineXFragmentContext::_add_local_exchange( return Status::OK(); } - if (!cur_pipe->need_to_local_shuffle(data_distribution)) { + if (!cur_pipe->need_to_local_exchange(data_distribution)) { return Status::OK(); } *do_local_exchange = true;