Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "1.0.10"
version = "1.0.11"
homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
topics = ("ebay")
Expand Down
49 changes: 17 additions & 32 deletions src/lib/homestore_backend/hs_homeobject.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,24 @@ void HSHomeObject::init_homestore() {
auto const new_id = app->discover_svcid(_our_id);
RELEASE_ASSERT(new_id == _our_id, "Received new SvcId [{}] AFTER recovery of [{}]?!", to_string(new_id),
to_string(_our_id));
recover_pg();
recover_shard();
}
initialize_chunk_selector();

// recover PG
HomeStore::instance()->meta_service().register_handler(
_pg_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
on_pg_meta_blk_found(std::move(buf), voidptr_cast(mblk));
},
nullptr, true);
HomeStore::instance()->meta_service().read_sub_sb(_pg_meta_name);

// recover shard
HomeStore::instance()->meta_service().register_handler(
_shard_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) { on_shard_meta_blk_found(mblk, buf); },
[this](bool success) { on_shard_meta_blk_recover_completed(success); }, true);
HomeStore::instance()->meta_service().read_sub_sb(_shard_meta_name);

recovery_done_ = true;
LOGI("Initialize and start HomeStore is successfully");

Expand Down Expand Up @@ -205,21 +219,6 @@ void HSHomeObject::register_homestore_metablk_callback() {
LOGI("Found existing SvcId: [{}]", to_string(_our_id));
},
nullptr, true);

HomeStore::instance()->meta_service().register_handler(
_shard_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
m_shard_sb_bufs.emplace_back(std::pair(std::move(buf), voidptr_cast(mblk)));
},
nullptr, true);

HomeStore::instance()->meta_service().register_handler(
_pg_meta_name,
[this](homestore::meta_blk* mblk, sisl::byte_view buf, size_t size) {
m_pg_sb_bufs.emplace_back(std::pair(std::move(buf), voidptr_cast(mblk)));
},
// TODO: move "repl_dev" to homestore::repl_dev and "index" to homestore::index
nullptr, true);
}

