Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -119,16 +119,19 @@ class ExecEnv {

std::shared_ptr<MemTrackerLimiter> process_mem_tracker() { return _process_mem_tracker; }
void set_global_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& process_tracker,
const std::shared_ptr<MemTrackerLimiter>& orphan_tracker) {
const std::shared_ptr<MemTrackerLimiter>& orphan_tracker,
const std::shared_ptr<MemTrackerLimiter>& bthread_mem_tracker) {
_process_mem_tracker = process_tracker;
_orphan_mem_tracker = orphan_tracker;
_orphan_mem_tracker_raw = orphan_tracker.get();
_bthread_mem_tracker = bthread_mem_tracker;
}
std::shared_ptr<MemTracker> allocator_cache_mem_tracker() {
return _allocator_cache_mem_tracker;
}
std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return _orphan_mem_tracker; }
MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; }
std::shared_ptr<MemTrackerLimiter> bthread_mem_tracker() { return _bthread_mem_tracker; }
std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return _query_pool_mem_tracker; }
std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return _load_pool_mem_tracker; }
MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; }
Expand Down Expand Up @@ -227,6 +230,8 @@ class ExecEnv {
// and the consumption of the orphan mem tracker is close to 0, but greater than 0.
std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker;
MemTrackerLimiter* _orphan_mem_tracker_raw;
// Bthread default mem tracker
std::shared_ptr<MemTrackerLimiter> _bthread_mem_tracker;
// The ancestor for all querys tracker.
std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker;
// The ancestor for all load tracker.
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ Status ExecEnv::_init_mem_tracker() {
std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, "Process");
_orphan_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Orphan", _process_mem_tracker);
_orphan_mem_tracker_raw = _orphan_mem_tracker.get();
thread_context()->_thread_mem_tracker_mgr->init_impl();
_bthread_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Bthread", _orphan_mem_tracker);
thread_context()->_thread_mem_tracker_mgr->init();
thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \
!defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)
Expand Down
7 changes: 2 additions & 5 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe

MemTrackerLimiter::~MemTrackerLimiter() {
// TCMalloc hook will be triggered during destructor memtracker, may cause crash.
if (_label == "Process") doris::thread_context_ptr._init = false;
if (_label == "Process") doris::thread_context_ptr.init = false;
DCHECK(remain_child_count() == 0 || _label == "Process");
// In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`
// in real time. Merge its consumption into orphan when parent is process, to avoid repetition.
Expand All @@ -83,9 +83,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
_consumption->current_value());
}
if (_reset_zero) {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
_consumption->current_value());
cache_consume_local(-_consumption->current_value());
reset_zero();
_all_ancestors.clear();
_all_ancestors.push_back(ExecEnv::GetInstance()->orphan_mem_tracker_raw());
}
Expand Down Expand Up @@ -212,7 +210,6 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth,
}

