From 09b0d33c4c01869d42ac0b41fa3a6cf189872589 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Wed, 21 Sep 2022 15:47:10 +0800 Subject: [PATCH] [fix](memtracker) Introduce orphan mem tracker to verify memory tracking accuracy (#12794) The mem hook consumes the orphan tracker by default. If the thread does not attach other trackers, by default all consumption will be passed to the process tracker through the orphan tracker. In real time, consumption of all other trackers + orphan tracker consumption = process tracker consumption. Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership", and the consumption of the orphan mem tracker is close to 0, but greater than 0. --- be/src/runtime/exec_env.h | 21 ++++++++--- be/src/runtime/exec_env_init.cpp | 3 +- be/src/runtime/mem_pool.cpp | 8 ++-- be/src/runtime/mem_pool.h | 2 +- be/src/runtime/memory/mem_tracker.cpp | 16 ++++---- be/src/runtime/memory/mem_tracker_limiter.cpp | 23 +++++++++++- be/src/runtime/memory/mem_tracker_limiter.h | 2 +- .../runtime/memory/thread_mem_tracker_mgr.cpp | 19 +++++----- .../runtime/memory/thread_mem_tracker_mgr.h | 37 ++++++++++++------- be/src/runtime/runtime_state.cpp | 9 ++--- be/src/service/doris_main.cpp | 2 +- be/src/vec/common/pod_array.h | 8 ++-- 12 files changed, 96 insertions(+), 54 deletions(-) diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index a28e497a6becd6..4016506fe0b309 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -122,15 +122,17 @@ class ExecEnv { PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; } std::shared_ptr new_process_mem_tracker() { return _process_mem_tracker; } - MemTrackerLimiter* process_mem_tracker_raw() { return _process_mem_tracker_raw; } - - void set_process_mem_tracker(const std::shared_ptr& tracker) { - _process_mem_tracker = tracker; - _process_mem_tracker_raw = tracker.get(); + void set_global_mem_tracker(const std::shared_ptr& 
process_tracker, + const std::shared_ptr& orphan_tracker) { + _process_mem_tracker = process_tracker; + _orphan_mem_tracker = orphan_tracker; + _orphan_mem_tracker_raw = orphan_tracker.get(); } std::shared_ptr allocator_cache_mem_tracker() { return _allocator_cache_mem_tracker; } + std::shared_ptr orphan_mem_tracker() { return _orphan_mem_tracker; } + MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; } std::shared_ptr query_pool_mem_tracker() { return _query_pool_mem_tracker; } std::shared_ptr load_pool_mem_tracker() { return _load_pool_mem_tracker; } MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; } @@ -207,8 +209,15 @@ class ExecEnv { // The ancestor for all trackers. Every tracker is visible from the process down. // Not limit total memory by process tracker, and it's just used to track virtual memory of process. std::shared_ptr _process_mem_tracker; + // tcmalloc/jemalloc allocator cache tracker, Including thread cache, free heap, etc. std::shared_ptr _allocator_cache_mem_tracker; - MemTrackerLimiter* _process_mem_tracker_raw; + // The default tracker consumed by mem hook. If the thread does not attach other trackers, + // by default all consumption will be passed to the process tracker through the orphan tracker. + // In real time, `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`. + // Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership", + // and the consumption of the orphan mem tracker is close to 0, but greater than 0. + std::shared_ptr _orphan_mem_tracker; + MemTrackerLimiter* _orphan_mem_tracker_raw; // The ancestor for all querys tracker. std::shared_ptr _query_pool_mem_tracker; // The ancestor for all load tracker. 
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 0fe31a1a6b10c9..68518e905b1093 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -205,7 +205,8 @@ Status ExecEnv::_init_mem_tracker() { _process_mem_tracker = std::make_shared(global_memory_limit_bytes, "Process"); - _process_mem_tracker_raw = _process_mem_tracker.get(); + _orphan_mem_tracker = std::make_shared(-1, "Orphan", _process_mem_tracker); + _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); thread_context()->_thread_mem_tracker_mgr->init(); thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index 96a0daad4d6f40..21b475400ebb9c 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -60,7 +60,8 @@ MemPool::~MemPool() { } mem_tracker_->Release(total_bytes_released); THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); + DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } @@ -81,7 +82,8 @@ void MemPool::free_all() { ChunkAllocator::instance()->free(chunk.chunk); } THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); + chunks_.clear(); next_chunk_size_ = INITIAL_CHUNK_SIZE; current_chunk_idx_ = -1; @@ -146,7 +148,7 @@ bool MemPool::find_chunk(size_t min_size, bool check_limits) { mem_tracker_->Release(chunk_size); return false; } - THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->process_mem_tracker_raw()); + THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->orphan_mem_tracker_raw()); 
ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size); // Put it before the first free chunk. If no free chunks, it goes at the end. if (first_free_idx == static_cast(chunks_.size())) { diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index 4c93e8568fa793..8eee3334ce5102 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -210,7 +210,7 @@ class MemPool { void reset_peak() { if (total_allocated_bytes_ - peak_allocated_bytes_ > 65536) { THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - peak_allocated_bytes_, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); peak_allocated_bytes_ = total_allocated_bytes_; } } diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index 07ca883a941c95..bae8474cf4e40c 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -58,12 +58,14 @@ NewMemTracker::NewMemTracker(const std::string& label, RuntimeProfile* profile) _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES); } - DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker() != nullptr); - _label = fmt::format( - "{} | {}", label, - thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label()); - _bind_group_num = - thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->group_num(); + DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() != nullptr); + MemTrackerLimiter* parent = + thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw(); + _label = fmt::format("[Observer] {} | {}", label, + parent->label() == "Orphan" ? "Process" : parent->label()); + _bind_group_num = parent->label() == "Orphan" + ? 
ExecEnv::GetInstance()->new_process_mem_tracker()->group_num() + : parent->group_num(); { std::lock_guard l(mem_tracker_pool[_bind_group_num].group_lock); _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert( @@ -112,4 +114,4 @@ std::string NewMemTracker::log_usage(NewMemTracker::Snapshot snapshot) { snapshot.peak_consumption); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 0494657396f0e5..28ce5389b79bf1 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -41,7 +41,14 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe _label = label; _limit = byte_limit; _group_num = GetCurrentTimeMicros() % 1000; - _parent = parent ? parent : thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(); + if (parent || label == "Process") { + _parent = parent; + } else if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label() == + "Orphan") { + _parent = ExecEnv::GetInstance()->new_process_mem_tracker(); + } else { + _parent = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(); + } DCHECK(_parent || label == "Process"); // Walks the MemTrackerLimiter hierarchy and populates _all_ancestors and _limited_ancestors @@ -69,6 +76,18 @@ MemTrackerLimiter::~MemTrackerLimiter() { if (_label == "Process") doris::thread_context_ptr._init = false; DCHECK(remain_child_count() == 0 || _label == "Process"); consume(_untracked_mem.exchange(0)); +#ifndef BE_TEST + // In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption` + // in real time. Merge its consumption into orphan when all third level limiter trackers are destructed, to avoid repetition. 
+ // the first layer: process; + // the second layer: a tracker that will not be destructed globally (query/load pool, load channel mgr, etc.); + // the third layer: a query/load/compaction task generates a tracker (query tracker, load channel tracker, etc.). + if (_parent && _parent->parent() && _parent->parent()->label() == "Process") { + ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local( + _consumption->current_value()); + } +#endif + if (_parent) { std::lock_guard l(_parent->_child_tracker_limiter_lock); if (_child_tracker_it != _parent->_child_tracker_limiters.end()) { @@ -258,7 +277,7 @@ std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, // The limit of the current tracker and parents is less than 0, the consume will not fail, // and the current process memory has no excess limit. detail += fmt::format("unknown exceed reason, executing msg:<{}>", msg); - print_log_usage_tracker = ExecEnv::GetInstance()->process_mem_tracker_raw(); + print_log_usage_tracker = ExecEnv::GetInstance()->new_process_mem_tracker().get(); } auto failed_msg = MemTrackerLimiter::limit_exceeded_errmsg_suffix_str(detail); if (print_log_usage_tracker != nullptr) print_log_usage_tracker->print_log_usage(failed_msg); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 0886279c6ea33e..7f294ef9982bc5 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -217,7 +217,7 @@ class MemTrackerLimiter final : public NewMemTracker { "alloc size {}", PerfCounters::get_vm_rss_str(), MemInfo::allocator_cache_mem_str(), MemInfo::mem_limit_str(), print_bytes(bytes)); - ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(err_msg); + ExecEnv::GetInstance()->new_process_mem_tracker()->print_log_usage(err_msg); return err_msg; } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 
40879b6c4a3dd9..3cb97c6ff8474f 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -29,24 +29,25 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( const std::shared_ptr& mem_tracker) { DCHECK(mem_tracker); flush_untracked_mem(); - _task_id = task_id; - _fragment_instance_id = fragment_instance_id; - _limiter_tracker = mem_tracker; + _task_id_stack.push_back(task_id); + _fragment_instance_id_stack.push_back(fragment_instance_id); + _limiter_tracker_stack.push_back(mem_tracker); _limiter_tracker_raw = mem_tracker.get(); } void ThreadMemTrackerMgr::detach_limiter_tracker() { + DCHECK(!_limiter_tracker_stack.empty()); flush_untracked_mem(); - _task_id = ""; - _fragment_instance_id = TUniqueId(); - _limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker(); - _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw(); + _task_id_stack.pop_back(); + _fragment_instance_id_stack.pop_back(); + _limiter_tracker_stack.pop_back(); + _limiter_tracker_raw = _limiter_tracker_stack.back().get(); } void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) { - if (_fragment_instance_id != TUniqueId()) { + if (_fragment_instance_id_stack.back() != TUniqueId()) { ExecEnv::GetInstance()->fragment_mgr()->cancel( - _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, + _fragment_instance_id_stack.back(), PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, cancel_details); } } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 3f652ecd5a9fa4..2736ff9630ff81 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -50,7 +50,7 @@ class ThreadMemTrackerMgr { // only for tcmalloc hook static void consume_no_attach(int64_t size) { - ExecEnv::GetInstance()->process_mem_tracker_raw()->consume(size); + 
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size); } // After thread initialization, calling `init` again must call `clear_untracked_mems` first @@ -78,9 +78,11 @@ class ThreadMemTrackerMgr { template void flush_untracked_mem(); - bool is_attach_query() { return _fragment_instance_id != TUniqueId(); } + bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); } - std::shared_ptr limiter_mem_tracker() { return _limiter_tracker; } + std::shared_ptr limiter_mem_tracker() { + return _limiter_tracker_stack.back(); + } MemTrackerLimiter* limiter_mem_tracker_raw() { return _limiter_tracker_raw; } void set_check_limit(bool check_limit) { _check_limit = check_limit; } @@ -95,8 +97,8 @@ class ThreadMemTrackerMgr { return fmt::format( "ThreadMemTrackerMgr debug, _untracked_mem:{}, _task_id:{}, " "_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>", - std::to_string(_untracked_mem), _task_id, _limiter_tracker->log_usage(1), - fmt::to_string(consumer_tracker_buf)); + std::to_string(_untracked_mem), _task_id_stack.back(), + _limiter_tracker_raw->log_usage(1), fmt::to_string(consumer_tracker_buf)); } private: @@ -112,25 +114,32 @@ class ThreadMemTrackerMgr { int64_t old_untracked_mem = 0; std::string failed_msg = std::string(); - std::shared_ptr _limiter_tracker; - std::vector _consumer_tracker_stack; + // _limiter_tracker_stack[0] = orphan_mem_tracker + std::vector> _limiter_tracker_stack; MemTrackerLimiter* _limiter_tracker_raw; + std::vector _consumer_tracker_stack; // If true, call memtracker try_consume, otherwise call consume. bool _check_limit = false; // If there is a memory new/delete operation in the consume method, it may enter infinite recursion. 
bool _stop_consume = false; bool _check_attach = true; - std::string _task_id; - TUniqueId _fragment_instance_id; + std::vector _task_id_stack; + std::vector _fragment_instance_id_stack; ExceedCallBack _cb_func = nullptr; }; inline void ThreadMemTrackerMgr::init() { DCHECK(_consumer_tracker_stack.empty()); - _task_id = ""; - _limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker(); - _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw(); + // _limiter_tracker_stack[0] = orphan_mem_tracker + DCHECK(_limiter_tracker_stack.size() <= 1) + << "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size(); + if (_limiter_tracker_stack.size() == 0) { + _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker()); + _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); + _task_id_stack.push_back(""); + _fragment_instance_id_stack.push_back(TUniqueId()); + } _check_limit = true; } @@ -170,7 +179,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop. _stop_consume = true; old_untracked_mem = _untracked_mem; - DCHECK(_limiter_tracker); + DCHECK(_limiter_tracker_raw); if (CheckLimit) { #ifndef BE_TEST // When all threads are started, `attach_limiter_tracker` is expected to be called to bind the limiter tracker. @@ -180,7 +189,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // TODO(zxy) The current p0 test cannot guarantee that all threads are checked, // so disable it and try to open it when memory tracking is not on time. 
// DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY || - // _limiter_tracker->label() != "Process"); + // _limiter_tracker_raw->label() != "Process"); #endif if (!_limiter_tracker_raw->try_consume(old_untracked_mem, failed_msg)) { // The memory has been allocated, so when TryConsume fails, need to continue to complete diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 4036a99786977e..28c00ba48bc513 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -193,14 +193,13 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { bool has_query_mem_tracker = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0); int64_t bytes_limit = has_query_mem_tracker ? _query_options.mem_limit : -1; - if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker_raw()->limit()) { + if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker()->limit()) { VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES) << " exceeds process memory limit of " - << PrettyPrinter::print( - ExecEnv::GetInstance()->process_mem_tracker_raw()->limit(), - TUnit::BYTES) + << PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->limit(), + TUnit::BYTES) << ". 
Using process memory limit instead"; - bytes_limit = ExecEnv::GetInstance()->process_mem_tracker_raw()->limit(); + bytes_limit = ExecEnv::GetInstance()->process_mem_tracker()->limit(); } // we do not use global query-map for now, to avoid mem-exceeded different fragments diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 96ef5742610285..a752088c851092 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -472,7 +472,7 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker(); // The process tracker print log usage interval is 1s to avoid a large number of tasks being // canceled when the process exceeds the mem limit, resulting in too many duplicate logs. - doris::ExecEnv::GetInstance()->process_mem_tracker_raw()->enable_print_log_usage(); + doris::ExecEnv::GetInstance()->new_process_mem_tracker()->enable_print_log_usage(); sleep(1); } diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h index 5deb16d106209e..9c8ee3f7f55cfa 100644 --- a/be/src/vec/common/pod_array.h +++ b/be/src/vec/common/pod_array.h @@ -115,7 +115,7 @@ class PODArrayBase : private boost::noncopyable, inline void reset_peak() { if (UNLIKELY(c_end - c_end_peak > 65536)) { THREAD_MEM_TRACKER_TRANSFER_FROM(c_end - c_end_peak, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); c_end_peak = c_end; } } @@ -127,7 +127,7 @@ class PODArrayBase : private boost::noncopyable, template void alloc(size_t bytes, TAllocatorParams&&... 
allocator_params) { THREAD_MEM_TRACKER_TRANSFER_TO(bytes - pad_right - pad_left, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); c_start = c_end = c_end_peak = reinterpret_cast(TAllocator::alloc( bytes, std::forward(allocator_params)...)) + @@ -144,7 +144,7 @@ class PODArrayBase : private boost::noncopyable, TAllocator::free(c_start - pad_left, allocated_bytes()); THREAD_MEM_TRACKER_TRANSFER_FROM(c_end_of_storage - c_end_peak, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); } template @@ -157,7 +157,7 @@ class PODArrayBase : private boost::noncopyable, unprotect(); THREAD_MEM_TRACKER_TRANSFER_TO(bytes - allocated_bytes(), - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); ptrdiff_t end_diff = c_end - c_start;