Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.12.1"
version = "6.12.2"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
6 changes: 6 additions & 0 deletions src/include/homestore/replication/repl_decls.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ struct peer_info {
uint64_t replication_idx_;
// The elapsed time since the last successful response from this peer, set to 0 on leader
uint64_t last_succ_resp_us_;
// The priority for leader election
uint32_t priority_;
// The peer is learner or not
bool is_learner_;
// The peer is new joiner or not
bool is_new_joiner_;
};

struct replica_member_info {
Expand Down
6 changes: 6 additions & 0 deletions src/lib/common/homestore_config.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,12 @@ table Consensus {
// Reading snapshot objects will be done by a background thread asynchronously
// instead of synchronous read by Raft worker threads
use_bg_thread_for_snapshot_io: bool = true;

// Maximum number of election timeout rounds to wait during a prioritized leader election process.
// Every election timeout will compare its priority with the target_priority(max priority of the peers initially)
// then decay the target_priority and wait again until its priority >= target_priority. This setting helps us to set proper priority for peers.
// 0 means all members have the same priority.
max_wait_rounds_of_priority_election: uint32 = 2;
}

table HomeStoreSettings {
Expand Down
9 changes: 6 additions & 3 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1168,7 +1168,10 @@ std::vector< peer_info > RaftReplDev::get_replication_status() const {
for (auto const& pinfo : rep_status) {
pi.emplace_back(peer_info{.id_ = boost::lexical_cast< replica_id_t >(pinfo.id_),
.replication_idx_ = pinfo.last_log_idx_,
.last_succ_resp_us_ = pinfo.last_succ_resp_us_});
.last_succ_resp_us_ = pinfo.last_succ_resp_us_,
.priority_ = pinfo.priority_,
.is_learner_ = pinfo.is_learner_,
.is_new_joiner_ = pinfo.is_new_joiner_});
}
return pi;
}
Expand Down Expand Up @@ -1271,8 +1274,8 @@ nuraft::ptr< nuraft::cluster_config > RaftReplDev::load_config() {

if (!js.contains("config")) {
auto cluster_conf = nuraft::cs_new< nuraft::cluster_config >();
cluster_conf->get_servers().push_back(
nuraft::cs_new< nuraft::srv_config >(m_raft_server_id, my_replica_id_str()));
cluster_conf->get_servers().push_back(nuraft::cs_new< nuraft::srv_config >(
m_raft_server_id, 0, my_replica_id_str(), "", false, raft_leader_priority));
js["config"] = serialize_cluster_config(*cluster_conf);
}
return deserialize_cluster_config(js["config"]);
Expand Down
7 changes: 6 additions & 1 deletion src/lib/replication/repl_dev/solo_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@ class SoloReplDev : public ReplDev {
bool is_leader() const override { return true; }
replica_id_t get_leader_id() const override { return m_group_id; }
std::vector< peer_info > get_replication_status() const override {
return std::vector< peer_info >{peer_info{.id_ = m_group_id, .replication_idx_ = 0, .last_succ_resp_us_ = 0}};
return std::vector< peer_info >{peer_info{.id_ = m_group_id,
.replication_idx_ = 0,
.last_succ_resp_us_ = 0,
.priority_ = 1,
.is_learner_ = false,
.is_new_joiner_ = false}};
}
bool is_ready_for_traffic() const override { return true; }
void purge() override {}
Expand Down
4 changes: 2 additions & 2 deletions src/lib/replication/service/generic_repl_svc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ void SoloReplService::stop() {
hs()->data_service().stop();
}

AsyncReplResult< shared< ReplDev > > SoloReplService::create_repl_dev(group_id_t group_id,
std::set< replica_id_t > const& members) {
AsyncReplResult< shared< ReplDev > >
SoloReplService::create_repl_dev(group_id_t group_id, std::set< replica_id_t > const& members) {
superblk< repl_dev_superblk > rd_sb{get_meta_blk_name()};
rd_sb.create();
rd_sb->group_id = group_id;
Expand Down
21 changes: 18 additions & 3 deletions src/lib/replication/service/raft_repl_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,17 @@ ReplServiceError RaftReplService::to_repl_error(nuraft::cmd_result_code code) {
return ret;
}

// NuRaft priority decay coefficient is set to 0.8(currently not configurable). For more details, please refer to
// https://github.com/eBay/NuRaft/blob/master/docs/leader_election_priority.md
int32_t RaftReplService::compute_raft_follower_priority() {
auto max_wait_round = std::min(raft_priority_election_round_upper_limit,
HS_DYNAMIC_CONFIG(consensus.max_wait_rounds_of_priority_election));
if (max_wait_round == 0) { return raft_leader_priority; }
auto priority = 1 + static_cast< int32_t >(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is a nice formula

std::ceil(raft_leader_priority * std::pow(raft_priority_decay_coefficient, max_wait_round)));
return priority;
}

RaftReplService::RaftReplService(cshared< ReplApplication >& repl_app) : GenericReplService{repl_app} {
m_config_sb_bufs.reserve(100);
meta_service().register_handler(
Expand Down Expand Up @@ -344,14 +355,18 @@ AsyncReplResult< shared< ReplDev > > RaftReplService::create_repl_dev(group_id_t
return make_async_error< shared< ReplDev > >(to_repl_error(status.error()));
}

auto follower_priority = compute_raft_follower_priority();

auto my_id = m_repl_app->get_my_repl_id();
for (auto& member : members) {
if (member == my_id) { continue; } // Skip myself
do {
auto const result = m_msg_mgr->add_member(group_id, member).get();
auto srv_config = nuraft::srv_config(nuraft_mesg::to_server_id(member), 0, boost::uuids::to_string(member), "",
false, follower_priority);
auto const result = m_msg_mgr->add_member(group_id, srv_config).get();
if (result) {
LOGINFOMOD(replication, "Groupid={}, new member={} added", boost::uuids::to_string(group_id),
boost::uuids::to_string(member));
LOGINFOMOD(replication, "Groupid={}, new member={} added with priority={}", boost::uuids::to_string(group_id),
boost::uuids::to_string(member), follower_priority);
break;
} else if (result.error() != nuraft::CONFIG_CHANGING) {
LOGWARNMOD(replication, "Groupid={}, add member={} failed with error={}",
Expand Down
4 changes: 4 additions & 0 deletions src/lib/replication/service/raft_repl_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ namespace homestore {

constexpr auto cert_change_timeout = std::chrono::seconds(1200);
constexpr auto cert_check_sleep = std::chrono::seconds(1);
constexpr int32_t raft_leader_priority = 100;
constexpr double raft_priority_decay_coefficient = 0.8;
constexpr uint32_t raft_priority_election_round_upper_limit = 5;

struct repl_dev_superblk;
class RaftReplDev;
Expand All @@ -57,6 +60,7 @@ class RaftReplService : public GenericReplService,
~RaftReplService() override;

static ReplServiceError to_repl_error(nuraft::cmd_result_code code);
int32_t compute_raft_follower_priority();

///////////////////// Overrides of nuraft_mesg::MessagingApplication ////////////////////
std::string lookup_peer(nuraft_mesg::peer_id_t const&) override;
Expand Down
17 changes: 17 additions & 0 deletions src/tests/test_common/hs_repl_test_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/

#pragma once
#include "raft_repl_test_base.hpp"

#include <mutex>
#include <condition_variable>
#include <map>
Expand All @@ -35,6 +37,8 @@
#include <sisl/grpc/rpc_client.hpp>
#include "test_common/homestore_test_common.hpp"

#include <replication/service/raft_repl_service.h>

SISL_OPTION_GROUP(test_repl_common_setup,
(replicas, "", "replicas", "Total number of replicas",
::cxxopts::value< uint32_t >()->default_value("3"), "number"),
Expand Down Expand Up @@ -298,6 +302,19 @@ class HSReplTestHelper : public HSTestHelper {
auto v = hs()->repl_service().create_repl_dev(repl_group_id, members).get();
ASSERT_EQ(v.hasValue(), true)
<< "Error in creating repl dev for group_id=" << boost::uuids::to_string(repl_group_id).c_str();
auto& raftService = dynamic_cast< RaftReplService& >(hs()->repl_service());
auto follower_priority = raftService.compute_raft_follower_priority();
auto repl_dev = v.value();
ASSERT_EQ(my_replica_id_, repl_dev->get_leader_id());
auto peer_info = repl_dev->get_replication_status();
for (auto pinfo : peer_info) {
LOGINFO("Replica={} has priority={}", boost::uuids::to_string(pinfo.id_), pinfo.priority_);
if (pinfo.id_ == my_replica_id_) {
ASSERT_EQ(raft_leader_priority, pinfo.priority_);
} else {
ASSERT_EQ(follower_priority, pinfo.priority_);
}
}
}
}

Expand Down
74 changes: 74 additions & 0 deletions src/tests/test_raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,80 @@ TEST_F(RaftReplDevTest, LargeDataWrite) {
g_helper->sync_for_cleanup_start();
}

TEST_F(RaftReplDevTest, PriorityLeaderElection) {
LOGINFO("Homestore replica={} setup completed", g_helper->replica_num());
g_helper->sync_for_test_start();
uint64_t entries_per_attempt = SISL_OPTIONS["num_io"].as< uint64_t >();
if (g_helper->replica_num() == 0) {
auto leader = this->wait_and_get_leader_id();
ASSERT_EQ(leader, g_helper->my_replica_id());
}
this->write_on_leader(entries_per_attempt, true /* wait_for_commit */);

g_helper->sync_for_verify_start();
LOGINFO("Validate all data written so far by reading them");
this->validate_data();
g_helper->sync_for_cleanup_start();

LOGINFO("Restart leader");
if (g_helper->replica_num() == 0) { g_helper->restart_homestore(); }
g_helper->sync_for_test_start();

LOGINFO("Validate leader switched");
std::this_thread::sleep_for(std::chrono::milliseconds{500});
auto leader = this->wait_and_get_leader_id();
if (g_helper->replica_num() == 0) { ASSERT_NE(leader, g_helper->my_replica_id()); }
g_helper->sync_for_verify_start();

if (leader == g_helper->my_replica_id()) {
LOGINFO("Resign and trigger a priority leader election");
// resign and trigger a priority leader election
g_helper->restart_homestore();
}
g_helper->sync_for_test_start();

std::this_thread::sleep_for(std::chrono::milliseconds{500});
leader = this->wait_and_get_leader_id();
LOGINFO("Validate leader switched back to initial replica");
if (g_helper->replica_num() == 0) { ASSERT_EQ(leader, g_helper->my_replica_id()); }
g_helper->sync_for_verify_start();

LOGINFO("Post restart write the data again on the leader");
this->write_on_leader(entries_per_attempt, true /* wait_for_commit */);

LOGINFO("Validate all data written (including pre-restart data) by reading them");
this->validate_data();
g_helper->sync_for_cleanup_start();
}

TEST_F(RaftReplDevTest, ComputePriority) {
g_helper->sync_for_test_start();
auto& raftService = dynamic_cast< RaftReplService& >(hs()->repl_service());

HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.max_wait_rounds_of_priority_election = 0; });
HS_SETTINGS_FACTORY().save();
ASSERT_EQ(raftService.compute_raft_follower_priority(), raft_leader_priority);

for (auto i = 1; i <= int(raft_priority_election_round_upper_limit); i++) {
HS_SETTINGS_FACTORY().modifiable_settings(
[i](auto& s) { s.consensus.max_wait_rounds_of_priority_election = i; });
HS_SETTINGS_FACTORY().save();
auto follower_priority = raftService.compute_raft_follower_priority();
// Simulate nuraft algorithm
auto decayed_priority = raft_leader_priority;
for (auto j = 1; j <= i; j++) {
int gap = std::max((int)10, decayed_priority / 5);
decayed_priority = std::max(1, decayed_priority - gap);
}
LOGINFO("Follower priority={} decayed_priority={}", follower_priority, decayed_priority);
ASSERT_TRUE(follower_priority >= decayed_priority);
}
// Set back to default value
HS_SETTINGS_FACTORY().modifiable_settings([](auto& s) { s.consensus.max_wait_rounds_of_priority_election = 2; });
HS_SETTINGS_FACTORY().save();
g_helper->sync_for_cleanup_start();
}

int main(int argc, char* argv[]) {
int parsed_argc = argc;
char** orig_argv = argv;
Expand Down
Loading