Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 14 additions & 10 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,24 @@ MemTrackerLimiter::~MemTrackerLimiter() {
// TCMalloc hook will be triggered during destructor memtracker, may cause crash.
if (_label == "Process") doris::thread_context_ptr._init = false;
DCHECK(remain_child_count() == 0 || _label == "Process");
consume(_untracked_mem.exchange(0));
#ifndef BE_TEST
// In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`
// in real time. Merge its consumption into orphan when all third level limiter trackers are destructed, to avoid repetition.
// the first layer: process;
// the second layer: a tracker that will not be destructed globally (query/load pool, load channel mgr, etc.);
// the third layer: a query/load/compaction task generates a tracker (query tracker, load channel tracker, etc.).
if ((_parent && _parent->label() == "Process") ||
(_parent->parent() && _parent->parent()->label() == "Process")) {
// in real time. Merge its consumption into orphan when parent is process, to avoid repetition.
if ((_parent && _parent->label() == "Process")) {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
_consumption->current_value());
}
#endif
if (_reset_zero) cache_consume_local(-_consumption->current_value());
if (_reset_zero) {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
_consumption->current_value());
cache_consume_local(-_consumption->current_value());
_all_ancestors.clear();
_all_ancestors.push_back(ExecEnv::GetInstance()->orphan_mem_tracker_raw());
}
for (auto& tracker : _all_ancestors) {
if (tracker->label() != "Process") {
tracker->_consumption->add(_untracked_mem);
}
}
if (_parent) {
std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock);
if (_child_tracker_it != _parent->_child_tracker_limiters.end()) {
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class ThreadMemTrackerMgr {
~ThreadMemTrackerMgr() {
flush_untracked_mem<false>();
DCHECK(_consumer_tracker_stack.empty());
DCHECK(_limiter_tracker_stack.size() == 1);
}

// only for tcmalloc hook
Expand Down