diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 4016506fe0b309..03603c4624c3f2 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -99,6 +99,7 @@ class ExecEnv { // declarations for classes in scoped_ptrs. ~ExecEnv(); + const bool initialized() { return _is_init; } const std::string& token() const; ExternalScanContextMgr* external_scan_context_mgr() { return _external_scan_context_mgr; } DataStreamMgr* stream_mgr() { return _stream_mgr; } @@ -123,16 +124,19 @@ class ExecEnv { std::shared_ptr new_process_mem_tracker() { return _process_mem_tracker; } void set_global_mem_tracker(const std::shared_ptr& process_tracker, - const std::shared_ptr& orphan_tracker) { + const std::shared_ptr& orphan_tracker, + const std::shared_ptr& bthread_mem_tracker) { _process_mem_tracker = process_tracker; _orphan_mem_tracker = orphan_tracker; _orphan_mem_tracker_raw = orphan_tracker.get(); + _bthread_mem_tracker = bthread_mem_tracker; } std::shared_ptr allocator_cache_mem_tracker() { return _allocator_cache_mem_tracker; } std::shared_ptr orphan_mem_tracker() { return _orphan_mem_tracker; } MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; } + std::shared_ptr bthread_mem_tracker() { return _bthread_mem_tracker; } std::shared_ptr query_pool_mem_tracker() { return _query_pool_mem_tracker; } std::shared_ptr load_pool_mem_tracker() { return _load_pool_mem_tracker; } MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; } @@ -218,6 +222,8 @@ class ExecEnv { // and the consumption of the orphan mem tracker is close to 0, but greater than 0. std::shared_ptr _orphan_mem_tracker; MemTrackerLimiter* _orphan_mem_tracker_raw; + // Bthread default mem tracker + std::shared_ptr _bthread_mem_tracker; // The ancestor for all querys tracker. std::shared_ptr _query_pool_mem_tracker; // The ancestor for all load tracker. diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 68518e905b1093..8cb431ed51f5d3 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -207,6 +207,7 @@ Status ExecEnv::_init_mem_tracker() { std::make_shared(global_memory_limit_bytes, "Process"); _orphan_mem_tracker = std::make_shared(-1, "Orphan", _process_mem_tracker); _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); + _bthread_mem_tracker = std::make_shared(-1, "Bthread", _orphan_mem_tracker); thread_context()->_thread_mem_tracker_mgr->init(); thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 9b4eedea65d9fd..48a2102e22b35e 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -74,7 +74,7 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe MemTrackerLimiter::~MemTrackerLimiter() { // TCMalloc hook will be triggered during destructor memtracker, may cause crash. #ifndef BE_TEST - if (_label == "Process") doris::thread_context_ptr._init = false; + if (_label == "Process") doris::thread_context_ptr.init = false; #endif DCHECK(remain_child_count() == 0 || _label == "Process"); // In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption` @@ -84,9 +84,7 @@ MemTrackerLimiter::~MemTrackerLimiter() { _consumption->current_value()); } if (_reset_zero) { - ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local( - _consumption->current_value()); - cache_consume_local(-_consumption->current_value()); + reset_zero(); _all_ancestors.clear(); _all_ancestors.push_back(ExecEnv::GetInstance()->orphan_mem_tracker_raw()); } diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index df3d2714941899..66bd42d8792cfc 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -132,6 +132,12 @@ class MemTrackerLimiter final : public NewMemTracker { void enable_print_log_usage() { _print_log_usage = true; } void enable_reset_zero() { _reset_zero = true; } + void reset_zero() { + ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local( + _consumption->current_value()); + cache_consume_local(-_consumption->current_value()); + } + // Logs the usage of this tracker limiter and optionally its children (recursively). // If 'logged_consumption' is non-nullptr, sets the consumption value logged. // 'max_recursive_depth' specifies the maximum number of levels of children diff --git a/be/src/runtime/memory/tcmalloc_hook.h b/be/src/runtime/memory/tcmalloc_hook.h index 627f42795d4031..6ec9352ad312df 100644 --- a/be/src/runtime/memory/tcmalloc_hook.h +++ b/be/src/runtime/memory/tcmalloc_hook.h @@ -36,28 +36,11 @@ // destructor to control the behavior of consume can lead to unexpected behavior, // like this: if (LIKELY(doris::start_thread_mem_tracker)) { void new_hook(const void* ptr, size_t size) { - if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) { - // Currently in bthread, consume thread context mem tracker in bthread tls. - doris::update_bthread_context(); - doris::bthread_context->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0)); - } else if (doris::thread_context_ptr._init) { - doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0)); - } else { - doris::ThreadMemTrackerMgr::consume_no_attach(tc_nallocx(size, 0)); - } + MEM_MALLOC_HOOK(tc_nallocx(size, 0)); } void delete_hook(const void* ptr) { - if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) { - doris::update_bthread_context(); - doris::bthread_context->_thread_mem_tracker_mgr->consume( - -tc_malloc_size(const_cast(ptr))); - } else if (doris::thread_context_ptr._init) { - doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume( - -tc_malloc_size(const_cast(ptr))); - } else { - doris::ThreadMemTrackerMgr::consume_no_attach(-tc_malloc_size(const_cast(ptr))); - } + MEM_FREE_HOOK(tc_malloc_size(const_cast(ptr))); } void init_hook() { diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 5b2c4a5e0c65b3..d0e4e6534ab29b 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -44,25 +44,37 @@ class ThreadMemTrackerMgr { ThreadMemTrackerMgr() {} ~ThreadMemTrackerMgr() { - flush_untracked_mem(); - DCHECK(_consumer_tracker_stack.empty()); - DCHECK(_limiter_tracker_stack.size() == 1); + // if _init == false, exec env is not initialized when init(). and never consumed mem tracker once. + if (_init) { + flush_untracked_mem(); + if (bthread_self() == 0) { + DCHECK(_consumer_tracker_stack.empty()); + DCHECK(_limiter_tracker_stack.size() == 1) + << ", limiter_tracker_stack.size(): " << _limiter_tracker_stack.size(); + } + } } // only for tcmalloc hook static void consume_no_attach(int64_t size) { - ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size); + if (ExecEnv::GetInstance()->initialized()) { + ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size); + } } // After thread initialization, calling `init` again must call `clear_untracked_mems` first // to avoid memory tracking loss. void init(); + void init_impl(); + void clear(); // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id, const std::shared_ptr& mem_tracker); - void detach_limiter_tracker(); + // Usually there are only two layers, the first is the default trackerOrphan; + // the second is the query tracker or bthread tracker. + int64_t get_attach_layers() { return _limiter_tracker_stack.size(); } // Must be fast enough! Thread update_tracker may be called very frequently. // So for performance, add tracker as early as possible, and then call update_tracker. @@ -82,9 +94,13 @@ class ThreadMemTrackerMgr { bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); } std::shared_ptr limiter_mem_tracker() { + if (!_init) init(); return _limiter_tracker_stack.back(); } - MemTrackerLimiter* limiter_mem_tracker_raw() { return _limiter_tracker_raw; } + MemTrackerLimiter* limiter_mem_tracker_raw() { + if (!_init) init(); + return _limiter_tracker_raw; + } void set_check_limit(bool check_limit) { _check_limit = check_limit; } void set_check_attach(bool check_attach) { _check_attach = check_attach; } @@ -109,6 +125,8 @@ class ThreadMemTrackerMgr { void exceeded(const std::string& failed_msg); private: + // is false: ExecEnv::GetInstance()->initialized() = false when thread local is initialized + bool _init = false; // Cache untracked mem, only update to _untracked_mems when switching mem tracker. // Frequent calls to unordered_map _untracked_mems[] in consume will degrade performance. int64_t _untracked_mem = 0; @@ -117,7 +135,7 @@ class ThreadMemTrackerMgr { // _limiter_tracker_stack[0] = orphan_mem_tracker std::vector> _limiter_tracker_stack; - MemTrackerLimiter* _limiter_tracker_raw; + MemTrackerLimiter* _limiter_tracker_raw = nullptr; std::vector _consumer_tracker_stack; // If true, call memtracker try_consume, otherwise call consume. @@ -131,26 +149,39 @@ class ThreadMemTrackerMgr { }; inline void ThreadMemTrackerMgr::init() { + DCHECK(_limiter_tracker_stack.size() == 0); + DCHECK(_limiter_tracker_raw == nullptr); DCHECK(_consumer_tracker_stack.empty()); - // _limiter_tracker_stack[0] = orphan_mem_tracker - DCHECK(_limiter_tracker_stack.size() <= 1) - << "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size(); + init_impl(); +} + +inline void ThreadMemTrackerMgr::init_impl() { #ifdef BE_TEST if (ExecEnv::GetInstance()->new_process_mem_tracker() == nullptr) { std::shared_ptr process_mem_tracker = std::make_shared(-1, "Process"); - std::shared_ptr _orphan_mem_tracker = + std::shared_ptr orphan_mem_tracker = std::make_shared(-1, "Orphan", process_mem_tracker); - ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker); + std::shared_ptr bthread_mem_tracker = + std::make_shared(-1, "Bthread", orphan_mem_tracker); + ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker); } #endif // BE_TEST - if (_limiter_tracker_stack.size() == 0) { - _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker()); - _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); - _task_id_stack.push_back(""); - _fragment_instance_id_stack.push_back(TUniqueId()); - } + _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker()); + _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); + _task_id_stack.push_back(""); + _fragment_instance_id_stack.push_back(TUniqueId()); _check_limit = true; + _init = true; +} + +inline void ThreadMemTrackerMgr::clear() { + flush_untracked_mem(); + std::vector>().swap(_limiter_tracker_stack); + std::vector().swap(_consumer_tracker_stack); + std::vector().swap(_task_id_stack); + std::vector().swap(_fragment_instance_id_stack); + init_impl(); } inline void ThreadMemTrackerMgr::push_consumer_tracker(NewMemTracker* tracker) { @@ -174,7 +205,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size) { // it will cause tracker->consumption to be temporarily less than 0. if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes || _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) && - !_stop_consume) { + !_stop_consume && ExecEnv::GetInstance()->initialized()) { if (_check_limit) { flush_untracked_mem(); } else { @@ -188,6 +219,8 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering // the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop. _stop_consume = true; + if (!_init) init(); + DCHECK(_limiter_tracker_raw); old_untracked_mem = _untracked_mem; DCHECK(_limiter_tracker_raw); if (CheckLimit) { diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp index b993308db8d841..2b4f9f19e679e2 100644 --- a/be/src/runtime/thread_context.cpp +++ b/be/src/runtime/thread_context.cpp @@ -25,7 +25,7 @@ DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, _ptr); ThreadContextPtr::ThreadContextPtr() { INIT_STATIC_THREAD_LOCAL(ThreadContext, _ptr); - _init = true; + init = true; } AttachTask::AttachTask(const std::shared_ptr& mem_tracker, @@ -39,10 +39,12 @@ AttachTask::AttachTask(const std::shared_ptr& mem_tracker, #else if (ExecEnv::GetInstance()->new_process_mem_tracker() == nullptr) { std::shared_ptr process_mem_tracker = - std::make_shared(-1, "Process"); - std::shared_ptr _orphan_mem_tracker = + std::make_shared(-1, "Process"); + std::shared_ptr orphan_mem_tracker = std::make_shared(-1, "Orphan", process_mem_tracker); - ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker); + std::shared_ptr bthread_mem_tracker = + std::make_shared(-1, "Bthread", orphan_mem_tracker); + ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker); } thread_context()->attach_task(type, task_id, fragment_instance_id, ExecEnv::GetInstance()->orphan_mem_tracker()); #endif // BE_TEST @@ -61,10 +63,12 @@ AttachTask::AttachTask(RuntimeState* runtime_state) { #else if (ExecEnv::GetInstance()->new_process_mem_tracker() == nullptr) { std::shared_ptr process_mem_tracker = - std::make_shared(-1, "Process"); - std::shared_ptr _orphan_mem_tracker = + std::make_shared(-1, "Process"); + std::shared_ptr orphan_mem_tracker = std::make_shared(-1, "Orphan", process_mem_tracker); - ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker); + std::shared_ptr bthread_mem_tracker = + std::make_shared(-1, "Bthread", orphan_mem_tracker); + ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker); } thread_context()->attach_task(ThreadContext::TaskType::QUERY, "", TUniqueId(), ExecEnv::GetInstance()->orphan_mem_tracker()); #endif // BE_TEST @@ -92,35 +96,4 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() { #endif // USE_MEM_TRACKER } -SwitchBthread::SwitchBthread() { -#ifdef USE_MEM_TRACKER - _bthread_context = static_cast(bthread_getspecific(btls_key)); - // First call to bthread_getspecific (and before any bthread_setspecific) returns NULL - if (_bthread_context == nullptr) { - // Create thread-local data on demand. - _bthread_context = new ThreadContext; - // set the data so that next time bthread_getspecific in the thread returns the data. - CHECK_EQ(0, bthread_setspecific(btls_key, _bthread_context)); - } else { - DCHECK(_bthread_context->type() == ThreadContext::TaskType::UNKNOWN); - _bthread_context->_thread_mem_tracker_mgr->flush_untracked_mem(); - } - _bthread_context->_thread_mem_tracker_mgr->init(); - _bthread_context->set_type(ThreadContext::TaskType::BRPC); - bthread_context_key = btls_key; - bthread_context = _bthread_context; -#endif -} - -SwitchBthread::~SwitchBthread() { -#ifdef USE_MEM_TRACKER - DCHECK(_bthread_context != nullptr); - _bthread_context->_thread_mem_tracker_mgr->flush_untracked_mem(); - _bthread_context->_thread_mem_tracker_mgr->init(); - _bthread_context->set_type(ThreadContext::TaskType::UNKNOWN); - bthread_context = nullptr; - bthread_context_key = EMPTY_BTLS_KEY; -#endif // USE_MEM_TRACKER -} - } // namespace doris diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index 421c59811bf036..b6143264a8a1d0 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -29,15 +29,17 @@ #include "runtime/memory/thread_mem_tracker_mgr.h" #include "runtime/threadlocal.h" +#ifdef USE_MEM_TRACKER // Add thread mem tracker consumer during query execution. #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \ auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker) - // Attach to task when thread starts #define SCOPED_ATTACH_TASK(arg1, ...) \ auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__) - -#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) = SwitchBthread() +#else +#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0 +#define SCOPED_ATTACH_TASK(arg1, ...) (void)0 +#endif namespace doris { @@ -75,7 +77,7 @@ class ThreadContextPtr { // Cannot add destructor `~ThreadContextPtr`, otherwise it will no longer be of type POD, the reason is as above. // TCMalloc hook is triggered during ThreadContext construction, which may lead to deadlock. - bool _init = false; + bool init = false; DECLARE_STATIC_THREAD_LOCAL(ThreadContext, _ptr); }; @@ -85,7 +87,7 @@ inline thread_local ThreadContextPtr thread_context_ptr; // To avoid performance problems caused by frequently calling `bthread_getspecific` to obtain bthread TLS // in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS. inline thread_local ThreadContext* bthread_context; -inline thread_local bthread_key_t bthread_context_key; +inline thread_local bthread_t bthread_id; // The thread context saves some info about a working thread. // 2 required info: @@ -116,18 +118,18 @@ class ThreadContext { } ~ThreadContext() { - // Restore to the memory state before _init=true to ensure accurate overall memory statistics. + // Restore to the memory state before init=true to ensure accurate overall memory statistics. // Thereby ensuring that the memory alloc size is not tracked during the initialization of the - // ThreadContext before `_init = true in ThreadContextPtr()`, + // ThreadContext before `init = true in ThreadContextPtr()`, // Equal to the size of the memory release that is not tracked during the destruction of the - // ThreadContext after `_init = false in ~ThreadContextPtr()`, - init(); - thread_context_ptr._init = false; + // ThreadContext after `init = false in ~ThreadContextPtr()`, + if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->clear(); + thread_context_ptr.init = false; } void init() { _type = TaskType::UNKNOWN; - _thread_mem_tracker_mgr->init(); + if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->init(); _thread_id = get_thread_id(); } @@ -174,17 +176,42 @@ class ThreadContext { TUniqueId _fragment_instance_id; }; -static void update_bthread_context() { - if (btls_key != bthread_context_key) { - // pthread switch occurs, updating bthread_context and bthread_context_key cached in pthread tls. - bthread_context = static_cast(bthread_getspecific(btls_key)); - bthread_context_key = btls_key; +static void attach_bthread() { + bthread_id = bthread_self(); + bthread_context = static_cast(bthread_getspecific(btls_key)); + if (bthread_context == nullptr) { + // A new bthread starts, two scenarios: + // 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL + // 2. There are not enough reusable btls in btls pool. +#ifndef BE_TEST + DCHECK(ExecEnv::GetInstance()->initialized()); +#endif + // Create thread-local data on demand. + bthread_context = new ThreadContext; + std::shared_ptr btls_tracker = + std::make_shared(-1, "Bthread:id=" + std::to_string(bthread_id), + ExecEnv::GetInstance()->bthread_mem_tracker()); + bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), btls_tracker); + // set the data so that next time bthread_getspecific in the thread returns the data. + CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context)); + } else { + // two scenarios: + // 1. A new bthread starts, but get a reuses btls. + // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment. + // So tracker call reset 0 like reuses btls. + DCHECK(bthread_context->_thread_mem_tracker_mgr->get_attach_layers() == 2); + bthread_context->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->reset_zero(); } } static ThreadContext* thread_context() { - if (btls_key != EMPTY_BTLS_KEY && bthread_context != nullptr) { - update_bthread_context(); + if (bthread_self() != 0) { + if (bthread_self() != bthread_id) { + // A new bthread starts or pthread switch occurs. + thread_context_ptr.init = false; + attach_bthread(); + thread_context_ptr.init = true; + } return bthread_context; } else { return thread_context_ptr._ptr; @@ -222,18 +249,6 @@ class AddThreadMemTrackerConsumer { ~AddThreadMemTrackerConsumer(); }; -class SwitchBthread { -public: - explicit SwitchBthread(); - - ~SwitchBthread(); - -private: -#ifdef USE_MEM_TRACKER - ThreadContext* _bthread_context; -#endif -}; - class StopCheckThreadMemTrackerLimit { public: explicit StopCheckThreadMemTrackerLimit() { @@ -246,6 +261,7 @@ class StopCheckThreadMemTrackerLimit { }; // The following macros are used to fix the tracking accuracy of caches etc. +#ifdef USE_MEM_TRACKER #define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() \ auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit() #define CONSUME_THREAD_MEM_TRACKER(size) \ @@ -258,9 +274,38 @@ class StopCheckThreadMemTrackerLimit { #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \ tracker->transfer_to( \ size, doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()) -#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ - return doris::thread_context() \ - ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() \ - ->mem_limit_exceeded(state, fmt::format("exec node:<{}>, {}", "", msg), \ - ##__VA_ARGS__); +#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \ + return doris::thread_context() \ + ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() \ + ->mem_limit_exceeded( \ + state, \ + fmt::format("exec node:<{}>, {}", "", msg), \ + ##__VA_ARGS__); +// Mem Hook to consume thread mem tracker +#define MEM_MALLOC_HOOK(size) \ + do { \ + if (doris::thread_context_ptr.init) { \ + doris::thread_context()->_thread_mem_tracker_mgr->consume(size); \ + } else { \ + doris::ThreadMemTrackerMgr::consume_no_attach(size); \ + } \ + } while (0) +#define MEM_FREE_HOOK(size) \ + do { \ + if (doris::thread_context_ptr.init) { \ + doris::thread_context()->_thread_mem_tracker_mgr->consume(-size); \ + } else { \ + doris::ThreadMemTrackerMgr::consume_no_attach(-size); \ + } \ + } while (0) +#else +#define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() (void)0 +#define CONSUME_THREAD_MEM_TRACKER(size) (void)0 +#define RELEASE_THREAD_MEM_TRACKER(size) (void)0 +#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) (void)0 +#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) (void)0 +#define RETURN_LIMIT_EXCEEDED(state, msg, ...) (void)0 +#define MEM_MALLOC_HOOK(size) (void)0 +#define MEM_FREE_HOOK(size) (void)0 +#endif } // namespace doris diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index 2b9ee2fb3f77e0..b7bc9a179b6077 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -91,7 +91,6 @@ void PInternalServiceImpl::transmit_data(google::protobuf::RpcController* cnt const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id()) << " node=" << request->node_id(); brpc::Controller* cntl = static_cast(cntl_base); @@ -105,7 +104,6 @@ void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcControl const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); PTransmitDataParams* request_raw = new PTransmitDataParams(); google::protobuf::Closure* done_raw = new NewHttpClosure(request_raw, done); @@ -163,7 +161,6 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController const PTabletWriterOpenRequest* request, PTabletWriterOpenResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id(); brpc::ClosureGuard closure_guard(done); @@ -181,7 +178,6 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController const PExecPlanFragmentRequest* request, PExecPlanFragmentResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); auto st = Status::OK(); bool compact = request->has_compact() ? request->compact() : false; @@ -207,7 +203,6 @@ void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcCont const PExecPlanFragmentStartRequest* request, PExecPlanFragmentResult* result, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); auto st = _exec_env->fragment_mgr()->start_query_execution(request); st.to_protobuf(result->mutable_status()); @@ -218,7 +213,6 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcContr const PTabletWriterAddBatchRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); _tablet_writer_add_batch(cntl_base, request, response, done); } @@ -226,7 +220,6 @@ template void PInternalServiceImpl::tablet_writer_add_batch_by_http( google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request, PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); PTabletWriterAddBatchRequest* request_raw = new PTabletWriterAddBatchRequest(); google::protobuf::Closure* done_raw = new NewHttpClosure(request_raw, done); @@ -281,7 +274,6 @@ void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcControll const PTabletWriterCancelRequest* request, PTabletWriterCancelResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id(); brpc::ClosureGuard closure_guard(done); @@ -327,7 +319,6 @@ void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcControll const PCancelPlanFragmentRequest* request, PCancelPlanFragmentResult* result, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); TUniqueId tid; tid.__set_hi(request->finst_id().hi()); @@ -352,7 +343,6 @@ template void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base, const PFetchDataRequest* request, PFetchDataResult* result, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::Controller* cntl = static_cast(cntl_base); GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done); _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx); @@ -362,7 +352,6 @@ template void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller, const PProxyRequest* request, PProxyResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); // PProxyRequest is defined in gensrc/proto/internal_service.proto // Currently it supports 2 kinds of requests: @@ -425,7 +414,6 @@ void PInternalServiceImpl::update_cache(google::protobuf::RpcController* cont const PUpdateCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->update(request, response); } @@ -435,7 +423,6 @@ void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* contr const PFetchCacheRequest* request, PFetchCacheResult* result, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->fetch(request, result); } @@ -445,7 +432,6 @@ void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* contr const PClearCacheRequest* request, PCacheResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); _exec_env->result_cache()->clear(request, response); } @@ -455,7 +441,6 @@ void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* co const ::doris::PMergeFilterRequest* request, ::doris::PMergeFilterResponse* response, ::google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); auto buf = static_cast(controller)->request_attachment(); Status st = _exec_env->fragment_mgr()->merge_filter(request, buf.to_string().data()); @@ -470,7 +455,6 @@ void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* co const ::doris::PPublishFilterRequest* request, ::doris::PPublishFilterResponse* response, ::google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); auto attachment = static_cast(controller)->request_attachment(); UniqueId unique_id(request->query_id()); @@ -487,7 +471,6 @@ template void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller, const PSendDataRequest* request, PSendDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -511,7 +494,6 @@ template void PInternalServiceImpl::commit(google::protobuf::RpcController* controller, const PCommitRequest* request, PCommitResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -530,7 +512,6 @@ template void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller, const PRollbackRequest* request, PRollbackResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); TUniqueId fragment_instance_id; fragment_instance_id.hi = request->fragment_instance_id().hi(); @@ -550,7 +531,6 @@ void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController const PConstantExprRequest* request, PConstantExprResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); brpc::Controller* cntl = static_cast(cntl_base); @@ -587,7 +567,6 @@ void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cn const PTransmitDataParams* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); // TODO(zxy) delete in 1.2 version brpc::Controller* cntl = static_cast(cntl_base); attachment_transfer_request_block(request, cntl); @@ -600,7 +579,6 @@ void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcContro const PEmptyRequest* request, PTransmitDataResult* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); PTransmitDataParams* request_raw = new PTransmitDataParams(); google::protobuf::Closure* done_raw = new NewHttpClosure(request_raw, done); @@ -658,7 +636,6 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* const PCheckRPCChannelRequest* request, PCheckRPCChannelResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); if (request->data().size() != request->size()) { @@ -686,7 +663,6 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* const PResetRPCChannelRequest* request, PResetRPCChannelResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); response->mutable_status()->set_status_code(0); if (request->all()) { @@ -721,7 +697,6 @@ void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* cntl_b const PHandShakeRequest* request, PHandShakeResponse* response, google::protobuf::Closure* done) { - SCOPED_SWITCH_BTHREAD_TLS(); brpc::ClosureGuard closure_guard(done); if (request->has_hello()) { response->set_hello(request->hello()); diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp index 842282e8c68fb4..aa2d0a74cf4d70 100644 --- a/be/test/testutil/run_all_tests.cpp +++ b/be/test/testutil/run_all_tests.cpp @@ -27,6 +27,15 @@ #include "util/mem_info.h" int main(int argc, char** argv) { + std::shared_ptr process_mem_tracker = + std::make_shared(-1, "Process"); + std::shared_ptr orphan_mem_tracker = + std::make_shared(-1, "Orphan", process_mem_tracker); + std::shared_ptr bthread_mem_tracker = + std::make_shared(-1, "Bthread", orphan_mem_tracker); + doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, + bthread_mem_tracker); + doris::thread_context()->_thread_mem_tracker_mgr->init(); doris::StoragePageCache::create_global_cache(1 << 30, 10); doris::SegmentLoader::create_global_instance(1000); std::string conf = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";