From 44f5d23154d9534b2d38648414ea26f68689db6a Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Fri, 14 Oct 2022 03:59:57 +0800 Subject: [PATCH 1/8] 20221010_fix_bthread_local --- be/src/runtime/exec_env.h | 3 + be/src/runtime/exec_env_init.cpp | 3 +- be/src/runtime/memory/mem_tracker_limiter.cpp | 3 +- .../runtime/memory/thread_mem_tracker_mgr.h | 31 ++++--- be/src/runtime/thread_context.cpp | 32 +------ be/src/runtime/thread_context.h | 93 +++++++++---------- be/src/service/internal_service.cpp | 35 +------ 7 files changed, 70 insertions(+), 130 deletions(-) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index fc70a6d3dbf972..f616f949200937 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -129,6 +129,7 @@ class ExecEnv { } std::shared_ptr orphan_mem_tracker() { return _orphan_mem_tracker; } MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; } + std::shared_ptr bthread_mem_tracker() { return _bthread_mem_tracker; } std::shared_ptr query_pool_mem_tracker() { return _query_pool_mem_tracker; } std::shared_ptr load_pool_mem_tracker() { return _load_pool_mem_tracker; } MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; } @@ -227,6 +228,8 @@ class ExecEnv { // and the consumption of the orphan mem tracker is close to 0, but greater than 0. std::shared_ptr _orphan_mem_tracker; MemTrackerLimiter* _orphan_mem_tracker_raw; + // Bthread default mem tracker + std::shared_ptr _bthread_mem_tracker; // The ancestor for all querys tracker. std::shared_ptr _query_pool_mem_tracker; // The ancestor for all load tracker. diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index a55f5477782980..f2b550498a60de 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -208,7 +208,8 @@ Status ExecEnv::_init_mem_tracker() { std::make_shared(global_memory_limit_bytes, "Process"); _orphan_mem_tracker = std::make_shared(-1, "Orphan", _process_mem_tracker); _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); - thread_context()->_thread_mem_tracker_mgr->init_impl(); + _bthread_mem_tracker = std::make_shared(-1, "Bthread", _orphan_mem_tracker); + thread_context()->_thread_mem_tracker_mgr->init(); thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC) diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 1ca75b2af4d427..9d9594a5814966 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -74,7 +74,7 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe MemTrackerLimiter::~MemTrackerLimiter() { // TCMalloc hook will be triggered during destructor memtracker, may cause crash. - if (_label == "Process") doris::thread_context_ptr._init = false; + if (_label == "Process") doris::thread_context_ptr.init = false; DCHECK(remain_child_count() == 0 || _label == "Process"); // In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption` // in real time. Merge its consumption into orphan when parent is process, to avoid repetition. @@ -212,7 +212,6 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, } void MemTrackerLimiter::print_log_usage(const std::string& msg) { - DCHECK(_limit != -1); // only print the tracker log_usage in be log. std::string detail = msg; detail += "\n " + fmt::format( diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 4c9099528d43bd..beaba558f99698 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -59,7 +59,7 @@ class ThreadMemTrackerMgr { // After thread initialization, calling `init` again must call `clear_untracked_mems` first // to avoid memory tracking loss. void init(); - void init_impl(); + void clear(); // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id, @@ -88,11 +88,11 @@ class ThreadMemTrackerMgr { bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); } std::shared_ptr limiter_mem_tracker() { - if (_limiter_tracker_raw == nullptr) init_impl(); + if (_init == false) init(); return _limiter_tracker_stack.back(); } MemTrackerLimiter* limiter_mem_tracker_raw() { - if (_limiter_tracker_raw == nullptr) init_impl(); + if (_init == false) init(); return _limiter_tracker_raw; } @@ -119,6 +119,8 @@ class ThreadMemTrackerMgr { void exceeded(const std::string& failed_msg); private: + // is false: ExecEnv::GetInstance()->initialized() = false when thread local is initialized + bool _init = false; // Cache untracked mem, only update to _untracked_mems when switching mem tracker. // Frequent calls to unordered_map _untracked_mems[] in consume will degrade performance. int64_t _untracked_mem = 0; @@ -141,23 +143,22 @@ class ThreadMemTrackerMgr { }; inline void ThreadMemTrackerMgr::init() { - DCHECK(_consumer_tracker_stack.empty()); - // _limiter_tracker_stack[0] = orphan_mem_tracker - DCHECK(_limiter_tracker_stack.size() <= 1) - << "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size(); - if (_limiter_tracker_raw == nullptr && ExecEnv::GetInstance()->initialized()) { - init_impl(); - } -} - -inline void ThreadMemTrackerMgr::init_impl() { DCHECK(_limiter_tracker_stack.size() == 0); - DCHECK(_limiter_tracker_raw == nullptr); + DCHECK(_consumer_tracker_stack.empty()); _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker()); _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); _task_id_stack.push_back(""); _fragment_instance_id_stack.push_back(TUniqueId()); _check_limit = true; + _init = true; +} + +inline void ThreadMemTrackerMgr::clear() { + std::vector>().swap(_limiter_tracker_stack); + std::vector().swap(_consumer_tracker_stack); + std::vector().swap(_task_id_stack); + std::vector().swap(_fragment_instance_id_stack); + flush_untracked_mem(); } inline void ThreadMemTrackerMgr::push_consumer_tracker(MemTracker* tracker) { @@ -196,7 +197,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering // the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop. _stop_consume = true; - if (_limiter_tracker_raw == nullptr) init_impl(); + if (_init == false) init(); DCHECK(_limiter_tracker_raw); old_untracked_mem = _untracked_mem; if (CheckLimit) { diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index 432c5be8798f1f..7631abaa7b0d96 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -26,7 +26,7 @@ DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, _ptr); ThreadContextPtr::ThreadContextPtr() { INIT_STATIC_THREAD_LOCAL(ThreadContext, _ptr); - _init = true; + init = true; } AttachTask::AttachTask(const std::shared_ptr& mem_tracker, @@ -70,34 +70,4 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() { } } -SwitchBthread::SwitchBthread() { - _bthread_context = static_cast(bthread_getspecific(btls_key)); - // First call to bthread_getspecific (and before any bthread_setspecific) returns NULL - if (_bthread_context == nullptr) { - // Create thread-local data on demand. - _bthread_context = new ThreadContext; - // set the data so that next time bthread_getspecific in the thread returns the data. - CHECK_EQ(0, bthread_setspecific(btls_key, _bthread_context)); - } else { - DCHECK(_bthread_context->type() == ThreadContext::TaskType::UNKNOWN); - _bthread_context->_thread_mem_tracker_mgr->flush_untracked_mem(); - } - _bthread_context->_thread_mem_tracker_mgr->init(); - _bthread_context->set_type(ThreadContext::TaskType::BRPC); - bthread_context_key = btls_key; - bthread_context = _bthread_context; -} - -SwitchBthread::~SwitchBthread() { - DCHECK(_bthread_context != nullptr); - _bthread_context->_thread_mem_tracker_mgr->flush_untracked_mem(); - _bthread_context->_thread_mem_tracker_mgr->init(); - _bthread_context->set_type(ThreadContext::TaskType::UNKNOWN); - bthread_context = nullptr; - bthread_context_key = EMPTY_BTLS_KEY; -#ifndef NDEBUG - DorisMetrics::instance()->switch_bthread_count->increment(1); -#endif // NDEBUG -} - } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 46b2e86d44cff9..e5abb4cfae61e4 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -33,16 +33,12 @@ // Add thread mem tracker consumer during query execution. #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker) - // Attach to task when thread starts #define SCOPED_ATTACH_TASK(arg1, ...) \ auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__) - -#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) = SwitchBthread() #else #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0 #define SCOPED_ATTACH_TASK(arg1, ...) (void)0 -#define SCOPED_SWITCH_BTHREAD_TLS() (void)0 #endif namespace doris { @@ -81,7 +77,7 @@ class ThreadContextPtr { // Cannot add destructor `~ThreadContextPtr`, otherwise it will no longer be of type POD, the reason is as above. // TCMalloc hook is triggered during ThreadContext construction, which may lead to deadlock. - bool _init = false; + bool init = false; DECLARE_STATIC_THREAD_LOCAL(ThreadContext, _ptr); }; @@ -91,7 +87,7 @@ inline thread_local ThreadContextPtr thread_context_ptr; // To avoid performance problems caused by frequently calling `bthread_getspecific` to obtain bthread TLS // in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS. inline thread_local ThreadContext* bthread_context; -inline thread_local bthread_key_t bthread_context_key; +inline thread_local bthread_t bthread_id; // The thread context saves some info about a working thread. // 2 required info: @@ -122,18 +118,19 @@ class ThreadContext { } ~ThreadContext() { - // Restore to the memory state before _init=true to ensure accurate overall memory statistics. + // Restore to the memory state before init=true to ensure accurate overall memory statistics. // Thereby ensuring that the memory alloc size is not tracked during the initialization of the - // ThreadContext before `_init = true in ThreadContextPtr()`, + // ThreadContext before `init = true in ThreadContextPtr()`, // Equal to the size of the memory release that is not tracked during the destruction of the - // ThreadContext after `_init = false in ~ThreadContextPtr()`, + // ThreadContext after `init = false in ~ThreadContextPtr()`, + _thread_mem_tracker_mgr->clear(); init(); - thread_context_ptr._init = false; + thread_context_ptr.init = false; } void init() { _type = TaskType::UNKNOWN; - _thread_mem_tracker_mgr->init(); + if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->init(); _thread_id = get_thread_id(); } @@ -192,17 +189,31 @@ class ThreadContext { TUniqueId _fragment_instance_id; }; -static void update_bthread_context() { - if (btls_key != bthread_context_key) { - // pthread switch occurs, updating bthread_context and bthread_context_key cached in pthread tls. - bthread_context = static_cast(bthread_getspecific(btls_key)); - bthread_context_key = btls_key; +static void attach_bthread() { + bthread_id = bthread_self(); + bthread_context = static_cast(bthread_getspecific(btls_key)); + // First call to bthread_getspecific (and before any bthread_setspecific) returns NULL + if (bthread_context == nullptr) { + DCHECK(ExecEnv::GetInstance()->initialized()); + // Create thread-local data on demand. + bthread_context = new ThreadContext; + // set the data so that next time bthread_getspecific in the thread returns the data. + CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context)); + } else { + bthread_context->detach_task(); } + bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), + ExecEnv::GetInstance()->bthread_mem_tracker()); } static ThreadContext* thread_context() { - if (btls_key != EMPTY_BTLS_KEY && bthread_context != nullptr) { - update_bthread_context(); + if (bthread_self() != 0) { + if (bthread_self() != bthread_id) { + // pthread switch occurs, updating bthread_context and bthread_context_key cached in pthread tls. + thread_context_ptr.init = false; + attach_bthread(); + thread_context_ptr.init = true; + } return bthread_context; } else { return thread_context_ptr._ptr; @@ -228,16 +239,6 @@ class AddThreadMemTrackerConsumer { ~AddThreadMemTrackerConsumer(); }; -class SwitchBthread { -public: - explicit SwitchBthread(); - - ~SwitchBthread(); - -private: - ThreadContext* _bthread_context; -}; - class StopCheckThreadMemTrackerLimit { public: explicit StopCheckThreadMemTrackerLimit() { @@ -273,30 +274,22 @@ class StopCheckThreadMemTrackerLimit { ->_thread_mem_tracker_mgr->last_consumer_tracker(), \ msg), \ ##__VA_ARGS__); - // Mem Hook to consume thread mem tracker -#define MEM_MALLOC_HOOK(size) \ - do { \ - if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) { \ - doris::update_bthread_context(); \ - doris::bthread_context->_thread_mem_tracker_mgr->consume(size); \ - } else if (LIKELY(doris::thread_context_ptr._init)) { \ - doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(size); \ - } else { \ - doris::ThreadMemTrackerMgr::consume_no_attach(size); \ - } \ +#define MEM_MALLOC_HOOK(size) \ + do { \ + if (doris::thread_context_ptr.init) { \ + doris::thread_context()->_thread_mem_tracker_mgr->consume(size); \ + } else { \ + doris::ThreadMemTrackerMgr::consume_no_attach(size); \ + } \ } while (0) - -#define MEM_FREE_HOOK(size) \ - do { \ - if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) { \ - doris::update_bthread_context(); \ - doris::bthread_context->_thread_mem_tracker_mgr->consume(-size); \ - } else if (doris::thread_context_ptr._init) { \ - doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(-size); \ - } else { \ - doris::ThreadMemTrackerMgr::consume_no_attach(-size); \ - } \ +#define MEM_FREE_HOOK(size) \ + do { \ + if (doris::thread_context_ptr.init) { \ + doris::thread_context()->_thread_mem_tracker_mgr->consume(-size); \ + } else { \ + doris::ThreadMemTrackerMgr::consume_no_attach(-size); \ + } \ } while (0) #else #define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() (void)0 diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 87f89f48e00691..bfbffc8b340461 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -101,7 +101,6 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cntl_b const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast(cntl_base); attachment_transfer_request_row_batch(request, cntl); @@ -113,7 +112,6 @@ void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); PTransmitDataParams* request_raw = new PTransmitDataParams(); google::protobuf::Closure* done_raw = new NewHttpClosure(request_raw, done); @@ -169,7 +167,6 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); brpc::ClosureGuard closure_guard(done); @@ -188,7 +185,6 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c google::protobuf::Closure* done) { auto span = telemetry::start_rpc_server_span("exec_plan_fragment", cntl_base); auto scope = OpentelemetryScope {span}; - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); bool compact = request->has_compact() ? request->compact() : false; @@ -214,7 +210,6 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcControl google::protobuf::Closure* done) { auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller); auto scope = OpentelemetryScope {span}; - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); auto st = _exec_env->fragment_mgr()->start_query_execution(request); st.to_protobuf(result->mutable_status()); @@ -224,7 +219,6 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll const PTabletWriterAddBlockRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast(cntl_base); attachment_transfer_request_block(request, cntl); @@ -235,7 +229,6 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll void PInternalServiceImpl::tablet_writer_add_block_by_http( google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request, PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); PTabletWriterAddBlockRequest* request_raw = new PTabletWriterAddBlockRequest(); google::protobuf::Closure* done_raw = new NewHttpClosure(request_raw, done); @@ -263,7 +256,6 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); - SCOPED_SWITCH_BTHREAD_TLS(); auto st = _exec_env->load_channel_mgr()->add_batch(*request, response); if (!st.ok()) { @@ -283,14 +275,12 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll const PTabletWriterAddBatchRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); _tablet_writer_add_batch(cntl_base, request, response, done); } void PInternalServiceImpl::tablet_writer_add_batch_by_http( google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); PTabletWriterAddBatchRequest* request_raw = new PTabletWriterAddBatchRequest(); google::protobuf::Closure* done_raw = new NewHttpClosure(request_raw, done); @@ -321,7 +311,6 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl int64_t execution_time_ns = 0; { SCOPED_RAW_TIMER(&execution_time_ns); - SCOPED_SWITCH_BTHREAD_TLS(); // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast(cntl_base); @@ -345,7 +334,6 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); brpc::ClosureGuard closure_guard(done); @@ -391,7 +379,6 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* google::protobuf::Closure* done) { auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", cntl_base); auto scope = OpentelemetryScope {span}; - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); TUniqueId tid; tid.__set_hi(request->finst_id().hi()); @@ -415,7 +402,6 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::Controller* cntl = static_cast(cntl_base); GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); @@ -424,7 +410,6 @@ void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); // PProxyRequest is defined in gensrc/proto/internal_service.proto // Currently it supports 2 kinds of requests: @@ -485,7 +470,6 @@ void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller, const PUpdateCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->update(request, response); } @@ -493,7 +477,6 @@ void PInternalServiceImpl::update_cache(google::protobuf::RpcController* control void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller, const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->fetch(request, result); } @@ -501,7 +484,6 @@ void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controll void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller, const PClearCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->clear(request, response); } @@ -510,7 +492,6 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* contr const ::doris::PMergeFilterRequest* request, ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); auto buf = static_cast(controller)->request_attachment(); Status st = _exec_env->fragment_mgr()->merge_filter(request, buf.to_string().data()); @@ -524,7 +505,6 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); auto attachment = static_cast(controller)->request_attachment(); UniqueId unique_id(request->query_id()); @@ -540,7 +520,6 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* contr void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -563,7 +542,6 @@ void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, const PCommitRequest* request, PCommitResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -581,7 +559,6 @@ void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, PRollbackResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -600,7 +577,6 @@ void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* c const PConstantExprRequest* request, PConstantExprResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); brpc::Controller* cntl = static_cast(cntl_base); @@ -635,7 +611,6 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cntl_ const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast(cntl_base); attachment_transfer_request_block(request, cntl); @@ -647,7 +622,6 @@ void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcControlle const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); PTransmitDataParams* request_raw = new PTransmitDataParams(); google::protobuf::Closure* done_raw = new NewHttpClosure(request_raw, done); @@ -672,13 +646,15 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl transmit_tracker = std::make_shared( -1, fmt::format("QueryTransmit#queryId={}", query_id), _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id)); + transmit_tracker = + _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id); } else { query_id = "unkown_transmit_block"; transmit_tracker = std::make_shared(-1, "unkown_transmit_block"); } SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, query_id, finst_id); - VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id()) - << " query_id=" << query_id << " node=" << request->node_id(); + // LOG(WARNING) << "transmit block 1111: fragment_instance_id=" << print_id(request->finst_id()) + // << " query_id=" << query_id << " node=" << request->node_id() << ", " << transmit_tracker->label(); // The response is accessed when done->Run is called in transmit_block(), // give response a default value to avoid null pointers in high concurrency. Status st; @@ -703,7 +679,6 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co const PCheckRPCChannelRequest* request, PCheckRPCChannelResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); if (request->data().size() != request->size()) { @@ -730,7 +705,6 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co const PResetRPCChannelRequest* request, PResetRPCChannelResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); if (request->all()) { @@ -764,7 +738,6 @@ void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* cntl_base const PHandShakeRequest* request, PHandShakeResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); if (request->has_hello()) { response->set_hello(request->hello()); From 8d1530a4e506fbdd982ff4672f93dab3c5b3ac4e Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Fri, 14 Oct 2022 15:24:02 +0800 Subject: [PATCH 2/8] fix ut --- be/src/runtime/exec_env.h | 4 +++- be/src/runtime/memory/thread_mem_tracker_mgr.h | 13 +++++++++++-- be/src/runtime/thread_context.h | 5 +++-- be/src/service/internal_service.cpp | 4 ++-- be/test/testutil/run_all_tests.cpp | 6 ++++-- 5 files changed, 23 insertions(+), 9 deletions(-) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index f616f949200937..ac9bff86df5812 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -119,10 +119,12 @@ class ExecEnv { std::shared_ptr process_mem_tracker() { return _process_mem_tracker; } void set_global_mem_tracker(const std::shared_ptr& process_tracker, - const std::shared_ptr& orphan_tracker) { + const std::shared_ptr& orphan_tracker, + const std::shared_ptr& bthread_mem_tracker) { _process_mem_tracker = process_tracker; _orphan_mem_tracker = orphan_tracker; _orphan_mem_tracker_raw = orphan_tracker.get(); + _bthread_mem_tracker = bthread_mem_tracker; } std::shared_ptr allocator_cache_mem_tracker() { return _allocator_cache_mem_tracker; diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index beaba558f99698..69e70f014ec38c 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -45,8 +45,10 @@ class ThreadMemTrackerMgr { ~ThreadMemTrackerMgr() { flush_untracked_mem(); - DCHECK(_consumer_tracker_stack.empty()); - DCHECK(_limiter_tracker_stack.size() == 1); + if (bthread_self() == 0) { + DCHECK(_consumer_tracker_stack.empty()); + DCHECK(_limiter_tracker_stack.size() == 1) << ", limiter_tracker_stack.size(): " << _limiter_tracker_stack.size() ; + } } // only for tcmalloc hook @@ -59,6 +61,7 @@ class ThreadMemTrackerMgr { // After thread initialization, calling `init` again must call `clear_untracked_mems` first // to avoid memory tracking loss. void init(); + void init_impl(); void clear(); // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker @@ -144,7 +147,12 @@ class ThreadMemTrackerMgr { inline void ThreadMemTrackerMgr::init() { DCHECK(_limiter_tracker_stack.size() == 0); + DCHECK(_limiter_tracker_raw == nullptr); DCHECK(_consumer_tracker_stack.empty()); + init_impl(); +} + +inline void ThreadMemTrackerMgr::init_impl() { _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker()); _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); _task_id_stack.push_back(""); @@ -159,6 +167,7 @@ inline void ThreadMemTrackerMgr::clear() { std::vector().swap(_task_id_stack); std::vector().swap(_fragment_instance_id_stack); flush_untracked_mem(); + init_impl(); } inline void ThreadMemTrackerMgr::push_consumer_tracker(MemTracker* tracker) { diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index e5abb4cfae61e4..de7be77b4a8f8d 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -123,8 +123,7 @@ class ThreadContext { // ThreadContext before `init = true in ThreadContextPtr()`, // Equal to the size of the memory release that is not tracked during the destruction of the // ThreadContext after `init = false in ~ThreadContextPtr()`, - _thread_mem_tracker_mgr->clear(); - init(); + if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->clear(); thread_context_ptr.init = false; } @@ -194,7 +193,9 @@ static void attach_bthread() { bthread_context = static_cast(bthread_getspecific(btls_key)); // First call to bthread_getspecific (and before any bthread_setspecific) returns NULL if (bthread_context == nullptr) { +#ifndef BE_TEST DCHECK(ExecEnv::GetInstance()->initialized()); +#endif // BE_TEST // Create thread-local data on demand. bthread_context = new ThreadContext; // set the data so that next time bthread_getspecific in the thread returns the data. diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index bfbffc8b340461..47bc7b8a9810d9 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -653,8 +653,8 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl transmit_tracker = std::make_shared(-1, "unkown_transmit_block"); } SCOPED_ATTACH_TASK(transmit_tracker, ThreadContext::TaskType::QUERY, query_id, finst_id); - // LOG(WARNING) << "transmit block 1111: fragment_instance_id=" << print_id(request->finst_id()) - // << " query_id=" << query_id << " node=" << request->node_id() << ", " << transmit_tracker->label(); + VLOG_ROW << "transmit block: fragment_instance_id=" << print_id(request->finst_id()) + << " query_id=" << query_id << " node=" << request->node_id(); // The response is accessed when done->Run is called in transmit_block(), // give response a default value to avoid null pointers in high concurrency. Status st; diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index ea518722113ae4..e75c3d483f8934 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -30,9 +30,11 @@ int main(int argc, char** argv) { std::shared_ptr process_mem_tracker = std::make_shared(-1, "Process"); - std::shared_ptr _orphan_mem_tracker = + std::shared_ptr orphan_mem_tracker = std::make_shared(-1, "Orphan", process_mem_tracker); - doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker); + std::shared_ptr bthread_mem_tracker = + std::make_shared(-1, "Bthread", orphan_mem_tracker); + doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker); doris::thread_context()->_thread_mem_tracker_mgr->init(); doris::TabletSchemaCache::create_global_schema_cache(); doris::StoragePageCache::create_global_cache(1 << 30, 10); From 0962fa438b9c6e66886a95acb2113ff961e38d4e Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Fri, 14 Oct 2022 16:19:00 +0800 Subject: [PATCH 3/8] fix format --- be/src/runtime/memory/thread_mem_tracker_mgr.h | 3 ++- be/src/runtime/thread_context.h | 2 +- be/test/testutil/run_all_tests.cpp | 3 ++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 69e70f014ec38c..74060786f59b71 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -47,7 +47,8 @@ class ThreadMemTrackerMgr { flush_untracked_mem(); if (bthread_self() == 0) { DCHECK(_consumer_tracker_stack.empty()); - DCHECK(_limiter_tracker_stack.size() == 1) << ", limiter_tracker_stack.size(): " << _limiter_tracker_stack.size() ; + DCHECK(_limiter_tracker_stack.size() == 1) + << ", limiter_tracker_stack.size(): " << _limiter_tracker_stack.size(); } } diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index de7be77b4a8f8d..9100666837b112 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -195,7 +195,7 @@ static void attach_bthread() { if (bthread_context == nullptr) { #ifndef BE_TEST DCHECK(ExecEnv::GetInstance()->initialized()); -#endif // BE_TEST +#endif // BE_TEST \ // Create thread-local data on demand. bthread_context = new ThreadContext; // set the data so that next time bthread_getspecific in the thread returns the data. diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index e75c3d483f8934..ffe239cc0f9368 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -34,7 +34,8 @@ int main(int argc, char** argv) { std::make_shared(-1, "Orphan", process_mem_tracker); std::shared_ptr bthread_mem_tracker = std::make_shared(-1, "Bthread", orphan_mem_tracker); - doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker); + doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, + bthread_mem_tracker); doris::thread_context()->_thread_mem_tracker_mgr->init(); doris::TabletSchemaCache::create_global_schema_cache(); doris::StoragePageCache::create_global_cache(1 << 30, 10); From 300a171e74aaf0b25c7513a74399e0b502db112c Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Fri, 14 Oct 2022 20:29:19 +0800 Subject: [PATCH 4/8] fix --- be/src/runtime/thread_context.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 9100666837b112..844a45fe5ed693 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -195,7 +195,7 @@ static void attach_bthread() { if (bthread_context == nullptr) { #ifndef BE_TEST DCHECK(ExecEnv::GetInstance()->initialized()); -#endif // BE_TEST \ +#endif // Create thread-local data on demand. bthread_context = new ThreadContext; // set the data so that next time bthread_getspecific in the thread returns the data. From fbc05b445131503225a1db355244eb1dde07c4dc Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Fri, 14 Oct 2022 22:14:27 +0800 Subject: [PATCH 5/8] fix format --- be/src/runtime/thread_context.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 844a45fe5ed693..03d453b2b321ef 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -201,7 +201,7 @@ static void attach_bthread() { // set the data so that next time bthread_getspecific in the thread returns the data. CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context)); } else { - bthread_context->detach_task(); + bthread_context->_thread_mem_tracker_mgr->clear(); } bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), ExecEnv::GetInstance()->bthread_mem_tracker()); From a2e49c02b71c1f6bffc90856c6d7b3fb0eed49ab Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 17 Oct 2022 09:42:03 +0800 Subject: [PATCH 6/8] fix --- be/src/runtime/memory/mem_tracker_limiter.cpp | 4 +--- be/src/runtime/memory/mem_tracker_limiter.h | 6 ++++++ .../runtime/memory/thread_mem_tracker_mgr.h | 6 ++++-- be/src/runtime/thread_context.h | 19 ++++++++++++++----- be/src/service/internal_service.cpp | 2 -- 5 files changed, 25 insertions(+), 12 deletions(-) diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 9d9594a5814966..20639326f8593e 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -83,9 +83,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { _consumption->current_value()); } if (_reset_zero) { - ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local( - _consumption->current_value()); - cache_consume_local(-_consumption->current_value()); + reset_zero(); _all_ancestors.clear(); _all_ancestors.push_back(ExecEnv::GetInstance()->orphan_mem_tracker_raw()); } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 7b072e9b9808f0..9a40e79b662e53 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -129,6 +129,12 @@ class MemTrackerLimiter final : public MemTracker { void enable_print_log_usage() { _print_log_usage = true; } void enable_reset_zero() { _reset_zero = true; } + void reset_zero() { + ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local( + _consumption->current_value()); + cache_consume_local(-_consumption->current_value()); + } + // Logs the usage of this tracker limiter and optionally its children (recursively). // If 'logged_consumption' is non-nullptr, sets the consumption value logged. // 'max_recursive_depth' specifies the maximum number of levels of children diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 74060786f59b71..fbd1b424919bc2 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -68,8 +68,10 @@ class ThreadMemTrackerMgr { // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker); - void detach_limiter_tracker(); + // Usually there are only two layers, the first is the default trackerOrphan; + // the second is the query tracker or bthread tracker. + int64_t get_attach_layers() { return _limiter_tracker_stack.size(); } // Must be fast enough! Thread update_tracker may be called very frequently. // So for performance, add tracker as early as possible, and then call update_tracker. @@ -163,11 +165,11 @@ inline void ThreadMemTrackerMgr::init_impl() { } inline void ThreadMemTrackerMgr::clear() { + flush_untracked_mem(); std::vector>().swap(_limiter_tracker_stack); std::vector().swap(_consumer_tracker_stack); std::vector().swap(_task_id_stack); std::vector().swap(_fragment_instance_id_stack); - flush_untracked_mem(); init_impl(); } diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 03d453b2b321ef..e15fd8f924355f 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -191,26 +191,35 @@ class ThreadContext { static void attach_bthread() { bthread_id = bthread_self(); bthread_context = static_cast(bthread_getspecific(btls_key)); - // First call to bthread_getspecific (and before any bthread_setspecific) returns NULL if (bthread_context == nullptr) { + // A new bthread starts, two scenarios: + // 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL + // 2. There are not enough reusable btls in btls pool. #ifndef BE_TEST DCHECK(ExecEnv::GetInstance()->initialized()); #endif // Create thread-local data on demand. bthread_context = new ThreadContext; + std::shared_ptr btls_tracker = + std::make_shared(-1, "Bthread:id=" + std::to_string(bthread_id), + ExecEnv::GetInstance()->bthread_mem_tracker()); + bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), btls_tracker); // set the data so that next time bthread_getspecific in the thread returns the data. CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context)); } else { - bthread_context->_thread_mem_tracker_mgr->clear(); + // two scenarios: + // 1. A new bthread starts, but get a reuses btls. + // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment. + // So tracker call reset 0 like reuses btls. + DCHECK(bthread_context->_thread_mem_tracker_mgr->get_attach_layers() == 2); + bthread_context->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->reset_zero(); } - bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), - ExecEnv::GetInstance()->bthread_mem_tracker()); } static ThreadContext* thread_context() { if (bthread_self() != 0) { if (bthread_self() != bthread_id) { - // pthread switch occurs, updating bthread_context and bthread_context_key cached in pthread tls. + // A new bthread starts or pthread switch occurs. thread_context_ptr.init = false; attach_bthread(); thread_context_ptr.init = true; diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 47bc7b8a9810d9..d7f1834843c123 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -646,8 +646,6 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl transmit_tracker = std::make_shared( -1, fmt::format("QueryTransmit#queryId={}", query_id), _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id)); - transmit_tracker = - _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id); } else { query_id = "unkown_transmit_block"; transmit_tracker = std::make_shared(-1, "unkown_transmit_block"); From ee59a5e05292a4875a73b26f7faffe987a8e3012 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 17 Oct 2022 13:46:34 +0800 Subject: [PATCH 7/8] fix tracker --- be/src/runtime/memory/thread_mem_tracker_mgr.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index fbd1b424919bc2..9ecbd70a27e51d 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -44,7 +44,8 @@ class ThreadMemTrackerMgr { ThreadMemTrackerMgr() {} ~ThreadMemTrackerMgr() { - flush_untracked_mem(); + // exec env is not initialized when init(). and never consumed mem tracker once. + if (_limiter_tracker_raw != nullptr) flush_untracked_mem(); if (bthread_self() == 0) { DCHECK(_consumer_tracker_stack.empty()); DCHECK(_limiter_tracker_stack.size() == 1) From 9c615890c3026fa11eafd8bb940da0936a9d494d Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Mon, 17 Oct 2022 15:33:55 +0800 Subject: [PATCH 8/8] fix2 --- .../runtime/memory/thread_mem_tracker_mgr.h | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 9ecbd70a27e51d..0b56911a77bff9 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -44,12 +44,14 @@ class ThreadMemTrackerMgr { ThreadMemTrackerMgr() {} ~ThreadMemTrackerMgr() { - // exec env is not initialized when init(). and never consumed mem tracker once. - if (_limiter_tracker_raw != nullptr) flush_untracked_mem(); - if (bthread_self() == 0) { - DCHECK(_consumer_tracker_stack.empty()); - DCHECK(_limiter_tracker_stack.size() == 1) - << ", limiter_tracker_stack.size(): " << _limiter_tracker_stack.size(); + // if _init == false, exec env is not initialized when init(). and never consumed mem tracker once. + if (_init) { + flush_untracked_mem(); + if (bthread_self() == 0) { + DCHECK(_consumer_tracker_stack.empty()); + DCHECK(_limiter_tracker_stack.size() == 1) + << ", limiter_tracker_stack.size(): " << _limiter_tracker_stack.size(); + } } } @@ -95,11 +97,11 @@ class ThreadMemTrackerMgr { bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); } std::shared_ptr limiter_mem_tracker() { - if (_init == false) init(); + if (!_init) init(); return _limiter_tracker_stack.back(); } MemTrackerLimiter* limiter_mem_tracker_raw() { - if (_init == false) init(); + if (!_init) init(); return _limiter_tracker_raw; } @@ -210,7 +212,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering // the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop. _stop_consume = true; - if (_init == false) init(); + if (!_init) init(); DCHECK(_limiter_tracker_raw); old_untracked_mem = _untracked_mem; if (CheckLimit) {