diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp index 31d40563d45cd6..02145922b4846a 100644 --- a/be/src/runtime/memory/mem_tracker.cpp +++ b/be/src/runtime/memory/mem_tracker.cpp @@ -23,7 +23,6 @@ #include #include "runtime/thread_context.h" -#include "util/pretty_printer.h" #include "util/string_util.h" #include "util/time.h" @@ -104,10 +103,9 @@ void NewMemTracker::make_group_snapshot(std::vector* sn } std::string NewMemTracker::log_usage(NewMemTracker::Snapshot snapshot) { - return fmt::format( - "MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", snapshot.label, - snapshot.parent, PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES), - snapshot.cur_consumption, PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES), + return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", + snapshot.label, snapshot.parent, print_bytes(snapshot.cur_consumption), + snapshot.cur_consumption, print_bytes(snapshot.peak_consumption), snapshot.peak_consumption); } diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h index 258f244aaea351..aec490c808a6a5 100644 --- a/be/src/runtime/memory/mem_tracker.h +++ b/be/src/runtime/memory/mem_tracker.h @@ -19,6 +19,7 @@ // and modified by Doris #pragma once +#include "util/pretty_printer.h" #include "util/runtime_profile.h" namespace doris { @@ -50,6 +51,11 @@ class NewMemTracker { ~NewMemTracker(); + static std::string print_bytes(int64_t bytes) { + return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES) + : "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES); + } + public: const std::string& label() const { return _label; } // Returns the memory consumed in bytes. diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h index e6126c15a66659..73aaa8e50000e1 100644 --- a/be/src/runtime/memory/mem_tracker_limiter.h +++ b/be/src/runtime/memory/mem_tracker_limiter.h @@ -76,9 +76,7 @@ class MemTrackerLimiter final : public NewMemTracker { // but it may not actually alloc physical memory, which is not expected in mem hook fail. // // TODO: In order to ensure no OOM, currently reserve 200M, and then use the free mem in /proc/meminfo to ensure no OOM. - if (PerfCounters::get_vm_rss() - static_cast(MemInfo::allocator_cache_mem()) + - bytes >= - MemInfo::mem_limit() || + if (MemInfo::proc_mem_no_allocator_cache() + bytes >= MemInfo::mem_limit() || PerfCounters::get_vm_rss() + bytes >= MemInfo::hard_mem_limit()) { if (config::enable_proc_meminfo_cancel_query) { return true; @@ -172,10 +170,6 @@ class MemTrackerLimiter final : public NewMemTracker { return msg.str(); } - static std::string print_bytes(int64_t bytes) { - return PrettyPrinter::print(bytes, TUnit::BYTES); - } - private: // The following func, for automatic memory tracking and limiting based on system memory allocation. friend class ThreadMemTrackerMgr; diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp index c4f9ba2dd7ccda..2b294f0223b68e 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.cpp +++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp @@ -20,6 +20,7 @@ #include "common/config.h" #include "runtime/exec_env.h" #include "util/pretty_printer.h" +#include "runtime/memory/mem_tracker.h" namespace doris { @@ -100,9 +101,9 @@ void MemTrackerTaskPool::logout_task_mem_tracker() { LOG(INFO) << fmt::format( "Deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, " "PeakUsed={}", - it->first, PrettyPrinter::print(it->second->limit(), TUnit::BYTES), - PrettyPrinter::print(it->second->consumption(), TUnit::BYTES), - PrettyPrinter::print(it->second->peak_consumption(), TUnit::BYTES)); + it->first, NewMemTracker::print_bytes(it->second->limit()), + NewMemTracker::print_bytes(it->second->consumption()), + NewMemTracker::print_bytes(it->second->peak_consumption())); expired_task_ids.emplace_back(it->first); } } diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h b/be/src/runtime/memory/mem_tracker_task_pool.h index f8c5039eab00bc..4c21c4c40e24de 100644 --- a/be/src/runtime/memory/mem_tracker_task_pool.h +++ b/be/src/runtime/memory/mem_tracker_task_pool.h @@ -23,6 +23,8 @@ namespace doris { +// TODO: phmap `parallel_flat_hash_map` is not thread-safe. If it is not fixed in the future, +// can consider using other maps instead. using TaskTrackersMap = phmap::parallel_flat_hash_map< std::string, std::shared_ptr, phmap::priv::hash_default_hash, phmap::priv::hash_default_eq, diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h index b6143264a8a1d0..164dee1d3e13f4 100644 --- a/be/src/runtime/thread_context.h +++ b/be/src/runtime/thread_context.h @@ -188,19 +188,8 @@ static void attach_bthread() { #endif // Create thread-local data on demand. bthread_context = new ThreadContext; - std::shared_ptr btls_tracker = - std::make_shared(-1, "Bthread:id=" + std::to_string(bthread_id), - ExecEnv::GetInstance()->bthread_mem_tracker()); - bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), btls_tracker); // set the data so that next time bthread_getspecific in the thread returns the data. CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context)); - } else { - // two scenarios: - // 1. A new bthread starts, but get a reuses btls. - // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment. - // So tracker call reset 0 like reuses btls. - DCHECK(bthread_context->_thread_mem_tracker_mgr->get_attach_layers() == 2); - bthread_context->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->reset_zero(); } } diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index a752088c851092..bd5fe3aa93104b 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -456,11 +456,10 @@ int main(int argc, char** argv) { #if defined(LEAK_SANITIZER) __lsan_do_leak_check(); #endif - + doris::PerfCounters::refresh_proc_status(); #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) doris::MemInfo::refresh_allocator_mem(); #endif - doris::PerfCounters::refresh_proc_status(); int64_t allocator_cache_mem_diff = doris::MemInfo::allocator_cache_mem() - doris::ExecEnv::GetInstance()->allocator_cache_mem_tracker()->consumption(); diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp index b7bc9a179b6077..38f12b51809a04 100644 --- a/be/src/service/internal_service.cpp +++ b/be/src/service/internal_service.cpp @@ -120,16 +120,14 @@ void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cn const Status& extract_st) { std::string query_id; TUniqueId finst_id; - std::shared_ptr transmit_tracker; + std::shared_ptr transmit_tracker = nullptr; if (request->has_query_id()) { query_id = print_id(request->query_id()); finst_id.__set_hi(request->finst_id().hi()); finst_id.__set_lo(request->finst_id().lo()); - // In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer. - transmit_tracker = std::make_shared( - -1, fmt::format("QueryTransmit#queryId={}", query_id), - _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id)); - } else { + transmit_tracker = _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id); + } + if (!transmit_tracker) { query_id = "unkown_transmit_data"; transmit_tracker = std::make_shared(-1, "unkown_transmit_data"); } @@ -595,16 +593,16 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* c const Status& extract_st) { std::string query_id; TUniqueId finst_id; - std::shared_ptr transmit_tracker; + std::shared_ptr transmit_tracker = nullptr; if (request->has_query_id()) { query_id = print_id(request->query_id()); finst_id.__set_hi(request->finst_id().hi()); finst_id.__set_lo(request->finst_id().lo()); - // In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer. - transmit_tracker = std::make_shared( - -1, fmt::format("QueryTransmit#queryId={}", query_id), - _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id)); - } else { + // phmap `parallel_flat_hash_map` is not thread safe, so get query mem tracker may be null pointer. + transmit_tracker = + _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id); + } + if (!transmit_tracker) { query_id = "unkown_transmit_block"; transmit_tracker = std::make_shared(-1, "unkown_transmit_block"); } diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 674c053136d05d..438fab038cb25e 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -48,6 +48,7 @@ size_t MemInfo::_s_tcmalloc_thread_bytes = 0; size_t MemInfo::_s_allocator_cache_mem = 0; std::string MemInfo::_s_allocator_cache_mem_str = ""; size_t MemInfo::_s_virtual_memory_used = 0; +int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1; void MemInfo::init() { // Read from /proc/meminfo @@ -96,7 +97,7 @@ void MemInfo::init() { bool is_percent = true; _s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent); _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES); - _s_hard_mem_limit = _s_physical_mem - std::min(209715200L, _s_physical_mem / 10); // 200M + _s_hard_mem_limit = _s_physical_mem - std::max(209715200L, _s_physical_mem / 10); // 200M LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << ", /proc/meminfo/MemTotal: " << line; _s_initialized = true; diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index 637448c7c0dc68..e73d07ce1fdb72 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -23,6 +23,7 @@ #include #include "common/logging.h" +#include "util/perf_counters.h" #include "util/pretty_printer.h" namespace doris { @@ -45,6 +46,7 @@ class MemInfo { static inline size_t allocator_virtual_mem() { return _s_virtual_memory_used; } static inline size_t allocator_cache_mem() { return _s_allocator_cache_mem; } static inline std::string allocator_cache_mem_str() { return _s_allocator_cache_mem_str; } + static inline int64_t proc_mem_no_allocator_cache() { return _s_proc_mem_no_allocator_cache; } // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory // obtained by the process malloc, not the physical memory actually used by the process in the OS. @@ -65,6 +67,8 @@ class MemInfo { _s_tcmalloc_transfer_bytes + _s_tcmalloc_thread_bytes; _s_allocator_cache_mem_str = PrettyPrinter::print(_s_allocator_cache_mem, TUnit::BYTES); _s_virtual_memory_used = _s_allocator_physical_mem + _s_pageheap_unmapped_bytes; + _s_proc_mem_no_allocator_cache = + PerfCounters::get_vm_rss() - static_cast(_s_allocator_cache_mem); } static inline int64_t mem_limit() { @@ -100,6 +104,7 @@ class MemInfo { static size_t _s_allocator_cache_mem; static std::string _s_allocator_cache_mem_str; static size_t _s_virtual_memory_used; + static int64_t _s_proc_mem_no_allocator_cache; }; } // namespace doris diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp index 14630c25339cb2..eaa99ffd1421f5 100644 --- a/be/src/vec/runtime/vdata_stream_recvr.cpp +++ b/be/src/vec/runtime/vdata_stream_recvr.cpp @@ -53,7 +53,10 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) { } // _cur_batch must be replaced with the returned batch. - _current_block.reset(); + { + SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker()); + _current_block.reset(); + } *next_block = nullptr; if (_is_cancelled) { return Status::Cancelled("Cancelled");