Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion conanfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class HomestoreConan(ConanFile):
name = "homestore"
version = "6.5.5"
version = "6.5.6"

homepage = "https://github.com/eBay/Homestore"
description = "HomeStore Storage Engine"
Expand Down
10 changes: 9 additions & 1 deletion src/include/homestore/index/index_internal.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > {
sisl::atomic_counter< int > m_wait_for_down_buffers{0}; // Number of children need to wait for before persisting
#ifndef NDEBUG
// Down buffers are not mandatory members, but only to keep track of any bugs and asserts
std::vector< std::weak_ptr< IndexBuffer > > m_down_buffers;
std::vector<std::weak_ptr<IndexBuffer> > m_down_buffers;
std::mutex m_down_buffers_mtx;
std::shared_ptr< IndexBuffer > m_prev_up_buffer; // Keep a copy for debugging
#endif

Expand All @@ -123,6 +124,13 @@ struct IndexBuffer : public sisl::ObjLifeCounter< IndexBuffer > {

std::string to_string() const;
std::string to_string_dot() const;

void add_down_buffer(const IndexBufferPtr &buf);

void remove_down_buffer(const IndexBufferPtr &buf);
#ifndef NDEBUG
bool is_in_down_buffers(const IndexBufferPtr &buf);
#endif
};

// This is a special buffer which is used to write to the meta block
Expand Down
25 changes: 4 additions & 21 deletions src/lib/index/index_cp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,33 +266,16 @@ void IndexCPContext::process_txn_record(txn_record const* rec, std::map< BlkId,
#ifndef NDEBUG
// if (!is_sibling_link || (buf->m_up_buffer == real_up_buf)) { return buf;}
// Already linked with same buf or its not a sibling link to override
bool found{false};
for (auto const& dbuf : real_up_buf->m_down_buffers) {
if (dbuf.lock() == buf) {
found = true;
break;
}
if (real_up_buf->is_in_down_buffers(buf)) {
return buf;
}
if (found) { return buf; }
real_up_buf->m_down_buffers.emplace_back(buf);
#endif

if (buf->m_up_buffer != real_up_buf) {
if (buf->m_up_buffer) {
buf->m_up_buffer->m_wait_for_down_buffers.decrement(1);
#ifndef NDEBUG
bool found{false};
for (auto it = buf->m_up_buffer->m_down_buffers.begin(); it != buf->m_up_buffer->m_down_buffers.end(); ++it) {
if (it->lock() == buf) {
buf->m_up_buffer->m_down_buffers.erase(it);
found = true;
break;
}
}
HS_DBG_ASSERT(found, "Down buffer is linked to Up buf, but up_buf doesn't have down_buf in its list");
#endif
buf->m_up_buffer->remove_down_buffer(buf);
}
real_up_buf->m_wait_for_down_buffers.increment(1);
real_up_buf->add_down_buffer(buf);
buf->m_up_buffer = real_up_buf;
}
}
Expand Down
49 changes: 46 additions & 3 deletions src/lib/index/index_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,12 @@ std::string IndexBuffer::to_string() const {
// store m_down_buffers in a string
std::string down_bufs = "";
#ifndef NDEBUG
for (auto const& down_buf : m_down_buffers) {
if (auto ptr = down_buf.lock()) {
fmt::format_to(std::back_inserter(down_bufs), "[{}]", voidptr_cast(ptr.get()));
{
std::lock_guard lg(m_down_buffers_mtx);
for (auto const &down_buf: m_down_buffers) {
if (auto ptr = down_buf.lock()) {
fmt::format_to(std::back_inserter(down_bufs), "[{}]", voidptr_cast(ptr.get()));
}
}
}
#endif
Expand All @@ -178,6 +181,7 @@ std::string IndexBuffer::to_string() const {
down_bufs);
}
}

std::string IndexBuffer::to_string_dot() const {
auto str = fmt::format("IndexBuffer {} ", reinterpret_cast< void* >(const_cast< IndexBuffer* >(this)));
if (m_bytes == nullptr) {
Expand All @@ -190,6 +194,45 @@ std::string IndexBuffer::to_string_dot() const {
return str;
}

void IndexBuffer::add_down_buffer(const IndexBufferPtr &buf) {
m_wait_for_down_buffers.increment();
#ifndef NDEBUG
{
std::lock_guard lg(m_down_buffers_mtx);
m_down_buffers.push_back(buf);
}
#endif
}

void IndexBuffer::remove_down_buffer(const IndexBufferPtr &buf) {
m_wait_for_down_buffers.decrement();
#ifndef NDEBUG
bool found{false}; {
std::lock_guard lg(m_down_buffers_mtx);
for (auto it = buf->m_up_buffer->m_down_buffers.begin(); it != buf->m_up_buffer->m_down_buffers.end(); ++it) {
if (it->lock() == buf) {
buf->m_up_buffer->m_down_buffers.erase(it);
found = true;
break;
}
}
}
HS_DBG_ASSERT(found, "Down buffer is linked to up_buf, but up_buf doesn't have down_buf in its list");
#endif
}

#ifndef NDEBUG
bool IndexBuffer::is_in_down_buffers(const IndexBufferPtr &buf) {
std::lock_guard<std::mutex> lg(m_down_buffers_mtx);
for (auto const &dbuf: m_down_buffers) {
if (dbuf.lock() == buf) {
return true;
}
}
return false;
}
#endif

MetaIndexBuffer::MetaIndexBuffer(superblk< index_table_sb >& sb) : IndexBuffer{nullptr, BlkId{}}, m_sb{sb} {
m_is_meta_buf = true;
}
Expand Down
51 changes: 10 additions & 41 deletions src/lib/index/wb_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,14 +396,8 @@ void IndexWBCache::link_buf(IndexBufferPtr const& up_buf, IndexBufferPtr const&
HS_DBG_ASSERT((real_up_buf->m_dirtied_cp_id == down_buf->m_dirtied_cp_id) || (real_up_buf->is_meta_buf()),
"Up buffer is not modified by current cp, but down buffer is linked to it");
#ifndef NDEBUG
bool found{false};
for (auto const& dbuf : real_up_buf->m_down_buffers) {
if (dbuf.lock() == down_buf) {
found = true;
break;
}
}
HS_DBG_ASSERT(found, "Down buffer is linked to Up buf, but up_buf doesn't have down_buf in its list");
HS_DBG_ASSERT(real_up_buf->is_in_down_buffers(down_buf),
"Down buffer is linked to Up buf, but up_buf doesn't have down_buf in its list");
#endif
return;
}
Expand All @@ -412,25 +406,10 @@ void IndexWBCache::link_buf(IndexBufferPtr const& up_buf, IndexBufferPtr const&
// Now we link the down_buffer to the real up_buffer
if (down_buf->m_up_buffer) {
// release existing up_buffer's wait count
down_buf->m_up_buffer->m_wait_for_down_buffers.decrement();
#ifndef NDEBUG
bool found{false};
for (auto it = down_buf->m_up_buffer->m_down_buffers.begin(); it != down_buf->m_up_buffer->m_down_buffers.end();
++it) {
if (it->lock() == down_buf) {
down_buf->m_up_buffer->m_down_buffers.erase(it);
found = true;
break;
}
}
HS_DBG_ASSERT(found, "Down buffer is linked to Up buf, but up_buf doesn't have down_buf in its list");
#endif
down_buf->m_up_buffer->remove_down_buffer(down_buf);
}
real_up_buf->m_wait_for_down_buffers.increment(1);
down_buf->m_up_buffer = real_up_buf;
#ifndef NDEBUG
real_up_buf->m_down_buffers.emplace_back(down_buf);
#endif
real_up_buf->add_down_buffer(down_buf);
}

void IndexWBCache::free_buf(const IndexBufferPtr& buf, CPContext* cp_ctx) {
Expand Down Expand Up @@ -535,21 +514,8 @@ void IndexWBCache::recover(sisl::byte_view sb) {
pending_bufs.push_back(buf->m_up_buffer);
} else {
// Just ignore it
buf->m_up_buffer->m_wait_for_down_buffers.decrement();
#ifndef NDEBUG
bool found{false};
for (auto it = buf->m_up_buffer->m_down_buffers.begin();
it != buf->m_up_buffer->m_down_buffers.end(); ++it) {
auto sp = it->lock();
if (sp && sp == buf) {
found = true;
buf->m_up_buffer->m_down_buffers.erase(it);
break;
}
}
HS_DBG_ASSERT(found,
"Down buffer is linked to Up buf, but up_buf doesn't have down_buf in its list");
#endif
buf->m_up_buffer->remove_down_buffer(buf);
buf->m_up_buffer = nullptr;
}
}
}
Expand Down Expand Up @@ -754,7 +720,10 @@ std::pair< IndexBufferPtr, bool > IndexWBCache::on_buf_flush_done_internal(Index
IndexBufferPtr const& buf) {
IndexBufferPtrList buf_list;
#ifndef NDEBUG
buf->m_down_buffers.clear();
{
std::lock_guard lg(buf->m_down_buffers_mtx);
buf->m_down_buffers.clear();
}
#endif
buf->set_state(index_buf_state_t::CLEAN);

Expand Down