Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,11 +109,9 @@ ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
: OperatorX<ExchangeLocalState>(pool, tnode, operator_id, descs),
_num_senders(num_senders),
_is_merging(tnode.exchange_node.__isset.sort_info),
_is_hash_partition(
tnode.exchange_node.__isset.partition_type &&
(tnode.exchange_node.partition_type == TPartitionType::HASH_PARTITIONED ||
tnode.exchange_node.partition_type ==
TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED)),
_partition_type(tnode.exchange_node.__isset.partition_type
? tnode.exchange_node.partition_type
: TPartitionType::UNPARTITIONED),
_input_row_desc(descs, tnode.exchange_node.input_row_tuples,
std::vector<bool>(tnode.nullable_tuples.begin(),
tnode.nullable_tuples.begin() +
Expand Down
10 changes: 7 additions & 3 deletions be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,17 +118,21 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
}

DataDistribution get_local_exchange_type() const override {
if (!_is_hash_partition || OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
if (OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
return {ExchangeType::NOOP};
}
return {ExchangeType::HASH_SHUFFLE};
return _partition_type == TPartitionType::HASH_PARTITIONED
? DataDistribution(ExchangeType::HASH_SHUFFLE)
: _partition_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE)
: DataDistribution(ExchangeType::NOOP);
}

private:
friend class ExchangeLocalState;
const int _num_senders;
const bool _is_merging;
const bool _is_hash_partition;
const TPartitionType::type _partition_type;
RowDescriptor _input_row_desc;
std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;

Expand Down
30 changes: 23 additions & 7 deletions be/src/pipeline/pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,30 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
return _collect_query_statistics_with_every_batch;
}

bool need_to_local_shuffle(const DataDistribution target_data_distribution) const {
if (target_data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE ||
target_data_distribution.distribution_type == ExchangeType::HASH_SHUFFLE) {
// If `_data_distribution` of this pipeline does not match the `target_data_distribution`,
// we should do local shuffle.
return target_data_distribution.operator!=(_data_distribution);
static bool is_hash_exchange(ExchangeType idx) {
return idx == ExchangeType::HASH_SHUFFLE || idx == ExchangeType::BUCKET_HASH_SHUFFLE;
}

bool need_to_local_exchange(const DataDistribution target_data_distribution) const {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'need_to_local_exchange' can be made static [readability-convert-member-functions-to-static]

Suggested change
bool need_to_local_exchange(const DataDistribution target_data_distribution) const {
static bool need_to_local_exchange(const DataDistribution target_data_distribution) {

if (target_data_distribution.distribution_type != ExchangeType::BUCKET_HASH_SHUFFLE &&
target_data_distribution.distribution_type != ExchangeType::HASH_SHUFFLE) {
return true;
} else if (operatorXs.front()->ignore_data_hash_distribution()) {
if (_data_distribution.distribution_type ==
target_data_distribution.distribution_type &&
(_data_distribution.partition_exprs.empty() ||
target_data_distribution.partition_exprs.empty())) {
return true;
}
return _data_distribution.distribution_type !=
target_data_distribution.distribution_type ||
_data_distribution.partition_exprs != target_data_distribution.partition_exprs;
} else {
return _data_distribution.distribution_type !=
target_data_distribution.distribution_type &&
!(is_hash_exchange(_data_distribution.distribution_type) &&
is_hash_exchange(target_data_distribution.distribution_type));
}
return true;
}
void init_data_distribution() {
set_data_distribution(operatorXs.front()->get_local_exchange_type());
Expand Down
19 changes: 5 additions & 14 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -605,22 +605,13 @@ struct DataDistribution {
DataDistribution(const DataDistribution& other)
: distribution_type(other.distribution_type), partition_exprs(other.partition_exprs) {}
bool need_local_exchange() const { return distribution_type != ExchangeType::NOOP; }
bool operator==(const DataDistribution& other) const {
if (distribution_type == other.distribution_type &&
(distribution_type == ExchangeType::HASH_SHUFFLE ||
distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) &&
(partition_exprs.empty() || other.partition_exprs.empty())) {
return true;
}
return distribution_type == other.distribution_type &&
partition_exprs == other.partition_exprs;
}
DataDistribution operator=(const DataDistribution& other) const {
return DataDistribution(other.distribution_type, other.partition_exprs);
DataDistribution& operator=(const DataDistribution& other) {
distribution_type = other.distribution_type;
partition_exprs = other.partition_exprs;
return *this;
}
bool operator!=(const DataDistribution& other) const { return !operator==(other); }
ExchangeType distribution_type;
const std::vector<TExpr> partition_exprs;
std::vector<TExpr> partition_exprs;
};

class Exchanger;
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,7 @@ Status PipelineXFragmentContext::_add_local_exchange(
return Status::OK();
}

if (!cur_pipe->need_to_local_shuffle(data_distribution)) {
if (!cur_pipe->need_to_local_exchange(data_distribution)) {
return Status::OK();
}
*do_local_exchange = true;
Expand Down