From 14080027c011c5ea074dddd54e13628e198d0b94 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 25 Jul 2022 12:18:28 +0000 Subject: [PATCH 01/13] Add a test of multi-threaded indexing --- src/indexing/test/indexing.cpp | 167 +++++++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) diff --git a/src/indexing/test/indexing.cpp b/src/indexing/test/indexing.cpp index 0c1816c5bd97..e2c61c43cd5d 100644 --- a/src/indexing/test/indexing.cpp +++ b/src/indexing/test/indexing.cpp @@ -448,3 +448,170 @@ TEST_CASE_TEMPLATE( index_a, index_b); } + +// Uses the real classes, and access + update them concurrently +TEST_CASE("multi-threaded indexing" * doctest::test_suite("indexing")) +{ + auto kv_store_p = std::make_shared(); + auto& kv_store = *kv_store_p; + + auto ledger_secrets = std::make_shared(); + kv_store.set_encryptor(std::make_shared(ledger_secrets)); + + auto stub_writer = std::make_shared(); + auto cache = std::make_shared( + kv_store, ledger_secrets, stub_writer); + + auto fetcher = + std::make_shared(cache); + auto indexer_p = std::make_shared(fetcher); + auto& indexer = *indexer_p; + + auto index_a = std::make_shared(map_a); + REQUIRE(indexer.install_strategy(index_a)); + + auto index_b = std::make_shared(map_b); + REQUIRE(indexer.install_strategy(index_b)); + + auto ledger = add_raft_consensus(kv_store_p, indexer_p); + + ledger_secrets->init(); + { + INFO("Store one recovery member"); + // This is necessary to rekey the ledger and issue recovery shares for the + // new ledger secret + auto tx = kv_store.create_tx(); + auto config = tx.rw(ccf::Tables::CONFIGURATION); + constexpr size_t recovery_threshold = 1; + config->put({recovery_threshold}); + auto member_info = tx.rw(ccf::Tables::MEMBER_INFO); + auto member_public_encryption_keys = tx.rw( + ccf::Tables::MEMBER_ENCRYPTION_PUBLIC_KEYS); + + auto kp = crypto::make_key_pair(); + auto cert = kp->self_sign("CN=member", valid_from, valid_to); + auto member_id = + crypto::Sha256Hash(crypto::cert_pem_to_der(cert)).hex_str(); + + member_info->put(member_id, {ccf::MemberStatus::ACTIVE}); + member_public_encryption_keys->put( + member_id, crypto::make_rsa_key_pair()->public_key_pem()); + REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); + } + + std::atomic finished = false; + + auto tx_advancer = [&]() { + size_t i = 0; + while (i < 1000) + { + auto tx = kv_store.create_tx(); + tx.wo(map_a)->put(fmt::format("hello"), fmt::format("Value {}", i)); + if (i % 2 == 0) + { + tx.wo(map_a)->put(fmt::format("saluton"), fmt::format("Value2 {}", i)); + } + if (i % 3 == 0) + { + tx.wo(map_b)->put(42, i); + } + + REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); + ++i; + } + finished = true; + }; + + size_t handled_writes = 0; + const auto& writes = stub_writer->writes; + + auto index_ticker = [&]() { + while (!finished) + { + size_t loops = 0; + while (indexer.update_strategies(step_time, kv_store.current_txid()) || + handled_writes < writes.size()) + { + // Do the fetch, simulating an asynchronous fetch by the historical + // query system + for (auto it = writes.begin() + handled_writes; it != writes.end(); + ++it) + { + const auto& write = *it; + + const uint8_t* data = write.contents.data(); + size_t size = write.contents.size(); + REQUIRE(write.m == consensus::ledger_get_range); + auto [from_seqno, to_seqno, purpose_] = + ringbuffer::read_message(data, size); + auto& purpose = purpose_; + REQUIRE(purpose == consensus::LedgerRequestPurpose::HistoricalQuery); + + std::vector combined; + for (auto seqno = from_seqno; seqno <= to_seqno; ++seqno) + { + auto entry = ledger->get_raw_entry_by_idx(seqno); + if (!entry.has_value()) + { + // Possible that this operation beat consensus to the ledger, so + // pause and retry + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + entry = ledger->get_raw_entry_by_idx(seqno); + } + REQUIRE(entry.has_value()); + combined.insert(combined.end(), entry->begin(), entry->end()); + } + cache->handle_ledger_entries(from_seqno, to_seqno, combined); + } + + handled_writes = writes.end() - writes.begin(); + + if (loops++ > 100) + { + throw std::logic_error("Looks like a permanent loop"); + } + } + } + }; + + // auto fetch_index_a = [&]() { + // while (!finished) + // { + // const auto a = index_a->get_all_write_txs("hello"); + // if (a.has_value()) + // { + // fmt::print("hello: {}\n", a->size()); + // } + // const auto b = index_a->get_all_write_txs("saluton"); + // if (b.has_value()) + // { + // fmt::print("saluton: {}\n", b->size()); + // } + // } + // }; + + // auto fetch_index_b = [&]() { + // while (!finished) + // { + // const auto c = index_b->get_all_write_txs(42); + // if (c.has_value()) + // { + // fmt::print("42: {}\n", c->size()); + // } + // } + // }; + + std::vector threads; + threads.emplace_back(tx_advancer); + threads.emplace_back(index_ticker); + // threads.emplace_back(fetch_index_a); + // threads.emplace_back(fetch_index_a); + // threads.emplace_back(fetch_index_a); + // threads.emplace_back(fetch_index_b); + // threads.emplace_back(fetch_index_b); + + for (auto& thread : threads) + { + thread.join(); + } +} \ No newline at end of file From 2e6242f1fd7a3d86618282b3127af4e94c6b97e3 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 25 Jul 2022 12:18:42 +0000 Subject: [PATCH 02/13] Lock appropriately in the test helpers --- src/consensus/aft/test/logging_stub.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/consensus/aft/test/logging_stub.h b/src/consensus/aft/test/logging_stub.h index 08d4396513b4..95458a0805f9 100644 --- a/src/consensus/aft/test/logging_stub.h +++ b/src/consensus/aft/test/logging_stub.h @@ -17,6 +17,8 @@ namespace aft protected: ccf::NodeId _id; + std::mutex ledger_access; + public: std::vector> ledger; uint64_t skip_count = 0; @@ -31,6 +33,8 @@ namespace aft kv::Term term, kv::Version index) { + std::lock_guard lock(ledger_access); + // The payload that we eventually deserialise must include the // ledger entry as well as the View and Index that identify it. In // the real entries, they are nested in the payload and the IV. For @@ -73,6 +77,8 @@ namespace aft std::optional> get_entry_by_idx(size_t idx) { + std::lock_guard lock(ledger_access); + // Ledger indices are 1-based, hence the -1 if (idx > 0 && idx <= ledger.size()) { From 1d69849b115e6b9a13fbb9669a93f4142157d8cf Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 25 Jul 2022 12:22:53 +0000 Subject: [PATCH 03/13] Enable the actual indexing, shim tests --- src/indexing/test/indexing.cpp | 62 +++++++++++++++++----------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/src/indexing/test/indexing.cpp b/src/indexing/test/indexing.cpp index e2c61c43cd5d..57766f4b9057 100644 --- a/src/indexing/test/indexing.cpp +++ b/src/indexing/test/indexing.cpp @@ -574,41 +574,41 @@ TEST_CASE("multi-threaded indexing" * doctest::test_suite("indexing")) } }; - // auto fetch_index_a = [&]() { - // while (!finished) - // { - // const auto a = index_a->get_all_write_txs("hello"); - // if (a.has_value()) - // { - // fmt::print("hello: {}\n", a->size()); - // } - // const auto b = index_a->get_all_write_txs("saluton"); - // if (b.has_value()) - // { - // fmt::print("saluton: {}\n", b->size()); - // } - // } - // }; - - // auto fetch_index_b = [&]() { - // while (!finished) - // { - // const auto c = index_b->get_all_write_txs(42); - // if (c.has_value()) - // { - // fmt::print("42: {}\n", c->size()); - // } - // } - // }; + auto fetch_index_a = [&]() { + while (!finished) + { + const auto a = index_a->get_all_write_txs("hello"); + if (a.has_value()) + { + fmt::print("hello: {}\n", a->size()); + } + const auto b = index_a->get_all_write_txs("saluton"); + if (b.has_value()) + { + fmt::print("saluton: {}\n", b->size()); + } + } + }; + + auto fetch_index_b = [&]() { + while (!finished) + { + const auto c = index_b->get_all_write_txs(42); + if (c.has_value()) + { + fmt::print("42: {}\n", c->size()); + } + } + }; std::vector threads; threads.emplace_back(tx_advancer); threads.emplace_back(index_ticker); - // threads.emplace_back(fetch_index_a); - // threads.emplace_back(fetch_index_a); - // threads.emplace_back(fetch_index_a); - // threads.emplace_back(fetch_index_b); - // threads.emplace_back(fetch_index_b); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_b); + threads.emplace_back(fetch_index_b); for (auto& thread : threads) { From bb6020f2028e7d5d5e6abe93ceb23a72d40ba3ee Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 25 Jul 2022 12:24:40 +0000 Subject: [PATCH 04/13] Add some mutex locking --- include/ccf/indexing/strategies/seqnos_by_key_in_memory.h | 4 ++++ src/indexing/strategies/seqnos_by_key_in_memory.cpp | 2 ++ 2 files changed, 6 insertions(+) diff --git a/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h b/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h index 56cac64d41d0..9b19a6e2a479 100644 --- a/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h +++ b/include/ccf/indexing/strategies/seqnos_by_key_in_memory.h @@ -2,6 +2,7 @@ // Licensed under the Apache 2.0 License. #pragma once +#include "ccf/ds/pal.h" #include "ccf/indexing/strategies/visit_each_entry_in_map.h" #include "ccf/seq_no_collection.h" @@ -15,6 +16,9 @@ namespace ccf::indexing::strategies // Value is every SeqNo which talks about that key. std::unordered_map seqnos_by_key; + // Mutex guarding access to seqnos_by_key + ccf::Pal::Mutex lock; + void visit_entry( const ccf::TxID& tx_id, const ccf::ByteVector& k, diff --git a/src/indexing/strategies/seqnos_by_key_in_memory.cpp b/src/indexing/strategies/seqnos_by_key_in_memory.cpp index cf157340d33f..2ff4d9230be9 100644 --- a/src/indexing/strategies/seqnos_by_key_in_memory.cpp +++ b/src/indexing/strategies/seqnos_by_key_in_memory.cpp @@ -8,6 +8,7 @@ namespace ccf::indexing::strategies void SeqnosByKey_InMemory_Untyped::visit_entry( const ccf::TxID& tx_id, const ccf::ByteVector& k, const ccf::ByteVector& v) { + std::lock_guard guard(lock); seqnos_by_key[k].insert(tx_id.seqno); } @@ -18,6 +19,7 @@ namespace ccf::indexing::strategies ccf::SeqNo to, std::optional max_seqnos) { + std::lock_guard guard(lock); const auto it = seqnos_by_key.find(serialised_key); if (it != seqnos_by_key.end()) { From b823c7e182a1c4052ed498c5ad6a96196780c92a Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 25 Jul 2022 12:42:40 +0000 Subject: [PATCH 05/13] Add some actual requirements, and a timeout --- src/indexing/test/indexing.cpp | 55 +++++++++++++++++++++++++--------- 1 file changed, 41 insertions(+), 14 deletions(-) diff --git a/src/indexing/test/indexing.cpp b/src/indexing/test/indexing.cpp index 57766f4b9057..e6f9c601a177 100644 --- a/src/indexing/test/indexing.cpp +++ b/src/indexing/test/indexing.cpp @@ -500,19 +500,25 @@ TEST_CASE("multi-threaded indexing" * doctest::test_suite("indexing")) } std::atomic finished = false; + std::atomic writes_to_hello = 0; + std::atomic writes_to_saluton = 0; + std::atomic writes_to_42 = 0; auto tx_advancer = [&]() { size_t i = 0; - while (i < 1000) + while (i < 1'000) { auto tx = kv_store.create_tx(); tx.wo(map_a)->put(fmt::format("hello"), fmt::format("Value {}", i)); + ++writes_to_hello; if (i % 2 == 0) { + ++writes_to_saluton; tx.wo(map_a)->put(fmt::format("saluton"), fmt::format("Value2 {}", i)); } if (i % 3 == 0) { + ++writes_to_42; tx.wo(map_b)->put(42, i); } @@ -575,28 +581,29 @@ TEST_CASE("multi-threaded indexing" * doctest::test_suite("indexing")) }; auto fetch_index_a = [&]() { - while (!finished) + while (true) { - const auto a = index_a->get_all_write_txs("hello"); - if (a.has_value()) - { - fmt::print("hello: {}\n", a->size()); - } - const auto b = index_a->get_all_write_txs("saluton"); - if (b.has_value()) + const auto hello = index_a->get_all_write_txs("hello"); + const auto saluton = index_a->get_all_write_txs("saluton"); + + if ( + finished && hello.has_value() && hello->size() == writes_to_hello && + saluton.has_value() && saluton->size() == writes_to_saluton) { - fmt::print("saluton: {}\n", b->size()); + break; } } }; auto fetch_index_b = [&]() { - while (!finished) + while (true) { - const auto c = index_b->get_all_write_txs(42); - if (c.has_value()) + const auto forty_two = index_b->get_all_write_txs(42); + + if ( + finished && forty_two.has_value() && forty_two->size() == writes_to_42) { - fmt::print("42: {}\n", c->size()); + break; } } }; @@ -610,8 +617,28 @@ TEST_CASE("multi-threaded indexing" * doctest::test_suite("indexing")) threads.emplace_back(fetch_index_b); threads.emplace_back(fetch_index_b); + std::atomic work_done = false; + + std::thread watchdog([&]() { + using Clock = std::chrono::system_clock; + const auto start_time = Clock::now(); + + using namespace std::chrono_literals; + const auto max_run_time = 10s; + + while (!work_done) + { + const auto now = Clock::now(); + REQUIRE(now - start_time < max_run_time); + std::this_thread::sleep_for(50ms); + } + }); + for (auto& thread : threads) { thread.join(); } + + work_done = true; + watchdog.join(); } \ No newline at end of file From 619082a643b8d7fbd1e45873caf289bbaa437784 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Mon, 25 Jul 2022 16:49:55 +0000 Subject: [PATCH 06/13] Thread safety on LFS access --- src/indexing/enclave_lfs_access.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/indexing/enclave_lfs_access.h b/src/indexing/enclave_lfs_access.h index a0caf637d84f..e280dd92a185 100644 --- a/src/indexing/enclave_lfs_access.h +++ b/src/indexing/enclave_lfs_access.h @@ -86,6 +86,7 @@ namespace ccf::indexing using PendingResult = std::weak_ptr; std::unordered_map pending; + ccf::Pal::Mutex pending_access; ringbuffer::WriterPtr to_host; @@ -136,6 +137,7 @@ namespace ccf::indexing dispatcher, LFSMsg::response, [this](const uint8_t* data, size_t size) { auto [obfuscated, encrypted] = ringbuffer::read_message(data, size); + std::lock_guard lock(pending_access); auto it = pending.find(obfuscated); if (it != pending.end()) { @@ -193,6 +195,7 @@ namespace ccf::indexing [this](const uint8_t* data, size_t size) { auto [obfuscated] = ringbuffer::read_message(data, size); + std::lock_guard lock(pending_access); auto it = pending.find(obfuscated); if (it != pending.end()) { @@ -258,6 +261,8 @@ namespace ccf::indexing FetchResultPtr fetch(const LFSKey& key) override { const auto obfuscated = obfuscate_key(key); + + std::lock_guard lock(pending_access); auto it = pending.find(obfuscated); FetchResultPtr result; From 0f6b1bb684deebf83f41772a20d5ae30162459ec Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Tue, 26 Jul 2022 13:53:18 +0000 Subject: [PATCH 07/13] WIP test for bucketed multi-threaded indexing --- src/indexing/test/indexing.cpp | 271 ++++++++++++++++++++++++++++++++- 1 file changed, 269 insertions(+), 2 deletions(-) diff --git a/src/indexing/test/indexing.cpp b/src/indexing/test/indexing.cpp index e6f9c601a177..fa717777d695 100644 --- a/src/indexing/test/indexing.cpp +++ b/src/indexing/test/indexing.cpp @@ -1,10 +1,13 @@ // Copyright (c) Microsoft Corporation. All rights reserved. // Licensed under the Apache 2.0 License. +#include "ccf/indexing/strategies/seqnos_by_key_bucketed.h" #include "ccf/indexing/strategies/seqnos_by_key_in_memory.h" #include "consensus/aft/raft.h" #include "consensus/aft/test/logging_stub.h" #include "ds/test/stub_writer.h" +#include "host/lfs_file_handler.h" +#include "indexing/enclave_lfs_access.h" #include "indexing/historical_transaction_fetcher.h" #include "indexing/test/common.h" #include "node/share_manager.h" @@ -450,7 +453,7 @@ TEST_CASE_TEMPLATE( } // Uses the real classes, and access + update them concurrently -TEST_CASE("multi-threaded indexing" * doctest::test_suite("indexing")) +TEST_CASE("multi-threaded indexing - in memory" * doctest::test_suite("indexing")) { auto kv_store_p = std::make_shared(); auto& kv_store = *kv_store_p; @@ -641,4 +644,268 @@ TEST_CASE("multi-threaded indexing" * doctest::test_suite("indexing")) work_done = true; watchdog.join(); -} \ No newline at end of file +} + +// // Uses the real classes, and access + update them concurrently +// using IndexA_Bucketed = +// ccf::indexing::strategies::SeqnosByKey_Bucketed; +// TEST_CASE_TEMPLATE( +// "multi-threaded indexing - bucketed" * doctest::test_suite("indexing"), +// AA, +// // IndexA +// // , +// IndexA_Bucketed) +// { +// auto kv_store_p = std::make_shared(); +// auto& kv_store = *kv_store_p; + +// auto ledger_secrets = std::make_shared(); +// kv_store.set_encryptor(std::make_shared(ledger_secrets)); + +// auto stub_writer = std::make_shared(); +// auto cache = std::make_shared( +// kv_store, ledger_secrets, stub_writer); + +// auto fetcher = std::make_shared(); +// auto indexer_p = std::make_shared(fetcher); +// auto& indexer = *indexer_p; + +// messaging::BufferProcessor host_bp("lfs_host"); +// messaging::BufferProcessor enclave_bp("lfs_enclave"); + +// constexpr size_t buf_size = 1 << 16; +// auto inbound_buffer = std::make_unique(buf_size); +// ringbuffer::Reader inbound_reader(inbound_buffer->bd); +// auto outbound_buffer = std::make_unique(buf_size); + +// ringbuffer::Reader outbound_reader(outbound_buffer->bd); +// asynchost::LFSFileHandler host_files( +// std::make_shared(inbound_reader)); +// host_files.register_message_handlers(host_bp.get_dispatcher()); + +// auto enclave_lfs = std::make_shared( +// std::make_shared(outbound_reader)); +// enclave_lfs->register_message_handlers(enclave_bp.get_dispatcher()); + +// ccfapp::AbstractNodeContext node_context; +// node_context.install_subsystem(enclave_lfs); + +// ccf::indexing::StrategyPtr index_a = nullptr; +// if constexpr (std::is_same_v) +// { +// index_a = std::make_shared(map_a, node_context, 100, 5); +// } +// else +// { +// index_a = std::make_shared(map_a); +// } + +// REQUIRE(indexer.install_strategy(index_a)); + +// auto index_b = std::make_shared(map_b); +// REQUIRE(indexer.install_strategy(index_b)); + +// auto ledger = add_raft_consensus(kv_store_p, indexer_p); + +// ledger_secrets->init(); +// { +// INFO("Store one recovery member"); +// // This is necessary to rekey the ledger and issue recovery shares for the +// // new ledger secret +// auto tx = kv_store.create_tx(); +// auto config = tx.rw(ccf::Tables::CONFIGURATION); +// constexpr size_t recovery_threshold = 1; +// config->put({recovery_threshold}); +// auto member_info = tx.rw(ccf::Tables::MEMBER_INFO); +// auto member_public_encryption_keys = tx.rw( +// ccf::Tables::MEMBER_ENCRYPTION_PUBLIC_KEYS); + +// auto kp = crypto::make_key_pair(); +// auto cert = kp->self_sign("CN=member", valid_from, valid_to); +// auto member_id = +// crypto::Sha256Hash(crypto::cert_pem_to_der(cert)).hex_str(); + +// member_info->put(member_id, {ccf::MemberStatus::ACTIVE}); +// member_public_encryption_keys->put( +// member_id, crypto::make_rsa_key_pair()->public_key_pem()); +// REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); +// } + +// std::atomic finished = false; +// std::atomic writes_to_hello = 0; +// std::atomic writes_to_saluton = 0; +// std::atomic writes_to_42 = 0; + +// auto tx_advancer = [&]() { +// size_t i = 0; +// while (i < 1'000) +// { +// auto tx = kv_store.create_tx(); +// tx.wo(map_a)->put(fmt::format("hello"), fmt::format("Value {}", i)); +// ++writes_to_hello; +// if (i % 2 == 0) +// { +// ++writes_to_saluton; +// tx.wo(map_a)->put(fmt::format("saluton"), fmt::format("Value2 {}", i)); +// } +// if (i % 3 == 0) +// { +// ++writes_to_42; +// tx.wo(map_b)->put(42, i); +// } + +// REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); +// ++i; +// } +// finished = true; +// }; + +// auto index_ticker = [&]() { +// while (!finished) +// { +// std::lock_guard guard(fetcher->lock); +// while (indexer.update_strategies(step_time, kv_store.current_txid()) || +// !fetcher->requested.empty()) +// { +// // Do the fetch, simulating an asynchronous fetch by the historical +// // query system +// for (auto seqno : fetcher->requested) +// { +// const auto entry = ledger->get_raw_entry_by_idx(seqno); +// REQUIRE(entry.has_value()); +// fetcher->fetched_stores[seqno] = fetcher->deserialise_transaction( +// seqno, entry->data(), entry->size()); +// } +// fetcher->requested.clear(); +// } +// } +// }; + +// auto fetch_index_a = [&]() { +// while (true) +// { +// std::optional hello; +// std::optional saluton; + +// auto idx_a = std::dynamic_pointer_cast(index_a); +// if constexpr (std::is_same_v) +// { +// auto get_all = [&](const std::string& key) { +// const auto max_range = idx_a->max_requestable_range(); +// const auto end_seqno = kv_store.get_txid().seqno; +// auto range_start = 0; + +// ccf::SeqNoCollection all_results; + +// auto next_end = [&]() { +// return std::min(end_seqno, range_start + max_range); +// }; + +// auto range_end = next_end(); + +// while (true) +// { +// auto results = +// idx_a->get_write_txs_in_range(key, range_start, range_end); + +// if (!results.has_value()) +// { +// // This required an async load from disk +// std::this_thread::sleep_for(std::chrono::milliseconds(50)); + +// results = +// idx_a->get_write_txs_in_range(key, range_start, range_end); +// REQUIRE(results.has_value()); +// } + +// for (auto seqno : *results) +// { +// all_results.insert(seqno); +// } + +// if (range_end == end_seqno) +// { +// return all_results; +// } +// else +// { +// range_start = range_end + 1; +// range_end = next_end(); +// } +// } +// }; + +// hello = get_all("hello"); +// saluton = get_all("saluton"); +// } +// else +// { +// hello = idx_a->get_all_write_txs("hello"); +// saluton = idx_a->get_all_write_txs("saluton"); +// } + +// if ( +// finished && hello.has_value() && hello->size() == writes_to_hello && +// saluton.has_value() && saluton->size() == writes_to_saluton) +// { +// break; +// } +// } +// }; + +// auto fetch_index_b = [&]() { +// while (true) +// { +// const auto forty_two = index_b->get_all_write_txs(42); + +// if ( +// finished && forty_two.has_value() && forty_two->size() == writes_to_42) +// { +// break; +// } +// } +// }; + +// std::vector threads; +// threads.emplace_back(tx_advancer); +// threads.emplace_back(index_ticker); +// threads.emplace_back(fetch_index_a); +// threads.emplace_back(fetch_index_a); +// threads.emplace_back(fetch_index_a); +// threads.emplace_back(fetch_index_b); +// threads.emplace_back(fetch_index_b); + +// std::atomic work_done = false; + +// std::thread ringbuffer_flusher([&]() { +// while (!work_done) +// { +// host_bp.read_all(outbound_reader); +// enclave_bp.read_all(inbound_reader); +// } +// }); + +// std::thread watchdog([&]() { +// using Clock = std::chrono::system_clock; +// const auto start_time = Clock::now(); + +// using namespace std::chrono_literals; +// const auto max_run_time = 10s; + +// while (!work_done) +// { +// const auto now = Clock::now(); +// REQUIRE(now - start_time < max_run_time); +// std::this_thread::sleep_for(50ms); +// } +// }); + +// for (auto& thread : threads) +// { +// thread.join(); +// } + +// work_done = true; +// ringbuffer_flusher.join(); +// watchdog.join(); +// } \ No newline at end of file From dea52256b6ef628d2612fc4d48d9947c2fd3f0bd Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Tue, 26 Jul 2022 17:00:22 +0000 Subject: [PATCH 08/13] Add some locking --- src/indexing/strategies/seqnos_by_key_bucketed.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/indexing/strategies/seqnos_by_key_bucketed.cpp b/src/indexing/strategies/seqnos_by_key_bucketed.cpp index 56b038b91c96..4d6b003ee3bc 100644 --- a/src/indexing/strategies/seqnos_by_key_bucketed.cpp +++ b/src/indexing/strategies/seqnos_by_key_bucketed.cpp @@ -5,6 +5,7 @@ #include "ccf/ds/hex.h" #include "ccf/ds/logger.h" +#include "ccf/ds/pal.h" #include "ds/lru.h" #include "ds/serialized.h" #include "indexing/lfs_interface.h" @@ -32,6 +33,8 @@ namespace ccf::indexing::strategies using BucketValue = std::pair; LRU old_results; + ccf::Pal::Mutex results_access; + std::string name; std::shared_ptr lfs_access; @@ -206,6 +209,8 @@ namespace ccf::indexing::strategies while (true) { + std::lock_guard guard(results_access); + const auto bucket_key = std::make_pair(serialised_key, from_range); const auto old_it = old_results.find(bucket_key); if (old_it != old_results.end()) @@ -350,6 +355,8 @@ namespace ccf::indexing::strategies { const auto range = impl->get_range_for(tx_id.seqno); + std::lock_guard guard(impl->results_access); + auto it = impl->current_results.find(k); if (it != impl->current_results.end()) { From 92b8bc26ad4f0ebd8faa9adda7ec3c784923a72e Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Tue, 26 Jul 2022 17:00:28 +0000 Subject: [PATCH 09/13] Still stalled - stumped --- src/indexing/test/indexing.cpp | 555 +++++++++++++++++---------------- 1 file changed, 291 insertions(+), 264 deletions(-) diff --git a/src/indexing/test/indexing.cpp b/src/indexing/test/indexing.cpp index fa717777d695..bcf98d9ccc81 100644 --- a/src/indexing/test/indexing.cpp +++ b/src/indexing/test/indexing.cpp @@ -12,6 +12,7 @@ #include "indexing/test/common.h" #include "node/share_manager.h" +#include #define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN #include @@ -453,7 +454,8 @@ TEST_CASE_TEMPLATE( } // Uses the real classes, and access + update them concurrently -TEST_CASE("multi-threaded indexing - in memory" * doctest::test_suite("indexing")) +TEST_CASE( + "multi-threaded indexing - in memory" * doctest::test_suite("indexing")) { auto kv_store_p = std::make_shared(); auto& kv_store = *kv_store_p; @@ -646,266 +648,291 @@ TEST_CASE("multi-threaded indexing - in memory" * doctest::test_suite("indexing" watchdog.join(); } -// // Uses the real classes, and access + update them concurrently -// using IndexA_Bucketed = -// ccf::indexing::strategies::SeqnosByKey_Bucketed; -// TEST_CASE_TEMPLATE( -// "multi-threaded indexing - bucketed" * doctest::test_suite("indexing"), -// AA, -// // IndexA -// // , -// IndexA_Bucketed) -// { -// auto kv_store_p = std::make_shared(); -// auto& kv_store = *kv_store_p; - -// auto ledger_secrets = std::make_shared(); -// kv_store.set_encryptor(std::make_shared(ledger_secrets)); - -// auto stub_writer = std::make_shared(); -// auto cache = std::make_shared( -// kv_store, ledger_secrets, stub_writer); - -// auto fetcher = std::make_shared(); -// auto indexer_p = std::make_shared(fetcher); -// auto& indexer = *indexer_p; - -// messaging::BufferProcessor host_bp("lfs_host"); -// messaging::BufferProcessor enclave_bp("lfs_enclave"); - -// constexpr size_t buf_size = 1 << 16; -// auto inbound_buffer = std::make_unique(buf_size); -// ringbuffer::Reader inbound_reader(inbound_buffer->bd); -// auto outbound_buffer = std::make_unique(buf_size); - -// ringbuffer::Reader outbound_reader(outbound_buffer->bd); -// asynchost::LFSFileHandler host_files( -// std::make_shared(inbound_reader)); -// host_files.register_message_handlers(host_bp.get_dispatcher()); - -// auto enclave_lfs = std::make_shared( -// std::make_shared(outbound_reader)); -// enclave_lfs->register_message_handlers(enclave_bp.get_dispatcher()); - -// ccfapp::AbstractNodeContext node_context; -// node_context.install_subsystem(enclave_lfs); - -// ccf::indexing::StrategyPtr index_a = nullptr; -// if constexpr (std::is_same_v) -// { -// index_a = std::make_shared(map_a, node_context, 100, 5); -// } -// else -// { -// index_a = std::make_shared(map_a); -// } - -// REQUIRE(indexer.install_strategy(index_a)); - -// auto index_b = std::make_shared(map_b); -// REQUIRE(indexer.install_strategy(index_b)); - -// auto ledger = add_raft_consensus(kv_store_p, indexer_p); - -// ledger_secrets->init(); -// { -// INFO("Store one recovery member"); -// // This is necessary to rekey the ledger and issue recovery shares for the -// // new ledger secret -// auto tx = kv_store.create_tx(); -// auto config = tx.rw(ccf::Tables::CONFIGURATION); -// constexpr size_t recovery_threshold = 1; -// config->put({recovery_threshold}); -// auto member_info = tx.rw(ccf::Tables::MEMBER_INFO); -// auto member_public_encryption_keys = tx.rw( -// ccf::Tables::MEMBER_ENCRYPTION_PUBLIC_KEYS); - -// auto kp = crypto::make_key_pair(); -// auto cert = kp->self_sign("CN=member", valid_from, valid_to); -// auto member_id = -// crypto::Sha256Hash(crypto::cert_pem_to_der(cert)).hex_str(); - -// member_info->put(member_id, {ccf::MemberStatus::ACTIVE}); -// member_public_encryption_keys->put( -// member_id, crypto::make_rsa_key_pair()->public_key_pem()); -// REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); -// } - -// std::atomic finished = false; -// std::atomic writes_to_hello = 0; -// std::atomic writes_to_saluton = 0; -// std::atomic writes_to_42 = 0; - -// auto tx_advancer = [&]() { -// size_t i = 0; -// while (i < 1'000) -// { -// auto tx = kv_store.create_tx(); -// tx.wo(map_a)->put(fmt::format("hello"), fmt::format("Value {}", i)); -// ++writes_to_hello; -// if (i % 2 == 0) -// { -// ++writes_to_saluton; -// tx.wo(map_a)->put(fmt::format("saluton"), fmt::format("Value2 {}", i)); -// } -// if (i % 3 == 0) -// { -// ++writes_to_42; -// tx.wo(map_b)->put(42, i); -// } - -// REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); -// ++i; -// } -// finished = true; -// }; - -// auto index_ticker = [&]() { -// while (!finished) -// { -// std::lock_guard guard(fetcher->lock); -// while (indexer.update_strategies(step_time, kv_store.current_txid()) || -// !fetcher->requested.empty()) -// { -// // Do the fetch, simulating an asynchronous fetch by the historical -// // query system -// for (auto seqno : fetcher->requested) -// { -// const auto entry = ledger->get_raw_entry_by_idx(seqno); -// REQUIRE(entry.has_value()); -// fetcher->fetched_stores[seqno] = fetcher->deserialise_transaction( -// seqno, entry->data(), entry->size()); -// } -// fetcher->requested.clear(); -// } -// } -// }; - -// auto fetch_index_a = [&]() { -// while (true) -// { -// std::optional hello; -// std::optional saluton; - -// auto idx_a = std::dynamic_pointer_cast(index_a); -// if constexpr (std::is_same_v) -// { -// auto get_all = [&](const std::string& key) { -// const auto max_range = idx_a->max_requestable_range(); -// const auto end_seqno = kv_store.get_txid().seqno; -// auto range_start = 0; - -// ccf::SeqNoCollection all_results; - -// auto next_end = [&]() { -// return std::min(end_seqno, range_start + max_range); -// }; - -// auto range_end = next_end(); - -// while (true) -// { -// auto results = -// idx_a->get_write_txs_in_range(key, range_start, range_end); - -// if (!results.has_value()) -// { -// // This required an async load from disk -// std::this_thread::sleep_for(std::chrono::milliseconds(50)); - -// results = -// idx_a->get_write_txs_in_range(key, range_start, range_end); -// REQUIRE(results.has_value()); -// } - -// for (auto seqno : *results) -// { -// all_results.insert(seqno); -// } - -// if (range_end == end_seqno) -// { -// return all_results; -// } -// else -// { -// range_start = range_end + 1; -// range_end = next_end(); -// } -// } -// }; - -// hello = get_all("hello"); -// saluton = get_all("saluton"); -// } -// else -// { -// hello = idx_a->get_all_write_txs("hello"); -// saluton = idx_a->get_all_write_txs("saluton"); -// } - -// if ( -// finished && hello.has_value() && hello->size() == writes_to_hello && -// saluton.has_value() && saluton->size() == writes_to_saluton) -// { -// break; -// } -// } -// }; - -// auto fetch_index_b = [&]() { -// while (true) -// { -// const auto forty_two = index_b->get_all_write_txs(42); - -// if ( -// finished && forty_two.has_value() && forty_two->size() == writes_to_42) -// { -// break; -// } -// } -// }; - -// std::vector threads; -// threads.emplace_back(tx_advancer); -// threads.emplace_back(index_ticker); -// threads.emplace_back(fetch_index_a); -// threads.emplace_back(fetch_index_a); -// threads.emplace_back(fetch_index_a); -// threads.emplace_back(fetch_index_b); -// threads.emplace_back(fetch_index_b); - -// std::atomic work_done = false; - -// std::thread ringbuffer_flusher([&]() { -// while (!work_done) -// { -// host_bp.read_all(outbound_reader); -// enclave_bp.read_all(inbound_reader); -// } -// }); - -// std::thread watchdog([&]() { -// using Clock = std::chrono::system_clock; -// const auto start_time = Clock::now(); - -// using namespace std::chrono_literals; -// const auto max_run_time = 10s; - -// while (!work_done) -// { -// const auto now = Clock::now(); -// REQUIRE(now - start_time < max_run_time); -// std::this_thread::sleep_for(50ms); -// } -// }); - -// for (auto& thread : threads) -// { -// thread.join(); -// } - -// work_done = true; -// ringbuffer_flusher.join(); -// watchdog.join(); -// } \ No newline at end of file +class MockTransactionFetcher : public ccf::indexing::TransactionFetcher +{ + std::shared_ptr encryptor; + +public: + aft::LedgerStubProxy* ledger; + + MockTransactionFetcher(const std::shared_ptr& e) : + encryptor(e) + {} + + kv::ReadOnlyStorePtr deserialise_transaction( + ccf::SeqNo seqno, const uint8_t* data, size_t size) override + { + auto store = std::make_shared( + false /* Do not start from very first seqno */, + true /* Make use of historical secrets */); + + store->set_encryptor(encryptor); + + bool public_only = false; + auto exec = + store->deserialize({data, data + size}, ConsensusType::CFT, public_only); + if (exec == nullptr) + { + return nullptr; + } + + auto result = exec->apply(); + if (result == kv::ApplyResult::FAIL) + { + return nullptr; + } + + return store; + } + + std::vector fetch_transactions( + const ccf::SeqNoCollection& seqnos) override + { + std::vector ret; + + for (const auto& seqno : seqnos) + { + const auto entry = ledger->get_raw_entry_by_idx(seqno); + if (!entry.has_value()) + { + fmt::print("Ledger still has no entry for {}\n", seqno); + return {}; + } + + ret.push_back( + deserialise_transaction(seqno, entry->data(), entry->size())); + } + + return ret; + } +}; + +TEST_CASE( + "multi-threaded indexing - bucketed" * doctest::test_suite("indexing")) +{ + auto kv_store_p = std::make_shared(); + auto& kv_store = *kv_store_p; + + auto ledger_secrets = std::make_shared(); + auto encryptor = std::make_shared(ledger_secrets); + kv_store.set_encryptor(encryptor); + + auto stub_writer = std::make_shared(); + auto cache = std::make_shared( + kv_store, ledger_secrets, stub_writer); + + auto fetcher = std::make_shared(encryptor); + auto indexer_p = std::make_shared(fetcher); + auto& indexer = *indexer_p; + + messaging::BufferProcessor host_bp("lfs_host"); + messaging::BufferProcessor enclave_bp("lfs_enclave"); + + constexpr size_t buf_size = 1 << 16; + auto inbound_buffer = std::make_unique(buf_size); + ringbuffer::Reader inbound_reader(inbound_buffer->bd); + auto outbound_buffer = std::make_unique(buf_size); + + ringbuffer::Reader outbound_reader(outbound_buffer->bd); + asynchost::LFSFileHandler host_files( + std::make_shared(inbound_reader)); + host_files.register_message_handlers(host_bp.get_dispatcher()); + + auto enclave_lfs = std::make_shared( + std::make_shared(outbound_reader)); + enclave_lfs->register_message_handlers(enclave_bp.get_dispatcher()); + + ccfapp::AbstractNodeContext node_context; + node_context.install_subsystem(enclave_lfs); + + using IndexA_Bucketed = + ccf::indexing::strategies::SeqnosByKey_Bucketed; + auto index_a = std::make_shared(map_a, node_context, 100, 5); + REQUIRE(indexer.install_strategy(index_a)); + + auto index_b = std::make_shared(map_b); + REQUIRE(indexer.install_strategy(index_b)); + + auto ledger = add_raft_consensus(kv_store_p, indexer_p); + fetcher->ledger = ledger; + + ledger_secrets->init(); + { + INFO("Store one recovery member"); + // This is necessary to rekey the ledger and issue recovery shares for the + // new ledger secret + auto tx = kv_store.create_tx(); + auto config = tx.rw(ccf::Tables::CONFIGURATION); + constexpr size_t recovery_threshold = 1; + config->put({recovery_threshold}); + auto member_info = tx.rw(ccf::Tables::MEMBER_INFO); + auto member_public_encryption_keys = tx.rw( + ccf::Tables::MEMBER_ENCRYPTION_PUBLIC_KEYS); + + auto kp = crypto::make_key_pair(); + auto cert = kp->self_sign("CN=member", valid_from, valid_to); + auto member_id = + crypto::Sha256Hash(crypto::cert_pem_to_der(cert)).hex_str(); + + member_info->put(member_id, {ccf::MemberStatus::ACTIVE}); + member_public_encryption_keys->put( + member_id, crypto::make_rsa_key_pair()->public_key_pem()); + REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); + } + + std::atomic all_submitted = false; + std::atomic writes_to_hello = 0; + std::atomic writes_to_saluton = 0; + std::atomic writes_to_42 = 0; + + auto tx_advancer = [&]() { + size_t i = 0; + while (i < 1'000) + { + auto tx = kv_store.create_tx(); + tx.wo(map_a)->put(fmt::format("hello"), fmt::format("Value {}", i)); + ++writes_to_hello; + if (i % 2 == 0) + { + ++writes_to_saluton; + tx.wo(map_a)->put(fmt::format("saluton"), fmt::format("Value2 {}", i)); + } + if (i % 3 == 0) + { + ++writes_to_42; + tx.wo(map_b)->put(42, i); + } + + REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); + ++i; + } + all_submitted = true; + }; + + auto get_all = + [&](const std::string& key) -> std::optional { + const auto max_range = index_a->max_requestable_range(); + auto range_start = 0; + + ccf::SeqNoCollection all_results; + + while (true) + { + const auto end_seqno = kv_store.get_txid().seqno; + const auto range_end = std::min(end_seqno, range_start + max_range); + + auto results = + index_a->get_write_txs_in_range(key, range_start, range_end); + + while (!results.has_value()) + { + std::this_thread::yield(); + fmt::print( + "Got empty result for {} between {} and {}, yielding\n", + key, + range_start, + range_end); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + + results = index_a->get_write_txs_in_range(key, range_start, range_end); + } + + for (auto seqno : *results) + { + all_results.insert(seqno); + } + + if (range_end == end_seqno) + { + return all_results; + } + else + { + range_start = range_end + 1; + } + } + }; + + auto fetch_index_a = [&]() { + while (true) + { + const auto hello = get_all("hello"); + const auto saluton = get_all("saluton"); + + if ( + all_submitted && hello.has_value() && + hello->size() == writes_to_hello && saluton.has_value() && + saluton->size() == writes_to_saluton) + { + break; + } + } + }; + + auto fetch_index_b = [&]() { + while (true) + { + const auto forty_two = index_b->get_all_write_txs(42); + + if ( + all_submitted && forty_two.has_value() && + forty_two->size() == writes_to_42) + { + break; + } + } + }; + + std::vector threads; + threads.emplace_back(tx_advancer); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_a); + threads.emplace_back(fetch_index_b); + threads.emplace_back(fetch_index_b); + + std::atomic work_done = false; + + std::thread ringbuffer_flusher([&]() { + while (!work_done) + { + host_bp.read_all(outbound_reader); + enclave_bp.read_all(inbound_reader); + } + }); + + std::thread index_ticker([&]() { + while (!work_done) + { + while (indexer.update_strategies(step_time, kv_store.current_txid())) + { + std::this_thread::yield(); + } + } + }); + + std::thread watchdog([&]() { + using Clock = std::chrono::system_clock; + const auto start_time = Clock::now(); + + using namespace std::chrono_literals; + const auto max_run_time = 5s; + + while (!work_done) + { + const auto now = Clock::now(); + REQUIRE(now - start_time < max_run_time); + std::this_thread::sleep_for(50ms); + } + }); + + for (auto& thread : threads) + { + thread.join(); + } + + work_done = true; + ringbuffer_flusher.join(); + index_ticker.join(); + watchdog.join(); +} \ No newline at end of file From 67900489a0e9e8483013b3ae81ef6459e0961910 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Wed, 27 Jul 2022 13:43:11 +0000 Subject: [PATCH 10/13] Break lock, remove debug logging --- src/indexing/test/indexing.cpp | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/indexing/test/indexing.cpp b/src/indexing/test/indexing.cpp index bcf98d9ccc81..9afdbd48258a 100644 --- a/src/indexing/test/indexing.cpp +++ b/src/indexing/test/indexing.cpp @@ -453,6 +453,9 @@ TEST_CASE_TEMPLATE( index_b); } +using namespace std::chrono_literals; +const auto max_multithread_run_time = 10s; + // Uses the real classes, and access + update them concurrently TEST_CASE( "multi-threaded indexing - in memory" * doctest::test_suite("indexing")) @@ -628,13 +631,10 @@ TEST_CASE( using Clock = std::chrono::system_clock; const auto start_time = Clock::now(); - using namespace std::chrono_literals; - const auto max_run_time = 10s; - while (!work_done) { const auto now = Clock::now(); - REQUIRE(now - start_time < max_run_time); + REQUIRE(now - start_time < max_multithread_run_time); std::this_thread::sleep_for(50ms); } }); @@ -695,7 +695,6 @@ class MockTransactionFetcher : public ccf::indexing::TransactionFetcher const auto entry = ledger->get_raw_entry_by_idx(seqno); if (!entry.has_value()) { - fmt::print("Ledger still has no entry for {}\n", seqno); return {}; } @@ -805,6 +804,7 @@ TEST_CASE( REQUIRE(tx.commit() == kv::CommitResult::SUCCESS); ++i; + std::this_thread::yield(); } all_submitted = true; }; @@ -824,15 +824,15 @@ TEST_CASE( auto results = index_a->get_write_txs_in_range(key, range_start, range_end); + std::chrono::milliseconds sleep_time(10); while (!results.has_value()) { - std::this_thread::yield(); - fmt::print( - "Got empty result for {} between {} and {}, yielding\n", - key, - range_start, - range_end); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + // May be contesting for limited cached buckets with other users of this + // index (no handle for unique claims). Back-off exponentially, with + // random variation, to break deadlock + std::this_thread::sleep_for(sleep_time); + + sleep_time += std::chrono::milliseconds(rand() % sleep_time.count()); results = index_a->get_write_txs_in_range(key, range_start, range_end); } @@ -866,6 +866,8 @@ TEST_CASE( { break; } + + std::this_thread::yield(); } }; @@ -880,6 +882,8 @@ TEST_CASE( { break; } + + std::this_thread::yield(); } }; @@ -898,6 +902,7 @@ TEST_CASE( { host_bp.read_all(outbound_reader); enclave_bp.read_all(inbound_reader); + std::this_thread::yield(); } }); @@ -915,13 +920,10 @@ TEST_CASE( using Clock = std::chrono::system_clock; const auto start_time = Clock::now(); - using namespace std::chrono_literals; - const auto max_run_time = 5s; - while (!work_done) { const auto now = Clock::now(); - REQUIRE(now - start_time < max_run_time); + REQUIRE(now - start_time < max_multithread_run_time); std::this_thread::sleep_for(50ms); } }); From db3e507d56d1421e975ead3b92e122f4ffc988b4 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Wed, 27 Jul 2022 14:05:32 +0000 Subject: [PATCH 11/13] Trigger threading canary --- .threading_canary | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.threading_canary b/.threading_canary index 580ef7df91bc..6607897a73a1 100644 --- a/.threading_canary +++ b/.threading_canary @@ -1 +1 @@ -This looks like a 'job' for Threading Canary! +This looks like a "job" for Threading Canary! From 5a3031bf0c7291b0d808fb307fb9003b1df3d19b Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Wed, 27 Jul 2022 14:10:54 +0000 Subject: [PATCH 12/13] Missed a file --- src/indexing/enclave_lfs_access.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/indexing/enclave_lfs_access.h b/src/indexing/enclave_lfs_access.h index e280dd92a185..20103725297e 100644 --- a/src/indexing/enclave_lfs_access.h +++ b/src/indexing/enclave_lfs_access.h @@ -6,6 +6,7 @@ #include "ccf/crypto/sha256.h" #include "ccf/crypto/symmetric_key.h" #include "ccf/ds/hex.h" +#include "ccf/ds/pal.h" #include "ds/messaging.h" #include "indexing/lfs_interface.h" #include "indexing/lfs_ringbuffer_types.h" @@ -137,7 +138,7 @@ namespace ccf::indexing dispatcher, LFSMsg::response, [this](const uint8_t* data, size_t size) { auto [obfuscated, encrypted] = ringbuffer::read_message(data, size); - std::lock_guard lock(pending_access); + std::lock_guard guard(pending_access); auto it = pending.find(obfuscated); if (it != pending.end()) { @@ -195,7 +196,7 @@ namespace ccf::indexing [this](const uint8_t* data, size_t size) { auto [obfuscated] = ringbuffer::read_message(data, size); - std::lock_guard lock(pending_access); + std::lock_guard guard(pending_access); auto it = pending.find(obfuscated); if (it != pending.end()) { @@ -261,8 +262,7 @@ namespace ccf::indexing FetchResultPtr fetch(const LFSKey& key) override { const auto obfuscated = obfuscate_key(key); - - std::lock_guard lock(pending_access); + std::lock_guard guard(pending_access); auto it = pending.find(obfuscated); FetchResultPtr result; From ca4d5658006967e123e3dec9f3928ac036e3ef84 Mon Sep 17 00:00:00 2001 From: Eddy Ashton Date: Thu, 28 Jul 2022 08:36:14 +0000 Subject: [PATCH 13/13] Reduce iterations in debug builds --- src/indexing/test/indexing.cpp | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/indexing/test/indexing.cpp b/src/indexing/test/indexing.cpp index 9afdbd48258a..67940a790805 100644 --- a/src/indexing/test/indexing.cpp +++ b/src/indexing/test/indexing.cpp @@ -786,7 +786,14 @@ TEST_CASE( auto tx_advancer = [&]() { size_t i = 0; - while (i < 1'000) + constexpr auto tx_count = +#if NDEBUG + 1'000; +#else + 100; +#endif + + while (i < tx_count) { auto tx = kv_store.create_tx(); tx.wo(map_a)->put(fmt::format("hello"), fmt::format("Value {}", i));