Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions src/init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,19 @@ void PrepareShutdown()
fFeeEstimatesInitialized = false;
}

// FlushStateToDisk generates a SetBestChain callback, which we should avoid missing
FlushStateToDisk();

// After there are no more peers/RPC left to give us new data which may generate
// CValidationInterface callbacks, flush them...
GetMainSignals().FlushBackgroundCallbacks();

// Any future callbacks will be dropped. This should absolutely be safe - if
// missing a callback results in an unrecoverable situation, unclean shutdown
// would too. The only reason to do the above flushes is to let the wallet catch
// up with our current chain to avoid any strange pruning edge cases and make
// next startup faster by avoiding rescan.

{
LOCK(cs_main);
if (pcoinsTip != NULL) {
Expand Down Expand Up @@ -301,6 +314,7 @@ void PrepareShutdown()

// Disconnect all slots
UnregisterAllValidationInterfaces();
GetMainSignals().UnregisterBackgroundSignalScheduler();

#ifndef WIN32
try {
Expand Down Expand Up @@ -1243,6 +1257,8 @@ bool AppInitMain()
CScheduler::Function serviceLoop = std::bind(&CScheduler::serviceQueue, &scheduler);
threadGroup.create_thread(std::bind(&TraceThread<CScheduler::Function>, "scheduler", serviceLoop));

GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);

// Initialize Sapling circuit parameters
LoadSaplingParams();

Expand Down
66 changes: 66 additions & 0 deletions src/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,69 @@ size_t CScheduler::getQueueInfo(boost::chrono::system_clock::time_point &first,
}
return result;
}

bool CScheduler::AreThreadsServicingQueue() const {
boost::unique_lock<boost::mutex> lock(newTaskMutex);
return nThreadsServicingQueue;
}

void SingleThreadedSchedulerClient::MaybeScheduleProcessQueue() {
{
LOCK(m_cs_callbacks_pending);
// Try to avoid scheduling too many copies here, but if we
// accidentally have two ProcessQueue's scheduled at once its
// not a big deal.
if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return;
}
m_pscheduler->schedule(std::bind(&SingleThreadedSchedulerClient::ProcessQueue, this));
}

void SingleThreadedSchedulerClient::ProcessQueue() {
std::function<void (void)> callback;
{
LOCK(m_cs_callbacks_pending);
if (m_are_callbacks_running) return;
if (m_callbacks_pending.empty()) return;
m_are_callbacks_running = true;

callback = std::move(m_callbacks_pending.front());
m_callbacks_pending.pop_front();
}

// RAII the setting of fCallbacksRunning and calling MaybeScheduleProcessQueue
// to ensure both happen safely even if callback() throws.
struct RAIICallbacksRunning {
SingleThreadedSchedulerClient* instance;
explicit RAIICallbacksRunning(SingleThreadedSchedulerClient* _instance) : instance(_instance) {}
~RAIICallbacksRunning() {
{
LOCK(instance->m_cs_callbacks_pending);
instance->m_are_callbacks_running = false;
}
instance->MaybeScheduleProcessQueue();
}
} raiicallbacksrunning(this);

callback();
}

void SingleThreadedSchedulerClient::AddToProcessQueue(std::function<void (void)> func) {
assert(m_pscheduler);

{
LOCK(m_cs_callbacks_pending);
m_callbacks_pending.emplace_back(std::move(func));
}
MaybeScheduleProcessQueue();
}

void SingleThreadedSchedulerClient::EmptyQueue() {
assert(!m_pscheduler->AreThreadsServicingQueue());
bool should_continue = true;
while (should_continue) {
ProcessQueue();
LOCK(m_cs_callbacks_pending);
should_continue = !m_callbacks_pending.empty();
}
}
35 changes: 33 additions & 2 deletions src/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#include <boost/thread.hpp>
#include <map>

#include "sync.h"

//
// Simple class for background tasks that should be run
// periodically or once "after a while"
Expand Down Expand Up @@ -42,7 +44,7 @@ class CScheduler
typedef std::function<void(void)> Function;

// Call func at/after time t
void schedule(Function f, boost::chrono::system_clock::time_point t);
void schedule(Function f, boost::chrono::system_clock::time_point t=boost::chrono::system_clock::now());

// Convenience method: call f once deltaMilliSeconds from now
void scheduleFromNow(Function f, int64_t deltaMilliSeconds);
Expand Down Expand Up @@ -70,14 +72,43 @@ class CScheduler
size_t getQueueInfo(boost::chrono::system_clock::time_point &first,
boost::chrono::system_clock::time_point &last) const;

// Returns true if there are threads actively running in serviceQueue()
bool AreThreadsServicingQueue() const;

private:
std::multimap<boost::chrono::system_clock::time_point, Function> taskQueue;
boost::condition_variable newTaskScheduled;
mutable boost::mutex newTaskMutex;
int nThreadsServicingQueue;
bool stopRequested;
bool stopWhenEmpty;
bool shouldStop() { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
bool shouldStop() const { return stopRequested || (stopWhenEmpty && taskQueue.empty()); }
};

/**
* Class used by CScheduler clients which may schedule multiple jobs
* which are required to be run serially. Does not require such jobs
* to be executed on the same thread, but no two jobs will be executed
* at the same time.
*/
class SingleThreadedSchedulerClient {
private:
CScheduler *m_pscheduler;

RecursiveMutex m_cs_callbacks_pending;
std::list<std::function<void (void)>> m_callbacks_pending;
bool m_are_callbacks_running = false;

void MaybeScheduleProcessQueue();
void ProcessQueue();

public:
explicit SingleThreadedSchedulerClient(CScheduler *pschedulerIn) : m_pscheduler(pschedulerIn) {}
void AddToProcessQueue(std::function<void (void)> func);

// Processes all remaining queue members on the calling thread, blocking until queue is empty
// Must be called after the CScheduler has no remaining processing threads!
void EmptyQueue();
};

#endif
8 changes: 8 additions & 0 deletions src/test/test_pivx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ TestingSetup::TestingSetup()
pathTemp = GetTempPath() / strprintf("test_pivx_%lu_%i", (unsigned long)GetTime(), (int)(InsecureRandRange(100000)));
fs::create_directories(pathTemp);
gArgs.ForceSetArg("-datadir", pathTemp.string());

// Note that because we don't bother running a scheduler thread here,
// callbacks via CValidationInterface are unreliable, but that's OK,
// our unit tests aren't testing multiple parts of the code at once.
GetMainSignals().RegisterBackgroundSignalScheduler(scheduler);

// Ideally we'd move all the RPC tests to the functional testing framework
// instead of unit tests, but for now we need these here.
RegisterAllCoreRPCCommands(tableRPC);
Expand Down Expand Up @@ -80,6 +86,8 @@ TestingSetup::~TestingSetup()
UnregisterNodeSignals(GetNodeSignals());
threadGroup.interrupt_all();
threadGroup.join_all();
GetMainSignals().FlushBackgroundCallbacks();
GetMainSignals().UnregisterBackgroundSignalScheduler();
UnloadBlockIndex();
delete pcoinsTip;
delete pcoinsdbview;
Expand Down
2 changes: 2 additions & 0 deletions src/test/test_pivx.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#define PIVX_TEST_TEST_PIVX_H

#include "fs.h"
#include "scheduler.h"
#include "txdb.h"

#include <boost/thread.hpp>
Expand Down Expand Up @@ -48,6 +49,7 @@ struct TestingSetup: public BasicTestingSetup {
fs::path pathTemp;
boost::thread_group threadGroup;
CConnman* connman;
CScheduler scheduler;
ECCVerifyHandle globalVerifyHandle;

TestingSetup();
Expand Down
10 changes: 4 additions & 6 deletions src/validation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1388,7 +1388,10 @@ static int64_t nTimeIndex = 0;
static int64_t nTimeCallbacks = 0;
static int64_t nTimeTotal = 0;

bool ConnectBlock(const CBlock& block, CValidationState& state, CBlockIndex* pindex, CCoinsViewCache& view, bool fJustCheck, bool fAlreadyChecked)
/** Apply the effects of this block (with given index) on the UTXO set represented by coins.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit

Suggested change
/** Apply the effects of this block (with given index) on the UTXO set represented by coins.
/** Apply the effects of this block (with given index) on the UTXO set represented by view.

* Validity checks that depend on the UTXO set are also done; ConnectBlock()
* can fail if those validity checks fail (among other reasons). */
static bool ConnectBlock(const CBlock& block, CValidationState& state, CBlockIndex* pindex, CCoinsViewCache& view, bool fJustCheck = false, bool fAlreadyChecked = false)
{
AssertLockHeld(cs_main);
// Check it again in case a previous version let a bad block in
Expand Down Expand Up @@ -1704,11 +1707,6 @@ bool ConnectBlock(const CBlock& block, CValidationState& state, CBlockIndex* pin
nTimeIndex += nTime3 - nTime2;
LogPrint(BCLog::BENCH, " - Index writing: %.2fms [%.2fs]\n", 0.001 * (nTime3 - nTime2), nTimeIndex * 0.000001);

// Watch for changes to the previous coinbase transaction.
static uint256 hashPrevBestCoinBase;
GetMainSignals().UpdatedTransaction(hashPrevBestCoinBase);
hashPrevBestCoinBase = block.vtx[0]->GetHash();

int64_t nTime4 = GetTimeMicros();
nTimeCallbacks += nTime4 - nTime3;
LogPrint(BCLog::BENCH, " - Callbacks: %.2fms [%.2fs]\n", 0.001 * (nTime4 - nTime3), nTimeCallbacks * 0.000001);
Expand Down
3 changes: 0 additions & 3 deletions src/validation.h
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,6 @@ bool ReadBlockFromDisk(CBlock& block, const CBlockIndex* pindex);

/** Functions for validating blocks and updating the block tree */

/** Apply the effects of this block (with given index) on the UTXO set represented by coins */
bool ConnectBlock(const CBlock& block, CValidationState& state, CBlockIndex* pindex, CCoinsViewCache& coins, bool fJustCheck, bool fAlreadyChecked = false);

/** Context-independent validity checks */
bool CheckBlock(const CBlock& block, CValidationState& state, bool fCheckPOW = true, bool fCheckMerkleRoot = true, bool fCheckSig = true);
bool CheckWork(const CBlock& block, const CBlockIndex* const pindexPrev);
Expand Down
40 changes: 22 additions & 18 deletions src/validationinterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
// file COPYING or http://www.opensource.org/licenses/mit-license.php.

#include "validationinterface.h"
#include "scheduler.h"

#include <list>
#include <unordered_map>
#include <boost/signals2/signal.hpp>

Expand All @@ -14,8 +16,6 @@ struct ValidationInterfaceConnections {
boost::signals2::scoped_connection TransactionAddedToMempool;
boost::signals2::scoped_connection BlockConnected;
boost::signals2::scoped_connection BlockDisconnected;
boost::signals2::scoped_connection NotifyTransactionLock;
boost::signals2::scoped_connection UpdatedTransaction;
boost::signals2::scoped_connection SetBestChain;
boost::signals2::scoped_connection Broadcast;
boost::signals2::scoped_connection BlockChecked;
Expand All @@ -34,10 +34,6 @@ struct MainSignalsInstance {
boost::signals2::signal<void (const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex, const std::vector<CTransactionRef> &)> BlockConnected;
/** Notifies listeners of a block being disconnected */
boost::signals2::signal<void (const std::shared_ptr<const CBlock> &, int nBlockHeight)> BlockDisconnected;
/** Notifies listeners of an updated transaction lock without new data. */
boost::signals2::signal<void (const CTransaction &)> NotifyTransactionLock;
/** Notifies listeners of an updated transaction without new data (for now: a coinbase potentially becoming visible). */
boost::signals2::signal<bool (const uint256 &)> UpdatedTransaction;
/** Notifies listeners of a new active block chain. */
boost::signals2::signal<void (const CBlockLocator &)> SetBestChain;
/** Tells listeners to broadcast their data. */
Expand All @@ -46,12 +42,30 @@ struct MainSignalsInstance {
boost::signals2::signal<void (const CBlock&, const CValidationState&)> BlockChecked;

std::unordered_map<CValidationInterface*, ValidationInterfaceConnections> m_connMainSignals;

// We are not allowed to assume the scheduler only runs in one thread,
// but must ensure all callbacks happen in-order, so we end up creating
// our own queue here :(
SingleThreadedSchedulerClient m_schedulerClient;

explicit MainSignalsInstance(CScheduler *pscheduler) : m_schedulerClient(pscheduler) {}
};

static CMainSignals g_signals;

CMainSignals::CMainSignals() {
m_internals.reset(new MainSignalsInstance());
void CMainSignals::RegisterBackgroundSignalScheduler(CScheduler& scheduler) {
assert(!m_internals);
m_internals.reset(new MainSignalsInstance(&scheduler));
}

void CMainSignals::UnregisterBackgroundSignalScheduler() {
m_internals.reset(nullptr);
}

void CMainSignals::FlushBackgroundCallbacks() {
if (m_internals) {
m_internals->m_schedulerClient.EmptyQueue();
}
}

CMainSignals& GetMainSignals()
Expand All @@ -66,8 +80,6 @@ void RegisterValidationInterface(CValidationInterface* pwalletIn)
conns.TransactionAddedToMempool = g_signals.m_internals->TransactionAddedToMempool.connect(std::bind(&CValidationInterface::TransactionAddedToMempool, pwalletIn, std::placeholders::_1));
conns.BlockConnected = g_signals.m_internals->BlockConnected.connect(std::bind(&CValidationInterface::BlockConnected, pwalletIn, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
conns.BlockDisconnected = g_signals.m_internals->BlockDisconnected.connect(std::bind(&CValidationInterface::BlockDisconnected, pwalletIn, std::placeholders::_1, std::placeholders::_2));
conns.NotifyTransactionLock = g_signals.m_internals->NotifyTransactionLock.connect(std::bind(&CValidationInterface::NotifyTransactionLock, pwalletIn, std::placeholders::_1));
conns.UpdatedTransaction = g_signals.m_internals->UpdatedTransaction.connect(std::bind(&CValidationInterface::UpdatedTransaction, pwalletIn, std::placeholders::_1));
conns.SetBestChain = g_signals.m_internals->SetBestChain.connect(std::bind(&CValidationInterface::SetBestChain, pwalletIn, std::placeholders::_1));
conns.Broadcast = g_signals.m_internals->Broadcast.connect(std::bind(&CValidationInterface::ResendWalletTransactions, pwalletIn, std::placeholders::_1));
conns.BlockChecked = g_signals.m_internals->BlockChecked.connect(std::bind(&CValidationInterface::BlockChecked, pwalletIn, std::placeholders::_1, std::placeholders::_2));
Expand Down Expand Up @@ -104,14 +116,6 @@ void CMainSignals::BlockDisconnected(const std::shared_ptr<const CBlock> &block,
m_internals->BlockDisconnected(block, nBlockHeight);
}

void CMainSignals::NotifyTransactionLock(const CTransaction& tx) {
m_internals->NotifyTransactionLock(tx);
}

void CMainSignals::UpdatedTransaction(const uint256& hash) {
m_internals->UpdatedTransaction(hash);
}

void CMainSignals::SetBestChain(const CBlockLocator& locator) {
m_internals->SetBestChain(locator);
}
Expand Down
15 changes: 10 additions & 5 deletions src/validationinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class CConnman;
class CValidationInterface;
class CValidationState;
class uint256;
class CScheduler;

// These functions dispatch to one or all registered wallets

Expand All @@ -29,16 +30,16 @@ void UnregisterValidationInterface(CValidationInterface* pwalletIn);
void UnregisterAllValidationInterfaces();

class CValidationInterface {
public:
virtual ~CValidationInterface() = default;
protected:
/** Notifies listeners of updated block chain tip */
virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {}
virtual void TransactionAddedToMempool(const CTransactionRef &ptxn) {}
virtual void BlockConnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex *pindex, const std::vector<CTransactionRef> &txnConflicted) {}
virtual void BlockDisconnected(const std::shared_ptr<const CBlock> &block, int nBlockHeight) {}
virtual void NotifyTransactionLock(const CTransaction &tx) {}
/** Notifies listeners of the new active block chain on-disk. */
virtual void SetBestChain(const CBlockLocator &locator) {}
virtual bool UpdatedTransaction(const uint256 &hash) { return false;}
/** Tells listeners to broadcast their data. */
virtual void ResendWalletTransactions(CConnman* connman) {}
virtual void BlockChecked(const CBlock&, const CValidationState&) {}
Expand All @@ -55,15 +56,19 @@ class CMainSignals {
friend void ::RegisterValidationInterface(CValidationInterface*);
friend void ::UnregisterValidationInterface(CValidationInterface*);
friend void ::UnregisterAllValidationInterfaces();

public:
CMainSignals();
/** Register a CScheduler to give callbacks which should run in the background (may only be called once) */
void RegisterBackgroundSignalScheduler(CScheduler& scheduler);
/** Unregister a CScheduler to give callbacks which should run in the background - these callbacks will now be dropped! */
void UnregisterBackgroundSignalScheduler();
/** Call any remaining callbacks on the calling thread */
void FlushBackgroundCallbacks();

void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
void TransactionAddedToMempool(const CTransactionRef &ptxn);
void BlockConnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex *pindex, const std::vector<CTransactionRef> &txnConflicted);
void BlockDisconnected(const std::shared_ptr<const CBlock> &block, int nBlockHeight);
void NotifyTransactionLock(const CTransaction&);
void UpdatedTransaction(const uint256 &);
void SetBestChain(const CBlockLocator &);
void Broadcast(CConnman* connman);
void BlockChecked(const CBlock&, const CValidationState&);
Expand Down
Loading