4 changes: 2 additions & 2 deletions conanfile.py
@@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.13.4"
version = "6.13.5"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
@@ -54,7 +54,7 @@ def build_requirements(self):
def requirements(self):
self.requires("iomgr/[^11.3]@oss/master", transitive_headers=True)
self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
self.requires("nuraft_mesg/[>=3.7.5]@oss/main", transitive_headers=True)
self.requires("nuraft_mesg/[~3.8.0]@oss/main", transitive_headers=True)

self.requires("farmhash/cci.20190513@", transitive_headers=True)
if self.settings.arch in ['x86', 'x86_64']:
26 changes: 25 additions & 1 deletion src/include/homestore/blkdata_service.hpp
@@ -114,6 +114,18 @@ class BlkDataService {
folly::Future< std::error_code > async_write(sisl::sg_list const& sgs, MultiBlkId const& in_blkids,
bool part_of_batch = false);

/**
* @brief : asynchronous write to a list of input block ids;
*
* @param sgs : the data buffer that needs to be written
* @param in_blkids : input block ids that this data should be written to;
* @param part_of_batch : is this write part of a batch;
*/
folly::Future< std::error_code > async_write(sisl::sg_list const& sgs, std::vector< MultiBlkId > const& in_blkids,
bool part_of_batch = false);
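For context, a minimal usage sketch of this new overload (not part of this change): it assumes `svc` is a started BlkDataService and that `sgs` and `blkids` were prepared by the caller; the wrapper name write_multi_blkid is hypothetical.

#include <homestore/blkdata_service.hpp>
#include <iostream>

// Hypothetical wrapper: issue the write against all blkids and report any error code.
folly::Future< std::error_code > write_multi_blkid(homestore::BlkDataService& svc, sisl::sg_list const& sgs,
                                                   std::vector< homestore::MultiBlkId > const& blkids) {
    return svc.async_write(sgs, blkids, false /* part_of_batch */).thenValue([](std::error_code ec) {
        if (ec) { std::cerr << "multi-blkid write failed: " << ec.message() << "\n"; }
        return ec;
    });
}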

/**
* @brief Asynchronously reads data from the specified block ID into the provided buffer.
*
@@ -147,7 +159,8 @@ class BlkDataService {
BlkAllocStatus commit_blk(MultiBlkId const& bid);

/**
* @brief Allocates a contiguous block of disk space of the given size.
* @brief Allocates a contiguous block of disk space of the given size. This API should be called when the consumer
* expects all blks to be allocated on the same chunk.
*
* @param size The size of the block to allocate, in bytes.
* @param hints Hints for how to allocate the block.
@@ -156,6 +169,17 @@
*/
BlkAllocStatus alloc_blks(uint32_t size, blk_alloc_hints const& hints, MultiBlkId& out_blkids);

/**
* @brief Allocates blocks of disk space of the given size. This API should be called when the consumer accepts that
* blk allocation across different chunks is possible.
*
* @param size The size of the block to allocate, in bytes.
* @param hints Hints for how to allocate the block.
* @param out_blkids Output parameter that will be filled with the IDs of the allocated blocks.
* @return The status of the block allocation attempt.
*/
BlkAllocStatus alloc_blks(uint32_t size, blk_alloc_hints const& hints, std::vector< BlkId >& out_blkids);
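For illustration only (not part of this change), a hedged sketch of how a consumer might use this overload end to end, mirroring what repl_req_ctx::alloc_local_blks() does in this PR; the helper name and the error mapping are assumptions.

#include <homestore/blkdata_service.hpp>
#include <system_error>

// Hypothetical helper: round the request up to the block size, allocate blocks that may
// span chunks, then write the buffer against them and block for the result.
std::error_code alloc_and_write(homestore::BlkDataService& svc, sisl::sg_list const& sgs,
                                homestore::blk_alloc_hints const& hints) {
    uint32_t const blk_sz = svc.get_blk_size();
    uint32_t const size = ((static_cast< uint32_t >(sgs.size) + blk_sz - 1) / blk_sz) * blk_sz;

    std::vector< homestore::BlkId > blkids;
    if (svc.alloc_blks(size, hints, blkids) != homestore::BlkAllocStatus::SUCCESS) {
        return std::make_error_code(std::errc::no_space_on_device);
    }

    // The data-path write API takes MultiBlkId, so wrap each allocated BlkId (as
    // alloc_local_blks() does in common.cpp).
    std::vector< homestore::MultiBlkId > mbids;
    for (auto const& b : blkids) {
        mbids.emplace_back(b);
    }
    return svc.async_write(sgs, mbids).get();
}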

/**
* @brief Asynchronously frees the specified block IDs.
* It is asynchronous because it might need to wait for pending read to complete if same block is being read and not
57 changes: 51 additions & 6 deletions src/include/homestore/replication/repl_dev.h
@@ -130,7 +130,16 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::

sisl::blob const& header() const { return m_header; }
sisl::blob const& key() const { return m_key; }
MultiBlkId const& local_blkid() const { return m_local_blkid; }
MultiBlkId const& local_blkid() const {
// Currently used only by raft repl dev, where a single blob is expected.
// Callers check whether the blkid is valid, so return a dummy blkid when the list is empty.
if (!m_local_blkids.empty())
return m_local_blkids[0];
else
return dummy_blkid;
}

std::vector< MultiBlkId >& local_blkids() { return m_local_blkids; }
RemoteBlkId const& remote_blkid() const { return m_remote_blkid; }
const char* data() const {
DEBUG_ASSERT(m_data != nullptr,
@@ -141,6 +150,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
bool has_state(repl_req_state_t s) const { return m_state.load() & uint32_cast(s); }
repl_journal_entry const* journal_entry() const { return m_journal_entry; }
uint32_t journal_entry_size() const;
uint32_t blkids_serialized_size() const;
bool is_localize_pending() const { return m_is_jentry_localize_pending; }
bool has_linked_data() const { return (m_op_code == journal_type_t::HS_DATA_LINKED); }

@@ -149,6 +159,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
/////////////////////// Non modifiers methods //////////////////
std::string to_string() const;
std::string to_compact_string() const;
std::string blkids_to_string() const;
Clock::time_point created_time() const { return m_start_time; }
void set_created_time() { m_start_time = Clock::now(); }
bool is_expired() const;
@@ -195,7 +206,7 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
bool save_fetched_data(sisl::GenericClientResponse const& fetched_data, uint8_t const* data, uint32_t data_size);

void set_remote_blkid(RemoteBlkId const& rbid) { m_remote_blkid = rbid; }
void set_local_blkid(MultiBlkId const& lbid) { m_local_blkid = lbid; } // Only used during recovery
void set_local_blkids(std::vector< MultiBlkId > lbids) { m_local_blkids = std::move(lbids); }
void set_is_volatile(bool is_volatile) { m_is_volatile.store(is_volatile); }
void set_lsn(int64_t lsn);
void add_state(repl_req_state_t s);
@@ -226,9 +237,10 @@ struct repl_req_ctx : public boost::intrusive_ref_counter< repl_req_ctx, boost::
std::atomic< bool > m_is_volatile{true}; // Is the log still in memory and not flushed to disk yet

/////////////// Data related section /////////////////
MultiBlkId m_local_blkid; // Local BlkId for the data
RemoteBlkId m_remote_blkid; // Corresponding remote blkid for the data
uint8_t const* m_data; // Raw data pointer containing the actual data
static inline MultiBlkId dummy_blkid;
std::vector< MultiBlkId > m_local_blkids; // Local BlkIds for the data
RemoteBlkId m_remote_blkid; // Corresponding remote blkid for the data
uint8_t const* m_data; // Raw data pointer containing the actual data

/////////////// Journal/Buf related section /////////////////
std::variant< std::unique_ptr< uint8_t[] >, raft_buf_ptr_t > m_journal_buf; // Buf for the journal entry
@@ -400,7 +412,7 @@ class ReplDevListener {
virtual void on_no_space_left(repl_lsn_t lsn, chunk_num_t chunk_id) = 0;

/// @brief when restart, after all the logs are replayed and before joining raft group, notify the upper layer
virtual void on_log_replay_done(const group_id_t& group_id){};
virtual void on_log_replay_done(const group_id_t& group_id) {};

private:
std::weak_ptr< ReplDev > m_repl_dev;
@@ -411,6 +423,39 @@ class ReplDev {
ReplDev() = default;
virtual ~ReplDev() { detach_listener(); }

/// @brief Allocates blkids from the storage engine to write the value into. Storage
/// engine returns a list of blkids in cases where a single contiguous block is not
/// available.
///
/// @param data_size - Size of the data.
/// @param hints - Specify block allocation hints.
/// @param out_blkids - List of blkids which may not be contiguous.
virtual std::error_code alloc_blks(uint32_t data_size, const blk_alloc_hints& hints,
std::vector< MultiBlkId >& out_blkids) = 0;

/// @brief Write data locally using the specified blkids. Data is split across the blkids.
/// @param blkids - List of blkids where data will be written.
/// @param value - Vector of io buffers that contain the value for the key.
/// @param part_of_batch - Whether this write is part of a batch. If it is, submit_batch needs to be called
/// at the end.
/// @return A Future with std::error_code indicating whether the data was written successfully, or an error code
/// in case of failure.
virtual folly::Future< std::error_code > async_write(const std::vector< MultiBlkId >& blkids,
sisl::sg_list const& value, bool part_of_batch = false,
trace_id_t tid = 0) = 0;

/// @brief Creates a log/journal entry with <header, key, blkids> and calls the on_commit listener callback.
/// @param blkids - List of blkids where data was written.
/// @param header - Blob representing the header (it is opaque and will be copied
/// as-is to the journal entry)
/// @param key - Blob representing the key (it is opaque and will be copied as-is to
/// the journal entry).
/// @param data_size - Size of the data.
/// @param ctx - User supplied context which will be passed to listener callbacks
virtual void async_write_journal(const std::vector< MultiBlkId >& blkids, sisl::blob const& header,
sisl::blob const& key, uint32_t data_size, repl_req_ptr_t ctx,
trace_id_t tid = 0) = 0;
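For context, a minimal sketch (not part of this change) of how these three new calls could be chained by a consumer whose ReplDev implementation supports them; `alloc_write_then_journal` is a hypothetical name and the capture/lifetime handling is deliberately simplified.

#include <homestore/replication/repl_dev.h>

// Hypothetical flow: allocate blkids, write the value against them, then journal
// <header, key, blkids> so that on_commit is eventually invoked on the listener.
folly::Future< std::error_code > alloc_write_then_journal(homestore::ReplDev& rdev, sisl::blob const& header,
                                                          sisl::blob const& key, sisl::sg_list const& value,
                                                          homestore::blk_alloc_hints const& hints,
                                                          homestore::repl_req_ptr_t ctx) {
    std::vector< homestore::MultiBlkId > blkids;
    uint32_t const data_size = static_cast< uint32_t >(value.size);
    if (auto ec = rdev.alloc_blks(data_size, hints, blkids); ec) {
        return folly::makeFuture< std::error_code >(std::move(ec));
    }

    return rdev.async_write(blkids, value)
        .thenValue([&rdev, blkids, header, key, data_size, ctx](std::error_code ec) {
            if (!ec) {
                // The journal entry links the already-written blkids with the opaque header/key.
                rdev.async_write_journal(blkids, header, key, data_size, ctx);
            }
            return ec;
        });
}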

/// @brief Replicate the data to the replica set. This method goes through the
/// following steps:
/// Step 1: Allocates blkid from the storage engine to write the value into. Storage
31 changes: 28 additions & 3 deletions src/lib/blkdata_svc/blkdata_service.cpp
@@ -208,10 +208,35 @@ folly::Future< std::error_code > BlkDataService::async_write(sisl::sg_list const
}
}

folly::Future< std::error_code >
BlkDataService::async_write(sisl::sg_list const& sgs, std::vector< MultiBlkId > const& blkids, bool part_of_batch) {
if (is_stopping()) return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_canceled));
incr_pending_request_num();
static thread_local std::vector< folly::Future< std::error_code > > s_futs;
s_futs.clear();
for (const auto& blkid : blkids) {
s_futs.emplace_back(async_write(sgs, blkid, part_of_batch));
}
decr_pending_request_num();
return collect_all_futures(s_futs);
}
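collect_all_futures() is an existing helper in this codebase; for readers unfamiliar with it, a hedged plain-folly equivalent of the aggregation it is expected to perform might look like the sketch below (the actual helper may differ in behaviour and signature).

#include <folly/futures/Future.h>
#include <system_error>
#include <vector>

// Assumed behaviour: resolve once every per-blkid write resolves, surfacing the first
// failure (or a generic io_error if a future carries an exception) and success otherwise.
static folly::Future< std::error_code > collect_first_error(std::vector< folly::Future< std::error_code > >& futs) {
    return folly::collectAllUnsafe(futs.begin(), futs.end())
        .thenValue([](std::vector< folly::Try< std::error_code > > results) {
            for (auto const& r : results) {
                if (r.hasException()) { return std::make_error_code(std::errc::io_error); }
                if (r.value()) { return r.value(); }
            }
            return std::error_code{};
        });
}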

BlkAllocStatus BlkDataService::alloc_blks(uint32_t size, const blk_alloc_hints& hints, MultiBlkId& out_blkids) {
if (is_stopping()) return BlkAllocStatus::FAILED;
incr_pending_request_num();
HS_DBG_ASSERT_EQ(size % m_blk_size, 0, "Non aligned size requested");
HS_DBG_ASSERT_EQ(size % m_blk_size, 0, "Non aligned size requested size={} blk_size={}", size, m_blk_size);
blk_count_t nblks = static_cast< blk_count_t >(size / m_blk_size);

auto ret = m_vdev->alloc_blks(nblks, hints, out_blkids);
decr_pending_request_num();
return ret;
}

BlkAllocStatus BlkDataService::alloc_blks(uint32_t size, const blk_alloc_hints& hints,
std::vector< BlkId >& out_blkids) {
if (is_stopping()) return BlkAllocStatus::FAILED;
incr_pending_request_num();
HS_DBG_ASSERT_EQ(size % m_blk_size, 0, "Non aligned size requested size={} blk_size={}", size, m_blk_size);
blk_count_t nblks = static_cast< blk_count_t >(size / m_blk_size);

auto ret = m_vdev->alloc_blks(nblks, hints, out_blkids);
@@ -271,8 +296,8 @@ void BlkDataService::start() {

void BlkDataService::stop() {
start_stopping();
// we have no way to track the completion of each async io in detail which should be done in iomanager level, so we
// just wait for 3 seconds, and we expect each io will be completed within this time.
// we have no way to track the completion of each async io in detail which should be done in iomanager level, so
// we just wait for 3 seconds, and we expect each io will be completed within this time.

// TODO: find a better solution to track the completion of these async calls
std::this_thread::sleep_for(std::chrono::milliseconds(3000));
41 changes: 33 additions & 8 deletions src/lib/replication/repl_dev/common.cpp
@@ -63,7 +63,7 @@ repl_req_ctx::~repl_req_ctx() {
}

void repl_req_ctx::create_journal_entry(bool is_raft_buf, int32_t server_id) {
uint32_t val_size = has_linked_data() ? m_local_blkid.serialized_size() : 0;
uint32_t val_size = has_linked_data() ? blkids_serialized_size() : 0;
uint32_t entry_size = sizeof(repl_journal_entry) + m_header.size() + m_key.size() + val_size;

if (is_raft_buf) {
@@ -94,14 +94,25 @@ void repl_req_ctx::create_journal_entry(bool is_raft_buf, int32_t server_id) {
}

if (has_linked_data()) {
auto const b = m_local_blkid.serialize();
std::memcpy(raw_ptr, b.cbytes(), b.size());
for (const auto& blkid : m_local_blkids) {
auto const b = blkid.serialize();
std::memcpy(raw_ptr, b.cbytes(), b.size());
raw_ptr += b.size();
}
}
}

uint32_t repl_req_ctx::journal_entry_size() const {
return sizeof(repl_journal_entry) + m_header.size() + m_key.size() +
(has_linked_data() ? m_local_blkid.serialized_size() : 0);
(has_linked_data() ? blkids_serialized_size() : 0);
}

uint32_t repl_req_ctx::blkids_serialized_size() const {
uint32_t total_size = 0;
for (const auto& blkid : m_local_blkids) {
total_size += blkid.serialized_size();
}
return total_size;
}
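For reference, the journal-entry layout that create_journal_entry() now produces for linked data, and which journal_entry_size() accounts for (a sketch derived from the code above, not an authoritative format spec):

// | repl_journal_entry | header bytes | key bytes | blkid[0] | blkid[1] | ... | blkid[n-1] |
//                                                  \_________ blkids_serialized_size() ___/
//
// journal_entry_size() == sizeof(repl_journal_entry) + m_header.size() + m_key.size()
//                         + blkids_serialized_size()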

void repl_req_ctx::change_raft_journal_buf(raft_buf_ptr_t new_buf, bool adjust_hdr_key) {
@@ -128,7 +139,7 @@ ReplServiceError repl_req_ctx::alloc_local_blks(cshared< ReplDevListener >& list
// if the committed_blk_id is already present, use it and skip allocation and commitment
LOGINFOMOD(replication, "[traceID={}] For Repl_key=[{}] data already exists, skip", rkey().traceID,
rkey().to_string());
m_local_blkid = hints_result.value().committed_blk_id.value();
m_local_blkids.emplace_back(hints_result.value().committed_blk_id.value());
add_state(repl_req_state_t::BLK_ALLOCATED);
add_state(repl_req_state_t::DATA_RECEIVED);
add_state(repl_req_state_t::DATA_WRITTEN);
@@ -138,14 +149,19 @@
return ReplServiceError::OK;
}

std::vector< BlkId > blkids;
auto status = data_service().alloc_blks(sisl::round_up(uint32_cast(data_size), data_service().get_blk_size()),
hints_result.value(), m_local_blkid);
hints_result.value(), blkids);
if (status != BlkAllocStatus::SUCCESS) {
LOGWARNMOD(replication, "[traceID={}] block allocation failure, repl_key=[{}], status=[{}]", rkey().traceID,
rkey(), status);
DEBUG_ASSERT_EQ(status, BlkAllocStatus::SUCCESS, "Unable to allocate blks");
return ReplServiceError::NO_SPACE_LEFT;
}

for (auto& blkid : blkids) {
m_local_blkids.emplace_back(blkid);
}
add_state(repl_req_state_t::BLK_ALLOCATED);
return ReplServiceError::OK;
}
@@ -246,7 +262,7 @@ std::string repl_req_ctx::to_string() const {
return fmt::format("repl_key=[{}], lsn={} state=[{}] m_headersize={} m_keysize={} is_proposer={} "
"local_blkid={} remote_blkid={}",
m_rkey.to_string(), m_lsn, req_state_name(uint32_cast(state())), m_header.size(), m_key.size(),
m_is_proposer, m_local_blkid.to_string(), m_remote_blkid.blkid.to_string());
m_is_proposer, blkids_to_string(), m_remote_blkid.blkid.to_string());
}

std::string repl_req_ctx::to_compact_string() const {
@@ -255,7 +271,16 @@ std::string repl_req_ctx::to_compact_string() const {
}

return fmt::format("dsn={} term={} lsn={} op={} local_blkid={} state=[{}]", m_rkey.dsn, m_rkey.term, m_lsn,
enum_name(m_op_code), m_local_blkid.to_string(), req_state_name(uint32_cast(state())));
enum_name(m_op_code), blkids_to_string(), req_state_name(uint32_cast(state())));
}

std::string repl_req_ctx::blkids_to_string() const {
std::string str = fmt::format("[");
for (const auto& blkid : m_local_blkids) {
fmt::format_to(std::back_inserter(str), "{} ", blkid.to_string());
}
fmt::format_to(std::back_inserter(str), "]");
return str;
}

bool repl_req_ctx::is_expired() const {
13 changes: 8 additions & 5 deletions src/lib/replication/repl_dev/raft_repl_dev.cpp
@@ -337,10 +337,13 @@ void RaftReplDev::async_alloc_write(sisl::blob const& header, sisl::blob const&
}
}

auto status = init_req_ctx(
rreq, repl_key{.server_id = server_id(), .term = raft_server()->get_term(), .dsn = m_next_dsn.fetch_add(1), .traceID = tid},
data.size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED, true /* is_proposer */, header,
key, data.size, m_listener);
auto status = init_req_ctx(rreq,
repl_key{.server_id = server_id(),
.term = raft_server()->get_term(),
.dsn = m_next_dsn.fetch_add(1),
.traceID = tid},
data.size ? journal_type_t::HS_DATA_LINKED : journal_type_t::HS_DATA_INLINED,
true /* is_proposer */, header, key, data.size, m_listener);

if (status != ReplServiceError::OK) {
RD_LOGI(tid, "Initializing rreq failed error={}, failing this req", status);
@@ -1659,7 +1662,7 @@ void RaftReplDev::on_log_found(logstore_seq_num_t lsn, log_buffer buf, void* ctx
MultiBlkId entry_blkid;
entry_blkid.deserialize(entry_to_val(jentry), true /* copy */);
data_size = entry_blkid.blk_count() * get_blk_size();
rreq->set_local_blkid(entry_blkid);
rreq->set_local_blkids({entry_blkid});
rreq->add_state(repl_req_state_t::BLK_ALLOCATED);
rreq->add_state(repl_req_state_t::DATA_RECEIVED);
}
18 changes: 18 additions & 0 deletions src/lib/replication/repl_dev/raft_repl_dev.h
@@ -229,6 +229,24 @@ class RaftReplDev : public ReplDev,
folly::SemiFuture< ReplServiceError > destroy_group();

//////////////// All ReplDev overrides/implementation ///////////////////////
virtual std::error_code alloc_blks(uint32_t size, const blk_alloc_hints& hints,
std::vector< MultiBlkId >& out_blkids) override {
RD_REL_ASSERT(false, "NOT SUPPORTED");
return std::make_error_code(std::errc::operation_not_supported);
}
virtual folly::Future< std::error_code > async_write(const std::vector< MultiBlkId >& blkids,
sisl::sg_list const& value, bool part_of_batch = false,
trace_id_t tid = 0) override {
RD_REL_ASSERT(false, "NOT SUPPORTED");
return folly::makeFuture< std::error_code >(std::make_error_code(std::errc::operation_not_supported));
}

virtual void async_write_journal(const std::vector< MultiBlkId >& blkids, sisl::blob const& header,
sisl::blob const& key, uint32_t data_size, repl_req_ptr_t ctx,
trace_id_t tid = 0) override {
RD_REL_ASSERT(false, "NOT SUPPORTED");
}

void async_alloc_write(sisl::blob const& header, sisl::blob const& key, sisl::sg_list const& value,
repl_req_ptr_t ctx, bool part_of_batch = false, trace_id_t tid = 0) override;
folly::Future< std::error_code > async_read(MultiBlkId const& blkid, sisl::sg_list& sgs, uint32_t size,