Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <fmt/format.h>

#include "runtime/thread_context.h"
#include "util/pretty_printer.h"
#include "util/string_util.h"
#include "util/time.h"

Expand Down Expand Up @@ -104,10 +103,9 @@ void NewMemTracker::make_group_snapshot(std::vector<NewMemTracker::Snapshot>* sn
}

std::string NewMemTracker::log_usage(NewMemTracker::Snapshot snapshot) {
return fmt::format(
"MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", snapshot.label,
snapshot.parent, PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES),
snapshot.cur_consumption, PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES),
return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)",
snapshot.label, snapshot.parent, print_bytes(snapshot.cur_consumption),
snapshot.cur_consumption, print_bytes(snapshot.peak_consumption),
snapshot.peak_consumption);
}

Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/memory/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// and modified by Doris
#pragma once

#include "util/pretty_printer.h"
#include "util/runtime_profile.h"

namespace doris {
Expand Down Expand Up @@ -50,6 +51,11 @@ class NewMemTracker {

~NewMemTracker();

static std::string print_bytes(int64_t bytes) {
return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES)
: "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES);
}

public:
const std::string& label() const { return _label; }
// Returns the memory consumed in bytes.
Expand Down
8 changes: 1 addition & 7 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,7 @@ class MemTrackerLimiter final : public NewMemTracker {
// but it may not actually alloc physical memory, which is not expected in mem hook fail.
//
// TODO: In order to ensure no OOM, currently reserve 200M, and then use the free mem in /proc/meminfo to ensure no OOM.
if (PerfCounters::get_vm_rss() - static_cast<int64_t>(MemInfo::allocator_cache_mem()) +
bytes >=
MemInfo::mem_limit() ||
if (MemInfo::proc_mem_no_allocator_cache() + bytes >= MemInfo::mem_limit() ||
PerfCounters::get_vm_rss() + bytes >= MemInfo::hard_mem_limit()) {
if (config::enable_proc_meminfo_cancel_query) {
return true;
Expand Down Expand Up @@ -172,10 +170,6 @@ class MemTrackerLimiter final : public NewMemTracker {
return msg.str();
}

static std::string print_bytes(int64_t bytes) {
return PrettyPrinter::print(bytes, TUnit::BYTES);
}

private:
// The following func, for automatic memory tracking and limiting based on system memory allocation.
friend class ThreadMemTrackerMgr;
Expand Down
7 changes: 4 additions & 3 deletions be/src/runtime/memory/mem_tracker_task_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "common/config.h"
#include "runtime/exec_env.h"
#include "util/pretty_printer.h"
#include "runtime/memory/mem_tracker.h"

namespace doris {

Expand Down Expand Up @@ -100,9 +101,9 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
LOG(INFO) << fmt::format(
"Deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
"PeakUsed={}",
it->first, PrettyPrinter::print(it->second->limit(), TUnit::BYTES),
PrettyPrinter::print(it->second->consumption(), TUnit::BYTES),
PrettyPrinter::print(it->second->peak_consumption(), TUnit::BYTES));
it->first, NewMemTracker::print_bytes(it->second->limit()),
NewMemTracker::print_bytes(it->second->consumption()),
NewMemTracker::print_bytes(it->second->peak_consumption()));
expired_task_ids.emplace_back(it->first);
}
}
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/memory/mem_tracker_task_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@

namespace doris {

// TODO: phmap `parallel_flat_hash_map` is not thread-safe. If it is not fixed in the future,
// can consider using other maps instead.
using TaskTrackersMap = phmap::parallel_flat_hash_map<
std::string, std::shared_ptr<MemTrackerLimiter>,
phmap::priv::hash_default_hash<std::string>, phmap::priv::hash_default_eq<std::string>,
Expand Down
11 changes: 0 additions & 11 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,19 +188,8 @@ static void attach_bthread() {
#endif
// Create thread-local data on demand.
bthread_context = new ThreadContext;
std::shared_ptr<MemTrackerLimiter> btls_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Bthread:id=" + std::to_string(bthread_id),
ExecEnv::GetInstance()->bthread_mem_tracker());
bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), btls_tracker);
// set the data so that next time bthread_getspecific in the thread returns the data.
CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
} else {
// two scenarios:
// 1. A new bthread starts, but get a reuses btls.
// 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment.
// So tracker call reset 0 like reuses btls.
DCHECK(bthread_context->_thread_mem_tracker_mgr->get_attach_layers() == 2);
bthread_context->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->reset_zero();
}
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -456,11 +456,10 @@ int main(int argc, char** argv) {
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();
#endif

doris::PerfCounters::refresh_proc_status();
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
doris::MemInfo::refresh_allocator_mem();
#endif
doris::PerfCounters::refresh_proc_status();
int64_t allocator_cache_mem_diff =
doris::MemInfo::allocator_cache_mem() -
doris::ExecEnv::GetInstance()->allocator_cache_mem_tracker()->consumption();
Expand Down
22 changes: 10 additions & 12 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,16 +120,14 @@ void PInternalServiceImpl<T>::_transmit_data(google::protobuf::RpcController* cn
const Status& extract_st) {
std::string query_id;
TUniqueId finst_id;
std::shared_ptr<MemTrackerLimiter> transmit_tracker;
std::shared_ptr<MemTrackerLimiter> transmit_tracker = nullptr;
if (request->has_query_id()) {
query_id = print_id(request->query_id());
finst_id.__set_hi(request->finst_id().hi());
finst_id.__set_lo(request->finst_id().lo());
// In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
transmit_tracker = std::make_shared<MemTrackerLimiter>(
-1, fmt::format("QueryTransmit#queryId={}", query_id),
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
} else {
transmit_tracker = _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
}
if (!transmit_tracker) {
query_id = "unkown_transmit_data";
transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_data");
}
Expand Down Expand Up @@ -595,16 +593,16 @@ void PInternalServiceImpl<T>::_transmit_block(google::protobuf::RpcController* c
const Status& extract_st) {
std::string query_id;
TUniqueId finst_id;
std::shared_ptr<MemTrackerLimiter> transmit_tracker;
std::shared_ptr<MemTrackerLimiter> transmit_tracker = nullptr;
if (request->has_query_id()) {
query_id = print_id(request->query_id());
finst_id.__set_hi(request->finst_id().hi());
finst_id.__set_lo(request->finst_id().lo());
// In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
transmit_tracker = std::make_shared<MemTrackerLimiter>(
-1, fmt::format("QueryTransmit#queryId={}", query_id),
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
} else {
// phmap `parallel_flat_hash_map` is not thread safe, so get query mem tracker may be null pointer.
transmit_tracker =
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
}
if (!transmit_tracker) {
query_id = "unkown_transmit_block";
transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_block");
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/util/mem_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ size_t MemInfo::_s_tcmalloc_thread_bytes = 0;
size_t MemInfo::_s_allocator_cache_mem = 0;
std::string MemInfo::_s_allocator_cache_mem_str = "";
size_t MemInfo::_s_virtual_memory_used = 0;
int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1;

void MemInfo::init() {
// Read from /proc/meminfo
Expand Down Expand Up @@ -96,7 +97,7 @@ void MemInfo::init() {
bool is_percent = true;
_s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent);
_s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
_s_hard_mem_limit = _s_physical_mem - std::min(209715200L, _s_physical_mem / 10); // 200M
_s_hard_mem_limit = _s_physical_mem - std::max(209715200L, _s_physical_mem / 10); // 200M

LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << ", /proc/meminfo/MemTotal: " << line;
_s_initialized = true;
Expand Down
5 changes: 5 additions & 0 deletions be/src/util/mem_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <string>

#include "common/logging.h"
#include "util/perf_counters.h"
#include "util/pretty_printer.h"

namespace doris {
Expand All @@ -45,6 +46,7 @@ class MemInfo {
static inline size_t allocator_virtual_mem() { return _s_virtual_memory_used; }
static inline size_t allocator_cache_mem() { return _s_allocator_cache_mem; }
static inline std::string allocator_cache_mem_str() { return _s_allocator_cache_mem_str; }
static inline int64_t proc_mem_no_allocator_cache() { return _s_proc_mem_no_allocator_cache; }

// Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory
// obtained by the process malloc, not the physical memory actually used by the process in the OS.
Expand All @@ -65,6 +67,8 @@ class MemInfo {
_s_tcmalloc_transfer_bytes + _s_tcmalloc_thread_bytes;
_s_allocator_cache_mem_str = PrettyPrinter::print(_s_allocator_cache_mem, TUnit::BYTES);
_s_virtual_memory_used = _s_allocator_physical_mem + _s_pageheap_unmapped_bytes;
_s_proc_mem_no_allocator_cache =
PerfCounters::get_vm_rss() - static_cast<int64_t>(_s_allocator_cache_mem);
}

static inline int64_t mem_limit() {
Expand Down Expand Up @@ -100,6 +104,7 @@ class MemInfo {
static size_t _s_allocator_cache_mem;
static std::string _s_allocator_cache_mem_str;
static size_t _s_virtual_memory_used;
static int64_t _s_proc_mem_no_allocator_cache;
};

} // namespace doris
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) {
}

// _cur_batch must be replaced with the returned batch.
_current_block.reset();
{
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
_current_block.reset();
}
*next_block = nullptr;
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
Expand Down