From c3236d03ab01753cf8b69977cba57e2ce614eed2 Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Tue, 19 Dec 2023 18:02:30 +0800 Subject: [PATCH 1/5] 1 --- be/src/common/daemon.cpp | 19 +++++++++++++++++++ be/src/common/daemon.h | 7 +++++++ be/src/util/mem_info.cpp | 9 +++++---- 3 files changed, 31 insertions(+), 4 deletions(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 3879ef9ff8b2e8..fb30b3d1e091b9 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -63,6 +63,8 @@ namespace doris { +CountDownLatch Daemon::_je_purge_dirty_pages_thread_latch {1}; + void Daemon::tcmalloc_gc_thread() { // TODO All cache GC wish to be supported #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && \ @@ -352,6 +354,18 @@ void Daemon::block_spill_gc_thread() { } } +void Daemon::je_purge_dirty_pages_thread() const { + _je_purge_dirty_pages_thread_latch.reset(1); + do { + _je_purge_dirty_pages_thread_latch.wait(); + if (_is_stopped) { + break; + } + doris::MemInfo::je_purge_all_arena_dirty_pages(); + _je_purge_dirty_pages_thread_latch.reset(1); + } while (true); +} + void Daemon::start() { Status st; st = Thread::create( @@ -381,6 +395,9 @@ void Daemon::start() { st = Thread::create( "Daemon", "block_spill_gc_thread", [this]() { this->block_spill_gc_thread(); }, &_threads.emplace_back()); + st = Thread::create( + "Daemon", "je_purge_dirty_pages_thread", + [this]() { this->je_purge_dirty_pages_thread(); }, &_threads.emplace_back()); CHECK(st.ok()) << st; } @@ -390,7 +407,9 @@ void Daemon::stop() { LOG(INFO) << "Doris daemon stop returned since no bg threads latch."; return; } + _is_stopped = true; _stop_background_threads_latch.count_down(); + _je_purge_dirty_pages_thread_latch.count_down(); for (auto&& t : _threads) { if (t) { t->join(); diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index 139584ba93f9c8..fbfe8a2dfc8cae 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -36,6 +36,10 @@ class Daemon { // Stop background threads void stop(); + static void count_down_je_purge_dirty_pages_thread_latch() { + _je_purge_dirty_pages_thread_latch.count_down(); + } + private: void tcmalloc_gc_thread(); void memory_maintenance_thread(); @@ -43,8 +47,11 @@ class Daemon { void memtable_memory_limiter_tracker_refresh_thread(); void calculate_metrics_thread(); void block_spill_gc_thread(); + void je_purge_dirty_pages_thread() const; CountDownLatch _stop_background_threads_latch; + static CountDownLatch _je_purge_dirty_pages_thread_latch; + bool _is_stopped {}; std::vector> _threads; }; } // namespace doris diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 416ae1ae200b3c..2fc46093beddfa 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -37,6 +37,7 @@ #include #include "common/config.h" +#include "common/daemon.h" #include "common/status.h" #include "gutil/strings/split.h" #include "runtime/exec_env.h" @@ -129,7 +130,7 @@ bool MemInfo::process_minor_gc() { std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); Defer defer {[&]() { - je_purge_all_arena_dirty_pages(); + Daemon::count_down_je_purge_dirty_pages_thread_latch(); std::stringstream ss; profile->pretty_print(&ss); LOG(INFO) << fmt::format( @@ -139,7 +140,7 @@ bool MemInfo::process_minor_gc() { }}; freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get()); - je_purge_all_arena_dirty_pages(); + Daemon::count_down_je_purge_dirty_pages_thread_latch(); if (freed_mem > _s_process_minor_gc_size) { return true; } @@ -180,7 +181,7 @@ bool MemInfo::process_full_gc() { std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); Defer defer {[&]() { - je_purge_all_arena_dirty_pages(); + Daemon::count_down_je_purge_dirty_pages_thread_latch(); std::stringstream ss; profile->pretty_print(&ss); LOG(INFO) << fmt::format( @@ -190,7 +191,7 @@ bool MemInfo::process_full_gc() { }}; freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get()); - je_purge_all_arena_dirty_pages(); + Daemon::count_down_je_purge_dirty_pages_thread_latch(); if (freed_mem > _s_process_full_gc_size) { return true; } From 2ee30f3b71e2659c0f58c8e4b3979d3507a8156a Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 21 Dec 2023 18:29:03 +0800 Subject: [PATCH 2/5] 2 --- be/src/common/daemon.cpp | 12 ++++++------ be/src/common/daemon.h | 6 ------ be/src/util/mem_info.cpp | 8 ++++---- be/src/util/mem_info.h | 7 +++++++ 4 files changed, 17 insertions(+), 16 deletions(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index fb30b3d1e091b9..922fee2efdfa9a 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -355,14 +355,16 @@ void Daemon::block_spill_gc_thread() { } void Daemon::je_purge_dirty_pages_thread() const { - _je_purge_dirty_pages_thread_latch.reset(1); do { - _je_purge_dirty_pages_thread_latch.wait(); - if (_is_stopped) { + { + std::unique_lock l(doris::MemInfo::je_purge_dirty_pages_lock); + doris::MemInfo::je_purge_dirty_pages_cv.wait(l); + } + if (_stop_background_threads_latch.count() == 0) { break; } + // cannot lock je_purge_all_arena_dirty_pages, otherwise it will block GC thread. doris::MemInfo::je_purge_all_arena_dirty_pages(); - _je_purge_dirty_pages_thread_latch.reset(1); } while (true); } @@ -407,9 +409,7 @@ void Daemon::stop() { LOG(INFO) << "Doris daemon stop returned since no bg threads latch."; return; } - _is_stopped = true; _stop_background_threads_latch.count_down(); - _je_purge_dirty_pages_thread_latch.count_down(); for (auto&& t : _threads) { if (t) { t->join(); diff --git a/be/src/common/daemon.h b/be/src/common/daemon.h index fbfe8a2dfc8cae..18f78cbe583250 100644 --- a/be/src/common/daemon.h +++ b/be/src/common/daemon.h @@ -36,10 +36,6 @@ class Daemon { // Stop background threads void stop(); - static void count_down_je_purge_dirty_pages_thread_latch() { - _je_purge_dirty_pages_thread_latch.count_down(); - } - private: void tcmalloc_gc_thread(); void memory_maintenance_thread(); @@ -50,8 +46,6 @@ class Daemon { void je_purge_dirty_pages_thread() const; CountDownLatch _stop_background_threads_latch; - static CountDownLatch _je_purge_dirty_pages_thread_latch; - bool _is_stopped {}; std::vector> _threads; }; } // namespace doris diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 2fc46093beddfa..0d58881e2419aa 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -130,7 +130,7 @@ bool MemInfo::process_minor_gc() { std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); Defer defer {[&]() { - Daemon::count_down_je_purge_dirty_pages_thread_latch(); + notify_all_je_purge_dirty_pages_cv(); std::stringstream ss; profile->pretty_print(&ss); LOG(INFO) << fmt::format( @@ -140,7 +140,7 @@ bool MemInfo::process_minor_gc() { }}; freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get()); - Daemon::count_down_je_purge_dirty_pages_thread_latch(); + notify_all_je_purge_dirty_pages_cv(); if (freed_mem > _s_process_minor_gc_size) { return true; } @@ -181,7 +181,7 @@ bool MemInfo::process_full_gc() { std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); Defer defer {[&]() { - Daemon::count_down_je_purge_dirty_pages_thread_latch(); + notify_all_je_purge_dirty_pages_cv(); std::stringstream ss; profile->pretty_print(&ss); LOG(INFO) << fmt::format( @@ -191,7 +191,7 @@ bool MemInfo::process_full_gc() { }}; freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get()); - Daemon::count_down_je_purge_dirty_pages_thread_latch(); + notify_all_je_purge_dirty_pages_cv(); if (freed_mem > _s_process_full_gc_size) { return true; } diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index 3691934b800fff..227422a672801c 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -127,6 +127,13 @@ class MemInfo { #endif } + static std::mutex je_purge_dirty_pages_lock; + static std::condition_variable je_purge_dirty_pages_cv; + static void notify_all_je_purge_dirty_pages_cv() { + std::lock_guard l(je_purge_dirty_pages_lock); + je_purge_dirty_pages_cv.notify_all(); + } + static inline size_t allocator_virtual_mem() { return _s_virtual_memory_used.load(std::memory_order_relaxed); } From 4a2fb00ff1ee4a5f06264e3848010480c555db3d Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 21 Dec 2023 18:36:34 +0800 Subject: [PATCH 3/5] 3 --- be/src/common/daemon.cpp | 7 ++----- be/src/util/mem_info.cpp | 8 ++++---- be/src/util/mem_info.h | 4 ---- 3 files changed, 6 insertions(+), 13 deletions(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 922fee2efdfa9a..21521d7b5836ad 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -356,14 +356,11 @@ void Daemon::block_spill_gc_thread() { void Daemon::je_purge_dirty_pages_thread() const { do { - { - std::unique_lock l(doris::MemInfo::je_purge_dirty_pages_lock); - doris::MemInfo::je_purge_dirty_pages_cv.wait(l); - } + std::unique_lock l(doris::MemInfo::je_purge_dirty_pages_lock); + doris::MemInfo::je_purge_dirty_pages_cv.wait(l); if (_stop_background_threads_latch.count() == 0) { break; } - // cannot lock je_purge_all_arena_dirty_pages, otherwise it will block GC thread. doris::MemInfo::je_purge_all_arena_dirty_pages(); } while (true); } diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 0d58881e2419aa..0b5036a340fe1b 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -130,7 +130,7 @@ bool MemInfo::process_minor_gc() { std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); Defer defer {[&]() { - notify_all_je_purge_dirty_pages_cv(); + je_purge_dirty_pages_cv.notify_all(); std::stringstream ss; profile->pretty_print(&ss); LOG(INFO) << fmt::format( @@ -140,7 +140,7 @@ bool MemInfo::process_minor_gc() { }}; freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get()); - notify_all_je_purge_dirty_pages_cv(); + je_purge_dirty_pages_cv.notify_all(); if (freed_mem > _s_process_minor_gc_size) { return true; } @@ -181,7 +181,7 @@ bool MemInfo::process_full_gc() { std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); Defer defer {[&]() { - notify_all_je_purge_dirty_pages_cv(); + je_purge_dirty_pages_cv.notify_all(); std::stringstream ss; profile->pretty_print(&ss); LOG(INFO) << fmt::format( @@ -191,7 +191,7 @@ bool MemInfo::process_full_gc() { }}; freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get()); - notify_all_je_purge_dirty_pages_cv(); + je_purge_dirty_pages_cv.notify_all(); if (freed_mem > _s_process_full_gc_size) { return true; } diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index 227422a672801c..c1a7bfe8789f92 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -129,10 +129,6 @@ class MemInfo { static std::mutex je_purge_dirty_pages_lock; static std::condition_variable je_purge_dirty_pages_cv; - static void notify_all_je_purge_dirty_pages_cv() { - std::lock_guard l(je_purge_dirty_pages_lock); - je_purge_dirty_pages_cv.notify_all(); - } static inline size_t allocator_virtual_mem() { return _s_virtual_memory_used.load(std::memory_order_relaxed); From bfa45be589f4332a77dd774c1c418ebcef478ccc Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 21 Dec 2023 22:20:29 +0800 Subject: [PATCH 4/5] 2 --- be/src/common/daemon.cpp | 8 +++++--- be/src/util/mem_info.cpp | 12 +++++++----- be/src/util/mem_info.h | 5 +++++ 3 files changed, 17 insertions(+), 8 deletions(-) diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp index 21521d7b5836ad..e3bf1a738b87a8 100644 --- a/be/src/common/daemon.cpp +++ b/be/src/common/daemon.cpp @@ -63,8 +63,6 @@ namespace doris { -CountDownLatch Daemon::_je_purge_dirty_pages_thread_latch {1}; - void Daemon::tcmalloc_gc_thread() { // TODO All cache GC wish to be supported #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && \ @@ -357,11 +355,15 @@ void Daemon::block_spill_gc_thread() { void Daemon::je_purge_dirty_pages_thread() const { do { std::unique_lock l(doris::MemInfo::je_purge_dirty_pages_lock); - doris::MemInfo::je_purge_dirty_pages_cv.wait(l); + while (_stop_background_threads_latch.count() != 0 && + !doris::MemInfo::je_purge_dirty_pages_notify.load(std::memory_order_relaxed)) { + doris::MemInfo::je_purge_dirty_pages_cv.wait_for(l, std::chrono::seconds(1)); + } if (_stop_background_threads_latch.count() == 0) { break; } doris::MemInfo::je_purge_all_arena_dirty_pages(); + doris::MemInfo::je_purge_dirty_pages_notify.store(false, std::memory_order_relaxed); } while (true); } diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp index 0b5036a340fe1b..ec79a8f1cc3a1f 100644 --- a/be/src/util/mem_info.cpp +++ b/be/src/util/mem_info.cpp @@ -37,7 +37,6 @@ #include #include "common/config.h" -#include "common/daemon.h" #include "common/status.h" #include "gutil/strings/split.h" #include "runtime/exec_env.h" @@ -81,6 +80,9 @@ int64_t MemInfo::_s_sys_mem_available_low_water_mark = -1; int64_t MemInfo::_s_sys_mem_available_warning_water_mark = -1; int64_t MemInfo::_s_process_minor_gc_size = -1; int64_t MemInfo::_s_process_full_gc_size = -1; +std::mutex MemInfo::je_purge_dirty_pages_lock; +std::condition_variable MemInfo::je_purge_dirty_pages_cv; +std::atomic MemInfo::je_purge_dirty_pages_notify {false}; void MemInfo::refresh_allocator_mem() { #if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER) @@ -130,7 +132,7 @@ bool MemInfo::process_minor_gc() { std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); Defer defer {[&]() { - je_purge_dirty_pages_cv.notify_all(); + notify_je_purge_dirty_pages(); std::stringstream ss; profile->pretty_print(&ss); LOG(INFO) << fmt::format( @@ -140,7 +142,7 @@ bool MemInfo::process_minor_gc() { }}; freed_mem += CacheManager::instance()->for_each_cache_prune_stale(profile.get()); - je_purge_dirty_pages_cv.notify_all(); + notify_je_purge_dirty_pages(); if (freed_mem > _s_process_minor_gc_size) { return true; } @@ -181,7 +183,7 @@ bool MemInfo::process_full_gc() { std::string pre_sys_mem_available = MemInfo::sys_mem_available_str(); Defer defer {[&]() { - je_purge_dirty_pages_cv.notify_all(); + notify_je_purge_dirty_pages(); std::stringstream ss; profile->pretty_print(&ss); LOG(INFO) << fmt::format( @@ -191,7 +193,7 @@ bool MemInfo::process_full_gc() { }}; freed_mem += CacheManager::instance()->for_each_cache_prune_all(profile.get()); - je_purge_dirty_pages_cv.notify_all(); + notify_je_purge_dirty_pages(); if (freed_mem > _s_process_full_gc_size) { return true; } diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index c1a7bfe8789f92..60a196fdbeb035 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -129,6 +129,11 @@ class MemInfo { static std::mutex je_purge_dirty_pages_lock; static std::condition_variable je_purge_dirty_pages_cv; + static std::atomic je_purge_dirty_pages_notify; + static void notify_je_purge_dirty_pages() { + je_purge_dirty_pages_notify.store(true, std::memory_order_relaxed); + je_purge_dirty_pages_cv.notify_all(); + } static inline size_t allocator_virtual_mem() { return _s_virtual_memory_used.load(std::memory_order_relaxed); From 81a76f70685804d187af02583486a1401cf5abfb Mon Sep 17 00:00:00 2001 From: Xinyi Zou Date: Thu, 21 Dec 2023 22:42:22 +0800 Subject: [PATCH 5/5] 6 --- be/src/util/mem_info.h | 1 + 1 file changed, 1 insertion(+) diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h index 60a196fdbeb035..8d702ddf065679 100644 --- a/be/src/util/mem_info.h +++ b/be/src/util/mem_info.h @@ -24,6 +24,7 @@ #include #include +#include #include #if !defined(__APPLE__) || !defined(_POSIX_C_SOURCE)