2 changes: 1 addition & 1 deletion conanfile.py
@@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "7.0.1"
version = "7.0.3"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
2 changes: 1 addition & 1 deletion src/include/homestore/checkpoint/cp_mgr.hpp
@@ -179,7 +179,7 @@ class CPManager {
/// @brief Shutdown the checkpoint manager services. It will not trigger a flush, but cancels any existing
/// checkpoint session abruptly. If the caller needs a clean shutdown, they need to explicitly trigger a cp flush
/// before calling shutdown.
void shutdown();
void shutdown(bool require_extra_cp = false);

/// @brief Register a CP consumer to the checkpoint manager. The CP consumer provides the callbacks it is interested
/// in during the checkpoint process. Each consumer gets a CPContext, in which the consumer can put its own dirty buffer info
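Because the new parameter defaults to false, existing call sites of CPManager::shutdown() are unaffected. A minimal caller sketch, assuming CPManager lives in the homestore namespace and is reachable through the public include path implied by this header's location; the wrapper function and the is_solo_repl flag are illustrative, not part of this PR:

#include <homestore/checkpoint/cp_mgr.hpp>  // assumed public include path, mirroring this header's location

// Sketch only: how a shutdown path could opt into the extra checkpoint flush.
void stop_checkpointing(homestore::CPManager& cp_mgr, bool is_solo_repl) {
    if (is_solo_repl) {
        cp_mgr.shutdown(true /* require_extra_cp */);  // one additional flush before teardown
    } else {
        cp_mgr.shutdown();  // default keeps the existing single-flush behaviour
    }
}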
8 changes: 7 additions & 1 deletion src/lib/checkpoint/cp_mgr.cpp
@@ -114,7 +114,7 @@ void CPManager::create_first_cp() {
m_cur_cp->m_cp_start_time = Clock::now();
}

void CPManager::shutdown() {
void CPManager::shutdown(bool require_extra_cp) {
LOGINFO("Stopping cp timer");
stop_timer_thread();

@@ -126,6 +126,12 @@ void CPManager::shutdown() {
LOGINFO("Trigger cp flush at CP shutdown");
auto success = do_trigger_cp_flush(true /* force */, true /* flush_on_shutdown */).get();
HS_REL_ASSERT_EQ(success, true, "CP Flush failed");

if (require_extra_cp) {
success = do_trigger_cp_flush(true /* force */, true /* flush_on_shutdown */).get();
HS_REL_ASSERT_EQ(success, true, "CP Flush failed");
}

LOGINFO("Trigger cp done");

delete (m_cur_cp);
11 changes: 10 additions & 1 deletion src/lib/homestore.cpp
@@ -324,6 +324,11 @@ void HomeStore::shutdown() {
LOGINFO("Homestore shutdown is started");

m_resource_mgr->stop();
bool is_solo = false;
if (hs()->has_repl_data_service()) {
auto& repl_svc = dynamic_cast< GenericReplService& >(hs()->repl_service());
is_solo = repl_svc.get_impl_type() == repl_impl_type::solo;
}

// 1 stop all the services, after which all the upper layer api calls are rejected and there are no on-going requests.
// Note that, after stopping, all the services are still alive.
@@ -340,7 +345,11 @@ void HomeStore::shutdown() {
// 2 call cp_manager shutdown, which will trigger a cp flush to make sure all the in-memory data of all the
// services is flushed to disk. Since all the upper layer api calls are rejected and there are no on-going requests,
// after the cp flush is done we can guarantee all the necessary data is persisted to disk.
m_cp_mgr->shutdown();
//
// For the varsize bitmap allocator (which is used by the solo repl dev), the free blks are only made persistent on
// the cp after next, so we need an extra cp to have the disk bitmap persisted on disk and avoid log replay after
// a graceful shutdown.
m_cp_mgr->shutdown(is_solo /*require_extra_cp*/);
m_cp_mgr.reset();

// 3 call reset/shutdown to clear all the services and after that all the services are dead, excluding metasevice
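The comment above compresses the reasoning for the extra checkpoint. Below is a toy model of the deferral, assuming frees recorded while one checkpoint is open only reach the on-disk bitmap when a later checkpoint flushes; the exact deferral depth and all names here are illustrative, not HomeStore APIs:

#include <cstdint>
#include <utility>
#include <vector>

// Toy allocator whose persisted free-blk state lags one checkpoint behind.
struct ToyVarsizeAllocator {
    std::vector< uint32_t > on_disk_free;   // frees already persisted in the bitmap
    std::vector< uint32_t > prev_cp_frees;  // frees staged by the checkpoint being flushed
    std::vector< uint32_t > cur_cp_frees;   // frees of the checkpoint currently open

    void free_blk(uint32_t blk) { cur_cp_frees.push_back(blk); }

    void cp_flush() {
        // Flushing persists the frees staged by the previous checkpoint...
        on_disk_free.insert(on_disk_free.end(), prev_cp_frees.begin(), prev_cp_frees.end());
        // ...while the frees of the checkpoint just closed wait for the next flush.
        prev_cp_frees = std::move(cur_cp_frees);
        cur_cp_frees.clear();
    }
};

In this model a single flush at shutdown still leaves the most recent frees unpersisted; the second flush requested via require_extra_cp is what moves them to disk, so a restart after a graceful shutdown does not have to replay the journal to rebuild the free-blk state.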
24 changes: 17 additions & 7 deletions src/lib/replication/repl_dev/solo_repl_dev.cpp
@@ -172,10 +172,20 @@ void SoloReplDev::async_write_journal(const std::vector< MultiBlkId >& blkids, s
}

void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx) {
auto cur_lsn = m_commit_upto.load();

auto force_replay = SISL_OPTIONS["solo_force_replay"].as< bool >();
if (cur_lsn >= lsn and !force_replay) {
// Already committed
LOGINFO("SoloReplDev skipping already committed log_entry lsn={}, m_commit_upto at lsn={}", lsn, cur_lsn);
return;
}

repl_journal_entry const* entry = r_cast< repl_journal_entry const* >(buf.bytes());
uint32_t remain_size = buf.size() - sizeof(repl_journal_entry);
HS_REL_ASSERT_EQ(entry->major_version, repl_journal_entry::JOURNAL_ENTRY_MAJOR,
"Mismatched version of journal entry found");
HS_LOG(DEBUG, solorepl, "SoloReplDev found journal entry at lsn={}", lsn);

uint8_t const* raw_ptr = r_cast< uint8_t const* >(entry) + sizeof(repl_journal_entry);
sisl::blob header{raw_ptr, entry->user_header_size};
@@ -198,16 +208,17 @@ void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
blkids.push_back(blkid);
}

m_listener->on_pre_commit(lsn, header, key, nullptr);

auto cur_lsn = m_commit_upto.load();
if (cur_lsn < lsn) { m_commit_upto.compare_exchange_strong(cur_lsn, lsn); }
m_listener->on_pre_commit(lsn, header, key, nullptr /* context */);
if (cur_lsn < lsn) {
// we will only be here when we experienced a crash recovery
m_commit_upto.compare_exchange_strong(cur_lsn, lsn);
}

for (const auto& blkid : blkids) {
data_service().commit_blk(blkid);
}

m_listener->on_commit(lsn, header, key, blkids, nullptr);
m_listener->on_commit(lsn, header, key, blkids, nullptr /* context */);
}

folly::Future< std::error_code > SoloReplDev::async_read(MultiBlkId const& bid, sisl::sg_list& sgs, uint32_t size,
@@ -247,14 +258,13 @@ void SoloReplDev::cp_flush(CP*) {

void SoloReplDev::truncate() {
// Ignore truncate when HS is initializing. We need at least 3 checkpoints to start truncating.

if (homestore::hs()->is_initializing() || m_rd_sb->last_checkpoint_lsn_2 <= 0) { return; }

// Truncating anything below last_checkpoint_lsn - 2 is safe, as all the free blks
// before that will be flushed in the last_checkpoint.
HS_LOG(INFO, solorepl, "dev={} truncating at lsn={}", boost::uuids::to_string(group_id()),
m_rd_sb->last_checkpoint_lsn_2);
m_data_journal->truncate(m_rd_sb->last_checkpoint_lsn_2);
m_data_journal->truncate(m_rd_sb->last_checkpoint_lsn_2, false /*in-memory-only*/);
}

void SoloReplDev::cp_cleanup(CP*) {
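The guard added to on_log_found reduces to a small predicate: skip replayed entries at or below the last committed lsn unless the test-only solo_force_replay flag asks for a full re-apply. A standalone sketch of that decision, where the helper name is illustrative and logstore_seq_num_t is assumed to be a signed 64-bit sequence number:

#include <atomic>
#include <cstdint>

using logstore_seq_num_t = int64_t;  // assumption made for this sketch

// Returns true when a log entry seen during replay should be applied again.
bool should_replay(std::atomic< logstore_seq_num_t > const& commit_upto,
                   logstore_seq_num_t lsn, bool force_replay) {
    return force_replay || (lsn > commit_upto.load());
}

With the default of false, a graceful shutdown that persisted everything (including the extra cp above) replays nothing; the CMake change below passes --solo_force_replay=true so the Epoll test keeps exercising the full replay path.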
2 changes: 1 addition & 1 deletion src/tests/CMakeLists.txt
@@ -137,7 +137,7 @@ if (${io_tests})
add_test(NAME DataService-Epoll COMMAND test_data_service)
add_test(NAME RaftReplDev-Epoll COMMAND test_raft_repl_dev)
add_test(NAME RaftReplDevDynamic-Epoll COMMAND test_raft_repl_dev_dynamic --override_config homestore_config.consensus.replace_member_sync_check_interval_ms=1000)
add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev)
add_test(NAME SoloReplDev-Epoll COMMAND test_solo_repl_dev --solo_force_replay=true)
endif()

can_build_spdk_io_tests(spdk_tests)
2 changes: 2 additions & 0 deletions src/tests/test_common/homestore_test_common.hpp
@@ -59,6 +59,8 @@ SISL_OPTION_GROUP(
(num_io, "", "num_io", "number of IO operations", ::cxxopts::value< uint64_t >()->default_value("300"), "number"),
(qdepth, "", "qdepth", "Max outstanding operations", ::cxxopts::value< uint32_t >()->default_value("8"), "number"),
(spdk, "", "spdk", "spdk", ::cxxopts::value< bool >()->default_value("false"), "true or false"),
(solo_force_replay, "", "solo_force_replay", "solo_force_replay",
::cxxopts::value< bool >()->default_value("false"), "true or false"),
(flip_list, "", "flip_list", "btree flip list", ::cxxopts::value< std::vector< std::string > >(), "flips [...]"),
(use_file, "", "use_file", "use file instead of real drive", ::cxxopts::value< bool >()->default_value("false"),
"true or false"),