diff --git a/.threading_canary b/.threading_canary index 580ef7df91bc..6607897a73a1 100644 --- a/.threading_canary +++ b/.threading_canary @@ -1 +1 @@ -This looks like a 'job' for Threading Canary! +This looks like a "job" for Threading Canary! diff --git a/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h b/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h index 56cac64d41d0..86c479ed7c08 100644 --- a/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h +++ b/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h @@ -2,6 +2,7 @@ // Licensed under the Apache 2.0 License. #pragma once +#include "ccf/ds/mutex.h" #include "ccf/indexing/strategies/visit_each_entry_in_map.h" #include "ccf/seq_no_collection.h" @@ -15,6 +16,9 @@ namespace ccf::indexing::strategies // Value is every SeqNo which talks about that key. std::unordered_map seqnos_by_key; + // Mutex guarding access to seqnos_by_key + ccf::Mutex lock; + void visit_entry( const ccf::TxID& tx_id, const ccf::ByteVector& k, diff --git a/src/consensus/aft/test/logging_stub.h b/src/consensus/aft/test/logging_stub.h index 0411615d5aa4..013da7eab9c2 100644 --- a/src/consensus/aft/test/logging_stub.h +++ b/src/consensus/aft/test/logging_stub.h @@ -17,6 +17,8 @@ namespace aft protected: ccf::NodeId _id; + std::mutex ledger_access; + public: std::vector> ledger; uint64_t skip_count = 0; @@ -31,6 +33,8 @@ namespace aft kv::Term term, kv::Version index) { + std::lock_guard lock(ledger_access); + // The payload that we eventually deserialise must include the // ledger entry as well as the View and Index that identify it. In // the real entries, they are nested in the payload and the IV. For @@ -73,6 +77,8 @@ namespace aft std::optional> get_entry_by_idx(size_t idx) { + std::lock_guard lock(ledger_access); + // Ledger indices are 1-based, hence the -1 if (idx > 0 && idx <= ledger.size()) { diff --git a/src/indexing/enclave_lfs_access.h b/src/indexing/enclave_lfs_access.h index a0caf637d84f..5c1899ed6ccb 100644 --- a/src/indexing/enclave_lfs_access.h +++ b/src/indexing/enclave_lfs_access.h @@ -6,6 +6,7 @@ #include "ccf/crypto/sha256.h" #include "ccf/crypto/symmetric_key.h" #include "ccf/ds/hex.h" +#include "ccf/ds/mutex.h" #include "ds/messaging.h" #include "indexing/lfs_interface.h" #include "indexing/lfs_ringbuffer_types.h" @@ -86,6 +87,7 @@ namespace ccf::indexing using PendingResult = std::weak_ptr; std::unordered_map pending; + ccf::Mutex pending_access; ringbuffer::WriterPtr to_host; @@ -136,6 +138,7 @@ namespace ccf::indexing dispatcher, LFSMsg::response, [this](const uint8_t* data, size_t size) { auto [obfuscated, encrypted] = ringbuffer::read_message(data, size); + std::lock_guard guard(pending_access); auto it = pending.find(obfuscated); if (it != pending.end()) { @@ -193,6 +196,7 @@ namespace ccf::indexing [this](const uint8_t* data, size_t size) { auto [obfuscated] = ringbuffer::read_message(data, size); + std::lock_guard guard(pending_access); auto it = pending.find(obfuscated); if (it != pending.end()) { @@ -258,6 +262,7 @@ namespace ccf::indexing FetchResultPtr fetch(const LFSKey& key) override { const auto obfuscated = obfuscate_key(key); + std::lock_guard guard(pending_access); auto it = pending.find(obfuscated); FetchResultPtr result; diff --git a/src/indexing/strategies/seqnos_by_key_bucketed.cpp b/src/indexing/strategies/seqnos_by_key_bucketed.cpp index 56b038b91c96..28082ea6acda 100644 --- a/src/indexing/strategies/seqnos_by_key_bucketed.cpp +++ b/src/indexing/strategies/seqnos_by_key_bucketed.cpp @@ -5,6 +5,7 @@ #include "ccf/ds/hex.h" #include "ccf/ds/logger.h" +#include "ccf/ds/mutex.h" #include "ds/lru.h" #include "ds/serialized.h" #include "indexing/lfs_interface.h" @@ -32,6 +33,8 @@ namespace ccf::indexing::strategies using BucketValue = std::pair; LRU old_results; + ccf::Mutex results_access; + std::string name; std::shared_ptr lfs_access; @@ -206,6 +209,8 @@ namespace ccf::indexing::strategies while (true) { + std::lock_guard guard(results_access); + const auto bucket_key = std::make_pair(serialised_key, from_range); const auto old_it = old_results.find(bucket_key); if (old_it != old_results.end()) @@ -350,6 +355,8 @@ namespace ccf::indexing::strategies { const auto range = impl->get_range_for(tx_id.seqno); + std::lock_guard guard(impl->results_access); + auto it = impl->current_results.find(k); if (it != impl->current_results.end()) { diff --git a/src/indexing/strategies/seqnos_by_key_in_memory.cpp b/src/indexing/strategies/seqnos_by_key_in_memory.cpp index cf157340d33f..11310cfdf406 100644 --- a/src/indexing/strategies/seqnos_by_key_in_memory.cpp +++ b/src/indexing/strategies/seqnos_by_key_in_memory.cpp @@ -8,6 +8,7 @@ namespace ccf::indexing::strategies void SeqnosByKey_InMemory_Untyped::visit_entry( const ccf::TxID& tx_id, const ccf::ByteVector& k, const ccf::ByteVector& v) { + std::lock_guard guard(lock); seqnos_by_key[k].insert(tx_id.seqno); } @@ -18,6 +19,7 @@ namespace ccf::indexing::strategies ccf::SeqNo to, std::optional max_seqnos) { + std::lock_guard guard(lock); const auto it = seqnos_by_key.find(serialised_key); if (it != seqnos_by_key.end()) { diff --git a/src/indexing/test/indexing.cpp b/src/indexing/test/indexing.cpp index 0c1816c5bd97..a0d084880c96 100644 --- a/src/indexing/test/indexing.cpp +++ b/src/indexing/test/indexing.cpp @@ -1,14 +1,18 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the Apache 2.0 License. +#include "ccf/indexing/strategies/seqnos_by_key_bucketed.h" #include "ccf/indexing/strategies/seqnos_by_key_in_memory.h" #include "consensus/aft/raft.h" #include "consensus/aft/test/logging_stub.h" #include "ds/test/stub_writer.h" +#include "host/lfs_file_handler.h" +#include "indexing/enclave_lfs_access.h" #include "indexing/historical_transaction_fetcher.h" #include "indexing/test/common.h" #include "node/share_manager.h" +#include #define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN #include @@ -448,3 +452,496 @@ TEST_CASE_TEMPLATE( index_a, index_b); } + +using namespace std::chrono_literals; +const auto max_multithread_run_time = 10s; + +// Uses the real classes, and access + update them concurrently +TEST_CASE( + "multi-threaded indexing - in memory" * doctest::test_suite("indexing")) +{ + auto kv_store_p = std::make_shared(); + auto& kv_store = *kv_store_p; + + auto ledger_secrets = std::make_shared(); + kv_store.set_encryptor(std::make_shared(ledger_secrets)); + + auto stub_writer = std::make_shared(); + auto cache = std::make_shared( + kv_store, ledger_secrets, stub_writer); + + auto fetcher = + std::make_shared(cache); + auto indexer_p = std::make_shared(fetcher); + auto& indexer = *indexer_p; + + auto index_a = std::make_shared(map_a); + REQUIRE(indexer.install_strategy(index_a)); + + auto index_b = std::make_shared(map_b); + REQUIRE(indexer.install_strategy(index_b)); + + auto ledger = add_raft_consensus(kv_store_p, indexer_p); + + ledger_secrets->init(); + { + INFO("Store one recovery member"); + // This is necessary to rekey the ledger and issue recovery shares for the + // new ledger secret + auto tx = kv_store.create_tx(); + auto config = tx.rw(ccf::Tables::CONFIGURATION); + constexpr size_t recovery_threshold = 1; + config->put({recovery_threshold}); + auto member_info = tx.rw(ccf::Tables::MEMBER_INFO); + auto member_public_encryption_keys = tx.rw( + ccf::Tables::MEMBER_ENCRYPTION_PUBLIC_KEYS); + + auto kp = crypto::make_key_pair(); + auto cert = kp->self_sign("CN=member", valid_from, valid_to); + auto member_id = + crypto::Sha256Hash(crypto::cert_pem_to_der(cert)).hex_str(); + + member_info->put(member_id, {ccf::MemberStatus::ACTIVE}); + member_public_encryption_keys->put( + member_id, crypto::make_rsa_key_pair()->public_key_pem()); + REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); + } + + std::atomic finished = false; + std::atomic writes_to_hello = 0; + std::atomic writes_to_saluton = 0; + std::atomic writes_to_42 = 0; + + auto tx_advancer = [&]() { + size_t i = 0; + while (i < 1'000) + { + auto tx = kv_store.create_tx(); + tx.wo(map_a)->put(fmt::format("hello"), fmt::format("Value {}", i)); + ++writes_to_hello; + if (i % 2 == 0) + { + ++writes_to_saluton; + tx.wo(map_a)->put(fmt::format("saluton"), fmt::format("Value2 {}", i)); + } + if (i % 3 == 0) + { + ++writes_to_42; + tx.wo(map_b)->put(42, i); + } + + REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); + ++i; + } + finished = true; + }; + + size_t handled_writes = 0; + const auto& writes = stub_writer->writes; + + auto fetch_index_a = [&]() { + while (true) + { + const auto hello = index_a->get_all_write_txs("hello"); + const auto saluton = index_a->get_all_write_txs("saluton"); + + if ( + finished && hello.has_value() && hello->size() == writes_to_hello && + saluton.has_value() && saluton->size() == writes_to_saluton) + { + break; + } + } + }; + + auto fetch_index_b = [&]() { + while (true) + { + const auto forty_two = index_b->get_all_write_txs(42); + + if ( + finished && forty_two.has_value() && forty_two->size() == writes_to_42) + { + break; + } + } + }; + + std::atomic work_done = false; + + std::thread index_ticker([&]() { + while (!work_done) + { + size_t loops = 0; + while (indexer.update_strategies(step_time, kv_store.current_txid()) || + handled_writes < writes.size()) + { + // Do the fetch, simulating an asynchronous fetch by the historical + // query system + for (auto it = writes.begin() + handled_writes; it != writes.end(); + ++it) + { + const auto& write = *it; + + const uint8_t* data = write.contents.data(); + size_t size = write.contents.size(); + REQUIRE(write.m == consensus::ledger_get_range); + auto [from_seqno, to_seqno, purpose_] = + ringbuffer::read_message(data, size); + auto& purpose = purpose_; + REQUIRE(purpose == consensus::LedgerRequestPurpose::HistoricalQuery); + + std::vector combined; + for (auto seqno = from_seqno; seqno <= to_seqno; ++seqno) + { + auto entry = ledger->get_raw_entry_by_idx(seqno); + if (!entry.has_value()) + { + // Possible that this operation beat consensus to the ledger, so + // pause and retry + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + entry = ledger->get_raw_entry_by_idx(seqno); + } + REQUIRE(entry.has_value()); + combined.insert(combined.end(), entry->begin(), entry->end()); + } + cache->handle_ledger_entries(from_seqno, to_seqno, combined); + } + + handled_writes = writes.end() - writes.begin(); + + if (loops++ > 100) + { + throw std::logic_error("Looks like a permanent loop"); + } + } + } + }); + + std::vector threads; + threads.emplace_back(tx_advancer); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_b); + threads.emplace_back(fetch_index_b); + + std::thread watchdog([&]() { + using Clock = std::chrono::system_clock; + const auto start_time = Clock::now(); + + while (!work_done) + { + const auto now = Clock::now(); + REQUIRE(now - start_time < max_multithread_run_time); + std::this_thread::sleep_for(50ms); + } + }); + + for (auto& thread : threads) + { + thread.join(); + } + + work_done = true; + index_ticker.join(); + watchdog.join(); +} + +class MockTransactionFetcher : public ccf::indexing::TransactionFetcher +{ + std::shared_ptr encryptor; + +public: + aft::LedgerStubProxy* ledger; + + MockTransactionFetcher(const std::shared_ptr& e) : + encryptor(e) + {} + + kv::ReadOnlyStorePtr deserialise_transaction( + ccf::SeqNo seqno, const uint8_t* data, size_t size) override + { + auto store = std::make_shared( + false /* Do not start from very first seqno */, + true /* Make use of historical secrets */); + + store->set_encryptor(encryptor); + + bool public_only = false; + auto exec = + store->deserialize({data, data + size}, ConsensusType::CFT, public_only); + if (exec == nullptr) + { + return nullptr; + } + + auto result = exec->apply(); + if (result == kv::ApplyResult::FAIL) + { + return nullptr; + } + + return store; + } + + std::vector fetch_transactions( + const ccf::SeqNoCollection& seqnos) override + { + std::vector ret; + + for (const auto& seqno : seqnos) + { + const auto entry = ledger->get_raw_entry_by_idx(seqno); + if (!entry.has_value()) + { + return {}; + } + + ret.push_back( + deserialise_transaction(seqno, entry->data(), entry->size())); + } + + return ret; + } +}; + +TEST_CASE( + "multi-threaded indexing - bucketed" * doctest::test_suite("indexing")) +{ + srand(time(NULL)); + + auto kv_store_p = std::make_shared(); + auto& kv_store = *kv_store_p; + + auto ledger_secrets = std::make_shared(); + auto encryptor = std::make_shared(ledger_secrets); + kv_store.set_encryptor(encryptor); + + auto stub_writer = std::make_shared(); + auto cache = std::make_shared( + kv_store, ledger_secrets, stub_writer); + + auto fetcher = std::make_shared(encryptor); + auto indexer_p = std::make_shared(fetcher); + auto& indexer = *indexer_p; + + messaging::BufferProcessor host_bp("lfs_host"); + messaging::BufferProcessor enclave_bp("lfs_enclave"); + + constexpr size_t buf_size = 1 << 16; + auto inbound_buffer = std::make_unique(buf_size); + ringbuffer::Reader inbound_reader(inbound_buffer->bd); + auto outbound_buffer = std::make_unique(buf_size); + + ringbuffer::Reader outbound_reader(outbound_buffer->bd); + asynchost::LFSFileHandler host_files( + std::make_shared(inbound_reader)); + host_files.register_message_handlers(host_bp.get_dispatcher()); + + auto enclave_lfs = std::make_shared( + std::make_shared(outbound_reader)); + enclave_lfs->register_message_handlers(enclave_bp.get_dispatcher()); + + ccfapp::AbstractNodeContext node_context; + node_context.install_subsystem(enclave_lfs); + + using IndexA_Bucketed = + ccf::indexing::strategies::SeqnosByKey_Bucketed; + auto index_a = std::make_shared(map_a, node_context, 100, 5); + REQUIRE(indexer.install_strategy(index_a)); + + auto index_b = std::make_shared(map_b); + REQUIRE(indexer.install_strategy(index_b)); + + auto ledger = add_raft_consensus(kv_store_p, indexer_p); + fetcher->ledger = ledger; + + ledger_secrets->init(); + { + INFO("Store one recovery member"); + // This is necessary to rekey the ledger and issue recovery shares for the + // new ledger secret + auto tx = kv_store.create_tx(); + auto config = tx.rw(ccf::Tables::CONFIGURATION); + constexpr size_t recovery_threshold = 1; + config->put({recovery_threshold}); + auto member_info = tx.rw(ccf::Tables::MEMBER_INFO); + auto member_public_encryption_keys = tx.rw( + ccf::Tables::MEMBER_ENCRYPTION_PUBLIC_KEYS); + + auto kp = crypto::make_key_pair(); + auto cert = kp->self_sign("CN=member", valid_from, valid_to); + auto member_id = + crypto::Sha256Hash(crypto::cert_pem_to_der(cert)).hex_str(); + + member_info->put(member_id, {ccf::MemberStatus::ACTIVE}); + member_public_encryption_keys->put( + member_id, crypto::make_rsa_key_pair()->public_key_pem()); + REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); + } + + std::atomic all_submitted = false; + std::atomic writes_to_hello = 0; + std::atomic writes_to_saluton = 0; + std::atomic writes_to_42 = 0; + + auto tx_advancer = [&]() { + size_t i = 0; + constexpr auto tx_count = +#if NDEBUG + 1'000; +#else + 100; +#endif + + while (i < tx_count) + { + auto tx = kv_store.create_tx(); + tx.wo(map_a)->put(fmt::format("hello"), fmt::format("Value {}", i)); + ++writes_to_hello; + if (i % 2 == 0) + { + ++writes_to_saluton; + tx.wo(map_a)->put(fmt::format("saluton"), fmt::format("Value2 {}", i)); + } + if (i % 3 == 0) + { + ++writes_to_42; + tx.wo(map_b)->put(42, i); + } + + REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); + ++i; + std::this_thread::yield(); + } + all_submitted = true; + }; + + auto get_all = + [&](const std::string& key) -> std::optional { + const auto max_range = index_a->max_requestable_range(); + auto range_start = 0; + + ccf::SeqNoCollection all_results; + + while (true) + { + const auto end_seqno = kv_store.get_txid().seqno; + const auto range_end = std::min(end_seqno, range_start + max_range); + + auto results = + index_a->get_write_txs_in_range(key, range_start, range_end); + + while (!results.has_value()) + { + // May be contesting for limited cached buckets with other users of this + // index (no handle for unique claims). Uniform random sleep to avoid + // deadlock. + const auto sleep_time = std::chrono::milliseconds(rand() % 100); + std::this_thread::sleep_for(sleep_time); + + results = index_a->get_write_txs_in_range(key, range_start, range_end); + } + + for (auto seqno : *results) + { + all_results.insert(seqno); + } + + if (range_end == end_seqno) + { + return all_results; + } + else + { + range_start = range_end + 1; + } + } + }; + + auto fetch_index_a = [&]() { + while (true) + { + const auto hello = get_all("hello"); + const auto saluton = get_all("saluton"); + + if ( + all_submitted && hello.has_value() && + hello->size() == writes_to_hello && saluton.has_value() && + saluton->size() == writes_to_saluton) + { + break; + } + + std::this_thread::yield(); + } + }; + + auto fetch_index_b = [&]() { + while (true) + { + const auto forty_two = index_b->get_all_write_txs(42); + + if ( + all_submitted && forty_two.has_value() && + forty_two->size() == writes_to_42) + { + break; + } + + std::this_thread::yield(); + } + }; + + std::vector threads; + threads.emplace_back(tx_advancer); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_b); + threads.emplace_back(fetch_index_b); + + std::atomic work_done = false; + + std::thread ringbuffer_flusher([&]() { + while (!work_done) + { + host_bp.read_all(outbound_reader); + enclave_bp.read_all(inbound_reader); + std::this_thread::yield(); + } + }); + + std::thread index_ticker([&]() { + while (!work_done) + { + while (indexer.update_strategies(step_time, kv_store.current_txid())) + { + std::this_thread::yield(); + } + } + }); + + std::thread watchdog([&]() { + using Clock = std::chrono::system_clock; + const auto start_time = Clock::now(); + + while (!work_done) + { + const auto now = Clock::now(); + REQUIRE(now - start_time < max_multithread_run_time); + std::this_thread::sleep_for(50ms); + } + }); + + for (auto& thread : threads) + { + thread.join(); + } + + work_done = true; + ringbuffer_flusher.join(); + index_ticker.join(); + watchdog.join(); +} \ No newline at end of file