Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/runtime/exec_env.h
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,9 @@ class ExecEnv {
this->_dummy_lru_cache = dummy_lru_cache;
}
void set_write_cooldown_meta_executors();

static void set_tracking_memory(bool tracking_memory) {
_s_tracking_memory.store(tracking_memory, std::memory_order_acquire);
}
#endif
LoadStreamMapPool* load_stream_map_pool() { return _load_stream_map_pool.get(); }

Expand Down
8 changes: 6 additions & 2 deletions be/src/runtime/memory/thread_mem_tracker_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ class ThreadMemTrackerMgr {
fmt::to_string(consumer_tracker_buf));
}

int64_t untracked_mem() const { return _untracked_mem; }
int64_t reserved_mem() const { return _reserved_mem; }

private:
// is false: ExecEnv::ready() = false when thread local is initialized
bool _init = false;
Expand Down Expand Up @@ -190,7 +193,7 @@ inline void ThreadMemTrackerMgr::pop_consumer_tracker() {

inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_check) {
if (_reserved_mem != 0) {
if (_reserved_mem >= size) {
if (_reserved_mem > size) {
// only need to subtract _reserved_mem, no need to consume MemTracker,
// every time _reserved_mem is minus the sum of size >= SYNC_PROC_RESERVED_INTERVAL_BYTES,
// subtract size from process global reserved memory,
Expand All @@ -208,7 +211,8 @@ inline void ThreadMemTrackerMgr::consume(int64_t size, int skip_large_memory_che
}
return;
} else {
// reserved memory is insufficient, the remaining _reserved_mem is subtracted from this memory consumed,
// _reserved_mem <= size, reserved memory used done,
// the remaining _reserved_mem is subtracted from this memory consumed,
// and reset _reserved_mem to 0, and subtract the remaining _reserved_mem from
// process global reserved memory, this means that all reserved memory has been used by BE process.
size -= _reserved_mem;
Expand Down
6 changes: 0 additions & 6 deletions be/src/runtime/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,12 @@ class ThreadContext {

void attach_task(const TUniqueId& task_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
#ifndef BE_TEST
// will only attach_task at the beginning of the thread function, there should be no duplicate attach_task.
DCHECK(mem_tracker);
// Orphan is thread default tracker.
DCHECK(thread_mem_tracker()->label() == "Orphan")
<< ", thread mem tracker label: " << thread_mem_tracker()->label()
<< ", attach mem tracker label: " << mem_tracker->label();
#endif
_task_id = task_id;
thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
thread_mem_tracker_mgr->set_query_id(_task_id);
Expand Down Expand Up @@ -380,9 +378,7 @@ class AttachTask {
class SwitchThreadMemTrackerLimiter {
public:
explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
#ifndef BE_TEST
DCHECK(mem_tracker);
#endif
ThreadLocalHandle::create_thread_local_if_not_exits();
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker);
Expand All @@ -391,9 +387,7 @@ class SwitchThreadMemTrackerLimiter {
explicit SwitchThreadMemTrackerLimiter(const QueryThreadContext& query_thread_context) {
ThreadLocalHandle::create_thread_local_if_not_exits();
DCHECK(thread_context()->task_id() == query_thread_context.query_id);
#ifndef BE_TEST
DCHECK(query_thread_context.query_mem_tracker);
#endif
_old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
query_thread_context.query_mem_tracker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ TEST(MemTrackerTest, SingleTrackerNoLimit) {
t->release(5);
}

TEST(MemTestTest, SingleTrackerWithLimit) {
TEST(MemTrackerTest, SingleTrackerWithLimit) {
auto t = std::make_unique<MemTrackerLimiter>(MemTrackerLimiter::Type::GLOBAL, "limit tracker",
11);
EXPECT_TRUE(t->has_limit());
Expand Down
Loading