Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ class ExecEnv {
// declarations for classes in scoped_ptrs.
~ExecEnv();

const bool initialized() { return _is_init; }
// Accessor for the token string; declared here, the definition is not visible in this view.
const std::string& token() const;
// Returns the stored _external_scan_context_mgr pointer.
ExternalScanContextMgr* external_scan_context_mgr() { return _external_scan_context_mgr; }
// Returns the stored _stream_mgr pointer.
DataStreamMgr* stream_mgr() { return _stream_mgr; }
Expand All @@ -123,16 +124,19 @@ class ExecEnv {

// Returns a copy of the shared_ptr held in _process_mem_tracker.
std::shared_ptr<MemTrackerLimiter> new_process_mem_tracker() { return _process_mem_tracker; }
void set_global_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& process_tracker,
const std::shared_ptr<MemTrackerLimiter>& orphan_tracker) {
const std::shared_ptr<MemTrackerLimiter>& orphan_tracker,
const std::shared_ptr<MemTrackerLimiter>& bthread_mem_tracker) {
_process_mem_tracker = process_tracker;
_orphan_mem_tracker = orphan_tracker;
_orphan_mem_tracker_raw = orphan_tracker.get();
_bthread_mem_tracker = bthread_mem_tracker;
}
// Returns a copy of the shared_ptr held in _allocator_cache_mem_tracker.
std::shared_ptr<NewMemTracker> allocator_cache_mem_tracker() {
return _allocator_cache_mem_tracker;
}
// Returns a copy of the shared_ptr held in _orphan_mem_tracker.
std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return _orphan_mem_tracker; }
// Returns the cached raw pointer _orphan_mem_tracker_raw (no shared_ptr copy).
MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; }
// Returns a copy of the shared_ptr held in _bthread_mem_tracker.
std::shared_ptr<MemTrackerLimiter> bthread_mem_tracker() { return _bthread_mem_tracker; }
// Returns a copy of the shared_ptr held in _query_pool_mem_tracker.
std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return _query_pool_mem_tracker; }
// Returns a copy of the shared_ptr held in _load_pool_mem_tracker.
std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return _load_pool_mem_tracker; }
// Returns the stored _task_pool_mem_tracker_registry pointer.
MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; }
Expand Down Expand Up @@ -218,6 +222,8 @@ class ExecEnv {
// and the consumption of the orphan mem tracker is close to 0, but greater than 0.
std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker;
MemTrackerLimiter* _orphan_mem_tracker_raw;
// Bthread default mem tracker
std::shared_ptr<MemTrackerLimiter> _bthread_mem_tracker;
// The ancestor of all query trackers.
std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker;
// The ancestor of all load trackers.
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/exec_env_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ Status ExecEnv::_init_mem_tracker() {
std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, "Process");
_orphan_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Orphan", _process_mem_tracker);
_orphan_mem_tracker_raw = _orphan_mem_tracker.get();
_bthread_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Bthread", _orphan_mem_tracker);
thread_context()->_thread_mem_tracker_mgr->init();
thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \
Expand Down
6 changes: 2 additions & 4 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe
MemTrackerLimiter::~MemTrackerLimiter() {
// TCMalloc hook will be triggered during destructor memtracker, may cause crash.
#ifndef BE_TEST
if (_label == "Process") doris::thread_context_ptr._init = false;
if (_label == "Process") doris::thread_context_ptr.init = false;
#endif
DCHECK(remain_child_count() == 0 || _label == "Process");
// In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`
Expand All @@ -84,9 +84,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
_consumption->current_value());
}
if (_reset_zero) {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
_consumption->current_value());
cache_consume_local(-_consumption->current_value());
reset_zero();
_all_ancestors.clear();
_all_ancestors.push_back(ExecEnv::GetInstance()->orphan_mem_tracker_raw());
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ class MemTrackerLimiter final : public NewMemTracker {
// Sets the _print_log_usage flag to true.
void enable_print_log_usage() { _print_log_usage = true; }
// Sets the _reset_zero flag to true.
void enable_reset_zero() { _reset_zero = true; }

void reset_zero() {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
_consumption->current_value());
cache_consume_local(-_consumption->current_value());
}

