Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion be/src/vec/exec/distinct_vaggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ DistinctAggregationNode::DistinctAggregationNode(ObjectPool* pool, const TPlanNo

Status DistinctAggregationNode::_distinct_pre_agg_with_serialized_key(
doris::vectorized::Block* in_block, doris::vectorized::Block* out_block) {
SCOPED_TIMER(_build_timer);
DCHECK(!_probe_expr_ctxs.empty());

size_t key_size = _probe_expr_ctxs.size();
Expand Down
13 changes: 7 additions & 6 deletions be/src/vec/exec/vaggregation_node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ AggregationNode::AggregationNode(ObjectPool* pool, const TPlanNode& tnode,
: ExecNode(pool, tnode, descs),
_hash_table_compute_timer(nullptr),
_hash_table_input_counter(nullptr),
_build_timer(nullptr),
_expr_timer(nullptr),
_exec_timer(nullptr),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
Expand Down Expand Up @@ -332,7 +331,6 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
_serialize_key_arena_memory_usage = runtime_profile()->AddHighWaterMarkCounter(
"SerializeKeyArena", TUnit::BYTES, "MemoryUsage");

_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
_build_table_convert_timer = ADD_TIMER(runtime_profile(), "BuildConvertToPartitionedTime");
_serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime");
_exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
Expand Down Expand Up @@ -375,7 +373,7 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {

// set profile timer to evaluators
for (auto& evaluator : _aggregate_evaluators) {
evaluator->set_timer(_exec_timer, _merge_timer, _expr_timer);
evaluator->set_timer(_merge_timer, _expr_timer);
}

_offsets_of_aggregate_states.resize(_aggregate_evaluators.size());
Expand Down Expand Up @@ -486,19 +484,21 @@ Status AggregationNode::prepare_profile(RuntimeState* state) {
_is_merge ? "true" : "false", _needs_finalize ? "true" : "false",
_is_streaming_preagg ? "true" : "false",
std::to_string(_aggregate_evaluators.size()), std::to_string(_limit));
runtime_profile()->add_info_string("AggInfos:", fmt::to_string(msg));
runtime_profile()->add_info_string("AggInfos", fmt::to_string(msg));
return Status::OK();
}

Status AggregationNode::prepare(RuntimeState* state) {
SCOPED_TIMER(_runtime_profile->total_time_counter());

RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(prepare_profile(state));
return Status::OK();
}

Status AggregationNode::alloc_resource(doris::RuntimeState* state) {
SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(ExecNode::alloc_resource(state));

RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
Expand Down Expand Up @@ -547,6 +547,7 @@ Status AggregationNode::open(RuntimeState* state) {

Status AggregationNode::do_pre_agg(vectorized::Block* input_block,
vectorized::Block* output_block) {
SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(_executor.pre_agg(input_block, output_block));

// pre stream agg need use _num_row_return to decide whether to do pre stream agg
Expand Down Expand Up @@ -585,6 +586,7 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
}

Status AggregationNode::pull(doris::RuntimeState* state, vectorized::Block* block, bool* eos) {
SCOPED_TIMER(_exec_timer);
RETURN_IF_ERROR(_executor.get_result(state, block, eos));
_make_nullable_output_key(block);
// dispose the having clause, should not be execute in prestreaming agg
Expand All @@ -595,6 +597,7 @@ Status AggregationNode::pull(doris::RuntimeState* state, vectorized::Block* bloc
}

Status AggregationNode::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) {
SCOPED_TIMER(_exec_timer);
if (in_block->rows() > 0) {
RETURN_IF_ERROR(_executor.execute(in_block));
RETURN_IF_ERROR(_try_spill_disk());
Expand Down Expand Up @@ -744,7 +747,6 @@ Status AggregationNode::_serialize_without_key(RuntimeState* state, Block* block

Status AggregationNode::_execute_without_key(Block* block) {
DCHECK(_agg_data->without_key != nullptr);
SCOPED_TIMER(_build_timer);
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
RETURN_IF_ERROR(_aggregate_evaluators[i]->execute_single_add(
block, _agg_data->without_key + _offsets_of_aggregate_states[i],
Expand Down Expand Up @@ -1059,7 +1061,6 @@ void AggregationNode::_find_in_hash_table(AggregateDataPtr* places, ColumnRawPtr

Status AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* in_block,
doris::vectorized::Block* out_block) {
SCOPED_TIMER(_build_timer);
DCHECK(!_probe_expr_ctxs.empty());

size_t key_size = _probe_expr_ctxs.size();
Expand Down
2 changes: 0 additions & 2 deletions be/src/vec/exec/vaggregation_node.h
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,6 @@ class AggregationNode : public ::doris::ExecNode {
std::vector<size_t> _make_nullable_keys;
RuntimeProfile::Counter* _hash_table_compute_timer;
RuntimeProfile::Counter* _hash_table_input_counter;
RuntimeProfile::Counter* _build_timer;
RuntimeProfile::Counter* _expr_timer;
RuntimeProfile::Counter* _exec_timer;

Expand Down Expand Up @@ -973,7 +972,6 @@ class AggregationNode : public ::doris::ExecNode {
private:
template <bool limit>
Status _execute_with_serialized_key_helper(Block* block) {
SCOPED_TIMER(_build_timer);
DCHECK(!_probe_expr_ctxs.empty());

size_t key_size = _probe_expr_ctxs.size();
Expand Down
6 changes: 0 additions & 6 deletions be/src/vec/exprs/vectorized_agg_fn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ AggFnEvaluator::AggFnEvaluator(const TExprNode& desc)
_return_type(TypeDescriptor::from_thrift(desc.fn.ret_type)),
_intermediate_slot_desc(nullptr),
_output_slot_desc(nullptr),
_exec_timer(nullptr),
_merge_timer(nullptr),
_expr_timer(nullptr) {
bool nullable = true;
Expand Down Expand Up @@ -233,39 +232,34 @@ void AggFnEvaluator::destroy(AggregateDataPtr place) {

Status AggFnEvaluator::execute_single_add(Block* block, AggregateDataPtr place, Arena* arena) {
RETURN_IF_ERROR(_calc_argument_columns(block));
SCOPED_TIMER(_exec_timer);
_function->add_batch_single_place(block->rows(), place, _agg_columns.data(), arena);
return Status::OK();
}

Status AggFnEvaluator::execute_batch_add(Block* block, size_t offset, AggregateDataPtr* places,
Arena* arena, bool agg_many) {
RETURN_IF_ERROR(_calc_argument_columns(block));
SCOPED_TIMER(_exec_timer);
_function->add_batch(block->rows(), places, offset, _agg_columns.data(), arena, agg_many);
return Status::OK();
}

Status AggFnEvaluator::execute_batch_add_selected(Block* block, size_t offset,
AggregateDataPtr* places, Arena* arena) {
RETURN_IF_ERROR(_calc_argument_columns(block));
SCOPED_TIMER(_exec_timer);
_function->add_batch_selected(block->rows(), places, offset, _agg_columns.data(), arena);
return Status::OK();
}

Status AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf,
const size_t num_rows, Arena* arena) {
RETURN_IF_ERROR(_calc_argument_columns(block));
SCOPED_TIMER(_exec_timer);
_function->streaming_agg_serialize(_agg_columns.data(), buf, num_rows, arena);
return Status::OK();
}

Status AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
const size_t num_rows, Arena* arena) {
RETURN_IF_ERROR(_calc_argument_columns(block));
SCOPED_TIMER(_exec_timer);
_function->streaming_agg_serialize_to_column(_agg_columns.data(), dst, num_rows, arena);
return Status::OK();
}
Expand Down
5 changes: 1 addition & 4 deletions be/src/vec/exprs/vectorized_agg_fn.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,7 @@ class AggFnEvaluator {
const SlotDescriptor* intermediate_slot_desc,
const SlotDescriptor* output_slot_desc);

void set_timer(RuntimeProfile::Counter* exec_timer, RuntimeProfile::Counter* merge_timer,
RuntimeProfile::Counter* expr_timer) {
_exec_timer = exec_timer;
void set_timer(RuntimeProfile::Counter* merge_timer, RuntimeProfile::Counter* expr_timer) {
_merge_timer = merge_timer;
_expr_timer = expr_timer;
}
Expand Down Expand Up @@ -120,7 +118,6 @@ class AggFnEvaluator {
const SlotDescriptor* _intermediate_slot_desc;
const SlotDescriptor* _output_slot_desc;

RuntimeProfile::Counter* _exec_timer;
RuntimeProfile::Counter* _merge_timer;
RuntimeProfile::Counter* _expr_timer;

Expand Down