From ef38c0e62a83b24b1fa862800b33e13ff730d559 Mon Sep 17 00:00:00 2001 From: Jilong Kou Date: Thu, 22 Jan 2026 17:50:30 +0800 Subject: [PATCH] Use dedicated reactor for RaftReplService timers Global timers using `all_worker` regex can compete with default worker reactors, potentially jamming high-priority IO tasks and causing deadlocks. Signed-off-by: Jilong Kou --- .gitignore | 3 + conanfile.py | 2 +- .../replication/service/raft_repl_service.cpp | 85 ++++++++++--------- 3 files changed, 51 insertions(+), 39 deletions(-) diff --git a/.gitignore b/.gitignore index e9e78180c..78b8a1f38 100644 --- a/.gitignore +++ b/.gitignore @@ -108,3 +108,6 @@ CMakeSettings.json CMakeUserPresets.json .vs/** .cache + +# claude +.claude/** diff --git a/conanfile.py b/conanfile.py index 0f468adf8..3d6747b74 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "7.2.1" + version = "7.2.2" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp index 603f17850..5a81b873c 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -22,6 +22,9 @@ #include "common/homestore_config.hpp" #include "common/homestore_assert.hpp" #include "replication/service/raft_repl_service.h" + +#include <latch> + #include "replication/repl_dev/raft_repl_dev.h" namespace homestore { @@ -624,49 +627,55 @@ void RaftReplService::trigger_snapshot_creation(group_id_t group_id, repl_lsn_t ////////////////////// Reaper Thread related ////////////////////////////////// void RaftReplService::start_repl_service_timers() { - // we need to explictly cancel the timers before we stop the repl_devs, but we cannot cancel a thread timer - // explictly(and exception will be threw out), so here we create a seperate gloable timer for each of them. 
- // Schedule the rdev garbage collector timer - LOGINFOMOD(replication, "Reaper Thread: scheduling GC every {} seconds", - HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec)); - m_rdev_gc_timer_hdl = iomanager.schedule_global_timer( - HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec) * 1000 * 1000 * 1000, true /* recurring */, nullptr, - iomgr::reactor_regex::all_worker, - [this](void*) { - LOGDEBUGMOD(replication, "Reaper Thread: Doing GC"); - gc_repl_reqs(); - gc_repl_devs(); - }, - true /* wait_to_schedule */); - - // Check for queued fetches at the minimum every second - uint64_t interval_ns = - std::min(HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms) * 1000 * 1000, 1ul * 1000 * 1000 * 1000); - m_rdev_fetch_timer_hdl = iomanager.schedule_global_timer( - interval_ns, true /* recurring */, nullptr, iomgr::reactor_regex::all_worker, - [this](void*) { fetch_pending_data(); }, true /* wait_to_schedule */); - - // Flush durable commit lsns to superblock - // FIXUP: what is the best value for flush_durable_commit_interval_ms? 
- m_flush_durable_commit_timer_hdl = iomanager.schedule_global_timer( - HS_DYNAMIC_CONFIG(consensus.flush_durable_commit_interval_ms) * 1000 * 1000, true /* recurring */, nullptr, - iomgr::reactor_regex::all_worker, [this](void*) { flush_durable_commit_lsn(); }, true /* wait_to_schedule */); - - m_replace_member_sync_check_timer_hdl = iomanager.schedule_global_timer( - HS_DYNAMIC_CONFIG(consensus.replace_member_sync_check_interval_ms) * 1000 * 1000, true /* recurring */, nullptr, - iomgr::reactor_regex::all_worker, [this](void*) { monitor_replace_member_replication_status(); }, - true /* wait_to_schedule */); + std::latch latch{1}; + iomanager.create_reactor("raft_repl_svc_timer", iomgr::INTERRUPT_LOOP, 1u, [this, &latch](bool is_started) { + if (is_started) { + m_reaper_fiber = iomanager.iofiber_self(); + LOGINFOMOD(replication, "Reaper Thread: scheduling GC every {} seconds", + HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec)); + m_rdev_gc_timer_hdl = iomanager.schedule_thread_timer( + HS_DYNAMIC_CONFIG(generic.repl_dev_cleanup_interval_sec) * 1000 * 1000 * 1000, true /* recurring */, + nullptr, [this](void *) { + LOGDEBUGMOD(replication, "Reaper Thread: Doing GC"); + gc_repl_reqs(); + gc_repl_devs(); + }); + + // Check for queued fetches at the minimum every second + uint64_t interval_ns = std::min( + HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms) * 1000 * 1000, 1ul * 1000 * 1000 * 1000); + m_rdev_fetch_timer_hdl = iomanager.schedule_thread_timer(interval_ns, true /* recurring */, nullptr, + [this](void *) { fetch_pending_data(); }); + + // Flush durable commit lsns to superblock + // FIXUP: what is the best value for flush_durable_commit_interval_ms? 
+ m_flush_durable_commit_timer_hdl = iomanager.schedule_thread_timer( + HS_DYNAMIC_CONFIG(consensus.flush_durable_commit_interval_ms) * 1000 * 1000, true /* recurring */, + nullptr, [this](void *) { flush_durable_commit_lsn(); }); + + m_replace_member_sync_check_timer_hdl = iomanager.schedule_thread_timer( + HS_DYNAMIC_CONFIG(consensus.replace_member_sync_check_interval_ms) * 1000 * 1000, true /* recurring */, + nullptr, [this](void *) { + monitor_replace_member_replication_status(); + }); + latch.count_down(); + } + }); + latch.wait(); } void RaftReplService::stop_repl_service_timers() { - iomanager.cancel_timer(m_rdev_gc_timer_hdl, true); - iomanager.cancel_timer(m_rdev_fetch_timer_hdl, true); - iomanager.cancel_timer(m_flush_durable_commit_timer_hdl, true); - iomanager.cancel_timer(m_replace_member_sync_check_timer_hdl, true); + iomanager.run_on_wait(m_reaper_fiber, [this]() { + LOGINFOMOD(replication, "Reaper Thread: Stopping timers"); + iomanager.cancel_timer(m_rdev_gc_timer_hdl, true); + iomanager.cancel_timer(m_rdev_fetch_timer_hdl, true); + iomanager.cancel_timer(m_flush_durable_commit_timer_hdl, true); + iomanager.cancel_timer(m_replace_member_sync_check_timer_hdl, true); + }); } -void RaftReplService::add_to_fetch_queue(cshared< RaftReplDev >& rdev, std::vector< repl_req_ptr_t > rreqs) { +void RaftReplService::add_to_fetch_queue(cshared<RaftReplDev> &rdev, std::vector<repl_req_ptr_t> rreqs) { std::unique_lock lg(m_pending_fetch_mtx); m_pending_fetch_batches.push(std::make_pair(rdev, std::move(rreqs))); } @@ -674,7 +683,7 @@ void RaftReplService::add_to_fetch_queue(cshared< RaftReplDev >& rdev, std::vect void RaftReplService::fetch_pending_data() { std::unique_lock lg(m_pending_fetch_mtx); while (!m_pending_fetch_batches.empty()) { - auto const& [d, rreqs] = m_pending_fetch_batches.front(); + auto const &[d, rreqs] = m_pending_fetch_batches.front(); if (get_elapsed_time_ms(rreqs.at(0)->created_time()) < HS_DYNAMIC_CONFIG(consensus.wait_data_write_timer_ms)) { break; }