// Logs the usage of this tracker limiter and optionally its children (recursively).
// If 'logged_consumption' is non-nullptr, sets the consumption value logged.
// 'max_recursive_depth' specifies the maximum number of levels of children
Expand Down
21 changes: 2 additions & 19 deletions be/src/runtime/memory/tcmalloc_hook.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,11 @@
// destructor to control the behavior of consume can lead to unexpected behavior,
// like this: if (LIKELY(doris::start_thread_mem_tracker)) {
// Allocation hook: accounts for an allocation request of `size` bytes, using
// tc_nallocx(size, 0) as the amount to record. `ptr` is not used in this body.
// NOTE(review): this span contains both an inline three-way accounting chain and a
// MEM_MALLOC_HOOK invocation with the same amount. The macro body is not visible here;
// presumably it supersedes the inline chain -- confirm that the allocation is not counted twice.
void new_hook(const void* ptr, size_t size) {
// Case 1: a bthread TLS key is set and a bthread context is present.
if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) {
// Currently in bthread, consume thread context mem tracker in bthread tls.
doris::update_bthread_context();
doris::bthread_context->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0));
// Case 2: the pthread-level thread context pointer reports itself initialized.
} else if (doris::thread_context_ptr._init) {
doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0));
// Case 3: no usable thread context; fall back to the static no-attach path.
} else {
doris::ThreadMemTrackerMgr::consume_no_attach(tc_nallocx(size, 0));
}
MEM_MALLOC_HOOK(tc_nallocx(size, 0));
}

// Deallocation hook: accounts for the release of `ptr`, using tc_malloc_size(ptr) as the
// amount. The inline chain records it as a negative consumption.
// NOTE(review): this span contains both an inline three-way accounting chain and a
// MEM_FREE_HOOK invocation. The macro body is not visible here; presumably it supersedes
// the inline chain -- confirm that the release is not counted twice.
void delete_hook(const void* ptr) {
// Case 1: a bthread TLS key is set and a bthread context is present.
if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) {
doris::update_bthread_context();
doris::bthread_context->_thread_mem_tracker_mgr->consume(
-tc_malloc_size(const_cast<void*>(ptr)));
// Case 2: the pthread-level thread context pointer reports itself initialized.
} else if (doris::thread_context_ptr._init) {
doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(
-tc_malloc_size(const_cast<void*>(ptr)));
// Case 3: no usable thread context; fall back to the static no-attach path.
} else {
doris::ThreadMemTrackerMgr::consume_no_attach(-tc_malloc_size(const_cast<void*>(ptr)));
}
MEM_FREE_HOOK(tc_malloc_size(const_cast<void*>(ptr)));
}

