From 06fbe7d783a5439690012fcc1633a1e7cc4a403c Mon Sep 17 00:00:00 2001
From: Sanal P
Date: Mon, 28 Apr 2025 11:00:43 -0700
Subject: [PATCH] Add async_write, alloc blks for solo repl dev.

Add support for async write of data and journal entries, and for blk
allocation, in solo repl dev. Raft repl dev doesn't support these
operations. This is needed for nublocks, which needs to write the free
blkids to the journal as well; the free blkids are obtained only after
writing the new blkids to the index.

Add APIs for allocation and write that take a vector of blkids. Raft
repl dev currently uses only a single blkid. Test the solo repl dev
changes with a vector of blkids.
---
 conanfile.py | 4 +-
 src/include/homestore/blkdata_service.hpp | 26 ++-
 src/include/homestore/replication/repl_dev.h | 57 +++++-
 src/lib/blkdata_svc/blkdata_service.cpp | 31 ++-
 src/lib/replication/repl_dev/common.cpp | 41 +++-
 .../replication/repl_dev/raft_repl_dev.cpp | 13 +-
 src/lib/replication/repl_dev/raft_repl_dev.h | 18 ++
 .../replication/repl_dev/solo_repl_dev.cpp | 102 +++++++++-
 src/lib/replication/repl_dev/solo_repl_dev.h | 9 +-
 src/tests/test_solo_repl_dev.cpp | 179 ++++++++++++------
 10 files changed, 386 insertions(+), 94 deletions(-)

diff --git a/conanfile.py b/conanfile.py
index 8101ade90..74801da73 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -9,7 +9,7 @@ class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "6.13.4"
+    version = "6.13.5"
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
@@ -54,7 +54,7 @@ def build_requirements(self):
     def requirements(self):
         self.requires("iomgr/[^11.3]@oss/master", transitive_headers=True)
         self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
-        self.requires("nuraft_mesg/[>=3.7.5]@oss/main", transitive_headers=True)
+        self.requires("nuraft_mesg/[~3.8.0]@oss/main", transitive_headers=True)
         self.requires("farmhash/cci.20190513@", transitive_headers=True)
         if self.settings.arch in ['x86', 'x86_64']:
diff --git a/src/include/homestore/blkdata_service.hpp b/src/include/homestore/blkdata_service.hpp
index 33a5fe2ac..e1992b983 100644
--- a/src/include/homestore/blkdata_service.hpp
+++ b/src/include/homestore/blkdata_service.hpp
@@ -114,6 +114,18 @@ class BlkDataService {
     folly::Future< std::error_code > async_write(sisl::sg_list const& sgs, MultiBlkId const& in_blkids,
                                                  bool part_of_batch = false);
 
+    /**
+     * @brief : asynchronous write with input block ids;
+     *
+     * @param sgs : the data buffer that needs to be written
+     * @param in_blkids : input block ids that this write should be written to;
+     * @param part_of_batch : is this write part of a batch;
+     */
+    folly::Future< std::error_code > async_write(sisl::sg_list const& sgs, std::vector< MultiBlkId > const& in_blkids,
+                                                 bool part_of_batch = false);
+
     /**
      * @brief Asynchronously reads data from the specified block ID into the provided buffer.
      *
@@ -147,7 +159,8 @@ class BlkDataService {
     BlkAllocStatus commit_blk(MultiBlkId const& bid);
 
     /**
-     * @brief Allocates a contiguous block of disk space of the given size.
+     * @brief Allocates a contiguous block of disk space of the given size. This API should be called when the
+     * consumer expects all blks to be allocated on the same chunk.
      *
      * @param size The size of the block to allocate, in bytes.
     * @param hints Hints for how to allocate the block.
@@ -156,6 +169,17 @@ class BlkDataService {
      */
     BlkAllocStatus alloc_blks(uint32_t size, blk_alloc_hints const& hints, MultiBlkId& out_blkids);
 
+    /**
+     * @brief Allocates blocks of disk space of the given size. This API should be called when the consumer can
+     * accept blk allocation spread across different chunks.
+     *
+     * @param size The size of the block to allocate, in bytes.
+     * @param hints Hints for how to allocate the block.
+     * @param out_blkids Output parameter that will be filled with the IDs of the allocated blocks.
+     * @return The status of the block allocation attempt.
+     */
+    BlkAllocStatus alloc_blks(uint32_t size, blk_alloc_hints const& hints, std::vector< BlkId >& out_blkids);
+
     /**
      * @brief Asynchronously frees the specified block IDs.
      * It is asynchronous because it might need to wait for pending read to complete if same block is being read and not
diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h
index 60a0f8430..9a4cba340 100644
--- a/src/include/homestore/replication/repl_dev.h
+++ b/src/include/homestore/replication/repl_dev.h
@@ -130,7 +130,16 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
     sisl::blob const& header() const { return m_header; }
     sisl::blob const& key() const { return m_key; }
-    MultiBlkId const& local_blkid() const { return m_local_blkid; }
+    MultiBlkId const& local_blkid() const {
+        // Currently used by raft repl dev only, where a single blkid is expected.
+        // Callers check whether the blkid is valid, so return a dummy (invalid) blkid when none is present.
+        if (!m_local_blkids.empty())
+            return m_local_blkids[0];
+        else
+            return dummy_blkid;
+    }
+
+    std::vector< MultiBlkId >& local_blkids() { return m_local_blkids; }
     RemoteBlkId const& remote_blkid() const { return m_remote_blkid; }
     const char* data() const {
         DEBUG_ASSERT(m_data != nullptr,
@@ -141,6 +150,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
     bool has_state(repl_req_state_t s) const { return m_state.load() & uint32_cast(s); }
     repl_journal_entry const* journal_entry() const { return m_journal_entry; }
     uint32_t journal_entry_size() const;
+    uint32_t blkids_serialized_size() const;
     bool is_localize_pending() const { return m_is_jentry_localize_pending; }
     bool has_linked_data() const { return (m_op_code == journal_type_t::HS_DATA_LINKED); }
 
@@ -149,6 +159,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
     /////////////////////// Non modifiers methods //////////////////
     std::string to_string() const;
     std::string to_compact_string() const;
+    std::string blkids_to_string() const;
     Clock::time_point created_time() const { return m_start_time; }
     void set_created_time() { m_start_time = Clock::now(); }
     bool is_expired() const;
@@ -195,7 +206,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
     bool save_fetched_data(sisl::GenericClientResponse const& fetched_data, uint8_t const* data, uint32_t data_size);
 
     void set_remote_blkid(RemoteBlkId const& rbid) { m_remote_blkid = rbid; }
-    void set_local_blkid(MultiBlkId const& lbid) { m_local_blkid = lbid; } // Only used during recovery
+    void set_local_blkids(std::vector< MultiBlkId > const& lbids) { m_local_blkids = lbids; }
     void set_is_volatile(bool is_volatile) { m_is_volatile.store(is_volatile); }
     void set_lsn(int64_t lsn);
     void add_state(repl_req_state_t s);
@@ -226,9 +237,10 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
     std::atomic< bool > m_is_volatile{true}; // Is the log still in memory and not flushed to disk yet
 
     /////////////// Data related section /////////////////
-    MultiBlkId m_local_blkid;   // Local BlkId for the data
-    RemoteBlkId m_remote_blkid; // Corresponding remote blkid for the data
-    uint8_t const* m_data;      // Raw data pointer containing the actual data
+    static inline MultiBlkId dummy_blkid;
+    std::vector< MultiBlkId > m_local_blkids; // Local BlkIds for the data
+    RemoteBlkId m_remote_blkid;               // Corresponding remote blkid for the data
+    uint8_t const* m_data;                    // Raw data pointer containing the actual data
 
     /////////////// Journal/Buf related section /////////////////
     std::variant< std::unique_ptr< uint8_t[] >, raft_buf_ptr_t > m_journal_buf; // Buf for the journal entry
@@ -400,7 +412,7 @@ class ReplDevListener {
     virtual void on_no_space_left(repl_lsn_t lsn, chunk_num_t chunk_id) = 0;
 
     /// @brief when restart, after all the logs are replayed and before joining raft group, notify the upper layer
-    virtual void on_log_replay_done(const group_id_t& group_id){};
+    virtual void on_log_replay_done(const group_id_t& group_id) {};
 
 private:
     std::weak_ptr< ReplDev > m_repl_dev;
@@ -411,6 +423,39 @@ class ReplDev {
     ReplDev() = default;
     virtual ~ReplDev() { detach_listener(); }
 
+    /// @brief Allocates blkids from the storage engine to write the value into. Storage
+    /// engine returns a blkid_list in cases where single contiguous blocks are not
+    /// available.
+    ///
+    /// @param data_size - Size of the data.
+    /// @param hints - Specify block allocation hints.
+    /// @param out_blkids - List of blkids, which may not be contiguous.
+    virtual std::error_code alloc_blks(uint32_t data_size, const blk_alloc_hints& hints,
+                                       std::vector< MultiBlkId >& out_blkids) = 0;
+
+    /// @brief Write data locally using the specified blkids. Data is split across the blkids.
+    /// @param blkids - List of blkids where data will be written.
+    /// @param value - Vector of io buffers that contain the value for the key.
+    /// @param part_of_batch - Whether this write is part of a batch. If so, submit_batch needs to be called
+    /// at the end.
+    /// @return A Future with std::error_code to notify whether it has successfully written the data, or an
+    /// error code in case of failure.
+    virtual folly::Future< std::error_code > async_write(const std::vector< MultiBlkId >& blkids,
+                                                         sisl::sg_list const& value, bool part_of_batch = false,
+                                                         trace_id_t tid = 0) = 0;
+
+    /// @brief Creates a log/journal entry and calls the on_commit listener callback.
+    /// @param blkids - List of blkids where data was written.
+    /// @param header - Blob representing the header (it is opaque and will be copied
+    /// as-is to the journal entry)
+    /// @param key - Blob representing the key (it is opaque and will be copied as-is to
+    /// the journal entry).
+    /// @param data_size - Size of the data.
+    /// @param ctx - User supplied context which will be passed to listener callbacks
+    virtual void async_write_journal(const std::vector< MultiBlkId >& blkids, sisl::blob const& header,
+                                     sisl::blob const& key, uint32_t data_size, repl_req_ptr_t ctx,
+                                     trace_id_t tid = 0) = 0;
+
     /// @brief Replicate the data to the replica set. This method goes through the
     /// following steps:
     /// Step 1: Allocates blkid from the storage engine to write the value into.
Storage diff --git a/src/lib/blkdata_svc/blkdata_service.cpp b/src/lib/blkdata_svc/blkdata_service.cpp index 58cc36c61..1219ed00e 100644 --- a/src/lib/blkdata_svc/blkdata_service.cpp +++ b/src/lib/blkdata_svc/blkdata_service.cpp @@ -208,10 +208,35 @@ folly::Future< std::error_code > BlkDataService::async_write(sisl::sg_list const } } +folly::Future< std::error_code > +BlkDataService::async_write(sisl::sg_list const& sgs, std::vector< MultiBlkId > const& blkids, bool part_of_batch) { + if (is_stopping()) return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled)); + incr_pending_request_num(); + static thread_local std::vector< folly::Future< std::error_code > > s_futs; + s_futs.clear(); + for (const auto& blkid : blkids) { + s_futs.emplace_back(async_write(sgs, blkid, part_of_batch)); + } + decr_pending_request_num(); + return collect_all_futures(s_futs); +} + BlkAllocStatus BlkDataService::alloc_blks(uint32_t size, const blk_alloc_hints& hints, MultiBlkId& out_blkids) { if (is_stopping()) return BlkAllocStatus::FAILED; incr_pending_request_num(); - HS_DBG_ASSERT_EQ(size % m_blk_size, 0, "Non aligned size requested"); + HS_DBG_ASSERT_EQ(size % m_blk_size, 0, "Non aligned size requested size={} blk_size={}", size, m_blk_size); + blk_count_t nblks = static_cast< blk_count_t >(size / m_blk_size); + + auto ret = m_vdev->alloc_blks(nblks, hints, out_blkids); + decr_pending_request_num(); + return ret; +} + +BlkAllocStatus BlkDataService::alloc_blks(uint32_t size, const blk_alloc_hints& hints, + std::vector< BlkId >& out_blkids) { + if (is_stopping()) return BlkAllocStatus::FAILED; + incr_pending_request_num(); + HS_DBG_ASSERT_EQ(size % m_blk_size, 0, "Non aligned size requested size={} blk_size={}", size, m_blk_size); blk_count_t nblks = static_cast< blk_count_t >(size / m_blk_size); auto ret = m_vdev->alloc_blks(nblks, hints, out_blkids); @@ -271,8 +296,8 @@ void BlkDataService::start() { void BlkDataService::stop() { start_stopping(); - // we have no way to track the completion of each async io in detail which should be done in iomanager level, so we - // just wait for 3 seconds, and we expect each io will be completed within this time. + // we have no way to track the completion of each async io in detail which should be done in iomanager level, so + // we just wait for 3 seconds, and we expect each io will be completed within this time. // TODO: find a better solution to track the completion of these aysnc calls std::this_thread::sleep_for(std::chrono::milliseconds(3000)); diff --git a/src/lib/replication/repl_dev/common.cpp b/src/lib/replication/repl_dev/common.cpp index 6a39256f9..2782a36a5 100644 --- a/src/lib/replication/repl_dev/common.cpp +++ b/src/lib/replication/repl_dev/common.cpp @@ -63,7 +63,7 @@ repl_req_ctx::~repl_req_ctx() { } void repl_req_ctx::create_journal_entry(bool is_raft_buf, int32_t server_id) { - uint32_t val_size = has_linked_data() ? m_local_blkid.serialized_size() : 0; + uint32_t val_size = has_linked_data() ? 
blkids_serialized_size() : 0; uint32_t entry_size = sizeof(repl_journal_entry) + m_header.size() + m_key.size() + val_size; if (is_raft_buf) { @@ -94,14 +94,25 @@ void repl_req_ctx::create_journal_entry(bool is_raft_buf, int32_t server_id) { } if (has_linked_data()) { - auto const b = m_local_blkid.serialize(); - std::memcpy(raw_ptr, b.cbytes(), b.size()); + for (const auto& blkid : m_local_blkids) { + auto const b = blkid.serialize(); + std::memcpy(raw_ptr, b.cbytes(), b.size()); + raw_ptr += b.size(); + } } } uint32_t repl_req_ctx::journal_entry_size() const { return sizeof(repl_journal_entry) + m_header.size() + m_key.size() + - (has_linked_data() ? m_local_blkid.serialized_size() : 0); + (has_linked_data() ? blkids_serialized_size() : 0); +} + +uint32_t repl_req_ctx::blkids_serialized_size() const { + uint32_t blkids_serialized_size = 0; + for (const auto& blkid : m_local_blkids) { + blkids_serialized_size += blkid.serialized_size(); + } + return blkids_serialized_size; } void repl_req_ctx::change_raft_journal_buf(raft_buf_ptr_t new_buf, bool adjust_hdr_key) { @@ -128,7 +139,7 @@ ReplServiceError repl_req_ctx::alloc_local_blks(cshared< ReplDevListener >& list // if the committed_blk_id is already present, use it and skip allocation and commitment LOGINFOMOD(replication, "[traceID={}] For Repl_key=[{}] data already exists, skip", rkey().traceID, rkey().to_string()); - m_local_blkid = hints_result.value().committed_blk_id.value(); + m_local_blkids.emplace_back(hints_result.value().committed_blk_id.value()); add_state(repl_req_state_t::BLK_ALLOCATED); add_state(repl_req_state_t::DATA_RECEIVED); add_state(repl_req_state_t::DATA_WRITTEN); @@ -138,14 +149,19 @@ ReplServiceError repl_req_ctx::alloc_local_blks(cshared< ReplDevListener >& list return ReplServiceError::OK; } + std::vector< BlkId > blkids; auto status = data_service().alloc_blks(sisl::round_up(uint32_cast(data_size), data_service().get_blk_size()), - hints_result.value(), m_local_blkid); + hints_result.value(), blkids); if (status != BlkAllocStatus::SUCCESS) { LOGWARNMOD(replication, "[traceID={}] block allocation failure, repl_key=[{}], status=[{}]", rkey().traceID, rkey(), status); DEBUG_ASSERT_EQ(status, BlkAllocStatus::SUCCESS, "Unable to allocate blks"); return ReplServiceError::NO_SPACE_LEFT; } + + for (auto& blkid : blkids) { + m_local_blkids.emplace_back(blkid); + } add_state(repl_req_state_t::BLK_ALLOCATED); return ReplServiceError::OK; } @@ -246,7 +262,7 @@ std::string repl_req_ctx::to_string() const { return fmt::format("repl_key=[{}], lsn={} state=[{}] m_headersize={} m_keysize={} is_proposer={} " "local_blkid={} remote_blkid={}", m_rkey.to_string(), m_lsn, req_state_name(uint32_cast(state())), m_header.size(), m_key.size(), - m_is_proposer, m_local_blkid.to_string(), m_remote_blkid.blkid.to_string()); + m_is_proposer, blkids_to_string(), m_remote_blkid.blkid.to_string()); } std::string repl_req_ctx::to_compact_string() const { @@ -255,7 +271,16 @@ std::string repl_req_ctx::to_compact_string() const { } return fmt::format("dsn={} term={} lsn={} op={} local_blkid={} state=[{}]", m_rkey.dsn, m_rkey.term, m_lsn, - enum_name(m_op_code), m_local_blkid.to_string(), req_state_name(uint32_cast(state()))); + enum_name(m_op_code), blkids_to_string(), req_state_name(uint32_cast(state()))); +} + +std::string repl_req_ctx::blkids_to_string() const { + std::string str = fmt::format("["); + for (const auto& blkid : m_local_blkids) { + fmt::format_to(std::back_inserter(str), "{} ", blkid.to_string()); + } + 
fmt::format_to(std::back_inserter(str), "]"); + return str; } bool repl_req_ctx::is_expired() const { diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp index 082f4fac4..88aa9d6c3 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.cpp +++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp @@ -337,10 +337,13 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& } } - auto status = init_req_ctx( - rreq, repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1), .traceID = tid}, - data.size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED, true /* is_proposer */, header, - key, data.size, m_listener); + auto status = init_req_ctx(rreq, + repl_key{.server_id = server_id(), + .term = raft_server()->get_term(), + .dsn = m_next_dsn.fetch_add(1), + .traceID = tid}, + data.size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED, + true /* is_proposer */, header, key, data.size, m_listener); if (status != ReplServiceError::OK) { RD_LOGI(tid, "Initializing rreq failed error={}, failing this req", status); @@ -1659,7 +1662,7 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx MultiBlkId entry_blkid; entry_blkid.deserialize(entry_to_val(jentry), true /* copy */); data_size = entry_blkid.blk_count() * get_blk_size(); - rreq->set_local_blkid(entry_blkid); + rreq->set_local_blkids({entry_blkid}); rreq->add_state(repl_req_state_t::BLK_ALLOCATED); rreq->add_state(repl_req_state_t::DATA_RECEIVED); } diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h index bd6a6c448..42d100ebb 100644 --- a/src/lib/replication/repl_dev/raft_repl_dev.h +++ b/src/lib/replication/repl_dev/raft_repl_dev.h @@ -229,6 +229,24 @@ class RaftReplDev : public ReplDev, folly::SemiFuture< ReplServiceError > destroy_group(); //////////////// All ReplDev overrides/implementation /////////////////////// + virtual std::error_code alloc_blks(uint32_t size, const blk_alloc_hints& hints, + std::vector< MultiBlkId >& out_blkids) override { + RD_REL_ASSERT(false, "NOT SUPPORTED"); + return std::make_error_code(std::errc::operation_not_supported); + } + virtual folly::Future< std::error_code > async_write(const std::vector< MultiBlkId >& blkids, + sisl::sg_list const& value, bool part_of_batch = false, + trace_id_t tid = 0) override { + RD_REL_ASSERT(false, "NOT SUPPORTED"); + return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_not_supported)); + } + + virtual void async_write_journal(const std::vector< MultiBlkId >& blkids, sisl::blob const& header, + sisl::blob const& key, uint32_t data_size, repl_req_ptr_t ctx, + trace_id_t tid = 0) override { + RD_REL_ASSERT(false, "NOT SUPPORTED"); + } + void async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, repl_req_ptr_t ctx, bool part_of_batch = false, trace_id_t tid = 0) override; folly::Future< std::error_code > async_read(MultiBlkId const& blkid, sisl::sg_list& sgs, uint32_t size, diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp index bc6bdb8bb..587cb8b2e 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.cpp +++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp @@ -39,7 +39,7 @@ void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const& // If it is header only entry, directly write to the 
journal
     if (rreq->has_linked_data() && !rreq->has_state(repl_req_state_t::DATA_WRITTEN)) {
         // Write the data
-        data_service().async_write(value, rreq->local_blkid()).thenValue([this, rreq = std::move(rreq)](auto&& err) {
+        data_service().async_write(value, rreq->local_blkids()).thenValue([this, rreq = std::move(rreq)](auto&& err) {
             HS_REL_ASSERT(!err, "Error in writing data"); // TODO: Find a way to return error to the Listener
             write_journal(std::move(rreq));
         });
@@ -60,12 +60,92 @@ void SoloReplDev::write_journal(repl_req_ptr_t rreq) {
         auto cur_lsn = m_commit_upto.load();
         if (cur_lsn < lsn) { m_commit_upto.compare_exchange_strong(cur_lsn, lsn); }
 
-        data_service().commit_blk(rreq->local_blkid());
-        m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), {rreq->local_blkid()}, rreq);
+        for (const auto& blkid : rreq->local_blkids()) {
+            data_service().commit_blk(blkid);
+        }
+        m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), rreq->local_blkids(), rreq);
         decr_pending_request_num();
     });
 }
 
+std::error_code SoloReplDev::alloc_blks(uint32_t data_size, const blk_alloc_hints& hints,
+                                        std::vector< MultiBlkId >& out_blkids) {
+    if (is_stopping()) { return std::make_error_code(std::errc::operation_canceled); }
+
+    incr_pending_request_num();
+    std::vector< BlkId > blkids;
+    auto status =
+        data_service().alloc_blks(sisl::round_up(uint32_cast(data_size), data_service().get_blk_size()), hints, blkids);
+    if (status != BlkAllocStatus::SUCCESS) {
+        DEBUG_ASSERT_EQ(status, BlkAllocStatus::SUCCESS, "Unable to allocate blks");
+        decr_pending_request_num();
+        return std::make_error_code(std::errc::no_space_on_device);
+    }
+    for (auto& blkid : blkids) {
+        out_blkids.emplace_back(blkid);
+    }
+    decr_pending_request_num();
+    return std::error_code{};
+}
+
+folly::Future< std::error_code > SoloReplDev::async_write(const std::vector< MultiBlkId >& blkids,
+                                                          sisl::sg_list const& value, bool part_of_batch,
+                                                          trace_id_t tid) {
+    if (is_stopping()) {
+        return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled));
+    }
+
+    incr_pending_request_num();
+    HS_REL_ASSERT_GT(blkids.size(), 0, "Empty blkid vec");
+    std::vector< folly::Future< std::error_code > > futs;
+    futs.reserve(blkids.size());
+    sisl::sg_iterator sg_it{value.iovs};
+
+    for (const auto& blkid : blkids) {
+        auto sgs_size = blkid.blk_count() * data_service().get_blk_size();
+        const auto iovs = sg_it.next_iovs(sgs_size);
+        uint32_t total_size = 0;
+        for (auto& iov : iovs) {
+            total_size += iov.iov_len;
+        }
+        if (total_size != sgs_size) {
+            LOGERROR("Block size mismatch total_size={} sgs_size={}", total_size, sgs_size);
+            decr_pending_request_num();
+            return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::invalid_argument));
+        }
+        sisl::sg_list sgs{sgs_size, iovs};
+        futs.emplace_back(data_service().async_write(sgs, blkid, part_of_batch));
+    }
+
+    return folly::collectAllUnsafe(futs).thenValue([this](auto&& v_res) {
+        for (const auto& err_c : v_res) {
+            if (sisl_unlikely(err_c.value())) {
+                decr_pending_request_num();
+                return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::io_error));
+            }
+        }
+
+        decr_pending_request_num();
+        return folly::makeFuture< std::error_code >(std::error_code{});
+    });
+}
+
+void SoloReplDev::async_write_journal(const std::vector< MultiBlkId >& blkids, sisl::blob const& header,
+                                      sisl::blob const& key, uint32_t data_size, repl_req_ptr_t rreq, trace_id_t tid) {
+    if (is_stopping()) { return; }
+    incr_pending_request_num();
+
+    // We expect clients to provide a valid repl req ctx with blocks
allocated. + HS_REL_ASSERT(rreq, "Invalid repl req ctx"); + rreq->add_state(repl_req_state_t::BLK_ALLOCATED); + rreq->set_local_blkids(blkids); + auto status = rreq->init(repl_key{.server_id = 0, .term = 1, .dsn = 1, .traceID = tid}, + data_size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED, true, header, + key, data_size, m_listener); + HS_REL_ASSERT_EQ(status, ReplServiceError::OK, "Error in initializing repl req context."); + + // Write to journal. + write_journal(std::move(rreq)); +} + void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx) { repl_journal_entry const* entry = r_cast< repl_journal_entry const* >(buf.bytes()); uint32_t remain_size = buf.size() - sizeof(repl_journal_entry); @@ -83,22 +163,27 @@ void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx raw_ptr += entry->key_size; remain_size -= entry->key_size; - sisl::blob value_blob{raw_ptr, remain_size}; - MultiBlkId blkid; - if (remain_size) { blkid.deserialize(value_blob, true /* copy */); } + std::vector< MultiBlkId > blkids; + while (remain_size > 0) { + MultiBlkId blkid; + sisl::blob value_blob{raw_ptr, sizeof(BlkId)}; + blkid.deserialize(value_blob, true /* copy */); + raw_ptr += sizeof(BlkId); + remain_size -= sizeof(BlkId); + blkids.push_back(blkid); + } m_listener->on_pre_commit(lsn, header, key, nullptr); auto cur_lsn = m_commit_upto.load(); if (cur_lsn < lsn) { m_commit_upto.compare_exchange_strong(cur_lsn, lsn); } - m_listener->on_commit(lsn, header, key, {blkid}, nullptr); + m_listener->on_commit(lsn, header, key, blkids, nullptr); } folly::Future< std::error_code > SoloReplDev::async_read(MultiBlkId const& bid, sisl::sg_list& sgs, uint32_t size, bool part_of_batch, trace_id_t tid) { if (is_stopping()) { - LOGINFO("repl dev is being shutdown!"); return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled)); } incr_pending_request_num(); @@ -109,7 +194,6 @@ folly::Future< std::error_code > SoloReplDev::async_read(MultiBlkId const& bid, folly::Future< std::error_code > SoloReplDev::async_free_blks(int64_t, MultiBlkId const& bid, trace_id_t tid) { if (is_stopping()) { - LOGINFO("repl dev is being shutdown!"); return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled)); } incr_pending_request_num(); diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h index 63838f254..35f089ec5 100644 --- a/src/lib/replication/repl_dev/solo_repl_dev.h +++ b/src/lib/replication/repl_dev/solo_repl_dev.h @@ -39,7 +39,14 @@ class SoloReplDev : public ReplDev { SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existing); virtual ~SoloReplDev() = default; - // TODO: implement graceful shutdown for solo repl dev + virtual std::error_code alloc_blks(uint32_t data_size, const blk_alloc_hints& hints, + std::vector< MultiBlkId >& out_blkids) override; + virtual folly::Future< std::error_code > async_write(const std::vector< MultiBlkId >& blkids, + sisl::sg_list const& value, bool part_of_batch = false, + trace_id_t tid = 0) override; + virtual void async_write_journal(const std::vector< MultiBlkId >& blkids, sisl::blob const& header, + sisl::blob const& key, uint32_t data_size, repl_req_ptr_t ctx, + trace_id_t tid = 0) override; void async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value, repl_req_ptr_t ctx, bool part_of_batch = false, trace_id_t tid = 0) override; diff --git 
a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp index 13bcc52b0..4d271efcb 100644 --- a/src/tests/test_solo_repl_dev.cpp +++ b/src/tests/test_solo_repl_dev.cpp @@ -63,22 +63,15 @@ struct test_repl_req : public repl_req_ctx { sisl::byte_array header; sisl::byte_array key; sisl::sg_list write_sgs; - sisl::sg_list read_sgs; - MultiBlkId written_blkids; + std::vector< MultiBlkId > written_blkids; - test_repl_req() { - write_sgs.size = 0; - read_sgs.size = 0; - } + test_repl_req() { write_sgs.size = 0; } ~test_repl_req() { for (auto const& iov : write_sgs.iovs) { iomanager.iobuf_free(uintptr_cast(iov.iov_base)); } - - for (auto const& iov : read_sgs.iovs) { - iomanager.iobuf_free(uintptr_cast(iov.iov_base)); - } } + struct journal_header { uint32_t key_size; uint64_t key_pattern; @@ -100,12 +93,11 @@ class SoloReplDevTest : public testing::Test { void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key, std::vector< MultiBlkId > const& blkids, cintrusive< repl_req_ctx >& ctx) override { LOGINFO("Received on_commit lsn={}", lsn); - HS_REL_ASSERT(!blkids.empty(), "Invalid blkids size"); if (ctx == nullptr) { - m_test.validate_replay(*repl_dev(), lsn, header, key, blkids[0]); + m_test.validate_replay(*repl_dev(), lsn, header, key, blkids); } else { auto req = boost::static_pointer_cast< test_repl_req >(ctx); - req->written_blkids = blkids[0]; + req->written_blkids = std::move(blkids); m_test.on_write_complete(*repl_dev(), req); } } @@ -231,60 +223,116 @@ class SoloReplDevTest : public testing::Test { rdev->async_alloc_write(*req->header, req->key ? *req->key : sisl::blob{}, req->write_sgs, req); } + void async_write_data_and_journal(uint32_t key_size, uint64_t data_size, uint32_t max_size_per_iov) { + data_size = data_size == 0 ? g_block_size : data_size; + auto req = intrusive< test_repl_req >(new test_repl_req()); + req->header = sisl::make_byte_array(sizeof(test_repl_req::journal_header)); + auto hdr = r_cast< test_repl_req::journal_header* >(req->header->bytes()); + hdr->key_size = key_size; + hdr->key_pattern = ((long long)rand() << 32) | rand(); + hdr->data_size = data_size; + hdr->data_pattern = ((long long)rand() << 32) | rand(); + + if (key_size != 0) { + req->key = sisl::make_byte_array(key_size); + HSTestHelper::fill_data_buf(req->key->bytes(), key_size, hdr->key_pattern); + } + + req->write_sgs = HSTestHelper::create_sgs(data_size, max_size_per_iov, hdr->data_pattern); + + auto& rdev = (rand() % 2) ? m_repl_dev1 : m_repl_dev2; + + auto const cap = hs()->repl_service().get_cap_stats(); + LOGDEBUG("Before write, cap stats: used={} total={}", cap.used_capacity, cap.total_capacity); + + std::vector< MultiBlkId > blkids; + blk_alloc_hints hints; + auto err = rdev->alloc_blks(data_size, hints, blkids); + RELEASE_ASSERT(!err, "Error during alloc_blks"); + RELEASE_ASSERT(!blkids.empty(), "Empty blkids"); + + rdev->async_write(blkids, req->write_sgs).thenValue([this, rdev, blkids, data_size, req](auto&& err) { + RELEASE_ASSERT(!err, "Error during async_write"); + rdev->async_write_journal(blkids, *req->header, req->key ? 
*req->key : sisl::blob{}, data_size, req); + }); + } + void validate_replay(ReplDev& rdev, int64_t lsn, sisl::blob const& header, sisl::blob const& key, - MultiBlkId const& blkids) { + std::vector< MultiBlkId > const& blkids) { + if (blkids.empty()) { + m_task_waiter.one_complete(); + return; + } + auto const jhdr = r_cast< test_repl_req::journal_header const* >(header.cbytes()); HSTestHelper::validate_data_buf(key.cbytes(), key.size(), jhdr->key_pattern); - - uint32_t size = blkids.blk_count() * g_block_size; - if (size) { - auto read_sgs = HSTestHelper::create_sgs(size, size); - LOGINFO("[{}] Validating replay of lsn={} blkid = {}", boost::uuids::to_string(rdev.group_id()), lsn, - blkids.to_string()); - rdev.async_read(blkids, read_sgs, size) - .thenValue([this, hdr = *jhdr, read_sgs, lsn, blkids, &rdev](auto&& err) { - RELEASE_ASSERT(!err, "Error during async_read"); - HS_REL_ASSERT_EQ(hdr.data_size, read_sgs.size, "journal hdr data size mismatch with actual size"); - - for (auto const& iov : read_sgs.iovs) { - HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, hdr.data_pattern); - iomanager.iobuf_free(uintptr_cast(iov.iov_base)); - } - LOGINFO("[{}] Replay of lsn={} blkid={} validated successfully", - boost::uuids::to_string(rdev.group_id()), lsn, blkids.to_string()); - m_task_waiter.one_complete(); - }); - } else { - m_task_waiter.one_complete(); + uint64_t total_io = blkids.size(); + auto io_count = std::make_shared< std::atomic< uint64_t > >(0); + for (const auto& blkid : blkids) { + uint32_t size = blkid.blk_count() * g_block_size; + if (size) { + auto read_sgs = HSTestHelper::create_sgs(size, size); + LOGDEBUG("[{}] Validating replay of lsn={} blkid = {}", boost::uuids::to_string(rdev.group_id()), lsn, + blkid.to_string()); + rdev.async_read(blkid, read_sgs, size) + .thenValue([this, io_count, total_io, hdr = *jhdr, read_sgs, lsn, blkid, &rdev](auto&& err) { + RELEASE_ASSERT(!err, "Error during async_read"); + // HS_REL_ASSERT_EQ(hdr.data_size, read_sgs.size, + // "journal hdr data size mismatch with actual size"); + + for (auto const& iov : read_sgs.iovs) { + HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, hdr.data_pattern); + iomanager.iobuf_free(uintptr_cast(iov.iov_base)); + } + LOGDEBUG("[{}] Replay of lsn={} blkid={} validated successfully", + boost::uuids::to_string(rdev.group_id()), lsn, blkid.to_string()); + + io_count->fetch_add(1); + if (*io_count == total_io) { m_task_waiter.one_complete(); } + }); + } else { + m_task_waiter.one_complete(); + } } } void on_write_complete(ReplDev& rdev, intrusive< test_repl_req > req) { - // If we did send some data to the repl_dev, validate it by doing async_read - if (req->write_sgs.size != 0) { - req->read_sgs = HSTestHelper::create_sgs(req->write_sgs.size, req->write_sgs.size); - - auto const cap = hs()->repl_service().get_cap_stats(); - LOGINFO("Write complete with cap stats: used={} total={}", cap.used_capacity, cap.total_capacity); - - rdev.async_read(req->written_blkids, req->read_sgs, req->read_sgs.size) - .thenValue([this, &rdev, req](auto&& err) { - RELEASE_ASSERT(!err, "Error during async_read"); - - LOGINFO("[{}] Write complete with lsn={} for size={} blkids={}", - boost::uuids::to_string(rdev.group_id()), req->lsn(), req->write_sgs.size, - req->written_blkids.to_string()); - auto hdr = r_cast< test_repl_req::journal_header* >(req->header->bytes()); - HS_REL_ASSERT_EQ(hdr->data_size, req->read_sgs.size, - "journal hdr data size mismatch with actual size"); - - for (auto const& 
iov : req->read_sgs.iovs) {
-                        HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, hdr->data_pattern);
-                    }
-                    m_io_runner.next_task();
-                });
-        } else {
+        if (req->written_blkids.empty()) {
             m_io_runner.next_task();
+            return;
+        }
+
+        // If we did send some data to the repl_dev, validate it by doing async_read
+        auto io_count = std::make_shared< std::atomic< uint64_t > >(0);
+        for (const auto blkid : req->written_blkids) {
+            if (req->write_sgs.size != 0) {
+                auto const cap = hs()->repl_service().get_cap_stats();
+                LOGDEBUG("Write complete with cap stats: used={} total={}", cap.used_capacity, cap.total_capacity);
+
+                auto sgs_size = blkid.blk_count() * g_block_size;
+                auto read_sgs = HSTestHelper::create_sgs(sgs_size, sgs_size);
+                rdev.async_read(blkid, read_sgs, read_sgs.size)
+                    .thenValue([this, io_count, blkid, &rdev, sgs_size, read_sgs, req](auto&& err) {
+                        RELEASE_ASSERT(!err, "Error during async_read");
+
+                        LOGINFO("[{}] Write complete with lsn={} for size={} blkid={}",
+                                boost::uuids::to_string(rdev.group_id()), req->lsn(), sgs_size, blkid.to_string());
+                        auto hdr = r_cast< test_repl_req::journal_header* >(req->header->bytes());
+                        // HS_REL_ASSERT_EQ(hdr->data_size, read_sgs.size,
+                        //                  "journal hdr data size mismatch with actual size");
+
+                        for (auto const& iov : read_sgs.iovs) {
+                            LOGDEBUG("Read data blkid={} len={} data={}", blkid.to_integer(), iov.iov_len,
+                                     *(uint64_t*)iov.iov_base);
+                            HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, hdr->data_pattern);
+                            iomanager.iobuf_free(uintptr_cast(iov.iov_base));
+                        }
+                        io_count->fetch_add(1);
+                        if (*io_count == req->written_blkids.size()) { m_io_runner.next_task(); }
+                    });
+            } else {
+                m_io_runner.next_task();
+            }
         }
     }
 };
@@ -319,6 +367,19 @@ TEST_F(SoloReplDevTest, TestHeaderOnly) {
     this->m_task_waiter.start([this]() { this->restart(); }).get();
 }
 
+TEST_F(SoloReplDevTest, TestAsyncWriteJournal) {
+    LOGINFO("Step 1: Run on worker threads to schedule writes of random sizes ranging from {} to {} bytes.", 0, 1 * Mi);
+    this->m_io_runner.set_task([this]() {
+        uint32_t nblks = rand() % ((1 * Mi) / g_block_size);
+        uint32_t key_size = rand() % 512 + 8;
+        this->async_write_data_and_journal(key_size, nblks * g_block_size, g_block_size);
+    });
+
+    this->m_io_runner.execute().get();
+    LOGINFO("Step 2: Restart homestore and validate replay data.");
+    this->m_task_waiter.start([this]() { this->restart(); }).get();
+}
+
 SISL_OPTION_GROUP(test_solo_repl_dev,
                   (block_size, "", "block_size", "block size to io",
                    ::cxxopts::value< uint32_t >()->default_value("4096"), "number"));
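
Usage note (illustrative sketch, not taken from the diff above): the intended call pattern for the three new ReplDev APIs mirrors async_write_data_and_journal() in the test. The helper name write_and_journal and the caller-supplied rdev, header, key, value and rreq are assumptions made for the example; only the three ReplDev APIs themselves come from this change.

void write_and_journal(shared< ReplDev > rdev, sisl::blob const& header, sisl::blob const& key,
                       sisl::sg_list const& value, repl_req_ptr_t rreq) {
    // Step 1: Allocate blkids for the whole value up front. The vector form can
    // return multiple blkids, which need not be contiguous or on the same chunk.
    std::vector< MultiBlkId > blkids;
    blk_alloc_hints hints;
    auto err = rdev->alloc_blks(uint32_cast(value.size), hints, blkids);
    RELEASE_ASSERT(!err, "Error during alloc_blks");

    // Step 2: Write the data; it is split across the returned blkids in order.
    rdev->async_write(blkids, value)
        .thenValue([rdev, blkids, size = uint32_cast(value.size), header, key, rreq](auto&& werr) {
            RELEASE_ASSERT(!werr, "Error during async_write");
            // Step 3: Journal the blkids once the data write completes. A consumer
            // such as nublocks would update its index here first, so that the freed
            // blkids it learns about can be recorded in the journal as well.
            rdev->async_write_journal(blkids, header, key, size, rreq);
        });
}

Separating the data write from the journal write is the point of these APIs: per the commit message, the free blkids are known only after the new blkids are written to the index, so the journal entry cannot be built at data-write time the way async_alloc_write builds it.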