void MemTrackerLimiter::print_log_usage(const std::string& msg) {
DCHECK(_limit != -1);
// only print the tracker log_usage in be log.
std::string detail = msg;
detail += "\n " + fmt::format(
Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,12 @@ class MemTrackerLimiter final : public MemTracker {
void enable_print_log_usage() { _print_log_usage = true; }
void enable_reset_zero() { _reset_zero = true; }

void reset_zero() {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
_consumption->current_value());
cache_consume_local(-_consumption->current_value());
}

// Logs the usage of this tracker limiter and optionally its children (recursively).
// If 'logged_consumption' is non-nullptr, sets the consumption value logged.
// 'max_recursive_depth' specifies the maximum number of levels of children
Expand Down
46 changes: 31 additions & 15 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,15 @@ class ThreadMemTrackerMgr {
ThreadMemTrackerMgr() {}

~ThreadMemTrackerMgr() {
flush_untracked_mem<false>();
DCHECK(_consumer_tracker_stack.empty());
DCHECK(_limiter_tracker_stack.size() == 1);
// if _init == false, exec env is not initialized when init(). and never consumed mem tracker once.
if (_init) {
flush_untracked_mem<false>();
if (bthread_self() == 0) {
DCHECK(_consumer_tracker_stack.empty());
DCHECK(_limiter_tracker_stack.size() == 1)
<< ", limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
}
}
}

// only for tcmalloc hook
Expand All @@ -60,12 +66,15 @@ class ThreadMemTrackerMgr {
// to avoid memory tracking loss.
void init();
void init_impl();
void clear();

// After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker
void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker);

void detach_limiter_tracker();
// Usually there are only two layers, the first is the default trackerOrphan;
// the second is the query tracker or bthread tracker.
int64_t get_attach_layers() { return _limiter_tracker_stack.size(); }

// Must be fast enough! Thread update_tracker may be called very frequently.
// So for performance, add tracker as early as possible, and then call update_tracker<Existed>.
Expand All @@ -88,11 +97,11 @@ class ThreadMemTrackerMgr {
bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); }

std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
if (_limiter_tracker_raw == nullptr) init_impl();
if (!_init) init();
return _limiter_tracker_stack.back();
}
MemTrackerLimiter* limiter_mem_tracker_raw() {
if (_limiter_tracker_raw == nullptr) init_impl();
if (!_init) init();
return _limiter_tracker_raw;
}

Expand All @@ -119,6 +128,8 @@ class ThreadMemTrackerMgr {
void exceeded(const std::string& failed_msg);

private:
// is false: ExecEnv::GetInstance()->initialized() = false when thread local is initialized
bool _init = false;
// Cache untracked mem, only update to _untracked_mems when switching mem tracker.
// Frequent calls to unordered_map _untracked_mems[] in consume will degrade performance.
int64_t _untracked_mem = 0;
Expand All @@ -141,23 +152,28 @@ class ThreadMemTrackerMgr {
};

inline void ThreadMemTrackerMgr::init() {
DCHECK(_limiter_tracker_stack.size() == 0);
DCHECK(_limiter_tracker_raw == nullptr);
DCHECK(_consumer_tracker_stack.empty());
// _limiter_tracker_stack[0] = orphan_mem_tracker
DCHECK(_limiter_tracker_stack.size() <= 1)
<< "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
if (_limiter_tracker_raw == nullptr && ExecEnv::GetInstance()->initialized()) {
init_impl();
}
init_impl();
}

inline void ThreadMemTrackerMgr::init_impl() {
DCHECK(_limiter_tracker_stack.size() == 0);
DCHECK(_limiter_tracker_raw == nullptr);
_limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
_limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
_task_id_stack.push_back("");
_fragment_instance_id_stack.push_back(TUniqueId());
_check_limit = true;
_init = true;
}

inline void ThreadMemTrackerMgr::clear() {
flush_untracked_mem<false>();
std::vector<std::shared_ptr<MemTrackerLimiter>>().swap(_limiter_tracker_stack);
std::vector<MemTracker*>().swap(_consumer_tracker_stack);
std::vector<std::string>().swap(_task_id_stack);
std::vector<TUniqueId>().swap(_fragment_instance_id_stack);
init_impl();
}

inline void ThreadMemTrackerMgr::push_consumer_tracker(MemTracker* tracker) {
Expand Down Expand Up @@ -196,7 +212,7 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering
// the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop.
_stop_consume = true;
if (_limiter_tracker_raw == nullptr) init_impl();
if (!_init) init();
DCHECK(_limiter_tracker_raw);
old_untracked_mem = _untracked_mem;
if (CheckLimit) {
Expand Down
32 changes: 1 addition & 31 deletions be/src/runtime/thread_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, _ptr);

ThreadContextPtr::ThreadContextPtr() {
INIT_STATIC_THREAD_LOCAL(ThreadContext, _ptr);
_init = true;
init = true;
}

AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
Expand Down Expand Up @@ -70,34 +70,4 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
}
}

SwitchBthread::SwitchBthread() {
_bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
// First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
if (_bthread_context == nullptr) {
// Create thread-local data on demand.
_bthread_context = new ThreadContext;
// set the data so that next time bthread_getspecific in the thread returns the data.
CHECK_EQ(0, bthread_setspecific(btls_key, _bthread_context));
} else {
DCHECK(_bthread_context->type() == ThreadContext::TaskType::UNKNOWN);
_bthread_context->_thread_mem_tracker_mgr->flush_untracked_mem<false>();
}
_bthread_context->_thread_mem_tracker_mgr->init();
_bthread_context->set_type(ThreadContext::TaskType::BRPC);
bthread_context_key = btls_key;
bthread_context = _bthread_context;
}

SwitchBthread::~SwitchBthread() {
DCHECK(_bthread_context != nullptr);
_bthread_context->_thread_mem_tracker_mgr->flush_untracked_mem<false>();
_bthread_context->_thread_mem_tracker_mgr->init();
_bthread_context->set_type(ThreadContext::TaskType::UNKNOWN);
bthread_context = nullptr;
bthread_context_key = EMPTY_BTLS_KEY;
#ifndef NDEBUG
DorisMetrics::instance()->switch_bthread_count->increment(1);
#endif // NDEBUG
}

} // namespace doris
Loading