Skip to content
2 changes: 1 addition & 1 deletion .threading_canary
Original file line number Diff line number Diff line change
@@ -1 +1 @@
This looks like a 'job' for Threading Canary!
This looks like a "job" for Threading Canary!
4 changes: 4 additions & 0 deletions include/ccf/indexing/strategies/seqnos_by_key_in_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache 2.0 License.
#pragma once

#include "ccf/ds/mutex.h"
#include "ccf/indexing/strategies/visit_each_entry_in_map.h"
#include "ccf/seq_no_collection.h"

Expand All @@ -15,6 +16,9 @@ namespace ccf::indexing::strategies
// Value is every SeqNo which talks about that key.
std::unordered_map<ccf::ByteVector, SeqNoCollection> seqnos_by_key;

// Mutex guarding access to seqnos_by_key
ccf::Mutex lock;

void visit_entry(
const ccf::TxID& tx_id,
const ccf::ByteVector& k,
Expand Down
6 changes: 6 additions & 0 deletions src/consensus/aft/test/logging_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace aft
protected:
ccf::NodeId _id;

std::mutex ledger_access;

public:
std::vector<std::vector<uint8_t>> ledger;
uint64_t skip_count = 0;
Expand All @@ -31,6 +33,8 @@ namespace aft
kv::Term term,
kv::Version index)
{
std::lock_guard<std::mutex> lock(ledger_access);

// The payload that we eventually deserialise must include the
// ledger entry as well as the View and Index that identify it. In
// the real entries, they are nested in the payload and the IV. For
Expand Down Expand Up @@ -73,6 +77,8 @@ namespace aft

std::optional<std::vector<uint8_t>> get_entry_by_idx(size_t idx)
{
std::lock_guard<std::mutex> lock(ledger_access);

// Ledger indices are 1-based, hence the -1
if (idx > 0 && idx <= ledger.size())
{
Expand Down
5 changes: 5 additions & 0 deletions src/indexing/enclave_lfs_access.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "ccf/crypto/sha256.h"
#include "ccf/crypto/symmetric_key.h"
#include "ccf/ds/hex.h"
#include "ccf/ds/mutex.h"
#include "ds/messaging.h"
#include "indexing/lfs_interface.h"
#include "indexing/lfs_ringbuffer_types.h"
Expand Down Expand Up @@ -86,6 +87,7 @@ namespace ccf::indexing
using PendingResult = std::weak_ptr<FetchResult>;

std::unordered_map<LFSKey, PendingResult> pending;
ccf::Mutex pending_access;

ringbuffer::WriterPtr to_host;

Expand Down Expand Up @@ -136,6 +138,7 @@ namespace ccf::indexing
dispatcher, LFSMsg::response, [this](const uint8_t* data, size_t size) {
auto [obfuscated, encrypted] =
ringbuffer::read_message<LFSMsg::response>(data, size);
std::lock_guard<ccf::Mutex> guard(pending_access);
auto it = pending.find(obfuscated);
if (it != pending.end())
{
Expand Down Expand Up @@ -193,6 +196,7 @@ namespace ccf::indexing
[this](const uint8_t* data, size_t size) {
auto [obfuscated] =
ringbuffer::read_message<LFSMsg::not_found>(data, size);
std::lock_guard<ccf::Mutex> guard(pending_access);
auto it = pending.find(obfuscated);
if (it != pending.end())
{
Expand Down Expand Up @@ -258,6 +262,7 @@ namespace ccf::indexing
FetchResultPtr fetch(const LFSKey& key) override
{
const auto obfuscated = obfuscate_key(key);
std::lock_guard<ccf::Mutex> guard(pending_access);
auto it = pending.find(obfuscated);

FetchResultPtr result;
Expand Down
7 changes: 7 additions & 0 deletions src/indexing/strategies/seqnos_by_key_bucketed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "ccf/ds/hex.h"
#include "ccf/ds/logger.h"
#include "ccf/ds/mutex.h"
#include "ds/lru.h"
#include "ds/serialized.h"
#include "indexing/lfs_interface.h"
Expand Down Expand Up @@ -32,6 +33,8 @@ namespace ccf::indexing::strategies
using BucketValue = std::pair<FetchResultPtr, SeqNoCollection>;
LRU<BucketKey, BucketValue> old_results;

ccf::Mutex results_access;

std::string name;

std::shared_ptr<AbstractLFSAccess> lfs_access;
Expand Down Expand Up @@ -206,6 +209,8 @@ namespace ccf::indexing::strategies

while (true)
{
std::lock_guard<ccf::Mutex> guard(results_access);

const auto bucket_key = std::make_pair(serialised_key, from_range);
const auto old_it = old_results.find(bucket_key);
if (old_it != old_results.end())
Expand Down Expand Up @@ -350,6 +355,8 @@ namespace ccf::indexing::strategies
{
const auto range = impl->get_range_for(tx_id.seqno);

std::lock_guard<ccf::Mutex> guard(impl->results_access);

auto it = impl->current_results.find(k);
if (it != impl->current_results.end())
{
Expand Down
2 changes: 2 additions & 0 deletions src/indexing/strategies/seqnos_by_key_in_memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace ccf::indexing::strategies
void SeqnosByKey_InMemory_Untyped::visit_entry(
const ccf::TxID& tx_id, const ccf::ByteVector& k, const ccf::ByteVector& v)
{
std::lock_guard<ccf::Mutex> guard(lock);
seqnos_by_key[k].insert(tx_id.seqno);
}

Expand All @@ -18,6 +19,7 @@ namespace ccf::indexing::strategies
ccf::SeqNo to,
std::optional<size_t> max_seqnos)
{
std::lock_guard<ccf::Mutex> guard(lock);
const auto it = seqnos_by_key.find(serialised_key);
if (it != seqnos_by_key.end())
{
Expand Down
Loading