From d51e3ea563267afffe8aab94880d88bf57f44c2f Mon Sep 17 00:00:00 2001 From: jacktengg <18241664+jacktengg@users.noreply.github.com> Date: Tue, 5 Dec 2023 14:08:44 +0800 Subject: [PATCH] [agg](profile) fix incorrent profile --- be/src/pipeline/exec/aggregation_sink_operator.cpp | 6 ++++-- be/src/vec/exec/distinct_vaggregation_node.cpp | 1 - be/src/vec/exec/vaggregation_node.cpp | 8 ++------ be/src/vec/exec/vaggregation_node.h | 2 -- be/src/vec/exprs/vectorized_agg_fn.cpp | 6 ------ be/src/vec/exprs/vectorized_agg_fn.h | 5 +---- 6 files changed, 7 insertions(+), 21 deletions(-) diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index 4d6f9636de7d70..a07d19f2c9a161 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -79,8 +79,6 @@ Status AggSinkLocalState::init(RuntimeState* state, Base::_shared_state->init_spill_partition_helper(p._spill_partition_count_bits); for (auto& evaluator : p._aggregate_evaluators) { Base::_shared_state->aggregate_evaluators.push_back(evaluator->clone(state, p._pool)); - Base::_shared_state->aggregate_evaluators.back()->set_timer(_exec_timer, _merge_timer, - _expr_timer); } if (p._is_streaming) { Base::_shared_state->data_queue->set_sink_dependency(Base::_dependency, 0); @@ -110,6 +108,10 @@ Status AggSinkLocalState::init(RuntimeState* state, _max_row_size_counter = ADD_COUNTER(Base::profile(), "MaxRowSizeInBytes", TUnit::UNIT); COUNTER_SET(_max_row_size_counter, (int64_t)0); + for (auto& evaluator : Base::_shared_state->aggregate_evaluators) { + evaluator->set_timer(_merge_timer, _expr_timer); + } + Base::_shared_state->agg_profile_arena = std::make_unique(); if (Base::_shared_state->probe_expr_ctxs.empty()) { diff --git a/be/src/vec/exec/distinct_vaggregation_node.cpp b/be/src/vec/exec/distinct_vaggregation_node.cpp index 642ad99bd93c64..a5c57792ba3aca 100644 --- a/be/src/vec/exec/distinct_vaggregation_node.cpp +++ b/be/src/vec/exec/distinct_vaggregation_node.cpp @@ -36,7 +36,6 @@ DistinctAggregationNode::DistinctAggregationNode(ObjectPool* pool, const TPlanNo Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key( doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) { SCOPED_TIMER(_exec_timer); - SCOPED_TIMER(_build_timer); DCHECK(!_probe_expr_ctxs.empty()); size_t key_size = _probe_expr_ctxs.size(); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index 2fd2be94e9baca..c594e99d02df1d 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -104,7 +104,6 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode, : ExecNode(pool, tnode, descs), _hash_table_compute_timer(nullptr), _hash_table_input_counter(nullptr), - _build_timer(nullptr), _expr_timer(nullptr), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _intermediate_tuple_desc(nullptr), @@ -250,7 +249,6 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { _serialize_key_arena_memory_usage = runtime_profile()->AddHighWaterMarkCounter( "SerializeKeyArena", TUnit::BYTES, "MemoryUsage"); - _build_timer = ADD_TIMER_WITH_LEVEL(runtime_profile(), "BuildTime", 1); _build_table_convert_timer = ADD_TIMER(runtime_profile(), "BuildConvertToPartitionedTime"); _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime"); _merge_timer = ADD_TIMER(runtime_profile(), "MergeTime"); @@ -293,7 +291,7 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { // set profile timer to evaluators for (auto& evaluator : _aggregate_evaluators) { - evaluator->set_timer(_exec_timer, _merge_timer, _expr_timer); + evaluator->set_timer(_merge_timer, _expr_timer); } _offsets_of_aggregate_states.resize(_aggregate_evaluators.size()); @@ -404,7 +402,7 @@ Status AggregationNode::prepare_profile(RuntimeState* state) { _is_merge ? "true" : "false", _needs_finalize ? "true" : "false", _is_streaming_preagg ? "true" : "false", std::to_string(_aggregate_evaluators.size()), std::to_string(_limit)); - runtime_profile()->add_info_string("AggInfos:", fmt::to_string(msg)); + runtime_profile()->add_info_string("AggInfos", fmt::to_string(msg)); return Status::OK(); } @@ -675,7 +673,6 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block Status AggregationNode::_execute_without_key(Block* block) { DCHECK(_agg_data->without_key != nullptr); - SCOPED_TIMER(_build_timer); for (int i = 0; i < _aggregate_evaluators.size(); ++i) { RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add( block, _agg_data->without_key + _offsets_of_aggregate_states[i], @@ -901,7 +898,6 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) { - SCOPED_TIMER(_build_timer); DCHECK(!_probe_expr_ctxs.empty()); size_t key_size = _probe_expr_ctxs.size(); diff --git a/be/src/vec/exec/vaggregation_node.h b/be/src/vec/exec/vaggregation_node.h index cf49817955c3d0..fba82aa8c9dda1 100644 --- a/be/src/vec/exec/vaggregation_node.h +++ b/be/src/vec/exec/vaggregation_node.h @@ -422,7 +422,6 @@ class AggregationNode : public ::doris::ExecNode { RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr; RuntimeProfile::Counter* _hash_table_input_counter = nullptr; - RuntimeProfile::Counter* _build_timer = nullptr; RuntimeProfile::Counter* _expr_timer = nullptr; private: @@ -523,7 +522,6 @@ class AggregationNode : public ::doris::ExecNode { template Status _execute_with_serialized_key_helper(Block* block) { - SCOPED_TIMER(_build_timer); DCHECK(!_probe_expr_ctxs.empty()); size_t key_size = _probe_expr_ctxs.size(); diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp b/be/src/vec/exprs/vectorized_agg_fn.cpp index 06a776efacf3a1..f0f1d3f815d1da 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.cpp +++ b/be/src/vec/exprs/vectorized_agg_fn.cpp @@ -69,7 +69,6 @@ AggFnEvaluator::AggFnEvaluator(const TExprNode& desc) _return_type(TypeDescriptor::from_thrift(desc.fn.ret_type)), _intermediate_slot_desc(nullptr), _output_slot_desc(nullptr), - _exec_timer(nullptr), _merge_timer(nullptr), _expr_timer(nullptr) { bool nullable = true; @@ -233,7 +232,6 @@ void AggFnEvaluator::destroy(AggregateDataPtr place) { Status AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place, Arena* arena) { RETURN_IF_ERROR(_calc_argument_columns(block)); - SCOPED_TIMER(_exec_timer); _function->add_batch_single_place(block->rows(), place, _agg_columns.data(), arena); return Status::OK(); } @@ -241,7 +239,6 @@ Status AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place, Status AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places, Arena* arena, bool agg_many) { RETURN_IF_ERROR(_calc_argument_columns(block)); - SCOPED_TIMER(_exec_timer); _function->add_batch(block->rows(), places, offset, _agg_columns.data(), arena, agg_many); return Status::OK(); } @@ -249,7 +246,6 @@ Status AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateD Status AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset, AggregateDataPtr* places, Arena* arena) { RETURN_IF_ERROR(_calc_argument_columns(block)); - SCOPED_TIMER(_exec_timer); _function->add_batch_selected(block->rows(), places, offset, _agg_columns.data(), arena); return Status::OK(); } @@ -257,7 +253,6 @@ Status AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset, Status AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf, const size_t num_rows, Arena* arena) { RETURN_IF_ERROR(_calc_argument_columns(block)); - SCOPED_TIMER(_exec_timer); _function->streaming_agg_serialize(_agg_columns.data(), buf, num_rows, arena); return Status::OK(); } @@ -265,7 +260,6 @@ Status AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf Status AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst, const size_t num_rows, Arena* arena) { RETURN_IF_ERROR(_calc_argument_columns(block)); - SCOPED_TIMER(_exec_timer); _function->streaming_agg_serialize_to_column(_agg_columns.data(), dst, num_rows, arena); return Status::OK(); } diff --git a/be/src/vec/exprs/vectorized_agg_fn.h b/be/src/vec/exprs/vectorized_agg_fn.h index b3fb4f6d5ebc1e..3cabd275614c1e 100644 --- a/be/src/vec/exprs/vectorized_agg_fn.h +++ b/be/src/vec/exprs/vectorized_agg_fn.h @@ -56,9 +56,7 @@ class AggFnEvaluator { const SlotDescriptor* intermediate_slot_desc, const SlotDescriptor* output_slot_desc); - void set_timer(RuntimeProfile::Counter* exec_timer, RuntimeProfile::Counter* merge_timer, - RuntimeProfile::Counter* expr_timer) { - _exec_timer = exec_timer; + void set_timer(RuntimeProfile::Counter* merge_timer, RuntimeProfile::Counter* expr_timer) { _merge_timer = merge_timer; _expr_timer = expr_timer; } @@ -123,7 +121,6 @@ class AggFnEvaluator { const SlotDescriptor* _intermediate_slot_desc = nullptr; const SlotDescriptor* _output_slot_desc = nullptr; - RuntimeProfile::Counter* _exec_timer = nullptr; RuntimeProfile::Counter* _merge_timer = nullptr; RuntimeProfile::Counter* _expr_timer = nullptr;