From 9c08221e345485779bc3c3512c3f36420cfc6770 Mon Sep 17 00:00:00 2001 From: zhangstar333 <87313068+zhangstar333@users.noreply.github.com> Date: Fri, 22 Mar 2024 19:43:14 +0800 Subject: [PATCH] [bug](udaf) fix memory leak in the java udaf (#32630) fix memory leak in the java udaf --- .../aggregate_function_java_udaf.h | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h index 05bda448077dc0..720e143d4a5eca 100644 --- a/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h +++ b/be/src/vec/aggregate_functions/aggregate_function_java_udaf.h @@ -58,21 +58,28 @@ struct AggregateJavaUdafData { AggregateJavaUdafData() = default; AggregateJavaUdafData(int64_t num_args) { argument_size = num_args; } - ~AggregateJavaUdafData() { + ~AggregateJavaUdafData() = default; + + Status close_and_delete_object() { JNIEnv* env = nullptr; - //use defer as the jni call maybe have error, then directly return without delete - Defer defer([&, this] { - std::string temp; - temp.swap(serialize_data); + Defer defer {[&]() { if (env != nullptr) { env->DeleteGlobalRef(executor_cl); env->DeleteGlobalRef(executor_obj); } - }); - Status status; - RETURN_IF_STATUS_ERROR(status, JniUtil::GetJNIEnv(&env)); + }}; + Status st = JniUtil::GetJNIEnv(&env); + if (!st.ok()) { + LOG(WARNING) << "Failed to get JNIEnv"; + return st; + } env->CallNonvirtualVoidMethod(executor_obj, executor_cl, executor_close_id); - RETURN_IF_STATUS_ERROR(status, JniUtil::GetJniExceptionMsg(env)); + st = JniUtil::GetJniExceptionMsg(env); + if (!st.ok()) { + LOG(WARNING) << "Failed to close JAVA UDAF: " << st.to_string(); + return st; + } + return Status::OK(); } Status init_udaf(const TFunction& fn, const std::string& local_location) { @@ -521,8 +528,8 @@ class AggregateJavaUdaf final } void create(AggregateDataPtr __restrict place) const override { + new (place) Data(argument_types.size()); if (_first_created) { - new (place) Data(argument_types.size()); Status status = Status::OK(); SAFE_CREATE(RETURN_IF_STATUS_ERROR(status, this->data(place).init_udaf(_fn, _local_location)), @@ -532,16 +539,24 @@ class AggregateJavaUdaf final }); _first_created = false; _exec_place = place; + if (UNLIKELY(!status.ok())) { + throw doris::Exception(ErrorCode::INTERNAL_ERROR, status.to_string()); + } } } // To avoid multiple times JNI call, Here will destroy all data at once void destroy(AggregateDataPtr __restrict place) const noexcept override { if (place == _exec_place) { - this->data(_exec_place).destroy(); - this->data(_exec_place).~Data(); + Status status = Status::OK(); + status = this->data(_exec_place).destroy(); + status = this->data(_exec_place).close_and_delete_object(); _first_created = true; + if (UNLIKELY(!status.ok())) { + LOG(WARNING) << "Failed to destroy function: " << status.to_string(); + } } + this->data(place).~Data(); } String get_name() const override { return _fn.name.function_name; } @@ -625,7 +640,6 @@ class AggregateJavaUdaf final // so it's can't call ~Data, only to change _destory_deserialize flag. void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, Arena*) const override { - new (place) Data(argument_types.size()); this->data(place).read(buf); }