diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index d79623ba4c3130..08ab5bfae867c6 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -583,10 +583,8 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla _limit(tnode.limit), _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) || (tnode.__isset.conjuncts && !tnode.conjuncts.empty())), - _partition_exprs(require_bucket_distribution ? (tnode.__isset.distribute_expr_lists - ? tnode.distribute_expr_lists[0] - : std::vector {}) - : tnode.agg_node.grouping_exprs), + _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] + : std::vector {}), _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && require_bucket_distribution), _agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {} diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp b/be/src/pipeline/exec/analytic_sink_operator.cpp index 12c4e7634e71a6..44481bbb9c677b 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.cpp +++ b/be/src/pipeline/exec/analytic_sink_operator.cpp @@ -191,12 +191,14 @@ vectorized::BlockRowPos AnalyticSinkLocalState::_get_partition_by_end() { } AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, - const TPlanNode& tnode, const DescriptorTbl& descs) + const TPlanNode& tnode, const DescriptorTbl& descs, + bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id), _buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id ? tnode.analytic_node.buffered_tuple_id : 0), - _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate), + _is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate && + require_bucket_distribution), _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] : std::vector {}) {} diff --git a/be/src/pipeline/exec/analytic_sink_operator.h b/be/src/pipeline/exec/analytic_sink_operator.h index 0881cc4ff64423..46e7ffac3e8495 100644 --- a/be/src/pipeline/exec/analytic_sink_operator.h +++ b/be/src/pipeline/exec/analytic_sink_operator.h @@ -67,7 +67,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState { public: AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", DataSinkOperatorX::_name); diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp index 0ea9a06ac710e6..05137837473b70 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp @@ -323,7 +323,8 @@ void DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct( DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, + bool require_bucket_distribution) : StatefulOperatorX(pool, tnode, operator_id, descs), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _output_tuple_id(tnode.agg_node.output_tuple_id), @@ -331,7 +332,8 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase), _partition_exprs(tnode.__isset.distribute_expr_lists ? tnode.distribute_expr_lists[0] : std::vector {}), - _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate) { + _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate && + require_bucket_distribution) { if (tnode.agg_node.__isset.use_streaming_preaggregation) { _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation; if (_is_streaming_preagg) { diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h index 0a3af64ed467cb..1256210952dad5 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h @@ -96,7 +96,7 @@ class DistinctStreamingAggOperatorX final : public StatefulOperatorX { public: DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TPlanNode& tnode, RuntimeState* state) override; Status prepare(RuntimeState* state) override; Status open(RuntimeState* state) override; diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 0d40d3d49f090b..fb66c5222ab27c 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -148,6 +148,10 @@ class HashJoinBuildSinkOperatorX final bool is_shuffled_hash_join() const override { return _join_distribution == TJoinDistributionType::PARTITIONED; } + bool require_data_distribution() const override { + return _join_distribution == TJoinDistributionType::COLOCATE || + _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE; + } private: friend class HashJoinBuildSinkLocalState; diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index d86be6b43cec43..77ae990276078c 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -160,6 +160,10 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorXrequire_data_distribution(); + } private: Status _revoke_memory(RuntimeState* state); diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index e527d601fff61f..87e45672542d20 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -127,6 +127,10 @@ class PartitionedHashJoinSinkOperatorX _inner_probe_operator = probe_operator; } + bool require_data_distribution() const override { + return _inner_probe_operator->require_data_distribution(); + } + private: friend class PartitionedHashJoinSinkLocalState; diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp b/be/src/pipeline/exec/sort_sink_operator.cpp index 201c31d353b5df..282596032374ed 100644 --- a/be/src/pipeline/exec/sort_sink_operator.cpp +++ b/be/src/pipeline/exec/sort_sink_operator.cpp @@ -73,7 +73,7 @@ Status SortSinkLocalState::open(RuntimeState* state) { } SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id), _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0), _pool(pool), @@ -82,7 +82,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), _use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read), _merge_by_exchange(tnode.sort_node.merge_by_exchange), - _is_colocate(tnode.sort_node.__isset.is_colocate ? tnode.sort_node.is_colocate : false), + _is_colocate(tnode.sort_node.__isset.is_colocate + ? tnode.sort_node.is_colocate && require_bucket_distribution + : require_bucket_distribution), _is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort ? tnode.sort_node.is_analytic_sort : false), diff --git a/be/src/pipeline/exec/sort_sink_operator.h b/be/src/pipeline/exec/sort_sink_operator.h index ba279a4aac4f2a..5f5fce881e2e0c 100644 --- a/be/src/pipeline/exec/sort_sink_operator.h +++ b/be/src/pipeline/exec/sort_sink_operator.h @@ -53,7 +53,7 @@ class SortSinkLocalState : public PipelineXSinkLocalState { class SortSinkOperatorX final : public DataSinkOperatorX { public: SortSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool require_bucket_distribution); Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", DataSinkOperatorX::_name); diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 1d1834f7dd21e6..33c222110cb07d 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -112,9 +112,11 @@ Status SpillSortSinkLocalState::setup_in_memory_sort_op(RuntimeState* state) { } SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int operator_id, - const TPlanNode& tnode, const DescriptorTbl& descs) + const TPlanNode& tnode, const DescriptorTbl& descs, + bool require_bucket_distribution) : DataSinkOperatorX(operator_id, tnode.node_id) { - _sort_sink_operator = std::make_unique(pool, operator_id, tnode, descs); + _sort_sink_operator = std::make_unique(pool, operator_id, tnode, descs, + require_bucket_distribution); } Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 699612abf7a53c..978682d4bdee30 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -63,7 +63,7 @@ class SpillSortSinkOperatorX final : public DataSinkOperatorX::_name); diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 89588e39471196..0d7ca325f89e72 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -1167,7 +1167,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo request.query_options.__isset.enable_distinct_streaming_aggregation && request.query_options.enable_distinct_streaming_aggregation && !tnode.agg_node.grouping_exprs.empty()) { - op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs)); + op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs, + _require_bucket_distribution)); RETURN_IF_ERROR(cur_pipe->add_operator(op)); } else if (tnode.agg_node.__isset.use_streaming_preaggregation && tnode.agg_node.use_streaming_preaggregation && @@ -1265,6 +1266,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _pipeline_parent_map.push(op->node_id(), cur_pipe); _pipeline_parent_map.push(op->node_id(), build_side_pipe); } + _require_bucket_distribution = + _require_bucket_distribution || op->require_data_distribution(); break; } case TPlanNodeType::CROSS_JOIN_NODE: { @@ -1327,9 +1330,11 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo DataSinkOperatorXPtr sink; if (_runtime_state->enable_sort_spill()) { - sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new SpillSortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); } else { - sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); } sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); @@ -1366,7 +1371,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo _dag[downstream_pipeline_id].push_back(cur_pipe->id()); DataSinkOperatorXPtr sink; - sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs)); + sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs, + _require_bucket_distribution)); sink->set_dests_id({op->operator_id()}); RETURN_IF_ERROR(cur_pipe->set_sink(sink)); RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get())); @@ -1426,7 +1432,6 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo return Status::InternalError("Unsupported exec type in pipelineX: {}", print_plan_node_type(tnode.node_type)); } - _require_bucket_distribution = true; return Status::OK(); } diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp index 867dc49dc33b03..3a956a9a863479 100644 --- a/be/src/pipeline/pipeline_task.cpp +++ b/be/src/pipeline/pipeline_task.cpp @@ -54,7 +54,6 @@ PipelineTask::PipelineTask( int task_idx) : _index(task_id), _pipeline(pipeline), - _prepared(false), _opened(false), _state(state), _fragment_context(fragment_context), @@ -117,7 +116,6 @@ Status PipelineTask::prepare(const TPipelineInstanceParams& local_params, const std::copy(deps.begin(), deps.end(), std::inserter(_filter_dependencies, _filter_dependencies.end())); } - _prepared = true; return Status::OK(); } @@ -172,7 +170,6 @@ void PipelineTask::_init_profile() { _wait_worker_timer = ADD_TIMER(_task_profile, "WaitWorkerTime"); - _block_counts = ADD_COUNTER(_task_profile, "NumBlockedTimes", TUnit::UNIT); _schedule_counts = ADD_COUNTER(_task_profile, "NumScheduleTimes", TUnit::UNIT); _yield_counts = ADD_COUNTER(_task_profile, "NumYieldTimes", TUnit::UNIT); _core_change_times = ADD_COUNTER(_task_profile, "CoreChangeTimes", TUnit::UNIT); diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index 0965ec1c18f65d..83ad8bec25840b 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -235,7 +235,6 @@ class PipelineTask { uint32_t _index; PipelinePtr _pipeline; bool _has_exceed_timeout = false; - bool _prepared; bool _opened; RuntimeState* _state = nullptr; int _previous_schedule_id = -1; @@ -254,7 +253,6 @@ class PipelineTask { // 3 update task statistics(update _queue_level/_core_id) int _queue_level = 0; int _core_id = 0; - Status _open_status = Status::OK(); RuntimeProfile* _parent_profile = nullptr; std::unique_ptr _task_profile; @@ -266,7 +264,6 @@ class PipelineTask { RuntimeProfile::Counter* _get_block_counter = nullptr; RuntimeProfile::Counter* _sink_timer = nullptr; RuntimeProfile::Counter* _close_timer = nullptr; - RuntimeProfile::Counter* _block_counts = nullptr; RuntimeProfile::Counter* _schedule_counts = nullptr; MonotonicStopWatch _wait_worker_watcher; RuntimeProfile::Counter* _wait_worker_timer = nullptr;