void init_hook() {
Expand Down
71 changes: 52 additions & 19 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,25 +44,37 @@ class ThreadMemTrackerMgr {
// Default constructor; does no work itself. Binding to a tracker happens in init().
ThreadMemTrackerMgr() {}

// Flushes any cached untracked memory and checks the tracker stacks on teardown.
//
// Everything is skipped when _init is false: in that case the exec env was not
// initialized when this manager was set up and no tracker was ever consumed, so there is
// nothing to flush. Flushing unconditionally would itself trigger init(), which would
// make the _init guard meaningless.
~ThreadMemTrackerMgr() {
    if (!_init) {
        return;
    }
    flush_untracked_mem<false>();
    // The stack invariants are only asserted when bthread_self() == 0.
    if (bthread_self() == 0) {
        DCHECK(_consumer_tracker_stack.empty());
        DCHECK(_limiter_tracker_stack.size() == 1)
                << ", limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
    }
}

// only for tcmalloc hook
static void consume_no_attach(int64_t size) {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
if (ExecEnv::GetInstance()->initialized()) {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
}
}

// After thread initialization, calling `init` again must call `clear_untracked_mems` first
// to avoid memory tracking loss.
void init();
void init_impl();
void clear();

// After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker
void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker);

void detach_limiter_tracker();
// Usually there are only two layers: the first is the default Orphan tracker;
// the second is the query tracker or bthread tracker.
// Returns the current depth of _limiter_tracker_stack.
int64_t get_attach_layers() { return _limiter_tracker_stack.size(); }

// Must be fast enough! Thread update_tracker may be called very frequently.
// So for performance, add tracker as early as possible, and then call update_tracker<Existed>.
Expand All @@ -82,9 +94,13 @@ class ThreadMemTrackerMgr {
// Returns true when the innermost attached fragment instance id differs from a
// default-constructed TUniqueId. An empty id stack (the manager has not been initialized
// yet) is reported as "not attached" instead of calling back() on an empty vector,
// which would be undefined behaviour.
bool is_attach_query() {
    return !_fragment_instance_id_stack.empty() &&
           _fragment_instance_id_stack.back() != TUniqueId();
}

// Returns the limiter tracker on top of _limiter_tracker_stack, running init() first
// if this manager has not been initialized yet.
std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
    if (!_init) {
        init();
    }
    return _limiter_tracker_stack.back();
}
// Returns the cached raw pointer to the current limiter tracker (_limiter_tracker_raw),
// running init() first if this manager has not been initialized yet -- the same lazy
// initialization that limiter_mem_tracker() performs. Defined exactly once; a second
// definition of this member would be a redefinition error.
MemTrackerLimiter* limiter_mem_tracker_raw() {
    if (!_init) {
        init();
    }
    return _limiter_tracker_raw;
}

// Stores `check_limit` into _check_limit.
void set_check_limit(bool check_limit) { _check_limit = check_limit; }
// Stores `check_attach` into _check_attach.
void set_check_attach(bool check_attach) { _check_attach = check_attach; }
Expand All @@ -109,6 +125,8 @@ class ThreadMemTrackerMgr {
void exceeded(const std::string& failed_msg);

private:
// Stays false if ExecEnv::GetInstance()->initialized() was false when this thread-local manager was initialized.
bool _init = false;
// Cache untracked mem, only update to _untracked_mems when switching mem tracker.
// Frequent calls to unordered_map _untracked_mems[] in consume will degrade performance.
int64_t _untracked_mem = 0;
Expand All @@ -117,7 +135,7 @@ class ThreadMemTrackerMgr {

// _limiter_tracker_stack[0] = orphan_mem_tracker
std::vector<std::shared_ptr<MemTrackerLimiter>> _limiter_tracker_stack;
// Raw alias of the current limiter tracker. Declared once and initialized to nullptr so
// that it never holds an indeterminate value before init() assigns it.
MemTrackerLimiter* _limiter_tracker_raw = nullptr;
std::vector<NewMemTracker*> _consumer_tracker_stack;

// If true, call memtracker try_consume, otherwise call consume.
Expand All @@ -131,26 +149,39 @@ class ThreadMemTrackerMgr {
};

// Checks the tracker-stack preconditions (debug builds only, via DCHECK) and then
// delegates the actual setup to init_impl().
// NOTE(review): this span asserts both `_limiter_tracker_stack.size() == 0` and
// `_limiter_tracker_stack.size() <= 1`, plus `_limiter_tracker_raw == nullptr`. The
// stricter checks look like an earlier revision -- confirm which set is intended, since
// the `== 0` form forbids calling init() on an already-populated stack.
inline void ThreadMemTrackerMgr::init() {
DCHECK(_limiter_tracker_stack.size() == 0);
DCHECK(_limiter_tracker_raw == nullptr);
DCHECK(_consumer_tracker_stack.empty());
// _limiter_tracker_stack[0] = orphan_mem_tracker
DCHECK(_limiter_tracker_stack.size() <= 1)
<< "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
init_impl();
}

// Binds this manager to the global orphan tracker and marks it initialized:
// pushes the orphan tracker, an empty task id and a default TUniqueId as the base
// entries of the respective stacks, caches the orphan raw pointer, enables limit
// checking, and sets _init.
// NOTE(review): several statements appear here in two variants (the
// `_orphan_mem_tracker` / `orphan_mem_tracker` locals, the two- and three-argument
// set_global_mem_tracker calls, and the guarded vs. unguarded stack pushes). Taken
// literally the base entries would be pushed twice on an empty stack -- confirm which
// variant is the intended one.
inline void ThreadMemTrackerMgr::init_impl() {
#ifdef BE_TEST
// Test builds only: if no process tracker has been registered yet, create the
// "Process", "Orphan" and "Bthread" trackers (all with limit -1) and register them.
if (ExecEnv::GetInstance()->new_process_mem_tracker() == nullptr) {
std::shared_ptr<MemTrackerLimiter> process_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Process");
std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker =
std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker);
std::shared_ptr<MemTrackerLimiter> bthread_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Bthread", orphan_mem_tracker);
ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker);
}
#endif // BE_TEST
// Guarded variant: only seed the base entries when the limiter stack is empty.
if (_limiter_tracker_stack.size() == 0) {
_limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
_limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
_task_id_stack.push_back("");
_fragment_instance_id_stack.push_back(TUniqueId());
}
// Unguarded variant of the same four statements.
_limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
_limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
_task_id_stack.push_back("");
_fragment_instance_id_stack.push_back(TUniqueId());
_check_limit = true;
_init = true;
}

// Resets this manager to its freshly-initialized state: flushes the cached untracked
// memory first, empties every stack, then re-runs init_impl().
inline void ThreadMemTrackerMgr::clear() {
    flush_untracked_mem<false>();
    // Swap each stack with an empty temporary of the same type, which also releases the
    // old storage. The order matches the member-by-member order used before.
    decltype(_limiter_tracker_stack)().swap(_limiter_tracker_stack);
    decltype(_consumer_tracker_stack)().swap(_consumer_tracker_stack);
    decltype(_task_id_stack)().swap(_task_id_stack);
    decltype(_fragment_instance_id_stack)().swap(_fragment_instance_id_stack);
    init_impl();
}

inline void ThreadMemTrackerMgr::push_consumer_tracker(NewMemTracker* tracker) {
Expand All @@ -174,7 +205,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size) {
// it will cause tracker->consumption to be temporarily less than 0.
if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
_untracked_mem <= -config::mem_tracker_consume_min_size_bytes) &&
!_stop_consume) {
!_stop_consume && ExecEnv::GetInstance()->initialized()) {
if (_check_limit) {
flush_untracked_mem<true>();
} else {
Expand All @@ -188,6 +219,8 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering
// the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop.
_stop_consume = true;
if (!_init) init();
DCHECK(_limiter_tracker_raw);
old_untracked_mem = _untracked_mem;
DCHECK(_limiter_tracker_raw);
if (CheckLimit) {
Expand Down
49 changes: 11 additions & 38 deletions be/src/runtime/thread_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, _ptr);

// Sets up the static thread-local ThreadContext slot `_ptr` via INIT_STATIC_THREAD_LOCAL
// and then raises the initialized flag.
// NOTE(review): both `_init` and `init` are assigned here; elsewhere the flag is read as
// `thread_context_ptr._init` and written as `thread_context_ptr.init`. Presumably the
// member was renamed and only one of the two exists -- confirm which.
ThreadContextPtr::ThreadContextPtr() {
INIT_STATIC_THREAD_LOCAL(ThreadContext, _ptr);
_init = true;
init = true;
}

AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
Expand All @@ -39,10 +39,12 @@ AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
#else
if (ExecEnv::GetInstance()->new_process_mem_tracker() == nullptr) {
std::shared_ptr<MemTrackerLimiter> process_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Process");
std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Process");
std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker);
std::shared_ptr<MemTrackerLimiter> bthread_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Bthread", orphan_mem_tracker);
ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker);
}
thread_context()->attach_task(type, task_id, fragment_instance_id, ExecEnv::GetInstance()->orphan_mem_tracker());
#endif // BE_TEST
Expand All @@ -61,10 +63,12 @@ AttachTask::AttachTask(RuntimeState* runtime_state) {
#else
if (ExecEnv::GetInstance()->new_process_mem_tracker() == nullptr) {
std::shared_ptr<MemTrackerLimiter> process_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Process");
std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Process");
std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker);
std::shared_ptr<MemTrackerLimiter> bthread_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Bthread", orphan_mem_tracker);
ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker);
}
thread_context()->attach_task(ThreadContext::TaskType::QUERY, "", TUniqueId(), ExecEnv::GetInstance()->orphan_mem_tracker());
#endif // BE_TEST
Expand Down Expand Up @@ -92,35 +96,4 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
#endif // USE_MEM_TRACKER
}

