From ab6a50ad5fc201b9be090bcf2bd2cdcbb2d2e6d2 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Sun, 31 Jul 2022 10:59:02 +0100 Subject: [PATCH 1/4] Add mutex locks to sample indexing strategies (#4075) (cherry picked from commit 6b04017f856fb478beb49b58da1db02d459da706) --- .threading_canary | 2 +- .../strategies/seqnos_by_key_in_memory.h | 4 + src/consensus/aft/test/logging_stub.h | 6 + src/indexing/enclave_lfs_access.h | 5 + .../strategies/seqnos_by_key_bucketed.cpp | 7 + .../strategies/seqnos_by_key_in_memory.cpp | 2 + src/indexing/test/indexing.cpp | 497 ++++++++++++++++++ 7 files changed, 522 insertions(+), 1 deletion(-) diff --git a/.threading_canary b/.threading_canary index 580ef7df91bc..6607897a73a1 100644 --- a/.threading_canary +++ b/.threading_canary @@ -1 +1 @@ -This looks like a 'job' for Threading Canary! +This looks like a "job" for Threading Canary! diff --git a/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h b/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h index 56cac64d41d0..9b19a6e2a479 100644 --- a/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h +++ b/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h @@ -2,6 +2,7 @@ // Licensed under the Apache 2.0 License. #pragma once +#include "ccf/ds/pal.h" #include "ccf/indexing/strategies/visit_each_entry_in_map.h" #include "ccf/seq_no_collection.h" @@ -15,6 +16,9 @@ namespace ccf::indexing::strategies // Value is every SeqNo which talks about that key. std::unordered_map seqnos_by_key; + // Mutex guarding access to seqnos_by_key + ccf::Pal::Mutex lock; + void visit_entry( const ccf::TxID& tx_id, const ccf::ByteVector& k, diff --git a/src/consensus/aft/test/logging_stub.h b/src/consensus/aft/test/logging_stub.h index b85d979e3319..005fce0d25ca 100644 --- a/src/consensus/aft/test/logging_stub.h +++ b/src/consensus/aft/test/logging_stub.h @@ -17,6 +17,8 @@ namespace aft protected: ccf::NodeId _id; + std::mutex ledger_access; + public: std::vector> ledger; uint64_t skip_count = 0; @@ -31,6 +33,8 @@ namespace aft kv::Term term, kv::Version index) { + std::lock_guard lock(ledger_access); + // The payload that we eventually deserialise must include the // ledger entry as well as the View and Index that identify it. In // the real entries, they are nested in the payload and the IV. For @@ -73,6 +77,8 @@ namespace aft std::optional> get_entry_by_idx(size_t idx) { + std::lock_guard lock(ledger_access); + // Ledger indices are 1-based, hence the -1 if (idx > 0 && idx <= ledger.size()) { diff --git a/src/indexing/enclave_lfs_access.h b/src/indexing/enclave_lfs_access.h index a0caf637d84f..20103725297e 100644 --- a/src/indexing/enclave_lfs_access.h +++ b/src/indexing/enclave_lfs_access.h @@ -6,6 +6,7 @@ #include "ccf/crypto/sha256.h" #include "ccf/crypto/symmetric_key.h" #include "ccf/ds/hex.h" +#include "ccf/ds/pal.h" #include "ds/messaging.h" #include "indexing/lfs_interface.h" #include "indexing/lfs_ringbuffer_types.h" @@ -86,6 +87,7 @@ namespace ccf::indexing using PendingResult = std::weak_ptr; std::unordered_map pending; + ccf::Pal::Mutex pending_access; ringbuffer::WriterPtr to_host; @@ -136,6 +138,7 @@ namespace ccf::indexing dispatcher, LFSMsg::response, [this](const uint8_t* data, size_t size) { auto [obfuscated, encrypted] = ringbuffer::read_message(data, size); + std::lock_guard guard(pending_access); auto it = pending.find(obfuscated); if (it != pending.end()) { @@ -193,6 +196,7 @@ namespace ccf::indexing [this](const uint8_t* data, size_t size) { auto [obfuscated] = ringbuffer::read_message(data, size); + std::lock_guard guard(pending_access); auto it = pending.find(obfuscated); if (it != pending.end()) { @@ -258,6 +262,7 @@ namespace ccf::indexing FetchResultPtr fetch(const LFSKey& key) override { const auto obfuscated = obfuscate_key(key); + std::lock_guard guard(pending_access); auto it = pending.find(obfuscated); FetchResultPtr result; diff --git a/src/indexing/strategies/seqnos_by_key_bucketed.cpp b/src/indexing/strategies/seqnos_by_key_bucketed.cpp index 56b038b91c96..4d6b003ee3bc 100644 --- a/src/indexing/strategies/seqnos_by_key_bucketed.cpp +++ b/src/indexing/strategies/seqnos_by_key_bucketed.cpp @@ -5,6 +5,7 @@ #include "ccf/ds/hex.h" #include "ccf/ds/logger.h" +#include "ccf/ds/pal.h" #include "ds/lru.h" #include "ds/serialized.h" #include "indexing/lfs_interface.h" @@ -32,6 +33,8 @@ namespace ccf::indexing::strategies using BucketValue = std::pair; LRU old_results; + ccf::Pal::Mutex results_access; + std::string name; std::shared_ptr lfs_access; @@ -206,6 +209,8 @@ namespace ccf::indexing::strategies while (true) { + std::lock_guard guard(results_access); + const auto bucket_key = std::make_pair(serialised_key, from_range); const auto old_it = old_results.find(bucket_key); if (old_it != old_results.end()) @@ -350,6 +355,8 @@ namespace ccf::indexing::strategies { const auto range = impl->get_range_for(tx_id.seqno); + std::lock_guard guard(impl->results_access); + auto it = impl->current_results.find(k); if (it != impl->current_results.end()) { diff --git a/src/indexing/strategies/seqnos_by_key_in_memory.cpp b/src/indexing/strategies/seqnos_by_key_in_memory.cpp index cf157340d33f..2ff4d9230be9 100644 --- a/src/indexing/strategies/seqnos_by_key_in_memory.cpp +++ b/src/indexing/strategies/seqnos_by_key_in_memory.cpp @@ -8,6 +8,7 @@ namespace ccf::indexing::strategies void SeqnosByKey_InMemory_Untyped::visit_entry( const ccf::TxID& tx_id, const ccf::ByteVector& k, const ccf::ByteVector& v) { + std::lock_guard guard(lock); seqnos_by_key[k].insert(tx_id.seqno); } @@ -18,6 +19,7 @@ namespace ccf::indexing::strategies ccf::SeqNo to, std::optional max_seqnos) { + std::lock_guard guard(lock); const auto it = seqnos_by_key.find(serialised_key); if (it != seqnos_by_key.end()) { diff --git a/src/indexing/test/indexing.cpp b/src/indexing/test/indexing.cpp index 0c1816c5bd97..67940a790805 100644 --- a/src/indexing/test/indexing.cpp +++ b/src/indexing/test/indexing.cpp @@ -1,14 +1,18 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the Apache 2.0 License. +#include "ccf/indexing/strategies/seqnos_by_key_bucketed.h" #include "ccf/indexing/strategies/seqnos_by_key_in_memory.h" #include "consensus/aft/raft.h" #include "consensus/aft/test/logging_stub.h" #include "ds/test/stub_writer.h" +#include "host/lfs_file_handler.h" +#include "indexing/enclave_lfs_access.h" #include "indexing/historical_transaction_fetcher.h" #include "indexing/test/common.h" #include "node/share_manager.h" +#include #define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN #include @@ -448,3 +452,496 @@ TEST_CASE_TEMPLATE( index_a, index_b); } + +using namespace std::chrono_literals; +const auto max_multithread_run_time = 10s; + +// Uses the real classes, and access + update them concurrently +TEST_CASE( + "multi-threaded indexing - in memory" * doctest::test_suite("indexing")) +{ + auto kv_store_p = std::make_shared(); + auto& kv_store = *kv_store_p; + + auto ledger_secrets = std::make_shared(); + kv_store.set_encryptor(std::make_shared(ledger_secrets)); + + auto stub_writer = std::make_shared(); + auto cache = std::make_shared( + kv_store, ledger_secrets, stub_writer); + + auto fetcher = + std::make_shared(cache); + auto indexer_p = std::make_shared(fetcher); + auto& indexer = *indexer_p; + + auto index_a = std::make_shared(map_a); + REQUIRE(indexer.install_strategy(index_a)); + + auto index_b = std::make_shared(map_b); + REQUIRE(indexer.install_strategy(index_b)); + + auto ledger = add_raft_consensus(kv_store_p, indexer_p); + + ledger_secrets->init(); + { + INFO("Store one recovery member"); + // This is necessary to rekey the ledger and issue recovery shares for the + // new ledger secret + auto tx = kv_store.create_tx(); + auto config = tx.rw(ccf::Tables::CONFIGURATION); + constexpr size_t recovery_threshold = 1; + config->put({recovery_threshold}); + auto member_info = tx.rw(ccf::Tables::MEMBER_INFO); + auto member_public_encryption_keys = tx.rw( + ccf::Tables::MEMBER_ENCRYPTION_PUBLIC_KEYS); + + auto kp = crypto::make_key_pair(); + auto cert = kp->self_sign("CN=member", valid_from, valid_to); + auto member_id = + crypto::Sha256Hash(crypto::cert_pem_to_der(cert)).hex_str(); + + member_info->put(member_id, {ccf::MemberStatus::ACTIVE}); + member_public_encryption_keys->put( + member_id, crypto::make_rsa_key_pair()->public_key_pem()); + REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); + } + + std::atomic finished = false; + std::atomic writes_to_hello = 0; + std::atomic writes_to_saluton = 0; + std::atomic writes_to_42 = 0; + + auto tx_advancer = [&]() { + size_t i = 0; + while (i < 1'000) + { + auto tx = kv_store.create_tx(); + tx.wo(map_a)->put(fmt::format("hello"), fmt::format("Value {}", i)); + ++writes_to_hello; + if (i % 2 == 0) + { + ++writes_to_saluton; + tx.wo(map_a)->put(fmt::format("saluton"), fmt::format("Value2 {}", i)); + } + if (i % 3 == 0) + { + ++writes_to_42; + tx.wo(map_b)->put(42, i); + } + + REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); + ++i; + } + finished = true; + }; + + size_t handled_writes = 0; + const auto& writes = stub_writer->writes; + + auto index_ticker = [&]() { + while (!finished) + { + size_t loops = 0; + while (indexer.update_strategies(step_time, kv_store.current_txid()) || + handled_writes < writes.size()) + { + // Do the fetch, simulating an asynchronous fetch by the historical + // query system + for (auto it = writes.begin() + handled_writes; it != writes.end(); + ++it) + { + const auto& write = *it; + + const uint8_t* data = write.contents.data(); + size_t size = write.contents.size(); + REQUIRE(write.m == consensus::ledger_get_range); + auto [from_seqno, to_seqno, purpose_] = + ringbuffer::read_message(data, size); + auto& purpose = purpose_; + REQUIRE(purpose == consensus::LedgerRequestPurpose::HistoricalQuery); + + std::vector combined; + for (auto seqno = from_seqno; seqno <= to_seqno; ++seqno) + { + auto entry = ledger->get_raw_entry_by_idx(seqno); + if (!entry.has_value()) + { + // Possible that this operation beat consensus to the ledger, so + // pause and retry + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + entry = ledger->get_raw_entry_by_idx(seqno); + } + REQUIRE(entry.has_value()); + combined.insert(combined.end(), entry->begin(), entry->end()); + } + cache->handle_ledger_entries(from_seqno, to_seqno, combined); + } + + handled_writes = writes.end() - writes.begin(); + + if (loops++ > 100) + { + throw std::logic_error("Looks like a permanent loop"); + } + } + } + }; + + auto fetch_index_a = [&]() { + while (true) + { + const auto hello = index_a->get_all_write_txs("hello"); + const auto saluton = index_a->get_all_write_txs("saluton"); + + if ( + finished && hello.has_value() && hello->size() == writes_to_hello && + saluton.has_value() && saluton->size() == writes_to_saluton) + { + break; + } + } + }; + + auto fetch_index_b = [&]() { + while (true) + { + const auto forty_two = index_b->get_all_write_txs(42); + + if ( + finished && forty_two.has_value() && forty_two->size() == writes_to_42) + { + break; + } + } + }; + + std::vector threads; + threads.emplace_back(tx_advancer); + threads.emplace_back(index_ticker); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_b); + threads.emplace_back(fetch_index_b); + + std::atomic work_done = false; + + std::thread watchdog([&]() { + using Clock = std::chrono::system_clock; + const auto start_time = Clock::now(); + + while (!work_done) + { + const auto now = Clock::now(); + REQUIRE(now - start_time < max_multithread_run_time); + std::this_thread::sleep_for(50ms); + } + }); + + for (auto& thread : threads) + { + thread.join(); + } + + work_done = true; + watchdog.join(); +} + +class MockTransactionFetcher : public ccf::indexing::TransactionFetcher +{ + std::shared_ptr encryptor; + +public: + aft::LedgerStubProxy* ledger; + + MockTransactionFetcher(const std::shared_ptr& e) : + encryptor(e) + {} + + kv::ReadOnlyStorePtr deserialise_transaction( + ccf::SeqNo seqno, const uint8_t* data, size_t size) override + { + auto store = std::make_shared( + false /* Do not start from very first seqno */, + true /* Make use of historical secrets */); + + store->set_encryptor(encryptor); + + bool public_only = false; + auto exec = + store->deserialize({data, data + size}, ConsensusType::CFT, public_only); + if (exec == nullptr) + { + return nullptr; + } + + auto result = exec->apply(); + if (result == kv::ApplyResult::FAIL) + { + return nullptr; + } + + return store; + } + + std::vector fetch_transactions( + const ccf::SeqNoCollection& seqnos) override + { + std::vector ret; + + for (const auto& seqno : seqnos) + { + const auto entry = ledger->get_raw_entry_by_idx(seqno); + if (!entry.has_value()) + { + return {}; + } + + ret.push_back( + deserialise_transaction(seqno, entry->data(), entry->size())); + } + + return ret; + } +}; + +TEST_CASE( + "multi-threaded indexing - bucketed" * doctest::test_suite("indexing")) +{ + auto kv_store_p = std::make_shared(); + auto& kv_store = *kv_store_p; + + auto ledger_secrets = std::make_shared(); + auto encryptor = std::make_shared(ledger_secrets); + kv_store.set_encryptor(encryptor); + + auto stub_writer = std::make_shared(); + auto cache = std::make_shared( + kv_store, ledger_secrets, stub_writer); + + auto fetcher = std::make_shared(encryptor); + auto indexer_p = std::make_shared(fetcher); + auto& indexer = *indexer_p; + + messaging::BufferProcessor host_bp("lfs_host"); + messaging::BufferProcessor enclave_bp("lfs_enclave"); + + constexpr size_t buf_size = 1 << 16; + auto inbound_buffer = std::make_unique(buf_size); + ringbuffer::Reader inbound_reader(inbound_buffer->bd); + auto outbound_buffer = std::make_unique(buf_size); + + ringbuffer::Reader outbound_reader(outbound_buffer->bd); + asynchost::LFSFileHandler host_files( + std::make_shared(inbound_reader)); + host_files.register_message_handlers(host_bp.get_dispatcher()); + + auto enclave_lfs = std::make_shared( + std::make_shared(outbound_reader)); + enclave_lfs->register_message_handlers(enclave_bp.get_dispatcher()); + + ccfapp::AbstractNodeContext node_context; + node_context.install_subsystem(enclave_lfs); + + using IndexA_Bucketed = + ccf::indexing::strategies::SeqnosByKey_Bucketed; + auto index_a = std::make_shared(map_a, node_context, 100, 5); + REQUIRE(indexer.install_strategy(index_a)); + + auto index_b = std::make_shared(map_b); + REQUIRE(indexer.install_strategy(index_b)); + + auto ledger = add_raft_consensus(kv_store_p, indexer_p); + fetcher->ledger = ledger; + + ledger_secrets->init(); + { + INFO("Store one recovery member"); + // This is necessary to rekey the ledger and issue recovery shares for the + // new ledger secret + auto tx = kv_store.create_tx(); + auto config = tx.rw(ccf::Tables::CONFIGURATION); + constexpr size_t recovery_threshold = 1; + config->put({recovery_threshold}); + auto member_info = tx.rw(ccf::Tables::MEMBER_INFO); + auto member_public_encryption_keys = tx.rw( + ccf::Tables::MEMBER_ENCRYPTION_PUBLIC_KEYS); + + auto kp = crypto::make_key_pair(); + auto cert = kp->self_sign("CN=member", valid_from, valid_to); + auto member_id = + crypto::Sha256Hash(crypto::cert_pem_to_der(cert)).hex_str(); + + member_info->put(member_id, {ccf::MemberStatus::ACTIVE}); + member_public_encryption_keys->put( + member_id, crypto::make_rsa_key_pair()->public_key_pem()); + REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); + } + + std::atomic all_submitted = false; + std::atomic writes_to_hello = 0; + std::atomic writes_to_saluton = 0; + std::atomic writes_to_42 = 0; + + auto tx_advancer = [&]() { + size_t i = 0; + constexpr auto tx_count = +#if NDEBUG + 1'000; +#else + 100; +#endif + + while (i < tx_count) + { + auto tx = kv_store.create_tx(); + tx.wo(map_a)->put(fmt::format("hello"), fmt::format("Value {}", i)); + ++writes_to_hello; + if (i % 2 == 0) + { + ++writes_to_saluton; + tx.wo(map_a)->put(fmt::format("saluton"), fmt::format("Value2 {}", i)); + } + if (i % 3 == 0) + { + ++writes_to_42; + tx.wo(map_b)->put(42, i); + } + + REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); + ++i; + std::this_thread::yield(); + } + all_submitted = true; + }; + + auto get_all = + [&](const std::string& key) -> std::optional { + const auto max_range = index_a->max_requestable_range(); + auto range_start = 0; + + ccf::SeqNoCollection all_results; + + while (true) + { + const auto end_seqno = kv_store.get_txid().seqno; + const auto range_end = std::min(end_seqno, range_start + max_range); + + auto results = + index_a->get_write_txs_in_range(key, range_start, range_end); + + std::chrono::milliseconds sleep_time(10); + while (!results.has_value()) + { + // May be contesting for limited cached buckets with other users of this + // index (no handle for unique claims). Back-off exponentially, with + // random variation, to break deadlock + std::this_thread::sleep_for(sleep_time); + + sleep_time += std::chrono::milliseconds(rand() % sleep_time.count()); + + results = index_a->get_write_txs_in_range(key, range_start, range_end); + } + + for (auto seqno : *results) + { + all_results.insert(seqno); + } + + if (range_end == end_seqno) + { + return all_results; + } + else + { + range_start = range_end + 1; + } + } + }; + + auto fetch_index_a = [&]() { + while (true) + { + const auto hello = get_all("hello"); + const auto saluton = get_all("saluton"); + + if ( + all_submitted && hello.has_value() && + hello->size() == writes_to_hello && saluton.has_value() && + saluton->size() == writes_to_saluton) + { + break; + } + + std::this_thread::yield(); + } + }; + + auto fetch_index_b = [&]() { + while (true) + { + const auto forty_two = index_b->get_all_write_txs(42); + + if ( + all_submitted && forty_two.has_value() && + forty_two->size() == writes_to_42) + { + break; + } + + std::this_thread::yield(); + } + }; + + std::vector threads; + threads.emplace_back(tx_advancer); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_b); + threads.emplace_back(fetch_index_b); + + std::atomic work_done = false; + + std::thread ringbuffer_flusher([&]() { + while (!work_done) + { + host_bp.read_all(outbound_reader); + enclave_bp.read_all(inbound_reader); + std::this_thread::yield(); + } + }); + + std::thread index_ticker([&]() { + while (!work_done) + { + while (indexer.update_strategies(step_time, kv_store.current_txid())) + { + std::this_thread::yield(); + } + } + }); + + std::thread watchdog([&]() { + using Clock = std::chrono::system_clock; + const auto start_time = Clock::now(); + + while (!work_done) + { + const auto now = Clock::now(); + REQUIRE(now - start_time < max_multithread_run_time); + std::this_thread::sleep_for(50ms); + } + }); + + for (auto& thread : threads) + { + thread.join(); + } + + work_done = true; + ringbuffer_flusher.join(); + index_ticker.join(); + watchdog.join(); +} \ No newline at end of file From 66adce6cab88a58d7ca2c15fc862e6a637132c78 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Tue, 2 Aug 2022 14:40:44 +0000 Subject: [PATCH 2/4] Use 2.x mutex type --- .../ccf/indexing/strategies/seqnos_by_key_in_memory.h | 4 ++-- src/indexing/enclave_lfs_access.h | 10 +++++----- src/indexing/strategies/seqnos_by_key_bucketed.cpp | 8 ++++---- src/indexing/strategies/seqnos_by_key_in_memory.cpp | 4 ++-- 4 files changed, 13 insertions(+), 13 deletions(-) diff --git a/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h b/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h index 9b19a6e2a479..86c479ed7c08 100644 --- a/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h +++ b/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h @@ -2,7 +2,7 @@ // Licensed under the Apache 2.0 License. #pragma once -#include "ccf/ds/pal.h" +#include "ccf/ds/mutex.h" #include "ccf/indexing/strategies/visit_each_entry_in_map.h" #include "ccf/seq_no_collection.h" @@ -17,7 +17,7 @@ namespace ccf::indexing::strategies std::unordered_map seqnos_by_key; // Mutex guarding access to seqnos_by_key - ccf::Pal::Mutex lock; + ccf::Mutex lock; void visit_entry( const ccf::TxID& tx_id, diff --git a/src/indexing/enclave_lfs_access.h b/src/indexing/enclave_lfs_access.h index 20103725297e..5c1899ed6ccb 100644 --- a/src/indexing/enclave_lfs_access.h +++ b/src/indexing/enclave_lfs_access.h @@ -6,7 +6,7 @@ #include "ccf/crypto/sha256.h" #include "ccf/crypto/symmetric_key.h" #include "ccf/ds/hex.h" -#include "ccf/ds/pal.h" +#include "ccf/ds/mutex.h" #include "ds/messaging.h" #include "indexing/lfs_interface.h" #include "indexing/lfs_ringbuffer_types.h" @@ -87,7 +87,7 @@ namespace ccf::indexing using PendingResult = std::weak_ptr; std::unordered_map pending; - ccf::Pal::Mutex pending_access; + ccf::Mutex pending_access; ringbuffer::WriterPtr to_host; @@ -138,7 +138,7 @@ namespace ccf::indexing dispatcher, LFSMsg::response, [this](const uint8_t* data, size_t size) { auto [obfuscated, encrypted] = ringbuffer::read_message(data, size); - std::lock_guard guard(pending_access); + std::lock_guard guard(pending_access); auto it = pending.find(obfuscated); if (it != pending.end()) { @@ -196,7 +196,7 @@ namespace ccf::indexing [this](const uint8_t* data, size_t size) { auto [obfuscated] = ringbuffer::read_message(data, size); - std::lock_guard guard(pending_access); + std::lock_guard guard(pending_access); auto it = pending.find(obfuscated); if (it != pending.end()) { @@ -262,7 +262,7 @@ namespace ccf::indexing FetchResultPtr fetch(const LFSKey& key) override { const auto obfuscated = obfuscate_key(key); - std::lock_guard guard(pending_access); + std::lock_guard guard(pending_access); auto it = pending.find(obfuscated); FetchResultPtr result; diff --git a/src/indexing/strategies/seqnos_by_key_bucketed.cpp b/src/indexing/strategies/seqnos_by_key_bucketed.cpp index 4d6b003ee3bc..28082ea6acda 100644 --- a/src/indexing/strategies/seqnos_by_key_bucketed.cpp +++ b/src/indexing/strategies/seqnos_by_key_bucketed.cpp @@ -5,7 +5,7 @@ #include "ccf/ds/hex.h" #include "ccf/ds/logger.h" -#include "ccf/ds/pal.h" +#include "ccf/ds/mutex.h" #include "ds/lru.h" #include "ds/serialized.h" #include "indexing/lfs_interface.h" @@ -33,7 +33,7 @@ namespace ccf::indexing::strategies using BucketValue = std::pair; LRU old_results; - ccf::Pal::Mutex results_access; + ccf::Mutex results_access; std::string name; @@ -209,7 +209,7 @@ namespace ccf::indexing::strategies while (true) { - std::lock_guard guard(results_access); + std::lock_guard guard(results_access); const auto bucket_key = std::make_pair(serialised_key, from_range); const auto old_it = old_results.find(bucket_key); @@ -355,7 +355,7 @@ namespace ccf::indexing::strategies { const auto range = impl->get_range_for(tx_id.seqno); - std::lock_guard guard(impl->results_access); + std::lock_guard guard(impl->results_access); auto it = impl->current_results.find(k); if (it != impl->current_results.end()) diff --git a/src/indexing/strategies/seqnos_by_key_in_memory.cpp b/src/indexing/strategies/seqnos_by_key_in_memory.cpp index 2ff4d9230be9..11310cfdf406 100644 --- a/src/indexing/strategies/seqnos_by_key_in_memory.cpp +++ b/src/indexing/strategies/seqnos_by_key_in_memory.cpp @@ -8,7 +8,7 @@ namespace ccf::indexing::strategies void SeqnosByKey_InMemory_Untyped::visit_entry( const ccf::TxID& tx_id, const ccf::ByteVector& k, const ccf::ByteVector& v) { - std::lock_guard guard(lock); + std::lock_guard guard(lock); seqnos_by_key[k].insert(tx_id.seqno); } @@ -19,7 +19,7 @@ namespace ccf::indexing::strategies ccf::SeqNo to, std::optional max_seqnos) { - std::lock_guard guard(lock); + std::lock_guard guard(lock); const auto it = seqnos_by_key.find(serialised_key); if (it != seqnos_by_key.end()) { From 2fdcf65387967f90bff96580b0e49d7efe5d609b Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Thu, 4 Aug 2022 10:09:02 +0000 Subject: [PATCH 3/4] Include backport of #4099 --- src/indexing/test/indexing.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/indexing/test/indexing.cpp b/src/indexing/test/indexing.cpp index 67940a790805..3f6e59b98c27 100644 --- a/src/indexing/test/indexing.cpp +++ b/src/indexing/test/indexing.cpp @@ -458,7 +458,8 @@ const auto max_multithread_run_time = 10s; // Uses the real classes, and access + update them concurrently TEST_CASE( - "multi-threaded indexing - in memory" * doctest::test_suite("indexing")) + "multi-threaded indexing - in memory" * doctest::test_suite("indexing") * + doctest::may_fail(true)) { auto kv_store_p = std::make_shared(); auto& kv_store = *kv_store_p; @@ -707,7 +708,8 @@ class MockTransactionFetcher : public ccf::indexing::TransactionFetcher }; TEST_CASE( - "multi-threaded indexing - bucketed" * doctest::test_suite("indexing")) + "multi-threaded indexing - bucketed" * doctest::test_suite("indexing") * + doctest::may_fail(true)) { auto kv_store_p = std::make_shared(); auto& kv_store = *kv_store_p; From 88821ea780c1d5bb23d789ff8c88fb05588d4771 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 15 Aug 2022 13:14:30 +0000 Subject: [PATCH 4/4] Include backport of #4117 --- src/indexing/test/indexing.cpp | 84 +++++++++++++++++----------------- 1 file changed, 41 insertions(+), 43 deletions(-) diff --git a/src/indexing/test/indexing.cpp b/src/indexing/test/indexing.cpp index 3f6e59b98c27..a0d084880c96 100644 --- a/src/indexing/test/indexing.cpp +++ b/src/indexing/test/indexing.cpp @@ -458,8 +458,7 @@ const auto max_multithread_run_time = 10s; // Uses the real classes, and access + update them concurrently TEST_CASE( - "multi-threaded indexing - in memory" * doctest::test_suite("indexing") * - doctest::may_fail(true)) + "multi-threaded indexing - in memory" * doctest::test_suite("indexing")) { auto kv_store_p = std::make_shared(); auto& kv_store = *kv_store_p; @@ -540,8 +539,38 @@ TEST_CASE( size_t handled_writes = 0; const auto& writes = stub_writer->writes; - auto index_ticker = [&]() { - while (!finished) + auto fetch_index_a = [&]() { + while (true) + { + const auto hello = index_a->get_all_write_txs("hello"); + const auto saluton = index_a->get_all_write_txs("saluton"); + + if ( + finished && hello.has_value() && hello->size() == writes_to_hello && + saluton.has_value() && saluton->size() == writes_to_saluton) + { + break; + } + } + }; + + auto fetch_index_b = [&]() { + while (true) + { + const auto forty_two = index_b->get_all_write_txs(42); + + if ( + finished && forty_two.has_value() && forty_two->size() == writes_to_42) + { + break; + } + } + }; + + std::atomic work_done = false; + + std::thread index_ticker([&]() { + while (!work_done) { size_t loops = 0; while (indexer.update_strategies(step_time, kv_store.current_txid()) || @@ -587,47 +616,16 @@ TEST_CASE( } } } - }; - - auto fetch_index_a = [&]() { - while (true) - { - const auto hello = index_a->get_all_write_txs("hello"); - const auto saluton = index_a->get_all_write_txs("saluton"); - - if ( - finished && hello.has_value() && hello->size() == writes_to_hello && - saluton.has_value() && saluton->size() == writes_to_saluton) - { - break; - } - } - }; - - auto fetch_index_b = [&]() { - while (true) - { - const auto forty_two = index_b->get_all_write_txs(42); - - if ( - finished && forty_two.has_value() && forty_two->size() == writes_to_42) - { - break; - } - } - }; + }); std::vector threads; threads.emplace_back(tx_advancer); - threads.emplace_back(index_ticker); threads.emplace_back(fetch_index_a); threads.emplace_back(fetch_index_a); threads.emplace_back(fetch_index_a); threads.emplace_back(fetch_index_b); threads.emplace_back(fetch_index_b); - std::atomic work_done = false; - std::thread watchdog([&]() { using Clock = std::chrono::system_clock; const auto start_time = Clock::now(); @@ -646,6 +644,7 @@ TEST_CASE( } work_done = true; + index_ticker.join(); watchdog.join(); } @@ -708,9 +707,10 @@ class MockTransactionFetcher : public ccf::indexing::TransactionFetcher }; TEST_CASE( - "multi-threaded indexing - bucketed" * doctest::test_suite("indexing") * - doctest::may_fail(true)) + "multi-threaded indexing - bucketed" * doctest::test_suite("indexing")) { + srand(time(NULL)); + auto kv_store_p = std::make_shared(); auto& kv_store = *kv_store_p; @@ -833,16 +833,14 @@ TEST_CASE( auto results = index_a->get_write_txs_in_range(key, range_start, range_end); - std::chrono::milliseconds sleep_time(10); while (!results.has_value()) { // May be contesting for limited cached buckets with other users of this - // index (no handle for unique claims). Back-off exponentially, with - // random variation, to break deadlock + // index (no handle for unique claims). Uniform random sleep to avoid + // deadlock. + const auto sleep_time = std::chrono::milliseconds(rand() % 100); std::this_thread::sleep_for(sleep_time); - sleep_time += std::chrono::milliseconds(rand() % sleep_time.count()); - results = index_a->get_write_txs_in_range(key, range_start, range_end); }