Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 1 addition & 14 deletions be/src/runtime/mem_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,11 @@ MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_by
DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size);
}


MemPool::MemPool(const std::string& label)
: current_chunk_idx_(-1),
next_chunk_size_(INITIAL_CHUNK_SIZE),
total_allocated_bytes_(0),
total_reserved_bytes_(0),
peak_allocated_bytes_(0) {
total_reserved_bytes_(0) {
mem_tracker_own_ = MemTracker::CreateTracker(-1, label + ":MemPool");
mem_tracker_ = mem_tracker_own_.get();
}
Expand All @@ -59,9 +57,6 @@ MemPool::~MemPool() {
ChunkAllocator::instance()->free(chunk.chunk);
}
mem_tracker_->Release(total_bytes_released);
THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_,
ExecEnv::GetInstance()->orphan_mem_tracker_raw());

DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released);
}

Expand All @@ -81,15 +76,11 @@ void MemPool::free_all() {
total_bytes_released += chunk.chunk.size;
ChunkAllocator::instance()->free(chunk.chunk);
}
THREAD_MEM_TRACKER_TRANSFER_FROM(total_bytes_released - peak_allocated_bytes_,
ExecEnv::GetInstance()->orphan_mem_tracker_raw());

chunks_.clear();
next_chunk_size_ = INITIAL_CHUNK_SIZE;
current_chunk_idx_ = -1;
total_allocated_bytes_ = 0;
total_reserved_bytes_ = 0;
peak_allocated_bytes_ = 0;

