Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions be/src/olap/page_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,12 @@ template <typename TAllocator>
PageBase<TAllocator>::PageBase(size_t b, bool use_cache, segment_v2::PageTypePB page_type)
: LRUCacheValueBase(), _size(b), _capacity(b) {
if (use_cache) {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
StoragePageCache::instance()->mem_tracker(page_type));
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
_mem_tracker_by_allocator = StoragePageCache::instance()->mem_tracker(page_type);
} else {
_mem_tracker_by_allocator = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
}
{
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
_data = reinterpret_cast<char*>(TAllocator::alloc(_capacity, ALLOCATOR_ALIGNMENT_16));
}
}
Expand All @@ -40,7 +42,7 @@ template <typename TAllocator>
PageBase<TAllocator>::~PageBase() {
if (_data != nullptr) {
DCHECK(_capacity != 0 && _size != 0);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(TAllocator::mem_tracker_);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_mem_tracker_by_allocator);
TAllocator::free(_data, _capacity);
}
}
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/page_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class PageBase : private TAllocator, public LRUCacheValueBase {
// Effective size, smaller than capacity, such as data page remove checksum suffix.
size_t _size = 0;
size_t _capacity = 0;
std::shared_ptr<MemTrackerLimiter> _mem_tracker_by_allocator;
};

using DataPage = PageBase<Allocator<false>>;
Expand Down
21 changes: 14 additions & 7 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -452,27 +452,34 @@ class SwitchThreadMemTrackerLimiter {
const std::shared_ptr<doris::MemTrackerLimiter>& mem_tracker) {
DCHECK(mem_tracker);
doris::ThreadLocalHandle::create_thread_local_if_not_exits();
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
if (mem_tracker != thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
}
}

explicit SwitchThreadMemTrackerLimiter(const doris::QueryThreadContext& query_thread_context) {
doris::ThreadLocalHandle::create_thread_local_if_not_exits();
DCHECK(thread_context()->task_id() ==
query_thread_context.query_id); // workload group alse not change
DCHECK(query_thread_context.query_mem_tracker);
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
query_thread_context.query_mem_tracker);
if (query_thread_context.query_mem_tracker !=
thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
query_thread_context.query_mem_tracker);
}
}

~SwitchThreadMemTrackerLimiter() {
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
if (_old_mem_tracker != nullptr) {
thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
}
doris::ThreadLocalHandle::del_thread_local_if_count_is_zero();
}

private:
std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker;
std::shared_ptr<doris::MemTrackerLimiter> _old_mem_tracker {nullptr};
};

class AddThreadMemTrackerConsumer {
Expand Down
8 changes: 7 additions & 1 deletion be/src/util/byte_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,15 @@ struct ByteBuffer : private Allocator<false> {
size_t capacity;

private:
ByteBuffer(size_t capacity_) : pos(0), limit(capacity_), capacity(capacity_) {
ByteBuffer(size_t capacity_)
: pos(0),
limit(capacity_),
capacity(capacity_),
mem_tracker_(doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker()) {
ptr = reinterpret_cast<char*>(Allocator<false>::alloc(capacity_));
}

std::shared_ptr<MemTrackerLimiter> mem_tracker_;
};

} // namespace doris
33 changes: 2 additions & 31 deletions be/src/vec/common/allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,43 +211,14 @@ void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::memory_

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::consume_memory(
size_t size) {
// Usually, an object that inherits Allocator has the same TLS tracker for each alloc.
// If an object that inherits Allocator needs to be reused by multiple queries,
// it is necessary to switch the same tracker to TLS when calling alloc.
// However, in ORC Reader, ORC DataBuffer will be reused, but we cannot switch TLS tracker,
// so we update the Allocator tracker when the TLS tracker changes.
// note that the tracker in thread context when object that inherit Allocator is constructed may be
// no attach memory tracker in tls. usually the memory tracker is attached in tls only during the first alloc.
if (mem_tracker_ == nullptr ||
mem_tracker_->label() != doris::thread_context()->thread_mem_tracker()->label()) {
mem_tracker_ = doris::thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
}
size_t size) const {
CONSUME_THREAD_MEM_TRACKER(size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator>::release_memory(
size_t size) const {
doris::ThreadContext* thread_context = doris::thread_context(true);
if ((thread_context && thread_context->thread_mem_tracker()->label() != "Orphan") ||
mem_tracker_ == nullptr) {
// If thread_context exist and the label of thread_mem_tracker not equal to `Orphan`,
// this means that in the scope of SCOPED_ATTACH_TASK,
// so thread_mem_tracker should be used to release memory.
// If mem_tracker_ is nullptr there is a scenario where an object that inherits Allocator
// has never called alloc, but free memory.
// in phmap, the memory alloced by an object may be transferred to another object and then free.
// in this case, thread context must attach a memory tracker other than Orphan,
// otherwise memory tracking will be wrong.
RELEASE_THREAD_MEM_TRACKER(size);
} else {
// if thread_context does not exist or the label of thread_mem_tracker is equal to
// `Orphan`, it usually happens during object destruction. This means that
// the scope of SCOPED_ATTACH_TASK has been left, so release memory using Allocator tracker.
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(mem_tracker_);
RELEASE_THREAD_MEM_TRACKER(size);
}
RELEASE_THREAD_MEM_TRACKER(size);
}

template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator>
Expand Down
11 changes: 8 additions & 3 deletions be/src/vec/common/allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,14 @@ class Allocator {
// alloc will continue to execute, so the consume memtracker is forced.
void memory_check(size_t size) const;
// Increases consumption of this tracker by 'bytes'.
void consume_memory(size_t size);
// some special cases:
// 1. objects that inherit Allocator will not be shared by multiple queries.
// non-compliant: page cache, ORC ByteBuffer.
// 2. objects that inherit Allocator will only free memory allocated by themselves.
// non-compliant: phmap, the memory alloced by an object may be transferred to another object and then free.
// 3. the memory tracker in TLS is the same during the construction of objects that inherit Allocator
// and during subsequent memory allocation.
void consume_memory(size_t size) const;
void release_memory(size_t size) const;
void throw_bad_alloc(const std::string& err) const;
#ifndef NDEBUG
Expand Down Expand Up @@ -404,8 +411,6 @@ class Allocator {

static constexpr bool clear_memory = clear_memory_;

std::shared_ptr<doris::MemTrackerLimiter> mem_tracker_ {nullptr};

// Freshly mmapped pages are copy-on-write references to a global zero page.
// On the first write, a page fault occurs, and an actual writable page is
// allocated. If we are going to use this memory soon, such as when resizing
Expand Down