Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -583,10 +583,8 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla
_limit(tnode.limit),
_have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) ||
(tnode.__isset.conjuncts && !tnode.conjuncts.empty())),
_partition_exprs(require_bucket_distribution ? (tnode.__isset.distribute_expr_lists
? tnode.distribute_expr_lists[0]
: std::vector<TExpr> {})
: tnode.agg_node.grouping_exprs),
_partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
: std::vector<TExpr> {}),
_is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate &&
require_bucket_distribution),
_agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
Expand Down
6 changes: 4 additions & 2 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,14 @@ vectorized::BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() {
}

AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode, const DescriptorTbl& descs)
const TPlanNode& tnode, const DescriptorTbl& descs,
bool require_bucket_distribution)
: DataSinkOperatorX(operator_id, tnode.node_id),
_buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
? tnode.analytic_node.buffered_tuple_id
: 0),
_is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate),
_is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate &&
require_bucket_distribution),
_partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
: std::vector<TExpr> {}) {}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat
class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalState> {
public:
AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
DataSinkOperatorX<AnalyticSinkLocalState>::_name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,15 +323,17 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(

DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
const DescriptorTbl& descs)
const DescriptorTbl& descs,
bool require_bucket_distribution)
: StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode, operator_id, descs),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_output_tuple_id(tnode.agg_node.output_tuple_id),
_needs_finalize(tnode.agg_node.need_finalize),
_is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase),
_partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0]
: std::vector<TExpr> {}),
_is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) {
_is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate &&
require_bucket_distribution) {
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
if (_is_streaming_preagg) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class DistinctStreamingAggOperatorX final
: public StatefulOperatorX<DistinctStreamingAggLocalState> {
public:
DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ class HashJoinBuildSinkOperatorX final
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
return _join_distribution == TJoinDistributionType::COLOCATE ||
_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE;
}

private:
friend class HashJoinBuildSinkLocalState;
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
bool is_shuffled_hash_join() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
return _join_distribution == TJoinDistributionType::COLOCATE ||
_join_distribution == TJoinDistributionType::BUCKET_SHUFFLE;
}

private:
Status _do_evaluate(vectorized::Block& block, vectorized::VExprContextSPtrs& exprs,
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ class OperatorBase {

virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }

virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); };
virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
[[nodiscard]] virtual bool require_data_distribution() const { return false; }

protected:
OperatorXPtr _child_x = nullptr;
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ class PartitionedHashJoinProbeOperatorX final
_inner_sink_operator = sink_operator;
_inner_probe_operator = probe_operator;
}
bool require_data_distribution() const override {
return _inner_probe_operator->require_data_distribution();
}

private:
Status _revoke_memory(RuntimeState* state);
Expand Down
4 changes: 4 additions & 0 deletions be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ class PartitionedHashJoinSinkOperatorX
_inner_probe_operator = probe_operator;
}

bool require_data_distribution() const override {
return _inner_probe_operator->require_data_distribution();
}

private:
friend class PartitionedHashJoinSinkLocalState;

Expand Down
6 changes: 4 additions & 2 deletions be/src/pipeline/exec/sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ Status SortSinkLocalState::open(RuntimeState* state) {
}

SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs)
const DescriptorTbl& descs, bool require_bucket_distribution)
: DataSinkOperatorX(operator_id, tnode.node_id),
_offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
_pool(pool),
Expand All @@ -82,7 +82,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read),
_merge_by_exchange(tnode.sort_node.merge_by_exchange),
_is_colocate(tnode.sort_node.__isset.is_colocate ? tnode.sort_node.is_colocate : false),
_is_colocate(tnode.sort_node.__isset.is_colocate
? tnode.sort_node.is_colocate && require_bucket_distribution
: require_bucket_distribution),
_is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort
? tnode.sort_node.is_analytic_sort
: false),
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class SortSinkLocalState : public PipelineXSinkLocalState<SortSharedState> {
class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
public:
SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
DataSinkOperatorX<SortSinkLocalState>::_name);
Expand Down
6 changes: 4 additions & 2 deletions be/src/pipeline/exec/spill_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,11 @@ Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) {
}

SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode, const DescriptorTbl& descs)
const TPlanNode& tnode, const DescriptorTbl& descs,
bool require_bucket_distribution)
: DataSinkOperatorX(operator_id, tnode.node_id) {
_sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs);
_sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, operator_id, tnode, descs,
require_bucket_distribution);
}

Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/spill_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorX<SpillSortSinkLocal
public:
using LocalStateType = SpillSortSinkLocalState;
SpillSortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
DataSinkOperatorX<SpillSortSinkLocalState>::_name);
Expand Down
15 changes: 10 additions & 5 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
request.query_options.__isset.enable_distinct_streaming_aggregation &&
request.query_options.enable_distinct_streaming_aggregation &&
!tnode.agg_node.grouping_exprs.empty()) {
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs));
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
_require_bucket_distribution));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
tnode.agg_node.use_streaming_preaggregation &&
Expand Down Expand Up @@ -1265,6 +1266,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
}
_require_bucket_distribution =
_require_bucket_distribution || op->require_data_distribution();
break;
}
case TPlanNodeType::CROSS_JOIN_NODE: {
Expand Down Expand Up @@ -1327,9 +1330,11 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo

DataSinkOperatorXPtr sink;
if (_runtime_state->enable_sort_spill()) {
sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
} else {
sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
}
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
Expand Down Expand Up @@ -1366,7 +1371,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_dag[downstream_pipeline_id].push_back(cur_pipe->id());

DataSinkOperatorXPtr sink;
sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs));
sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
Expand Down Expand Up @@ -1426,7 +1432,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
return Status::InternalError("Unsupported exec type in pipelineX: {}",
print_plan_node_type(tnode.node_type));
}
_require_bucket_distribution = true;

return Status::OK();
}
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/pipeline_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ PipelineTask::PipelineTask(
int task_idx)
: _index(task_id),
_pipeline(pipeline),
_prepared(false),
_opened(false),
_state(state),
_fragment_context(fragment_context),
Expand Down Expand Up @@ -117,7 +116,6 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const
std::copy(deps.begin(), deps.end(),
std::inserter(_filter_dependencies, _filter_dependencies.end()));
}
_prepared = true;
return Status::OK();
}

Expand Down Expand Up @@ -172,7 +170,6 @@ void PipelineTask::_init_profile() {

_wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime");

_block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT);
_schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT);
_yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT);
_core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT);
Expand Down
3 changes: 0 additions & 3 deletions be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ class PipelineTask {
uint32_t _index;
PipelinePtr _pipeline;
bool _has_exceed_timeout = false;
bool _prepared;
bool _opened;
RuntimeState* _state = nullptr;
int _previous_schedule_id = -1;
Expand All @@ -254,7 +253,6 @@ class PipelineTask {
// 3 update task statistics(update _queue_level/_core_id)
int _queue_level = 0;
int _core_id = 0;
Status _open_status = Status::OK();

RuntimeProfile* _parent_profile = nullptr;
std::unique_ptr<RuntimeProfile> _task_profile;
Expand All @@ -266,7 +264,6 @@ class PipelineTask {
RuntimeProfile::Counter* _get_block_counter = nullptr;
RuntimeProfile::Counter* _sink_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
RuntimeProfile::Counter* _block_counts = nullptr;
RuntimeProfile::Counter* _schedule_counts = nullptr;
MonotonicStopWatch _wait_worker_watcher;
RuntimeProfile::Counter* _wait_worker_timer = nullptr;
Expand Down