mem_tracker_->Release(total_bytes_released);
DorisMetrics::instance()->memory_pool_bytes_total->increment(-total_bytes_released);
Expand Down Expand Up @@ -148,7 +139,6 @@ bool MemPool::find_chunk(size_t min_size, bool check_limits) {
mem_tracker_->Release(chunk_size);
return false;
}
THREAD_MEM_TRACKER_TRANSFER_TO(chunk_size, ExecEnv::GetInstance()->orphan_mem_tracker_raw());
ASAN_POISON_MEMORY_REGION(chunk.data, chunk_size);
// Put it before the first free chunk. If no free chunks, it goes at the end.
if (first_free_idx == static_cast<int>(chunks_.size())) {
Expand Down Expand Up @@ -214,8 +204,6 @@ void MemPool::acquire_data(MemPool* src, bool keep_current) {
src->total_allocated_bytes_ = 0;
}

reset_peak();

if (!keep_current) src->free_all();
DCHECK(src->check_integrity(false));
DCHECK(check_integrity(false));
Expand All @@ -228,7 +216,6 @@ void MemPool::exchange_data(MemPool* other) {
std::swap(next_chunk_size_, other->next_chunk_size_);
std::swap(total_allocated_bytes_, other->total_allocated_bytes_);
std::swap(total_reserved_bytes_, other->total_reserved_bytes_);
std::swap(peak_allocated_bytes_, other->peak_allocated_bytes_);
std::swap(chunks_, other->chunks_);

// update MemTracker
Expand Down
14 changes: 0 additions & 14 deletions be/src/runtime/mem_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ class MemPool {
next_chunk_size_(INITIAL_CHUNK_SIZE),
total_allocated_bytes_(0),
total_reserved_bytes_(0),
peak_allocated_bytes_(0),
mem_tracker_(mem_tracker) {
DCHECK(mem_tracker != nullptr);
}
Expand Down Expand Up @@ -161,7 +160,6 @@ class MemPool {

int64_t total_allocated_bytes() const { return total_allocated_bytes_; }
int64_t total_reserved_bytes() const { return total_reserved_bytes_; }
int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; }

MemTracker* mem_tracker() { return mem_tracker_; }

Expand Down Expand Up @@ -207,14 +205,6 @@ class MemPool {
/// data. Otherwise the current chunk can be either empty or full.
bool check_integrity(bool check_current_chunk_empty);

void reset_peak() {
if (total_allocated_bytes_ - peak_allocated_bytes_ > 65536) {
THREAD_MEM_TRACKER_TRANSFER_FROM(total_allocated_bytes_ - peak_allocated_bytes_,
ExecEnv::GetInstance()->orphan_mem_tracker_raw());
peak_allocated_bytes_ = total_allocated_bytes_;
}
}

/// Return offset to unoccupied space in current chunk.
int64_t get_free_offset() const {
if (current_chunk_idx_ == -1) return 0;
Expand Down Expand Up @@ -246,7 +236,6 @@ class MemPool {
DCHECK_LE(info.allocated_bytes + size, info.chunk.size);
info.allocated_bytes += padding + size;
total_allocated_bytes_ += padding + size;
reset_peak();
DCHECK_LE(current_chunk_idx_, chunks_.size() - 1);
return result;
}
Expand Down Expand Up @@ -306,9 +295,6 @@ class MemPool {
/// sum of all bytes allocated in chunks_
int64_t total_reserved_bytes_;

/// Maximum number of bytes allocated from this pool at one time.
int64_t peak_allocated_bytes_;

std::vector<ChunkInfo> chunks_;

/// The current and peak memory footprint of this pool. This is different from
Expand Down
7 changes: 2 additions & 5 deletions be/src/runtime/memory/mem_tracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,8 @@ NewMemTracker::NewMemTracker(const std::string& label, RuntimeProfile* profile)
DCHECK(thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() != nullptr);
MemTrackerLimiter* parent =
thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw();
_label = fmt::format("[Observer] {} | {}", label,
parent->label() == "Orphan" ? "Process" : parent->label());
_bind_group_num = parent->label() == "Orphan"
? ExecEnv::GetInstance()->new_process_mem_tracker()->group_num()
: parent->group_num();
_label = fmt::format("[Observer] {} | {}", label, parent->label());
_bind_group_num = parent->group_num();
{
std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock);
_tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert(
Expand Down
8 changes: 2 additions & 6 deletions be/src/runtime/memory/mem_tracker_limiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
DCHECK(remain_child_count() == 0 || _label == "Process");
// In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`
// in real time. Merge its consumption into orphan when parent is process, to avoid repetition.
if ((_parent && _parent->label() == "Process")) {
if (_parent && _parent->label() == "Process") {
ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
_consumption->current_value());
}
Expand All @@ -88,11 +88,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
_all_ancestors.clear();
_all_ancestors.push_back(ExecEnv::GetInstance()->orphan_mem_tracker_raw());
}
for (auto& tracker : _all_ancestors) {
if (tracker->label() != "Process") {
tracker->_consumption->add(_untracked_mem);
}
}
consume_local(_untracked_mem);
if (_parent) {
std::lock_guard<std::mutex> l(_parent->_child_tracker_limiter_lock);
if (_child_tracker_it != _parent->_child_tracker_limiters.end()) {
Expand Down
17 changes: 11 additions & 6 deletions be/src/runtime/memory/mem_tracker_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ class MemTrackerLimiter final : public NewMemTracker {
WARN_UNUSED_RESULT
bool try_consume(int64_t bytes, std::string& failed_msg);

void consume_local(int64_t bytes);

// When the accumulated untracked memory value exceeds the upper limit,
// the current value is returned and set to 0.
// Thread safety.
Expand Down Expand Up @@ -274,15 +276,18 @@ inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
return 0;
}

inline void MemTrackerLimiter::consume_local(int64_t bytes) {
if (bytes == 0) return;
for (auto& tracker : _all_ancestors) {
if (tracker->label() == "Process") return;
tracker->_consumption->add(bytes);
}
}

inline void MemTrackerLimiter::cache_consume_local(int64_t bytes) {
if (bytes == 0) return;
int64_t consume_bytes = add_untracked_mem(bytes);
if (consume_bytes != 0) {
for (auto& tracker : _all_ancestors) {
if (tracker->label() == "Process") return;
tracker->_consumption->add(consume_bytes);
}
}
consume_local(consume_bytes);
}

inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg) {
Expand Down