3 changes: 2 additions & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
@@ -164,11 +164,12 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
return _is_colocate && _require_bucket_distribution
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
bool require_data_distribution() const override { return _is_colocate; }
bool require_shuffled_data_distribution() const override { return !_probe_expr_ctxs.empty(); }
size_t get_revocable_mem_size(RuntimeState* state) const;

vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) {
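The same `!_followed_by_shuffled_join` guard recurs in the analytic, distinct-streaming-agg, and sort sinks below. The apparent intent (inferred from the change, not stated in the diff) is that an operator which could otherwise keep a colocate-friendly bucket-hash distribution must fall back to a plain hash shuffle when a shuffled hash join consumes its output, so both join sides end up partitioned the same way. A condensed sketch of the recurring decision, with an illustrative free-function form and assumed parameter types:

```cpp
// Condensed, illustrative sketch of the decision shared by the sinks in this
// patch. DataDistribution and ExchangeType come from the surrounding code; the
// free-function shape and parameter types are assumptions for exposition only.
DataDistribution choose_distribution(bool is_colocate, bool require_bucket_distribution,
                                     bool followed_by_shuffled_join,
                                     const std::vector<TExpr>& partition_exprs) {
    const bool keep_bucket_shuffle =
            is_colocate && require_bucket_distribution && !followed_by_shuffled_join;
    return keep_bucket_shuffle
                   ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, partition_exprs)
                   : DataDistribution(ExchangeType::HASH_SHUFFLE, partition_exprs);
}
```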
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
@@ -102,14 +102,17 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate && _require_bucket_distribution
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
return DataSinkOperatorX<AnalyticSinkLocalState>::required_data_distribution();
}

bool require_data_distribution() const override { return true; }
bool require_shuffled_data_distribution() const override {
return !_partition_by_eq_expr_ctxs.empty() && _order_by_eq_expr_ctxs.empty();
}

private:
Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
@@ -107,14 +107,17 @@ class DistinctStreamingAggOperatorX final

DataDistribution required_data_distribution() const override {
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
return _is_colocate && _require_bucket_distribution
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
return StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution();
}

bool require_data_distribution() const override { return _is_colocate; }
bool require_shuffled_data_distribution() const override {
return _needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg);
}

private:
friend class DistinctStreamingAggLocalState;
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.h
@@ -164,6 +164,9 @@ class HashJoinBuildSinkOperatorX final
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}

bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
}
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -175,6 +175,9 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
}

bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
}
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/operator.h
@@ -254,6 +254,11 @@ class OperatorBase {

virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
[[nodiscard]] virtual bool require_data_distribution() const { return false; }
[[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
_followed_by_shuffled_join = followed_by_shuffled_join;
}
[[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }

protected:
OperatorBuilderBase* _operator_builder = nullptr;
@@ -263,6 +268,7 @@ class OperatorBase {
OperatorXPtr _child_x = nullptr;

bool _is_closed;
bool _followed_by_shuffled_join = false;
};

/**
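The new flag and hook on OperatorBase are set and queried from outside this diff; the pipeline fragment build step that does so is not shown here. A minimal sketch of how such propagation could look, using a hypothetical helper name and a plain operator list standing in for the real pipeline structure:

```cpp
#include <vector>

// Hypothetical sketch, not code from this patch: given one pipeline's operators
// ordered from source to sink, mark every operator that sits upstream of an
// operator requiring shuffled data distribution (e.g. a shuffled hash join).
// With the flag set, sinks such as AggSinkOperatorX above fall back from
// BUCKET_HASH_SHUFFLE to HASH_SHUFFLE in required_data_distribution().
void mark_followed_by_shuffled_join(std::vector<OperatorBase*>& operators) {
    bool downstream_requires_shuffle = false;
    // Walk from sink back to source so each operator sees whether anything
    // downstream of it needs a shuffled distribution.
    for (auto it = operators.rbegin(); it != operators.rend(); ++it) {
        (*it)->set_followed_by_shuffled_join(downstream_requires_shuffle);
        downstream_requires_shuffle = downstream_requires_shuffle ||
                                      (*it)->require_shuffled_data_distribution();
    }
}
```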
@@ -311,6 +311,9 @@ class PartitionedAggSinkOperatorX : public DataSinkOperatorX<PartitionedAggSinkL
bool require_data_distribution() const override {
return _agg_sink_operator->require_data_distribution();
}
bool require_shuffled_data_distribution() const override {
return _agg_sink_operator->require_shuffled_data_distribution();
}

Status set_child(OperatorXPtr child) override {
RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::set_child(child));
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -169,6 +169,9 @@ class PartitionedHashJoinProbeOperatorX final
_distribution_partition_exprs));
}

bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
}
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -119,6 +119,9 @@ class PartitionedHashJoinSinkOperatorX
_distribution_partition_exprs);
}

bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
}
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
2 changes: 2 additions & 0 deletions be/src/pipeline/exec/set_probe_sink_operator.h
@@ -132,6 +132,8 @@ class SetProbeSinkOperatorX final : public DataSinkOperatorX<SetProbeSinkLocalSt
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}

bool require_shuffled_data_distribution() const override { return true; }

std::shared_ptr<BasicSharedState> create_shared_state() const override { return nullptr; }

private:
1 change: 1 addition & 0 deletions be/src/pipeline/exec/set_sink_operator.h
@@ -124,6 +124,7 @@ class SetSinkOperatorX final : public DataSinkOperatorX<SetSinkLocalState<is_int
return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
bool require_shuffled_data_distribution() const override { return true; }

private:
template <class HashTableContext, bool is_intersected>
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/sort_sink_operator.h
@@ -87,7 +87,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
DataDistribution required_data_distribution() const override {
if (_is_analytic_sort) {
return _is_colocate && _require_bucket_distribution
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
} else if (_merge_by_exchange) {
@@ -96,6 +96,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
}
return DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
}
bool require_shuffled_data_distribution() const override { return _is_analytic_sort; }
bool require_data_distribution() const override { return _is_colocate; }

bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT; }
@@ -69,6 +69,40 @@ std::string LocalExchangeSinkLocalState::debug_string(int indentation_level) con
return fmt::to_string(debug_string_buffer);
}

Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets,
const bool should_disable_bucket_shuffle,
const std::map<int, int>& shuffle_idx_to_instance_idx) {
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE) {
// For a shuffle join, if the data distribution has been broken by a previous
// operator, we should use a HASH_SHUFFLE local exchanger to shuffle the data
// again. Note that we must map shuffle idx to instance idx because all
// instances are distributed across all BEs; otherwise the shuffle idx can be
// used directly.
if (should_disable_bucket_shuffle) {
std::for_each(shuffle_idx_to_instance_idx.begin(), shuffle_idx_to_instance_idx.end(),
[&](const auto& item) {
DCHECK(item.first != -1);
_shuffle_idx_to_instance_idx.push_back({item.first, item.second});
});
} else {
_shuffle_idx_to_instance_idx.resize(_num_partitions);
for (int i = 0; i < _num_partitions; i++) {
_shuffle_idx_to_instance_idx[i] = {i, i};
}
}
_partitioner.reset(new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
_num_partitions));
RETURN_IF_ERROR(_partitioner->init(_texprs));
} else if (_type == ExchangeType::BUCKET_HASH_SHUFFLE) {
_partitioner.reset(
new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(num_buckets));
RETURN_IF_ERROR(_partitioner->init(_texprs));
}

return Status::OK();
}

Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
bool eos) {
auto& local_state = get_local_state(state);
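As a standalone illustration of the two branches in init() above (not code from the patch; the function name, container types, and sample values are made up for exposition), the resulting mapping is either planner-provided or the identity:

```cpp
#include <map>
#include <utility>
#include <vector>

// Illustration only: reproduce the two mapping modes of init(). When bucket
// shuffle must be disabled, the planner-supplied map is honoured so shuffle
// idx i can land on an arbitrary instance; otherwise shuffle idx i simply maps
// to instance i.
std::vector<std::pair<int, int>> build_shuffle_idx_to_instance_idx(
        bool should_disable_bucket_shuffle,
        const std::map<int, int>& shuffle_idx_to_instance_idx, int num_partitions) {
    std::vector<std::pair<int, int>> mapping;
    if (should_disable_bucket_shuffle) {
        for (const auto& [shuffle_idx, instance_idx] : shuffle_idx_to_instance_idx) {
            mapping.emplace_back(shuffle_idx, instance_idx); // e.g. {0,2}, {1,0}, {2,3}, {3,1}
        }
    } else {
        mapping.reserve(num_partitions);
        for (int i = 0; i < num_partitions; ++i) {
            mapping.emplace_back(i, i); // identity: partition i handled by instance i
        }
    }
    return mapping;
}
```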
@@ -95,38 +95,8 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
}

Status init(ExchangeType type, const int num_buckets, const bool is_shuffled_hash_join,
const std::map<int, int>& shuffle_idx_to_instance_idx) override {
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
_type = type;
if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) {
// For shuffle join, if data distribution has been broken by previous operator, we
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
// we should use map shuffle idx to instance idx because all instances will be
// distributed to all BEs. Otherwise, we should use shuffle idx directly.
if (is_shuffled_hash_join) {
std::for_each(shuffle_idx_to_instance_idx.begin(),
shuffle_idx_to_instance_idx.end(), [&](const auto& item) {
DCHECK(item.first != -1);
_shuffle_idx_to_instance_idx.push_back({item.first, item.second});
});
} else {
_shuffle_idx_to_instance_idx.resize(_num_partitions);
for (int i = 0; i < _num_partitions; i++) {
_shuffle_idx_to_instance_idx[i] = {i, i};
}
}
_partitioner.reset(
_type == ExchangeType::HASH_SHUFFLE || _bucket_seq_to_instance_idx.empty()
? new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
_num_partitions)
: new vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
num_buckets));
RETURN_IF_ERROR(_partitioner->init(_texprs));
}

return Status::OK();
}
Status init(ExchangeType type, const int num_buckets, const bool should_disable_bucket_shuffle,
const std::map<int, int>& shuffle_idx_to_instance_idx) override;

Status prepare(RuntimeState* state) override {
if (_type == ExchangeType::HASH_SHUFFLE || _type == ExchangeType::BUCKET_HASH_SHUFFLE) {
29 changes: 1 addition & 28 deletions be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -196,35 +196,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
new_block_wrapper->unref(local_state._shared_state);
}
}
} else if (bucket_seq_to_instance_idx.empty()) {
/**
* If type is `BUCKET_HASH_SHUFFLE` and `_bucket_seq_to_instance_idx` is empty, which
* means no scan operators is included in this fragment so we also need a `HASH_SHUFFLE` here.
*/
const auto& map = local_state._parent->cast<LocalExchangeSinkOperatorX>()
._shuffle_idx_to_instance_idx;
DCHECK(!map.empty());
new_block_wrapper->ref(map.size());
for (const auto& it : map) {
DCHECK(it.second >= 0 && it.second < _num_partitions)
<< it.first << " : " << it.second << " " << _num_partitions;
uint32_t start = local_state._partition_rows_histogram[it.first];
uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start;
if (size > 0) {
local_state._shared_state->add_mem_usage(
it.second, new_block_wrapper->data_block.allocated_bytes(), false);

if (!_enqueue_data_and_set_ready(it.second, local_state,
{new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->sub_mem_usage(
it.second, new_block_wrapper->data_block.allocated_bytes(), false);
new_block_wrapper->unref(local_state._shared_state);
}
} else {
new_block_wrapper->unref(local_state._shared_state);
}
}
} else {
DCHECK(!bucket_seq_to_instance_idx.empty());
new_block_wrapper->ref(_num_partitions);
for (size_t i = 0; i < _num_partitions; i++) {
uint32_t start = local_state._partition_rows_histogram[i];
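Both the surviving branch and the removed one slice rows per partition via `_partition_rows_histogram`. The histogram itself is built elsewhere in the exchanger; the sketch below shows the general counting-plus-prefix-sum idea under assumed names, so the `[start, start + size)` arithmetic above is easier to follow:

```cpp
#include <cstdint>
#include <vector>

// Sketch under assumed names (not code from the repository): given each row's
// target channel, a prefix-summed histogram lets the exchanger address every
// partition's rows as the contiguous slice [histogram[i], histogram[i + 1])
// of a row-index array that has been reordered by channel, matching the
// start/size computation in _split_rows above.
std::vector<uint32_t> build_partition_rows_histogram(const std::vector<uint32_t>& channel_ids,
                                                     size_t num_partitions) {
    std::vector<uint32_t> histogram(num_partitions + 1, 0);
    for (uint32_t channel : channel_ids) {
        histogram[channel + 1]++; // count rows per channel, shifted by one slot
    }
    for (size_t i = 1; i <= num_partitions; ++i) {
        histogram[i] += histogram[i - 1]; // prefix sum: histogram[i] = start of partition i
    }
    return histogram;
}
```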