From 7f339301f798721fccfb8932367e5abdb5a64cc1 Mon Sep 17 00:00:00 2001 From: yuwmao Date: Thu, 21 Nov 2024 22:47:29 +0800 Subject: [PATCH 1/4] Rename snapshot_data to snapshot_obj --- src/include/homestore/replication/repl_dev.h | 14 ++++----- .../repl_dev/raft_state_machine.cpp | 8 ++--- src/tests/test_common/raft_repl_test_base.hpp | 30 +++++++++---------- src/tests/test_solo_repl_dev.cpp | 4 +-- 4 files changed, 28 insertions(+), 28 deletions(-) diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 20e9a170f..431b020a0 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -112,9 +112,9 @@ class nuraft_snapshot_context : public snapshot_context { nuraft::ptr< nuraft::snapshot > snapshot_; }; -struct snapshot_data { +struct snapshot_obj { void* user_ctx{nullptr}; - int64_t offset{0}; + uint64_t offset{0}; sisl::io_blob_safe blob; bool is_first_obj{false}; bool is_last_obj{false}; @@ -368,16 +368,16 @@ class ReplDevListener { /// uses offset given by the follower to the know the current state of the follower. /// Leader sends the snapshot data to the follower in batch. This callback is called multiple /// times on the leader till all the data is transferred to the follower. is_last_obj in - /// snapshot_data will be true once all the data has been trasnferred. After this the raft on + /// snapshot_obj will be true once all the data has been trasnferred. After this the raft on /// the follower side can do the incremental resync. - virtual int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0; + virtual int read_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_obj) = 0; /// @brief Called on the follower when the leader sends the data during the baseline resyc. - /// is_last_obj in in snapshot_data will be true once all the data has been transfered. + /// is_last_obj in in snapshot_obj will be true once all the data has been transfered. /// After this the raft on the follower side can do the incremental resync. - virtual void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) = 0; + virtual void write_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_obj) = 0; - /// @brief Free up user-defined context inside the snapshot_data that is allocated during read_snapshot_data. + /// @brief Free up user-defined context inside the snapshot_obj that is allocated during read_snapshot_obj. virtual void free_user_snp_ctx(void*& user_snp_ctx) = 0; private: diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index 2047a3b28..66b95f145 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -298,13 +298,13 @@ void RaftStateMachine::create_snapshot(nuraft::snapshot& s, nuraft::async_result int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out, bool& is_last_obj) { auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); - auto snp_data = std::make_shared< snapshot_data >(); + auto snp_data = std::make_shared< snapshot_obj >(); snp_data->user_ctx = user_ctx; snp_data->offset = obj_id; snp_data->is_last_obj = is_last_obj; // Listener will read the snapshot data and we pass through the same. - int ret = m_rd.m_listener->read_snapshot_data(snp_ctx, snp_data); + int ret = m_rd.m_listener->read_snapshot_obj(snp_ctx, snp_data); if (ret < 0) return ret; // Update user_ctx and whether is_last_obj @@ -321,7 +321,7 @@ int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, nuraft::buffer& data, bool is_first_obj, bool is_last_obj) { auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); - auto snp_data = std::make_shared< snapshot_data >(); + auto snp_data = std::make_shared< snapshot_obj >(); snp_data->offset = obj_id; snp_data->is_first_obj = is_first_obj; snp_data->is_last_obj = is_last_obj; @@ -331,7 +331,7 @@ void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, std::memcpy(blob.bytes(), data.data_begin(), data.size()); snp_data->blob = std::move(blob); - m_rd.m_listener->write_snapshot_data(snp_ctx, snp_data); + m_rd.m_listener->write_snapshot_obj(snp_ctx, snp_data); // Update the object offset. obj_id = snp_data->offset; diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp index 889ab72bb..e40511e73 100644 --- a/src/tests/test_common/raft_repl_test_base.hpp +++ b/src/tests/test_common/raft_repl_test_base.hpp @@ -182,7 +182,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { return make_async_success<>(); } - int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { + int read_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override { auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); if (snp_data->offset == 0) { @@ -195,37 +195,37 @@ class TestReplicatedDB : public homestore::ReplDevListener { } int64_t next_lsn = snp_data->offset; - std::vector< KeyValuePair > kv_snapshot_data; + std::vector< KeyValuePair > kv_snapshot_obj; // we can not use find to get the next element, since if the next lsn is a config lsn , it will not be put into // lsn_index_ and as a result, the find will return the end of the map. so here we use lower_bound to get the // first element to be read and transfered. for (auto iter = lsn_index_.lower_bound(next_lsn); iter != lsn_index_.end(); iter++) { auto& v = iter->second; - kv_snapshot_data.emplace_back(Key{v.id_}, v); + kv_snapshot_obj.emplace_back(Key{v.id_}, v); LOGTRACEMOD(replication, "[Replica={}] Read logical snapshot callback fetching lsn={} size={} pattern={}", g_helper->replica_num(), v.lsn_, v.data_size_, v.data_pattern_); - if (kv_snapshot_data.size() >= 10) { break; } + if (kv_snapshot_obj.size() >= 10) { break; } } - if (kv_snapshot_data.size() == 0) { + if (kv_snapshot_obj.size() == 0) { snp_data->is_last_obj = true; LOGINFOMOD(replication, "Snapshot is_last_obj is true"); return 0; } - int64_t kv_snapshot_data_size = sizeof(KeyValuePair) * kv_snapshot_data.size(); - sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_data_size)}; - std::memcpy(blob.bytes(), kv_snapshot_data.data(), kv_snapshot_data_size); + int64_t kv_snapshot_obj_size = sizeof(KeyValuePair) * kv_snapshot_obj.size(); + sisl::io_blob_safe blob{static_cast< uint32_t >(kv_snapshot_obj_size)}; + std::memcpy(blob.bytes(), kv_snapshot_obj.data(), kv_snapshot_obj_size); snp_data->blob = std::move(blob); snp_data->is_last_obj = false; LOGINFOMOD(replication, "[Replica={}] Read logical snapshot callback obj_id={} term={} idx={} num_items={}", g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), - kv_snapshot_data.size()); + kv_snapshot_obj.size()); return 0; } - void snapshot_data_write(uint64_t data_size, uint64_t data_pattern, MultiBlkId& out_blkids) { + void snapshot_obj_write(uint64_t data_size, uint64_t data_pattern, MultiBlkId& out_blkids) { auto block_size = SISL_OPTIONS["block_size"].as< uint32_t >(); auto write_sgs = test_common::HSTestHelper::create_sgs(data_size, block_size, data_pattern); auto fut = homestore::data_service().async_alloc_write(write_sgs, blk_alloc_hints{}, out_blkids); @@ -235,7 +235,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { } } - void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { + void write_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override { auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); auto last_committed_idx = std::dynamic_pointer_cast< RaftReplDev >(repl_dev())->raft_server()->get_committed_log_idx(); @@ -246,10 +246,10 @@ class TestReplicatedDB : public homestore::ReplDevListener { return; } - size_t kv_snapshot_data_size = snp_data->blob.size(); - if (kv_snapshot_data_size == 0) return; + size_t kv_snapshot_obj_size = snp_data->blob.size(); + if (kv_snapshot_obj_size == 0) return; - size_t num_items = kv_snapshot_data_size / sizeof(KeyValuePair); + size_t num_items = kv_snapshot_obj_size / sizeof(KeyValuePair); std::unique_lock lk(db_mtx_); auto ptr = r_cast< const KeyValuePair* >(snp_data->blob.bytes()); for (size_t i = 0; i < num_items; i++) { @@ -261,7 +261,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { // Write to data service and inmem map. MultiBlkId out_blkids; if (value.data_size_ != 0) { - snapshot_data_write(value.data_size_, value.data_pattern_, out_blkids); + snapshot_obj_write(value.data_size_, value.data_pattern_, out_blkids); value.blkid_ = out_blkids; } inmem_db_.insert_or_assign(key, value); diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index 1b990d592..eaec0ff1e 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -111,10 +111,10 @@ class SoloReplDevTest : public testing::Test { AsyncReplResult<> create_snapshot(shared< snapshot_context > context) override { return make_async_success<>(); } - int read_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override { + int read_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override { return 0; } - void write_snapshot_data(shared< snapshot_context > context, shared< snapshot_data > snp_data) override {} + void write_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override {} bool apply_snapshot(shared< snapshot_context > context) override { return true; } shared< snapshot_context > last_snapshot() override { return nullptr; } void free_user_snp_ctx(void*& user_snp_ctx) override {} From 9a90a69ff536ab187fb55d9e58f198dd1a94e484 Mon Sep 17 00:00:00 2001 From: yuwmao Date: Mon, 25 Nov 2024 17:28:26 +0800 Subject: [PATCH 2/4] Support Baseline resync For Nuraft baseline resync, we separate the process into two layers: HomeStore layer and Application layer. We use the first bit of the obj_id to indicate the message type: 0 is for HS, 1 is for Application. In the HomeStore layer, leader needs to transmit the DSN to the follower, this is intended to handle the following case: 1. Leader sends snapshot at LSN T1 to follower F1. 2. F1 fully receives the snapshot and now at T1. 3. Leader yield its leadership, F1 elected as leader. In this sequence the incremental resync will not kicked in to update the m_next_dsn, and as result, duplication may occur. --- src/include/homestore/replication/repl_dev.h | 11 +++++++ .../replication/repl_dev/raft_repl_dev.cpp | 31 +++++++++++++++++++ src/lib/replication/repl_dev/raft_repl_dev.h | 2 ++ .../repl_dev/raft_state_machine.cpp | 14 +++++++++ .../replication/repl_dev/raft_state_machine.h | 2 ++ src/tests/test_common/raft_repl_test_base.hpp | 24 ++++++++++++-- 6 files changed, 81 insertions(+), 3 deletions(-) diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 431b020a0..30b437182 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -46,6 +46,10 @@ VENUM(journal_type_t, uint16_t, HS_CTRL_REPLACE = 3, // Control message to replace a member ) +// magic num comes from the first 8 bytes of 'echo homestore_resync_data | md5sum' +static constexpr uint64_t HOMESTORE_RESYNC_DATA_MAGIC = 0xa65dbd27c213f327; +static constexpr uint32_t HOMESTORE_RESYNC_DATA_PROTOCOL_VERSION_V1 = 0x01; + struct repl_key { int32_t server_id{0}; // Server Id which this req is originated from uint64_t term; // RAFT term number @@ -120,6 +124,13 @@ struct snapshot_obj { bool is_last_obj{false}; }; +struct snp_repl_dev_data { + uint64_t magic_num{HOMESTORE_RESYNC_DATA_MAGIC}; + uint32_t protocol_version{HOMESTORE_RESYNC_DATA_PROTOCOL_VERSION_V1}; + uint32_t crc{0}; + uint64_t dsn{0}; +}; + struct repl_journal_entry; struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::thread_safe_counter >, sisl::ObjLifeCounter< repl_req_ctx > { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 2d93c4070..7ec8d5285 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1491,6 +1491,37 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx handle_commit(rreq, true /* recovery */); } +void RaftReplDev::create_snp_resync_data(raft_buf_ptr_t& data_out) { + snp_repl_dev_data msg; + auto msg_size = sizeof(snp_repl_dev_data); + msg.dsn = m_next_dsn; + auto crc = crc32_ieee(0, reinterpret_cast< const unsigned char* >(&msg), msg_size); + RD_LOGD("create snapshot resync msg, dsn={}, crc={}", msg.dsn, crc); + msg.crc = crc; + data_out = nuraft::buffer::alloc(msg_size); + std::memcpy(data_out->data_begin(), &msg, msg_size); +} + +bool RaftReplDev::apply_snp_resync_data(nuraft::buffer& data) { + auto msg = r_cast< snp_repl_dev_data* >(data.data_begin()); + if (msg->magic_num != HOMESTORE_RESYNC_DATA_MAGIC || msg->protocol_version != + HOMESTORE_RESYNC_DATA_PROTOCOL_VERSION_V1) { + RD_LOGE("Snapshot resync data validation failed, magic={}, version={}", msg->magic_num, msg->protocol_version); + return false; + } + auto received_crc = msg->crc; + msg->crc = 0; + RD_LOGD("received snapshot resync msg, dsn={}, crc={}, received crc={}", msg->dsn, msg->crc, received_crc); + auto computed_crc = crc32_ieee(0, reinterpret_cast< const unsigned char* >(msg), + sizeof(snp_repl_dev_data)); + if (received_crc != computed_crc) { + RD_LOGE("Snapshot resync data crc mismatch, received_crc={}, computed_crc={}", received_crc, computed_crc); + return false; + } + m_next_dsn = msg->dsn; + return true; +} + void RaftReplDev::on_restart() { m_listener->on_restart(); } bool RaftReplDev::is_resync_mode() { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index 2bf7cc52c..0550858cf 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -285,6 +285,8 @@ class RaftReplDev : public ReplDev, void commit_blk(repl_req_ptr_t rreq); void replace_member(repl_req_ptr_t rreq); void reset_quorum_size(uint32_t commit_quorum); + void create_snp_resync_data(raft_buf_ptr_t& data_out); + bool apply_snp_resync_data(nuraft::buffer& data); }; } // namespace homestore diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index 66b95f145..b95912601 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -297,6 +297,12 @@ void RaftStateMachine::create_snapshot(nuraft::snapshot& s, nuraft::async_result int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out, bool& is_last_obj) { + if ((obj_id & snp_obj_id_type_mask) == 0) { + // This is the preserved msg for homestore to resync data + m_rd.create_snp_resync_data(data_out); + is_last_obj = false; + return 0; + } auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); auto snp_data = std::make_shared< snapshot_obj >(); snp_data->user_ctx = user_ctx; @@ -320,6 +326,14 @@ int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, nuraft::buffer& data, bool is_first_obj, bool is_last_obj) { + if ((obj_id & snp_obj_id_type_mask) == 0) { + // Homestore preserved msg + if (m_rd.apply_snp_resync_data(data)) { + obj_id = snp_obj_id_type_mask; + LOGDEBUG("apply_snp_resync_data success, next obj_id={}", obj_id); + } + return; + } auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); auto snp_data = std::make_shared< snapshot_obj >(); snp_data->offset = obj_id; diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h index 6bf4faf5a..2387f8457 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.h +++ b/src/lib/replication/repl_dev/raft_state_machine.h @@ -86,6 +86,8 @@ class StateMachineStore; #define RD_LOGE(...) RD_LOG(ERROR, ##__VA_ARGS__) #define RD_LOGC(...) RD_LOG(CRITICAL, ##__VA_ARGS__) +static constexpr uint64_t snp_obj_id_type_mask = 0x8000000000000000; + using AsyncNotify = folly::SemiFuture< folly::Unit >; using AsyncNotifier = folly::Promise< folly::Unit >; diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp index e40511e73..690cbcc6f 100644 --- a/src/tests/test_common/raft_repl_test_base.hpp +++ b/src/tests/test_common/raft_repl_test_base.hpp @@ -182,10 +182,22 @@ class TestReplicatedDB : public homestore::ReplDevListener { return make_async_success<>(); } + static int64_t get_next_lsn(uint64_t& obj_id) { + return obj_id & 0x7fffffffffffffff; + } + static void set_resync_msg_type_bit(uint64_t& obj_id) { + obj_id |= 1ull << 63; + } + int read_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override { auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); + if ((snp_data->offset & snp_obj_id_type_mask) == 0) { + LOGERRORMOD(replication, "invalid snapshot offset={}", snp_data->offset); + return -1; + } - if (snp_data->offset == 0) { + int64_t next_lsn = get_next_lsn(snp_data->offset); + if (next_lsn == 0) { snp_data->is_last_obj = false; snp_data->blob = sisl::io_blob_safe(sizeof(ulong)); LOGINFOMOD(replication, @@ -194,7 +206,6 @@ class TestReplicatedDB : public homestore::ReplDevListener { return 0; } - int64_t next_lsn = snp_data->offset; std::vector< KeyValuePair > kv_snapshot_obj; // we can not use find to get the next element, since if the next lsn is a config lsn , it will not be put into // lsn_index_ and as a result, the find will return the end of the map. so here we use lower_bound to get the @@ -236,11 +247,17 @@ class TestReplicatedDB : public homestore::ReplDevListener { } void write_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override { + if ((snp_data->offset & snp_obj_id_type_mask) == 0) { + LOGERRORMOD(replication, "invalid snapshot offset={}", snp_data->offset); + return; + } + int64_t next_lsn = get_next_lsn(snp_data->offset); auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); auto last_committed_idx = std::dynamic_pointer_cast< RaftReplDev >(repl_dev())->raft_server()->get_committed_log_idx(); - if (snp_data->offset == 0) { + if (next_lsn == 0) { snp_data->offset = last_committed_lsn + 1; + set_resync_msg_type_bit(snp_data->offset); LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback return obj_id={}", g_helper->replica_num(), snp_data->offset); return; @@ -271,6 +288,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { } snp_data->offset = last_committed_lsn + 1; + set_resync_msg_type_bit(snp_data->offset); LOGINFOMOD(replication, "[Replica={}] Save logical snapshot callback obj_id={} term={} idx={} is_last={} num_items={}", g_helper->replica_num(), snp_data->offset, s->get_last_log_term(), s->get_last_log_idx(), From 4124863449857b37b715dcebc290ea9544c6aace Mon Sep 17 00:00:00 2001 From: yuwmao Date: Thu, 28 Nov 2024 15:28:12 +0800 Subject: [PATCH 3/4] Fix comments --- src/include/homestore/replication/repl_dev.h | 2 ++ src/lib/replication/repl_dev/raft_repl_dev.cpp | 8 ++++++-- src/lib/replication/repl_dev/raft_state_machine.cpp | 7 ++++++- src/lib/replication/repl_dev/raft_state_machine.h | 4 +++- src/tests/test_common/raft_repl_test_base.hpp | 4 ++-- 5 files changed, 19 insertions(+), 6 deletions(-) diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h index 30b437182..335cda834 100644 --- a/src/include/homestore/replication/repl_dev.h +++ b/src/include/homestore/replication/repl_dev.h @@ -124,6 +124,8 @@ struct snapshot_obj { bool is_last_obj{false}; }; +//HomeStore has some meta information to be transmitted during the baseline resync, +//Although now only dsn needs to be synced, this structure is defined as a general message, and we can easily add data if needed in the future. struct snp_repl_dev_data { uint64_t magic_num{HOMESTORE_RESYNC_DATA_MAGIC}; uint32_t protocol_version{HOMESTORE_RESYNC_DATA_PROTOCOL_VERSION_V1}; diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 7ec8d5285..33dc11765 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1510,15 +1510,19 @@ bool RaftReplDev::apply_snp_resync_data(nuraft::buffer& data) { return false; } auto received_crc = msg->crc; - msg->crc = 0; RD_LOGD("received snapshot resync msg, dsn={}, crc={}, received crc={}", msg->dsn, msg->crc, received_crc); + msg->crc = 0; auto computed_crc = crc32_ieee(0, reinterpret_cast< const unsigned char* >(msg), sizeof(snp_repl_dev_data)); if (received_crc != computed_crc) { RD_LOGE("Snapshot resync data crc mismatch, received_crc={}, computed_crc={}", received_crc, computed_crc); return false; } - m_next_dsn = msg->dsn; + if (msg->dsn > m_next_dsn) { + m_next_dsn = msg->dsn; + RD_LOGD("Update next_dsn from {} to {}", m_next_dsn.load(), msg->dsn); + return true; + } return true; } diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index b95912601..b214fbf17 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -297,6 +297,8 @@ void RaftStateMachine::create_snapshot(nuraft::snapshot& s, nuraft::async_result int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, ulong obj_id, raft_buf_ptr_t& data_out, bool& is_last_obj) { + // For Nuraft baseline resync, we separate the process into two layers: HomeStore layer and Application layer. + // We use the highest bit of the obj_id to indicate the message type: 0 is for HS, 1 is for Application. if ((obj_id & snp_obj_id_type_mask) == 0) { // This is the preserved msg for homestore to resync data m_rd.create_snp_resync_data(data_out); @@ -363,7 +365,10 @@ bool RaftStateMachine::apply_snapshot(nuraft::snapshot& s) { m_rd.set_last_commit_lsn(s.get_last_log_idx()); m_rd.m_data_journal->set_last_durable_lsn(s.get_last_log_idx()); auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); - return m_rd.m_listener->apply_snapshot(snp_ctx); + auto res = m_rd.m_listener->apply_snapshot(snp_ctx); + //make sure the changes are flushed. + hs()->cp_mgr().trigger_cp_flush(true /* force */); + return res; } nuraft::ptr< nuraft::snapshot > RaftStateMachine::last_snapshot() { diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h index 2387f8457..fdb1c1b9f 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.h +++ b/src/lib/replication/repl_dev/raft_state_machine.h @@ -86,7 +86,9 @@ class StateMachineStore; #define RD_LOGE(...) RD_LOG(ERROR, ##__VA_ARGS__) #define RD_LOGC(...) RD_LOG(CRITICAL, ##__VA_ARGS__) -static constexpr uint64_t snp_obj_id_type_mask = 0x8000000000000000; +// For the logic snapshot obj_id, we use the highest bit to indicate the type of the snapshot message. +// 0 is for HS, 1 is for Application. +static constexpr uint64_t snp_obj_id_type_mask = 1ULL << 63; using AsyncNotify = folly::SemiFuture< folly::Unit >; using AsyncNotifier = folly::Promise< folly::Unit >; diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp index 690cbcc6f..7dc364411 100644 --- a/src/tests/test_common/raft_repl_test_base.hpp +++ b/src/tests/test_common/raft_repl_test_base.hpp @@ -183,10 +183,10 @@ class TestReplicatedDB : public homestore::ReplDevListener { } static int64_t get_next_lsn(uint64_t& obj_id) { - return obj_id & 0x7fffffffffffffff; + return obj_id & ((1ULL << 63) - 1); } static void set_resync_msg_type_bit(uint64_t& obj_id) { - obj_id |= 1ull << 63; + obj_id |= 1ULL << 63; } int read_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override { From 0cb90b81eb03bd8625034fc635cd8995c119262e Mon Sep 17 00:00:00 2001 From: yuwmao Date: Tue, 3 Dec 2024 11:49:00 +0800 Subject: [PATCH 4/4] fix comments --- conanfile.py | 2 +- src/lib/replication/repl_dev/raft_repl_dev.cpp | 5 +++-- src/lib/replication/repl_dev/raft_state_machine.cpp | 8 ++++---- src/lib/replication/repl_dev/raft_state_machine.h | 4 +++- src/tests/test_common/raft_repl_test_base.hpp | 8 ++++++-- 5 files changed, 17 insertions(+), 10 deletions(-) diff --git a/conanfile.py b/conanfile.py index 99e129017..bc914e16c 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "6.5.18" + version = "6.5.19" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 33dc11765..72a39a27a 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -1495,7 +1495,7 @@ void RaftReplDev::create_snp_resync_data(raft_buf_ptr_t& data_out) { snp_repl_dev_data msg; auto msg_size = sizeof(snp_repl_dev_data); msg.dsn = m_next_dsn; - auto crc = crc32_ieee(0, reinterpret_cast< const unsigned char* >(&msg), msg_size); + auto crc = crc32_ieee(init_crc32, reinterpret_cast< const unsigned char* >(&msg), msg_size); RD_LOGD("create snapshot resync msg, dsn={}, crc={}", msg.dsn, crc); msg.crc = crc; data_out = nuraft::buffer::alloc(msg_size); @@ -1511,8 +1511,9 @@ bool RaftReplDev::apply_snp_resync_data(nuraft::buffer& data) { } auto received_crc = msg->crc; RD_LOGD("received snapshot resync msg, dsn={}, crc={}, received crc={}", msg->dsn, msg->crc, received_crc); + // Clear the crc field before verification, because the crc value computed by leader doesn't contain it. msg->crc = 0; - auto computed_crc = crc32_ieee(0, reinterpret_cast< const unsigned char* >(msg), + auto computed_crc = crc32_ieee(init_crc32, reinterpret_cast< const unsigned char* >(msg), sizeof(snp_repl_dev_data)); if (received_crc != computed_crc) { RD_LOGE("Snapshot resync data crc mismatch, received_crc={}, computed_crc={}", received_crc, computed_crc); diff --git a/src/lib/replication/repl_dev/raft_state_machine.cpp b/src/lib/replication/repl_dev/raft_state_machine.cpp index b214fbf17..b64a32c24 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.cpp +++ b/src/lib/replication/repl_dev/raft_state_machine.cpp @@ -299,7 +299,7 @@ int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, bool& is_last_obj) { // For Nuraft baseline resync, we separate the process into two layers: HomeStore layer and Application layer. // We use the highest bit of the obj_id to indicate the message type: 0 is for HS, 1 is for Application. - if ((obj_id & snp_obj_id_type_mask) == 0) { + if (is_hs_snp_obj(obj_id)) { // This is the preserved msg for homestore to resync data m_rd.create_snp_resync_data(data_out); is_last_obj = false; @@ -328,10 +328,10 @@ int RaftStateMachine::read_logical_snp_obj(nuraft::snapshot& s, void*& user_ctx, void RaftStateMachine::save_logical_snp_obj(nuraft::snapshot& s, ulong& obj_id, nuraft::buffer& data, bool is_first_obj, bool is_last_obj) { - if ((obj_id & snp_obj_id_type_mask) == 0) { + if (is_hs_snp_obj(obj_id)) { // Homestore preserved msg if (m_rd.apply_snp_resync_data(data)) { - obj_id = snp_obj_id_type_mask; + obj_id = snp_obj_id_type_app; LOGDEBUG("apply_snp_resync_data success, next obj_id={}", obj_id); } return; @@ -367,7 +367,7 @@ bool RaftStateMachine::apply_snapshot(nuraft::snapshot& s) { auto snp_ctx = std::make_shared< nuraft_snapshot_context >(s); auto res = m_rd.m_listener->apply_snapshot(snp_ctx); //make sure the changes are flushed. - hs()->cp_mgr().trigger_cp_flush(true /* force */); + hs()->cp_mgr().trigger_cp_flush(true /* force */).get(); return res; } diff --git a/src/lib/replication/repl_dev/raft_state_machine.h b/src/lib/replication/repl_dev/raft_state_machine.h index fdb1c1b9f..8f00cec43 100644 --- a/src/lib/replication/repl_dev/raft_state_machine.h +++ b/src/lib/replication/repl_dev/raft_state_machine.h @@ -88,7 +88,7 @@ class StateMachineStore; // For the logic snapshot obj_id, we use the highest bit to indicate the type of the snapshot message. // 0 is for HS, 1 is for Application. -static constexpr uint64_t snp_obj_id_type_mask = 1ULL << 63; +static constexpr uint64_t snp_obj_id_type_app = 1ULL << 63; using AsyncNotify = folly::SemiFuture< folly::Unit >; using AsyncNotifier = folly::Promise< folly::Unit >; @@ -139,6 +139,8 @@ class RaftStateMachine : public nuraft::state_machine { std::string rdev_name() const; + static bool is_hs_snp_obj(uint64_t obj_id) { return (obj_id & snp_obj_id_type_app) == 0; } + private: void after_precommit_in_leader(const nuraft::raft_server::req_ext_cb_params& params); }; diff --git a/src/tests/test_common/raft_repl_test_base.hpp b/src/tests/test_common/raft_repl_test_base.hpp index 7dc364411..7445568b8 100644 --- a/src/tests/test_common/raft_repl_test_base.hpp +++ b/src/tests/test_common/raft_repl_test_base.hpp @@ -191,7 +191,11 @@ class TestReplicatedDB : public homestore::ReplDevListener { int read_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override { auto s = std::dynamic_pointer_cast< nuraft_snapshot_context >(context)->nuraft_snapshot(); - if ((snp_data->offset & snp_obj_id_type_mask) == 0) { + if(RaftStateMachine::is_hs_snp_obj(snp_data->offset)) { + LOGERRORMOD(replication, "invalid snapshot offset={}", snp_data->offset); + return -1; + } + if ((snp_data->offset & snp_obj_id_type_app) == 0) { LOGERRORMOD(replication, "invalid snapshot offset={}", snp_data->offset); return -1; } @@ -247,7 +251,7 @@ class TestReplicatedDB : public homestore::ReplDevListener { } void write_snapshot_obj(shared< snapshot_context > context, shared< snapshot_obj > snp_data) override { - if ((snp_data->offset & snp_obj_id_type_mask) == 0) { + if (RaftStateMachine::is_hs_snp_obj(snp_data->offset)) { LOGERRORMOD(replication, "invalid snapshot offset={}", snp_data->offset); return; }