9 changes: 5 additions & 4 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -581,10 +581,11 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla
           _limit(tnode.limit),
           _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) ||
                           (tnode.__isset.conjuncts && !tnode.conjuncts.empty())),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
-                                                               : std::vector<TExpr> {}),
-          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate &&
-                       require_bucket_distribution),
+          _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.agg_node.grouping_exprs),
+          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
+          _require_bucket_distribution(require_bucket_distribution),
           _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
 
 Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
7 changes: 5 additions & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.h
@@ -146,9 +146,11 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
                            ? DataDistribution(ExchangeType::PASSTHROUGH)
                            : DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
         }
-        return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
-                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
+        return _is_colocate && _require_bucket_distribution
+                       ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
+                       : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
     }
+    bool require_data_distribution() const override { return _is_colocate; }
     size_t get_revocable_mem_size(RuntimeState* state) const;
 
     vectorized::AggregatedDataVariants* get_agg_data(RuntimeState* state) {
@@ -195,6 +197,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
 
     const std::vector<TExpr> _partition_exprs;
     const bool _is_colocate;
+    const bool _require_bucket_distribution;
 
     RowDescriptor _agg_fn_output_row_descriptor;
 };
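The change in this header is the core pattern of the PR: `_is_colocate` now reflects only the planner's colocate flag, `_require_bucket_distribution` carries the fragment-level requirement, and the two are combined only when `required_data_distribution()` picks the exchange type, while the new `require_data_distribution()` hook reports the colocate flag back to the fragment context. A minimal standalone sketch of that decision, using simplified stand-in types (the function name `choose_distribution` and the string-based partition expressions are illustrative, not part of the patch):

```cpp
#include <string>
#include <vector>

// Simplified stand-ins for the real Doris types (TExpr, DataDistribution, ...).
enum class ExchangeType { HASH_SHUFFLE, BUCKET_HASH_SHUFFLE };

struct DataDistribution {
    ExchangeType distribution_type;
    std::vector<std::string> partition_exprs;
};

// Sketch of the post-patch logic in AggSinkOperatorX::required_data_distribution():
// the bucketed (colocate) shuffle is used only when the plan node is colocate *and*
// the fragment still requires bucket distribution; otherwise fall back to a plain
// hash shuffle on the same partition expressions.
DataDistribution choose_distribution(bool is_colocate, bool require_bucket_distribution,
                                     const std::vector<std::string>& partition_exprs) {
    return is_colocate && require_bucket_distribution
                   ? DataDistribution {ExchangeType::BUCKET_HASH_SHUFFLE, partition_exprs}
                   : DataDistribution {ExchangeType::HASH_SHUFFLE, partition_exprs};
}
```

The same `_is_colocate && _require_bucket_distribution` combination appears below in the analytic, distinct-streaming-agg, and sort sinks.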
9 changes: 5 additions & 4 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -197,10 +197,11 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
           _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
                                      ? tnode.analytic_node.buffered_tuple_id
                                      : 0),
-          _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate &&
-                       require_bucket_distribution),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
-                                                               : std::vector<TExpr> {}) {}
+          _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate),
+          _require_bucket_distribution(require_bucket_distribution),
+          _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.analytic_node.partition_exprs) {}
 
 Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
@@ -83,13 +83,15 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
         if (_partition_by_eq_expr_ctxs.empty()) {
             return {ExchangeType::PASSTHROUGH};
         } else if (_order_by_eq_expr_ctxs.empty()) {
-            return _is_colocate
+            return _is_colocate && _require_bucket_distribution
                            ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         }
         return DataSinkOperatorX<AnalyticSinkLocalState>::required_data_distribution();
     }
 
+    bool require_data_distribution() const override { return true; }
+
 private:
     Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
                                 vectorized::IColumn* dst_column, size_t length);
@@ -106,6 +108,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
 
     std::vector<size_t> _num_agg_input;
     const bool _is_colocate;
+    const bool _require_bucket_distribution;
     const std::vector<TExpr> _partition_exprs;
 };
 
@@ -330,10 +330,11 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
           _output_tuple_id(tnode.agg_node.output_tuple_id),
           _needs_finalize(tnode.agg_node.need_finalize),
           _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
-          _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
-                                                               : std::vector<TExpr> {}),
-          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate &&
-                       require_bucket_distribution) {
+          _partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.agg_node.grouping_exprs),
+          _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
+          _require_bucket_distribution(require_bucket_distribution) {
     if (tnode.agg_node.__isset.use_streaming_preaggregation) {
        _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
        if (_is_streaming_preagg) {
@@ -106,13 +106,15 @@ class DistinctStreamingAggOperatorX final
 
     DataDistribution required_data_distribution() const override {
         if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
-            return _is_colocate
+            return _is_colocate && _require_bucket_distribution
                            ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         }
         return StatefulOperatorX<DistinctStreamingAggLocalState>::required_data_distribution();
     }
 
+    bool require_data_distribution() const override { return _is_colocate; }
+
 private:
     friend class DistinctStreamingAggLocalState;
     TupleId _intermediate_tuple_id;
@@ -124,6 +126,7 @@ class DistinctStreamingAggOperatorX final
     const bool _is_first_phase;
     const std::vector<TExpr> _partition_exprs;
     const bool _is_colocate;
+    const bool _require_bucket_distribution;
     // group by k1,k2
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
     std::vector<vectorized::AggFnEvaluator*> _aggregate_evaluators;
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -308,6 +308,10 @@ class PartitionedAggSinkOperatorX : public DataSinkOperatorX<PartitionedAggSinkL
         return _agg_sink_operator->required_data_distribution();
     }
 
+    bool require_data_distribution() const override {
+        return _agg_sink_operator->require_data_distribution();
+    }
+
     Status set_child(OperatorXPtr child) override {
         RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::set_child(child));
         return _agg_sink_operator->set_child(child);
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/sort_sink_operator.cpp
@@ -82,9 +82,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP
           _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
           _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read),
           _merge_by_exchange(tnode.sort_node.merge_by_exchange),
-          _is_colocate(tnode.sort_node.__isset.is_colocate
-                               ? tnode.sort_node.is_colocate && require_bucket_distribution
-                               : require_bucket_distribution),
+          _is_colocate(tnode.sort_node.__isset.is_colocate && tnode.sort_node.is_colocate),
+          _require_bucket_distribution(require_bucket_distribution),
           _is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort
                                     ? tnode.sort_node.is_analytic_sort
                                     : false),
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/sort_sink_operator.h
@@ -66,7 +66,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
     DataDistribution required_data_distribution() const override {
         if (_is_analytic_sort) {
-            return _is_colocate
+            return _is_colocate && _require_bucket_distribution
                            ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
         } else if (_merge_by_exchange) {
@@ -75,6 +75,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
         }
         return DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
     }
+    bool require_data_distribution() const override { return _is_colocate; }
 
     bool is_full_sort() const { return _algorithm == SortAlgorithm::FULL_SORT; }
 
@@ -106,6 +107,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
     const bool _use_two_phase_read;
     const bool _merge_by_exchange;
     const bool _is_colocate = false;
+    const bool _require_bucket_distribution = false;
     const bool _is_analytic_sort = false;
     const std::vector<TExpr> _partition_exprs;
 };
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -77,6 +77,9 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorX<SpillSortSinkLocal
     DataDistribution required_data_distribution() const override {
         return _sort_sink_operator->required_data_distribution();
     }
+    bool require_data_distribution() const override {
+        return _sort_sink_operator->require_data_distribution();
+    }
     Status set_child(OperatorXPtr child) override {
         RETURN_IF_ERROR(DataSinkOperatorX<SpillSortSinkLocalState>::set_child(child));
         return _sort_sink_operator->set_child(child);
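Both spilling wrappers, `PartitionedAggSinkOperatorX` above and `SpillSortSinkOperatorX` here, forward the new `require_data_distribution()` hook to the in-memory operator they wrap, just as they already forward `required_data_distribution()`, so enabling spilling does not change how data is shuffled into the sink. A rough sketch of that forwarding pattern with hypothetical names (`SpillingSinkWrapper` and `InnerSink` are placeholders, not types from the patch):

```cpp
#include <memory>
#include <utility>

// Illustrative inner sink exposing the distribution hook added in this PR.
struct InnerSink {
    bool is_colocate = false;
    bool require_data_distribution() const { return is_colocate; }
};

// Sketch of the wrapper pattern: the spilling operator owns the in-memory sink
// and defers every distribution-related decision to it.
class SpillingSinkWrapper {
public:
    explicit SpillingSinkWrapper(std::shared_ptr<InnerSink> inner) : _inner(std::move(inner)) {}

    bool require_data_distribution() const { return _inner->require_data_distribution(); }

private:
    std::shared_ptr<InnerSink> _inner;
};
```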
8 changes: 8 additions & 0 deletions be/src/pipeline/pipeline_fragment_context.cpp
@@ -1175,6 +1175,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                    !tnode.agg_node.grouping_exprs.empty()) {
             op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
                                                        _require_bucket_distribution));
+            _require_bucket_distribution =
+                    _require_bucket_distribution || op->require_data_distribution();
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
         } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
                    tnode.agg_node.use_streaming_preaggregation &&
@@ -1204,6 +1206,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                 sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
                                                 _require_bucket_distribution));
             }
+            _require_bucket_distribution =
+                    _require_bucket_distribution || sink->require_data_distribution();
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -1342,6 +1346,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
                                              _require_bucket_distribution));
         }
+        _require_bucket_distribution =
+                _require_bucket_distribution || sink->require_data_distribution();
         sink->set_dests_id({op->operator_id()});
         RETURN_IF_ERROR(cur_pipe->set_sink(sink));
         RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
@@ -1379,6 +1385,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         DataSinkOperatorXPtr sink;
         sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
                                              _require_bucket_distribution));
+        _require_bucket_distribution =
+                _require_bucket_distribution || sink->require_data_distribution();
        sink->set_dests_id({op->operator_id()});
        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
        RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
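The fragment-context changes tie the PR together: after each distinct-streaming-agg, agg, sort, or analytic sink is created, its `require_data_distribution()` answer is OR-ed into `_require_bucket_distribution`, which is then passed to the constructors of the operators built after it, so the flag can only ever be raised within a fragment. A condensed, hypothetical sketch of that accumulation (`FragmentBuilder` and `make_sink` stand in for the real `PipelineFragmentContext::_create_operator` machinery):

```cpp
#include <memory>

// Minimal stand-in for a sink operator that may demand bucketed distribution.
struct SinkOp {
    bool colocate = false;
    bool require_data_distribution() const { return colocate; }
};

// Hypothetical condensed view of the accumulation: the flag only moves from
// false to true (logical OR), so once one operator asks for bucket
// distribution, every operator created afterwards in the same fragment is
// constructed with require_bucket_distribution == true.
class FragmentBuilder {
public:
    std::shared_ptr<SinkOp> make_sink(bool colocate_hint) {
        auto sink = std::make_shared<SinkOp>();
        // The real code passes _require_bucket_distribution into the sink's
        // constructor; here we only record the planner hint.
        sink->colocate = colocate_hint;
        _require_bucket_distribution =
                _require_bucket_distribution || sink->require_data_distribution();
        return sink;
    }

    bool require_bucket_distribution() const { return _require_bucket_distribution; }

private:
    bool _require_bucket_distribution = false;
};
```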