8 changes: 5 additions & 3 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -615,7 +615,7 @@ void AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& p
}

AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs)
const DescriptorTbl& descs, bool require_bucket_distribution)
: DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode.node_id),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(nullptr),
@@ -628,9 +628,11 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla
_limit(tnode.limit),
_have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) ||
(tnode.__isset.conjuncts && !tnode.conjuncts.empty())),
_partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
: std::vector<TExpr> {}),
_partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
? tnode.distribute_expr_lists[0]
: tnode.agg_node.grouping_exprs),
_is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
_require_bucket_distribution(require_bucket_distribution),
_agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}

Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
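Note on the hunk above: the sink no longer takes the planner's distribute_expr_lists unconditionally. A minimal sketch of the selection logic, with names copied from the diff and the rest of the initializer list elided:

// When the planner supplied distribution expressions AND the plan still
// requires bucket distribution, partition on those expressions; otherwise
// fall back to the aggregation's own grouping expressions.
std::vector<TExpr> partition_exprs =
        tnode.__isset.distribute_expr_lists && require_bucket_distribution
                ? tnode.distribute_expr_lists[0]
                : tnode.agg_node.grouping_exprs;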
9 changes: 6 additions & 3 deletions be/src/pipeline/exec/aggregation_sink_operator.h
@@ -143,7 +143,7 @@ class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> {
class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
public:
AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool require_bucket_distribution);
~AggSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
@@ -164,9 +164,11 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
return _is_colocate && _require_bucket_distribution
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
bool require_data_distribution() const override { return _is_colocate; }
size_t get_revocable_mem_size(RuntimeState* state) const;

vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) {
@@ -213,6 +215,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {

const std::vector<TExpr> _partition_exprs;
const bool _is_colocate;
const bool _require_bucket_distribution;

RowDescriptor _agg_fn_output_row_descriptor;
};
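To see the new distribution decision in isolation, here is a standalone sketch; it is not Doris code — ExchangeKind, agg_sink_exchange and main are illustrative stand-ins for ExchangeType and the members shown above, and the earlier PASSTHROUGH branch is omitted because its condition lies outside this hunk:

#include <iostream>

enum class ExchangeKind { BucketHashShuffle, HashShuffle };

// Mirrors the tail of AggSinkOperatorX::required_data_distribution():
// only a colocate plan that also requires bucket distribution keeps the
// bucket-aware shuffle; everything else degrades to a plain hash shuffle
// on the partition expressions.
ExchangeKind agg_sink_exchange(bool is_colocate, bool require_bucket_distribution) {
    return is_colocate && require_bucket_distribution
                   ? ExchangeKind::BucketHashShuffle
                   : ExchangeKind::HashShuffle;
}

int main() {
    // Colocate plan, but bucket distribution not required: plain hash shuffle.
    std::cout << (agg_sink_exchange(true, false) == ExchangeKind::HashShuffle) << '\n';
    // Colocate plan that does require it: bucket hash shuffle.
    std::cout << (agg_sink_exchange(true, true) == ExchangeKind::BucketHashShuffle) << '\n';
}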
9 changes: 6 additions & 3 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -193,14 +193,17 @@ vectorized::BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() {
}

AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode, const DescriptorTbl& descs)
const TPlanNode& tnode, const DescriptorTbl& descs,
bool require_bucket_distribution)
: DataSinkOperatorX(operator_id, tnode.node_id),
_buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
? tnode.analytic_node.buffered_tuple_id
: 0),
_is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate),
_partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
: std::vector<TExpr> {}) {}
_require_bucket_distribution(require_bucket_distribution),
_partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
? tnode.distribute_expr_lists[0]
: tnode.analytic_node.partition_exprs) {}

Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
7 changes: 5 additions & 2 deletions be/src/pipeline/exec/analytic_sink_operator.h
@@ -86,7 +86,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat
class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> {
public:
AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
DataSinkOperatorX<AnalyticSinkLocalState>::_name);
@@ -102,13 +102,15 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate
return _is_colocate && _require_bucket_distribution
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
return DataSinkOperatorX<AnalyticSinkLocalState>::required_data_distribution();
}

bool require_data_distribution() const override { return true; }

private:
Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
vectorized::IColumn* dst_column, size_t length);
@@ -125,6 +127,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt

std::vector<size_t> _num_agg_input;
const bool _is_colocate;
const bool _require_bucket_distribution;
const std::vector<TExpr> _partition_exprs;
};

4 changes: 2 additions & 2 deletions be/src/pipeline/exec/datagen_operator.cpp
@@ -97,8 +97,8 @@ Status DataGenLocalState::init(RuntimeState* state, LocalStateInfo& info) {
// TODO: use runtime filter to filte result block, maybe this node need derive from vscan_node.
for (const auto& filter_desc : p._runtime_filter_descs) {
IRuntimeFilter* runtime_filter = nullptr;
RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, false, p.node_id(),
&runtime_filter));
RETURN_IF_ERROR(state->register_consumer_runtime_filter(
filter_desc, p.ignore_data_distribution(), p.node_id(), &runtime_filter));
runtime_filter->init_profile(_runtime_profile.get());
}
return Status::OK();
@@ -374,17 +374,20 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(

DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
const DescriptorTbl& descs)
const DescriptorTbl& descs,
bool require_bucket_distribution)
: StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode, operator_id, descs),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(nullptr),
_output_tuple_id(tnode.agg_node.output_tuple_id),
_output_tuple_desc(nullptr),
_needs_finalize(tnode.agg_node.need_finalize),
_is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
_partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
: std::vector<TExpr> {}),
_is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) {
_partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
? tnode.distribute_expr_lists[0]
: tnode.agg_node.grouping_exprs),
_is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
_require_bucket_distribution(require_bucket_distribution) {
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
if (_is_streaming_preagg) {
@@ -97,7 +97,7 @@ class DistinctStreamingAggOperatorX final
: public StatefulOperatorX<DistinctStreamingAggLocalState> {
public:
DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
@@ -107,13 +107,15 @@ class DistinctStreamingAggOperatorX final

DataDistribution required_data_distribution() const override {
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
return _is_colocate
return _is_colocate && _require_bucket_distribution
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
return StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution();
}

bool require_data_distribution() const override { return _is_colocate; }

private:
friend class DistinctStreamingAggLocalState;
TupleId _intermediate_tuple_id;
@@ -125,6 +127,7 @@ class DistinctStreamingAggOperatorX final
const bool _is_first_phase;
const std::vector<TExpr> _partition_exprs;
const bool _is_colocate;
const bool _require_bucket_distribution;
// group by k1,k2
vectorized::VExprContextSPtrs _probe_expr_ctxs;
std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.h
@@ -167,6 +167,10 @@ class HashJoinBuildSinkOperatorX final
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
return _join_distribution == TJoinDistributionType::COLOCATE ||
_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE;
}

private:
friend class HashJoinBuildSinkLocalState;
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -178,6 +178,10 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
return _join_distribution == TJoinDistributionType::COLOCATE ||
_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE;
}

private:
Status _do_evaluate(vectorized::Block& block, vectorized::VExprContextSPtrs& exprs,
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/operator.h
@@ -252,7 +252,8 @@ class OperatorBase {

virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }

virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); };
virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
[[nodiscard]] virtual bool require_data_distribution() const { return false; }

protected:
OperatorBuilderBase* _operator_builder = nullptr;
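require_data_distribution() is the hook this series threads through the plan: it defaults to false here on OperatorBase, and the overrides added in this diff return true only when the plan shape pins the incoming distribution (colocate or bucket-shuffle joins, colocate aggregations and sorts, or whatever the wrapped in-memory operator reports). How the hook is consumed is not part of this diff; the sketch below is an assumption about one plausible call site, and builder, op and add_local_exchange are hypothetical names:

// Hypothetical consumer of the new hook: before inserting a local exchange to
// rebalance rows between local pipeline tasks, ask the operator whether it
// must keep the planner-chosen distribution.
if (!op->require_data_distribution()) {
    builder.add_local_exchange(op);  // free to reshuffle locally
}
// Otherwise the planner's distribution (e.g. colocate buckets) is preserved.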
@@ -124,9 +124,11 @@ void PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)

PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
const DescriptorTbl& descs)
const DescriptorTbl& descs,
bool require_bucket_distribution)
: DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id, tnode.node_id) {
_agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, tnode, descs);
_agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, tnode, descs,
require_bucket_distribution);
}

Status PartitionedAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
@@ -289,7 +289,7 @@ class PartitionedAggSinkLocalState
class PartitionedAggSinkOperatorX : public DataSinkOperatorX<PartitionedAggSinkLocalState> {
public:
PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool require_bucket_distribution);
~PartitionedAggSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
Expand All @@ -308,6 +308,10 @@ class PartitionedAggSinkOperatorX : public DataSinkOperatorX<PartitionedAggSinkL
return _agg_sink_operator->required_data_distribution();
}

bool require_data_distribution() const override {
return _agg_sink_operator->require_data_distribution();
}

Status set_child(OperatorXPtr child) override {
RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::set_child(child));
return _agg_sink_operator->set_child(child);
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -182,6 +182,9 @@ class PartitionedHashJoinProbeOperatorX final
_inner_sink_operator = sink_operator;
_inner_probe_operator = probe_operator;
}
bool require_data_distribution() const override {
return _inner_probe_operator->require_data_distribution();
}

private:
Status _revoke_memory(RuntimeState* state);
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -129,6 +129,10 @@ class PartitionedHashJoinSinkOperatorX
_inner_probe_operator = probe_operator;
}

bool require_data_distribution() const override {
return _inner_probe_operator->require_data_distribution();
}

private:
friend class PartitionedHashJoinSinkLocalState;

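The spill-capable wrappers shown above (PartitionedAggSinkOperatorX, PartitionedHashJoinProbeOperatorX, PartitionedHashJoinSinkOperatorX) carry no distribution flag of their own; each forwards require_data_distribution() to the in-memory operator it wraps, so the requirement is defined in exactly one place. A standalone sketch of that composition pattern — Inner and SpillWrapper are illustrative stand-ins, not Doris classes:

#include <memory>

struct Inner {
    bool colocate = false;
    bool require_data_distribution() const { return colocate; }
};

// The wrapper owns the in-memory operator and answers by delegation.
struct SpillWrapper {
    std::unique_ptr<Inner> inner = std::make_unique<Inner>();
    bool require_data_distribution() const { return inner->require_data_distribution(); }
};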
5 changes: 3 additions & 2 deletions be/src/pipeline/exec/sort_sink_operator.cpp
@@ -75,7 +75,7 @@ Status SortSinkLocalState::open(RuntimeState* state) {
}

SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs)
const DescriptorTbl& descs, bool require_bucket_distribution)
: DataSinkOperatorX(operator_id, tnode.node_id),
_offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
_pool(pool),
@@ -85,7 +85,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read),
_merge_by_exchange(tnode.sort_node.merge_by_exchange),
_is_colocate(tnode.sort_node.__isset.is_colocate ? tnode.sort_node.is_colocate : false),
_is_colocate(tnode.sort_node.__isset.is_colocate && tnode.sort_node.is_colocate),
_require_bucket_distribution(require_bucket_distribution),
_is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort
? tnode.sort_node.is_analytic_sort
: false),
6 changes: 4 additions & 2 deletions be/src/pipeline/exec/sort_sink_operator.h
@@ -74,7 +74,7 @@ class SortSinkLocalState : public PipelineXSinkLocalState<SortSharedState> {
class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
public:
SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
DataSinkOperatorX<SortSinkLocalState>::_name);
@@ -87,7 +87,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
DataDistribution required_data_distribution() const override {
if (_is_analytic_sort) {
return _is_colocate
return _is_colocate && _require_bucket_distribution
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
} else if (_merge_by_exchange) {
@@ -96,6 +96,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
}
return DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
}
bool require_data_distribution() const override { return _is_colocate; }

bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT; }

@@ -128,6 +129,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
const bool _use_two_phase_read;
const bool _merge_by_exchange;
const bool _is_colocate = false;
const bool _require_bucket_distribution = false;
const bool _is_analytic_sort = false;
const std::vector<TExpr> _partition_exprs;
};
6 changes: 4 additions & 2 deletions be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -112,9 +112,11 @@ Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
}

SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode, const DescriptorTbl& descs)
const TPlanNode& tnode, const DescriptorTbl& descs,
bool require_bucket_distribution)
: DataSinkOperatorX(operator_id, tnode.node_id) {
_sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs);
_sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs,
require_bucket_distribution);
}

Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {