diff --git a/conanfile.py b/conanfile.py index d377ba0cc..0cf77c581 100644 --- a/conanfile.py +++ b/conanfile.py @@ -9,7 +9,7 @@ class HomestoreConan(ConanFile): name = "homestore" - version = "7.0.1" + version = "7.0.3" homepage = "https://github.com/eBay/Homestore" description = "HomeStore Storage Engine" diff --git a/src/include/homestore/checkpoint/cp_mgr.hpp b/src/include/homestore/checkpoint/cp_mgr.hpp index da6dfa3d1..27e8a13b3 100644 --- a/src/include/homestore/checkpoint/cp_mgr.hpp +++ b/src/include/homestore/checkpoint/cp_mgr.hpp @@ -179,7 +179,7 @@ class CPManager { /// @brief Shutdown the checkpoint manager services. It will not trigger a flush, but cancels any existing /// checkpoint session abruptly. If caller needs clean shutdown, then they explicitly needs to trigger cp flush /// before calling shutdown. - void shutdown(); + void shutdown(bool require_extra_cp = false); /// @brief Register a CP consumer to the checkpoint manager. CP consumer provides the callback they are interested /// in the checkpoint process. 
Each consumer gets a CPContext, which consumer can put its own dirty buffer info diff --git a/src/lib/checkpoint/cp_mgr.cpp b/src/lib/checkpoint/cp_mgr.cpp index 960d885e2..b595569b9 100644 --- a/src/lib/checkpoint/cp_mgr.cpp +++ b/src/lib/checkpoint/cp_mgr.cpp @@ -114,7 +114,7 @@ void CPManager::create_first_cp() { m_cur_cp->m_cp_start_time = Clock::now(); } -void CPManager::shutdown() { +void CPManager::shutdown(bool require_extra_cp) { LOGINFO("Stopping cp timer"); stop_timer_thread(); @@ -126,6 +126,12 @@ void CPManager::shutdown() { LOGINFO("Trigger cp flush at CP shutdown"); auto success = do_trigger_cp_flush(true /* force */, true /* flush_on_shutdown */).get(); HS_REL_ASSERT_EQ(success, true, "CP Flush failed"); + + if (require_extra_cp) { + success = do_trigger_cp_flush(true /* force */, true /* flush_on_shutdown */).get(); + HS_REL_ASSERT_EQ(success, true, "CP Flush failed"); + } + LOGINFO("Trigger cp done"); delete (m_cur_cp); diff --git a/src/lib/homestore.cpp b/src/lib/homestore.cpp index 3dd23bcc5..e0b0329a9 100644 --- a/src/lib/homestore.cpp +++ b/src/lib/homestore.cpp @@ -324,6 +324,11 @@ void HomeStore::shutdown() { LOGINFO("Homestore shutdown is started"); m_resource_mgr->stop(); + bool is_solo = false; + if (hs()->has_repl_data_service()) { + auto& repl_svc = dynamic_cast< GenericReplService& >(hs()->repl_service()); + is_solo = repl_svc.get_impl_type() == repl_impl_type::solo; + } // 1 stop all the services, after which all the upper layer api call are rejected and there is not on-going request. // Note that, after stopping, all the service are alive. @@ -340,7 +345,11 @@ void HomeStore::shutdown() { // 2 call cp_manager shutdown, which will which trigger cp flush to make sure all the in-memory data of all the // services are flushed to disk. since all the upper layer api call are rejected and there is not on-going request, // so after cp flush is done, we can guarantee all the necessary data are persisted to disk. 
- m_cp_mgr->shutdown(); + // + // For varsize bitmap allocator (which is used by solo repl dev), the free blks are only made persistent on the cp + // after next, so we need to make sure we do extra cp to have the diskbitmap persisted on disk to avoid log replay on + // a graceful shutdown. + m_cp_mgr->shutdown(is_solo /*require_extra_cp*/); m_cp_mgr.reset(); // 3 call reset/shutdown to clear all the services and after that all the services are dead, excluding metasevice diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp index f088f3bfd..aef5b26f8 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.cpp +++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp @@ -172,10 +172,20 @@ void SoloReplDev::async_write_journal(const std::vector< MultiBlkId >& blkids, s } void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx) { + auto cur_lsn = m_commit_upto.load(); + + auto force_replay = SISL_OPTIONS["solo_force_replay"].as< bool >(); + if (cur_lsn >= lsn and !force_replay) { + // Already committed + LOGINFO("SoloReplDev skipping already committed log_entry lsn={}, m_commit_upto at lsn={}", lsn, cur_lsn); + return; + } + repl_journal_entry const* entry = r_cast< repl_journal_entry const* >(buf.bytes()); uint32_t remain_size = buf.size() - sizeof(repl_journal_entry); HS_REL_ASSERT_EQ(entry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR, "Mismatched version of journal entry found"); + HS_LOG(DEBUG, solorepl, "SoloReplDev found journal entry at lsn={}", lsn); uint8_t const* raw_ptr = r_cast< uint8_t const* >(entry) + sizeof(repl_journal_entry); sisl::blob header{raw_ptr, entry->user_header_size}; @@ -198,16 +208,17 @@ void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx blkids.push_back(blkid); } - m_listener->on_pre_commit(lsn, header, key, nullptr); - - auto cur_lsn = m_commit_upto.load(); - if (cur_lsn < lsn) {
m_commit_upto.compare_exchange_strong(cur_lsn, lsn); } + m_listener->on_pre_commit(lsn, header, key, nullptr /* context */); + if (cur_lsn < lsn) { + // we will only be here when we experienced a crash recovery. + m_commit_upto.compare_exchange_strong(cur_lsn, lsn); + } for (const auto& blkid : blkids) { data_service().commit_blk(blkid); } - m_listener->on_commit(lsn, header, key, blkids, nullptr); + m_listener->on_commit(lsn, header, key, blkids, nullptr /* context */); } folly::Future< std::error_code > SoloReplDev::async_read(MultiBlkId const& bid, sisl::sg_list& sgs, uint32_t size, @@ -247,14 +258,13 @@ void SoloReplDev::cp_flush(CP*) { void SoloReplDev::truncate() { // Ignore truncate when HS is initializing. And we need atleast 3 checkpoints to start truncating. - if (homestore::hs()->is_initializing() || m_rd_sb->last_checkpoint_lsn_2 <= 0) { return; } // Truncate is safe anything below last_checkpoint_lsn - 2 as all the free blks // before that will be flushed in the last_checkpoint.
HS_LOG(INFO, solorepl, "dev={} truncating at lsn={}", boost::uuids::to_string(group_id()), m_rd_sb->last_checkpoint_lsn_2); - m_data_journal->truncate(m_rd_sb->last_checkpoint_lsn_2); + m_data_journal->truncate(m_rd_sb->last_checkpoint_lsn_2, false /*in-memory-only*/); } void SoloReplDev::cp_cleanup(CP*) { diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 76a94c1dd..50f3cde77 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -137,7 +137,7 @@ if (${io_tests}) add_test(NAME DataService-Epoll COMMAND test_data_service) add_test(NAME RaftReplDev-Epoll COMMAND test_raft_repl_dev) add_test(NAME RaftReplDevDynamic-Epoll COMMAND test_raft_repl_dev_dynamic --override_config homestore_config.consensus.replace_member_sync_check_interval_ms=1000) - add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev) + add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev --solo_force_replay=true) endif() can_build_spdk_io_tests(spdk_tests) diff --git a/src/tests/test_common/homestore_test_common.hpp b/src/tests/test_common/homestore_test_common.hpp index 4e18b70d1..857cefe4e 100644 --- a/src/tests/test_common/homestore_test_common.hpp +++ b/src/tests/test_common/homestore_test_common.hpp @@ -59,6 +59,8 @@ SISL_OPTION_GROUP( (num_io, "", "num_io", "number of IO operations", ::cxxopts::value< uint64_t >()->default_value("300"), "number"), (qdepth, "", "qdepth", "Max outstanding operations", ::cxxopts::value< uint32_t >()->default_value("8"), "number"), (spdk, "", "spdk", "spdk", ::cxxopts::value< bool >()->default_value("false"), "true or false"), + (solo_force_replay, "", "solo_force_replay", "solo_force_replay", + ::cxxopts::value< bool >()->default_value("false"), "true or false"), (flip_list, "", "flip_list", "btree flip list", ::cxxopts::value< std::vector< std::string > >(), "flips [...]"), (use_file, "", "use_file", "use file instead of real drive", ::cxxopts::value< bool >()->default_value("false"), "true or false"),