diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index a28e497a6becd6..4016506fe0b309 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -122,15 +122,17 @@ class ExecEnv { PoolMemTrackerRegistry* pool_mem_trackers() { return _pool_mem_trackers; } std::shared_ptr new_process_mem_tracker() { return _process_mem_tracker; } - MemTrackerLimiter* process_mem_tracker_raw() { return _process_mem_tracker_raw; } - - void set_process_mem_tracker(const std::shared_ptr& tracker) { - _process_mem_tracker = tracker; - _process_mem_tracker_raw = tracker.get(); + void set_global_mem_tracker(const std::shared_ptr& process_tracker, + const std::shared_ptr& orphan_tracker) { + _process_mem_tracker = process_tracker; + _orphan_mem_tracker = orphan_tracker; + _orphan_mem_tracker_raw = orphan_tracker.get(); } std::shared_ptr allocator_cache_mem_tracker() { return _allocator_cache_mem_tracker; } + std::shared_ptr orphan_mem_tracker() { return _orphan_mem_tracker; } + MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; } std::shared_ptr query_pool_mem_tracker() { return _query_pool_mem_tracker; } std::shared_ptr load_pool_mem_tracker() { return _load_pool_mem_tracker; } MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; } @@ -207,8 +209,15 @@ class ExecEnv { // The ancestor for all trackers. Every tracker is visible from the process down. // Not limit total memory by process tracker, and it's just used to track virtual memory of process. std::shared_ptr _process_mem_tracker; + // tcmalloc/jemalloc allocator cache tracker, Including thread cache, free heap, etc. std::shared_ptr _allocator_cache_mem_tracker; - MemTrackerLimiter* _process_mem_tracker_raw; + // The default tracker consumed by mem hook. If the thread does not attach other trackers, + // by default all consumption will be passed to the process tracker through the orphan tracker. + // In real time, `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`. + // Ideally, all threads are expected to attach to the specified tracker, so that "all memory has its own ownership", + // and the consumption of the orphan mem tracker is close to 0, but greater than 0. + std::shared_ptr _orphan_mem_tracker; + MemTrackerLimiter* _orphan_mem_tracker_raw; // The ancestor for all querys tracker. std::shared_ptr _query_pool_mem_tracker; // The ancestor for all load tracker. diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index 0fe31a1a6b10c9..68518e905b1093 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -205,7 +205,8 @@ Status ExecEnv::_init_mem_tracker() { _process_mem_tracker = std::make_shared(global_memory_limit_bytes, "Process"); - _process_mem_tracker_raw = _process_mem_tracker.get(); + _orphan_mem_tracker = std::make_shared(-1, "Orphan", _process_mem_tracker); + _orphan_mem_tracker_raw = _orphan_mem_tracker.get(); thread_context()->_thread_mem_tracker_mgr->init(); thread_context()->_thread_mem_tracker_mgr->set_check_attach(false); #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \ diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp index 96a0daad4d6f40..21b475400ebb9c 100644 --- a/be/src/runtime/mem_pool.cpp +++ b/be/src/runtime/mem_pool.cpp @@ -60,7 +60,8 @@ MemPool::~MemPool() { } mem_tracker_->Release(total_bytes_released); THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); + DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released); } @@ -81,7 +82,8 @@ void MemPool::free_all() { ChunkAllocator::instance()->free(chunk.chunk); } THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); + chunks_.clear(); next_chunk_size_ = INITIAL_CHUNK_SIZE; current_chunk_idx_ = -1; @@ -146,7 +148,7 @@ bool MemPool::find_chunk(size_t min_size, bool check_limits) { mem_tracker_->Release(chunk_size); return false; } - THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->process_mem_tracker_raw()); + THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->orphan_mem_tracker_raw()); ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size); // Put it before the first free chunk. If no free chunks, it goes at the end. if (first_free_idx == static_cast(chunks_.size())) { diff --git a/be/src/runtime/mem_pool.h b/be/src/runtime/mem_pool.h index 4c93e8568fa793..8eee3334ce5102 100644 --- a/be/src/runtime/mem_pool.h +++ b/be/src/runtime/mem_pool.h @@ -210,7 +210,7 @@ class MemPool { void reset_peak() { if (total_allocated_bytes_ - peak_allocated_bytes_ > 65536) { THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - peak_allocated_bytes_, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); peak_allocated_bytes_ = total_allocated_bytes_; } } diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index 07ca883a941c95..bae8474cf4e40c 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -58,12 +58,14 @@ NewMemTracker::NewMemTracker(const std::string& label, RuntimeProfile* profile) _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES); } - DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker() != nullptr); - _label = fmt::format( - "{} | {}", label, - thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label()); - _bind_group_num = - thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->group_num(); + DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() != nullptr); + MemTrackerLimiter* parent = + thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw(); + _label = fmt::format("[Observer] {} | {}", label, + parent->label() == "Orphan" ? "Process" : parent->label()); + _bind_group_num = parent->label() == "Orphan" + ? ExecEnv::GetInstance()->new_process_mem_tracker()->group_num() + : parent->group_num(); { std::lock_guard l(mem_tracker_pool[_bind_group_num].group_lock); _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert( @@ -112,4 +114,4 @@ std::string NewMemTracker::log_usage(NewMemTracker::Snapshot snapshot) { snapshot.peak_consumption); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp index 0494657396f0e5..28ce5389b79bf1 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.cpp +++ b/be/src/runtime/memory/mem_tracker_limiter.cpp @@ -41,7 +41,14 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe _label = label; _limit = byte_limit; _group_num = GetCurrentTimeMicros() % 1000; - _parent = parent ? parent : thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(); + if (parent || label == "Process") { + _parent = parent; + } else if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->label() == + "Orphan") { + _parent = ExecEnv::GetInstance()->new_process_mem_tracker(); + } else { + _parent = thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker(); + } DCHECK(_parent || label == "Process"); // Walks the MemTrackerLimiter hierarchy and populates _all_ancestors and _limited_ancestors @@ -69,6 +76,18 @@ MemTrackerLimiter::~MemTrackerLimiter() { if (_label == "Process") doris::thread_context_ptr._init = false; DCHECK(remain_child_count() == 0 || _label == "Process"); consume(_untracked_mem.exchange(0)); +#ifndef BE_TEST + // In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption` + // in real time. Merge its consumption into orphan when all third level limiter trackers are destructed, to avoid repetition. + // the first layer: process; + // the second layer: a tracker that will not be destructed globally (query/load pool, load channel mgr, etc.); + // the third layer: a query/load/compaction task generates a tracker (query tracker, load channel tracker, etc.). + if (_parent->parent()->label() == "Process") { + ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local( + _consumption->current_value()); + } +#endif + if (_parent) { std::lock_guard l(_parent->_child_tracker_limiter_lock); if (_child_tracker_it != _parent->_child_tracker_limiters.end()) { @@ -258,7 +277,7 @@ std::string MemTrackerLimiter::mem_limit_exceeded(const std::string& msg, // The limit of the current tracker and parents is less than 0, the consume will not fail, // and the current process memory has no excess limit. detail += fmt::format("unknown exceed reason, executing msg:<{}>", msg); - print_log_usage_tracker = ExecEnv::GetInstance()->process_mem_tracker_raw(); + print_log_usage_tracker = ExecEnv::GetInstance()->new_process_mem_tracker().get(); } auto failed_msg = MemTrackerLimiter::limit_exceeded_errmsg_suffix_str(detail); if (print_log_usage_tracker != nullptr) print_log_usage_tracker->print_log_usage(failed_msg); diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index 0886279c6ea33e..7f294ef9982bc5 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -217,7 +217,7 @@ class MemTrackerLimiter final : public NewMemTracker { "alloc size {}", PerfCounters::get_vm_rss_str(), MemInfo::allocator_cache_mem_str(), MemInfo::mem_limit_str(), print_bytes(bytes)); - ExecEnv::GetInstance()->process_mem_tracker_raw()->print_log_usage(err_msg); + ExecEnv::GetInstance()->new_process_mem_tracker()->print_log_usage(err_msg); return err_msg; } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp index 40879b6c4a3dd9..3cb97c6ff8474f 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp @@ -29,24 +29,25 @@ void ThreadMemTrackerMgr::attach_limiter_tracker( const std::shared_ptr& mem_tracker) { DCHECK(mem_tracker); flush_untracked_mem(); - _task_id = task_id; - _fragment_instance_id = fragment_instance_id; - _limiter_tracker = mem_tracker; + _task_id_stack.push_back(task_id); + _fragment_instance_id_stack.push_back(fragment_instance_id); + _limiter_tracker_stack.push_back(mem_tracker); _limiter_tracker_raw = mem_tracker.get(); } void ThreadMemTrackerMgr::detach_limiter_tracker() { + DCHECK(!_limiter_tracker_stack.empty()); flush_untracked_mem(); - _task_id = ""; - _fragment_instance_id = TUniqueId(); - _limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker(); - _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw(); + _task_id_stack.pop_back(); + _fragment_instance_id_stack.pop_back(); + _limiter_tracker_stack.pop_back(); + _limiter_tracker_raw = _limiter_tracker_stack.back().get(); } void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) { - if (_fragment_instance_id != TUniqueId()) { + if (_fragment_instance_id_stack.back() != TUniqueId()) { ExecEnv::GetInstance()->fragment_mgr()->cancel( - _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, + _fragment_instance_id_stack.back(), PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED, cancel_details); } } diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h index 3f652ecd5a9fa4..2736ff9630ff81 100644 --- a/be/src/runtime/memory/thread_mem_tracker_mgr.h +++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h @@ -50,7 +50,7 @@ class ThreadMemTrackerMgr { // only for tcmalloc hook static void consume_no_attach(int64_t size) { - ExecEnv::GetInstance()->process_mem_tracker_raw()->consume(size); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size); } // After thread initialization, calling `init` again must call `clear_untracked_mems` first @@ -78,9 +78,11 @@ class ThreadMemTrackerMgr { template void flush_untracked_mem(); - bool is_attach_query() { return _fragment_instance_id != TUniqueId(); } + bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); } - std::shared_ptr limiter_mem_tracker() { return _limiter_tracker; } + std::shared_ptr limiter_mem_tracker() { + return _limiter_tracker_stack.back(); + } MemTrackerLimiter* limiter_mem_tracker_raw() { return _limiter_tracker_raw; } void set_check_limit(bool check_limit) { _check_limit = check_limit; } @@ -95,8 +97,8 @@ class ThreadMemTrackerMgr { return fmt::format( "ThreadMemTrackerMgr debug, _untracked_mem:{}, _task_id:{}, " "_limiter_tracker:<{}>, _consumer_tracker_stack:<{}>", - std::to_string(_untracked_mem), _task_id, _limiter_tracker->log_usage(1), - fmt::to_string(consumer_tracker_buf)); + std::to_string(_untracked_mem), _task_id_stack.back(), + _limiter_tracker_raw->log_usage(1), fmt::to_string(consumer_tracker_buf)); } private: @@ -112,25 +114,32 @@ class ThreadMemTrackerMgr { int64_t old_untracked_mem = 0; std::string failed_msg = std::string(); - std::shared_ptr _limiter_tracker; - std::vector _consumer_tracker_stack; + // _limiter_tracker_stack[0] = orphan_mem_tracker + std::vector> _limiter_tracker_stack; MemTrackerLimiter* _limiter_tracker_raw; + std::vector _consumer_tracker_stack; // If true, call memtracker try_consume, otherwise call consume. bool _check_limit = false; // If there is a memory new/delete operation in the consume method, it may enter infinite recursion. bool _stop_consume = false; bool _check_attach = true; - std::string _task_id; - TUniqueId _fragment_instance_id; + std::vector _task_id_stack; + std::vector _fragment_instance_id_stack; ExceedCallBack _cb_func = nullptr; }; inline void ThreadMemTrackerMgr::init() { DCHECK(_consumer_tracker_stack.empty()); - _task_id = ""; - _limiter_tracker = ExecEnv::GetInstance()->new_process_mem_tracker(); - _limiter_tracker_raw = ExecEnv::GetInstance()->process_mem_tracker_raw(); + // _limiter_tracker_stack[0] = orphan_mem_tracker + DCHECK(_limiter_tracker_stack.size() <= 1) + << "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size(); + if (_limiter_tracker_stack.size() == 0) { + _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker()); + _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw(); + _task_id_stack.push_back(""); + _fragment_instance_id_stack.push_back(TUniqueId()); + } _check_limit = true; } @@ -170,7 +179,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop. _stop_consume = true; old_untracked_mem = _untracked_mem; - DCHECK(_limiter_tracker); + DCHECK(_limiter_tracker_raw); if (CheckLimit) { #ifndef BE_TEST // When all threads are started, `attach_limiter_tracker` is expected to be called to bind the limiter tracker. @@ -180,7 +189,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() { // TODO(zxy) The current p0 test cannot guarantee that all threads are checked, // so disable it and try to open it when memory tracking is not on time. // DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY || - // _limiter_tracker->label() != "Process"); + // _limiter_tracker_raw->label() != "Process"); #endif if (!_limiter_tracker_raw->try_consume(old_untracked_mem, failed_msg)) { // The memory has been allocated, so when TryConsume fails, need to continue to complete diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 4036a99786977e..28c00ba48bc513 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -193,14 +193,13 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) { bool has_query_mem_tracker = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0); int64_t bytes_limit = has_query_mem_tracker ? _query_options.mem_limit : -1; - if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker_raw()->limit()) { + if (bytes_limit > ExecEnv::GetInstance()->process_mem_tracker()->limit()) { VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES) << " exceeds process memory limit of " - << PrettyPrinter::print( - ExecEnv::GetInstance()->process_mem_tracker_raw()->limit(), - TUnit::BYTES) + << PrettyPrinter::print(ExecEnv::GetInstance()->process_mem_tracker()->limit(), + TUnit::BYTES) << ". Using process memory limit instead"; - bytes_limit = ExecEnv::GetInstance()->process_mem_tracker_raw()->limit(); + bytes_limit = ExecEnv::GetInstance()->process_mem_tracker()->limit(); } // we do not use global query-map for now, to avoid mem-exceeded different fragments diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 96ef5742610285..a752088c851092 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -472,7 +472,7 @@ int main(int argc, char** argv) { doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker(); // The process tracker print log usage interval is 1s to avoid a large number of tasks being // canceled when the process exceeds the mem limit, resulting in too many duplicate logs. - doris::ExecEnv::GetInstance()->process_mem_tracker_raw()->enable_print_log_usage(); + doris::ExecEnv::GetInstance()->new_process_mem_tracker()->enable_print_log_usage(); sleep(1); } diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h index 5deb16d106209e..9c8ee3f7f55cfa 100644 --- a/be/src/vec/common/pod_array.h +++ b/be/src/vec/common/pod_array.h @@ -115,7 +115,7 @@ class PODArrayBase : private boost::noncopyable, inline void reset_peak() { if (UNLIKELY(c_end - c_end_peak > 65536)) { THREAD_MEM_TRACKER_TRANSFER_FROM(c_end - c_end_peak, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); c_end_peak = c_end; } } @@ -127,7 +127,7 @@ class PODArrayBase : private boost::noncopyable, template void alloc(size_t bytes, TAllocatorParams&&... allocator_params) { THREAD_MEM_TRACKER_TRANSFER_TO(bytes - pad_right - pad_left, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); c_start = c_end = c_end_peak = reinterpret_cast(TAllocator::alloc( bytes, std::forward(allocator_params)...)) + @@ -144,7 +144,7 @@ class PODArrayBase : private boost::noncopyable, TAllocator::free(c_start - pad_left, allocated_bytes()); THREAD_MEM_TRACKER_TRANSFER_FROM(c_end_of_storage - c_end_peak, - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); } template @@ -157,7 +157,7 @@ class PODArrayBase : private boost::noncopyable, unprotect(); THREAD_MEM_TRACKER_TRANSFER_TO(bytes - allocated_bytes(), - ExecEnv::GetInstance()->process_mem_tracker_raw()); + ExecEnv::GetInstance()->orphan_mem_tracker_raw()); ptrdiff_t end_diff = c_end - c_start;