HSHomeObject::~HSHomeObject() {
Expand All @@ -235,20 +234,6 @@ HSHomeObject::~HSHomeObject() {
iomanager.stop();
}

void HSHomeObject::initialize_chunk_selector() {
std::unordered_set< homestore::chunk_num_t > excluding_chunks;
std::scoped_lock lock_guard(_pg_lock);
for (auto& pair : _pg_map) {
for (auto& shard : pair.second->shards_) {
if (shard->info.state == ShardInfo::State::OPEN) {
excluding_chunks.emplace(d_cast< HS_Shard* >(shard.get())->sb_->chunk_id);
}
}
}

chunk_selector_->build_per_dev_chunk_heap(excluding_chunks);
}

HomeObjectStats HSHomeObject::_get_stats() const {
HomeObjectStats stats;
auto const& repl_svc = homestore::hs()->repl_service();
Expand Down
8 changes: 3 additions & 5 deletions src/lib/homestore_backend/hs_homeobject.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ class HSHomeObject : public HomeObjectImpl {
std::shared_ptr< BlobIndexTable > index_table;
};
std::unordered_map< std::string, PgIndexTable > index_table_pg_map_;
std::vector< std::pair< sisl::byte_view, void* > > m_pg_sb_bufs;
std::vector< std::pair< sisl::byte_view, void* > > m_shard_sb_bufs;

public:
#pragma pack(1)
Expand Down Expand Up @@ -248,9 +246,9 @@ class HSHomeObject : public HomeObjectImpl {

// recover part
void register_homestore_metablk_callback();
void initialize_chunk_selector();
void recover_pg();
void recover_shard();
void on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie);
void on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf);
void on_shard_meta_blk_recover_completed(bool success);

void persist_pg_sb();

Expand Down
45 changes: 21 additions & 24 deletions src/lib/homestore_backend/hs_pg_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,31 +192,28 @@ PGInfo HSHomeObject::deserialize_pg_info(const unsigned char* json_str, size_t s
return pg_info;
}

void HSHomeObject::recover_pg() {
for (auto const& [buf, mblk] : m_pg_sb_bufs) {
homestore::superblk< pg_info_superblk > pg_sb(_pg_meta_name);
pg_sb.load(buf, mblk);

auto v = hs_repl_service().get_repl_dev(pg_sb->replica_set_uuid);
if (v.hasError()) {
// TODO: We need to raise an alert here, since without pg repl_dev all operations on that pg will fail
LOGE("open_repl_dev for group_id={} has failed", boost::uuids::to_string(pg_sb->replica_set_uuid));
return;
}
auto pg_id = pg_sb->id;
auto uuid_str = boost::uuids::to_string(pg_sb->index_table_uuid);
auto hs_pg = std::make_unique< HS_PG >(std::move(pg_sb), std::move(v.value()));
// During PG recovery check if index is already recoverd else
// add entry in map, so that index recovery can update the PG.
std::scoped_lock lg(index_lock_);
auto it = index_table_pg_map_.find(uuid_str);
RELEASE_ASSERT(it != index_table_pg_map_.end(), "IndexTable should be recovered before PG");
hs_pg->index_table_ = it->second.index_table;
it->second.pg_id = pg_id;

add_pg_to_map(std::move(hs_pg));
// Meta-service callback invoked for each persisted PG superblock found during
// recovery. Rebuilds the in-memory HS_PG: loads the superblock, attaches the
// PG's repl_dev and its (already-recovered) index table, then registers the PG
// in the PG map.
// @param buf          raw superblock bytes handed over by the meta service
// @param meta_cookie  opaque meta_blk cookie needed for later superblk updates
void HSHomeObject::on_pg_meta_blk_found(sisl::byte_view const& buf, void* meta_cookie) {
    homestore::superblk< pg_info_superblk > pg_sb(_pg_meta_name);
    pg_sb.load(buf, meta_cookie);

    auto v = hs_repl_service().get_repl_dev(pg_sb->replica_set_uuid);
    if (v.hasError()) {
        // TODO: We need to raise an alert here, since without pg repl_dev all operations on that pg will fail
        LOGE("open_repl_dev for group_id={} has failed", boost::uuids::to_string(pg_sb->replica_set_uuid));
        return;
    }
    auto pg_id = pg_sb->id;
    auto uuid_str = boost::uuids::to_string(pg_sb->index_table_uuid);
    auto hs_pg = std::make_unique< HS_PG >(std::move(pg_sb), std::move(v.value()));
    // During PG recovery check if index is already recovered else
    // add entry in map, so that index recovery can update the PG.
    std::scoped_lock lg(index_lock_);
    auto it = index_table_pg_map_.find(uuid_str);
    RELEASE_ASSERT(it != index_table_pg_map_.end(), "IndexTable should be recovered before PG");
    hs_pg->index_table_ = it->second.index_table;
    it->second.pg_id = pg_id;

    add_pg_to_map(std::move(hs_pg));
}

PGInfo HSHomeObject::HS_PG::pg_info_from_sb(homestore::superblk< pg_info_superblk > const& sb) {
Expand Down
22 changes: 16 additions & 6 deletions src/lib/homestore_backend/hs_shard_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,13 +273,23 @@ void HSHomeObject::on_shard_message_commit(int64_t lsn, sisl::blob const& h, hom
}
}

void HSHomeObject::recover_shard() {
for (auto& [buf, mblk] : m_shard_sb_bufs) {
homestore::superblk< shard_info_superblk > sb(_shard_meta_name);
sb.load(buf, mblk);
add_new_shard_to_map(std::make_unique< HS_Shard >(std::move(sb)));
// Meta-service callback invoked for every shard superblock discovered on disk;
// deserializes it and registers the recovered shard in the in-memory shard map.
void HSHomeObject::on_shard_meta_blk_found(homestore::meta_blk* mblk, sisl::byte_view buf) {
    homestore::superblk< shard_info_superblk > shard_sb(_shard_meta_name);
    shard_sb.load(buf, mblk);
    auto recovered_shard = std::make_unique< HS_Shard >(std::move(shard_sb));
    add_new_shard_to_map(std::move(recovered_shard));
}

// Meta-service completion callback fired once all shard superblocks have been
// replayed. Builds the chunk-selector's per-device heaps, excluding chunks that
// still back an OPEN shard so they cannot be allocated to new shards.
// @param success  recovery outcome reported by the meta service
void HSHomeObject::on_shard_meta_blk_recover_completed(bool success) {
    std::unordered_set< homestore::chunk_num_t > excluding_chunks;
    std::scoped_lock lock_guard(_pg_lock);
    for (auto& pair : _pg_map) {
        for (auto& shard : pair.second->shards_) {
            if (shard->info.state == ShardInfo::State::OPEN) {
                excluding_chunks.emplace(d_cast< HS_Shard* >(shard.get())->sb_->chunk_id);
            }
        }
    }
    // NOTE(review): `success` is ignored — presumably a failed recovery should be
    // surfaced (log/assert) rather than silently building the heap; confirm intent.
    chunk_selector_->build_per_dev_chunk_heap(excluding_chunks);
}

void HSHomeObject::add_new_shard_to_map(ShardPtr&& shard) {
Expand Down
78 changes: 22 additions & 56 deletions src/lib/homestore_backend/tests/hs_shard_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,41 +125,16 @@ TEST_F(TestFixture, MockSealShard) {
}
#endif

// Test application fixture that backs HomeStore with a single randomly-named
// file under /tmp, so on-disk state survives a HomeObject restart within one
// recovery test.
class FixtureAppWithRecovery : public FixtureApp {
    std::string fpath_{"/tmp/test_shard_manager.data." + std::to_string(rand())};

public:
    // Report the single backing device file to HomeStore.
    std::list< std::filesystem::path > devices() const override {
        return std::list< std::filesystem::path >{std::filesystem::canonical(fpath_)};
    }

    // Raw path of the backing file, for creation/cleanup by the test body.
    std::string path() const { return fpath_; }
};

// Gtest fixture for shard-manager recovery tests: creates a fresh FixtureApp
// per test and cleans up its on-disk state afterwards. (The diff rendering
// showed two conflicting SetUp() definitions; the post-change FixtureApp
// version is the one kept.)
class ShardManagerTestingRecovery : public ::testing::Test {
public:
    void SetUp() override { app = std::make_shared< FixtureApp >(); }
    void TearDown() override { app->clean(); }

protected:
    std::shared_ptr< FixtureApp > app;
};

// TODO: enable the following test case after we fix raft repl dev recovery issue.
/*
TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) {
// prepare the env first;
auto app_with_recovery = dp_cast< FixtureAppWithRecovery >(app);
const std::string fpath = app_with_recovery->path();
if (std::filesystem::exists(fpath)) { std::filesystem::remove(fpath); }
LOGI("creating device files with size {} ", homestore::in_bytes(2 * Gi));
LOGI("creating {} device file", fpath);
std::ofstream ofs{fpath, std::ios::binary | std::ios::out | std::ios::trunc};
std::filesystem::resize_file(fpath, 2 * Gi);

homeobject::pg_id_t _pg_id{1u};
homeobject::peer_id_t _peer1;
homeobject::peer_id_t _peer2;
Expand Down Expand Up @@ -187,6 +162,7 @@ TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) {
_home_object.reset();
LOGI("restart home_object");
_home_object = homeobject::init_homeobject(std::weak_ptr< homeobject::HomeObjectApplication >(app));
std::this_thread::sleep_for(std::chrono::seconds{5});
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use pg->repl_dev_->get_leader_id() at L190 to check whether this node is the leader, and sleep only if it is not?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I use 5s because the election_time_out is 3.2s. https://github.com/eBay/nuraft_mesg/blob/7e8cdeb261270642dac186902351da2174d7f077/src/lib/manager_impl.cpp#L25C1-L25C72

I want to refactor these two test cases here, since they are very similar and have a lot of duplicated logic, as in

TEST_F(HomeObjectFixture, SealShardWithRestart) {

can we do this in a separate refactor PR?

homeobject::HSHomeObject* ho = dynamic_cast< homeobject::HSHomeObject* >(_home_object.get());
// check PG after recovery.
EXPECT_TRUE(ho->_pg_map.size() == 1);
Expand All @@ -201,15 +177,15 @@ TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) {
EXPECT_EQ(ShardInfo::State::OPEN, check_shard->info.state);

auto hs_shard = d_cast< homeobject::HSHomeObject::HS_Shard* >(check_shard);
EXPECT_TRUE(hs_shard->info == shard_info);
EXPECT_TRUE(hs_shard->sb_->id == shard_info.id);
EXPECT_TRUE(hs_shard->sb_->placement_group == shard_info.placement_group);
EXPECT_TRUE(hs_shard->sb_->state == shard_info.state);
EXPECT_TRUE(hs_shard->sb_->created_time == shard_info.created_time);
EXPECT_TRUE(hs_shard->sb_->last_modified_time == shard_info.last_modified_time);
EXPECT_TRUE(hs_shard->sb_->available_capacity_bytes == shard_info.available_capacity_bytes);
EXPECT_TRUE(hs_shard->sb_->total_capacity_bytes == shard_info.total_capacity_bytes);
EXPECT_TRUE(hs_shard->sb_->deleted_capacity_bytes == shard_info.deleted_capacity_bytes);
auto& recovered_shard_info = hs_shard->info;
EXPECT_TRUE(recovered_shard_info == shard_info);
EXPECT_TRUE(recovered_shard_info.placement_group == shard_info.placement_group);
EXPECT_TRUE(recovered_shard_info.state == shard_info.state);
EXPECT_TRUE(recovered_shard_info.created_time == shard_info.created_time);
EXPECT_TRUE(recovered_shard_info.last_modified_time == shard_info.last_modified_time);
EXPECT_TRUE(recovered_shard_info.available_capacity_bytes == shard_info.available_capacity_bytes);
EXPECT_TRUE(recovered_shard_info.total_capacity_bytes == shard_info.total_capacity_bytes);
EXPECT_TRUE(recovered_shard_info.deleted_capacity_bytes == shard_info.deleted_capacity_bytes);

// seal the shard when shard is recovery
e = _home_object->shard_manager()->seal_shard(shard_id).get();
Expand All @@ -221,6 +197,7 @@ TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) {
LOGI("restart home_object again");
// re-create the homeobject and pg infos and shard infos will be recover automatically.
_home_object = homeobject::init_homeobject(std::weak_ptr< homeobject::HomeObjectApplication >(app));
std::this_thread::sleep_for(std::chrono::seconds{5});
auto s = _home_object->shard_manager()->get_shard(shard_id).get();
ASSERT_TRUE(!!s);
EXPECT_EQ(ShardInfo::State::SEALED, s.value().state);
Expand All @@ -237,19 +214,9 @@ TEST_F(ShardManagerTestingRecovery, ShardManagerRecovery) {
EXPECT_EQ(2, pg_iter->second->shard_sequence_num_);
// finally close the homeobject and homestore.
_home_object.reset();
std::filesystem::remove(fpath);
}

TEST_F(ShardManagerTestingRecovery, SealedShardRecovery) {
// prepare the env first;
auto app_with_recovery = dp_cast< FixtureAppWithRecovery >(app);
const std::string fpath = app_with_recovery->path();
if (std::filesystem::exists(fpath)) { std::filesystem::remove(fpath); }
LOGI("creating device files with size {} ", homestore::in_bytes(2 * Gi));
LOGI("creating {} device file", fpath);
std::ofstream ofs{fpath, std::ios::binary | std::ios::out | std::ios::trunc};
std::filesystem::resize_file(fpath, 2 * Gi);

homeobject::pg_id_t _pg_id{1u};
homeobject::peer_id_t _peer1;
homeobject::peer_id_t _peer2;
Expand Down Expand Up @@ -285,24 +252,23 @@ TEST_F(ShardManagerTestingRecovery, SealedShardRecovery) {
LOGI("restart home_object");
// re-create the homeobject and pg infos and shard infos will be recover automatically.
_home_object = homeobject::init_homeobject(std::weak_ptr< homeobject::HomeObjectApplication >(app));
std::this_thread::sleep_for(std::chrono::seconds{5});
ho = dynamic_cast< homeobject::HSHomeObject* >(_home_object.get());
EXPECT_TRUE(ho->_pg_map.size() == 1);
// check shard internal state;
pg_iter = ho->_pg_map.find(_pg_id);
EXPECT_TRUE(pg_iter != ho->_pg_map.end());
EXPECT_EQ(1, pg_iter->second->shards_.size());
auto hs_shard = d_cast< homeobject::HSHomeObject::HS_Shard* >(pg_iter->second->shards_.front().get());
EXPECT_TRUE(hs_shard->info == shard_info);
EXPECT_TRUE(hs_shard->sb_->id == shard_info.id);
EXPECT_TRUE(hs_shard->sb_->placement_group == shard_info.placement_group);
EXPECT_TRUE(hs_shard->sb_->state == shard_info.state);
EXPECT_TRUE(hs_shard->sb_->created_time == shard_info.created_time);
EXPECT_TRUE(hs_shard->sb_->last_modified_time == shard_info.last_modified_time);
EXPECT_TRUE(hs_shard->sb_->available_capacity_bytes == shard_info.available_capacity_bytes);
EXPECT_TRUE(hs_shard->sb_->total_capacity_bytes == shard_info.total_capacity_bytes);
EXPECT_TRUE(hs_shard->sb_->deleted_capacity_bytes == shard_info.deleted_capacity_bytes);
auto& recovered_shard_info = hs_shard->info;
EXPECT_TRUE(recovered_shard_info == shard_info);
EXPECT_TRUE(recovered_shard_info.placement_group == shard_info.placement_group);
EXPECT_TRUE(recovered_shard_info.state == shard_info.state);
EXPECT_TRUE(recovered_shard_info.created_time == shard_info.created_time);
EXPECT_TRUE(recovered_shard_info.last_modified_time == shard_info.last_modified_time);
EXPECT_TRUE(recovered_shard_info.available_capacity_bytes == shard_info.available_capacity_bytes);
EXPECT_TRUE(recovered_shard_info.total_capacity_bytes == shard_info.total_capacity_bytes);
EXPECT_TRUE(recovered_shard_info.deleted_capacity_bytes == shard_info.deleted_capacity_bytes);
// finally close the homeobject and homestore.
_home_object.reset();
std::filesystem::remove(fpath);
}
*/