Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <fmt/format.h>

#include "runtime/thread_context.h"
#include "util/pretty_printer.h"
#include "util/string_util.h"
#include "util/time.h"

Expand Down Expand Up @@ -103,11 +102,10 @@ void MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* snapshot
}

std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) {
return fmt::format(
"MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", snapshot.label,
snapshot.parent, PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES),
snapshot.cur_consumption, PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES),
snapshot.peak_consumption);
return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)",
snapshot.label, snapshot.parent, print_bytes(snapshot.cur_consumption),
snapshot.cur_consumption, print_bytes(snapshot.peak_consumption),
snapshot.peak_consumption);
}

static std::unordered_map<std::string, std::shared_ptr<MemTracker>> global_mem_trackers;
Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/memory/mem_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
// and modified by Doris
#pragma once

#include "util/pretty_printer.h"
#include "util/runtime_profile.h"

namespace doris {
Expand Down Expand Up @@ -56,6 +57,11 @@ class MemTracker {
static std::shared_ptr<MemTracker> get_global_mem_tracker(const std::string& label);
static void make_global_mem_tracker_snapshot(std::vector<MemTracker::Snapshot>* snapshots);

static std::string print_bytes(int64_t bytes) {
return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES)
: "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES);
}

public:
const std::string& label() const { return _label; }
// Returns the memory consumed in bytes.
Expand Down
4 changes: 0 additions & 4 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,6 @@ class MemTrackerLimiter final : public MemTracker {
return msg.str();
}

static std::string print_bytes(int64_t bytes) {
return PrettyPrinter::print(bytes, TUnit::BYTES);
}

private:
// The following func, for automatic memory tracking and limiting based on system memory allocation.
friend class ThreadMemTrackerMgr;
Expand Down
10 changes: 5 additions & 5 deletions be/src/runtime/memory/mem_tracker_task_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_query_scanner_me
const std::string& query_id) {
return register_task_mem_tracker_impl("Scanner#" + query_id, -1,
fmt::format("Scanner#Query#Id={}", query_id),
ExecEnv::GetInstance()->query_pool_mem_tracker());
get_task_mem_tracker(query_id));
}

std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_mem_tracker(
Expand All @@ -69,7 +69,7 @@ std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::register_load_scanner_mem
const std::string& load_id) {
return register_task_mem_tracker_impl("Scanner#" + load_id, -1,
fmt::format("Scanner#Load#Id={}", load_id),
ExecEnv::GetInstance()->load_pool_mem_tracker());
get_task_mem_tracker(load_id));
}

std::shared_ptr<MemTrackerLimiter> MemTrackerTaskPool::get_task_mem_tracker(
Expand Down Expand Up @@ -104,9 +104,9 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
LOG(INFO) << fmt::format(
"Deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
"PeakUsed={}",
it->first, PrettyPrinter::print(it->second->limit(), TUnit::BYTES),
PrettyPrinter::print(it->second->consumption(), TUnit::BYTES),
PrettyPrinter::print(it->second->peak_consumption(), TUnit::BYTES));
it->first, MemTracker::print_bytes(it->second->limit()),
MemTracker::print_bytes(it->second->consumption()),
MemTracker::print_bytes(it->second->peak_consumption()));
expired_task_ids.emplace_back(it->first);
} else if (config::memory_verbose_track) {
it->second->print_log_usage("query routine");
Expand Down
12 changes: 4 additions & 8 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,8 @@ void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_
query_id = print_id(request->query_id());
finst_id.__set_hi(request->finst_id().hi());
finst_id.__set_lo(request->finst_id().lo());
// In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
transmit_tracker = std::make_shared<MemTrackerLimiter>(
-1, fmt::format("QueryTransmit#queryId={}", query_id),
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
transmit_tracker =
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
} else {
query_id = "unkown_transmit_data";
transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_data");
Expand Down Expand Up @@ -642,10 +640,8 @@ void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl
query_id = print_id(request->query_id());
finst_id.__set_hi(request->finst_id().hi());
finst_id.__set_lo(request->finst_id().lo());
// In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
transmit_tracker = std::make_shared<MemTrackerLimiter>(
-1, fmt::format("QueryTransmit#queryId={}", query_id),
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
transmit_tracker =
_exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
} else {
query_id = "unkown_transmit_block";
transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_block");
Expand Down
5 changes: 4 additions & 1 deletion be/src/vec/runtime/vdata_stream_recvr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) {
}

// _cur_batch must be replaced with the returned batch.
_current_block.reset();
{
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
_current_block.reset();
}
*next_block = nullptr;
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
Expand Down