// Makes the bthread-local ThreadContext the active context for this scope.
// Looks up the context stored under btls_key; reuses it (after flushing its cached
// untracked memory) when present, otherwise creates one and registers it under the key.
// The context's tracker manager is then (re)initialized, the context is tagged as BRPC,
// and the bthread_context_key / bthread_context globals are pointed at it.
// Compiles to an empty body unless USE_MEM_TRACKER is defined.
SwitchBthread::SwitchBthread() {
#ifdef USE_MEM_TRACKER
    _bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
    if (_bthread_context != nullptr) {
        // A context already exists for this bthread: it must be idle, and whatever it
        // has cached is flushed before it is reused.
        DCHECK(_bthread_context->type() == ThreadContext::TaskType::UNKNOWN);
        _bthread_context->_thread_mem_tracker_mgr->flush_untracked_mem<false>();
    } else {
        // First call to bthread_getspecific (and before any bthread_setspecific) returns
        // NULL, so create the data on demand and store it so that the next
        // bthread_getspecific in this bthread returns it.
        _bthread_context = new ThreadContext;
        CHECK_EQ(0, bthread_setspecific(btls_key, _bthread_context));
    }
    auto& tracker_mgr = _bthread_context->_thread_mem_tracker_mgr;
    tracker_mgr->init();
    _bthread_context->set_type(ThreadContext::TaskType::BRPC);
    bthread_context_key = btls_key;
    bthread_context = _bthread_context;
#endif
}

// Undoes the constructor's switch: flushes the context's cached untracked memory,
// re-initializes its tracker manager, tags the context as UNKNOWN again, and clears the
// bthread_context / bthread_context_key globals.
// Compiles to an empty body unless USE_MEM_TRACKER is defined.
SwitchBthread::~SwitchBthread() {
#ifdef USE_MEM_TRACKER
    DCHECK(_bthread_context != nullptr);
    auto& tracker_mgr = _bthread_context->_thread_mem_tracker_mgr;
    tracker_mgr->flush_untracked_mem<false>();
    tracker_mgr->init();
    _bthread_context->set_type(ThreadContext::TaskType::UNKNOWN);
    bthread_context = nullptr;
    bthread_context_key = EMPTY_BTLS_KEY;
#endif // USE_MEM_TRACKER
}

} // namespace doris
Loading