diff --git a/conanfile.py b/conanfile.py index 7c1a5d566..cfffdf0ba 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.12.1" + version = "6.12.2" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/replication/repl_decls.h b/src/include/homestore/replication/repl_decls.h index 90f2c67f7..54435c1c7 100644 --- a/src/include/homestore/replication/repl_decls.h +++ b/src/include/homestore/replication/repl_decls.h @@ -79,6 +79,12 @@ struct peer_info { uint64_t replication_idx_; // The elapsed time since the last successful response from this peer, set to 0 on leader uint64_t last_succ_resp_us_; + // The priority of this peer for leader election + uint32_t priority_; + // Whether the peer is a learner + bool is_learner_; + // Whether the peer is a new joiner + bool is_new_joiner_; }; struct replica_member_info { diff --git a/src/lib/common/homestore_config.fbs b/src/lib/common/homestore_config.fbs index 227fc7b9f..a661da497 100644 --- a/src/lib/common/homestore_config.fbs +++ b/src/lib/common/homestore_config.fbs @@ -284,6 +284,12 @@ table Consensus { // Reading snapshot objects will be done by a background thread asynchronously // instead of synchronous read by Raft worker threads use_bg_thread_for_snapshot_io: bool = true; + + // Maximum number of election-timeout rounds to wait during a prioritized leader election. + // On every election timeout a peer compares its own priority with target_priority (initially the maximum priority + // among the peers); target_priority is then decayed and the peer waits again until its priority >= target_priority. This setting is used to choose a suitable priority for peers. + // 0 means all members have the same priority. 
+ max_wait_rounds_of_priority_election: uint32 = 2; } table HomeStoreSettings { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 5080d4689..92bc155ba 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1168,7 +1168,10 @@ std::vector< peer_info > RaftReplDev::get_replication_status() const { for (auto const& pinfo : rep_status) { pi.emplace_back(peer_info{.id_ = boost::lexical_cast< replica_id_t >(pinfo.id_), .replication_idx_ = pinfo.last_log_idx_, - .last_succ_resp_us_ = pinfo.last_succ_resp_us_}); + .last_succ_resp_us_ = pinfo.last_succ_resp_us_, + .priority_ = pinfo.priority_, + .is_learner_ = pinfo.is_learner_, + .is_new_joiner_ = pinfo.is_new_joiner_}); } return pi; } @@ -1271,8 +1274,8 @@ nuraft::ptr< nuraft::cluster_config > RaftReplDev::load_config() { if (!js.contains("config")) { auto cluster_conf = nuraft::cs_new< nuraft::cluster_config >(); - cluster_conf->get_servers().push_back( - nuraft::cs_new< nuraft::srv_config >(m_raft_server_id, my_replica_id_str())); + cluster_conf->get_servers().push_back(nuraft::cs_new< nuraft::srv_config >( + m_raft_server_id, 0, my_replica_id_str(), "", false, raft_leader_priority)); js["config"] = serialize_cluster_config(*cluster_conf); } return deserialize_cluster_config(js["config"]); diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h index 0fcbeb2aa..932e74511 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.h +++ b/src/lib/replication/repl_dev/solo_repl_dev.h @@ -53,7 +53,12 @@ class SoloReplDev : public ReplDev { bool is_leader() const override { return true; } replica_id_t get_leader_id() const override { return m_group_id; } std::vector< peer_info > get_replication_status() const override { - return std::vector< peer_info >{peer_info{.id_ = m_group_id, .replication_idx_ = 0, .last_succ_resp_us_ = 0}}; + return std::vector< 
peer_info >{peer_info{.id_ = m_group_id, + .replication_idx_ = 0, + .last_succ_resp_us_ = 0, + .priority_ = 1, + .is_learner_ = false, + .is_new_joiner_ = false}}; } bool is_ready_for_traffic() const override { return true; } void purge() override {} diff --git a/src/lib/replication/service/generic_repl_svc.cpp b/src/lib/replication/service/generic_repl_svc.cpp index e2932bef7..3fb0357fc 100644 --- a/src/lib/replication/service/generic_repl_svc.cpp +++ b/src/lib/replication/service/generic_repl_svc.cpp @@ -117,8 +117,8 @@ void SoloReplService::stop() { hs()->data_service().stop(); } -AsyncReplResult< shared< ReplDev > > SoloReplService::create_repl_dev(group_id_t group_id, - std::set< replica_id_t > const& members) { +AsyncReplResult< shared< ReplDev > > +SoloReplService::create_repl_dev(group_id_t group_id, std::set< replica_id_t > const& members) { superblk< repl_dev_superblk > rd_sb{get_meta_blk_name()}; rd_sb.create(); rd_sb->group_id = group_id; diff --git a/src/lib/replication/service/raft_repl_service.cpp b/src/lib/replication/service/raft_repl_service.cpp index d70593a94..c1016e452 100644 --- a/src/lib/replication/service/raft_repl_service.cpp +++ b/src/lib/replication/service/raft_repl_service.cpp @@ -59,6 +59,17 @@ ReplServiceError RaftReplService::to_repl_error(nuraft::cmd_result_code code) { return ret; } +// NuRaft priority decay coefficient is set to 0.8(currently not configurable). 
For more details, please refer to +// https://github.com/eBay/NuRaft/blob/master/docs/leader_election_priority.md +int32_t RaftReplService::compute_raft_follower_priority() { + auto max_wait_round = std::min(raft_priority_election_round_upper_limit, + HS_DYNAMIC_CONFIG(consensus.max_wait_rounds_of_priority_election)); + if (max_wait_round == 0) { return raft_leader_priority; } + auto priority = 1 + static_cast< int32_t >( + std::ceil(raft_leader_priority * std::pow(raft_priority_decay_coefficient, max_wait_round))); + return priority; +} + RaftReplService::RaftReplService(cshared< ReplApplication >& repl_app) : GenericReplService{repl_app} { m_config_sb_bufs.reserve(100); meta_service().register_handler( @@ -344,14 +355,18 @@ AsyncReplResult< shared< ReplDev > > RaftReplService::create_repl_dev(group_id_t return make_async_error< shared< ReplDev > >(to_repl_error(status.error())); } + auto follower_priority = compute_raft_follower_priority(); + auto my_id = m_repl_app->get_my_repl_id(); for (auto& member : members) { if (member == my_id) { continue; } // Skip myself do { - auto const result = m_msg_mgr->add_member(group_id, member).get(); + auto srv_config = nuraft::srv_config(nuraft_mesg::to_server_id(member), 0, boost::uuids::to_string(member), "", + false, follower_priority); + auto const result = m_msg_mgr->add_member(group_id, srv_config).get(); if (result) { - LOGINFOMOD(replication, "Groupid={}, new member={} added", boost::uuids::to_string(group_id), - boost::uuids::to_string(member)); + LOGINFOMOD(replication, "Groupid={}, new member={} added with priority={}", boost::uuids::to_string(group_id), + boost::uuids::to_string(member), follower_priority); break; } else if (result.error() != nuraft::CONFIG_CHANGING) { LOGWARNMOD(replication, "Groupid={}, add member={} failed with error={}", diff --git a/src/lib/replication/service/raft_repl_service.h b/src/lib/replication/service/raft_repl_service.h index 953ba95e9..87479aa01 100644 --- 
// Namespace-scope constants of the Raft replication service header.
// They are declared `inline constexpr` so that every translation unit including the header refers to one
// single definition instead of getting its own internal-linkage copy.

// Certificate-related durations (their use is outside this view).
inline constexpr auto cert_change_timeout = std::chrono::seconds(1200);
inline constexpr auto cert_check_sleep = std::chrono::seconds(1);

// Election priority given to the local server in a freshly created cluster config, and returned as the
// follower priority when priority-based election is disabled (0 wait rounds).
inline constexpr int32_t raft_leader_priority = 100;
// Per-round decay factor used when deriving the follower priority from raft_leader_priority.
inline constexpr double raft_priority_decay_coefficient = 0.8;
// Upper bound applied to the configured consensus.max_wait_rounds_of_priority_election.
inline constexpr uint32_t raft_priority_election_round_upper_limit = 5;
repl_dev->get_replication_status(); + for (auto pinfo : peer_info) { + LOGINFO("Replica={} has priority={}", boost::uuids::to_string(pinfo.id_), pinfo.priority_); + if (pinfo.id_ == my_replica_id_) { + ASSERT_EQ(raft_leader_priority, pinfo.priority_); + } else { + ASSERT_EQ(follower_priority, pinfo.priority_); + } + } } } diff --git a/src/tests/test_raft_repl_dev.cpp b/src/tests/test_raft_repl_dev.cpp index 6e21a64e8..f6d458943 100644 --- a/src/tests/test_raft_repl_dev.cpp +++ b/src/tests/test_raft_repl_dev.cpp @@ -484,6 +484,80 @@ TEST_F(RaftReplDevTest, LargeDataWrite) { g_helper->sync_for_cleanup_start(); } +TEST_F(RaftReplDevTest, PriorityLeaderElection) { + LOGINFO("Homestore replica={} setup completed", g_helper->replica_num()); + g_helper->sync_for_test_start(); + uint64_t entries_per_attempt = SISL_OPTIONS["num_io"].as< uint64_t >(); + if (g_helper->replica_num() == 0) { + auto leader = this->wait_and_get_leader_id(); + ASSERT_EQ(leader, g_helper->my_replica_id()); + } + this->write_on_leader(entries_per_attempt, true /* wait_for_commit */); + + g_helper->sync_for_verify_start(); + LOGINFO("Validate all data written so far by reading them"); + this->validate_data(); + g_helper->sync_for_cleanup_start(); + + LOGINFO("Restart leader"); + if (g_helper->replica_num() == 0) { g_helper->restart_homestore(); } + g_helper->sync_for_test_start(); + + LOGINFO("Validate leader switched"); + std::this_thread::sleep_for(std::chrono::milliseconds{500}); + auto leader = this->wait_and_get_leader_id(); + if (g_helper->replica_num() == 0) { ASSERT_NE(leader, g_helper->my_replica_id()); } + g_helper->sync_for_verify_start(); + + if (leader == g_helper->my_replica_id()) { + LOGINFO("Resign and trigger a priority leader election"); + // resign and trigger a priority leader election + g_helper->restart_homestore(); + } + g_helper->sync_for_test_start(); + + std::this_thread::sleep_for(std::chrono::milliseconds{500}); + leader = this->wait_and_get_leader_id(); + 
LOGINFO("Validate leader switched back to initial replica"); + if (g_helper->replica_num() == 0) { ASSERT_EQ(leader, g_helper->my_replica_id()); } + g_helper->sync_for_verify_start(); + + LOGINFO("Post restart write the data again on the leader"); + this->write_on_leader(entries_per_attempt, true /* wait_for_commit */); + + LOGINFO("Validate all data written (including pre-restart data) by reading them"); + this->validate_data(); + g_helper->sync_for_cleanup_start(); +} + +TEST_F(RaftReplDevTest, ComputePriority) { + g_helper->sync_for_test_start(); + auto& raftService = dynamic_cast< RaftReplService& >(hs()->repl_service()); + + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.max_wait_rounds_of_priority_election = 0; }); + HS_SETTINGS_FACTORY().save(); + ASSERT_EQ(raftService.compute_raft_follower_priority(), raft_leader_priority); + + for (auto i = 1; i <= int(raft_priority_election_round_upper_limit); i++) { + HS_SETTINGS_FACTORY().modifiable_settings( + [i](auto& s) { s.consensus.max_wait_rounds_of_priority_election = i; }); + HS_SETTINGS_FACTORY().save(); + auto follower_priority = raftService.compute_raft_follower_priority(); + // Simulate nuraft algorithm + auto decayed_priority = raft_leader_priority; + for (auto j = 1; j <= i; j++) { + int gap = std::max((int)10, decayed_priority / 5); + decayed_priority = std::max(1, decayed_priority - gap); + } + LOGINFO("Follower priority={} decayed_priority={}", follower_priority, decayed_priority); + ASSERT_TRUE(follower_priority >= decayed_priority); + } + // Set back to default value + HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.max_wait_rounds_of_priority_election = 2; }); + HS_SETTINGS_FACTORY().save(); + g_helper->sync_for_cleanup_start(); +} + int main(int argc, char* argv[]) { int parsed_argc = argc; char** orig_argv = argv;