From 073281c8444385133f88c83a3581d9f700952f72 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Sat, 28 May 2022 13:46:36 +0800 Subject: [PATCH 1/2] [Bugfix(Vec)] Fix some memory leak issues --- be/src/vec/exec/join/vhash_join_node.cpp | 2 ++ be/src/vec/exec/vaggregation_node.cpp | 5 +++-- be/src/vec/exec/vexchange_node.cpp | 3 +++ be/src/vec/exec/vsort_node.cpp | 3 +-- be/src/vec/exprs/vexpr_context.cpp | 9 ++++++++- be/src/vec/exprs/vexpr_context.h | 1 + 6 files changed, 18 insertions(+), 5 deletions(-) diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp index e3d83cfe119cfc..e9d61a982d052b 100644 --- a/be/src/vec/exec/join/vhash_join_node.cpp +++ b/be/src/vec/exec/join/vhash_join_node.cpp @@ -820,6 +820,8 @@ Status HashJoinNode::close(RuntimeState* state) { return Status::OK(); } + VExpr::close(_build_expr_ctxs, state); + VExpr::close(_probe_expr_ctxs, state); if (_vother_join_conjunct_ptr) (*_vother_join_conjunct_ptr)->close(state); _hash_table_mem_tracker->release(_mem_used); diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp index e7ff52f1d6ca86..b8f92bd4bfe2a6 100644 --- a/be/src/vec/exec/vaggregation_node.cpp +++ b/be/src/vec/exec/vaggregation_node.cpp @@ -406,10 +406,11 @@ Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) { Status AggregationNode::close(RuntimeState* state) { if (is_closed()) return Status::OK(); - RETURN_IF_ERROR(ExecNode::close(state)); + for (auto* aggregate_evaluator : _aggregate_evaluators) aggregate_evaluator->close(state); VExpr::close(_probe_expr_ctxs, state); if (_executor.close) _executor.close(); - return Status::OK(); + + return ExecNode::close(state); } Status AggregationNode::_create_agg_status(AggregateDataPtr data) { diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp index 57cbcff921fb46..f790aa74363eed 100644 --- a/be/src/vec/exec/vexchange_node.cpp +++ b/be/src/vec/exec/vexchange_node.cpp @@ -103,6 +103,9 @@ Status VExchangeNode::close(RuntimeState* state) { if (_stream_recvr != nullptr) { _stream_recvr->close(); } + + if (_is_merging) _vsort_exec_exprs.close(state); + return ExecNode::close(state); } diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp index 4d4284bc26168d..0a3aa18b2fd80d 100644 --- a/be/src/vec/exec/vsort_node.cpp +++ b/be/src/vec/exec/vsort_node.cpp @@ -107,8 +107,7 @@ Status VSortNode::close(RuntimeState* state) { } _block_mem_tracker->release(_total_mem_usage); _vsort_exec_exprs.close(state); - ExecNode::close(state); - return Status::OK(); + return ExecNode::close(state); } void VSortNode::debug_string(int indentation_level, stringstream* out) const { diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp index 6615e0ff8c56ae..ea3300eaf38538 100644 --- a/be/src/vec/exprs/vexpr_context.cpp +++ b/be/src/vec/exprs/vexpr_context.cpp @@ -30,6 +30,14 @@ VExprContext::VExprContext(VExpr* expr) _closed(false), _last_result_column_id(-1) {} +VExprContext::~VExprContext() { + DCHECK(!_prepared || _closed); + + for (int i = 0; i < _fn_contexts.size(); ++i) { + delete _fn_contexts[i]; + } +} + doris::Status VExprContext::execute(doris::vectorized::Block* block, int* result_column_id) { Status st = _root->execute(this, block, result_column_id); _last_result_column_id = *result_column_id; @@ -68,7 +76,6 @@ void VExprContext::close(doris::RuntimeState* state) { for (int i = 0; i < _fn_contexts.size(); ++i) { _fn_contexts[i]->impl()->close(); - delete _fn_contexts[i]; } // _pool can be NULL if Prepare() was never called if (_pool != NULL) { diff --git a/be/src/vec/exprs/vexpr_context.h b/be/src/vec/exprs/vexpr_context.h index 80da73531b6f36..86a9d086a6ff59 100644 --- a/be/src/vec/exprs/vexpr_context.h +++ b/be/src/vec/exprs/vexpr_context.h @@ -27,6 +27,7 @@ class VExpr; class VExprContext { public: VExprContext(VExpr* expr); + ~VExprContext(); Status prepare(RuntimeState* state, const RowDescriptor& row_desc, const std::shared_ptr& tracker); Status open(RuntimeState* state); From abe529a5392aee77c040da229497d66304b43de7 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Sun, 29 May 2022 12:59:27 +0800 Subject: [PATCH 2/2] Fix one more case --- be/src/vec/exec/vanalytic_eval_node.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp index b4fe98ec869474..a88c7a746a0c67 100644 --- a/be/src/vec/exec/vanalytic_eval_node.cpp +++ b/be/src/vec/exec/vanalytic_eval_node.cpp @@ -223,6 +223,9 @@ Status VAnalyticEvalNode::open(RuntimeState* state) { for (size_t i = 0; i < _agg_functions_size; ++i) { RETURN_IF_ERROR(VExpr::open(_agg_expr_ctxs[i], state)); } + for (auto* agg_function : _agg_functions) { + RETURN_IF_ERROR(agg_function->open(state)); + } return Status::OK(); } @@ -230,9 +233,14 @@ Status VAnalyticEvalNode::close(RuntimeState* state) { if (is_closed()) { return Status::OK(); } - ExecNode::close(state); + + VExpr::close(_partition_by_eq_expr_ctxs, state); + VExpr::close(_order_by_eq_expr_ctxs, state); + for (size_t i = 0; i < _agg_functions_size; ++i) VExpr::close(_agg_expr_ctxs[i], state); + for (auto* agg_function : _agg_functions) agg_function->close(state); + _destory_agg_status(); - return Status::OK(); + return ExecNode::close(state); } Status VAnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {