Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.14"
version = "6.5.16"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
9 changes: 7 additions & 2 deletions src/include/homestore/replication/repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,8 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
MultiBlkId const& local_blkid() const { return m_local_blkid; }
RemoteBlkId const& remote_blkid() const { return m_remote_blkid; }
const char* data() const {
DEBUG_ASSERT(m_data != nullptr, "m_data is nullptr, use before save_pushed/fetched_data or after release_data()");
DEBUG_ASSERT(m_data != nullptr,
"m_data is nullptr, use before save_pushed/fetched_data or after release_data()");
return r_cast< const char* >(m_data);
}
repl_req_state_t state() const { return repl_req_state_t(m_state.load()); }
Expand Down Expand Up @@ -349,7 +350,7 @@ class ReplDevListener {
/// @brief Called when the repl_dev is being destroyed. The consumer is expected to clean up any related resources.
/// However, it is expected that this call be idempotent. It is possible in rare scenarios that this can be called
/// after restart in case crash happened during the destroy.
virtual void on_destroy() = 0;
virtual void on_destroy(const group_id_t& group_id) = 0;

/// @brief Called when replace member is performed.
virtual void on_replace_member(const replica_member_info& member_out, const replica_member_info& member_in) = 0;
Expand Down Expand Up @@ -450,6 +451,10 @@ class ReplDev {
/// @return Block size
virtual uint32_t get_blk_size() const = 0;

/// @brief Gets the last commit lsn of this repldev
/// @return last_commit_lsn
virtual repl_lsn_t get_last_commit_lsn() const = 0;

virtual void attach_listener(shared< ReplDevListener > listener) { m_listener = std::move(listener); }

virtual void detach_listener() {
Expand Down
1 change: 1 addition & 0 deletions src/include/homestore/vchunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class VChunk {
uint16_t get_chunk_id() const;
cshared< Chunk > get_internal_chunk() const;
uint64_t size() const;
void reset();

private:
shared< Chunk > m_internal_chunk;
Expand Down
7 changes: 7 additions & 0 deletions src/lib/blkalloc/append_blk_allocator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,13 @@ bool AppendBlkAllocator::is_blk_alloced(const BlkId& in_bid, bool) const {
return in_bid.blk_num() < get_used_blks();
}

void AppendBlkAllocator::reset() {
m_last_append_offset.store(0);
m_freeable_nblks.store(0);
m_commit_offset.store(0);
m_is_dirty.store(true);
}

bool AppendBlkAllocator::is_blk_alloced_on_disk(BlkId const& bid, bool) const {
return bid.blk_num() < m_sb->commit_offset;
}
Expand Down
33 changes: 19 additions & 14 deletions src/lib/blkalloc/append_blk_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@ struct append_blk_sb_t {
};
#pragma pack()

//class AppendBlkAllocMetrics : public sisl::MetricsGroup {
//public:
// explicit AppendBlkAllocMetrics(const char* inst_name) : sisl::MetricsGroup("AppendBlkAlloc", inst_name) {
// REGISTER_COUNTER(num_alloc, "Number of blks alloc attempts");
// REGISTER_COUNTER(num_alloc_failure, "Number of blk alloc failures");
// class AppendBlkAllocMetrics : public sisl::MetricsGroup {
// public:
// explicit AppendBlkAllocMetrics(const char* inst_name) : sisl::MetricsGroup("AppendBlkAlloc", inst_name) {
// REGISTER_COUNTER(num_alloc, "Number of blks alloc attempts");
// REGISTER_COUNTER(num_alloc_failure, "Number of blk alloc failures");
//
// register_me_to_farm();
// }
// register_me_to_farm();
// }
//
// AppendBlkAllocMetrics(const AppendBlkAllocMetrics&) = delete;
// AppendBlkAllocMetrics(AppendBlkAllocMetrics&&) noexcept = delete;
// AppendBlkAllocMetrics& operator=(const AppendBlkAllocMetrics&) = delete;
// AppendBlkAllocMetrics& operator=(AppendBlkAllocMetrics&&) noexcept = delete;
// ~AppendBlkAllocMetrics() { deregister_me_from_farm(); }
//};
// AppendBlkAllocMetrics(const AppendBlkAllocMetrics&) = delete;
// AppendBlkAllocMetrics(AppendBlkAllocMetrics&&) noexcept = delete;
// AppendBlkAllocMetrics& operator=(const AppendBlkAllocMetrics&) = delete;
// AppendBlkAllocMetrics& operator=(AppendBlkAllocMetrics&&) noexcept = delete;
// ~AppendBlkAllocMetrics() { deregister_me_from_farm(); }
// };

//
// The assumption for AppendBlkAllocator:
Expand Down Expand Up @@ -108,6 +108,11 @@ class AppendBlkAllocator : public BlkAllocator {

std::string to_string() const override;

/**
* @brief : reset the allocator to initial state, so all the blks in this chunk are free.
*/
void reset() override;

void cp_flush(CP* cp) override;
void recovery_completed() override {}
nlohmann::json get_status(int log_level) const override;
Expand All @@ -121,7 +126,7 @@ class AppendBlkAllocator : public BlkAllocator {
std::atomic< blk_num_t > m_freeable_nblks{0}; // count of blks fragmentedly freed (both on-disk and in-memory)
std::atomic< blk_num_t > m_commit_offset{0}; // offset in on-disk version
std::atomic< bool > m_is_dirty{false};
//AppendBlkAllocMetrics m_metrics;
// AppendBlkAllocMetrics m_metrics;
superblk< append_blk_sb_t > m_sb; // only cp will be writing to this disk
};

Expand Down
1 change: 1 addition & 0 deletions src/lib/blkalloc/bitmap_blk_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class BitmapBlkAllocator : public BlkAllocator {
void cp_flush(CP* cp) override;

void recovery_completed() override {}
void reset() override {}
blk_num_t get_num_portions() const { return (m_num_blks - 1) / m_blks_per_portion + 1; }
blk_num_t get_blks_per_portion() const { return m_blks_per_portion; }

Expand Down
1 change: 1 addition & 0 deletions src/lib/blkalloc/blk_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ class BlkAllocator {
virtual bool is_blk_alloced(BlkId const& b, bool use_lock = false) const = 0;
virtual bool is_blk_alloced_on_disk(BlkId const& b, bool use_lock = false) const = 0;
virtual void recovery_completed() = 0;
virtual void reset() = 0;

virtual std::string to_string() const = 0;
virtual void cp_flush(CP* cp) = 0;
Expand Down
1 change: 1 addition & 0 deletions src/lib/blkalloc/fixed_blk_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ class FixedBlkAllocator : public BitmapBlkAllocator {
blk_num_t available_blks() const override;
blk_num_t get_used_blks() const override;
blk_num_t get_defrag_nblks() const override;
void reset() override{};
bool is_blk_alloced(BlkId const& in_bid, bool use_lock = false) const override;
std::string to_string() const override;

Expand Down
1 change: 1 addition & 0 deletions src/lib/blkalloc/varsize_blk_allocator.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ class VarsizeBlkAllocator : public BitmapBlkAllocator {
blk_num_t get_used_blks() const override;
bool is_blk_alloced(BlkId const& in_bid, bool use_lock = false) const override;
std::string to_string() const override;
void reset() override{};
nlohmann::json get_metrics_in_json();

private:
Expand Down
2 changes: 2 additions & 0 deletions src/lib/device/vchunk.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const uint8_t* VChunk::get_user_private() const { return m_internal_chunk->user_

blk_num_t VChunk::get_total_blks() const { return m_internal_chunk->blk_allocator()->get_total_blks(); }

void VChunk::reset() { m_internal_chunk->blk_allocator_mutable()->reset(); }

blk_num_t VChunk::available_blks() const { return m_internal_chunk->blk_allocator()->available_blks(); }

blk_num_t VChunk::get_defrag_nblks() const { return m_internal_chunk->blk_allocator()->get_defrag_nblks(); }
Expand Down
32 changes: 26 additions & 6 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -914,8 +914,7 @@ void RaftReplDev::handle_rollback(repl_req_ptr_t rreq) {
if (rreq->has_state(repl_req_state_t::BLK_ALLOCATED)) {
auto blkid = rreq->local_blkid();
data_service().async_free_blk(blkid).thenValue([this, blkid](auto&& err) {
HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak",
blkid.to_string());
HS_LOG_ASSERT(!err, "freeing blkid={} upon error failed, potential to cause blk leak", blkid.to_string());
RD_LOGD("Rollback rreq: Releasing blkid={} freed successfully", blkid.to_string());
});
}
Expand Down Expand Up @@ -1212,7 +1211,7 @@ void RaftReplDev::leave() {

// We let the listener know right away, so that they can cleanup persistent structures soonest. This will
// reduce the time window of leaked resources if any
m_listener->on_destroy();
m_listener->on_destroy(group_id());

// Persist that destroy pending in superblk, so that in case of crash before cleanup of resources, it can be done
// post restart.
Expand All @@ -1227,7 +1226,8 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
nuraft::cb_func::Param* param) {
auto ret = nuraft::cb_func::ReturnCode::Ok;

if (type == nuraft::cb_func::Type::GotAppendEntryReqFromLeader) {
switch (type) {
case nuraft::cb_func::Type::GotAppendEntryReqFromLeader: {
auto raft_req = r_cast< nuraft::req_msg* >(param->ctx);
auto const& entries = raft_req->log_entries();

Expand Down Expand Up @@ -1276,9 +1276,29 @@ std::pair< bool, nuraft::cb_func::ReturnCode > RaftReplDev::handle_raft_event(nu
sisl::VectorPool< repl_req_ptr_t >::free(reqs);
}
return {true, ret};
} else {
return {false, ret};
}

case nuraft::cb_func::Type::RemovedFromCluster: {
// a node will reach here when :
// 1. it is removed from the cluster and the new config(excluding this node) is being committed on this node
// 2. it is removed from the cluster , but the node is down and new config log(excluding this node) is not
// replicated to this removed node. when the node restart, leader will not send any append entry to this node,
// since it is not a member of the raft group. it will become a condidate and send request-vote request to other
// members of this raft group. a member will send RemovedFromCluster to the node if this member finds the node
// is no longer a member of the raft group.

// this will lazily cleanup the group
// TODO:cleanup this repl dev ASAP if necessary.
leave();

return {true, ret};
}

// TODO: Add more type handler if necessary
default:
break;
}
return {false, ret};
}

void RaftReplDev::flush_durable_commit_lsn() {
Expand Down
6 changes: 2 additions & 4 deletions src/lib/replication/repl_dev/raft_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ class RaftReplDev : public ReplDev,
std::string rdev_name() const { return m_rdev_name; }
std::string my_replica_id_str() const { return boost::uuids::to_string(m_my_repl_id); }
uint32_t get_blk_size() const override;
repl_lsn_t get_last_commit_lsn() const { return m_commit_upto_lsn.load(); }
repl_lsn_t get_last_commit_lsn() const override { return m_commit_upto_lsn.load(); }
void set_last_commit_lsn(repl_lsn_t lsn) { m_commit_upto_lsn.store(lsn); }
bool is_destroy_pending() const;
bool is_destroyed() const;
Expand Down Expand Up @@ -229,9 +229,7 @@ class RaftReplDev : public ReplDev,
*
* @param num_reserved_entries The number of reserved entries of the replication log.
*/
void truncate(uint32_t num_reserved_entries) {
m_data_journal->truncate(num_reserved_entries, m_compact_lsn.load());
}
void truncate(uint32_t num_reserved_entries) { m_data_journal->truncate(num_reserved_entries, m_compact_lsn.load()); }

void wait_for_logstore_ready() { m_data_journal->wait_for_log_store_ready(); }

Expand Down
30 changes: 26 additions & 4 deletions src/lib/replication/repl_dev/raft_state_machine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,32 @@ raft_buf_ptr_t RaftStateMachine::commit_ext(nuraft::state_machine::ext_op_params
}

void RaftStateMachine::commit_config(const ulong log_idx, raft_cluster_config_ptr_t& new_conf) {
// when reaching here, the config change log has already been committed, and the new config has been applied to the
// cluster

RD_LOGD("Raft channel: Commit new cluster conf , log_idx = {}", log_idx);
// TODO:add more logic here if necessary

#ifdef _PRERELEASE
auto& servers_in_new_conf = new_conf->get_servers();
std::vector< int32_t > server_ids_in_new_conf;
for (auto& server : servers_in_new_conf)
server_ids_in_new_conf.emplace_back(server->get_id());

auto my_id = m_rd.server_id();

std::ostringstream oss;
auto it = server_ids_in_new_conf.begin();
if (it != server_ids_in_new_conf.end()) {
oss << *it;
++it;
}
for (; it != server_ids_in_new_conf.end(); ++it) {
oss << "," << *it;
}

RD_LOG(INFO, "Raft channel: server ids in new cluster conf : {}, my_id {}, group_id {}", oss.str(), my_id,
m_rd.group_id_str());
#endif
}

void RaftStateMachine::rollback_config(const ulong log_idx, raft_cluster_config_ptr_t& conf) {
Expand Down Expand Up @@ -242,9 +266,7 @@ void RaftStateMachine::unlink_lsn_to_req(int64_t lsn, repl_req_ptr_t rreq) {
// it is possible a LSN mapped to different rreq in history
// due to log overwritten. Verify the rreq before removing
auto deleted = m_lsn_req_map.erase_if_equal(lsn, rreq);
if (deleted) {
RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, rreq->to_string());
}
if (deleted) { RD_LOG(DEBUG, "Raft channel: erase lsn {}, rreq {}", lsn, rreq->to_string()); }
}

void RaftStateMachine::link_lsn_to_req(repl_req_ptr_t rreq, int64_t lsn) {
Expand Down
2 changes: 2 additions & 0 deletions src/lib/replication/repl_dev/solo_repl_dev.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class SoloReplDev : public ReplDev {

uuid_t group_id() const override { return m_group_id; }

repl_lsn_t get_last_commit_lsn() const override { return 0; }

uint32_t get_blk_size() const override;

void cp_flush(CP* cp);
Expand Down
6 changes: 3 additions & 3 deletions src/tests/test_common/raft_repl_test_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -306,10 +306,10 @@ class TestReplicatedDB : public homestore::ReplDevListener {
boost::uuids::to_string(member_out.id), boost::uuids::to_string(member_in.id));
}

void on_destroy() override {
void on_destroy(const group_id_t& group_id) override {
LOGINFOMOD(replication, "[Replica={}] Group={} is being destroyed", g_helper->replica_num(),
boost::uuids::to_string(repl_dev()->group_id()));
g_helper->unregister_listener(repl_dev()->group_id());
boost::uuids::to_string(group_id));
g_helper->unregister_listener(group_id);
}

void db_write(uint64_t data_size, uint32_t max_size_per_iov) {
Expand Down
2 changes: 1 addition & 1 deletion src/tests/test_solo_repl_dev.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ class SoloReplDevTest : public testing::Test {
LOGINFO("Received error={} on repl_dev", enum_name(error));
}
void on_replace_member(const replica_member_info& member_out, const replica_member_info& member_in) override {}
void on_destroy() override {}
void on_destroy(const group_id_t& group_id) override {}
};

class Application : public ReplApplication {
Expand Down