Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .threading_canary
Original file line number Diff line number Diff line change
@@ -1 +1 @@
This looks like a 'job' for Threading Canary!
This looks like a "job" for Threading Canary!
4 changes: 4 additions & 0 deletions include/ccf/indexing/strategies/seqnos_by_key_in_memory.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// Licensed under the Apache 2.0 License.
#pragma once

#include "ccf/ds/pal.h"
#include "ccf/indexing/strategies/visit_each_entry_in_map.h"
#include "ccf/seq_no_collection.h"

Expand All @@ -15,6 +16,9 @@ namespace ccf::indexing::strategies
// Value is every SeqNo which talks about that key.
std::unordered_map<ccf::ByteVector, SeqNoCollection> seqnos_by_key;

// Mutex guarding access to seqnos_by_key
ccf::Pal::Mutex lock;

void visit_entry(
const ccf::TxID& tx_id,
const ccf::ByteVector& k,
Expand Down
6 changes: 6 additions & 0 deletions src/consensus/aft/test/logging_stub.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ namespace aft
protected:
ccf::NodeId _id;

std::mutex ledger_access;

public:
std::vector<std::vector<uint8_t>> ledger;
uint64_t skip_count = 0;
Expand All @@ -31,6 +33,8 @@ namespace aft
kv::Term term,
kv::Version index)
{
std::lock_guard<std::mutex> lock(ledger_access);

// The payload that we eventually deserialise must include the
// ledger entry as well as the View and Index that identify it. In
// the real entries, they are nested in the payload and the IV. For
Expand Down Expand Up @@ -73,6 +77,8 @@ namespace aft

std::optional<std::vector<uint8_t>> get_entry_by_idx(size_t idx)
{
std::lock_guard<std::mutex> lock(ledger_access);

// Ledger indices are 1-based, hence the -1
if (idx > 0 && idx <= ledger.size())
{
Expand Down
5 changes: 5 additions & 0 deletions src/indexing/enclave_lfs_access.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include "ccf/crypto/sha256.h"
#include "ccf/crypto/symmetric_key.h"
#include "ccf/ds/hex.h"
#include "ccf/ds/pal.h"
#include "ds/messaging.h"
#include "indexing/lfs_interface.h"
#include "indexing/lfs_ringbuffer_types.h"
Expand Down Expand Up @@ -86,6 +87,7 @@ namespace ccf::indexing
using PendingResult = std::weak_ptr<FetchResult>;

std::unordered_map<LFSKey, PendingResult> pending;
ccf::Pal::Mutex pending_access;

ringbuffer::WriterPtr to_host;

Expand Down Expand Up @@ -136,6 +138,7 @@ namespace ccf::indexing
dispatcher, LFSMsg::response, [this](const uint8_t* data, size_t size) {
auto [obfuscated, encrypted] =
ringbuffer::read_message<LFSMsg::response>(data, size);
std::lock_guard<ccf::Pal::Mutex> guard(pending_access);
auto it = pending.find(obfuscated);
if (it != pending.end())
{
Expand Down Expand Up @@ -193,6 +196,7 @@ namespace ccf::indexing
[this](const uint8_t* data, size_t size) {
auto [obfuscated] =
ringbuffer::read_message<LFSMsg::not_found>(data, size);
std::lock_guard<ccf::Pal::Mutex> guard(pending_access);
auto it = pending.find(obfuscated);
if (it != pending.end())
{
Expand Down Expand Up @@ -258,6 +262,7 @@ namespace ccf::indexing
FetchResultPtr fetch(const LFSKey& key) override
{
const auto obfuscated = obfuscate_key(key);
std::lock_guard<ccf::Pal::Mutex> guard(pending_access);
auto it = pending.find(obfuscated);

FetchResultPtr result;
Expand Down
7 changes: 7 additions & 0 deletions src/indexing/strategies/seqnos_by_key_bucketed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "ccf/ds/hex.h"
#include "ccf/ds/logger.h"
#include "ccf/ds/pal.h"
#include "ds/lru.h"
#include "ds/serialized.h"
#include "indexing/lfs_interface.h"
Expand Down Expand Up @@ -32,6 +33,8 @@ namespace ccf::indexing::strategies
using BucketValue = std::pair<FetchResultPtr, SeqNoCollection>;
LRU<BucketKey, BucketValue> old_results;

ccf::Pal::Mutex results_access;

std::string name;

std::shared_ptr<AbstractLFSAccess> lfs_access;
Expand Down Expand Up @@ -206,6 +209,8 @@ namespace ccf::indexing::strategies

while (true)
{
std::lock_guard<ccf::Pal::Mutex> guard(results_access);

const auto bucket_key = std::make_pair(serialised_key, from_range);
const auto old_it = old_results.find(bucket_key);
if (old_it != old_results.end())
Expand Down Expand Up @@ -350,6 +355,8 @@ namespace ccf::indexing::strategies
{
const auto range = impl->get_range_for(tx_id.seqno);

std::lock_guard<ccf::Pal::Mutex> guard(impl->results_access);

auto it = impl->current_results.find(k);
if (it != impl->current_results.end())
{
Expand Down
2 changes: 2 additions & 0 deletions src/indexing/strategies/seqnos_by_key_in_memory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ namespace ccf::indexing::strategies
void SeqnosByKey_InMemory_Untyped::visit_entry(
const ccf::TxID& tx_id, const ccf::ByteVector& k, const ccf::ByteVector& v)
{
std::lock_guard<ccf::Pal::Mutex> guard(lock);
seqnos_by_key[k].insert(tx_id.seqno);
}

Expand All @@ -18,6 +19,7 @@ namespace ccf::indexing::strategies
ccf::SeqNo to,
std::optional<size_t> max_seqnos)
{
std::lock_guard<ccf::Pal::Mutex> guard(lock);
const auto it = seqnos_by_key.find(serialised_key);
if (it != seqnos_by_key.end())
{
Expand Down
Loading