diff --git a/conanfile.py b/conanfile.py
index 8101ade90..74801da73 100644
--- a/conanfile.py
+++ b/conanfile.py
@@ -9,7 +9,7 @@ class HomestoreConan(ConanFile):
     name = "homestore"
-    version = "6.13.4"
+    version = "6.13.5"
 
     homepage = "https://github.com/eBay/Homestore"
     description = "HomeStore Storage Engine"
@@ -54,7 +54,7 @@ def build_requirements(self):
     def requirements(self):
         self.requires("iomgr/[^11.3]@oss/master", transitive_headers=True)
         self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
-        self.requires("nuraft_mesg/[>=3.7.5]@oss/main", transitive_headers=True)
+        self.requires("nuraft_mesg/[~3.8.0]@oss/main", transitive_headers=True)
         self.requires("farmhash/cci.20190513@", transitive_headers=True)
 
         if self.settings.arch in ['x86', 'x86_64']:
diff --git a/src/include/homestore/blkdata_service.hpp b/src/include/homestore/blkdata_service.hpp
index 33a5fe2ac..e1992b983 100644
--- a/src/include/homestore/blkdata_service.hpp
+++ b/src/include/homestore/blkdata_service.hpp
@@ -114,6 +114,16 @@ class BlkDataService {
     folly::Future< std::error_code > async_write(sisl::sg_list const& sgs, MultiBlkId const& in_blkids,
                                                  bool part_of_batch = false);
 
+    /**
+     * @brief Asynchronously writes the given data to the provided input block ids.
+     *
+     * @param sgs : the data buffer that needs to be written
+     * @param in_blkids : input block ids that this data should be written to
+     * @param part_of_batch : is this write part of a batch
+     */
+    folly::Future< std::error_code > async_write(sisl::sg_list const& sgs, std::vector< MultiBlkId > const& in_blkids,
+                                                 bool part_of_batch = false);
+
     /**
      * @brief Asynchronously reads data from the specified block ID into the provided buffer.
      *
@@ -147,7 +157,8 @@ class BlkDataService {
     BlkAllocStatus commit_blk(MultiBlkId const& bid);
 
     /**
-     * @brief Allocates a contiguous block of disk space of the given size.
+     * @brief Allocates a contiguous block of disk space of the given size. This API should be called when the
+     * consumer expects all blocks to be allocated on the same chunk.
      *
      * @param size The size of the block to allocate, in bytes.
     * @param hints Hints for how to allocate the block.
@@ -156,6 +167,17 @@ class BlkDataService {
      */
     BlkAllocStatus alloc_blks(uint32_t size, blk_alloc_hints const& hints, MultiBlkId& out_blkids);
 
+    /**
+     * @brief Allocates blocks of disk space of the given size. This API should be called when the consumer can
+     * accept blocks allocated across different chunks.
+     *
+     * @param size The size of the block to allocate, in bytes.
+     * @param hints Hints for how to allocate the block.
+     * @param out_blkids Output parameter that will be filled with the IDs of the allocated blocks.
+     * @return The status of the block allocation attempt.
+     */
+    BlkAllocStatus alloc_blks(uint32_t size, blk_alloc_hints const& hints, std::vector< BlkId >& out_blkids);
+
     /**
      * @brief Asynchronously frees the specified block IDs.
     * It is asynchronous because it might need to wait for pending read to complete if same block is being read and not
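
For reference, a minimal sketch of how a consumer might drive the two overloads introduced above: allocate without the same-chunk restriction, then write through the vector-based async_write. The helper function, its arguments, and the error handling are illustrative assumptions (including the homestore:: qualification), not part of this patch:

    // Hypothetical helper, assuming `svc` is the started BlkDataService and `sgs`
    // holds `size` bytes (size must be a multiple of the service blk size).
    folly::Future< std::error_code > alloc_and_write(homestore::BlkDataService& svc, sisl::sg_list const& sgs,
                                                     uint32_t size) {
        homestore::blk_alloc_hints hints; // default hints; callers may tune placement
        std::vector< homestore::BlkId > blkids;

        // Vector overload: the allocation may be satisfied from more than one chunk.
        if (svc.alloc_blks(size, hints, blkids) != homestore::BlkAllocStatus::SUCCESS) {
            return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::no_space_on_device));
        }

        // MultiBlkId is constructible from BlkId (see alloc_local_blks() later in this patch).
        std::vector< homestore::MultiBlkId > mblkids;
        for (auto const& b : blkids) { mblkids.emplace_back(b); }
        return svc.async_write(sgs, mblkids);
    }
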
diff --git a/src/include/homestore/replication/repl_dev.h b/src/include/homestore/replication/repl_dev.h
index 60a0f8430..9a4cba340 100644
--- a/src/include/homestore/replication/repl_dev.h
+++ b/src/include/homestore/replication/repl_dev.h
@@ -130,7 +130,16 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
     sisl::blob const& header() const { return m_header; }
     sisl::blob const& key() const { return m_key; }
-    MultiBlkId const& local_blkid() const { return m_local_blkid; }
+    MultiBlkId const& local_blkid() const {
+        // Currently used by the raft repl dev only, where a single blkid is expected.
+        // Callers check whether the blkid is valid, so return a dummy (invalid) blkid when none is set.
+        if (!m_local_blkids.empty())
+            return m_local_blkids[0];
+        else
+            return dummy_blkid;
+    }
+
+    std::vector< MultiBlkId >& local_blkids() { return m_local_blkids; }
     RemoteBlkId const& remote_blkid() const { return m_remote_blkid; }
     const char* data() const {
         DEBUG_ASSERT(m_data != nullptr,
@@ -141,6 +150,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
     bool has_state(repl_req_state_t s) const { return m_state.load() & uint32_cast(s); }
     repl_journal_entry const* journal_entry() const { return m_journal_entry; }
     uint32_t journal_entry_size() const;
+    uint32_t blkids_serialized_size() const;
     bool is_localize_pending() const { return m_is_jentry_localize_pending; }
     bool has_linked_data() const { return (m_op_code == journal_type_t::HS_DATA_LINKED); }
 
@@ -149,6 +159,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
     /////////////////////// Non modifiers methods //////////////////
     std::string to_string() const;
     std::string to_compact_string() const;
+    std::string blkids_to_string() const;
     Clock::time_point created_time() const { return m_start_time; }
     void set_created_time() { m_start_time = Clock::now(); }
     bool is_expired() const;
@@ -195,7 +206,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
     bool save_fetched_data(sisl::GenericClientResponse const& fetched_data, uint8_t const* data, uint32_t data_size);
     void set_remote_blkid(RemoteBlkId const& rbid) { m_remote_blkid = rbid; }
-    void set_local_blkid(MultiBlkId const& lbid) { m_local_blkid = lbid; } // Only used during recovery
+    void set_local_blkids(std::vector< MultiBlkId > const& lbids) { m_local_blkids = lbids; }
     void set_is_volatile(bool is_volatile) { m_is_volatile.store(is_volatile); }
     void set_lsn(int64_t lsn);
     void add_state(repl_req_state_t s);
@@ -226,9 +237,10 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
     std::atomic< bool > m_is_volatile{true}; // Is the log still in memory and not flushed to disk yet
 
     /////////////// Data related section /////////////////
-    MultiBlkId m_local_blkid;   // Local BlkId for the data
-    RemoteBlkId m_remote_blkid; // Corresponding remote blkid for the data
-    uint8_t const* m_data;      // Raw data pointer containing the actual data
+    static inline MultiBlkId dummy_blkid;
+    std::vector< MultiBlkId > m_local_blkids; // Local BlkIds for the data
+    RemoteBlkId m_remote_blkid;               // Corresponding remote blkid for the data
+    uint8_t const* m_data;                    // Raw data pointer containing the actual data
 
     /////////////// Journal/Buf related section /////////////////
     std::variant< std::unique_ptr< uint8_t[] >, raft_buf_ptr_t > m_journal_buf; // Buf for the journal entry
@@ -400,7 +412,7 @@ class ReplDevListener {
     virtual void on_no_space_left(repl_lsn_t lsn, chunk_num_t chunk_id) = 0;
 
     /// @brief when restart, after all the logs are replayed and before joining raft group, notify the upper layer
-    virtual void on_log_replay_done(const group_id_t& group_id){};
+    virtual void on_log_replay_done(const group_id_t& group_id) {};
 
 private:
     std::weak_ptr< ReplDev > m_repl_dev;
@@ -411,6 +423,39 @@ class ReplDev {
     ReplDev() = default;
     virtual ~ReplDev() { detach_listener(); }
 
+    /// @brief Allocates blkids from the storage engine to write the value into. The storage
+    /// engine returns a blkid list in cases where a single set of contiguous blocks is not
+    /// available.
+    ///
+    /// @param data_size - Size of the data.
+    /// @param hints - Specify block allocation hints.
+    /// @param out_blkids - List of blkids, which may not be contiguous.
+    virtual std::error_code alloc_blks(uint32_t data_size, const blk_alloc_hints& hints,
+                                       std::vector< MultiBlkId >& out_blkids) = 0;
+
+    /// @brief Writes data locally to the specified blkids. Data is split across the blkids.
+    /// @param blkids - List of blkids where data will be written.
+    /// @param value - Vector of io buffers that contain the value for the key.
+    /// @param part_of_batch - Is this write part of a batch? If so, submit_batch needs to be called
+    /// at the end.
+    /// @return A Future with std::error_code that is success if the data was written, or carries the
+    /// error code in case of failure.
+    virtual folly::Future< std::error_code > async_write(const std::vector< MultiBlkId >& blkids,
+                                                         sisl::sg_list const& value, bool part_of_batch = false,
+                                                         trace_id_t tid = 0) = 0;
+
+    /// @brief Creates a log/journal entry and calls the on_commit listener callback.
+    /// @param blkids - List of blkids where data was written.
+    /// @param header - Blob representing the header (it is opaque and will be copied
+    /// as-is to the journal entry).
+    /// @param key - Blob representing the key (it is opaque and will be copied as-is to
+    /// the journal entry).
+    /// @param data_size - Size of the data.
+    /// @param ctx - User supplied context which will be passed to listener callbacks.
+    virtual void async_write_journal(const std::vector< MultiBlkId >& blkids, sisl::blob const& header,
+                                     sisl::blob const& key, uint32_t data_size, repl_req_ptr_t ctx,
+                                     trace_id_t tid = 0) = 0;
+
     /// @brief Replicate the data to the replica set. This method goes through the
     /// following steps:
     /// Step 1: Allocates blkid from the storage engine to write the value into. Storage
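
The three pure virtuals above decompose the existing async_alloc_write() flow into caller-driven steps. A hedged sketch of the intended call sequence; the wrapper name, the fire-and-forget error handling, and the assumption that `dev` outlives the write are all illustrative, not part of this patch:

    // Hypothetical caller: allocate, write the payload, then journal it so the
    // listener's on_commit fires. `dev`, `header`, `key`, `value`, `rreq` are caller-supplied.
    void write_and_journal(homestore::ReplDev& dev, sisl::blob const& header, sisl::blob const& key,
                           sisl::sg_list const& value, homestore::repl_req_ptr_t rreq) {
        homestore::blk_alloc_hints hints;
        std::vector< homestore::MultiBlkId > blkids;

        // Step 1: reserve space; the returned list may span multiple chunks.
        if (dev.alloc_blks(uint32_cast(value.size), hints, blkids)) { return; } // non-zero error_code => failure

        // Step 2: persist the data across the blkids; Step 3: journal and commit.
        // `dev` is assumed to remain alive until the continuation runs.
        dev.async_write(blkids, value)
            .thenValue([&dev, blkids, header, key, size = uint32_cast(value.size),
                        rreq = std::move(rreq)](std::error_code err) mutable {
                if (!err) { dev.async_write_journal(blkids, header, key, size, std::move(rreq)); }
            });
    }
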
diff --git a/src/lib/blkdata_svc/blkdata_service.cpp b/src/lib/blkdata_svc/blkdata_service.cpp
index 58cc36c61..1219ed00e 100644
--- a/src/lib/blkdata_svc/blkdata_service.cpp
+++ b/src/lib/blkdata_svc/blkdata_service.cpp
@@ -208,10 +208,35 @@ folly::Future< std::error_code > BlkDataService::async_write(sisl::sg_list const
     }
 }
 
+folly::Future< std::error_code >
+BlkDataService::async_write(sisl::sg_list const& sgs, std::vector< MultiBlkId > const& blkids, bool part_of_batch) {
+    if (is_stopping()) return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled));
+    incr_pending_request_num();
+    static thread_local std::vector< folly::Future< std::error_code > > s_futs;
+    s_futs.clear();
+    for (const auto& blkid : blkids) {
+        s_futs.emplace_back(async_write(sgs, blkid, part_of_batch));
+    }
+    decr_pending_request_num();
+    return collect_all_futures(s_futs);
+}
+
 BlkAllocStatus BlkDataService::alloc_blks(uint32_t size, const blk_alloc_hints& hints, MultiBlkId& out_blkids) {
     if (is_stopping()) return BlkAllocStatus::FAILED;
     incr_pending_request_num();
-    HS_DBG_ASSERT_EQ(size % m_blk_size, 0, "Non aligned size requested");
+    HS_DBG_ASSERT_EQ(size % m_blk_size, 0, "Non aligned size requested size={} blk_size={}", size, m_blk_size);
+    blk_count_t nblks = static_cast< blk_count_t >(size / m_blk_size);
+
+    auto ret = m_vdev->alloc_blks(nblks, hints, out_blkids);
+    decr_pending_request_num();
+    return ret;
+}
+
+BlkAllocStatus BlkDataService::alloc_blks(uint32_t size, const blk_alloc_hints& hints,
+                                          std::vector< BlkId >& out_blkids) {
+    if (is_stopping()) return BlkAllocStatus::FAILED;
+    incr_pending_request_num();
+    HS_DBG_ASSERT_EQ(size % m_blk_size, 0, "Non aligned size requested size={} blk_size={}", size, m_blk_size);
     blk_count_t nblks = static_cast< blk_count_t >(size / m_blk_size);
 
     auto ret = m_vdev->alloc_blks(nblks, hints, out_blkids);
@@ -271,8 +296,8 @@ void BlkDataService::start() {
 
 void BlkDataService::stop() {
     start_stopping();
-    // we have no way to track the completion of each async io in detail which should be done in iomanager level, so we
-    // just wait for 3 seconds, and we expect each io will be completed within this time.
+    // we have no way to track the completion of each async io in detail which should be done in iomanager level, so
+    // we just wait for 3 seconds, and we expect each io will be completed within this time.
     // TODO: find a better solution to track the completion of these aysnc calls
     std::this_thread::sleep_for(std::chrono::milliseconds(3000));
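
The vector async_write above fans out one write per blkid and folds the results with collect_all_futures(). A self-contained illustration of that fold, written against the same folly primitive the solo device uses later in this patch (collectAllUnsafe); the helper name is an assumption:

    #include <folly/futures/Future.h>
    #include <system_error>
    #include <vector>

    // First failure wins; success only if every per-blkid write succeeded.
    folly::Future< std::error_code > fold_write_results(std::vector< folly::Future< std::error_code > >&& futs) {
        return folly::collectAllUnsafe(std::move(futs))
            .thenValue([](std::vector< folly::Try< std::error_code > >&& results) {
                for (auto const& t : results) {
                    if (t.hasException()) { return std::make_error_code(std::errc::io_error); }
                    if (t.value()) { return t.value(); } // non-zero error_code from one write
                }
                return std::error_code{};
            });
    }
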
diff --git a/src/lib/replication/repl_dev/common.cpp b/src/lib/replication/repl_dev/common.cpp
index 6a39256f9..2782a36a5 100644
--- a/src/lib/replication/repl_dev/common.cpp
+++ b/src/lib/replication/repl_dev/common.cpp
@@ -63,7 +63,7 @@ repl_req_ctx::~repl_req_ctx() {
 }
 
 void repl_req_ctx::create_journal_entry(bool is_raft_buf, int32_t server_id) {
-    uint32_t val_size = has_linked_data() ? m_local_blkid.serialized_size() : 0;
+    uint32_t val_size = has_linked_data() ? blkids_serialized_size() : 0;
     uint32_t entry_size = sizeof(repl_journal_entry) + m_header.size() + m_key.size() + val_size;
 
     if (is_raft_buf) {
@@ -94,14 +94,25 @@ void repl_req_ctx::create_journal_entry(bool is_raft_buf, int32_t server_id) {
     }
 
     if (has_linked_data()) {
-        auto const b = m_local_blkid.serialize();
-        std::memcpy(raw_ptr, b.cbytes(), b.size());
+        for (const auto& blkid : m_local_blkids) {
+            auto const b = blkid.serialize();
+            std::memcpy(raw_ptr, b.cbytes(), b.size());
+            raw_ptr += b.size();
+        }
     }
 }
 
 uint32_t repl_req_ctx::journal_entry_size() const {
     return sizeof(repl_journal_entry) + m_header.size() + m_key.size() +
-        (has_linked_data() ? m_local_blkid.serialized_size() : 0);
+        (has_linked_data() ? blkids_serialized_size() : 0);
+}
+
+uint32_t repl_req_ctx::blkids_serialized_size() const {
+    uint32_t total_size = 0;
+    for (const auto& blkid : m_local_blkids) {
+        total_size += blkid.serialized_size();
+    }
+    return total_size;
 }
 
 void repl_req_ctx::change_raft_journal_buf(raft_buf_ptr_t new_buf, bool adjust_hdr_key) {
@@ -128,7 +139,7 @@ ReplServiceError repl_req_ctx::alloc_local_blks(cshared< ReplDevListener >& list
         // if the committed_blk_id is already present, use it and skip allocation and commitment
         LOGINFOMOD(replication, "[traceID={}] For Repl_key=[{}] data already exists, skip", rkey().traceID,
                    rkey().to_string());
-        m_local_blkid = hints_result.value().committed_blk_id.value();
+        m_local_blkids.emplace_back(hints_result.value().committed_blk_id.value());
         add_state(repl_req_state_t::BLK_ALLOCATED);
         add_state(repl_req_state_t::DATA_RECEIVED);
         add_state(repl_req_state_t::DATA_WRITTEN);
@@ -138,14 +149,19 @@ ReplServiceError repl_req_ctx::alloc_local_blks(cshared< ReplDevListener >& list
         return ReplServiceError::OK;
     }
 
+    std::vector< BlkId > blkids;
     auto status = data_service().alloc_blks(sisl::round_up(uint32_cast(data_size), data_service().get_blk_size()),
-                                            hints_result.value(), m_local_blkid);
+                                            hints_result.value(), blkids);
     if (status != BlkAllocStatus::SUCCESS) {
         LOGWARNMOD(replication, "[traceID={}] block allocation failure, repl_key=[{}], status=[{}]", rkey().traceID,
                    rkey(), status);
         DEBUG_ASSERT_EQ(status, BlkAllocStatus::SUCCESS, "Unable to allocate blks");
         return ReplServiceError::NO_SPACE_LEFT;
     }
+
+    for (auto& blkid : blkids) {
+        m_local_blkids.emplace_back(blkid);
+    }
     add_state(repl_req_state_t::BLK_ALLOCATED);
     return ReplServiceError::OK;
 }
@@ -246,7 +262,7 @@ std::string repl_req_ctx::to_string() const {
     return fmt::format("repl_key=[{}], lsn={} state=[{}] m_headersize={} m_keysize={} is_proposer={} "
                        "local_blkid={} remote_blkid={}",
                        m_rkey.to_string(), m_lsn, req_state_name(uint32_cast(state())), m_header.size(), m_key.size(),
-                       m_is_proposer, m_local_blkid.to_string(), m_remote_blkid.blkid.to_string());
+                       m_is_proposer, blkids_to_string(), m_remote_blkid.blkid.to_string());
 }
 
 std::string repl_req_ctx::to_compact_string() const {
@@ -255,7 +271,16 @@ std::string repl_req_ctx::to_compact_string() const {
     }
 
     return fmt::format("dsn={} term={} lsn={} op={} local_blkid={} state=[{}]", m_rkey.dsn, m_rkey.term, m_lsn,
-                       enum_name(m_op_code), m_local_blkid.to_string(), req_state_name(uint32_cast(state())));
+                       enum_name(m_op_code), blkids_to_string(), req_state_name(uint32_cast(state())));
+}
+
+std::string repl_req_ctx::blkids_to_string() const {
+    std::string str = "[";
+    for (const auto& blkid : m_local_blkids) {
+        fmt::format_to(std::back_inserter(str), "{} ", blkid.to_string());
+    }
+    str += "]";
+    return str;
 }
 
 bool repl_req_ctx::is_expired() const {
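
With create_journal_entry() now looping over m_local_blkids, the journal value region is simply the blkids serialized back-to-back. A sketch of the layout and of a reader that mirrors the writer loop. This assumes MultiBlkId::deserialize consumes only its own bytes from the front of the blob; note that SoloReplDev::on_log_found later in this patch instead advances by sizeof(BlkId), which assumes each stored blkid serializes to exactly that many bytes:

    // Entry layout: [repl_journal_entry][header][key][blkid 0][blkid 1]...[blkid N-1]
    //               where the blkid region is val_size == blkids_serialized_size() bytes.
    std::vector< homestore::MultiBlkId > read_blkids(uint8_t* raw_ptr, uint32_t val_size) {
        std::vector< homestore::MultiBlkId > out;
        while (val_size > 0) {
            homestore::MultiBlkId b;
            b.deserialize(sisl::blob{raw_ptr, val_size}, true /* copy */);
            raw_ptr += b.serialized_size(); // step past exactly the bytes this blkid occupied
            val_size -= b.serialized_size();
            out.push_back(b);
        }
        return out;
    }
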
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.cpp b/src/lib/replication/repl_dev/raft_repl_dev.cpp
index 082f4fac4..88aa9d6c3 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.cpp
+++ b/src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -337,10 +337,13 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const&
         }
     }
 
-    auto status = init_req_ctx(
-        rreq, repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1), .traceID = tid},
-        data.size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED, true /* is_proposer */, header,
-        key, data.size, m_listener);
+    auto status = init_req_ctx(rreq,
+                               repl_key{.server_id = server_id(),
+                                        .term = raft_server()->get_term(),
+                                        .dsn = m_next_dsn.fetch_add(1),
+                                        .traceID = tid},
+                               data.size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED,
+                               true /* is_proposer */, header, key, data.size, m_listener);
 
     if (status != ReplServiceError::OK) {
         RD_LOGI(tid, "Initializing rreq failed error={}, failing this req", status);
@@ -1659,7 +1662,7 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
         MultiBlkId entry_blkid;
         entry_blkid.deserialize(entry_to_val(jentry), true /* copy */);
         data_size = entry_blkid.blk_count() * get_blk_size();
-        rreq->set_local_blkid(entry_blkid);
+        rreq->set_local_blkids({entry_blkid});
         rreq->add_state(repl_req_state_t::BLK_ALLOCATED);
         rreq->add_state(repl_req_state_t::DATA_RECEIVED);
     }
diff --git a/src/lib/replication/repl_dev/raft_repl_dev.h b/src/lib/replication/repl_dev/raft_repl_dev.h
index bd6a6c448..42d100ebb 100644
--- a/src/lib/replication/repl_dev/raft_repl_dev.h
+++ b/src/lib/replication/repl_dev/raft_repl_dev.h
@@ -229,6 +229,24 @@ class RaftReplDev : public ReplDev,
     folly::SemiFuture< ReplServiceError > destroy_group();
 
     //////////////// All ReplDev overrides/implementation ///////////////////////
+    virtual std::error_code alloc_blks(uint32_t size, const blk_alloc_hints& hints,
+                                       std::vector< MultiBlkId >& out_blkids) override {
+        RD_REL_ASSERT(false, "NOT SUPPORTED");
+        return std::make_error_code(std::errc::operation_not_supported);
+    }
+
+    virtual folly::Future< std::error_code > async_write(const std::vector< MultiBlkId >& blkids,
+                                                         sisl::sg_list const& value, bool part_of_batch = false,
+                                                         trace_id_t tid = 0) override {
+        RD_REL_ASSERT(false, "NOT SUPPORTED");
+        return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_not_supported));
+    }
+
+    virtual void async_write_journal(const std::vector< MultiBlkId >& blkids, sisl::blob const& header,
+                                     sisl::blob const& key, uint32_t data_size, repl_req_ptr_t ctx,
+                                     trace_id_t tid = 0) override {
+        RD_REL_ASSERT(false, "NOT SUPPORTED");
+    }
+
     void async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value,
                            repl_req_ptr_t ctx, bool part_of_batch = false, trace_id_t tid = 0) override;
     folly::Future< std::error_code > async_read(MultiBlkId const& blkid, sisl::sg_list& sgs, uint32_t size,
diff --git a/src/lib/replication/repl_dev/solo_repl_dev.cpp b/src/lib/replication/repl_dev/solo_repl_dev.cpp
index bc6bdb8bb..587cb8b2e 100644
--- a/src/lib/replication/repl_dev/solo_repl_dev.cpp
+++ b/src/lib/replication/repl_dev/solo_repl_dev.cpp
@@ -39,7 +39,7 @@ void SoloReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const&
     // If it is header only entry, directly write to the journal
     if (rreq->has_linked_data() && !rreq->has_state(repl_req_state_t::DATA_WRITTEN)) {
         // Write the data
-        data_service().async_write(value, rreq->local_blkid()).thenValue([this, rreq = std::move(rreq)](auto&& err) {
+        data_service().async_write(value, rreq->local_blkids()).thenValue([this, rreq = std::move(rreq)](auto&& err) {
             HS_REL_ASSERT(!err, "Error in writing data"); // TODO: Find a way to return error to the Listener
             write_journal(std::move(rreq));
         });
@@ -60,12 +60,94 @@ void SoloReplDev::write_journal(repl_req_ptr_t rreq) {
         auto cur_lsn = m_commit_upto.load();
         if (cur_lsn < lsn) { m_commit_upto.compare_exchange_strong(cur_lsn, lsn); }
 
-        data_service().commit_blk(rreq->local_blkid());
-        m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), {rreq->local_blkid()}, rreq);
+        for (const auto& blkid : rreq->local_blkids()) {
+            data_service().commit_blk(blkid);
+        }
+        m_listener->on_commit(rreq->lsn(), rreq->header(), rreq->key(), rreq->local_blkids(), rreq);
         decr_pending_request_num();
     });
 }
 
+std::error_code SoloReplDev::alloc_blks(uint32_t data_size, const blk_alloc_hints& hints,
+                                        std::vector< MultiBlkId >& out_blkids) {
+    if (is_stopping()) { return std::make_error_code(std::errc::operation_canceled); }
+
+    incr_pending_request_num();
+    std::vector< BlkId > blkids;
+    auto status =
+        data_service().alloc_blks(sisl::round_up(uint32_cast(data_size), data_service().get_blk_size()), hints, blkids);
+    if (status != BlkAllocStatus::SUCCESS) {
+        DEBUG_ASSERT_EQ(status, BlkAllocStatus::SUCCESS, "Unable to allocate blks");
+        decr_pending_request_num();
+        return std::make_error_code(std::errc::no_space_on_device);
+    }
+
+    for (auto& blkid : blkids) {
+        out_blkids.emplace_back(blkid);
+    }
+    decr_pending_request_num();
+    return std::error_code{};
+}
+
+folly::Future< std::error_code > SoloReplDev::async_write(const std::vector< MultiBlkId >& blkids,
+                                                          sisl::sg_list const& value, bool part_of_batch,
+                                                          trace_id_t tid) {
+    if (is_stopping()) {
+        return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled));
+    }
+
+    incr_pending_request_num();
+    HS_REL_ASSERT_GT(blkids.size(), 0, "Empty blkid vec");
+    std::vector< folly::Future< std::error_code > > futs;
+    futs.reserve(blkids.size());
+    sisl::sg_iterator sg_it{value.iovs};
+
+    for (const auto& blkid : blkids) {
+        auto sgs_size = blkid.blk_count() * data_service().get_blk_size();
+        const auto iovs = sg_it.next_iovs(sgs_size);
+        uint32_t total_size = 0;
+        for (auto& iov : iovs) {
+            total_size += iov.iov_len;
+        }
+        if (total_size != sgs_size) {
+            LOGERROR("Block size mismatch total_size={} sgs_size={}", total_size, sgs_size);
+            decr_pending_request_num();
+            return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::invalid_argument));
+        }
+        sisl::sg_list sgs{sgs_size, iovs};
+        futs.emplace_back(data_service().async_write(sgs, blkid, part_of_batch));
+    }
+
+    return folly::collectAllUnsafe(futs).thenValue([this](auto&& v_res) {
+        for (const auto& err_c : v_res) {
+            if (sisl_unlikely(err_c.value())) {
+                decr_pending_request_num();
+                return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::io_error));
+            }
+        }
+
+        decr_pending_request_num();
+        return folly::makeFuture< std::error_code >(std::error_code{});
+    });
+}
+
+void SoloReplDev::async_write_journal(const std::vector< MultiBlkId >& blkids, sisl::blob const& header,
+                                      sisl::blob const& key, uint32_t data_size, repl_req_ptr_t rreq, trace_id_t tid) {
+    if (is_stopping()) { return; }
+    incr_pending_request_num();
+
+    // We expect clients to provide a valid repl req ctx with blocks allocated.
+    HS_REL_ASSERT(rreq, "Invalid repl req ctx");
+    rreq->add_state(repl_req_state_t::BLK_ALLOCATED);
+    rreq->set_local_blkids(blkids);
+    auto status = rreq->init(repl_key{.server_id = 0, .term = 1, .dsn = 1, .traceID = tid},
+                             data_size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED, true,
+                             header, key, data_size, m_listener);
+    HS_REL_ASSERT_EQ(status, ReplServiceError::OK, "Error in initializing repl req context.");
+
+    // Write to journal.
+    write_journal(std::move(rreq));
+}
+
 void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx) {
     repl_journal_entry const* entry = r_cast< repl_journal_entry const* >(buf.bytes());
     uint32_t remain_size = buf.size() - sizeof(repl_journal_entry);
@@ -83,22 +165,27 @@ void SoloReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
     raw_ptr += entry->key_size;
     remain_size -= entry->key_size;
 
-    sisl::blob value_blob{raw_ptr, remain_size};
-    MultiBlkId blkid;
-    if (remain_size) { blkid.deserialize(value_blob, true /* copy */); }
+    std::vector< MultiBlkId > blkids;
+    while (remain_size > 0) {
+        MultiBlkId blkid;
+        sisl::blob value_blob{raw_ptr, sizeof(BlkId)};
+        blkid.deserialize(value_blob, true /* copy */);
+        raw_ptr += sizeof(BlkId);
+        remain_size -= sizeof(BlkId);
+        blkids.push_back(blkid);
+    }
 
     m_listener->on_pre_commit(lsn, header, key, nullptr);
 
     auto cur_lsn = m_commit_upto.load();
     if (cur_lsn < lsn) { m_commit_upto.compare_exchange_strong(cur_lsn, lsn); }
-    m_listener->on_commit(lsn, header, key, {blkid}, nullptr);
+    m_listener->on_commit(lsn, header, key, blkids, nullptr);
 }
 
 folly::Future< std::error_code > SoloReplDev::async_read(MultiBlkId const& bid, sisl::sg_list& sgs, uint32_t size,
                                                          bool part_of_batch, trace_id_t tid) {
     if (is_stopping()) {
-        LOGINFO("repl dev is being shutdown!");
         return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled));
     }
     incr_pending_request_num();
@@ -109,7 +196,6 @@ folly::Future< std::error_code > SoloReplDev::async_read(MultiBlkId const& bid,
 
 folly::Future< std::error_code > SoloReplDev::async_free_blks(int64_t, MultiBlkId const& bid, trace_id_t tid) {
     if (is_stopping()) {
-        LOGINFO("repl dev is being shutdown!");
         return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled));
     }
     incr_pending_request_num();
diff --git a/src/lib/replication/repl_dev/solo_repl_dev.h b/src/lib/replication/repl_dev/solo_repl_dev.h
index 63838f254..35f089ec5 100644
--- a/src/lib/replication/repl_dev/solo_repl_dev.h
+++ b/src/lib/replication/repl_dev/solo_repl_dev.h
@@ -39,7 +39,14 @@ class SoloReplDev : public ReplDev {
     SoloReplDev(superblk< repl_dev_superblk >&& rd_sb, bool load_existing);
     virtual ~SoloReplDev() = default;
 
-    // TODO: implement graceful shutdown for solo repl dev
+    virtual std::error_code alloc_blks(uint32_t data_size, const blk_alloc_hints& hints,
+                                       std::vector< MultiBlkId >& out_blkids) override;
+    virtual folly::Future< std::error_code > async_write(const std::vector< MultiBlkId >& blkids,
+                                                         sisl::sg_list const& value, bool part_of_batch = false,
+                                                         trace_id_t tid = 0) override;
+    virtual void async_write_journal(const std::vector< MultiBlkId >& blkids, sisl::blob const& header,
+                                     sisl::blob const& key, uint32_t data_size, repl_req_ptr_t ctx,
+                                     trace_id_t tid = 0) override;
 
     void async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value,
                            repl_req_ptr_t ctx, bool part_of_batch = false, trace_id_t tid = 0) override;
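
SoloReplDev::async_write above carves the caller's sg_list with sisl::sg_iterator so that each blkid receives exactly blk_count() * blk_size bytes. A small standalone check of that sizing rule (plain C++, no homestore types; the block counts and block size are example values):

    #include <cassert>
    #include <cstdint>
    #include <vector>

    int main() {
        uint32_t const blk_size = 4096;              // default --block_size in the test below
        std::vector< uint32_t > blk_counts{3, 1, 4}; // e.g. three non-contiguous allocations

        // Each allocation consumes blk_count * blk_size bytes from the front of the
        // scatter-gather list; the pieces must add up to the total value size.
        uint64_t offset = 0;
        for (auto const nblks : blk_counts) { offset += uint64_t{nblks} * blk_size; }

        uint64_t const value_size = (3 + 1 + 4) * uint64_t{blk_size};
        assert(offset == value_size); // otherwise async_write fails with invalid_argument
        return 0;
    }
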
diff --git a/src/tests/test_solo_repl_dev.cpp b/src/tests/test_solo_repl_dev.cpp
index 13bcc52b0..4d271efcb 100644
--- a/src/tests/test_solo_repl_dev.cpp
+++ b/src/tests/test_solo_repl_dev.cpp
@@ -63,22 +63,15 @@ struct test_repl_req : public repl_req_ctx {
     sisl::byte_array header;
     sisl::byte_array key;
     sisl::sg_list write_sgs;
-    sisl::sg_list read_sgs;
-    MultiBlkId written_blkids;
+    std::vector< MultiBlkId > written_blkids;
 
-    test_repl_req() {
-        write_sgs.size = 0;
-        read_sgs.size = 0;
-    }
+    test_repl_req() { write_sgs.size = 0; }
 
     ~test_repl_req() {
         for (auto const& iov : write_sgs.iovs) {
             iomanager.iobuf_free(uintptr_cast(iov.iov_base));
         }
-
-        for (auto const& iov : read_sgs.iovs) {
-            iomanager.iobuf_free(uintptr_cast(iov.iov_base));
-        }
     }
+
     struct journal_header {
         uint32_t key_size;
         uint64_t key_pattern;
@@ -100,12 +93,11 @@ class SoloReplDevTest : public testing::Test {
         void on_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
                        std::vector< MultiBlkId > const& blkids, cintrusive< repl_req_ctx >& ctx) override {
             LOGINFO("Received on_commit lsn={}", lsn);
-            HS_REL_ASSERT(!blkids.empty(), "Invalid blkids size");
             if (ctx == nullptr) {
-                m_test.validate_replay(*repl_dev(), lsn, header, key, blkids[0]);
+                m_test.validate_replay(*repl_dev(), lsn, header, key, blkids);
             } else {
                 auto req = boost::static_pointer_cast< test_repl_req >(ctx);
-                req->written_blkids = blkids[0];
+                req->written_blkids = blkids;
                 m_test.on_write_complete(*repl_dev(), req);
             }
         }
@@ -231,60 +223,114 @@ class SoloReplDevTest : public testing::Test {
         rdev->async_alloc_write(*req->header, req->key ? *req->key : sisl::blob{}, req->write_sgs, req);
     }
 
+    void async_write_data_and_journal(uint32_t key_size, uint64_t data_size, uint32_t max_size_per_iov) {
+        data_size = data_size == 0 ? g_block_size : data_size;
+        auto req = intrusive< test_repl_req >(new test_repl_req());
+        req->header = sisl::make_byte_array(sizeof(test_repl_req::journal_header));
+        auto hdr = r_cast< test_repl_req::journal_header* >(req->header->bytes());
+        hdr->key_size = key_size;
+        hdr->key_pattern = ((long long)rand() << 32) | rand();
+        hdr->data_size = data_size;
+        hdr->data_pattern = ((long long)rand() << 32) | rand();
+
+        if (key_size != 0) {
+            req->key = sisl::make_byte_array(key_size);
+            HSTestHelper::fill_data_buf(req->key->bytes(), key_size, hdr->key_pattern);
+        }
+
+        req->write_sgs = HSTestHelper::create_sgs(data_size, max_size_per_iov, hdr->data_pattern);
+
+        auto& rdev = (rand() % 2) ? m_repl_dev1 : m_repl_dev2;
+
+        auto const cap = hs()->repl_service().get_cap_stats();
+        LOGDEBUG("Before write, cap stats: used={} total={}", cap.used_capacity, cap.total_capacity);
+
+        std::vector< MultiBlkId > blkids;
+        blk_alloc_hints hints;
+        auto err = rdev->alloc_blks(data_size, hints, blkids);
+        RELEASE_ASSERT(!err, "Error during alloc_blks");
+        RELEASE_ASSERT(!blkids.empty(), "Empty blkids");
+
+        rdev->async_write(blkids, req->write_sgs).thenValue([this, rdev, blkids, data_size, req](auto&& err) {
+            RELEASE_ASSERT(!err, "Error during async_write");
+            rdev->async_write_journal(blkids, *req->header, req->key ? *req->key : sisl::blob{}, data_size, req);
+        });
+    }
+
     void validate_replay(ReplDev& rdev, int64_t lsn, sisl::blob const& header, sisl::blob const& key,
-                         MultiBlkId const& blkids) {
+                         std::vector< MultiBlkId > const& blkids) {
+        if (blkids.empty()) {
+            m_task_waiter.one_complete();
+            return;
+        }
+
         auto const jhdr = r_cast< test_repl_req::journal_header const* >(header.cbytes());
         HSTestHelper::validate_data_buf(key.cbytes(), key.size(), jhdr->key_pattern);
-
-        uint32_t size = blkids.blk_count() * g_block_size;
-        if (size) {
-            auto read_sgs = HSTestHelper::create_sgs(size, size);
-            LOGINFO("[{}] Validating replay of lsn={} blkid = {}", boost::uuids::to_string(rdev.group_id()), lsn,
-                    blkids.to_string());
-            rdev.async_read(blkids, read_sgs, size)
-                .thenValue([this, hdr = *jhdr, read_sgs, lsn, blkids, &rdev](auto&& err) {
-                    RELEASE_ASSERT(!err, "Error during async_read");
-                    HS_REL_ASSERT_EQ(hdr.data_size, read_sgs.size, "journal hdr data size mismatch with actual size");
-
-                    for (auto const& iov : read_sgs.iovs) {
-                        HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, hdr.data_pattern);
-                        iomanager.iobuf_free(uintptr_cast(iov.iov_base));
-                    }
-                    LOGINFO("[{}] Replay of lsn={} blkid={} validated successfully",
-                            boost::uuids::to_string(rdev.group_id()), lsn, blkids.to_string());
-                    m_task_waiter.one_complete();
-                });
-        } else {
-            m_task_waiter.one_complete();
+        uint64_t total_io = blkids.size();
+        auto io_count = std::make_shared< std::atomic< uint64_t > >(0);
+        for (const auto& blkid : blkids) {
+            uint32_t size = blkid.blk_count() * g_block_size;
+            if (size) {
+                auto read_sgs = HSTestHelper::create_sgs(size, size);
+                LOGDEBUG("[{}] Validating replay of lsn={} blkid = {}", boost::uuids::to_string(rdev.group_id()), lsn,
+                         blkid.to_string());
+                rdev.async_read(blkid, read_sgs, size)
+                    .thenValue([this, io_count, total_io, hdr = *jhdr, read_sgs, lsn, blkid, &rdev](auto&& err) {
+                        RELEASE_ASSERT(!err, "Error during async_read");
+                        // HS_REL_ASSERT_EQ(hdr.data_size, read_sgs.size,
+                        //                  "journal hdr data size mismatch with actual size");
+
+                        for (auto const& iov : read_sgs.iovs) {
+                            HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, hdr.data_pattern);
+                            iomanager.iobuf_free(uintptr_cast(iov.iov_base));
+                        }
+                        LOGDEBUG("[{}] Replay of lsn={} blkid={} validated successfully",
+                                 boost::uuids::to_string(rdev.group_id()), lsn, blkid.to_string());
+
+                        if (io_count->fetch_add(1) + 1 == total_io) { m_task_waiter.one_complete(); }
+                    });
+            } else {
+                m_task_waiter.one_complete();
+            }
         }
     }
 
     void on_write_complete(ReplDev& rdev, intrusive< test_repl_req > req) {
-        // If we did send some data to the repl_dev, validate it by doing async_read
-        if (req->write_sgs.size != 0) {
-            req->read_sgs = HSTestHelper::create_sgs(req->write_sgs.size, req->write_sgs.size);
-
-            auto const cap = hs()->repl_service().get_cap_stats();
-            LOGINFO("Write complete with cap stats: used={} total={}", cap.used_capacity, cap.total_capacity);
-
-            rdev.async_read(req->written_blkids, req->read_sgs, req->read_sgs.size)
-                .thenValue([this, &rdev, req](auto&& err) {
-                    RELEASE_ASSERT(!err, "Error during async_read");
-
-                    LOGINFO("[{}] Write complete with lsn={} for size={} blkids={}",
-                            boost::uuids::to_string(rdev.group_id()), req->lsn(), req->write_sgs.size,
-                            req->written_blkids.to_string());
-                    auto hdr = r_cast< test_repl_req::journal_header* >(req->header->bytes());
-                    HS_REL_ASSERT_EQ(hdr->data_size, req->read_sgs.size,
-                                     "journal hdr data size mismatch with actual size");
-
-                    for (auto const& iov : req->read_sgs.iovs) {
-                        HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len, hdr->data_pattern);
-                    }
-                    m_io_runner.next_task();
-                });
-        } else {
+        if (req->written_blkids.empty()) {
             m_io_runner.next_task();
+            return;
+        }
+
+        // If we did send some data to the repl_dev, validate it by doing async_read
+        auto io_count = std::make_shared< std::atomic< uint64_t > >(0);
+        for (const auto& blkid : req->written_blkids) {
+            if (req->write_sgs.size != 0) {
+                auto const cap = hs()->repl_service().get_cap_stats();
+                LOGDEBUG("Write complete with cap stats: used={} total={}", cap.used_capacity, cap.total_capacity);
+
+                auto sgs_size = blkid.blk_count() * g_block_size;
+                auto read_sgs = HSTestHelper::create_sgs(sgs_size, sgs_size);
+                rdev.async_read(blkid, read_sgs, read_sgs.size)
+                    .thenValue([this, io_count, blkid, &rdev, sgs_size, read_sgs, req](auto&& err) {
+                        RELEASE_ASSERT(!err, "Error during async_read");
+
+                        LOGINFO("[{}] Write complete with lsn={} for size={} blkid={}",
+                                boost::uuids::to_string(rdev.group_id()), req->lsn(), sgs_size, blkid.to_string());
+                        auto hdr = r_cast< test_repl_req::journal_header* >(req->header->bytes());
+                        // HS_REL_ASSERT_EQ(hdr->data_size, read_sgs.size,
+                        //                  "journal hdr data size mismatch with actual size");
+
+                        for (auto const& iov : read_sgs.iovs) {
+                            LOGDEBUG("Read data blkid={} len={} data={}", blkid.to_integer(), iov.iov_len,
+                                     *(uint64_t*)iov.iov_base);
+                            HSTestHelper::validate_data_buf(uintptr_cast(iov.iov_base), iov.iov_len,
+                                                            hdr->data_pattern);
+                            iomanager.iobuf_free(uintptr_cast(iov.iov_base));
+                        }
+                        if (io_count->fetch_add(1) + 1 == req->written_blkids.size()) { m_io_runner.next_task(); }
+                    });
+            } else {
+                m_io_runner.next_task();
+            }
         }
     }
 };
@@ -319,6 +365,19 @@ TEST_F(SoloReplDevTest, TestHeaderOnly) {
     this->m_task_waiter.start([this]() { this->restart(); }).get();
 }
 
+TEST_F(SoloReplDevTest, TestAsyncWriteJournal) {
+    LOGINFO("Step 1: run on worker threads to schedule write for random bytes ranging {}-{}.", 0, 1 * Mi);
+    this->m_io_runner.set_task([this]() {
+        uint32_t nblks = rand() % ((1 * Mi) / g_block_size);
+        uint32_t key_size = rand() % 512 + 8;
+        this->async_write_data_and_journal(key_size, nblks * g_block_size, g_block_size);
+    });
+
+    this->m_io_runner.execute().get();
+    LOGINFO("Step 2: Restart homestore and validate replay data.");
+    this->m_task_waiter.start([this]() { this->restart(); }).get();
+}
+
 SISL_OPTION_GROUP(test_solo_repl_dev,
                   (block_size, "", "block_size", "block size to io",
                    ::cxxopts::value< uint32_t >()->default_value("4096"), "number"));